logo le blog invivoo blanc

Reactor – N’ayez plus peur de la programmation non bloquante avec Reactor !

26 janvier 2022 | Design & Code, Java | 0 comments

Introduction à Reactor

Parlons de Reactor : avec l’amélioration continue des techniques et des besoins, la programmation réactive est devenue la référence pour les programmes demandant disponibilité, résilience, souplesse et répondant à des événements asynchrones. Elle est d’autant plus une référence lorsque l’on souhaite répartir la charge et utiliser les ressources tout en permettant de mieux découpler le code d’une application. L’écosystème Java, toujours soucieux de permettre à ses développeurs de créer des applications performantes, de devait de suivre ce mouvement, et ça n’a pas manqué ! Après la publication de la première version du Manifeste Réactif en 2013 par Jonas Bonér, plusieurs librairies sont apparues permettant ainsi de profiter de ce nouveau modèle de programmation, dont certaines sont particulièrement devenues populaires :

  • RxJava (Reactive eXtension for Java): Principalement connu dans l’écosystème Android,elle se distingue par sa compatibilité officielle avec les différentes versions de ce SDK et utilise le design pattern Observable
  • Project Reactor: Venant de la communauté de Spring, elle en est le cœur de WebFlux (qui permet de développement d’APIs réactives et non-bloquantes), implémente la spécification Reactive Streams et profite de l’avantage d’être basée sur les APIs Java (CompletableFuture, Stream, Duration, ExecutorService, Flow…) et utilise aussi le design pattern Observable
  • Eclipse Vert.X: Utilisée par beaucoup de grandes enseignes, et à l’époque faisant partie des plus performantes, cette librairie se distingue par sa modularité et sa simplicité, et intègre un bus d’évènements permettant de centraliser et multiplexer les messages

Néanmoins, ces différentes librairies évoluent, et la différence de performance et de moins en moins présente, voire quasi inexistante. De plus, chacune d’entre elles s’importe très facilement par un simple ajout d’une dépendance (voire plus suivant les modules nécessaires, comme un client http, kafka ou encore UDP).

Malgré ces différentes librairies, nous allons nous concentrer sur le projet Reactor qui m’a permis d’aborder très sereinement ces nouvelles contraintes lorsque j’y ai été confronté chez ManoMano.

Reactor, kesako ?

La programmation réactive a pour base de concept chez Reactor un flux d’éléments, fini ou infini, représenté par deux classes principales, plutôt parlantes :

  • Mono : flux d’éléments pouvant en émettre un seul au maximum, ayant donc une cardinalité de 0 à 1
  • Flux : flux d’éléments pouvant en émettre à l’infini, ayant une cardinalité de 0 à N

Chacun de ces deux types de flux représente un Publisher et Reactor apporte tout un tas de méthodes nommées “opérateurs”, permettant la manipulation des éléments émis par ces flux afin de les regrouper, les diviser, les différer, les filtrer, les convertir, et bien plus encore… Il vous permet aussi de gérer les erreurs (je sais, personne n’est parfait) qui peuvent survenir pendant le traitement de votre flux en vous donnant la possibilité de les ignorer, de les convertir en un autre élément, etc.  Ces outils permettent donc une manipulation simple et très lisible (syntaxe encourageant la programmation fonctionnelle) des flux « en mode asynchrone » et Reactor, de par sa structure, vous fait gagner en disponibilité, résilience et souplesse de manière transparente !

Plan de l’article Reactor

Dans un premier temps, je vais vous exposer le contexte dans lequel Reactor sera mis en œuvre avec le code final (histoire d’attiser votre curiosité) et ensuite je vais le décrire pas-à-pas. Au travers de cette description, je souhaite vous apporter une meilleure compréhension des problématiques soulevées par la programmation réactive et vous montrer comment Reactor vous permet de les maîtriser facilement pour que vous n’ayez plus aucune excuse pour vous y mettre ! Et si cette librairie vous intrigue, je vous indiquerai quelques concepts à creuser pour d’aller plus loin !

C’est parti !

Le cas d’usage Reactor

Prenons le côté positif d’un sujet d’actualité : le nombre de guéris du Covid-19.

