Java – Observable merge(): detecting which observable was triggered

I’m building a list of Observables from a list of values; each value produces its own custom Observable. I run them all with merge(), but I can’t tell which one triggered onNext() or onError().

The code looks like this:

    List<Observable<MyHttpRsObj>> observables = new ArrayList<>();

    for (String param : paramsList) {
        Observable<MyHttpRsObj> objObservable = MyRestClient.get().doHttpRequest(param);
        observables.add(objObservable);
    }

    Observable<MyHttpRsObj> combinedObservables = Observable.merge(observables);

    combinedObservables.observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<MyHttpRsObj>() {
                @Override
                public void onCompleted() {
                    // called only once, when all Observables have finished
                }

                @Override
                public void onError(Throwable throwable) {
                    // how do I know which Observable failed (which param)?
                }

                @Override
                public void onNext(MyHttpRsObj myHttpRsObj) {
                    // how do I know which Observable succeeded (which param)?
                }
            });

Solution

It is impossible to know which observable triggered the error because you merged all observables into one.

The best approach is to subscribe a separate observer to each observable, and then subscribe one last observer to the merged Observable.

Like this:

    List<ConnectableObservable<MyHttpRsObj>> observables = new ArrayList<>();

    for (final String param : paramsList) {
        // change to a ConnectableObservable; subscribeOn() goes before publish()
        // so that the request itself runs on the io scheduler
        ConnectableObservable<MyHttpRsObj> objObservable = MyRestClient.get()
                .doHttpRequest(param)
                .subscribeOn(Schedulers.io())
                .publish();

        observables.add(objObservable);

        // subscribe to each observable individually
        objObservable.observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<MyHttpRsObj>() {
                    @Override
                    public void onCompleted() {
                        // only this one request has completed
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        // you can access param from here
                    }

                    @Override
                    public void onNext(MyHttpRsObj myHttpRsObj) {
                        // handle the result here
                        // you can access param from here
                    }
                });
    }

    Observable<MyHttpRsObj> combinedObservables = Observable.merge(observables);

    combinedObservables.observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<MyHttpRsObj>() {
                @Override
                public void onCompleted() {
                    // called only once, when all Observables have finished
                }

                @Override
                public void onError(Throwable throwable) {
                    // don't handle the error here; the per-request subscriber does that
                }

                @Override
                public void onNext(MyHttpRsObj myHttpRsObj) {
                    // nothing to do; results are handled per request
                }
            });

    // don't forget to connect, and only after every subscriber is attached,
    // otherwise a subscriber may miss the response
    for (ConnectableObservable<MyHttpRsObj> objObservable : observables) {
        objObservable.connect();
    }

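Use ConnectableObservable to avoid firing each request twice: every request now has two subscribers (its own and the merged one), and a plain cold Observable would run the request once per subscriber.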
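
To illustrate why the request would otherwise fire twice, here is a minimal plain-Java sketch. It is a toy model and not RxJava's implementation: the classes ColdSource and PublishedSource and the methods runCold() and runPublished() are hypothetical names invented for this example. It only counts how often the "request" runs when two observers subscribe, with and without a publish/connect step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class SharedRequestDemo {

    /** Toy "cold" source: every subscribe() call runs the request again. */
    static final class ColdSource<T> {
        private final Supplier<T> request;

        ColdSource(Supplier<T> request) { this.request = request; }

        void subscribe(Consumer<T> observer) {
            observer.accept(request.get());
        }

        /** Rough analogue of publish(): wrap this source so observers share one run. */
        PublishedSource<T> publish() { return new PublishedSource<>(this); }
    }

    /** Toy "connectable" source: observers register first, connect() runs the request once. */
    static final class PublishedSource<T> {
        private final ColdSource<T> upstream;
        private final List<Consumer<T>> observers = new ArrayList<>();

        PublishedSource(ColdSource<T> upstream) { this.upstream = upstream; }

        void subscribe(Consumer<T> observer) { observers.add(observer); }

        void connect() {
            // a single upstream subscription, fanned out to every registered observer
            upstream.subscribe(value -> observers.forEach(o -> o.accept(value)));
        }
    }

    /** Two observers on a cold source; returns how many times the request ran. */
    static int runCold() {
        AtomicInteger requests = new AtomicInteger();
        ColdSource<String> source =
                new ColdSource<>(() -> "response #" + requests.incrementAndGet());
        source.subscribe(v -> { });   // stands in for the per-request observer
        source.subscribe(v -> { });   // stands in for the merged observer
        return requests.get();
    }

    /** Two observers on a published source; returns how many times the request ran. */
    static int runPublished() {
        AtomicInteger requests = new AtomicInteger();
        PublishedSource<String> source =
                new ColdSource<String>(() -> "response #" + requests.incrementAndGet()).publish();
        source.subscribe(v -> { });   // per-request observer
        source.subscribe(v -> { });   // merged observer
        source.connect();             // the request runs here, exactly once
        return requests.get();
    }

    public static void main(String[] args) {
        System.out.println("cold: " + runCold() + " request(s)");           // cold: 2 request(s)
        System.out.println("published: " + runPublished() + " request(s)"); // published: 1 request(s)
    }
}
```

In this model an observer that registers after connect() never sees the value, which mirrors why connect() should be called only once all subscribers are attached.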
