Observable.merge(): detecting which Observable was triggered
I'm building a list of Observables from a list of values, where each value produces a custom Observable. I run them all with merge(), but I can't tell which one triggered onNext() or onError().
My code looks like this:
List<Observable<MyHttpRsObj>> observables = new ArrayList<>();
for (String param : paramsList) {
    Observable<MyHttpRsObj> objObservable = MyRestClient.get().doHttpRequest(param);
    observables.add(objObservable);
}
Observable<MyHttpRsObj> combinedObservables = Observable.merge(observables);
combinedObservables.observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(new Subscriber<MyHttpRsObj>() {
            @Override
            public void onCompleted() {
                // Called only once, when all Observables have finished.
            }
            @Override
            public void onError(Throwable throwable) {
                // How do I know which Observable failed (which param)?
            }
            @Override
            public void onNext(MyHttpRsObj myHttpRsObj) {
                // How do I know which Observable succeeded (which param)?
            }
        });
Solution
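Once you merge all the Observables into one, the merged stream carries no information about which source produced an item or an error, so you cannot tell them apart in the merged subscriber.
The simplest fix is to subscribe one observer to each source Observable, and keep one last observer on the merged Observable just to learn when everything has finished.
Like this: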
List<Observable<MyHttpRsObj>> observables = new ArrayList<>();
List<ConnectableObservable<MyHttpRsObj>> connectables = new ArrayList<>();
for (final String param : paramsList) {
    // Change to a ConnectableObservable so both subscribers share one request.
    // subscribeOn goes before publish() so the request itself runs on the io scheduler.
    ConnectableObservable<MyHttpRsObj> objObservable = MyRestClient.get()
            .doHttpRequest(param)
            .subscribeOn(Schedulers.io())
            .publish();
    observables.add(objObservable);
    connectables.add(objObservable);
    // Subscribe to each Observable individually.
    objObservable.observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<MyHttpRsObj>() {
                @Override
                public void onCompleted() {
                    // Only this one Observable has completed.
                }
                @Override
                public void onError(Throwable throwable) {
                    // You can access param from here.
                }
                @Override
                public void onNext(MyHttpRsObj myHttpRsObj) {
                    // Handle the result here; you can access param from here.
                }
            });
}
Observable<MyHttpRsObj> combinedObservables = Observable.merge(observables);
combinedObservables.observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<MyHttpRsObj>() {
            @Override
            public void onCompleted() {
                // Called only once, when all Observables have finished.
            }
            @Override
            public void onError(Throwable throwable) {
                // Don't handle the error here.
            }
            @Override
            public void onNext(MyHttpRsObj myHttpRsObj) {
            }
        });
// Don't forget to connect, and only after every subscriber is attached;
// connecting earlier can cause emissions to be missed.
for (ConnectableObservable<MyHttpRsObj> connectable : connectables) {
    connectable.connect();
}
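Using a ConnectableObservable avoids firing each request twice. If doHttpRequest returns a cold Observable (as is typical for HTTP clients), then without publish() the per-request subscriber and the merged subscriber would each subscribe to the source and trigger the HTTP call separately.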
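To make the idea concrete without needing RxJava on the classpath, here is a rough JDK-only analogue using CompletableFuture. It is not RxJava and not the code from the answer, just a sketch of the same shape: one shared result per param, a per-source handler that can see its param, a second "merged-style" consumer on the same shared result, and a counter showing each request fired only once. The names SharedRequestSketch, fakeRequest, runAll, requestCount and mergedEvents are made up for this illustration, and fakeRequest is only a stand-in for MyRestClient.get().doHttpRequest(param).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class SharedRequestSketch {
    // Counts how many times the (fake) request actually runs.
    static final AtomicInteger requestCount = new AtomicInteger();
    // Counts events seen by the merged-style consumer, which has no param.
    static final AtomicInteger mergedEvents = new AtomicInteger();

    // Hypothetical stand-in for the real HTTP call; fails for params starting with "bad".
    static String fakeRequest(String param) {
        requestCount.incrementAndGet();
        if (param.startsWith("bad")) {
            throw new IllegalStateException("request failed for " + param);
        }
        return "response:" + param;
    }

    // Runs one request per param and returns a param -> outcome map.
    static Map<String, String> runAll(List<String> params) {
        requestCount.set(0);
        mergedEvents.set(0);
        Map<String, String> outcomes = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> stages = new ArrayList<>();

        for (String param : params) {
            // One shared future per param: the request runs once,
            // however many consumers attach to it.
            CompletableFuture<String> shared =
                    CompletableFuture.supplyAsync(() -> fakeRequest(param));

            // Per-source handler: 'param' is captured, so both success
            // and failure can be attributed to it.
            stages.add(shared.<Void>handle((value, error) -> {
                outcomes.put(param, error == null ? value : "failed");
                return null;
            }));

            // Merged-style consumer: sees that something happened, but has
            // no way of its own to know which param it came from.
            stages.add(shared.<Void>handle((value, error) -> {
                mergedEvents.incrementAndGet();
                return null;
            }));
        }

        // Overall completion: returns once every handler above has run.
        CompletableFuture.allOf(stages.toArray(new CompletableFuture[0])).join();
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("a", "bad", "c")));
        System.out.println("requests fired: " + requestCount.get());
    }
}
```

Two consumers are attached to each shared future, yet requestCount ends up equal to the number of params. That is the same effect publish() and connect() give you in the RxJava version.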