Le but ici va être d’appeler une API qui va récupérer le nombre de guéris d’hier et d’avant-hier en parallèle pour en afficher la différence le plus rapidement possible, tout en faisant attention à une possible erreur de format de données ou de réseau. Pour se faire, nous devons l’appeler deux fois : une pour chaque date (hier et avant-hier).

L’API en question est coronavirusapi-france.now.sh qui expose plusieurs routes permettant d’obtenir les statistiques du covid-19 de manière globale et mondiale, par pays, par département… Et par date, ce qui nous intéresse donc ici. En se basant sur le 15 novembre 2020, la route à appeler serait : GET coronavirusapi-france.now.sh/FranceGlobalDataByDate?date=2020-11-15, et le résultat en JSON serait par exemple :

{
  "FranceGlobalDataByDate" : [
    {
      "date" : "2020-11-15",
      "sourceType" : "ministere-sante",
      "gueris" : 1111
      [...]
    },
    {
      "date" : "2020-11-15",
      "gueris" : 2222,
      "sourceType" : "opencovid19-fr"
      [...]
    }
  ]
}

Le JSON obtenu nous permet de savoir le nombre absolu de guéris en fonction des sources, comme les données open source de opencovid19-fr, ou le Ministère de la santé. Ce sera cette dernière source que nous allons traiter, en ignorant les autres.

Assez parlé du contexte, voici le code final permettant de faire notre calcul :

@SpringBootApplication
public class LesGuerisDuCovid19 implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(LesGuerisDuCovid19.class);
   private static final DateTimeFormatter DATE_PARAM_FORMATTER = DateTimeFormatter.ISO_DATE;
    private static final String SOURCE_INTERESSANTE = "ministere-sante";

    private final WebClient httpClient = WebClient.create("https://coronavirusapi-france.now.sh");
    
    public static void main(final String[] args) {
         var app = new SpringApplication(LesGuerisDuCovid19.class);
        app.setWebApplicationType(WebApplicationType.NONE);// Pour ne pas lancer le serveur web
        app.run(args);
    }
    
    @Override
    public void run(final String... args) {
        var hier = LocalDate.now().minusDays(1);
        var avantHier = hier.minusDays(1);

        Integer nouveauxGueris = Mono.zip(getNombreGueris(hier, SOURCE_INTERESSANTE), getNombreGueris(avantHier, SOURCE_INTERESSANTE)) // 8
            .doFirst(() -> log.info("Récupération des données entre hier et avant-hier...")) // 9
            .map(tuple -> tuple.getT1() - tuple.getT2()) // 10
            .onErrorResume(error -> Mono.fromRunnable(() -> log.error("Problème lors de la récupération des données: {}", error.getMessage()))) // 11
            .block(); // 12
        // .... À vous d'écrire la suite ....
    }

    private Mono<Integer> getNombreGueris(final LocalDate date, final String source) {
        return httpClient.get()
                     .uri(uri -> uri.path("/FranceGlobalDataByDate").queryParam("date", DATE_PARAM_FORMATTER.format(date)).build())
                     .retrieve()
                     .bodyToMono(JsonNode.class) // 1
                     .doFirst(() -> log.info("Récupération des données pour le {} sur la source '{}'...", date, source)) // 2.a
                     .doOnNext(jsonData -> log.info("Données reçues: {}", jsonData.toPrettyString())) // 2.b
                     .map(jsonData -> jsonData.get("FranceGlobalDataByDate")) // 3
                     .flatMapMany(dataByDate -> Flux.range(0, dataByDate.size()).map(dataByDate::get)) // 4
                     .filter(dataByDate -> source.equals(dataByDate.get("sourceType").textValue())).next() // 5
                     .flatMap(data -> Mono.justOrEmpty(data.get("gueris"))).map(JsonNode::intValue) // 6
                     .switchIfEmpty(Mono.error(new RuntimeException(String.format("Pas de données pour le %s sur la source '%s' !", date, source)))); // 7
    }
}

