RxJava Collection() & takeUntil()… here is a solution to the problem.
RxJava Collection() & takeUntil()
I have a list of users of unknown size. What I want is to query first 30 and update the UI. Then I want to query all other users by offsetting in steps of 100 until I get the last batch of users – should I use takeUntil
here? When I get – I add the remaining users by adding (combined with reduce()
I believe).
Here is my code :
final int INITIAL_OFFSET = 0;
final int INITIAL_LIMIT = 30;
Loading first 30 users to immediately update UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", INITIAL_OFFSET, INITIAL_LIMIT)
Loading remaining users 100 by 100 and updating UI after all users been loaded
.flatMap(users -> {
AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
return Observable.just(users)
.flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.collect(() -> new ArrayList<User>(), (b, s) -> {
b.addAll(s);
newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
})
.repeat()
.takeUntil(friends -> friends.size() == 0);
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(users -> getView().appendAllFriends(users),
throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false));
But it seems like I’m doing something wrong because onNext is called every time a makeover call is made.
Solution
Answer my own questions. Adel’s answer is good, but I need a subscription (I’m using Nucleus MVP library) and I want to use collect() and takeUntil() instead of a while loop ( This requires blocking the interface approach).
It took a few hours to finally get it :
final int INITIAL_LIMIT = 30;
Loading first 30 users to immediately update UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", null, INITIAL_LIMIT)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
Updating UI 1st time or show error
.doOnNext(users -> getView().appendAllFriends(users))
.doOnError(throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false))
Loading remaining users 100 by 100 and updating UI after all users been loaded
.flatMap(users -> {
AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
ArrayList<User> remainingUsers = new ArrayList<>();
AtomicBoolean hasMore = new AtomicBoolean(true);
return Observable.just(users)
.flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.collect(() -> remainingUsers, (b, s) -> {
Needed for takeUntil
hasMore.set(b.addAll(s));
newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
})
.repeat()
.takeUntil(friends -> !hasMore.get())
Grab all items emitted by collect()
.last()
Updating UI last time
.doOnNext(users2 -> getView().appendAllFriends(users2));
})
.subscribe();
Maybe it’s useful for others who are also using Nucleus.