About Queue Service

Windwalker Queue provides an universal interface to wrap different message queue services, and a simple Job interface to easily manage your tasks. Currently we support these drivers:

This service is inspired by Laravel and we try to implement it in our way.

Configuration

In etc/secret.yml, you can see queue config:

queue:
    connection: sync
    sync:
        driver: sync
    database:
        driver: database
        table: queue_jobs
        queue: default
        timeout: 60
    sqs:
        driver: sqs
        key:
        secret:
        queue: default
        region: ap-northeast-1
    ironmq:
        driver: ironmq
        project_id:
        token:
        queue: default
        region: mq-aws-eu-west-1-1
    rabbitmq:
        driver: rabbitmq
        queue: default
    beanstalkd:
        driver: beanstalkd
        queue: default
        host: 127.0.0.1
        timeout: 60
    resque:
        driver: resque
        queue: default
        host: localhost
        port: 6379
    failer:
        driver: database
        table: queue_failed_jobs

Change the connection option and Windwalker will use different drivers.

Sync

sync is default driver, it will not connect to any message queue service but only execute jobs immediately.

Database

database uses your database as queue storage, you must create migration file first by this command:

php windwalker queue table

SQS

sqs connect to AWS SQS service, you must prepare your access key and secret, fill in key and secret options.

Need aws/aws-sdk-php package installed.

IronMQ

ironmq uses iron.io's MQ service, please create an account and a project first, then get project_id and token from project setting.

Need iron-io/iron_mq package installed.

RabbitMQ

rabbitmq must install and run RabbitMQ first. By default, you don't need any host information, Windwalker will connect to localhost:5672 RabbitMQ instance with guest/guest credential.

If you want to configure the connection and account, add these options after rabbitmq setting:

rabbitmq:
    driver: rabbitmq
    queue: default
    # Add these
    host: localhost
    port: 5672
    user: guest
    password: guest

Need php-amqplib/php-amqplib package installed.

Beanstalkd

beanstalkd is a very simple, lightweight message queue, just install it and run, then connect to 127.0.0.1 to use it. You can change the host by modify settings.

Need pda/pheanstalk package installed.

PHP Resque (Redis)

resque is a port of ruby resque in PHP and work with Redis. Default it will connect to localhost:6379, to chane this host, modify host and port in secret.yml.

Need chrisboulton/php-resque <= 1.2 installed. If you want to use delayed messages, you must also install chrisboulton/php-resque-scheduler.

Get Queue Object

// In controller
$queue = $this->app->queue;

// By Container
$queue = $container->get('queue');

// By Ioc
$queue = Ioc::getQueue();

Create A Job

You task or logic must wrap in a JobInterface instance:

class HelloJob implements JobInterface
{
    public function getName()
    {
        return 'hello';
    }

    public function execute()
    {
        echo 'Hello';
    }
}

Then push it to queue:

// In controller

$this->app->queue->push(new HelloJob);

Now we can wait works handle it later.

Add More Information to Job

Sometimes you need more information to handle things, add them to constructor:

class HelloJob implements JobInterface
{
    protected $url;
    protected $path;
    protected $size;
    protected $crop;

    public function __construct($url, $path, $size = 600, $crop = true)
    {
        $this->url = $url;
        $this->path = $path;
        $this->size = $size;
        $this->crop = $crop;
    }

    public function getName()
    {
        return 'hello';
    }

    public function execute()
    {
        $imgData = (new HttpClient)->get($this->url);

        ImageHelper::load($imgData)
            ->resize($this->size, $this->size, $this->crop)
            ->save($this->path)
    }
}

Then inject these information when you creating Jobs:

$this->app->queue->push(new HelloJob(
    'http://example/image.jpg',
    WINDWALKER_PUBLIC . '/images/image.jpg',
    400,
    true
));

Create Job With System Services

If you need some system object, use DI pattern to declare it in constructor:

use Windwalker\Core\Error\Handler\ErrorHandlerInterface;
use Windwalker\Core\Logger\LoggerManager;

class HelloJob implements JobInterface
{
    protected $error;
    protected $data;
    private $logger;

    public function __construct(ErrorHandlerInterface $error, LoggerManager $logger, $data = [])
    {
        $this->error = $error;
        $this->data = $data;
        $this->logger = $logger;
    }

    public function getName()
    {
        return 'hello';
    }

    public function execute()
    {
        if (/* Failure */)
        {
            $this->logger->error('error', 'Something wrong...');
        }
    }
}

And you must resolve the dependencies by container:

$this->app->queue->push(
    $this->container->newInstance(HelloJob::class, ['data' => ['foo', 'bar']])
);

Use Queue Object

Push

Simple use push to add a job object as new message:

$queue->push(new MyJob($data));

Push message but wait for 10 seconds later to run:

$queue->push(new MyJob($data), 10);

Push to directly queue:

$queue->push(new MyJob($data), 0, 'flower');

Push raw data instead job object:

$queue->pushRaw(['flower' => 'sakura'], 0, 'flower');

Pop, Delete and Release

Use pop() to get next message:

$message = $queue->pop(); // QueueMessage object

$message->getJob();
$message->getBody();
$message->getRawBody();
$message->getId();
$message->getAttempts();
$message->get('flower'); // Get data from body

