Java – RxJava Subjects and error handling

I’m trying to implement behavior similar to an event bus. For my requirements, PublishSubject seems to be a good fit.

The subject emits items that represent the result of some global operation, which may either succeed or fail with an exception. I can’t use onNext() for success events and onError() with a Throwable for failures, because once onError() is called the subject terminates: no further items are emitted, and any future subscriber receives nothing except the onError() notification.

As I see it, I have to create a class that represents the event and, in case of an error, optionally holds a reference to the Throwable. However, this seems unwise because errors would then have to be handled in onNext().

What would you do?

Solution

Wrapping each event in a generic class is one possible approach. Let’s say we call the class ResponseOrError. It basically contains two fields:

private T data;
private Throwable error;

and two simple factory methods:

public static <T> ResponseOrError<T> fromError(Throwable throwable) {
    return new ResponseOrError<>(throwable);
}

public static <T> ResponseOrError<T> fromData(T data) {
    return new ResponseOrError<>(data);
}
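Putting the fields and factory methods together, a minimal sketch of the whole class might look like the following. Only the two fields and the two factory methods come from the answer above; the single private constructor, the isError()/isData() checks and the getters are assumptions added for illustration.

```java
import java.util.Objects;

// Sketch of the wrapper described above. Only the two fields and the two
// factory methods come from the answer; everything else is an assumption.
final class ResponseOrError<T> {

    private final T data;          // set for success events, otherwise null
    private final Throwable error; // set for failure events, otherwise null

    // Private constructor (assumption): instances are created only via the factories.
    private ResponseOrError(T data, Throwable error) {
        this.data = data;
        this.error = error;
    }

    public static <T> ResponseOrError<T> fromError(Throwable throwable) {
        // Reject null so that isError() reliably distinguishes the two cases.
        return new ResponseOrError<T>(null, Objects.requireNonNull(throwable, "throwable"));
    }

    public static <T> ResponseOrError<T> fromData(T data) {
        return new ResponseOrError<T>(data, null);
    }

    // Hypothetical helpers for branching inside onNext().
    public boolean isError() {
        return error != null;
    }

    public boolean isData() {
        return error == null;
    }

    public T getData() {
        return data;
    }

    public Throwable getError() {
        return error;
    }
}
```

With something like this, a subscriber's onNext() can check isError() and read either getError() or getData(), so a failed operation never terminates the stream.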

To remove some boilerplate code, you can provide a transformer that turns an Observable<T> into an Observable<ResponseOrError<T>> (the code below uses the RxJava 1.x Observable.Transformer and Func1 types):

public static <T> Observable.Transformer<T, ResponseOrError<T>> toResponseOrErrorObservable() {
    return new Observable.Transformer<T, ResponseOrError<T>>() {
        @Override
        public Observable<ResponseOrError<T>> call(final Observable<T> observable) {
            return observable
                    .map(new Func1<T, ResponseOrError<T>>() {
                        @Override
                        public ResponseOrError<T> call(final T t) {
                            return ResponseOrError.fromData(t);
                        }
                    })
                    .onErrorResumeNext(new Func1<Throwable, Observable<? extends ResponseOrError<T>>>() {
                        @Override
                        public Observable<? extends ResponseOrError<T>> call(final Throwable throwable) {
                            return Observable.just(ResponseOrError.<T>fromError(throwable));
                        }
                    });
        }
    };
}

Then you can use it like this:

final Observable<ResponseOrError<ImportantData>> compose = mNetworkService
               .getImportantData()
               .compose(ResponseOrError.<ImportantData>toResponseOrErrorObservable());

Now you can easily map results based on success or failure, and even provide another transformer that returns the mapped Observable<T> instead of Observable<ResponseOrError<T>>.
