Reactive Java: Handling Errors in Reactive Streams

 2 years ago
source link: https://blog.knoldus.com/reactive-java-handling-errors-in-reactive-streams/
“If errors and failures are passed to the right component, which can handle them as notifications, the application can become more fault-tolerant or resilient. So if we build our system to be event-driven, we can more easily achieve scalability and failure tolerance, and a scalable, decoupled, and error-proof application is fast and responsive to users.”

Nickolay Tsvetinov

In layman’s terms, “reactive” describes how promptly a client processes streaming data as the server sends it. In the Java community, the term refers to asynchronous I/O and non-blocking processing. A reactive system is event-driven, which is how it stays responsive to users.

The Reactive Streams API is a product of collaboration between Kaazing, Netflix, Pivotal, Red Hat, Twitter, Typesafe, and many others. It provides a powerful mechanism for addressing many problems in event-driven systems, and it abstracts away the plumbing so that developers can focus on their business logic.

What follows is a quick walk-through of error-handling approaches in a Spring Boot project. The examples build a Project Reactor Flux and verify it with StepVerifier.

1. Simply Catch the error (onErrorResume)

The following code defines a Flux named “stringFlux” that emits three elements and then signals a RuntimeException. When the error occurs, control jumps to the “onErrorResume” block, which is analogous to a catch block in Java exception handling. We use StepVerifier to subscribe to the Flux and verify the event flow.

@Test
public void fluxErrorHandling() {

    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorResume((e) -> {
                // on error this block gets executed - we are returning a flux on error value
                System.out.println(e);
                return Flux.just("default");
            });

    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("default")
            .expectError(RuntimeException.class)
            .verify();
}

Running the above snippet fails with:

java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(RuntimeException); actual: onNext(default))

This happens because, as soon as the exception occurs, control jumps to the onErrorResume block, which prints the exception and returns a Flux with “default” as its value. The sequence therefore continues with onNext(default) and never signals the error that the test expects. The corrected test case is:

@Test
public void fluxErrorHandling() {

    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorResume((e) -> {
                // on error this block gets executed - we are returning a flux on error value
                System.out.println(e);
                return Flux.just("default");
            });

    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("default")
            .verifyComplete();
}

Event Sequence :-

15:12:52.127 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
15:12:52.135 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
15:12:52.137 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a)
15:12:52.137 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(b)
15:12:52.138 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(c)
java.lang.RuntimeException: Exception Occurred
15:12:52.139 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(default)
15:12:52.140 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()

Note: The updated test case uses verifyComplete(), which is shorthand for expectComplete().verify(), instead of expectError(...).verify().

This is because a sequence terminates with either onError() or onComplete(), never both. Since the error-handling code returns a fallback Flux, the error is replaced by that Flux's elements and the event stream ends with onComplete(). Without onErrorResume, the stream would have ended with onError() and no onComplete() would have been signalled.
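The event sequence above can be mimicked in plain Java to make the control flow explicit. The sketch below is only a pull-based model, not Reactor code: `OnErrorResumeModel`, `source`, and `signals` are hypothetical names, and an iterator that throws stands in for the Flux that signals an error.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of the signals; this is not Reactor code.
class OnErrorResumeModel {

    // Stand-in for just("a","b","c") + error + just("D"): yields each String
    // and throws when it reaches a RuntimeException element.
    static Iterator<String> source(List<Object> parts) {
        Iterator<Object> it = parts.iterator();
        return new Iterator<String>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public String next() {
                Object part = it.next();
                if (part instanceof RuntimeException) {
                    throw (RuntimeException) part;
                }
                return (String) part;
            }
        };
    }

    // Records the signals a subscriber would see when a fallback replaces the error.
    static List<String> signals(Iterator<String> source,
                                Function<Throwable, List<String>> fallback) {
        List<String> log = new ArrayList<>();
        try {
            while (source.hasNext()) {
                log.add("onNext(" + source.next() + ")");
            }
        } catch (RuntimeException e) {
            // The error ends the original source for good; the fallback takes over.
            for (String value : fallback.apply(e)) {
                log.add("onNext(" + value + ")");
            }
        }
        log.add("onComplete()");
        return log;
    }

    public static void main(String[] args) {
        List<Object> parts =
                List.of("a", "b", "c", new RuntimeException("Exception Occurred"), "D");
        signals(source(parts), e -> List.of("default")).forEach(System.out::println);
    }
}
```

In this model, as in the log above, "D" is never emitted: everything that was concatenated after the error is skipped, and the sequence completes normally once the fallback is exhausted.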

Now, let’s have a look at other ways of error handling in reactive streams.

2. Catch the error and return static value using onErrorReturn()

This is a classic technique: we detect the error and return a static value as our handling mechanism. In contrast to the technique above, onErrorReturn() takes a static fallback value instead of a function that returns a Flux.

@Test
public void fluxErrorHandling_onErrorReturn() {

    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorReturn("default"); // here returning a simple string on any errors

    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("default")
            .verifyComplete();
}

Event Sequence :-

15:13:48.969 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
15:13:48.974 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
15:13:48.976 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a)
15:13:48.976 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(b)
15:13:48.976 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(c)
15:13:48.978 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(default)
15:13:48.983 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()

3. Catch the error and translate it to a custom exception (onErrorMap)

Nothing fancy about the piece of code below. onErrorMap() is a common way to handle errors raised by upstream operators such as map(): the handling logic translates the error into a meaningful business exception. Unlike the previous two techniques, the error still propagates to the subscriber; only its type changes.

@Test
public void fluxErrorHandling_onErrorMap() {

    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorMap((e) -> new CustomException(e)); // here wrapping any error in a custom exception

    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectError(CustomException.class)
            .verify();
}

The event sequence is as follows:

15:15:51.883 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
15:15:51.890 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
15:15:51.892 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a)
15:15:51.892 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(b)
15:15:51.892 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(c)
15:15:51.935 [main] ERROR reactor.Flux.OnErrorResume.1 - onError(CustomException: Exception Occurred)
15:15:51.937 [main] ERROR reactor.Flux.OnErrorResume.1 - 
CustomException: Exception Occurred
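The article does not show the CustomException class itself. A minimal sketch of what it might look like follows; this definition is an assumption, chosen so that the exception carries the same message as the original error (as the log above suggests) and keeps the original error as its cause.

```java
// Hypothetical definition; the author's actual CustomException is not shown.
class CustomException extends RuntimeException {
    CustomException(Throwable cause) {
        // Reuse the cause's message and keep the cause for stack traces.
        super(cause.getMessage(), cause);
    }
}

class CustomExceptionDemo {
    public static void main(String[] args) {
        Throwable original = new RuntimeException("Exception Occurred");
        CustomException translated = new CustomException(original);
        System.out.println(translated.getMessage());
        System.out.println(translated.getCause() == original);
    }
}
```

Passing only the cause to super(cause) would instead set the message to the cause's toString(), which includes the original class name, so the message is passed explicitly here.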

4. Catch the error and retry the same stream for a specific number of times (retry)

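A retry mechanism is useful when you lose the connection to the source emitting the data and would like to re-establish it, to the same source or a different one. retry(n) resubscribes to the upstream up to n more times after an error; if every attempt fails, the last error is passed downstream.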

@Test
public void fluxErrorHandling_withRetry() {

    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorMap((e) -> new CustomException(e))
            .retry(2);

    // P.s. Retry produces same stream again

    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("a", "b", "c")
            .expectNext("a", "b", "c")
            .expectError(CustomException.class)
            .verify();
}

The event sequence below shows the 2 retries: the onNext events for a, b, and c appear three times (the original attempt plus two retries) before the error is finally passed downstream.

Event Sequence :-

15:16:48.154 [main] INFO reactor.Flux.Retry.1 - onSubscribe(FluxRetry.RetrySubscriber)
15:16:48.160 [main] INFO reactor.Flux.Retry.1 - request(unbounded)
15:16:48.163 [main] INFO reactor.Flux.Retry.1 - onNext(a)
15:16:48.163 [main] INFO reactor.Flux.Retry.1 - onNext(b)
15:16:48.164 [main] INFO reactor.Flux.Retry.1 - onNext(c)
15:16:48.215 [main] INFO reactor.Flux.Retry.1 - onNext(a)
15:16:48.215 [main] INFO reactor.Flux.Retry.1 - onNext(b)
15:16:48.215 [main] INFO reactor.Flux.Retry.1 - onNext(c)
15:16:48.216 [main] INFO reactor.Flux.Retry.1 - onNext(a)
15:16:48.216 [main] INFO reactor.Flux.Retry.1 - onNext(b)
15:16:48.216 [main] INFO reactor.Flux.Retry.1 - onNext(c)
15:16:48.217 [main] ERROR reactor.Flux.Retry.1 - onError(CustomException: Exception Occurred)
15:16:48.219 [main] ERROR reactor.Flux.Retry.1 - 
CustomException: Exception Occurred
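The repetition in the log follows from what a retry does: it starts the source over from the beginning. The sketch below models that in plain Java under simplifying assumptions. It is not Reactor's implementation, and `RetryModel`, `Source`, and `runWithRetry` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of retry semantics; this is not Reactor code.
class RetryModel {

    // One "subscription": pushes items into the sink and may throw to signal an error.
    interface Source {
        void subscribe(Consumer<String> sink);
    }

    // Runs the source once and, on failure, up to maxRetries more times.
    static List<String> runWithRetry(Source source, int maxRetries) {
        List<String> log = new ArrayList<>();
        int retriesUsed = 0;
        while (true) {
            try {
                source.subscribe(item -> log.add("onNext(" + item + ")"));
                log.add("onComplete()");
                return log;
            } catch (RuntimeException e) {
                if (retriesUsed == maxRetries) {
                    // Retries exhausted: the last error goes downstream.
                    log.add("onError(" + e.getMessage() + ")");
                    return log;
                }
                retriesUsed++; // start over from the first element
            }
        }
    }

    public static void main(String[] args) {
        Source failing = sink -> {
            sink.accept("a");
            sink.accept("b");
            sink.accept("c");
            throw new RuntimeException("Exception Occurred");
        };
        runWithRetry(failing, 2).forEach(System.out::println);
    }
}
```

With maxRetries set to 2 the source runs three times in total, which is why the test expects a, b, c three times. Items already delivered are not rolled back, so downstream consumers need to tolerate duplicates.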

5. Catch the error using back off along with retry (retryBackoff)

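Use this when you would like to wait for a specific duration before attempting each retry. retryBackoff(2, Duration.ofSeconds(5)) retries at most twice, starting with a backoff of 5 seconds. Note that retryBackoff() is deprecated in Reactor 3.3.x and no longer available in later releases, where the replacement is retryWhen(Retry.backoff(2, Duration.ofSeconds(5))), with Retry coming from the reactor.util.retry package.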

@Test
public void fluxErrorHandling_withRetryBackoff() {

    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorMap((e) -> new CustomException(e))
            .retryBackoff(2, Duration.ofSeconds(5)); // when you want to perform a backoff before retry

    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("a", "b", "c")
            .expectNext("a", "b", "c")
            .expectError(IllegalStateException.class)
            .verify();
}

Event Sequence:-

The log shows each retry running on one of Reactor's parallel scheduler threads after a delay, followed by the error signal that ends the sequence.

15:18:11.551 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:18:11.630 [main] INFO reactor.Flux.RetryWhen.1 - onSubscribe(SerializedSubscriber)
15:18:11.636 [main] INFO reactor.Flux.RetryWhen.1 - request(unbounded)
15:18:11.650 [main] INFO reactor.Flux.RetryWhen.1 - onNext(a)
15:18:11.651 [main] INFO reactor.Flux.RetryWhen.1 - onNext(b)
15:18:11.651 [main] INFO reactor.Flux.RetryWhen.1 - onNext(c)
15:18:19.067 [parallel-1] INFO reactor.Flux.RetryWhen.1 - onNext(a)
15:18:19.067 [parallel-1] INFO reactor.Flux.RetryWhen.1 - onNext(b)
15:18:19.067 [parallel-1] INFO reactor.Flux.RetryWhen.1 - onNext(c)
15:18:28.744 [parallel-2] INFO reactor.Flux.RetryWhen.1 - onNext(a)
15:18:28.744 [parallel-2] INFO reactor.Flux.RetryWhen.1 - onNext(b)
15:18:28.744 [parallel-2] INFO reactor.Flux.RetryWhen.1 - onNext(c)
15:18:28.750 [parallel-2] ERROR reactor.Flux.RetryWhen.1 - onError(java.lang.IllegalStateException: Retries exhausted: 2/2)
15:18:28.753 [parallel-2] ERROR reactor.Flux.RetryWhen.1 - 
java.lang.IllegalStateException: Retries exhausted: 2/2

Observe in the event stream above that, once all the retries are exhausted, you get an IllegalStateException (“Retries exhausted: 2/2”) rather than the CustomException.
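The gaps between attempts in the log (roughly 7.4 and 9.7 seconds) are not exactly 5 seconds because the backoff grows and is randomized. The sketch below is a simplified model of such a schedule, not Reactor's implementation: it assumes the delay doubles on each retry, that a jitter factor of 0.5 is applied, and that the delay never drops below the first backoff. `BackoffSchedule`, `nominalDelay`, and `jitterRange` are hypothetical names.

```java
import java.time.Duration;

// Hypothetical model of an exponential backoff schedule with jitter.
class BackoffSchedule {

    // Delay before retry number retryIndex (0-based), ignoring jitter:
    // firstBackoff * 2^retryIndex.
    static Duration nominalDelay(Duration firstBackoff, int retryIndex) {
        return firstBackoff.multipliedBy(1L << retryIndex);
    }

    // Returns {shortest, longest} delay once jitter is applied.
    // Assumption: the jittered delay is never shorter than firstBackoff.
    static Duration[] jitterRange(Duration firstBackoff, int retryIndex, double jitterFactor) {
        long nominalMillis = nominalDelay(firstBackoff, retryIndex).toMillis();
        long offsetMillis = (long) (nominalMillis * jitterFactor);
        long low = Math.max(firstBackoff.toMillis(), nominalMillis - offsetMillis);
        long high = nominalMillis + offsetMillis;
        return new Duration[] { Duration.ofMillis(low), Duration.ofMillis(high) };
    }

    public static void main(String[] args) {
        Duration first = Duration.ofSeconds(5);
        for (int retry = 0; retry < 2; retry++) {
            Duration[] range = jitterRange(first, retry, 0.5);
            System.out.println("retry " + (retry + 1) + ": between "
                    + range[0].toMillis() + " ms and " + range[1].toMillis() + " ms");
        }
    }
}
```

Under these assumptions the first retry waits between 5 and 7.5 seconds and the second between 5 and 15 seconds, which is consistent with the timestamps in the log above.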
