Cet article fait partie d’une série traitant de Spring Cloud Function:
- Introduction à Spring Cloud Function
- Tests avec Spring Cloud Function
- Spring Cloud Function et Déploiement Web
- Spring Cloud Function et Spring Cloud Stream
Introduction
Dans ce billet, nous allons voir comment utiliser nos fonctions pour construire les briques d’une architecture event-driven à l’aide de Spring Cloud Stream.
RabbitMQ
Qui dit messaging, dit broker ! Pour nos expérimentations, on va s’appuyer sur RabbitMQ. Voici un fichier docker-compose qui permet de bootstraper un serveur:
version: '2.0'
services:
rabbitmq:
image: "rabbitmq:3-management"
ports:
- "5672:5672"
- "15672:15672"
Le port 5672
est celui que notre application va utiliser pour communiquer RabbitMQ.
Le port 15672
expose une application web permettant de gérer RabbitMQ: vous pouvez donc y accéder à l’adresse http://localhost:15672
(guest/guest pour l’accès).
Dans le cadre de cet article, je ne vais pas utiliser cette console web (j’avoue avoir un peu la flemme de faire des copies d’écran) mais plutôt un CLI nommé rabbitmqadmin permettant d’interagir avec le serveur.
Dépendances
Voici les dépendances Maven dont vous aurez besoin dans le cadre de cet article:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
La première dépendance correspond au framework Spring Cloud Stream et la seconde à ce que SCS appelle un binder
dédié à RabbitMQ. Il existe d’autres binders pour d’autres infrastructure (Kafka par exemple).
Supplier
Commençons par voir ce qui se passe avec un simple Supplier:
@Bean
public Supplier<String> supplier() {
return () -> {
System.out.println(">> supplier called");
return "hello from Supplier";
};
}
Lançons l’application et voyons ce que la console affiche:
2022-01-15 15:16:55.907 INFO 27592 --- [ main] scf.Application : Started Application in 2.483 seconds (JVM running for 2.998)
2022-01-15 15:16:55.916 INFO 27592 --- [ scheduling-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#80cc995:0/SimpleConnection@beb1f3c3 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56589]
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
Notre supplier est appelé par Spring Cloud Stream à intervalles réguliers (chaque seconde par défaut) pour publier un message contenant la chaîne de caractères hello from Supplier
.
Comme nous n’avons rien configuré, Spring Cloud Stream va déclarer par défaut un exchange RabbitMQ dont le nom sera construit à partir de celui de la fonction: supplier-out-0
. Si on utilisait Kafka, cela correspondrait au nom du topic destinataire des messages.
out
indique que l’exchange est destiné aux messages publiés (par opposition à in
dédié aux messages consommés) et 0
est un index qui correspond à un numéro de partition (qui ne nous concerne pas aujourd’hui) ou lorsque l’on a plusieurs sorties (on verra ça en fin d’article).
Vérifions que l’exchange existe bien:
rabbitmqadmin list exchanges
+--------------------+---------+
| name | type |
+--------------------+---------+
| | direct |
| amq.direct | direct |
| amq.fanout | fanout |
| amq.headers | headers |
| amq.match | headers |
| amq.rabbitmq.trace | topic |
| amq.topic | topic |
| supplier-out-0 | topic |
+--------------------+---------+
L’exchange est effectivement créé.
Avec RabbitMQ, les messages sont routés par les exchanges vers des files. Est-ce que Spring Cloud Stream en a aussi déclaré une pour nous ?
rabbitmqadmin list queues
No items
Et bien, en fait, non (cela surprend souvent les personnes qui débutent avec le framework). Spring Cloud Sream ne déclare des files que pour les consummers, pas les publishers.
Du coup, comment fait-on pour vérifier que les messages sont bien reroutés vers par l’exchange ? Le plus simple est d’en déclarer une manuellement et de la rattacher à l’exchange:
rabbitmqadmin declare queue name=supplier.monitoring durable=true
rabbitmqadmin declare binding source=supplier-out-0 destination=supplier.monitoring routing_key="#"
J’ai configuré la file comme étant durable
ce qui permet de faire en sorte qu’elle existe toujours en cas de redémarrage de RabbitMQ.
La routing key #
permet de s’assurer que l’exchange transmet tous les messages qu’il reçoit à notre file. Relançons l’application et exécutons à nouveau la commande rabbitmqadmin list queues
:
rabbitmqadmin list queues
+---------------------+----------+
| name | messages |
+---------------------+----------+
| supplier.monitoring | 11 |
+---------------------+----------+
A présent, la file contient 11 messages (ce sera probablement différent chez vous selon le temps que l’application aura tourné). Visualisons à présent le contenu de quelques messages:
rabbitmqadmin get queue=supplier.monitoring count=3
+----------------+----------------+---------------+---------------------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+----------------+----------------+---------------+---------------------+---------------+------------------+-------------+
| supplier-out-0 | supplier-out-0 | 13 | hello from Supplier | 19 | string | False |
| supplier-out-0 | supplier-out-0 | 12 | hello from Supplier | 19 | string | False |
| supplier-out-0 | supplier-out-0 | 11 | hello from Supplier | 19 | string | False |
+----------------+----------------+---------------+---------------------+---------------+------------------+-------------+
La commande permet de visualiser les trois premiers messages et on constate que leur contenu correspond bien à celui publié par l’application. Tout cela, sans rien configurer du tout !
Bien entendu, il est possible de personnaliser de nombreuses choses avec Spring Cloud Stream mais le but de cet article n’est pas de faire un deep dive dans ce framework.
Et avec un Supplier Reactive ?
Essayons la même expérience mais avec un supplier reactive:
@Bean
public Supplier<Flux<String>> reactiveSupplier() {
return () -> {
System.out.println(">> reactive supplier called");
return Flux.fromStream(Stream.generate(
() -> {
try {
Thread.sleep(2000);
return "hello from reactive supplier";
} catch (Exception e) {
}
return "end of reactive supplier";
}
));
};
}
Si on lance l’application…elle s’arrête aussitôt ! La raison pour laquelle l’application ne démarre pas est que Spring Cloud Stream constate qu’il y a plus d’un bean de type Supplier
(ou Consumer
, ou Function
) et qu’il n’est pas certain desquels doivent bénéficier d’une correspondance dans l’infrastructure ou pas.
Du coup, on doit l’aider un peu en spécifiant la ou les définitions de fonctions qu’il doit considérer. Pour cela, on utilise la même configuration que pour le déploiement web: spring.cloud.function.definition
:
spring:
cloud:
function:
definition: reactiveSupplier
Il faudra bien entendu mettre à jour la valeur de cette propriété pour activer les autres fonctions de l’article (éventuellement séparées par des ;), je compte sur vous pour ça.
L’application démarre bien ce coup-ci et on voit que notre log rudimentaire n’apparaît…qu’une seule fois ce coup-ci !
Il se passe ici que Spring Cloud Stream détecte qu’il s’agit d’un flux réactif et n’appelle notre fonction qu’une seule fois et celle-ci produit un flux continu de messages.
Vérifions-le en créant une file de monitoring comme pour le supplier précédent:
rabbitmqadmin declare queue name=reactiveSupplier.monitoring durable=true
rabbitmqadmin declare binding source=reactiveSupplier-out-0 destination=reactiveSupplier.monitoring routing_key="#"
rabbitmqadmin list queues
+-----------------------------+----------+
| name | messages |
+-----------------------------+----------+
| reactiveSupplier.monitoring | 124 |
| supplier.monitoring | 14 |
+-----------------------------+----------+
rabbitmqadmin get queue=reactiveSupplier.monitoring count=1
+------------------------+------------------------+---------------+------------------------------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+------------------------+------------------------+---------------+------------------------------+---------------+------------------+-------------+
| reactiveSupplier-out-0 | reactiveSupplier-out-0 | 157 | hello from reactive supplier | 28 | string | False |
+------------------------+------------------------+---------------+------------------------------+---------------+------------------+-------------+
On constate que la file se remplit bien comme prévu avec les messages attendus, cool.
Si vous avez lu l’article concernant le déploiement web des Spring Cloud Functions, vous vous rappelez peut-être que les flux infinis ne fonctionnaient pas (pour le moment). Avec Spring Cloud Stream, on voit qu’il n’y a aucun problème.
Maintenant, supposons que vous ayiez besoin que votre supplier reactive soit appelé plusieurs fois par le framework, par exemple pour récupérer des infos de façon réactive dans une base de données et les envoyer sous forme de messages à intervalle régulier. Dans ce genre de situation, il faut utiliser l’annotation @PollableBean
comme dans l’exemple suivant:
@PollableBean
public Supplier<Flux<String>> finiteReactiveSupplier() {
return () -> {
System.out.println(">> reactive supplier called");
return Flux.just("hello", "from", "finite", "reactive", "supplier");
};
}
Si on lance l’application (en ayant ajouté la fonction à la configuration), on constate que notre supplier est en effet appelé chaque seconde comme dans le cas du supplier non reactive.
Consumer
Passons à présent à l’écriture d’un Consumer
:
@Bean
public Consumer<String> consumer() {
return log::info;
}
Lançons l’application et vérifions ce que Spring Cloud Stream a créé pour nous par défaut:
rabbitmqadmin list exchanges
+------------------------------+---------+
| name | type |
+------------------------------+---------+
| | direct |
| amq.direct | direct |
| amq.fanout | fanout |
| amq.headers | headers |
| amq.match | headers |
| amq.rabbitmq.trace | topic |
| amq.topic | topic |
| consumer-in-0 | topic |
| finiteReactiveSupplier-out-0 | topic |
| reactiveSupplier-out-0 | topic |
| supplier-out-0 | topic |
+------------------------------+---------+
On voit qu’un exchange nommé consumer-in-0
a été déclaré. Et si on regarde du côté des files:
rabbitmqadmin list queues
+------------------------------------------------+----------+
| name | messages |
+------------------------------------------------+----------+
| consumer-in-0.anonymous.FGON7F8aQP6qNBpEQQ0rEw | 0 |
| reactiveSupplier.monitoring | 18 |
| supplier.monitoring | 14 |
+------------------------------------------------+----------+
Ce coup-ci, une file a été déclarée (comme expliqué plus haut, je n’ai pas menti) mais si vous regardez le nom de cette file, vous pouvez constater qu’il est préfixé avec le nom de l’exchange suivi de anonymous
et de caractères aléatoires.
Par défaut, Spring Cloud Stream créé une file non durable
qui sera supprimée à l’arrêt de l’application. C’est pratique pour développer mais en production, on voudra probablement éviter cela surtout si on a plusieurs instances de notre application qui souhaitent consommer la même file pour améliorer la scalabilité.
Pour que cette file soit durable, il faut définir une propriété qui associe notre application à un groupe
:
spring:
cloud:
stream:
bindings:
consumer-in-0:
group: springboot
Si on reliste les files après avoir relancé l’application, on constate que la file a un nom fixe à présent:
rabbitmqadmin list queues
+-----------------------------+----------+
| name | messages |
+-----------------------------+----------+
| consumer-in-0.springboot | 0 |
| reactiveSupplier.monitoring | 18 |
| supplier.monitoring | 14 |
+-----------------------------+----------+
Essayons de publier un message à présent:
rabbitmqadmin publish exchange=consumer-in-0 routing_key="" payload="hello from terminal"
Si on jette un oeil aux logs, on voit nos traces apparaître:
2022-01-15 17:31:37.700 INFO 96147 --- [ main] scf.Application : Started Application in 2.132 seconds (JVM running for 2.761)
hello from terminal
Trop facile !
La valeur de la routing_key
importe peu car Spring Cloud Stream a configuré la file pour recevoir tous les messages transitant par l’exchange par défaut.
Et avec un POJO ?
Essayons de voir ce qu’il se passe si on utilise quelque chose de plus complexe qu’une chaîne de caractères:
@Bean
public Consumer<User> pojoConsumer() {
return user -> log.info(user.toString());
}
@ToString
@Setter
static class User {
String name;
}
Vérifions que l’exchange a bien été créé:
rabbitmqadmin list exchanges
+------------------------------+---------+
| name | type |
+------------------------------+---------+
| | direct |
| amq.direct | direct |
| amq.fanout | fanout |
| amq.headers | headers |
| amq.match | headers |
| amq.rabbitmq.trace | topic |
| amq.topic | topic |
| consumer-in-0 | topic |
| finiteReactiveSupplier-out-0 | topic |
| pojoConsumer-in-0 | topic |
| reactiveSupplier-out-0 | topic |
| supplier-out-0 | topic |
+------------------------------+---------+
Et qu’une file (anonyme) est également présente pour notre fonction:
rabbitmqadmin list queues
+----------------------------------------------------+----------+
| name | messages |
+----------------------------------------------------+----------+
| consumer-in-0.springboot | 0 |
| pojoConsumer-in-0.anonymous.XZJFEJ6KTH64OgoEiruJkg | 0 |
| reactiveSupplier.monitoring | 18 |
| supplier.monitoring | 14 |
+----------------------------------------------------+----------+
Oui, elle est bien là !
Publions un message au format JSON:
rabbitmqadmin publish exchange=pojoConsumer-in-0 routing_key="" payload='
{
"name": "John Doe"
}
'
Message published
Si on regarde les logs, on constate que cela a bien fonctionné:
2022-01-15 15:10:21.918 INFO 22803 --- [H64OgoEiruJkg-1] scf.Application : Application.User(name=John Doe)
Spring Cloud Stream a automatiquement désérialisé la chaîne de caractères comme étant du JSON, pratique.
Function
Passons à présent aux Functions
qui permettent de combiner Consumer
et Supplier
dans une même méthode:
@Bean
public Function<String, String> function() {
return input -> "Hello, " + input;
}
Si on liste les exchanges, on constate que deux ont été déclarées (function-in-0
et function-out-0
):
rabbitmqadmin list exchanges
+------------------------------+---------+
| name | type |
+------------------------------+---------+
| | direct |
| amq.direct | direct |
| amq.fanout | fanout |
| amq.headers | headers |
| amq.match | headers |
| amq.rabbitmq.trace | topic |
| amq.topic | topic |
| consumer-in-0 | topic |
| finiteReactiveSupplier-out-0 | topic |
| function-in-0 | topic |
| function-out-0 | topic |
| pojoConsumer-in-0 | topic |
| reactiveSupplier-out-0 | topic |
| supplier-out-0 | topic |
+------------------------------+---------+
Et pour les files, une seule l’a été (pour la consommation):
rabbitmqadmin list queues
+------------------------------------------------+----------+
| name | messages |
+------------------------------------------------+----------+
| consumer-in-0.springboot | 0 |
| function-in-0.anonymous.GQGExbEIRWKiQnS947KCtQ | 0 |
| reactiveSupplier.monitoring | 18 |
| supplier.monitoring | 14 |
+------------------------------------------------+----------+
Ajoutons-en une pour monitorer la sortie de la fonction:
rabbitmqadmin declare queue name=function.monitoring
rabbitmqadmin declare binding source=function-out-0 destination=function.monitoring routing_key="#"
Et publions un message ensuite:
rabbitmqadmin publish exchange=function-in-0 routing_key="" payload="hello function"
Message published
Si on liste les files à nouveau:
rabbitmqadmin list queues
+------------------------------------------------+----------+
| name | messages |
+------------------------------------------------+----------+
| consumer-in-0.springboot | 1 |
| function-in-0.anonymous.GQGExbEIRWKiQnS947KCtQ | 0 |
| function.monitoring | 1 |
| reactiveSupplier.monitoring | 18 |
| supplier.monitoring | 14 |
+------------------------------------------------+----------+
On voit que notre file function.monitoring
contient bien un message:
rabbitmqadmin get queue=function.monitoring count=1
+----------------+----------------+---------------+-----------------------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+----------------+----------------+---------------+-----------------------+---------------+------------------+-------------+
| function-out-0 | function-out-0 | 0 | Hello, hello function | 21 | string | False |
+----------------+----------------+---------------+-----------------------+---------------+------------------+-------------+
Et le message qui s’y trouve est bien ce à quoi l’on s’attend, excellent !
Et ça marche aussi en Reactive ?
Testons une fonction réactive:
@Bean
public Function<Flux<String>, Flux<String>> reactiveFunction() {
return input -> input
.log()
.map(s -> s.toUpperCase());
}
Je vous épargne les commandes pour déclarer la file de monitoring pour directement publier un message:
rabbitmqadmin publish exchange=reactiveFunction-in-0 routing_key="" payload="coucou"
Message published
Et si on visualise le message dans notre file de monitoring:
rabbitmqadmin get queue=reactiveFunction.monitoring count=1
+------------------------+------------------------+---------------+---------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+------------------------+------------------------+---------------+---------+---------------+------------------+-------------+
| reactiveFunction-out-0 | reactiveFunction-out-0 | 0 | COUCOU | 6 | string | False |
+------------------------+------------------------+---------------+---------+---------------+------------------+-------------+
COUCOU
est bien là, tout s’est passé comme prévu !
Multiples messages en impératif
Cette section et la suivante vont montrer comment retourner plusieurs messages dans la file de sortie alors qu’on n’en consomme qu’un seul en entrée. C’est une façon d’implémenter le pattern Splitter.
Nous allons d’abord voir comment faire ça en utilisant un modèle de programmation impératif (càd non reactive) et la section suivante montrera comment faire la même chose en reactive.
Le code Java:
@Bean
public Function<Order, List<Message<OrderItem>>> multipleMessages() {
return input -> input
.getItems()
.stream()
.map(item -> MessageBuilder.withPayload(item).build())
.collect(Collectors.toList());
}
@ToString
@Data
static class Order {
private List<OrderItem> items;
}
@ToString
@Data
static class OrderItem {
private String id;
}
Notez comment le second argument de notre function est de type List<Message>
: cela indique au framework qu’il faut retourner autant de messages qu’il y a d’éléments dans la liste plutôt que de retourner un message avec une liste d’objets dedans.
Si on publie le message suivant:
rabbitmqadmin publish exchange=multipleMessages-in-0 routing_key="" payload='
{
"items": [
{ "id": "item-1" },
{ "id": "item-2" },
{ "id": "item-3" }
]
}
'
Message published
Et que l’on regarde la file de monitoring multipleMessages.monitoring
associée, on voit que 3 messages s’y trouvent:
rabbitmqadmin list queues
+--------------------------------------------------------+----------+
| name | messages |
+--------------------------------------------------------+----------+
| consumer-in-0.springboot | 1 |
| function.monitoring | 1 |
| multipleMessages-in-0.anonymous.7LKC32WATlmyFt9MGEjYig | 0 |
| multipleMessages.monitoring | 3 |
| reactiveFunction.monitoring | 1 |
| reactiveSupplier.monitoring | 18 |
| supplier.monitoring | 14 |
+--------------------------------------------------------+----------+
Regardons le contenu de ces messages:
rabbitmqadmin get queue=multipleMessages.monitoring count=3
+------------------------+------------------------+---------------+-----------------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+------------------------+------------------------+---------------+-----------------+---------------+------------------+-------------+
| multipleMessages-out-0 | multipleMessages-out-0 | 2 | {"id":"item-1"} | 15 | string | False |
| multipleMessages-out-0 | multipleMessages-out-0 | 1 | {"id":"item-2"} | 15 | string | False |
| multipleMessages-out-0 | multipleMessages-out-0 | 0 | {"id":"item-3"} | 15 | string | False |
+------------------------+------------------------+---------------+-----------------+---------------+------------------+-------------+
On voit qu’effectivement chaque message contient un OrderItem
dans son payload.
Multiples messages en reactive
Essayons de reproduire la même chose mais en tant utilisant le modèle de programmation reactive:
@Bean
public Function<Flux<Order>, Flux<OrderItem>> reactiveMultipleMessages() {
return input -> input
.flatMap(order -> Flux.fromIterable(order.getItems()));
}
C’est un peu moins verbeux pour le coup…
Si on publie le message suivant:
rabbitmqadmin publish exchange=reactiveMultipleMessages-in-0 routing_key="" payload='
{
"items": [
{ "id": "item-1" },
{ "id": "item-2" },
{ "id": "item-3" }
]
}
'
Message published
Regardons le contenu de ces messages:
rabbitmqadmin get queue=reactiveMultipleMessages.monitoring count=3
+--------------------------------+--------------------------------+---------------+-----------------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+--------------------------------+--------------------------------+---------------+-----------------+---------------+------------------+-------------+
| reactiveMultipleMessages-out-0 | reactiveMultipleMessages-out-0 | 2 | {"id":"item-1"} | 15 | string | False |
| reactiveMultipleMessages-out-0 | reactiveMultipleMessages-out-0 | 1 | {"id":"item-2"} | 15 | string | False |
| reactiveMultipleMessages-out-0 | reactiveMultipleMessages-out-0 | 0 | {"id":"item-3"} | 15 | string | False |
+--------------------------------+--------------------------------+---------------+-----------------+---------------+------------------+-------------+
Là aussi, cela a fonctionné, génial !
L’autre mode Batch
Il existe également un autre mode batch implémenté dans Spring AMQP et utilisable avec Spring Cloud Stream et nos fonctions.
Le principe est le suivant: si on publie des messages en s’appuyant sur une implémentation de BatchingStrategy
, il est possible de combiner dans un seul message RabbitMQ un ensemble de messages individuels. Ces messages peuvent ensuite être débatchés
pour que notre fonction les consomme sous forme de liste.
Avec un exemple, ce sera probablement plus clair:
@Bean
public Function<List<String>, String> batchConsumer() {
return input -> {
log.info("Consuming: {}", input);
return String.join(";", input);
};
}```
Notre fonction va concaténer les différents messages et publier un message contenant le résultat de cette concaténation.
Pour que cela fonctionne, on doit activer le mode batch:
spring: cloud: stream: bindings: batchConsumer-in-0: consumer: batch-mode: true rabbit: bindings: batchConsumer-in-0: consumer: enable-batching: true batch-size: 5 receive-timeout: 1000
La propriété `batch-size` indique le nombre d'éléments max de nos batches. La propriété `receive-timeout` indique le temps que le framework attend au maximum avant qu'un batch ne soit complet. Au bout de cette durée, le batch sera envoyé au `Consumer` même si sa taille est inférieure à celle configurée.
Afin de publier des messages, un bout de code java est nécessaire cette fois-ci car le message contenant le batch doit être construit d'une certaine façon par Spring AMQP pour que cela fonctionne:
```java
@Bean
@ConditionalOnExpression("'${spring.cloud.function.definition}'.contains('batchConsumer')" )
public ApplicationRunner runner(RabbitTemplate rabbitTemplate) {
BatchingStrategy strategy = new SimpleBatchingStrategy(5, 25_000, 1_000);
TaskScheduler scheduler = new ConcurrentTaskScheduler();
BatchingRabbitTemplate template = new BatchingRabbitTemplate(strategy, scheduler);
template.setConnectionFactory(rabbitTemplate.getConnectionFactory());
return args -> {
for (var i = 0; i < 12; i++) {
var props = new MessageProperties();
props.setContentType("text/plain");
org.springframework.amqp.core.Message message = new org.springframework.amqp.core.Message(("message " + i).getBytes(StandardCharsets.UTF_8), props);
template.send("batchConsumer-in-0", "", message, null);
}
};
}
Ce bout de code publie 12 messages (dans le code) qui généreront donc 3 vrais messages publiés dans RabbitMQ car nous définissons à 5
le nombre max d’éléments dans notre stratégie de batching.
Faîtes attention au content-type
si, comme moi, vous consommez un type simple (String
par exemple). Si vous ne le renseignez pas, le framework va considérer qu’il s’agit de JSON et va planter.
L’annotation ConditionalOnExpression
évite de publier les messages si la fonction de batch n’est pas activée.
Si on lance l’application, les logs semblent confirmer que cela fonctionne:
2022-01-15 14:48:09.899 INFO 80008 --- [a-0rAYNOEuOaA-1] scf.Application : Consuming: [message 0, message 1, message 2, message 3, message 4]
2022-01-15 14:48:09.904 INFO 80008 --- [a-0rAYNOEuOaA-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
...
2022-01-15 14:48:09.925 INFO 80008 --- [a-0rAYNOEuOaA-1] scf.Application : Consuming: [message 5, message 6, message 7, message 8, message 9]
2022-01-15 14:48:10.892 INFO 80008 --- [a-0rAYNOEuOaA-1] scf.Application : Consuming: [message 10, message 11]
Et si nous regardons notre file de monitoring créée pour l’occasion:
rabbitmqadmin list queues
+-----------------------------------------------------+----------+
| name | messages |
+-----------------------------------------------------+----------+
| batchConsumer-in-0.anonymous.4nCInWpKRa-0rAYNOEuOaA | 0 |
| batchConsumer.monitoring | 3 |
| consumer-in-0.springboot | 0 |
| function.monitoring | 1 |
| hello.world.queue | 0 |
| multipleMessages.monitoring | 3 |
| reactiveFunction.monitoring | 1 |
| reactiveMultipleMessages.monitoring | 3 |
| reactiveSupplier.monitoring | 18 |
| supplier.monitoring | 14 |
+-----------------------------------------------------+----------+
On constate qu’il y a bien 3 messages seulement dans batchConsumer.monitoring
. Jetons un oeil au contenu des messages à présent:
rabbitmqadmin get queue=batchConsumer.monitoring count=3
+---------------------+---------------------+---------------+---------------------------------------------------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+---------------------+---------------------+---------------+---------------------------------------------------+---------------+------------------+-------------+
| batchConsumer-out-0 | batchConsumer-out-0 | 2 | message 0;message 1;message 2;message 3;message 4 | 49 | string | True |
| batchConsumer-out-0 | batchConsumer-out-0 | 1 | message 5;message 6;message 7;message 8;message 9 | 49 | string | True |
| batchConsumer-out-0 | batchConsumer-out-0 | 0 | message 10;message 11 | 21 | string | True |
+---------------------+---------------------+---------------+---------------------------------------------------+---------------+------------------+-------------+
Yes, cela a marché ! Nos deux premiers messages contiennent chacun 5 messages et le troisième les deux derniers messages publiés.
Multiples outputs
Une autre fonctionnalité intéressante à tester concerne les fonctions avec plusieurs entrées et sorties. La doc donne quelques use cases pour ce type de fonction: big data et aggrégation de données par exemple.
Cela ne fonctionne qu’avec le modèle reactive et en utilisant les Tuples, comme nous l’avons vu dans l’introduction à Spring Cloud Function.
Pour illustrer cela, nous allons implémenter le pattern Scatter-Gather.
Dans une première fonction (Scatter
), nous allons consommer un flux de nombres que nous allons diviser en deux flux: le premier contiendra les multiples de 3 et le second les autres. Chacun de ces flux sera redirigé vers sa propre file RabbitMQ.
@Configuration
public class ScatterGather {
@Bean
public Function<Flux<Integer>, Tuple2<Flux<Integer>, Flux<Integer>>> scatter() {
return order -> {
var flux = order.publish().autoConnect(2);
Sinks.Many<Integer> multipleOf3 = Sinks.many().unicast().onBackpressureError();
var multipleOf3Flux = flux
.filter(n -> n % 3 == 0)
.doOnNext(multipleOf3::tryEmitNext);
Sinks.Many<Integer> nonMultipleOf3 = Sinks.many().unicast().onBackpressureError();
var nonMultipleOf3Flux = flux
.filter(n -> n % 3 != 0)
.doOnNext(nonMultipleOf3::tryEmitNext);
return Tuples.of(
multipleOf3.asFlux().doOnSubscribe(x -> multipleOf3Flux.subscribe()),
nonMultipleOf3.asFlux().doOnSubscribe(x -> nonMultipleOf3Flux.subscribe())
);
};
}
}
Si vous n’avez jamais fait de programmation reactive (ou seulement un peu), ce bout de code ne vous parle sans doute pas beaucoup mais dans l’idée, il fait ce que j’ai décrit un peu plus haut.
Vérifions que chaque flux est bien associé à un exchange:
+--------------------------------+---------+
| name | type |
+--------------------------------+---------+
| | direct |
| amq.direct | direct |
...
| scatter-in-0 | topic |
| scatter-out-0 | topic |
| scatter-out-1 | topic |
+--------------------------------+---------+
On voit qu’effectivement, trois exchanges ont cette fois été déclarés: un pour l’entrée et deux pour les sorties.
Essayons de publier quelques nombres:
for i in {1..20}; do rabbitmqadmin publish exchange=scatter-in-0 routing_key="" payload="$i"; done
Message published
...
Message published
Vérifions que les nombres ont bien été répartis selon notre algorithme:
rabbitmqadmin list queues
+-----------------------------------------------+----------+
| name | messages |
+-----------------------------------------------+----------+
| scatter-in-0.anonymous.o0b5DMZYRbK-r18OjXbm7Q | 0 |
| scatter-out-0.monitoring | 6 |
| scatter-out-1.monitoring | 14 |
+-----------------------------------------------+----------+
6 multiples de 3 et 14 non multiples, vérifions le contenu des files à présent:
rabbitmqadmin get queue=scatter-out-0.monitoring count=6
+---------------+---------------+---------------+---------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+---------------+---------------+---------------+---------+---------------+------------------+-------------+
| scatter-out-0 | scatter-out-0 | 5 | 3 | 1 | string | False |
| scatter-out-0 | scatter-out-0 | 4 | 6 | 1 | string | False |
| scatter-out-0 | scatter-out-0 | 3 | 9 | 1 | string | False |
| scatter-out-0 | scatter-out-0 | 2 | 12 | 2 | string | False |
| scatter-out-0 | scatter-out-0 | 1 | 15 | 2 | string | False |
| scatter-out-0 | scatter-out-0 | 0 | 18 | 2 | string | False |
+---------------+---------------+---------------+---------+---------------+------------------+-------------+
rabbitmqadmin get queue=scatter-out-1.monitoring count=14
+---------------+---------------+---------------+---------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+---------------+---------------+---------------+---------+---------------+------------------+-------------+
| scatter-out-1 | scatter-out-1 | 13 | 1 | 1 | string | False |
| scatter-out-1 | scatter-out-1 | 12 | 2 | 1 | string | False |
| scatter-out-1 | scatter-out-1 | 11 | 4 | 1 | string | False |
| scatter-out-1 | scatter-out-1 | 10 | 5 | 1 | string | False |
| scatter-out-1 | scatter-out-1 | 9 | 7 | 1 | string | False |
| scatter-out-1 | scatter-out-1 | 8 | 8 | 1 | string | False |
| scatter-out-1 | scatter-out-1 | 7 | 10 | 2 | string | False |
| scatter-out-1 | scatter-out-1 | 6 | 11 | 2 | string | False |
| scatter-out-1 | scatter-out-1 | 5 | 13 | 2 | string | False |
| scatter-out-1 | scatter-out-1 | 4 | 14 | 2 | string | False |
| scatter-out-1 | scatter-out-1 | 3 | 16 | 2 | string | False |
| scatter-out-1 | scatter-out-1 | 2 | 17 | 2 | string | False |
| scatter-out-1 | scatter-out-1 | 1 | 19 | 2 | string | False |
| scatter-out-1 | scatter-out-1 | 0 | 20 | 2 | string | False |
+---------------+---------------+---------------+---------+---------------+------------------+-------------+
Les nombres sont bien répartis comme attendu, super !
Faisons l’opération inverse à présent (Gather
) en consommant les deux files puis en les mergeant pour les publier dans une nouvelle file:
@Bean
public Function<Tuple2<Flux<Integer>, Flux<Integer>>, Flux<Integer>> gather() {
return tuple -> {
Flux<Integer> multipleOf3 = tuple.getT1();
Flux<Integer> nonMultipleOf3 = tuple.getT2();
return Flux
.merge(multipleOf3, nonMultipleOf3)
.log();
};
}
Le code est un peu plus simple ce coup-ci. Par contre, il y a un peu de configuration à ajouter pour que notre fonction gather
consomme les files de sortie de notre fonction scatter
:
spring:
cloud:
stream:
bindings:
scatter-out-0:
group: multipleOf3
scatter-out-1:
group: nonMultipleOf3
gather-in-0:
destination: scatter-out-0
group: multipleOf3
gather-in-1:
destination: scatter-out-1
group: nonMultipleOf3
On est obligé d’utiliser la propriété group
pour fixer la file. La propriété destination
indique le nom de l’exchange que l’on souhaite utiliser.
Après avoir relancé l’application et lister les files, voici ce que l’on a:
rabbitmqadmin list queues
+-----------------------------------------------+----------+
| name | messages |
+-----------------------------------------------+----------+
| gather-out-0.monitoring | 0 |
| scatter-in-0.anonymous.o0b5DMZYRbK-r18OjXbm7Q | 0 |
| scatter-out-0.monitoring | 6 |
| scatter-out-0.multipleOf3 | 0 |
| scatter-out-1.monitoring | 14 |
| scatter-out-1.nonMultipleOf3 | 0 |
+-----------------------------------------------+----------+
(n’oubliez pas que les files suffixées par .monitoring
ont été créées manuellement)
Si on republie les messages, les nombres devraient maintenant se retrouver groupés dans la file gather-out-0.monitoring
:
for i in {1..20}; do rabbitmqadmin publish exchange=scatter-in-0 routing_key="" payload="$i"; done
Message published
...
Message published
rabbitmqadmin list queues
+-----------------------------------------------+----------+
| name | messages |
+-----------------------------------------------+----------+
| gather-out-0.monitoring | 20 |
| scatter-in-0.anonymous.o0b5DMZYRbK-r18OjXbm7Q | 0 |
| scatter-out-0.monitoring | 6 |
| scatter-out-0.multipleOf3 | 0 |
| scatter-out-1.monitoring | 14 |
| scatter-out-1.nonMultipleOf3 | 0 |
+-----------------------------------------------+----------+
rabbitmqadmin get queue=gather-out-0.monitoring count=20
+--------------+--------------+---------------+---------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+--------------+--------------+---------------+---------+---------------+------------------+-------------+
| gather-out-0 | gather-out-0 | 19 | 1 | 1 | string | False |
| gather-out-0 | gather-out-0 | 18 | 2 | 1 | string | False |
| gather-out-0 | gather-out-0 | 17 | 3 | 1 | string | False |
| gather-out-0 | gather-out-0 | 16 | 4 | 1 | string | False |
| gather-out-0 | gather-out-0 | 15 | 5 | 1 | string | False |
| gather-out-0 | gather-out-0 | 14 | 6 | 1 | string | False |
| gather-out-0 | gather-out-0 | 13 | 7 | 1 | string | False |
| gather-out-0 | gather-out-0 | 12 | 8 | 1 | string | False |
| gather-out-0 | gather-out-0 | 11 | 9 | 1 | string | False |
| gather-out-0 | gather-out-0 | 10 | 10 | 2 | string | False |
| gather-out-0 | gather-out-0 | 9 | 11 | 2 | string | False |
| gather-out-0 | gather-out-0 | 8 | 12 | 2 | string | False |
| gather-out-0 | gather-out-0 | 7 | 13 | 2 | string | False |
| gather-out-0 | gather-out-0 | 6 | 14 | 2 | string | False |
| gather-out-0 | gather-out-0 | 5 | 15 | 2 | string | False |
| gather-out-0 | gather-out-0 | 4 | 16 | 2 | string | False |
| gather-out-0 | gather-out-0 | 3 | 17 | 2 | string | False |
| gather-out-0 | gather-out-0 | 2 | 18 | 2 | string | False |
| gather-out-0 | gather-out-0 | 1 | 19 | 2 | string | False |
| gather-out-0 | gather-out-0 | 0 | 20 | 2 | string | False |
+--------------+--------------+---------------+---------+---------------+------------------+-------------+
Oui, nos 20 nombres de départ sont bien rassemblés à nouveau !
Conclusion
Je crois qu’on a vu pas mal de choses en s’amusant avec nos Spring Cloud Functions et Spring Cloud Stream: j’espère que cela vous aura été utile comme cela l’a été pour moi !
On n’a clairement pas tout essayé mais les possibilités de combinaisons permettent d’imaginer de nombreux scenarios typiques d’architectures event-driven. Probablement une idée pour un futur article :-)