En résumé, getNombreGueris(…) va récupérer la valeur du nombre de guéris dans le résultat (en JSON) de la réponse de l’API, et émettre une erreur si jamais ce champ n’est pas trouvé. La méthode run() est le point d’entrée de notre programme, qui va appeler getNombreGueris pour la date d’hier et d’avant-hier en parallèle, afin de pouvoir calculer la différence de guéris et l’afficher dans la console.

Et en avant pour l’explication. Dans un premier temps, je vais vous expliquer comment initialiser votre projet en ajoutant les dépendances nécessaires pour utiliser Reactor et Spring WebFlux. Ensuite nous verrons comment récupérer le nombre de guéris pour une date donnée. Pour finir, je vous expliquerai comment combiner le résultat des deux appels.

Dépendance

La première chose à faire est d’ajouter les dépendances nécessaires pour utiliser Reactor, ainsi que WebClient, une fonctionnalité de Spring nous permettant d’appeler une API de manière asynchrone, basé sur… Reactor ! Quelle coïncidence 😉

Il vous faudra donc n’ajouter qu’une seule dépendance à voitre projet :

  • Avec gradle
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.4.0'
  • Avec maven
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
  <version>2.4.0</version>
</dependency>

Explications pas-à-pas

Nous allons partir d’une base très simple, en laissant Spring boot configurer le nécessaire.

Ici, l’auto-configuration va surtout nous servir à avoir un format de logs sympa, avec le niveau par défaut à INFO.

@SpringBootApplication
public class LesGuerisDuCovid19 implements CommandLineRunner {
   private static final Logger log = LoggerFactory.getLogger(LesGuerisDuCovid19.class);
    public static void main(final String[] args) {
        var app = new SpringApplication(LesGuerisDuCovid19.class);
        app.setWebApplicationType(WebApplicationType.NONE);// Pour ne pas lancer le serveur web
        app.run(args);
    }
    
    @Override
    public void run(final String... args) {
      // Notre code à exécuter ici
    }
}

1.

Pour faire notre requête http vers l’API, il va falloir initialiser un WebClient en donnant l’url de base, ici https://coronavirusapi-france.now.sh :

WebClient clientHttp = WebClient.create("https://coronavirusapi-france.now.sh")

WebClient nous propose une abstraction de tous les verbes http habituels, à savoir GET, POST, PUT, DELETE, etc… Ici, nous allons utiliser GET pour le chemin /FranceGlobalDataByDate qui attend le paramètre de requête date formaté en date ISO (aaaa-MM-jj), et récupérer le JSON sorti tout droit de l’API :

Mono<JsonNode> resultatJson = httpClient.get()
          .uri(uri -> uri.path("/FranceGlobalDataByDate").queryParam("date", DateTimeFormatter.ISO_DATE.format(date)).build())
          .retrieve()
          .bodyToMono(JsonNode.class);

Si nous voulons passer plus de paramètre de requête, il suffit de chaîner les appels avec .queryParam(“param”, value)

JsonNode.class pourrait être remplacé par FranceGlobalData.class où FranceGlobalData serait un simple POJO, représentant le schéma du JSON à extraire, plutôt que d’utiliser JsonNode qui est générique et pourra représenter n’importe quel schéma JSON. Si la réponse est totalement vide (pas de JSON retournée), le Mono retourné sera lui aussi vide, mais chut ! Un peu de suspens avant d’en parler.

2.a.

Il est possible d’effectuer une action lorsqu’un flux se déclenche (autrement lors d’un subscribe), avant tout le reste.

Ici, on veut écrire un log juste avant d’exécuter la requête http :

.doFirst(() -> log.info("Récupération des données pour le {} sur la source '{}'...", date, source))

2.b.

Mais on peut aussi effectuer une action lorsqu’un élément est émis dans un flux.

Ici, on veut écrire un log juste après avoir exécuté la requête http :

.doOnNext(jsonData -> log.info("Données reçues: {}", jsonData.toPrettyString()))

3.

Nous avons dans le Mono l’objet JSON suivant :

 "FranceGlobalDataByDate" : [ ... ]
}

Nous allons mapper la donnée reçue pour récupérer le contenu du champ FranceGlobalDataByDate :

