State replication with Symfony Workflow, Messenger, and RabbitMQ
In this article, we’ll see how we can replicate some data between two different applications that live in two different locations. But the special thing is that the network is not reliable! Yeah, network is never reliable, but here, it’s really not 😁
The project has two applications:
- the “Core”, that serves the website to the customer, hosted in the cloud. It’s a traditional e-commerce application built with Symfony.
- the “Warehouse”, that manages the logistics, hosted in each warehouse. It’s an application built on top of Symfony, with a heavy use of the Workflow component.
Since warehouses are located deep in the country land, the overall connectivity can fail from time to time. Many people work there. They can’t stop working if the warehouse loses internet connection. That’s why we must host the application on premise. Finally, all warehouses must send the state of each article to the Core, in near real time.
Section intitulée the-architectureThe Architecture
In each zone, we have a Symfony application and a RabbitMQ instance, and other services which are not relevant here. Each warehouse will publish messages to their local RabbitMQ. And when the connectivity is up, RabbitMQ will move messages from the warehouse to the core RabbitMQ instance.
Section intitulée symfony-workflowSymfony Workflow
The workflow lives in the Warehouse application, and is quite big: When the article reaches certain places, we want to notify the core application. So, in the workflow definition, we’ll add this information on each place’s metadata we want to notify.
framework:
workflows:
article:
places:
- name: 'new'
- name: 'received'
metadata:
sync-to-core: true
- name: 'opened'
Finally, we need a listener to publish a message with Messenger
namespace App\Article\Workflow;
use App\Article\Messenger\Message\SyncArticleToCoreMessage;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Workflow\Event\TransitionEvent;
class SyncListener
{
public function __construct(
private readonly MessageBusInterface $bus,
) {
}
#[AsEventListener('workflow.article.transition')]
public function onEnteredNew(TransitionEvent $e)
{
// We have only one place since it's a state machine
$to = $e->getTransition()->getTos()[0];
$shouldSync = (bool) $e->getWorkflow()->getMetadataStore()->getMetadata('sync-to-core', $to);
if (!$shouldSync) {
return;
}
// SyncArticleToCoreMessage is a POPO
$this->bus->dispatch(new SyncArticleToCoreMessage(
$e->getSubject()->getId(),
$to,
$e->getContext(),
));
}
}
From the workflow, we dispatch a Messenger message. But remember, this very application won’t consume the message. The message must be routed to another RabbitMQ, to be consumed by another application!
Section intitulée how-to-move-messages-from-one-rabbitmq-to-another-oneHow to Move Messages from one RabbitMQ to Another One?
We’ll use RabbitMQ Federation plugin! This plugin will connect one server (downstream – Core in our case) to another (upstream – warehouses). It will create buffer queues, exchanges and all it needs to perform the replication. This plugin is now integrated within the RabbitMQ server, so it’s only a matter of enabling it.
Section intitulée how-to-configure-rabbitmq-federationHow to Configure RabbitMQ Federation
First we need to enable the plugin. On all RabbitMQ nodes, run:
rabbitmq-plugins enable rabbitmq_federation_management
If you want to reproduce it, we provide this docker-compose.yml
file:
version: "3.8"
services:
rabbitmq:
build:
target: rabbitmq
container_name: rabbitmq
rabbitmq-wh-1:
build:
target: rabbitmq
container_name: rabbitmq-wh-1
rabbitmq-wh-2:
build:
target: rabbitmq
container_name: rabbitmq-wh-2
And the dockerfile:
FROM rabbitmq:${RABBITMQ_VERSION:-management-alpine} as rabbitmq
ADD --chmod=0755 https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/v3.11.x/deps/rabbitmq_management/bin/rabbitmqadmin /usr/local/bin/rabbitmqadmin
RUN rabbitmq-plugins enable rabbitmq_federation_management
On the downstream server, aka the “Core” server, we declare an exchange, a queue, and bind them. This queue will receive all messages from all warehouses.
docker exec rabbitmq rabbitmqadmin declare exchange name=upstream-wh type=fanout
docker exec rabbitmq rabbitmqadmin declare queue name=sync-from-wh
docker exec rabbitmq rabbitmqadmin declare binding source=upstream-wh destination=sync-from-wh
On the same server, we add all upstream servers:
docker exec rabbitmq rabbitmqctl set_parameter federation-upstream upstream-wh1 '{"uri":"amqp://rabbitmq-wh-1", "exchange":"to-core"}'
# …
And we setup a policy that configures the exchange to fetch messages from upstream:
docker exec rabbitmq rabbitmqctl set_policy --apply-to exchanges upstream-wh "^upstream-wh" '{"federation-upstream-set":"all"}'
Finally, we need to declare an exchange in each warehouse. These exchanges will automatically forward messages to the downstream server. To be accurate, the exchange will route messages to a local “buffer” queue, and the downstream server will consume them. If the connectivity is down, no problem! The messages will stay in warehouse buffers until the connectivity is back.
docker exec rabbitmq-wh-1 rabbitmqadmin declare exchange name=to-core type=fanout
Now, we need to configure a bit Symfony Messenger, because our setup is not the default one.
Section intitulée the-messenger-configurationThe Messenger Configuration
First thing first, we don’t want to send serialized PHP over the network. It’s not interoperable and can cause major crashes to the receiver. We already wrote about messenger and interoperability. So we won’t talk too much about JSON, custom serializer and Messenger configuration. We let you read the blog post for more information. Anyway, we need to configure the transport on each application.
Section intitulée configuration-of-the-warehouseConfiguration of the Warehouse
The Messenger configuration looks like this:
framework:
messenger:
transports:
to-core:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: App\Messenger\Serializer\JsonSerializer
options:
auto_setup: false
exchange:
name: to-core
read_timeout: 5
write_timeout: 5
connect_timeout: 5
confirm_timeout: 5
rpc_timeout: 5
routing:
App\Article\Messenger\Message\SyncArticleToCoreMessage: to-core
- we use a custom serializer for serializing the message in JSON
- we disable the
auto_setup
, we already configured it in a previous chapter - we only specify which exchange we want to use:
to-core
- we configure the routing to use this transport
- we configure all timeouts Fun fact: we don’t even need to create a handler since handling is done by another application
Section intitulée configuration-of-the-coreConfiguration of the Core
The Messenger configuration looks like this:
framework:
messenger:
transports:
from-warehouse:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: App\Messenger\Serializer\JsonSerializer
options:
queues:
sync-from-wh: ~
read_timeout: 5
write_timeout: 5
connect_timeout: 5
confirm_timeout: 5
rpc_timeout: 5
routing:
App\Article\Messenger\Message\SyncFromWareHouseMessage: from-warehouse
The configuration is similar to the warehouse, but here we specify the queue only. And we let Messenger set everything up if it’s not the case, so the retry mechanism can work.
Section intitulée conclusionConclusion
This use case is not so common, but we are pretty sure some of you can have the same. With the right tools, RabbitMQ, its federation plugin, and symfony/messenger, it’s straightforward to send updates from one application to another, and deal with network failure. To sum everything up, there following events occur
- Someone applies a transition in the warehouse’s workflow;
- We check if the place must be synced via the place metadata;
- We send a message to a special exchange with Messenger;
- This exchange will route the message to a queue;
- When the connectivity is up, the Core’s RabbitMQ will fetch all messages from the WH one, and place it in
sync-from-wh
queue; - A Core handler consumes and processes the message;
- If it fails, the retry system will automatically republish the message. 🎉 Everything is now in sync 🎉
We hope you have learnt more about the different components used!
Commentaires et discussions
Nos formations sur ce sujet
Notre expertise est aussi disponible sous forme de formations professionnelles !
Symfony avancée
Découvrez les fonctionnalités et concepts avancés de Symfony
Ces clients ont profité de notre expertise
JoliCode a assuré le développement et les évolutions d’une des API centrale de l’épargne salariale chez Groupama. Cette API permet à plusieurs applications de récupérer des informations et d’alimenter en retour le centre de donnée opérationnel. Cette pièce applicative centrale permet de développer rapidement des applications avec une source de vérité…
LOOK Cycle bénéficie désormais d’une nouvelle plateforme eCommerce disponible sur 70 pays et 5 langues. La base technique modulaire de Sylius permet de répondre aux exigences de LOOK Cycle en terme de catalogue, produits, tunnel d’achat, commandes, expéditions, gestion des clients et des expéditions.
Afin de soutenir le développement de son trafic, Qobuz a fait appel à JoliCode afin d’optimiser l’infrastructure technique du site et les échanges d’informations entre les composants de la plateforme. Suite à la mise en place de solution favorisant l’asynchronicité et la performance Web côté serveur, nous avons outillé la recherche de performance et…