Spring Cloud Function et Spring Cloud Stream

Comment déployer ses Spring Cloud Functions dans une infra messaging...


Cet article fait partie d’une série traitant de Spring Cloud Function:

  1. Introduction à Spring Cloud Function
  2. Tests avec Spring Cloud Function
  3. Spring Cloud Function et Déploiement Web
  4. Spring Cloud Function et Spring Cloud Stream

Introduction

Dans ce billet, nous allons voir comment utiliser nos fonctions pour construire les briques d’une architecture event-driven à l’aide de Spring Cloud Stream.

RabbitMQ

Qui dit messaging, dit broker ! Pour nos expérimentations, on va s’appuyer sur RabbitMQ. Voici un fichier docker-compose qui permet de bootstraper un serveur:

version: '2.0'
services:
  rabbitmq:
    image: "rabbitmq:3-management"
    ports:
      - "5672:5672"
      - "15672:15672"

Le port 5672 est celui que notre application va utiliser pour communiquer RabbitMQ.

Le port 15672 expose une application web permettant de gérer RabbitMQ: vous pouvez donc y accéder à l’adresse http://localhost:15672 (guest/guest pour l’accès).

Dans le cadre de cet article, je ne vais pas utiliser cette console web (j’avoue avoir un peu la flemme de faire des copies d’écran) mais plutôt un CLI nommé rabbitmqadmin permettant d’interagir avec le serveur.

Dépendances

Voici les dépendances Maven dont vous aurez besoin dans le cadre de cet article:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

La première dépendance correspond au framework Spring Cloud Stream et la seconde à ce que SCS appelle un binder dédié à RabbitMQ. Il existe d’autres binders pour d’autres infrastructure (Kafka par exemple).

Supplier

Commençons par voir ce qui se passe avec un simple Supplier:

@Bean
public Supplier<String> supplier() {
    return () -> {
        System.out.println(">> supplier called");
        return "hello from Supplier";
    };
}

Lançons l’application et voyons ce que la console affiche:

2022-01-15 15:16:55.907  INFO 27592 --- [           main] scf.Application                          : Started Application in 2.483 seconds (JVM running for 2.998)
2022-01-15 15:16:55.916  INFO 27592 --- [   scheduling-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#80cc995:0/SimpleConnection@beb1f3c3 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56589]
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called
>> supplier called

Notre supplier est appelé par Spring Cloud Stream à intervalles réguliers (chaque seconde par défaut) pour publier un message contenant la chaîne de caractères hello from Supplier.

Comme nous n’avons rien configuré, Spring Cloud Stream va déclarer par défaut un exchange RabbitMQ dont le nom sera construit à partir de celui de la fonction: supplier-out-0. Si on utilisait Kafka, cela correspondrait au nom du topic destinataire des messages.

out indique que l’exchange est destiné aux messages publiés (par opposition à in dédié aux messages consommés) et 0 est un index qui correspond à un numéro de partition (qui ne nous concerne pas aujourd’hui) ou lorsque l’on a plusieurs sorties (on verra ça en fin d’article).

Vérifions que l’exchange existe bien:

rabbitmqadmin list exchanges

+--------------------+---------+
|        name        |  type   |
+--------------------+---------+
|                    | direct  |
| amq.direct         | direct  |
| amq.fanout         | fanout  |
| amq.headers        | headers |
| amq.match          | headers |
| amq.rabbitmq.trace | topic   |
| amq.topic          | topic   |
| supplier-out-0     | topic   |
+--------------------+---------+

L’exchange est effectivement créé.

Avec RabbitMQ, les messages sont routés par les exchanges vers des files. Est-ce que Spring Cloud Stream en a aussi déclaré une pour nous ?

rabbitmqadmin list queues

No items

Et bien, en fait, non (cela surprend souvent les personnes qui débutent avec le framework). Spring Cloud Sream ne déclare des files que pour les consummers, pas les publishers.

Du coup, comment fait-on pour vérifier que les messages sont bien reroutés vers par l’exchange ? Le plus simple est d’en déclarer une manuellement et de la rattacher à l’exchange:

rabbitmqadmin declare queue name=supplier.monitoring durable=true
rabbitmqadmin declare binding source=supplier-out-0 destination=supplier.monitoring routing_key="#"

J’ai configuré la file comme étant durable ce qui permet de faire en sorte qu’elle existe toujours en cas de redémarrage de RabbitMQ.

La routing key # permet de s’assurer que l’exchange transmet tous les messages qu’il reçoit à notre file. Relançons l’application et exécutons à nouveau la commande rabbitmqadmin list queues:

rabbitmqadmin list queues

+---------------------+----------+
|        name         | messages |
+---------------------+----------+
| supplier.monitoring | 11       |
+---------------------+----------+

A présent, la file contient 11 messages (ce sera probablement différent chez vous selon le temps que l’application aura tourné). Visualisons à présent le contenu de quelques messages:

rabbitmqadmin get queue=supplier.monitoring count=3

+----------------+----------------+---------------+---------------------+---------------+------------------+-------------+
|  routing_key   |    exchange    | message_count |       payload       | payload_bytes | payload_encoding | redelivered |
+----------------+----------------+---------------+---------------------+---------------+------------------+-------------+
| supplier-out-0 | supplier-out-0 | 13            | hello from Supplier | 19            | string           | False       |
| supplier-out-0 | supplier-out-0 | 12            | hello from Supplier | 19            | string           | False       |
| supplier-out-0 | supplier-out-0 | 11            | hello from Supplier | 19            | string           | False       |
+----------------+----------------+---------------+---------------------+---------------+------------------+-------------+

La commande permet de visualiser les trois premiers messages et on constate que leur contenu correspond bien à celui publié par l’application. Tout cela, sans rien configurer du tout !

Bien entendu, il est possible de personnaliser de nombreuses choses avec Spring Cloud Stream mais le but de cet article n’est pas de faire un deep dive dans ce framework.

Et avec un Supplier Reactive ?

Essayons la même expérience mais avec un supplier reactive:

@Bean
public Supplier<Flux<String>> reactiveSupplier() {
    return () -> {
        System.out.println(">> reactive supplier called");
        return Flux.fromStream(Stream.generate(
                () -> {
                    try {
                        Thread.sleep(2000);
                        return "hello from reactive supplier";
                    } catch (Exception e) {
                    }
                    return "end of reactive supplier";
                }
        ));
    };
}

Si on lance l’application…elle s’arrête aussitôt ! La raison pour laquelle l’application ne démarre pas est que Spring Cloud Stream constate qu’il y a plus d’un bean de type Supplier (ou Consumer, ou Function) et qu’il n’est pas certain desquels doivent bénéficier d’une correspondance dans l’infrastructure ou pas.

Du coup, on doit l’aider un peu en spécifiant la ou les définitions de fonctions qu’il doit considérer. Pour cela, on utilise la même configuration que pour le déploiement web: spring.cloud.function.definition:

spring:
  cloud:
    function:
      definition: reactiveSupplier

Il faudra bien entendu mettre à jour la valeur de cette propriété pour activer les autres fonctions de l’article (éventuellement séparées par des ;), je compte sur vous pour ça.

L’application démarre bien ce coup-ci et on voit que notre log rudimentaire n’apparaît…qu’une seule fois ce coup-ci !

Il se passe ici que Spring Cloud Stream détecte qu’il s’agit d’un flux réactif et n’appelle notre fonction qu’une seule fois et celle-ci produit un flux continu de messages.

Vérifions-le en créant une file de monitoring comme pour le supplier précédent:

rabbitmqadmin declare queue name=reactiveSupplier.monitoring durable=true
rabbitmqadmin declare binding source=reactiveSupplier-out-0 destination=reactiveSupplier.monitoring routing_key="#"

rabbitmqadmin list queues
+-----------------------------+----------+
|            name             | messages |
+-----------------------------+----------+
| reactiveSupplier.monitoring | 124      |
| supplier.monitoring         | 14       |
+-----------------------------+----------+

rabbitmqadmin get queue=reactiveSupplier.monitoring count=1

+------------------------+------------------------+---------------+------------------------------+---------------+------------------+-------------+
|      routing_key       |        exchange        | message_count |           payload            | payload_bytes | payload_encoding | redelivered |
+------------------------+------------------------+---------------+------------------------------+---------------+------------------+-------------+
| reactiveSupplier-out-0 | reactiveSupplier-out-0 | 157           | hello from reactive supplier | 28            | string           | False       |
+------------------------+------------------------+---------------+------------------------------+---------------+------------------+-------------+

On constate que la file se remplit bien comme prévu avec les messages attendus, cool.

Si vous avez lu l’article concernant le déploiement web des Spring Cloud Functions, vous vous rappelez peut-être que les flux infinis ne fonctionnaient pas (pour le moment). Avec Spring Cloud Stream, on voit qu’il n’y a aucun problème.

Maintenant, supposons que vous ayiez besoin que votre supplier reactive soit appelé plusieurs fois par le framework, par exemple pour récupérer des infos de façon réactive dans une base de données et les envoyer sous forme de messages à intervalle régulier. Dans ce genre de situation, il faut utiliser l’annotation @PollableBean comme dans l’exemple suivant:

@PollableBean
public Supplier<Flux<String>> finiteReactiveSupplier() {
    return () -> {
        System.out.println(">> reactive supplier called");
        return Flux.just("hello", "from", "finite", "reactive", "supplier");
    };
}

Si on lance l’application (en ayant ajouté la fonction à la configuration), on constate que notre supplier est en effet appelé chaque seconde comme dans le cas du supplier non reactive.

Consumer

Passons à présent à l’écriture d’un Consumer:

@Bean
public Consumer<String> consumer() {
    return log::info;
}

Lançons l’application et vérifions ce que Spring Cloud Stream a créé pour nous par défaut:

rabbitmqadmin list exchanges

+------------------------------+---------+
|             name             |  type   |
+------------------------------+---------+
|                              | direct  |
| amq.direct                   | direct  |
| amq.fanout                   | fanout  |
| amq.headers                  | headers |
| amq.match                    | headers |
| amq.rabbitmq.trace           | topic   |
| amq.topic                    | topic   |
| consumer-in-0                | topic   |
| finiteReactiveSupplier-out-0 | topic   |
| reactiveSupplier-out-0       | topic   |
| supplier-out-0               | topic   |
+------------------------------+---------+

On voit qu’un exchange nommé consumer-in-0 a été déclaré. Et si on regarde du côté des files:

rabbitmqadmin list queues

+------------------------------------------------+----------+
|                      name                      | messages |
+------------------------------------------------+----------+
| consumer-in-0.anonymous.FGON7F8aQP6qNBpEQQ0rEw | 0        |
| reactiveSupplier.monitoring                    | 18       |
| supplier.monitoring                            | 14       |
+------------------------------------------------+----------+

Ce coup-ci, une file a été déclarée (comme expliqué plus haut, je n’ai pas menti) mais si vous regardez le nom de cette file, vous pouvez constater qu’il est préfixé avec le nom de l’exchange suivi de anonymous et de caractères aléatoires.

Par défaut, Spring Cloud Stream créé une file non durable qui sera supprimée à l’arrêt de l’application. C’est pratique pour développer mais en production, on voudra probablement éviter cela surtout si on a plusieurs instances de notre application qui souhaitent consommer la même file pour améliorer la scalabilité.

Pour que cette file soit durable, il faut définir une propriété qui associe notre application à un groupe:

spring:
  cloud:
    stream:
      bindings:
        consumer-in-0:
          group: springboot

Si on reliste les files après avoir relancé l’application, on constate que la file a un nom fixe à présent:

rabbitmqadmin list queues

+-----------------------------+----------+
|            name             | messages |
+-----------------------------+----------+
| consumer-in-0.springboot    | 0        |
| reactiveSupplier.monitoring | 18       |
| supplier.monitoring         | 14       |
+-----------------------------+----------+

Essayons de publier un message à présent:

rabbitmqadmin publish exchange=consumer-in-0 routing_key="" payload="hello from terminal"

Si on jette un oeil aux logs, on voit nos traces apparaître:

2022-01-15 17:31:37.700  INFO 96147 --- [           main] scf.Application                          : Started Application in 2.132 seconds (JVM running for 2.761)
hello from terminal

Trop facile !

La valeur de la routing_key importe peu car Spring Cloud Stream a configuré la file pour recevoir tous les messages transitant par l’exchange par défaut.

Et avec un POJO ?

Essayons de voir ce qu’il se passe si on utilise quelque chose de plus complexe qu’une chaîne de caractères:

@Bean
public Consumer<User> pojoConsumer() {
    return user -> log.info(user.toString());
}

@ToString
@Setter
static class User {
    String name;
}

Vérifions que l’exchange a bien été créé:

rabbitmqadmin list exchanges

+------------------------------+---------+
|             name             |  type   |
+------------------------------+---------+
|                              | direct  |
| amq.direct                   | direct  |
| amq.fanout                   | fanout  |
| amq.headers                  | headers |
| amq.match                    | headers |
| amq.rabbitmq.trace           | topic   |
| amq.topic                    | topic   |
| consumer-in-0                | topic   |
| finiteReactiveSupplier-out-0 | topic   |
| pojoConsumer-in-0            | topic   |
| reactiveSupplier-out-0       | topic   |
| supplier-out-0               | topic   |
+------------------------------+---------+

Et qu’une file (anonyme) est également présente pour notre fonction:

rabbitmqadmin list queues

+----------------------------------------------------+----------+
|                        name                        | messages |
+----------------------------------------------------+----------+
| consumer-in-0.springboot                           | 0        |
| pojoConsumer-in-0.anonymous.XZJFEJ6KTH64OgoEiruJkg | 0        |
| reactiveSupplier.monitoring                        | 18       |
| supplier.monitoring                                | 14       |
+----------------------------------------------------+----------+

Oui, elle est bien là !

Publions un message au format JSON:

rabbitmqadmin publish exchange=pojoConsumer-in-0 routing_key="" payload='
{
    "name": "John Doe"
}
'

Message published

Si on regarde les logs, on constate que cela a bien fonctionné:

2022-01-15 15:10:21.918  INFO 22803 --- [H64OgoEiruJkg-1] scf.Application                          : Application.User(name=John Doe)

Spring Cloud Stream a automatiquement désérialisé la chaîne de caractères comme étant du JSON, pratique.

Function

Passons à présent aux Functions qui permettent de combiner Consumer et Supplier dans une même méthode:

@Bean
public Function<String, String> function() {
    return input -> "Hello, " + input;
}

Si on liste les exchanges, on constate que deux ont été déclarées (function-in-0 et function-out-0):

rabbitmqadmin list exchanges

+------------------------------+---------+
|             name             |  type   |
+------------------------------+---------+
|                              | direct  |
| amq.direct                   | direct  |
| amq.fanout                   | fanout  |
| amq.headers                  | headers |
| amq.match                    | headers |
| amq.rabbitmq.trace           | topic   |
| amq.topic                    | topic   |
| consumer-in-0                | topic   |
| finiteReactiveSupplier-out-0 | topic   |
| function-in-0                | topic   |
| function-out-0               | topic   |
| pojoConsumer-in-0            | topic   |
| reactiveSupplier-out-0       | topic   |
| supplier-out-0               | topic   |
+------------------------------+---------+

Et pour les files, une seule l’a été (pour la consommation):

rabbitmqadmin list queues

+------------------------------------------------+----------+
|                      name                      | messages |
+------------------------------------------------+----------+
| consumer-in-0.springboot                       | 0        |
| function-in-0.anonymous.GQGExbEIRWKiQnS947KCtQ | 0        |
| reactiveSupplier.monitoring                    | 18       |
| supplier.monitoring                            | 14       |
+------------------------------------------------+----------+

Ajoutons-en une pour monitorer la sortie de la fonction:

rabbitmqadmin declare queue name=function.monitoring
rabbitmqadmin declare binding source=function-out-0 destination=function.monitoring routing_key="#"

Et publions un message ensuite:

rabbitmqadmin publish exchange=function-in-0 routing_key="" payload="hello function"

Message published

Si on liste les files à nouveau:

rabbitmqadmin list queues

+------------------------------------------------+----------+
|                      name                      | messages |
+------------------------------------------------+----------+
| consumer-in-0.springboot                       | 1        |
| function-in-0.anonymous.GQGExbEIRWKiQnS947KCtQ | 0        |
| function.monitoring                            | 1        |
| reactiveSupplier.monitoring                    | 18       |
| supplier.monitoring                            | 14       |
+------------------------------------------------+----------+

On voit que notre file function.monitoring contient bien un message:

rabbitmqadmin get queue=function.monitoring count=1

+----------------+----------------+---------------+-----------------------+---------------+------------------+-------------+
|  routing_key   |    exchange    | message_count |        payload        | payload_bytes | payload_encoding | redelivered |
+----------------+----------------+---------------+-----------------------+---------------+------------------+-------------+
| function-out-0 | function-out-0 | 0             | Hello, hello function | 21            | string           | False       |
+----------------+----------------+---------------+-----------------------+---------------+------------------+-------------+

Et le message qui s’y trouve est bien ce à quoi l’on s’attend, excellent !

Et ça marche aussi en Reactive ?

Testons une fonction réactive:

@Bean
public Function<Flux<String>, Flux<String>> reactiveFunction() {
    return input -> input
            .log()
            .map(s -> s.toUpperCase());
}

Je vous épargne les commandes pour déclarer la file de monitoring pour directement publier un message:

rabbitmqadmin publish exchange=reactiveFunction-in-0 routing_key="" payload="coucou"

Message published

Et si on visualise le message dans notre file de monitoring:

rabbitmqadmin get queue=reactiveFunction.monitoring count=1

+------------------------+------------------------+---------------+---------+---------------+------------------+-------------+
|      routing_key       |        exchange        | message_count | payload | payload_bytes | payload_encoding | redelivered |
+------------------------+------------------------+---------------+---------+---------------+------------------+-------------+
| reactiveFunction-out-0 | reactiveFunction-out-0 | 0             | COUCOU  | 6             | string           | False       |
+------------------------+------------------------+---------------+---------+---------------+------------------+-------------+

COUCOU est bien là, tout s’est passé comme prévu !

Multiples messages en impératif

Cette section et la suivante vont montrer comment retourner plusieurs messages dans la file de sortie alors qu’on n’en consomme qu’un seul en entrée. C’est une façon d’implémenter le pattern Splitter.

Nous allons d’abord voir comment faire ça en utilisant un modèle de programmation impératif (càd non reactive) et la section suivante montrera comment faire la même chose en reactive.

Le code Java:

@Bean
public Function<Order, List<Message<OrderItem>>> multipleMessages() {
    return input -> input
            .getItems()
            .stream()
            .map(item -> MessageBuilder.withPayload(item).build())
            .collect(Collectors.toList());
}

@ToString
@Data
static class Order {
    private List<OrderItem> items;
}

@ToString
@Data
static class OrderItem {
    private String id;
}

Notez comment le second argument de notre function est de type List<Message>: cela indique au framework qu’il faut retourner autant de messages qu’il y a d’éléments dans la liste plutôt que de retourner un message avec une liste d’objets dedans.

Si on publie le message suivant:

rabbitmqadmin publish exchange=multipleMessages-in-0 routing_key="" payload='
{
    "items": [
        { "id": "item-1" },
        { "id": "item-2" },
        { "id": "item-3" }
    ]
}
'

Message published

Et que l’on regarde la file de monitoring multipleMessages.monitoring associée, on voit que 3 messages s’y trouvent:

rabbitmqadmin list queues

+--------------------------------------------------------+----------+
|                          name                          | messages |
+--------------------------------------------------------+----------+
| consumer-in-0.springboot                               | 1        |
| function.monitoring                                    | 1        |
| multipleMessages-in-0.anonymous.7LKC32WATlmyFt9MGEjYig | 0        |
| multipleMessages.monitoring                            | 3        |
| reactiveFunction.monitoring                            | 1        |
| reactiveSupplier.monitoring                            | 18       |
| supplier.monitoring                                    | 14       |
+--------------------------------------------------------+----------+

Regardons le contenu de ces messages:

rabbitmqadmin get queue=multipleMessages.monitoring count=3

+------------------------+------------------------+---------------+-----------------+---------------+------------------+-------------+
|      routing_key       |        exchange        | message_count |     payload     | payload_bytes | payload_encoding | redelivered |
+------------------------+------------------------+---------------+-----------------+---------------+------------------+-------------+
| multipleMessages-out-0 | multipleMessages-out-0 | 2             | {"id":"item-1"} | 15            | string           | False       |
| multipleMessages-out-0 | multipleMessages-out-0 | 1             | {"id":"item-2"} | 15            | string           | False       |
| multipleMessages-out-0 | multipleMessages-out-0 | 0             | {"id":"item-3"} | 15            | string           | False       |
+------------------------+------------------------+---------------+-----------------+---------------+------------------+-------------+

On voit qu’effectivement chaque message contient un OrderItem dans son payload.

Multiples messages en reactive

Essayons de reproduire la même chose mais en tant utilisant le modèle de programmation reactive:

@Bean
public Function<Flux<Order>, Flux<OrderItem>> reactiveMultipleMessages() {
    return input -> input
            .flatMap(order -> Flux.fromIterable(order.getItems()));
}

C’est un peu moins verbeux pour le coup…

Si on publie le message suivant:

rabbitmqadmin publish exchange=reactiveMultipleMessages-in-0 routing_key="" payload='
{
    "items": [
        { "id": "item-1" },
        { "id": "item-2" },
        { "id": "item-3" }
    ]
}
'

Message published

Regardons le contenu de ces messages:

rabbitmqadmin get queue=reactiveMultipleMessages.monitoring count=3

+--------------------------------+--------------------------------+---------------+-----------------+---------------+------------------+-------------+
|          routing_key           |            exchange            | message_count |     payload     | payload_bytes | payload_encoding | redelivered |
+--------------------------------+--------------------------------+---------------+-----------------+---------------+------------------+-------------+
| reactiveMultipleMessages-out-0 | reactiveMultipleMessages-out-0 | 2             | {"id":"item-1"} | 15            | string           | False       |
| reactiveMultipleMessages-out-0 | reactiveMultipleMessages-out-0 | 1             | {"id":"item-2"} | 15            | string           | False       |
| reactiveMultipleMessages-out-0 | reactiveMultipleMessages-out-0 | 0             | {"id":"item-3"} | 15            | string           | False       |
+--------------------------------+--------------------------------+---------------+-----------------+---------------+------------------+-------------+

Là aussi, cela a fonctionné, génial !

L’autre mode Batch

Il existe également un autre mode batch implémenté dans Spring AMQP et utilisable avec Spring Cloud Stream et nos fonctions.

Le principe est le suivant: si on publie des messages en s’appuyant sur une implémentation de BatchingStrategy, il est possible de combiner dans un seul message RabbitMQ un ensemble de messages individuels. Ces messages peuvent ensuite être débatchés pour que notre fonction les consomme sous forme de liste.

Avec un exemple, ce sera probablement plus clair:

@Bean
public Function<List<String>, String> batchConsumer() {
    return input -> {
        log.info("Consuming: {}", input);
        return String.join(";", input);
    };
}```

Notre fonction va concaténer les différents messages et publier un message contenant le résultat de cette concaténation.

Pour que cela fonctionne, on doit activer le mode batch:

spring: cloud: stream: bindings: batchConsumer-in-0: consumer: batch-mode: true rabbit: bindings: batchConsumer-in-0: consumer: enable-batching: true batch-size: 5 receive-timeout: 1000


La propriété `batch-size` indique le nombre d'éléments max de nos batches. La propriété `receive-timeout` indique le temps que le framework attend au maximum avant qu'un batch ne soit complet. Au bout de cette durée, le batch sera envoyé au `Consumer` même si sa taille est inférieure à celle configurée.

Afin de publier des messages, un bout de code java est nécessaire cette fois-ci car le message contenant le batch doit être construit d'une certaine façon par Spring AMQP pour que cela fonctionne:

```java
@Bean
@ConditionalOnExpression("'${spring.cloud.function.definition}'.contains('batchConsumer')" )
public ApplicationRunner runner(RabbitTemplate rabbitTemplate) {
    BatchingStrategy strategy = new SimpleBatchingStrategy(5, 25_000, 1_000);
    TaskScheduler scheduler = new ConcurrentTaskScheduler();
    BatchingRabbitTemplate template = new BatchingRabbitTemplate(strategy, scheduler);
    template.setConnectionFactory(rabbitTemplate.getConnectionFactory());

    return args -> {
        for (var i = 0; i < 12; i++) {
            var props = new MessageProperties();
            props.setContentType("text/plain");
            org.springframework.amqp.core.Message message = new org.springframework.amqp.core.Message(("message " + i).getBytes(StandardCharsets.UTF_8), props);
            template.send("batchConsumer-in-0", "", message, null);
        }
    };
}

Ce bout de code publie 12 messages (dans le code) qui généreront donc 3 vrais messages publiés dans RabbitMQ car nous définissons à 5 le nombre max d’éléments dans notre stratégie de batching.

Faîtes attention au content-type si, comme moi, vous consommez un type simple (String par exemple). Si vous ne le renseignez pas, le framework va considérer qu’il s’agit de JSON et va planter.

L’annotation ConditionalOnExpression évite de publier les messages si la fonction de batch n’est pas activée.

Si on lance l’application, les logs semblent confirmer que cela fonctionne:

2022-01-15 14:48:09.899  INFO 80008 --- [a-0rAYNOEuOaA-1] scf.Application                          : Consuming: [message 0, message 1, message 2, message 3, message 4]
2022-01-15 14:48:09.904  INFO 80008 --- [a-0rAYNOEuOaA-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
...
2022-01-15 14:48:09.925  INFO 80008 --- [a-0rAYNOEuOaA-1] scf.Application                          : Consuming: [message 5, message 6, message 7, message 8, message 9]
2022-01-15 14:48:10.892  INFO 80008 --- [a-0rAYNOEuOaA-1] scf.Application                          : Consuming: [message 10, message 11]

Et si nous regardons notre file de monitoring créée pour l’occasion:

rabbitmqadmin list queues

+-----------------------------------------------------+----------+
|                        name                         | messages |
+-----------------------------------------------------+----------+
| batchConsumer-in-0.anonymous.4nCInWpKRa-0rAYNOEuOaA | 0        |
| batchConsumer.monitoring                            | 3        |
| consumer-in-0.springboot                            | 0        |
| function.monitoring                                 | 1        |
| hello.world.queue                                   | 0        |
| multipleMessages.monitoring                         | 3        |
| reactiveFunction.monitoring                         | 1        |
| reactiveMultipleMessages.monitoring                 | 3        |
| reactiveSupplier.monitoring                         | 18       |
| supplier.monitoring                                 | 14       |
+-----------------------------------------------------+----------+

On constate qu’il y a bien 3 messages seulement dans batchConsumer.monitoring. Jetons un oeil au contenu des messages à présent:

rabbitmqadmin get queue=batchConsumer.monitoring count=3

+---------------------+---------------------+---------------+---------------------------------------------------+---------------+------------------+-------------+
|     routing_key     |      exchange       | message_count |                      payload                      | payload_bytes | payload_encoding | redelivered |
+---------------------+---------------------+---------------+---------------------------------------------------+---------------+------------------+-------------+
| batchConsumer-out-0 | batchConsumer-out-0 | 2             | message 0;message 1;message 2;message 3;message 4 | 49            | string           | True        |
| batchConsumer-out-0 | batchConsumer-out-0 | 1             | message 5;message 6;message 7;message 8;message 9 | 49            | string           | True        |
| batchConsumer-out-0 | batchConsumer-out-0 | 0             | message 10;message 11                             | 21            | string           | True        |
+---------------------+---------------------+---------------+---------------------------------------------------+---------------+------------------+-------------+

Yes, cela a marché ! Nos deux premiers messages contiennent chacun 5 messages et le troisième les deux derniers messages publiés.

Multiples outputs

Une autre fonctionnalité intéressante à tester concerne les fonctions avec plusieurs entrées et sorties. La doc donne quelques use cases pour ce type de fonction: big data et aggrégation de données par exemple.

Cela ne fonctionne qu’avec le modèle reactive et en utilisant les Tuples, comme nous l’avons vu dans l’introduction à Spring Cloud Function.

Pour illustrer cela, nous allons implémenter le pattern Scatter-Gather.

Dans une première fonction (Scatter), nous allons consommer un flux de nombres que nous allons diviser en deux flux: le premier contiendra les multiples de 3 et le second les autres. Chacun de ces flux sera redirigé vers sa propre file RabbitMQ.

@Configuration
public class ScatterGather {
    @Bean
    public Function<Flux<Integer>, Tuple2<Flux<Integer>, Flux<Integer>>> scatter() {
        return order -> {
            var flux = order.publish().autoConnect(2);

            Sinks.Many<Integer> multipleOf3 = Sinks.many().unicast().onBackpressureError();
            var multipleOf3Flux = flux
                    .filter(n -> n % 3 == 0)
                    .doOnNext(multipleOf3::tryEmitNext);

            Sinks.Many<Integer> nonMultipleOf3 = Sinks.many().unicast().onBackpressureError();
            var nonMultipleOf3Flux = flux
                    .filter(n -> n % 3 != 0)
                    .doOnNext(nonMultipleOf3::tryEmitNext);

            return Tuples.of(
                    multipleOf3.asFlux().doOnSubscribe(x -> multipleOf3Flux.subscribe()),
                    nonMultipleOf3.asFlux().doOnSubscribe(x -> nonMultipleOf3Flux.subscribe())
            );
        };
    }
}

Si vous n’avez jamais fait de programmation reactive (ou seulement un peu), ce bout de code ne vous parle sans doute pas beaucoup mais dans l’idée, il fait ce que j’ai décrit un peu plus haut.

Vérifions que chaque flux est bien associé à un exchange:

+--------------------------------+---------+
|              name              |  type   |
+--------------------------------+---------+
|                                | direct  |
| amq.direct                     | direct  |
 ...                     
| scatter-in-0                   | topic   |
| scatter-out-0                  | topic   |
| scatter-out-1                  | topic   |
+--------------------------------+---------+

On voit qu’effectivement, trois exchanges ont cette fois été déclarés: un pour l’entrée et deux pour les sorties.

Essayons de publier quelques nombres:

for i in {1..20}; do rabbitmqadmin publish exchange=scatter-in-0 routing_key="" payload="$i";  done

Message published
...
Message published

Vérifions que les nombres ont bien été répartis selon notre algorithme:

rabbitmqadmin list queues
+-----------------------------------------------+----------+
|                     name                      | messages |
+-----------------------------------------------+----------+
| scatter-in-0.anonymous.o0b5DMZYRbK-r18OjXbm7Q | 0        |
| scatter-out-0.monitoring                      | 6        |
| scatter-out-1.monitoring                      | 14       |
+-----------------------------------------------+----------+

6 multiples de 3 et 14 non multiples, vérifions le contenu des files à présent:

rabbitmqadmin get queue=scatter-out-0.monitoring count=6

+---------------+---------------+---------------+---------+---------------+------------------+-------------+
|  routing_key  |   exchange    | message_count | payload | payload_bytes | payload_encoding | redelivered |
+---------------+---------------+---------------+---------+---------------+------------------+-------------+
| scatter-out-0 | scatter-out-0 | 5             | 3       | 1             | string           | False       |
| scatter-out-0 | scatter-out-0 | 4             | 6       | 1             | string           | False       |
| scatter-out-0 | scatter-out-0 | 3             | 9       | 1             | string           | False       |
| scatter-out-0 | scatter-out-0 | 2             | 12      | 2             | string           | False       |
| scatter-out-0 | scatter-out-0 | 1             | 15      | 2             | string           | False       |
| scatter-out-0 | scatter-out-0 | 0             | 18      | 2             | string           | False       |
+---------------+---------------+---------------+---------+---------------+------------------+-------------+

rabbitmqadmin get queue=scatter-out-1.monitoring count=14
+---------------+---------------+---------------+---------+---------------+------------------+-------------+
|  routing_key  |   exchange    | message_count | payload | payload_bytes | payload_encoding | redelivered |
+---------------+---------------+---------------+---------+---------------+------------------+-------------+
| scatter-out-1 | scatter-out-1 | 13            | 1       | 1             | string           | False       |
| scatter-out-1 | scatter-out-1 | 12            | 2       | 1             | string           | False       |
| scatter-out-1 | scatter-out-1 | 11            | 4       | 1             | string           | False       |
| scatter-out-1 | scatter-out-1 | 10            | 5       | 1             | string           | False       |
| scatter-out-1 | scatter-out-1 | 9             | 7       | 1             | string           | False       |
| scatter-out-1 | scatter-out-1 | 8             | 8       | 1             | string           | False       |
| scatter-out-1 | scatter-out-1 | 7             | 10      | 2             | string           | False       |
| scatter-out-1 | scatter-out-1 | 6             | 11      | 2             | string           | False       |
| scatter-out-1 | scatter-out-1 | 5             | 13      | 2             | string           | False       |
| scatter-out-1 | scatter-out-1 | 4             | 14      | 2             | string           | False       |
| scatter-out-1 | scatter-out-1 | 3             | 16      | 2             | string           | False       |
| scatter-out-1 | scatter-out-1 | 2             | 17      | 2             | string           | False       |
| scatter-out-1 | scatter-out-1 | 1             | 19      | 2             | string           | False       |
| scatter-out-1 | scatter-out-1 | 0             | 20      | 2             | string           | False       |
+---------------+---------------+---------------+---------+---------------+------------------+-------------+

Les nombres sont bien répartis comme attendu, super !

Faisons l’opération inverse à présent (Gather) en consommant les deux files puis en les mergeant pour les publier dans une nouvelle file:

@Bean
public Function<Tuple2<Flux<Integer>, Flux<Integer>>, Flux<Integer>> gather() {
    return tuple -> {
        Flux<Integer> multipleOf3 = tuple.getT1();
        Flux<Integer> nonMultipleOf3 = tuple.getT2();

        return Flux
                .merge(multipleOf3, nonMultipleOf3)
                .log();
    };
}

Le code est un peu plus simple ce coup-ci. Par contre, il y a un peu de configuration à ajouter pour que notre fonction gather consomme les files de sortie de notre fonction scatter:

spring:
  cloud:
    stream:
      bindings:
        scatter-out-0:
          group: multipleOf3
        scatter-out-1:
          group: nonMultipleOf3
        gather-in-0:
          destination: scatter-out-0
          group: multipleOf3
        gather-in-1:
          destination: scatter-out-1
          group: nonMultipleOf3

On est obligé d’utiliser la propriété group pour fixer la file. La propriété destination indique le nom de l’exchange que l’on souhaite utiliser.

Après avoir relancé l’application et lister les files, voici ce que l’on a:

rabbitmqadmin list queues

+-----------------------------------------------+----------+
|                     name                      | messages |
+-----------------------------------------------+----------+
| gather-out-0.monitoring                       | 0        |
| scatter-in-0.anonymous.o0b5DMZYRbK-r18OjXbm7Q | 0        |
| scatter-out-0.monitoring                      | 6        |
| scatter-out-0.multipleOf3                     | 0        |
| scatter-out-1.monitoring                      | 14       |
| scatter-out-1.nonMultipleOf3                  | 0        |
+-----------------------------------------------+----------+

(n’oubliez pas que les files suffixées par .monitoring ont été créées manuellement)

Si on republie les messages, les nombres devraient maintenant se retrouver groupés dans la file gather-out-0.monitoring:

for i in {1..20}; do rabbitmqadmin publish exchange=scatter-in-0 routing_key="" payload="$i";  done

Message published
...
Message published

rabbitmqadmin list queues

+-----------------------------------------------+----------+
|                     name                      | messages |
+-----------------------------------------------+----------+
| gather-out-0.monitoring                       | 20       |
| scatter-in-0.anonymous.o0b5DMZYRbK-r18OjXbm7Q | 0        |
| scatter-out-0.monitoring                      | 6        |
| scatter-out-0.multipleOf3                     | 0        |
| scatter-out-1.monitoring                      | 14       |
| scatter-out-1.nonMultipleOf3                  | 0        |
+-----------------------------------------------+----------+

rabbitmqadmin get queue=gather-out-0.monitoring count=20

+--------------+--------------+---------------+---------+---------------+------------------+-------------+
| routing_key  |   exchange   | message_count | payload | payload_bytes | payload_encoding | redelivered |
+--------------+--------------+---------------+---------+---------------+------------------+-------------+
| gather-out-0 | gather-out-0 | 19            | 1       | 1             | string           | False       |
| gather-out-0 | gather-out-0 | 18            | 2       | 1             | string           | False       |
| gather-out-0 | gather-out-0 | 17            | 3       | 1             | string           | False       |
| gather-out-0 | gather-out-0 | 16            | 4       | 1             | string           | False       |
| gather-out-0 | gather-out-0 | 15            | 5       | 1             | string           | False       |
| gather-out-0 | gather-out-0 | 14            | 6       | 1             | string           | False       |
| gather-out-0 | gather-out-0 | 13            | 7       | 1             | string           | False       |
| gather-out-0 | gather-out-0 | 12            | 8       | 1             | string           | False       |
| gather-out-0 | gather-out-0 | 11            | 9       | 1             | string           | False       |
| gather-out-0 | gather-out-0 | 10            | 10      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 9             | 11      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 8             | 12      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 7             | 13      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 6             | 14      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 5             | 15      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 4             | 16      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 3             | 17      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 2             | 18      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 1             | 19      | 2             | string           | False       |
| gather-out-0 | gather-out-0 | 0             | 20      | 2             | string           | False       |
+--------------+--------------+---------------+---------+---------------+------------------+-------------+

Oui, nos 20 nombres de départ sont bien rassemblés à nouveau !

Conclusion

Je crois qu’on a vu pas mal de choses en s’amusant avec nos Spring Cloud Functions et Spring Cloud Stream: j’espère que cela vous aura été utile comme cela l’a été pour moi !

On n’a clairement pas tout essayé mais les possibilités de combinaisons permettent d’imaginer de nombreux scenarios typiques d’architectures event-driven. Probablement une idée pour un futur article :-)

Liens

  1. Les exemples de l’article
  2. Spring Cloud Stream

© 2023 Du côté de chez Fouad. Tous droits réservés.