.map(jsonData -> jsonData.get("FranceGlobalDataByDate"))

4.

Nous avons maintenant dans le Mono l’élément JSON suivant (un tableau d’objets) :

[
  {
  "date" : "aaaa-MM-jj",
  "sourceType" : "xxx",
    "gueris" : 1111
    [...]
  },
  {
    ...
  }
]

Nous allons convertir le Mono émettant le tableau d’objets vers un Flux d’objets, rendant bien plus simple les futures opérations pour chaque objet.

La particularité du JsonNode est qu’il ne fournit pas directement une liste ou un stream de tous les objets.

Pour avoir accès à chacun d’eux, on aurait donc naturellement fait :

for (int i = 0; i < dataByDate.size(); i++){
  var object = dataByDate.get(i);
}

Mais Reactor nous permet de générer un flux d’index où nous allons par la suite mapper chaque index vers une case de notre tableau, et vous allez voir, c’est plus concis :

Flux.range(0, dataByDate.size()).map(dataByDate::get)

Enfin, pour convertir le Mono vers un Flux, il faut utiliser .flatMapMany. Voici à quoi ressemble le code tout assemblé :

.flatMapMany(dataByDate -> Flux.range(0, dataByDate.size()).map(dataByDate::get))

Si le tableau est vide, le Flux retourné sera lui aussi vide, le suspense est encore de la partie… Mais soyez patients, l’explication arrive !

5.

Nous avons maintenant dans le Flux aucun à plusieurs objets JSON de type :

{
  "date" : "aaaa-MM-jj",
  "sourceType" : "xxx",
  "gueris" : 1111
  [...]
}

Ce que nous voulons, c’est seulement récupérer les données du Ministère de la santé. Il va donc falloir filtrer avec .filter le flux en ne récupérant que les objets dont le champ sourceType contient la valeur ministere-sante.

Comme chaque tableau n’a qu’un seul objet par type de source, nous prendrons uniquement le premier qui satisfait cette condition avec .next, ce qui aura pour effet de convertir le Flux en Mono .

Voici le code :

.filter(dataByDate -> source.equals(dataByDate.get("sourceType").textValue())).next()

Si aucun élément ne correspond à notre source, le Mono retourné sera vide, qui pourra donc être géré par la suite avec des opérateurs spécifiques ! Mais… Encore du suspens #roulementDeTambours

6.

Nous avons maintenant dans le Mono un objet JSON suivant :

{
  "date" : "aaaa-MM-jj",
  "sourceType" : "ministere-sante",
  "gueris" : 1111
  [...]
}

Nous allons mapper l’objet pour récupérer le contenu du champ gueris.

.map(data -> data.get("gueris").intValue())

Le problème du .map avec Reactor, c’est qu’il n’accepte pas que la valeur obtenue dans le mapper soit null.

Pour des raisons d’apprentissage, on va faire comme si le champ gueris pouvait être null. Pour gérer ce cas, le seul moyen est d’utiliser le .flatMap qui permet de mapper un élément vers 0 à 1 élément (et 0 à N éléments pour un Flux). Pour représenter 0 élément, il faut utiliser Mono.empty() et pour 1 élément, nous avons Mono.just(valeur):

.flatMap(jsonData -> jsonData.get("gueris") == null ? Mono.empty() : Mono.just(jsonData.get("gueris")))

Mais Reactor ne s’arrête pas là, on peut simplifier notre ternaire en un seul morceau avec Mono.justOrEmpty(valeur), où si la valeur est nulle, elle est mappée toute seule vers un Mono.empty() !

.flatMap(jsonData -> Mono.justOrEmpty(jsonData.get("gueris")))

Ce code aurait pu être écrit avec un Optional de cette manière :

.map(jsonData -> Optional.ofNullable(jsonData.get("gueris")))
.filter(Optional::isPresent)
.map(Optional::get)

Avec la gestion du champ null, le code sera donc le suivant :

.flatMap(data -> Mono.justOrEmpty(data.get("gueris"))).map(JsonNode::intValue)

Oui, encore un Mono pouvant être vide si le champ gueris est vide ou n’existe pas.

7.

