CodeNewbie Community 🌱

Cover image for How to publish domain events with doctrine listener
Albert Colom
Albert Colom

Posted on • Originally published at Medium on

How to publish domain events with doctrine listener

Publish domain events on the Queue system when Doctrine flush entity using clean architectures.

In this example, we use:

Create DomainEvent Entity

First of all, create an abstract Entity of which they will inherit all domain events. This entity will be placed on a shared domain as it will be used in different contexts and of this example it will be placed in the writing layer as we implement CQRS.

src/Shared/Domain/Write/Event/DomainEvent.php

<?php

declare(strict_types=1);

namespace App\Shared\Domain\Write\Event;

use DateTimeImmutable;

abstract class DomainEvent
{
    private const DATE_FORMAT = 'Y-m-d H:i:s';

    public function __construct(private ?string $occurredOn = null)
    {
        $this->occurredOn = $occurredOn ?? (new DateTimeImmutable())->format(self::DATE_FORMAT);
    }

    public function occurredOn(): string
    {
        return $this->occurredOn;
    }
}
Enter fullscreen mode Exit fullscreen mode

Create AggregateRoot Entity

Create an abstract class called AggregateRoot witch inherit all root entities. The AggregateRoot is used to define the main entity from aggregate and can store on memory and publish the domain events from the entity.

src/Shared/Domain/Write/Aggregate/AggregateRoot.php

<?php

declare(strict_types=1);

namespace App\Shared\Domain\Write\Aggregate;

use App\Shared\Domain\Write\Event\DomainEvent;

abstract class AggregateRoot
{
    private array $domainEvents = [];

    final protected function recordEvent(DomainEvent $domainEvent): void
    {
        $this->domainEvents[] = $domainEvent;
    }

    final public function domainEventsEmpty(): bool
    {
        return empty($this->domainEvents);
    }

    final public function pullDomainEvents(): array
    {
        $recordedEvents = $this->domainEvents;
        $this->domainEvents = [];

        return $recordedEvents;
    }
}
Enter fullscreen mode Exit fullscreen mode

And create a simple root entity to use which uses the above class and record an event when construct.

Firstly create a simple Event like a DTO called FooWasCreated .

src/Context/Foo/Domain/Write/Event/FooWasCreated.php

<?php

declare(strict_types=1);

namespace App\Context\Foo\Domain\Write\Event;

use App\Shared\Domain\Write\Event\DomainEvent;

final class FooWasCreated extends DomainEvent
{
    public function __construct(
        public readonly string $id,
        public readonly string $name,
        public readonly string $createdAt,
        ?string $occurredOn = null
    ) {
        parent::__construct($occurredOn);
    }
}
Enter fullscreen mode Exit fullscreen mode

Then can create a root entity to record the event and store it in memory when construct the entity.

src/Context/Foo/Domain/Write/Foo.php

<?php

declare(strict_types=1);

namespace App\Context\Foo\Domain\Write;

use App\Context\Foo\Domain\Write\Event\FooWasCreated;
use App\Shared\Domain\Write\Aggregate\AggregateRoot;
use DateTimeImmutable;

final class Foo extends AggregateRoot
{
    public function __construct(
        private string $id,
        private string $name,
        private DateTimeImmutable $createdAt
    ) {
        $this->recordEvent(
            new FooWasCreated($id, $name, $createdAt->format('Y-m-d H:i:s'))
        );
    }

    public function id(): string
    {
        return $this->id;
    }

    public function name(): string
    {
        return $this->name;
    }

    public function createdAt(): DateTimeImmutable
    {
        return $this->createdAt;
    }
}
Enter fullscreen mode Exit fullscreen mode

NOTE : This entity must define as Doctrine mapping because when persist this entity trigger the events. In this example not define doctrine infrastructure layer to persist the entity, I think you can find a lot of tutorials about that and I don’t want to make this tutorial longer.

Define bus for events with messenger

Define the interface on a shared domain in the write layer.

src/Shared/Domain/Write/Bus/EventBus.php

<?php

declare(strict_types=1);

namespace App\Shared\Domain\Write\Bus\Event;

use App\Shared\Domain\Write\Event\DomainEvent;

interface EventBus
{
    public function publish(DomainEvent ...$domainEvents): void;
}
Enter fullscreen mode Exit fullscreen mode

Implement the interface with the concrete Symfony Messenger on the infrastructure layer. This bus is quite simple just has one method publish is responsible for dispatching messages on MessageBus .

src/Shared/Infrastructure/Bus/MessengerEventBus.php

<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus;

use App\Shared\Domain\Write\Bus\EventBus;
use App\Shared\Domain\Write\Event\DomainEvent;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

