Queues

Queues are a powerful architectural pattern for managing background tasks, improving performance, and scaling applications effectively. AssegaiPHP provides first-class support for queueing through the assegaiphp/rabbitmq and assegaiphp/beanstalkd packages, which integrate with two industry-standard queue systems, RabbitMQ and Beanstalkd.

You can install either package using Composer or the Assegai CLI:

$ composer require assegaiphp/rabbitmq
$ composer require assegaiphp/beanstalkd

or

$ assegai add rabbitmq
$ assegai add beanstalkd

Configuring Queues

To define and configure queues, edit the config/queues.php file in your project. Here's an example:

config/queues.php
return [
  'rabbitmq' => [
    'notes' => [
      'host' => 'localhost',
      'port' => 5672,
      'username' => 'guest',
      'password' => 'guest',
      'vhost' => '/',
      'exchange_name' => 'notes',
      'routing_key' => 'notes',
      'passive' => false,
      'durable' => true,
      'exclusive' => false,
      'auto_delete' => false,
    ],
  ],
  'beanstalk' => [
    'notifications' => [
      'host' => 'localhost',
      'port' => 11300,
      'connection_timeout' => 10,
      'receive_timeout' => 10,
    ],
  ],
];

Each top-level key (rabbitmq, beanstalk) represents a queue driver. Inside each driver, you define named queues (e.g., notes, notifications) along with their respective connection and behavior settings.

WARNING Queue names used in #[InjectQueue()] or #[Processor()] attributes must match the ones defined in this configuration file.
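
To make the naming rule concrete, here is a minimal sketch of how a dotted name such as beanstalk.notifications could be checked against the configuration array. The resolveQueueConfig() helper is hypothetical and is not part of AssegaiPHP; it only illustrates the "driver.queue" lookup that the attributes rely on.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of AssegaiPHP): resolves a dotted queue name
 * such as "beanstalk.notifications" against a config/queues.php-style array.
 */
function resolveQueueConfig(array $queues, string $name): array
{
    // Split "driver.queue" into two parts; the limit of 2 keeps later dots in the queue name.
    $parts = explode('.', $name, 2);
    if (count($parts) !== 2 || $parts[0] === '' || $parts[1] === '') {
        throw new InvalidArgumentException("Expected 'driver.queue', got '$name'");
    }
    [$driver, $queue] = $parts;

    if (!isset($queues[$driver][$queue])) {
        throw new OutOfBoundsException("Queue '$name' is not defined in the configuration");
    }

    return $queues[$driver][$queue];
}

// Trimmed copy of the configuration shown above.
$queues = [
    'rabbitmq' => ['notes' => ['host' => 'localhost', 'port' => 5672]],
    'beanstalk' => ['notifications' => ['host' => 'localhost', 'port' => 11300]],
];

$settings = resolveQueueConfig($queues, 'beanstalk.notifications');
echo $settings['port'], PHP_EOL; // the port configured for beanstalk.notifications
```

Note that the driver key in the configuration is beanstalk, not beanstalkd, so a name like beanstalkd.notifications would fail this lookup.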

Why Use Queues?

Queues help you solve several common problems in modern applications:

1. Smooth Out Processing Peaks

Instead of executing resource-intensive tasks (like sending emails, generating reports, or processing images) directly in the user request lifecycle, you can enqueue those tasks and handle them asynchronously. Worker processes can pull tasks from the queue and execute them at a steady, scalable rate, ensuring that user interactions remain fast and responsive.
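
The idea can be sketched with an in-memory SplQueue standing in for a real broker. The handleSignup() and runWorker() functions are illustrative names, not framework APIs, and a real queue would live outside the PHP process so that workers can run separately from web requests.

```php
<?php
declare(strict_types=1);

// In-memory stand-in for a real broker such as RabbitMQ or Beanstalkd.
$queue = new SplQueue();

// "Request" side: record the work to be done and return immediately.
function handleSignup(SplQueue $queue, string $email): string
{
    $queue->enqueue(['task' => 'send_welcome_email', 'to' => $email]);
    return 'Signup accepted';
}

// "Worker" side: drain jobs at its own pace, outside the request lifecycle.
function runWorker(SplQueue $queue): array
{
    $processed = [];
    while (!$queue->isEmpty()) {
        $job = $queue->dequeue();
        // The expensive work (actually sending the email) would happen here.
        $processed[] = $job['to'];
    }
    return $processed;
}

$response = handleSignup($queue, 'ada@example.com');
handleSignup($queue, 'linus@example.com');
$processed = runWorker($queue);
```

The request handler only pays the cost of an enqueue call; the slow work is deferred to whenever the worker runs.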

2. Decouple and Distribute Workloads

Queues make it easy to break large, monolithic operations into smaller jobs that can be processed in parallel or by separate services. This allows you to offload CPU- or IO-intensive tasks to background workers without blocking the main application flow.

3. Reliable Communication Across Services

In distributed systems, queues act as reliable communication channels. You can enqueue jobs in one service and process them in another, even across different machines or containers. If a worker crashes mid-task, the message stays in the queue and can be retried or reprocessed automatically once the system recovers.

Queue Providers

You can choose the best tool for your project based on your needs:

Provider    Library      Best For
RabbitMQ    PhpAmqpLib   Complex routing, durable messaging, fanout
Beanstalkd  Pheanstalk   Lightweight, fast job queueing

AssegaiPHP wraps both libraries behind a unified QueueInterface, making it easy to switch or extend queue providers.
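
As a rough illustration of why a shared interface makes providers interchangeable, the sketch below defines a simplified, hypothetical SketchQueueInterface with two toy drivers. It is not the real QueueInterface, whose methods may differ; only the add() call is taken from the examples in this document.

```php
<?php
declare(strict_types=1);

// Hypothetical, simplified interface; the real QueueInterface may declare different methods.
interface SketchQueueInterface
{
    public function add(object $job): void;
}

// Toy driver A: keeps job objects in an array.
final class ArrayQueue implements SketchQueueInterface
{
    /** @var object[] */
    public array $jobs = [];

    public function add(object $job): void
    {
        $this->jobs[] = $job;
    }
}

// Toy driver B: stores JSON strings, like a provider that needs serialized payloads.
final class JsonQueue implements SketchQueueInterface
{
    /** @var string[] */
    public array $messages = [];

    public function add(object $job): void
    {
        $this->messages[] = json_encode($job, JSON_THROW_ON_ERROR);
    }
}

// The producer depends only on the interface, so either driver can be passed in.
function enqueueNote(SketchQueueInterface $queue, string $title): void
{
    $job = new stdClass();
    $job->title = $title;
    $queue->add($job);
}

$arrayQueue = new ArrayQueue();
$jsonQueue = new JsonQueue();
enqueueNote($arrayQueue, 'Buy milk');
enqueueNote($jsonQueue, 'Buy milk');
```

Because enqueueNote() is typed against the interface, switching drivers does not require any change to the producing code.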

Producers

Here’s how you can use Beanstalkd to enqueue a job in AssegaiPHP. In the following example, we define a NotificationsService that pushes a CreateNotificationDto payload onto a Beanstalk queue:

src/Notifications/NotificationsService.php
namespace Assegaiphp\Todo2\Notifications;

use Assegai\Core\Attributes\Injectable;
use Assegai\Core\Queues\Attributes\InjectQueue;
use Assegai\Core\Queues\Interfaces\QueueInterface;
use Assegaiphp\Todo2\Notifications\DTOs\CreateNotificationDto;

#[Injectable]
readonly class NotificationsService
{
  public function __construct(
    #[InjectQueue('beanstalk.notifications')] private QueueInterface $notificationsQueue
  ) {}

  public function create(CreateNotificationDto $createNotificationDto): string
  {
    $this->notificationsQueue->add($createNotificationDto);
    return 'Notification enqueued!';
  }
}
  • The #[InjectQueue('beanstalk.notifications')] attribute tells the framework to inject a queue instance bound to the beanstalk.notifications tube.
  • The call to $this->notificationsQueue->add(...) places the DTO payload into the queue, where it will be picked up by a worker later.

Consumers

To handle jobs on the queue, you can define workers that listen to specific queues. These workers deserialize the payload and execute your custom business logic (e.g., saving to the database, sending an email, pushing notifications, etc.).

AssegaiPHP provides attributes and interfaces to simplify consumer logic, event listening, and job retries. Here’s how to create a Beanstalk or RabbitMQ job consumer in AssegaiPHP:

namespace Assegaiphp\Todo2\Notifications\Consumers;

use Assegai\Core\Attributes\Injectable;
use Assegai\Core\Queues\Attributes\Processor;
use Assegai\Core\Queues\Interfaces\QueueProcessResultInterface;
use Assegai\Core\Queues\QueueProcessResult;
use Assegai\Core\Queues\WorkerHost;

#[Processor('beanstalk.notifications')]
class NotificationsConsumer extends WorkerHost
{
  public function process(object $job): QueueProcessResultInterface
  {
    // Simulate background work, reporting progress from 0 to 100 percent.
    for ($i = 0; $i <= 100; $i++) {
      $this->updateProgress($job, $i);
      // Perform a unit of work here, e.g., send an email or save to the database.
    }

    return new QueueProcessResult(['status' => 'done']);
  }
}

Key Concepts

  • #[Processor('beanstalk.notifications')] binds this consumer to the beanstalk.notifications queue.
  • The process() method receives the job payload and executes your business logic.
  • The method returns a QueueProcessResult indicating success or failure.

TIP Calling $this->updateProgress($job, $percent) while the job runs, as the loop in this example does, lets you track job progress. AssegaiPHP will emit events if a progress listener is defined.

Error Handling

If your process() method throws an exception or returns a QueueProcessResult that contains errors, AssegaiPHP treats the job as failed and may retry it, depending on your queue's configuration.
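
The general fail-then-retry behavior can be sketched as follows. This is a hypothetical loop written for illustration, not AssegaiPHP's actual worker code; runWithRetries() and the $maxAttempts parameter are assumptions, and real retry limits and delays depend on the driver and your configuration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical retry loop (not AssegaiPHP's worker code): runs $process
 * until it succeeds or $maxAttempts is reached.
 *
 * @return array{status: string, attempts: int, error: ?string}
 */
function runWithRetries(callable $process, object $job, int $maxAttempts = 3): array
{
    $lastError = null;
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $process($job);
            return ['status' => 'done', 'attempts' => $attempt, 'error' => null];
        } catch (Throwable $e) {
            // Remember the failure and try again on the next iteration.
            $lastError = $e->getMessage();
        }
    }
    return ['status' => 'failed', 'attempts' => $maxAttempts, 'error' => $lastError];
}

$job = new stdClass();
$calls = 0;

// Fails twice, then succeeds: simulates a transient error such as a network timeout.
$flaky = function (object $job) use (&$calls): void {
    $calls++;
    if ($calls < 3) {
        throw new RuntimeException("Temporary failure #$calls");
    }
};
$result = runWithRetries($flaky, $job);

// Never succeeds: the job is reported as failed once the attempts run out.
$alwaysFails = function (object $job): void {
    throw new RuntimeException('Permanent failure');
};
$failed = runWithRetries($alwaysFails, $job, 2);
```

Catching Throwable rather than Exception means PHP Error instances (for example a TypeError) also count as job failures in this sketch.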

Putting It Together

With producers like NotificationsService creating jobs and consumers like NotificationsConsumer processing them, AssegaiPHP enables a clean separation of concerns between business logic and background processing.

You can scale horizontally by spawning multiple worker processes, each handling jobs from one or more queues concurrently.