RxJava Subjects and error handling
I’m trying to implement behavior similar to an event bus. For my requirements, a PublishSubject seems to be a good fit.
The subject emits items that represent the result of some global operation, which may complete successfully or fail with an exception. I can’t use onNext() for success events and onError() with the Throwable for failures, because once onError() is called the subject terminates, and any future subscribers will get no emissions except that onError().
The way I see it now, I have to create a class that represents the event and optionally references the Throwable in case of an error. However, this seems unwise, because errors would then have to be handled inside onNext().
What would you do?
Solution
Creating a generic wrapper class for events is one possible approach. Let’s call it ResponseOrError; it basically contains two fields:
    private T data;
    private Throwable error;
and two simple factory methods:
    public static <T> ResponseOrError<T> fromError(Throwable throwable) {
        return new ResponseOrError<>(throwable);
    }

    public static <T> ResponseOrError<T> fromData(T data) {
        return new ResponseOrError<>(data);
    }
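For reference, the fields and factory methods above could be assembled into a complete class roughly as follows. This is only a minimal sketch: the accessor names (isError, getData, getError) are assumptions that do not appear in the snippets above, and it uses a single private two-argument constructor instead of the two one-argument constructors implied above, which avoids an ambiguous overload when T is itself a Throwable.

```java
// Sketch of the event wrapper. Accessor names are hypothetical.
final class ResponseOrError<T> {
    private final T data;           // payload of a success event, null for a failure
    private final Throwable error;  // cause of a failure event, null for a success

    // Assumption: one private constructor; instances are created via the factories only.
    private ResponseOrError(T data, Throwable error) {
        this.data = data;
        this.error = error;
    }

    public static <T> ResponseOrError<T> fromError(Throwable throwable) {
        if (throwable == null) {
            throw new NullPointerException("throwable must not be null");
        }
        return new ResponseOrError<>(null, throwable);
    }

    public static <T> ResponseOrError<T> fromData(T data) {
        return new ResponseOrError<>(data, null);
    }

    // True when this event represents a failure.
    public boolean isError() {
        return error != null;
    }

    public T getData() {
        return data;
    }

    public Throwable getError() {
        return error;
    }
}

class ResponseOrErrorDemo {
    public static void main(String[] args) {
        ResponseOrError<String> ok = ResponseOrError.fromData("payload");
        ResponseOrError<String> failed =
                ResponseOrError.fromError(new IllegalStateException("boom"));

        System.out.println(ok.isError());      // false
        System.out.println(failed.isError());  // true
    }
}
```

With a class like this, a subject of type PublishSubject<ResponseOrError<T>> can carry both outcomes through onNext() and never has to terminate because of a failed operation.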
To remove some boilerplate, you can provide a Transformer that turns an Observable<T> into an Observable<ResponseOrError<T>> (the code below uses the RxJava 1.x API):
    public static <T> Observable.Transformer<T, ResponseOrError<T>> toResponseOrErrorObservable() {
        return new Observable.Transformer<T, ResponseOrError<T>>() {
            @Override
            public Observable<ResponseOrError<T>> call(final Observable<T> observable) {
                return observable
                        // Wrap every regular item as a success event.
                        .map(new Func1<T, ResponseOrError<T>>() {
                            @Override
                            public ResponseOrError<T> call(final T t) {
                                return ResponseOrError.fromData(t);
                            }
                        })
                        // Replace a terminal error with a single failure event.
                        .onErrorResumeNext(new Func1<Throwable, Observable<? extends ResponseOrError<T>>>() {
                            @Override
                            public Observable<? extends ResponseOrError<T>> call(final Throwable throwable) {
                                return Observable.just(ResponseOrError.<T>fromError(throwable));
                            }
                        });
            }
        };
    }
Then you can use it like this:
    final Observable<ResponseOrError<ImportantData>> compose = mNetworkService
            .getImportantData()
            .compose(ResponseOrError.<ImportantData>toResponseOrErrorObservable());
Now you can easily map results based on success or failure, and even provide another transformer that returns a mapped Observable<T> instead of an Observable<ResponseOrError<T>>.
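To illustrate what mapping based on success or failure might look like on the consumer side, here is a small sketch in plain Java with no RxJava dependency. The class name MappableResponseOrError and its map and fold methods are hypothetical; it is a condensed variant of the wrapper, written only to show the idea.

```java
import java.util.function.Function;

// Hypothetical, condensed variant of the wrapper with two helper methods.
final class MappableResponseOrError<T> {
    private final T data;
    private final Throwable error;

    private MappableResponseOrError(T data, Throwable error) {
        this.data = data;
        this.error = error;
    }

    static <T> MappableResponseOrError<T> fromData(T data) {
        return new MappableResponseOrError<>(data, null);
    }

    static <T> MappableResponseOrError<T> fromError(Throwable throwable) {
        return new MappableResponseOrError<>(null, throwable);
    }

    boolean isError() {
        return error != null;
    }

    // Transforms the payload of a success event; a failure event passes through unchanged.
    <R> MappableResponseOrError<R> map(Function<? super T, ? extends R> mapper) {
        if (isError()) {
            return MappableResponseOrError.<R>fromError(error);
        }
        return MappableResponseOrError.<R>fromData(mapper.apply(data));
    }

    // Collapses both cases into one value, so success and failure are handled in one place.
    <R> R fold(Function<? super T, ? extends R> onData,
               Function<? super Throwable, ? extends R> onError) {
        if (isError()) {
            return onError.apply(error);
        }
        return onData.apply(data);
    }
}

class MappableResponseOrErrorDemo {
    public static void main(String[] args) {
        MappableResponseOrError<Integer> ok = MappableResponseOrError.fromData(21);
        MappableResponseOrError<Integer> bad =
                MappableResponseOrError.fromError(new RuntimeException("boom"));

        // The same handling code works for both kinds of event.
        System.out.println(ok.map(n -> n * 2)
                .fold(n -> "value " + n, e -> "failed: " + e.getMessage()));
        System.out.println(bad.map(n -> n * 2)
                .fold(n -> "value " + n, e -> "failed: " + e.getMessage()));
    }
}
```

Inside onNext() a subscriber could then call something like fold once, which keeps the branching between the success and failure paths in a single expression.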