logo le blog invivoo blanc

Retour Devoxx France 2016: Architecture découplée grâce aux Reactive Extensions – 4eme partie

19 septembre 2016 | Front-End, Java

Composition de services

Dans une architecture distribuée, il est typique d’avoir des services faisant appels à plusieurs autres services afin de traiter les requêtes qu’ils reçoivent. Nous allons voir à travers l’exemple suivant comment RxJava permet d’implémenter la composition de services.

Soit donc une API qui propose un service dépendant de deux autres services que nous appellerons A et B.

Figure 4 1 Service composite dépendant des services A et B

Figure 4 1 Service composite dépendant des services A et B

 

En partant sur une implémentation en JAX-RX par exemple, nous aurons un code du type :

 1     @GET
 2     public void compositeService(@Suspended final AsyncResponse response) {
 3         Observable.fromCallable(() -> makeInterestingValue())
 4 
 5                .flatMap(lastName -> {
 6 
 7                     // Observable pour l'appel du service A
 8                     Observable<String> aObs = Observable.fromCallable(() -> wsA(lastName)).map(r -> r.readEntity(String.class));
 9 
10                     // Observable pour l'appel du service B
11                     Observable<String> bObs = Observable.fromCallable(() -> wsB(lastName)).map(r -> r.readEntity(String.class));
12 
13                     // composition des Observable d'appel des deux services
14                     return Observable.zip(aObs, bObs, (a, b) -> String.format("=> Result : %s, %s", a, b));
15                 })
16 
17                 .subscribe(response::resume, // retour du resultat en asynchrone,
18                         response::resume); // sinon retour de l'erreur en asynchrone
19     }

Ligne 8 : construction de l’Observable aObs pour l’appel du service A

Ligne 11 : construction de l’Observable bObs pour l’appel du service B

Ligne 11 Composition des Observable aObs et bObs avec l’opérateur Rx zip.

Ligne 17 et 18 : souscription à l’Observable composite avec retour de l’éventuel résultat ou de l’éventuelle erreur.

 

Tout notre code RxJava dans cet état va s’exécuter dans le thread courant, donc de façon séquentielle. Comme nous l’avons vu précédemment, nous pouvons changer le contexte d’exécution des instructions RxJava avec les Scheduler. Nous allons faire en sorte que les appels des deux services s’exécutent en parallèle en laissant à RxJava le soin de trouver le moment où les résultats des deux appels seront disponibles et prêts à être composés tel que nous le lui avons indiqué (ligne 20 de l’exemple) :

 1     @GET
 2     public void compositeService(@Suspended final AsyncResponse response) {
 3         Observable.fromCallable(() -> makeInterestingValue())
 4 
 5                 .flatMap(lastName -> {
 6 
 7                     // Observable pour l'appel du service A
 8                     Observable<String> aObs = Observable.fromCallable(() -> wsA(lastName))
 9                             // appels réseaux effectués sur des threads dédiés aux I/O
10                             .subscribeOn(Schedulers.io())
11                             .map(r -> r.readEntity(String.class));
12 
13                     // Observable pour l'appel du service B
14                     Observable<String> bObs = Observable.fromCallable(() -> wsB(lastName))
15                             // appels réseaux effectués sur des threads dédiés aux I/O
16                             .subscribeOn(Schedulers.io())
17                             .map(r -> r.readEntity(String.class));
18 
19                     // composition des Observable d'appel des deux services
20                     return Observable.zip(aObs, bObs, (a, b) -> String.format("=> Result : %s, %s", a, b));
21                 })
22 
23                 .subscribe(response::resume, // retour du resultat en asynchrone,
24                         response::resume); // sinon retour de l'erreur en asynchrone
25     }

L’ajout des méthodes subscribeOn avec le Scheduler io() (lignes 10 et 16) permet d’exécuter les instructions RxJava des deux Observables en parallèle dans des threads dédiés.

 

 Spécialisation des erreurs

