Queues
Queues are a powerful architectural pattern for managing background tasks, improving performance,
and scaling applications effectively. AssegaiPHP provides first-class support for queueing through
the assegaiphp/rabbitmq and assegaiphp/beanstalkd packages, both of which offer robust integrations
with industry-standard queue systems:
- RabbitMQ via PhpAmqpLib
- Beanstalkd via Pheanstalk
You can install either package using Composer or the Assegai CLI:
$ composer require assegaiphp/rabbitmq
$ composer require assegaiphp/beanstalkd
or
$ assegai add rabbitmq
$ assegai add beanstalkd
Configuring Queues
To define and configure queues, edit the config/queues.php file in your project. Here's an example:
return [
    'rabbitmq' => [
        'notes' => [
            'host' => 'localhost',
            'port' => 5672,
            'username' => 'guest',
            'password' => 'guest',
            'vhost' => '/',
            'exchange_name' => 'notes',
            'routing_key' => 'notes',
            'passive' => false,
            'durable' => true,
            'exclusive' => false,
            'auto_delete' => false,
        ],
    ],
    'beanstalk' => [
        'notifications' => [
            'host' => 'localhost',
            'port' => 11300,
            'connection_timeout' => 10,
            'receive_timeout' => 10,
        ],
    ],
];
Each top-level key (rabbitmq, beanstalk) represents a queue driver. Inside each
driver, you define named queues (e.g., notes, notifications) along with their
respective connection and behavior settings.
WARNING Queue names used in #[InjectQueue()] or #[Processor()] attributes must match the ones defined in this configuration file.
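To make the naming rule concrete, here is a minimal sketch of how a dotted name such as beanstalk.notifications could be looked up in an array shaped like config/queues.php. The helper resolveQueueConfig is hypothetical and not part of AssegaiPHP. It assumes the first segment is the driver key and the second is the queue name, as the examples on this page suggest.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of AssegaiPHP): resolves a "driver.queue"
 * name against an array shaped like config/queues.php.
 */
function resolveQueueConfig(array $queues, string $name): array
{
    // Split "beanstalk.notifications" into a driver key and a queue name.
    $parts = explode('.', $name, 2);
    if (count($parts) !== 2 || $parts[0] === '' || $parts[1] === '') {
        throw new InvalidArgumentException("Expected 'driver.queue', got '$name'.");
    }
    [$driver, $queue] = $parts;

    // A name that is missing from the configuration cannot be resolved.
    if (!isset($queues[$driver][$queue])) {
        throw new OutOfBoundsException("Queue '$name' is not defined in the configuration.");
    }

    return $queues[$driver][$queue];
}

$queues = [
    'beanstalk' => [
        'notifications' => ['host' => 'localhost', 'port' => 11300],
    ],
];

$config = resolveQueueConfig($queues, 'beanstalk.notifications');
echo $config['port'], PHP_EOL; // prints 11300
```

With this lookup, a name like beanstalk.missing fails immediately, which is the kind of mismatch the warning above is about.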
Why Use Queues?
Queues help you solve several common problems in modern applications:
1. Smooth Out Processing Peaks

Instead of executing resource-intensive tasks (like sending emails, generating reports, or processing images) directly in the user request lifecycle, you can enqueue those tasks and handle them asynchronously. Worker processes can pull tasks from the queue and execute them at a steady, scalable rate, ensuring that user interactions remain fast and responsive.
2. Decouple and Distribute Workloads

Queues make it easy to break large, monolithic operations into smaller jobs that can be processed in parallel or by separate services. This allows you to offload CPU- or IO-intensive tasks to background workers without blocking the main application flow.
3. Reliable Communication Across Services

In distributed systems, queues act as reliable communication channels. You can enqueue jobs in one service and process them in another, even across different machines or containers. If a worker crashes mid-task, the message stays in the queue and can be retried or reprocessed automatically once the system recovers.
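The decoupling described above can be sketched without any broker at all. The example below uses PHP's built-in SplQueue as an in-memory stand-in. The function names enqueueEmail and drain are illustrative only. A real setup would replace the in-memory queue with RabbitMQ or Beanstalkd and run the worker in a separate process.

```php
<?php
declare(strict_types=1);

// In-memory stand-in for a real broker; for illustration only.
$queue = new SplQueue();

// Producer side: the request handler only records the work to be done.
function enqueueEmail(SplQueue $queue, string $recipient): void
{
    $queue->enqueue(['type' => 'email', 'to' => $recipient]);
}

// Worker side: drains jobs in FIFO order, outside the request lifecycle.
function drain(SplQueue $queue): array
{
    $processed = [];
    while (!$queue->isEmpty()) {
        $job = $queue->dequeue();
        $processed[] = "sent email to {$job['to']}";
    }
    return $processed;
}

enqueueEmail($queue, 'a@example.com');
enqueueEmail($queue, 'b@example.com');

foreach (drain($queue) as $line) {
    echo $line, PHP_EOL;
}
```

The producer returns as soon as the job is recorded. How fast the jobs are processed depends only on the worker.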
Queue Providers
You can choose the best tool for your project based on your needs:
Provider | Library | Best For |
---|---|---|
RabbitMQ | PhpAmqpLib | Complex routing, durable messaging, fanout |
Beanstalkd | Pheanstalk | Lightweight, fast job queueing |
AssegaiPHP wraps both libraries behind a unified QueueInterface, making it easy to switch or extend queue providers.
Producers
Here’s how you can use Beanstalkd to enqueue a job in AssegaiPHP. In the following example, we define a
NotificationsService that pushes a CreateNotificationDto payload onto a Beanstalk queue:
namespace Assegai\App\Notifications;

use Assegai\Core\Attributes\Injectable;
use Assegai\Core\Queues\Attributes\InjectQueue;
use Assegai\Core\Queues\Interfaces\QueueInterface;
use Assegaiphp\Todo2\Notifications\DTOs\CreateNotificationDto;

#[Injectable]
readonly class NotificationsService
{
    public function __construct(
        #[InjectQueue('beanstalk.notifications')] private QueueInterface $notificationsQueue
    ) {}

    public function create(CreateNotificationDto $createNotificationDto): string
    {
        $this->notificationsQueue->add($createNotificationDto);
        return 'Notification enqueued!';
    }
}
- The #[InjectQueue('beanstalk.notifications')] attribute tells the framework to inject a queue instance bound to the beanstalk.notifications tube.
- The call to $this->notificationsQueue->add(...) places the DTO payload into the queue, where it will be picked up by a worker later.
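Because the service depends on an interface rather than a concrete driver, its enqueueing logic can be exercised without a running broker. The sketch below relies on hypothetical stand-ins. JobQueue models only the add() call used above, and the real QueueInterface may declare more methods or a different signature. InMemoryQueue is a test double. The fields on CreateNotificationDto are assumed, since this page does not show the DTO.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the framework's QueueInterface; only add() is modelled.
interface JobQueue
{
    public function add(object $job): void;
}

// Test double: collects jobs in an array instead of sending them to a broker.
final class InMemoryQueue implements JobQueue
{
    /** @var object[] */
    public array $jobs = [];

    public function add(object $job): void
    {
        $this->jobs[] = $job;
    }
}

// Assumed DTO shape; the fields are illustrative.
final class CreateNotificationDto
{
    public function __construct(
        public readonly string $recipient,
        public readonly string $message,
    ) {}
}

// Same structure as NotificationsService above, minus the framework attributes.
final class NotificationsService
{
    public function __construct(private JobQueue $notificationsQueue) {}

    public function create(CreateNotificationDto $dto): string
    {
        $this->notificationsQueue->add($dto);
        return 'Notification enqueued!';
    }
}

$queue = new InMemoryQueue();
$service = new NotificationsService($queue);

echo $service->create(new CreateNotificationDto('user@example.com', 'Welcome')), PHP_EOL;
echo count($queue->jobs), PHP_EOL; // one job was recorded
```

In an application, the framework would supply the queue through #[InjectQueue()]. The in-memory version is useful mainly in unit tests.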
Consumers
To handle jobs on the queue, you can define workers that listen to specific queues. These workers deserialize the payload and execute your custom business logic (e.g., saving to the database, sending an email, pushing notifications, etc.).
AssegaiPHP provides attributes and interfaces to simplify consumer logic, event listening, and job retries. Here’s how to create a Beanstalk or RabbitMQ job consumer in AssegaiPHP:
namespace Assegaiphp\Todo2\Notifications\Consumers;

use Assegai\Core\Attributes\Injectable;
use Assegai\Core\Queues\Attributes\Processor;
use Assegai\Core\Queues\Interfaces\QueueProcessResultInterface;
use Assegai\Core\Queues\QueueProcessResult;
use Assegai\Core\Queues\WorkerHost;

#[Processor('beanstalk.notifications')]
class NotificationsConsumer extends WorkerHost
{
    public function process(object $job): QueueProcessResultInterface
    {
        // Simulate background work
        for ($i = 0; $i <= 100; $i++) {
            $this->updateProgress($job, $i);
            // Perform work here, e.g., send email, save to database, etc.
        }

        return new QueueProcessResult(['status' => 'done']);
    }
}
Key Concepts
- #[Processor('beanstalk.notifications')] binds this consumer to the beanstalk.notifications queue.
- The process() method receives the job payload and executes your business logic.
- The method returns a QueueProcessResult indicating success or failure.
TIP You can also call $this->updateProgress($job, $percent) inside the loop to
track job progress, and AssegaiPHP will emit events if a progress listener is defined.
Error Handling
If your process method throws an exception or returns a QueueProcessResult with exceptions/errors, AssegaiPHP will treat the job as failed and may retry it depending on your queue's configuration.
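As a rough illustration of retry-on-failure, the sketch below wraps a job handler and retries it when it throws. It is a generic pattern and not AssegaiPHP's implementation. The function runWithRetries, the $maxAttempts parameter, and the shape of the returned array are all hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical retry wrapper (not AssegaiPHP's implementation): runs a job
 * handler and retries when it throws, up to $maxAttempts times.
 *
 * @return array{status: string, attempts: int, error: ?string}
 */
function runWithRetries(callable $handler, object $job, int $maxAttempts = 3): array
{
    $lastError = null;

    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $handler($job);
            return ['status' => 'done', 'attempts' => $attempt, 'error' => null];
        } catch (Throwable $e) {
            // Remember the failure and try again on the next iteration.
            $lastError = $e->getMessage();
        }
    }

    // Every attempt threw, so the job is reported as failed.
    return ['status' => 'failed', 'attempts' => $maxAttempts, 'error' => $lastError];
}

// A handler that fails twice before succeeding.
$calls = 0;
$flaky = function (object $job) use (&$calls): void {
    $calls++;
    if ($calls < 3) {
        throw new RuntimeException('temporary failure');
    }
};

$result = runWithRetries($flaky, (object) ['id' => 1]);
echo $result['status'], ' after ', $result['attempts'], ' attempts', PHP_EOL;
// prints "done after 3 attempts"
```

A production worker would usually add a delay or backoff between attempts and move jobs that keep failing to a separate queue for inspection.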
Putting It Together
With producers like NotificationsService creating jobs and consumers like NotificationsConsumer processing
them, AssegaiPHP enables a clean separation of concerns between business logic and background processing.
You can scale horizontally by spawning multiple worker processes, each handling jobs from one or more queues concurrently.