C’est le moment ! À plusieurs reprises, je vous ai dit que nous pouvions obtenir un Mono ou Flux vide. Et nous allons gérer cette particularité en générant une erreur, non pas avec throw, mais avec Mono.error.

En déclenchant cette erreur, n’importe quel opérateur qui interviendrait après serait totalement ignoré (excepté certains qu’on va voir juste après), permettant de court-circuiter un Mono/Flux :

.switchIfEmpty(Mono.error(new RuntimeException()))
.doOnNext(v -> log.info("Voici un log qui ne sera pas écrit si aucun élément est émis"))

Cependant, on pourrait très bien imaginer retourner une valeur par défaut -1, faisable de cette manière (et dans ce cas, les opérateurs qui suivront pouront intervenir sur notre valeur par défaut) :

.defaultIfEmpty(-1)

Avec un message d’erreur plus parlant, voici le code :

.switchIfEmpty(Mono.error(new RuntimeException(String.format("Pas de données pour le %s sur la source '%s' !", date, source))))

8.

Maintenant que nous avons extrait la logique de récupération d’un nombre de guéris en fonction d’une date et d’une source dans un Mono<Integer>, nous pouvons récupérer ce nombre pour la date d’hier et d’avant-hier. Le mieux serait de déclencher en parallèle les deux requêtes et joindre les résultats ensemble, même si elles ne répondent pas à la même vitesse.

Et c’est faisable facilement à l’aide de Mono.zip(requete1, requete2), qui va prendre nos deux appels, et joindre les résultats dans un Tuple2<Resultat1, Resultat2>, où tuple.getT1() correspondra au résultat de l’appel en 1ère position dans le .zip, et tuple.getT2() correspondra à celui de l’appel en 2ème position. Si un appel déclenche une erreur, l’autre appel sera annulé et l’erreur sera retransmise dans la suite des opérateurs. Si un appel ne retourne rien, l’autre appel sera annulé mais aucune erreur ne sera déclenchée (le Mono retourné par le .zip sera annulé) Il est possible de combiner les résultats autrement qu’avec un tuple, mais ce cas, bien qu’il soit intéressant, est hors de notre sujet. Voici le code, aussi simple qu’efficace, permettant de récupérer le nombre de guéris d’hier et d’avant hier pour la source ministere-sante, et qui nous retournera un Mono<Tuple2<Integer, Integer>> :

var hier = LocalDate.now().minusDays(1);
var avantHier = hier.minusDays(1);
Mono.zip(getNombreGueris(hier, "ministere-sante"), getNombreGueris(avantHier, "ministere-sante"))

9.