Vu qu’une erreur dans une instruction RxJava se traduit par l’appel du handler de l’Observer qui traite l’erreur, le service tel qu’implémenté se contente de retourner cette erreur au client sous la forme d’une erreur HTTP 500 (erreur interne du serveur) qui ne donne aucune précision sur la nature de l’erreur. Si le niveau de connaissance des dépendances du service composite permet de retourner des statuts d’erreur plus clairs pour le client, il est presque de notre devoir de lui rendre ce service.
Par exemple si l’appel du service A peut retourner une erreur HTTP 503 (Service temporairement indisponible ou en maintenance), cette situation peut être gérée par la sortie en erreur volontaire de l’Observable avec la levée d’une exception dédiée, cela donnera un code du type (lignes 12 à 17) :

 1     @GET
 2     public void compositeService(@Suspended final AsyncResponse response) {
 3         Observable.fromCallable(() -> makeInterestingValue())
 4 
 5                 .flatMap(lastName -> {
 6 
 7                     // Observable pour l'appel du service A
 8                     Observable<String> aObs = Observable.fromCallable(() -> wsA(lastName))
 9                             // appels réseaux effectués sur des threads dédiés aux I/O
10                             .subscribeOn(Schedulers.io())
11                             .flatMap(
12                                     r -> {
13                                         if (r.getStatus() == 503) {
14                                             return Observable.error(
15                                                     new ServiceUnavailable("service A"));
16                                         }
17                                         return Observable.just(r.readEntity(String.class));
18                                     });
19 
20                     // Observable pour l'appel du service B
21                     Observable<String> bObs = Observable.fromCallable(() -> wsB(lastName))
22                             // appels réseaux effectués sur des threads dédiés aux I/O
23                             .subscribeOn(Schedulers.io())
24                             .map(r -> r.readEntity(String.class));
25 
26                     // composition des Observable d'appel des deux services
27                     return Observable.zip(aObs, bObs, (a, b) -> String.format("=> Result : %s, %s", a, b));
28                 })
29 
30                 .subscribe(response::resume, // retour du resultat en asynchrone,
31                         response::resume); // sinon retour de l'erreur en asynchrone
32     }

Une exception spécialisée est levée pour un cas précis, une bonne chose de faite. Mais les spécificités de cette exception ne sont pas exploitées dans le gestionnaire d’erreur de l’Observable composite, il faut remédier à cela. En supposant que nous ayons hiérarchisé les exceptions, le gestionnaire d’erreur de l’Observable pourrait s’écrire :

1                 .subscribe(response::resume, // retour du resultat en asynchrone,
2 
3                         (error) -> { // sinon construction et retour de l'erreur en asynchrone
4                             response.resume(
5                                     Response.status(((HttpServiceEx) error).httpStatus)
6                                             .entity(error.getMessage())
7                                             .build());
8                         }
9                 );

 4.2 Nombre de tentatives d’exécution d’un Observable

Suite à la réception d’un statut d’un service, nous pouvons vouloir retenter l’appel, un certain nombre de fois. Ce comportement est facile à réaliser avec RxJava (ligne 9) :

1                             .flatMap(
2                                     r -> {
3                                         if (r.getStatus() == 503) {
4                                             return Observable.error(
5                                                     new ServiceUnavailable("service A"));
6                                         }
7                                         return Observable.just(r.readEntity(String.class));
8                                     })
9                             .retry(2); // 2  tentatives d'exécution de l'Observable en cas d'erreur

 4.3 Temps de réponse acceptable

RxJava permet de spécifier la durée d’exécution maximale acceptable d’un Observable, au-delà de laquelle l’Observable peut être mis en erreur. En considérant par exemple qu’un temps de réponse de plus de 500 millisecondes du service doit être interprété comme une exception, voici comment le comportement souhaité s’écrirait (lignes 7 et 8, ServiceTimeout étant une exception que nous avons écrite) :

1                     // Observable pour l'appel du service B
2                     Observable<String> bObs = Observable.fromCallable(() -> wsB(lastName))
3                             // appels réseaux effectués sur des threads dédiés aux I/O
4                             .subscribeOn(Schedulers.io())
5                             .map(r -> r.readEntity(String.class))
6                             .timeout(500, TimeUnit.MILLISECONDS,
7                                     error(new ServiceTimeout("service B")));
8

En résumé RxJava intègre des mécanismes pour faciliter la gestion des statuts d’exécution des Observable. Pour des cas simples l’usage de ces mécanismes reste appropriés, mais en imaginant devoir réagir à de nombreux et variés statuts de plusieurs Observables, ces mécanismes relativement plus techniques risquent de ne pas faire bon ménage avec les instructions de production et de consommation de données liées aux Observable et aux Observer, ces instructions étant relativement plus orientées « métier ». Pour ne pas submerger les instructions métiers « encapsulées » par RxJava par ces mécanismes techniques, nous pouvons recourir à des outils spécialisés qui permettent une gestion évoluée des statuts des dépendances d’un service sans impacter la clarté des instructions d’accès à ces dépendances. Hystrix est actuellement une référence dans ce domaine et s’intègre en plus très bien avec RxJava. Le prochain article est justement consacré à la présentation de Hytrix.

4.4 Références