Delete a message:

$queue->delete($message);

// argument should be a QueueMessage object
$message = new \Windwalker\Core\Queue\QueueMessage;
$message->setId($id);

$queue->delete($message);

// You can delete by ID
$queue->delete($id);

// Check this message deleted
$message->isDeleted();

Release back to queue list (attempts will auto +1):

$queue->release($message);

// You can release by ID
$queue->release($id);

// Wait a while to run again:
$queue->release($message, 15);

Run Jobs By Worker

Windwalker has a worker to help you create daemon to run jobs, use this command:

php windwalker queue worker

Run once and set max fail attempts to 3 times:

php windwalker queue worker --once --tries=3

Every loop will sleep 1 second, you can set a shorter period, and set max time limit per job:

php windwalker queue worker --sleep=0.3 --timeout=120

The options:

  -c | --connection        The connection of queue.
  -o | --once              Only run next job.
  -d | --delay             Delay time for failed job to wait next run.
  -f | --force             Force run worker if in pause mode.
  -m | --memory            The memory limit in megabytes.
  -s | --sleep             Number of seconds to sleep after job run complete.
  -t | --tries             Number of times to attempt a job if it failed.
  --timeout                Number of seconds that a job can run.

If you want to run specify queue list, use arguments:

php windwalker queue worker default --tries=3

Or run multiple queues with priority.

php windwalker queue worker high normal mail --tries=3

Memory Control

Please make sure your Job will not store any unnecessary data to global object to prevent memory leak.

You can set memory limit of your worker, if memory exceeded, worker will close self.

php windwalker queue worker --memory=512 # MB

Forever Background Running

To make worker always running or restart itself, you can use process control system like Supervisor or forever.js to monitor your process.

forever -c php windwalker queue worker --tries=3 

Restart

After you modified your code, you may want to all background workers restart to use new code, use restart to send signal to all works:

php windwalker queue restart

Pause Mode

If you set site offline by windwalker system down, all workers will pause but not closed. After you set site online by system up, workers will continue work. You are allow to use --force to ignore pause mode:

php windwalker queue worker --force

Send Mail in Jobs

Windwalker separate web and console as two different environment, by default, many services which web loaded has not been loaded in console. If you write a Job like this:

class HelloJob implements JobInterface
{
    public function getName()
    {
        return 'hello';
    }

    public function execute()
    {
        $message = Mailer::createMessage()
            ->subject('Hello Mail')
            ->to('hello@windwalker.io')
            ->renderBody('hello', [], 'edge', 'flower');

        Mailer::send($message);
    }
}

You may get some error messages:

  • Key widget.manager has not registered in container.
  • Key uri has not registered in container.
  • Key router has not registered in container.

And more...

A solution is that generate MailMessage before job create and inject it, queue service will serialize and handle them later.

class HelloJob implements JobInterface
{
    protected $message;

    public function __construct(MailMessage $message)
    {
        $this->message = $message;
    }

    public function getName()
    {
        return 'hello';
    }

    public function execute()
    {
        Mailer::send($this->message);
    }
}

Push job:

$message = Mailer::createMessage()
    ->subject('Hello Mail')
    ->to('hello@windwalker.io')
    ->renderBody('hello', [], 'edge', 'flower');

$queue->push(new HelloJob($message))

Now since HelloJob only use Mail service without any other services, so it will work perfect.

But if you still need to generate MailMessage and mail body in Job, Windwalker provides a helper to implement it:

use Windwalker\Core\Console\ConsoleHelper;

class HelloJob implements JobInterface
{
    public function getName()
    {
        return 'hello';
    }

    public function execute()
    {
        ConsoleHelper::prepareWebEnvironment(
            'web',
            'http://domain.com/windwalker/www/dev.php/flower/sakuras',
            'windwalker/www/dev.php'
        );

        $message = Mailer::createMessage()
            ->subject('Hello Mail')
            ->to('hello@windwalker.io')
            ->renderBody('hello', [], 'edge', 'flower');

        Mailer::send($message);
    }
}

Use ConsoleHelper, it will help us prepare some important environment of web, you must provide 3 arguments:

  • env --> Use web or dev, Windwalker will load config about this env.
  • url --> Create a URL to simulate web request.
  • script --> The php script name start from DocumentRoot, The URL path after it will be route.

Now you can use $uri : UriData in your widget or view template to get uri information. You can also use $router service to build route.

If your route not found, maybe console environment didn't load your route settings, load it by ConsoleHelper:

ConsoleHelper::prepareWebEnvironment(
    'web',
    'http://domain.com/windwalker/www/dev.php/flower/sakuras',
    'windwalker/www/dev.php'
    [
        WINDWALKER_ETC . '/routing.yml'
    ]
);

Failed Jobs

If jobs failed, you may want to log them in a place and retry later or check the error message. Windwalker provides a database driven failed handler. Please run this command to create migration file:

php windwalker queue failed-table

After you migrate it, all failed jobs will store in queue_failed_jobs table. Use retry {ID} to retry a job:

php windwalker queue retry 3

Or retry all:

php windwalker queue retry --all

Delete failed jobs:

php windwalker queue remove-failed 3

php windwalker queue remove-failed --all

If you found a typo or error, please help us improve this document.