Hello,
I think reactive types' errors at the moment are not handled correctly.
According to RxJava specs, it is not possible to throw an error from within a Reactive data structure (Maybe, Observable, Flowable, etc.) in order to avoid situations where methods are not allowed to end their work gracefully.
To solve this problem, other frameworks provide a reference to the response object with a send method and the ability to build your response with an error status and message eventually.
At the moment in the following case I get either an empty list in case I use a non blocking approach and return an Observable or a null pointer exception if I use the blocking approach.
In both of these cases, however, I am not able to make a custom exception pop up in order to trigger the global error handler.
This is the non blocking block of code:
return Maybe.<String>create(emitter -> {
try {
if (subject.getValue() != null) {
emitter.onSuccess(subject.getValue());
} else {
emitter.onComplete();
}
} catch (Throwable ex) {
emitter.onError(ex);
}
})
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.single())
.toObservable()
.switchIfEmpty(Observable.create(emitter ->
{
emitter.onError(new NoMessageReceivedException("No message from hyperledger"));
}))
.retryWhen(attempts -> attempts.zipWith(Observable.range(1, 3), (error, i) ->
{
if (i < 3) {
return Observable.timer(i, TimeUnit.SECONDS);
} else {
return Observable.error(new NoMessageReceivedException("No message from hyperledger"));
}
}))
.doOnError(throwable -> Observable.error(throwable.getCause()))
.map(command::getSuccessScalarResponse);
And this is the blocking code
return Maybe.<String>create(emitter -> {
try {
if (subject.getValue() != null) {
emitter.onSuccess(subject.getValue());
} else {
emitter.onComplete();
}
} catch (Exception ex) {
emitter.onError(new NoMessageReceivedException("No message from hyperledger"));
}
})
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.single())
.toObservable()
.switchIfEmpty(Observable.create(emitter ->
{
emitter.onError(new NoMessageReceivedException("No message from hyperledger"));
}))
.retryWhen(attempts -> attempts.zipWith(Observable.range(1, 3), (error, i) ->
{
if (i < 3) {
return Observable.timer(i, TimeUnit.SECONDS);
} else {
return Observable.error(new NoMessageReceivedException("No message from hyperledger"));
}
}))
.doOnError(throwable -> {
log.info("No message from Hyperledger!");
})
.blockingFirst();
1) Create a Subject and do not provide it any data.
2) Use a Maybe data structure which handles the null case which should return some value to the controller.
[X ] Stacktrace (if present) provided
[V ] Example that reproduces the problem uploaded to Github
The controller should return the exception
According if it's a blocking scenario or a non blocking scenario the controller returns either a null pointer exception or an empty list with status 200.
As a workaround I have found this solution, id est to use a try-catch block:
private static Map<String, Object> getResult(HyperledgerAccountHandler command) {
final BehaviorSubject<String> subject = command.kafkaMessageListener.bus.getEvents();
Map<String, Object> result = null;
try {
final Map<String, Object> stringObjectMap = Maybe.<String>create(emitter -> {
try {
if (subject.getValue() != null) {
emitter.onSuccess(subject.getValue());
} else {
emitter.onComplete();
}
} catch (Throwable ex) {
emitter.onError(ex);
}
})
.subscribeOn(Schedulers.io())
// .observeOn(Schedulers.single())
.toObservable()
.switchIfEmpty(Observable.create(emitter ->
{
emitter.onError(new NoMessageReceivedException("No message from hyperledger"));
}))
.retryWhen(attempts -> attempts.zipWith(Observable.range(1, 3), (error, i) ->
{
if (i < 3) {
return Observable.timer(i, TimeUnit.SECONDS);
} else {
return Observable.error(new NoMessageReceivedException("No message from hyperledger"));
}
}))
.map(command::getSuccessScalarResponse).blockingFirst();
result = stringObjectMap;
} catch (NoSuchElementException ex) {
throw new NoMessageReceivedException("No message from hyperledger");
}
return result;
}
Just so you know, that is the RxJava version of:
final CountDownLatch latch = new CountDownLatch(30);
while (!latch.await(10, TimeUnit.SECONDS)) {
if (command.kafkaMessageListener.messages.peek() == null) {
latch.countDown();
} else {
return command.getSuccessMonoResponse(command.kafkaMessageListener.messages.poll());
}
}
Well, it looks like this might be related also to a design matter, however I think people should be given the freedom to choose what fits their use case best:
https://softwareengineering.stackexchange.com/questions/358243/should-no-results-be-an-error-in-a-restful-response
https://en.wikipedia.org/wiki/Null_object_pattern
Here a brief article describing how you can modify the response of the server in case of an error in Spring.
Basically pretty much the same principle used in Express where you have a reference to the response object.
https://www.baeldung.com/spring-webflux-errors
I am failing to understand how Micronaut is different to Spring in this respect. You can equally use onErrorResume and HttpResponse.ok("Error handled") to modify the response.
@Deviad Can you document the steps to reproduce the issue with your application? What branch? Perhaps include a curl command or document what controller to call
@jameskleeh I have created an ad hoc repo which creates simple responses with just what it's needed to reproduce the problem:
https://github.com/Deviad/micronaut-errorhandling
Just run the main app file
https://github.com/Deviad/micronaut-errorhandling/blob/master/src/main/java/reactive/micronaut/Application.java
And have a look here for what each endpoint does:
https://github.com/Deviad/micronaut-errorhandling/blob/master/src/main/java/reactive/micronaut/resource/MonoDemoController.java
Port 8081
/test/successmono
/test/errorMono
/test/errorMono2
/test/errorMono3
Thank you.
Have a great day!
Davide
In order for it to not hang the observable needs to complete
MyRxOutputBean<String> bean = new MyRxOutputBean<>();
return Flux.interval(Duration.ofMillis(500))
.take(1)
.flatMap(x -> bean.getEvents().next())
results in a Mono that never completes, because no events are emitted through the subject
The absence of an event is not the same as an error
@jameskleeh Well, this is about the philosophical matter described here and which I mentioned before:
https://softwareengineering.stackexchange.com/questions/358243/should-no-results-be-an-error-in-a-restful-response
What you are describing as: "The absence of an event is not the same as an error".
The point is should there be any means for this to be detected as an error for those that want to choose this path?
I have the gut feeling that in 90% of the companies having legacy code and some sort of company policies for such things in this case an error should be represented.
For example at the company where I have been working there are architectural documentations (named technical concepts). In there even this kind of behaviour is documented and in some cases a technical decision at an organizational level could be present. In such circumstances at the moment the only alternative is the try catch that I have described above which is kind of an anti-pattern.
@Deviad That SO really doesn't have anything to do with this. The absence of an event is not an error has to do with how reactive streams works. You can use things like the timeout operator if you want to emit an error if no event has been emitted for a given period of time.
There is no philosophical decisions to be made, its all how you want to use the reactive streams api and doesn't have anything to do with Micronaut itself.
@jameskleeh you might very well be right, however in one of my attempts I could get the null value after 3 tries with 200 status, this is why I have catalogued also the case when no event is present as the null kind of situation. In any case, creating a utility function which makes the code above abstract (the try catch that for me could solve the issue) is not a big deal. I was thinking that since this is a common problem that lots of users will encounter, maybe it made sense to treat it inside the framework. From a "single responsability" point of view, you are right, this is something related to the library (RXJava/Reactor), from a user point of view (the developer working on a project) this might be a pain point (imho).
@Deviad I agree that it might be a pain point. Programming with reactive streams has a steep learning curve and is very complex. There isn't anything we can do about that, as it can't be our responsibility to teach that programming model to developers.
Because reactive streams is hard, we allow for controller methods to return blocking types and we automatically delegate those methods to a separate thread pool. We put in a lot of effort to allow for the blocking style of coding and keep as many of the advantages of the underlying server (Netty) as possible.
Most helpful comment
@Deviad I agree that it might be a pain point. Programming with reactive streams has a steep learning curve and is very complex. There isn't anything we can do about that, as it can't be our responsibility to teach that programming model to developers.
Because reactive streams is hard, we allow for controller methods to return blocking types and we automatically delegate those methods to a separate thread pool. We put in a lot of effort to allow for the blocking style of coding and keep as many of the advantages of the underlying server (Netty) as possible.