Queues
As described in the Asynchronous Processing chapter, your application layer can allow commands to be queued via a command queuer driving port. Additionally, it may also choose to implement some internal processes as internal commands that are executed asynchronously.
To do either (or both!), you need to define a queue driven port. The adapter implementation then handles pushing commands onto a queue, and dispatching the command when it is pulled from the queue.
We provide several queue adapters that you can use. These are designed to be simple to use and allow you to plug into any PHP queue implementation that you choose to use. This chapter describes these queue implementations.
Queue Port
To allow your application layer to queue commands, we need a driven port for the queue. Although there is a generic queue interface, our bounded context needs to expose its specific queue interface.
We do this by defining an interface in our application's driven ports:
namespace App\Modules\EventManagement\Application\Ports\Driven\Queue;
use CloudCreativity\Modules\Contracts\Application\Ports\Driven\Queue as Port;
interface Queue extends Port
{
}
If you have a command queuer driving port, you will need to inject your queue adapter into the command queuer. See the command queuer documentation for examples.
This allows the presentation and delivery layer to asynchronously dispatch commands. When pulling commands from the queue, your queue adapter will need to dispatch the command to the command bus.
We provide two concrete classes that allow you to push work onto a queue via your preferred PHP implementation. If neither of these work for you, you can instead write a queue that implements the above interface.
Internal Commands
The asynchronous processing chapter introduces the concept of internal commands. These are commands that are not exposed as use cases of your bounded context. Instead they are used to split long-running or complex work up into smaller write operations (commands) that are sequenced via a workflow.
If you have an internal command bus, you can provide a separate queue port for these commands. This segregates internal commands to a separate queue, which is advantageous to separate the concerns of commands that are use cases of your bounded context (driving ports) or internal to the application layer.
In this scenario, define another driven port:
namespace App\Modules\EventManagement\Application\Ports\Driven\Queue;
use CloudCreativity\Modules\Contracts\Application\Ports\Driven\Queue as Port;
interface InternalQueue extends Port
{
}
Then wherever your application layer needs to queue an internal command, it can do this via the internal queue port.
Closure Queuing
Our ClosureQueue
allows you to register closures that handle pushing work onto your queue. This is useful where wiring in a queue implementation is extremely simple - as is the case with our Laravel example below.
Define a queue adapter by extending this class:
namespace App\Modules\EventManagement\Infrastructure\Queue;
use App\Modules\EventManagement\Application\Ports\Driven\Queue\Queue;
use CloudCreativity\Modules\Infrastructure\Queue\ClosureQueue;
final class QueueAdapter extends ClosureQueue
implements Queue
{
}
Then you can create the adapter by providing it with the default closure for queuing commands. For example:
namespace App\Modules\EventManagement\Infrastructure\Queue;
use App\Modules\EventManagement\Application\Ports\Driven\Queue\Queue;
use CloudCreativity\Modules\Contracts\Toolkit\Messages\Command;
use CloudCreativity\Modules\Infrastructure\Queue\Middleware\LogPushedToQueue;
use CloudCreativity\Modules\Toolkit\Pipeline\PipeContainer;
final class QueueAdapterProvider
{
public function __construct(
private readonly LoggerInterface $logger,
) {
}
public function getQueue(): Queue
{
$adapter = new QueueAdapter(
fn: function (Command $command): void {
DispatchCommandJob::dispatch($command);
},
middleware: $middleware = new PipeContainer(),
);
$middleware->bind(
LogPushedToQueue::class,
fn () => new LogPushedToQueue($this->logger),
);
$queue->through([LogPushedToQueue::class]);
return $adapter;
}
}
TIP
As shown, the queue adapter can be configured with queue middleware.
This default closure will be used for all commands, unless you register closures for specific commands. For example:
$queue = new QueueAdapter(
fn: function (Command $command): void {
DispatchCommandJob::dispatch($command);
},
);
$queue->bind(
RecalculateSalesAtEventCommand::class,
function (RecalculateSalesAtEventCommand $command): void {
DispatchCommandJob::dispatch($command)
->onQueue('reporting');
},
);
Class-Based Queuing
Our ComponentQueue
allows you to use define queuing logic on classes. Each class is an enqueuer - a component that handles pushing an item (in this case a command message) onto a queue.
This is useful in scenarios where you want to use constructor dependency injection when integrating with your PHP queue implementation. Or alternatively, if wiring into your implementation is more complex than can be defined in a simple closure.
Define a queue adapter by extending this class:
namespace App\Modules\EventManagement\Infrastructure\Queue;
use App\Modules\EventManagement\Application\Ports\Driven\Queue\Queue;
use CloudCreativity\Modules\Infrastructure\Queue\ComponentQueue;
final class QueueAdapter extends ComponentQueue
implements Queue
{
}
Then you can create the adapter by providing it with a default enqueuer for queuing commands. For example:
namespace App\Modules\EventManagement\Infrastructure\Queue;
use App\Modules\EventManagement\Application\Ports\Driven\Queue\Queue;
use CloudCreativity\Modules\Infrastructure\Queue\Middleware\LogPushedToQueue;
use CloudCreativity\Modules\Infrastructure\Queue\EnqueuerContainer;
use CloudCreativity\Modules\Toolkit\Pipeline\PipeContainer;
final class QueueAdapterProvider
{
public function __construct(
private readonly LoggerInterface $logger,
) {
}
public function getQueue(): Queue
{
$queue = new QueueAdapter(
enqueuers: new EnqueuerContainer(
fn () => new DefaultEnqueuer(),
),
);
$middleware->bind(
LogPushedToQueue::class,
fn () => new LogPushedToQueue($this->logger),
);
$queue->through([LogPushedToQueue::class]);
return $adapter;
}
}
TIP
As shown, the queue adapter can be configured with queue middleware.
The closure provided to the adapter's constructor is the default enqueuer factory that will be used for all work that is being queued. You can bind alternative enqueuers for specific commands as follows:
$queue = new QueueAdapter(
enqueuers: $enqueuers = new EnqueuerContainer(
fn () => new DefaultEnqueuer(),
),
);
$enqueuers->bind(
RecalculateSalesAtEventCommand::class,
fn () => new ReportingEnqueuer(),
);
The enqueuer class can be implemented as you need. All it needs is a push()
method that queues the given command. For example:
namespace App\Modules\EventManagement\Infrastructure\Queue;
use CloudCreativity\Modules\Contracts\Toolkit\Messages\Command;
final class DefaultEnqueuer
{
public function push(Command $command): void
{
// ...implementation
}
}
final class ReportingEnqueuer
{
public function push(RecalculateSalesAtEventCommand $command): void
{
// ...implementation
}
}
Writing a Queue
If neither of these two queue adapters work for you, you can write your own queue adapter. This is a simple class that implements the port interface that is extended in your application layer:
namespace CloudCreativity\Modules\Application\Ports\Driven\Queue;
use CloudCreativity\Modules\Contracts\Toolkit\Messages\Command;
interface Queue
{
/**
* Push a command on to the queue.
*
* @param Command $command
* @return void
*/
public function push(Command $command): void;
}
TIP
If you want your custom queue to use middleware, take a look at either of our two implementations to see how that works.
Laravel Example
Laravel provides a full-featured queue implementation, via queue jobs. For our bounded context to use this as the queue adapter, we would need a Laravel job that takes a command message and dispatches it to the command bus.
TIP
If we also have an internal command bus, we would need another Laravel job that takes an internal command message and dispatches it to the internal command bus.
Default Queue Job
For example, a default Laravel job for queuing and dispatching commands would be:
namespace App\Modules\EventManagement\Infrastructure\Queue;
use App\Modules\EventManagement\Application\Ports\Driving\CommandBus;
use CloudCreativity\Modules\Contracts\Toolkit\Messages\Command;
use CloudCreativity\Modules\Toolkit\Result\FailedResultException;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
class DispatchCommandJob implements ShouldQueue
{
use Dispatchable;
use InteractsWithQueue;
use Queueable;
public function __construct(
public readonly Command $command
) {
}
public function handle(CommandBus $bus): void
{
$result = $bus->dispatch($this->command);
if ($result->didFail()) {
throw new FailedResultException($result);
}
}
}
Specific Queue Job
This default queue job may work for all of your asynchronous processing via command messages. However, you may want to customise the behaviour for specific commands. For example, to prevent job overlapping, or customise how failures are handled.
An example of a queue job for a specific command might be:
namespace App\Modules\EventManagement\Infrastructure\Queue;
use App\Modules\EventManagement\Application\Ports\Driving\CommandBus;
use App\Modules\EventManagement\Application\UseCases\Commands\{
RecalculateSalesAtEvent\ErrorCodeEnum,
RecalculateSalesAtEvent\RecalculateSalesAtEventCommand,
};
use CloudCreativity\Modules\Toolkit\Result\FailedResultException;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
class QueueRecalculateSalesAtEventJob implements ShouldQueue
{
use Dispatchable;
use InteractsWithQueue;
use Queueable;
public function __construct(
public readonly RecalculateSalesAtEventCommand $command
) {
}
public function handle(CommandBus $bus): void
{
$result = $bus->dispatch($this->command);
$errors = $result->errors();
if ($errors->contains(ErrorCodeEnum::TemporaryFailure)) {
$this->release(now()->addSeconds(30));
return;
}
if ($result->didFail()) {
throw new FailedResultException($result);
}
}
public function middleware(): array
{
return [
new WithoutOverlapping($this->command->eventId->value),
];
}
}
TIP
Hopefully you can see from these examples that this queue implementation works well with the encapsulation of our business logic.
The Laravel job is purely an infrastructure concern. There is no business logic in its handle()
method, as that is encapsulated in the command handler that will be executed via the command bus.
Instead, the Laravel handle()
method and class structure simply needs to concern itself with how the job should run on the Laravel queue, and what it should do if it receives a failure result.
Creating the Queue
We can now create our queue adapter. This will extend the closure queue adapter described earlier in this chapter. For example, to create the queue that is injected into our command bus:
// default command queuing
$queue = new QueueAdapter(
fn: function (Command $command): void {
DispatchCommandJob::dispatch($command);
},
);
// specific command queuing
$queue->bind(
RecalculateSalesAtEventCommand::class,
function (RecalculateSalesAtEventCommand $command): void {
QueueRecalculateSalesAtEventJob::dispatch($command);
},
);
Middleware
Both our queue adapters give you complete control over how to compose the queuing of command messages, via middleware. Middleware is a powerful way to add cross-cutting concerns to your queue, such as logging.
To apply middleware to the queue, use the through()
method - as shown in the example below. Middleware is executed in the order it is added to the queue.
use CloudCreativity\Modules\Infrastructure\Queue\ClosureQueue;
use CloudCreativity\Modules\Infrastructure\Queue\Middleware\LogPushedToQueue;
use CloudCreativity\Modules\Toolkit\Pipeline\PipeContainer;
$queue = new ClosureQueue(
fn: function (Command $command): void {
DispatchCommandJob::dispatch($command);
},
middleware: $middleware = new PipeContainer(),
);
$queue->through([LogPushedToQueue::class]);
Queue Logging
Use our LogPushedToQueue
middleware to log a command being pushed into the queue. The middleware takes a PSR Logger.
use CloudCreativity\Modules\Infrastructure\Queue\Middleware\LogPushedToQueue;
$queue = new ClosureQueue(
fn: function (Command $command): void {
DispatchCommandJob::dispatch($command);
},
middleware: $middleware = new PipeContainer(),
);
$middleware->bind(
LogPushedToQueue::class,
fn (): LogPushedToQueue => new LogPushedToQueue(
$this->dependencies->getLogger(),
),
);
$queue->through([LogPushedToQueue::class]);
The use of this middleware is identical to that described in the Commands chapter. See those instructions for more information, such as configuring the log levels and customising the log context for the command message or queue job.
Writing Middleware
You can write your own middleware to suit your specific needs. Middleware is a simple invokable class, with the following signature:
namespace App\Modules\Shared\Infrastructure\Queue\Middleware;
use Closure;
use CloudCreativity\Modules\Contracts\Infrastructure\Queue\QueueMiddleware;
use CloudCreativity\Modules\Contracts\Toolkit\Messages\Command;
final class MyQueueMiddleware implements QueueMiddleware
{
/**
* Handle the command message being queued.
*
* @param Command $command
* @param Closure(Command): void $next
* @return void
*/
public function __invoke(
Command $command,
Closure $next,
): void
{
// executes before the command is pushed to the queue.
$next($command);
// executes after the command is pushed to the queue.
}
}
TIP
If you're writing middleware that is only meant to be used for a specific command message or queue job, do not implement the QueueMiddleware
interface. Instead, use the same signature but change the type-hint for to the specific command message your middleware is designed for.
Testing
We provide a fake queue that you can use in tests. This is the CloudCreativity\Modules\Testing\FakeQueue
class.
You can access any queued commands via the $commands
property:
use App\Modules\EventManagement\Application\Ports\Driven\Queue\Queue as Port;
use CloudCreativity\Modules\Testing\FakeQueue;
$queue = new class () extends FakeQueue implements Port {};
// do work that might queue a command
$this->assertCount(2, $queue->commands);
If you expect exactly one command to be queued, use the sole()
helper:
$expected = new SomeCommand();
$this->assertEquals($expected, $queue->sole());