Batching Symfony Messenger messages
Sometimes we want to make messages in Symfony Messenger that are consumed as a batch and not one by one. Recently we came across a situation where we send updated translations for our entities through Messenger and then send them to our translation provider.
But since there is a strong rate limitation on this translation provider, we can’t send them one by one. We must send them to store all received ones by a given consumer and send all the messages if we wait more than 10 seconds without a new message or if we have more than 100 messages stored.
Now let’s show how we made it works:
// Symfony Messenger Message:
class TranslationUpdate
{
public function __construct(
public string $locale,
public string $key,
public string $value,
) {
}
}
class TranslationUpdateHandler implements MessageHandlerInterface
{
private const BUFFER_TIMER = 10; // in seconds
private const BUFFER_LIMIT = 100;
private array $buffer = [];
public function __construct(
private MessageBusInterface $messageBus,
) {
pcntl_async_signals(true);
pcntl_signal(SIGALRM, \Closure::fromCallable([$this, 'batchBuffer']));
}
public function __invoke(TranslationUpdate $message): void
{
$this->buffer[] = $message;
if (\count($this->buffer) >= self::BUFFER_LIMIT) {
$this->batchBuffer();
} else {
pcntl_alarm(self::BUFFER_TIMER);
}
}
private function batchBuffer(): void
{
if (0 === \count($this->buffer)) {
return;
}
$translationBatch = new TranslationBatch($this->buffer);
$this->messageBus->dispatch($translationBatch);
$this->buffer = [];
}
}
Here we can see the Messenger message, basically we dispatch it every time, we have a translation update but this could apply to any message we need.
Then we have the message handler which will get all the messages and put them in an array buffer. When the buffer hits 100 elements or if we have no new elements during 10s we will trigger the batchBuffer
method.
For the 10s timer, we are using pcntl_alarm
which allows us to make asynchronous call of the batchBuffer
method if needed.
PCNTL is a way to handle system signals in our PHP code, you can read more about it in the PHP documentation or if you can read french we made a blog post about it. We will set a timer that will send a SIGALRM signal to the process after the given number of seconds. Then when the signal is received by the process it will call the callback function we gave as the second argument of pcntl_signal
. The callback is set for our entire application, so we can use this batching trick only once.
Then in the batchBuffer
method we are using a new Messenger dispatch because we want to keep track of the messages if there are issues, and if we hit the method with pcntl we won’t have Messenger retry handling if there is an exception.
class TranslationBatch
{
/**
* @param TranslationUpdate[] $notifications
*/
public function __construct(
private array $notifications,
) {
}
}
class TranslationBatchHandler implements MessageHandlerInterface
{
public function __invoke(TranslationBatch $message): void
{
// handle all our messages
}
}
And finally we have the batch handler which will always get a list of messages to send! With that we can batch our Messenger messages with ease and avoid using a cron!
Edit: This method is only a proof of work, if you want to use it in production I recommend to use some more persistent storage for the buffer like Redis.
Commentaires et discussions
Symfony Messenger et l’interopérabilité
Le composant Messenger a été mergé dans Symfony 4.1, sorti en mai 2018. Il ajoute une couche d’abstraction entre un producteur de données (publisher) et son consommateur de données (consumer). Symfony est ainsi capable d’envoyer des messages (la donnée) dans un bus, le plus souvent…
Lire la suite de l’article Symfony Messenger et l’interopérabilité
Nos articles sur le même sujet
Ces clients ont profité de notre expertise
L’équipe de Finarta a fait appel à JoliCode pour le développement de leur plateforme Web. Basée sur le framework Symfony 2, l’application est un réseau privé de galerie et se veut être une place de communication et de vente d’oeuvres d’art entre ses membres. Pour cela, de nombreuses règles de droits ont été mises en places et une administration poussée…
En tant que joaillier 100 % numérique, l’équipe de Courbet Paris a souhaité se doter d’une plateforme eCommerce, capable d’offrir une expérience moderne qui revalorise l’acte d’achat de produits de joaillerie sur internet. JoliCode a accompagné leur équipe en développant une plateforme robuste, mais aussi évolutive, afin de répondre aux enjeux business…
Ouibus a pour ambition de devenir la référence du transport en bus longue distance. Dans cette optique, les enjeux à venir de la compagnie sont nombreux (vente multi-produit, agrandissement du réseau, diminution du time-to-market, amélioration de l’expérience et de la satisfaction client) et ont des conséquences sur la structuration de la nouvelle…