final readonly class MessengerEventBus implements EventBus
{
    public function __construct(private MessageBusInterface $messageBus)
    {
    }

    public function publish(DomainEvent ...$domainEvents): void
    {
        foreach ($domainEvents as $currentEvent) {
            $this->messageBus->dispatch(
                (new Envelope($currentEvent))->with(new DispatchAfterCurrentBusStamp())
            );
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Define Bus service messenger on messenger.yaml in this example define async bus to publish events on the ampqp transport with RabbitMQ.

In this case, we use ampqp because no need other dependences than Symfony Messenger but you can define other transports like a kafka

NOTE : You can find more information on: https://symfony.com/doc/current/messenger.html#transports-async-queued-messages

.env

MESSENGER_TRANSPORT_DSN=amqp://guest:guest@rabbitmq:5672/%2f/messag
Enter fullscreen mode Exit fullscreen mode

config/packages/messenger.yaml

To simplify the example I just define de basic configuration of the event bus but you should define the other bus like a query or command and the correct retry policy.

framework:
    messenger:
        transports:
            ampqp:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        buses:
            async.event.bus:
                default_middleware: allow_no_handlers
        routing:
            'App\Shared\Domain\Write\Event\DomainEvent': ampqp
Enter fullscreen mode Exit fullscreen mode

config/services.yaml

To simplify this example I just defined the minimum services definition for this case but you need to define your different services need for your application.

parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true

    App\:
        resource: '../src/'

    App\Shared\Infrastructure\Bus\MessengerEventBus:
        arguments:
            - '@async.event.bus'
Enter fullscreen mode Exit fullscreen mode

Create Doctrine Listener to publish DomainEvents when flush

The idea is quite simple when flush the entity the listener gets the entities to update from Doctrine UnitOfWork and publish the domain events if they have.

Once the events are published Symfony Messenger takes care of sending the Queue system.

<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Listener;

use App\Shared\Domain\Write\Bus\Event\EventBus;
use App\Shared\Domain\Write\Aggregate\AggregateRoot;
use Doctrine\ORM\Event\OnFlushEventArgs;

final readonly class DoctrinePublishDomainEventsOnFlushListener
{
    public function __construct(private EventBus $eventBus)
    {
    }

    public function onFlush(OnFlushEventArgs $eventArgs): void
    {
        $unitOfWork = $eventArgs->getObjectManager()->getUnitOfWork();

        foreach ($unitOfWork->getScheduledEntityInsertions() as $entity) {
            $this->publishDomainEvent($entity);
        }

        foreach ($unitOfWork->getScheduledEntityUpdates() as $entity) {
            $this->publishDomainEvent($entity);
        }

        foreach ($unitOfWork->getScheduledEntityDeletions() as $entity) {
            $this->publishDomainEvent($entity);
        }

        foreach ($unitOfWork->getScheduledCollectionDeletions() as $collection) {
            foreach ($collection as $entity) {
                $this->publishDomainEvent($entity);
            }
        }

        foreach ($unitOfWork->getScheduledCollectionUpdates() as $collection) {
            foreach ($collection as $entity) {
                $this->publishDomainEvent($entity);
            }
        }
    }

    private function publishDomainEvent(object $entity): void
    {
        if ($entity instanceof AggregateRoot && !$entity->domainEventsEmpty()) {
            $this->eventBus->publish(...$entity->pullDomainEvents());
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

And define the listener on a service definition.

config/services.yaml


parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true

    App\:
        resource: '../src/'

    App\Shared\Infrastructure\Bus\MessengerEventBus:
        arguments:
            - '@async.event.bus'

    App\Shared\Infrastructure\Listener\DoctrinePublishDomainEventsOnFlushListener:
            tags:
                - { name: doctrine.event_listener, event: onFlush }
Enter fullscreen mode Exit fullscreen mode

NOTE : You can find more doctrine events on: https://www.doctrine-project.org/projects/doctrine-orm/en/2.15/reference/events.html#events-overview

Conclusion

In this example, you can see how to implement an EventBus with Symfony Messenger and implement doctrine listener to publish domain events on flush at RabbitMQ following clean architectures.


Original published at: albertcolom.com

Top comments (1)

Collapse
 
carsonreed432 profile image
carson Reed

The latest version of Messenger Lite offers improved speed, stability, and efficiency, focusing on delivering essential messaging functions without the bulk. This updated version retains core features like texting, voice calls, and photo sharing, optimized to use minimal data and device storage. It’s ideal for users with older devices or those in areas with limited connectivity, providing a streamlined way to stay connected without sacrificing performance.