Mettons une trace écrite, qui sera affichée avant le .doFirst de chaque appel (voir le pas #2.a) :

.doFirst(() -> log.info("Récupération des données entre hier et avant-hier..."))

10.

Lorsque les deux requêtes sont terminées, nous obtenons un tuple contenant le nombre de guéris d’hier (T1) et avant-hier (T2). Pour récupérer le nombre de nouveaux guéris, il faut simplement soustraire les guéris d’hier à ceux d’avant-hier :

.map(tuple -> tuple.getT1() - tuple.getT2())

11.

Pour la gestion de notre erreur précédente, c’est simple : nous pouvons convertir une erreur en un autre Mono (ou un autre Flux lorsque l’erreur est attrapée dans un flux) en utilisant .onErrorResume.

Dans notre cas, nous allons nous contenter d’écrire un log avec le message d’erreur sans émettre d’élément supplémentaire, et donc le Mono retourné sera vide :

.onErrorResume(error -> Mono.fromRunnable(() -> log.error("Problème lors de la récupération des données: {}", error.getMessage())))

12.

Voici la méthode la plus importante et souvent la plus oubliée :

.subscribe();

Ou celle-ci :

var value = mono.block();

Imaginez-vous dans votre vieille Clio par temps pluvieux, prêt à avancer, mais vos essuie-glaces ne font pas leur travail car vous ne les avez pas enclenchés (oui, il fut un temps où ils n’étaient pas automatiques). Ici, c’est le même problème : si nous n’appelons pas cette méthode, le flux ne sera pas lancé.

Cependant, il existe une réelle différence entre ces deux méthodes :

  • .subscribe() va déclencher le flux de manière asynchrone et non-bloquante, et le retour de cette méthode sera donc immédiat. Voici un exemple qui va d’abord afficher Hello ! puis ensuite Valeur reçue :
void run() {
  mono.doOnNext(value -> log.info("Valeur reçue")).subscribe();
  log.info("Hello !");
}

.block() va déclencher le flux, mais le retour de cette méthode sera synchrone, c’est à dire lorsque le flux sera terminé. Comme son nom l’indique, elle va bloquer la méthode run() jusqu’à ce que le Mono se complète. Voici un exemple qui va d’abord afficher Valeur reçue puis ensuite Hello ! :

void run() {
  var value = mono.doOnNext(value -> log.info("Valeur reçue")).block();
  log.info("Hello !");
}

Dans notre cas, nous voulons récupérer le nombre de nouveau guéris dans une variable pour vous laisser la possibilité d’imaginer une suite non-réactive. Mais attention, si une erreur est déclenchée, nous avons choisi d’avoir un log, et de renvoyer un Mono vide. Et si le Mono est vide, la valeur retournée par .block() sera null :

Integer nouveauxGueris = mono.block();

Si jamais vous appelez 10 fois la méthode block ou subscribe, le publisher va se ré-exécuter 10 fois ! Et c’est là tout l’intérêt des Cold publishers

Comment tester ?

Pour tester, Reactor met à disposition l’objet StepVerifier qui permet de tester les différentes étapes de manipulation de votre flux. Voici quelques use cases avec le code pour tester :

  • un Flux ou un Mono ne devant retourner aucune valeur :
StepVerifier.create(mono).verifyComplete();
  • un Flux ou un Mono devant retourner exactement une valeur :
Object expectedValue = ...;
StepVerifier.create(mono).expectNext(expectedValue).verifyComplete();
  • un Flux devant retourner plusieurs valeurs, dans l’ordre donné :
StepVerifier.create(flux).expectNext(value1).expectNext(value2).expectNext(value3).verifyComplete();
  • un Flux ou un Mono devant déclencher une erreur :
StepVerifier.create(Mono.empty()).expectError().verify();

Comment debug ?

Le debug de Reactor peut souvent s’avérer compliqué lorsqu’il s’agit d’une erreur remontée en plein milieu de la chaîne d’opérateurs. En effet, comme l’exception est déclenchée dans une suite d’appels au sein d’un Mono ou d’un Flux (car ce n’est pas votre code qui appelle directement votre lambda présent dans un map par exemple, mais celui de Reactor), on aura une stacktrace pas très claire !

Prenons le code suivant, où l’on retourne une valeur nulle dans un map, ce qui est formellement interdit :

public class Lanceur {
    public static void main(String[] args) {
        Service.disBonjour().block();
    }

}

class Service {
    static Mono<String> disBonjour() {
        return Mono.just("hello")
                   .map(t -> null);
    }
}

Pour obtenir de plus amples informations non pas sur l’exécution, mais l’emplacement des opérateurs où ils ont été assemblés, il suffit juste d’avoir la dépendance :

io.projectreactor:reactor-tools:3.4.0 Et d’avoir cette ligne avant le code à debug (en première ligne de notre main()) :

ReactorDebugAgent.init();

Cela va donc nous préfixer notre stacktrace avec l’emplacement de l’assemblage (ou déclaration) de nos opérateurs :

Exception in thread "main" java.lang.NullPointerException: The mapper returned a null value.
	at java.base/java.util.Objects.requireNonNull(Objects.java:246)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoMap] :
	reactor.core.publisher.Mono.map
	Service.execution(Lanceur.java:16)
Error has been observed at the following site(s):
	|_ Mono.map ⇢ at Service.execution(Lanceur.java:16)
Stack trace:
		at java.base/java.util.Objects.requireNonNull(Objects.java:246)
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346)
		at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162)
		at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:50)
		at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
		at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
		at reactor.core.publisher.Mono.subscribe(Mono.java:3987)
		at reactor.core.publisher.Mono.block(Mono.java:1678)
		at Lanceur.main(Lanceur.java:8)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1679)
		at Lanceur.main(Lanceur.java:8)

Conclusion sur Reactor

Grâce à ce tutoriel, vous allez pouvoir appeler des micro-services JSON en parallèle aisément, et manipuler les données de manière réactive en un temps record, tout en ayant la main sur le debug aisément. Comme on a pu le voir, que ce soit .flatMap (ou .flatMapMany), on peut convertir un élément vers d’autre(s) élément(s). Mais ces éléments pourraient très bien provenir d’une autre API réactive, d’une requête SQL, une entrée utilisateur, et bien encore… Et c’est là la puissance de Reactor : tout est flux de données, et quand nous utilisons une méthode permettant d’obtenir un Mono ou un Flux, ce n’est pas la source de la donnée qui nous intéresse, mais la ou les valeurs obtenues. Reactor va naturellement imposer cette abstraction adaptable dans tous les cas imaginables. Bien évidemment, nous avons à peine frôlé la puissance et la flexibilité de Reactor mais vous avez les éléments de base pour vous lancer.

Reactor en quelques lignes :

  • À quoi ça sert : Faciliter la mise en place d’applications réactives et non-bloquantes sous forme de flux d’événements
  • Facilité d’intégration : Une dépendance principale pour le cœur, et une par module (client web, serveur web, client kafka, …), voire une seule lors de l’utilisation avec Spring
  • Facilité d’implémentation : écriture fonctionnelle, seulement 2 types centraux (Mono et Flux) qui partagent les mêmes opérateurs (map, flatMap, zip…)
  • Large communauté : Projet porté par Spring, donc beaucoup de développeurs actifs
  • Design pattern Obvservable
  • Utilise les APIs officielles de la JDK (reactive streams, Duration, …)

Allez, je vous laisse mais pour ceux qui veulent jouer un peu, je vous soumets un petit quizz, pour voir si vous avez bien suivi :

  • Que renvoie le code suivant ?
Mono.just(42).map(v -> v * 2);

R: rien, car ni .block(), ni .subscribe() a été appelé donc la chaîne n’a pas été déclenchée

  • Que va contenir la variable value ?
var mono = Mono.just(42)
mono.map(v -> v * 2);
var value = mono.block();

R: 42, il aurait fallut faire mono = mono.map(v -> v * 2); (ou chaîner les appels) pour obtenir 84

  • Prenons la méthode returnMono(int value): Mono<Integer>. Quelle est la bonne manière de mapper une valeur d’un Mono<Integer> avec cette méthode ?
    • 1: mono.map(this::returnMono)
    • 2: mono.flatMap(this::returnMono)

R: 2. Avec 1. nous obtiendrions un Mono<Mono<Integer>>

  • Que peut on faire quand une erreur est déclenchée dans un Mono ?
    • 1: rien
    • 2: émettre un élément
    • 3: appeler un microservice http
    • 4: appeler une BDD
    • 5: écrire un log

R: Toutes les réponses sont justes, car une erreur peut être convertie en Mono, et un Mono peut être vide, ou avec une valeur provenant d’une BDD ou d’un MS, et bien encore…

Aller plus loin avec Reactor

Encore là ? Nous avons vu toutes ces méthodes : map, flatMap, flatMapMany, doFirst, doOnNext, switchIfEmpty, onErrorResume, filter, Mono.just, Mono.justOrEmpty, Mono.zip, Flux.range, subscribe et block… Mais il existe près de 400 méthodes rien pour Flux et 200 pour Mono ! En plus de tout ça, et si cet article a attisé votre curiosité, je vous laisse avec quelques concepts à découvrir

  • Les Sinks : Une extension de Mono et Flux, n’émettant qu’une seule fois chaque élément, qu’il y ait des Subscribers ou non
  • Les Schedulers : Permet de gérer le multithreading de votre appli
  • Retry : Relance un traitement lors d’une erreur (réseau coupé par exemple)
  • Mono.using() et Flux.using() : Try-with-resource réactif
  • … pour le reste, à vous de jouer !