Java – How to perform long calculations on background threads in RXJava Android

I want to use RxJava on Android to perform long calculations on a background thread. After the calculation, I display the result in a RecyclerView. I’m using the following code:

 Observable.just("true")
                .subscribeOn(Schedulers.io())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        feedlist.clear();
                        if (eventFeedItems != null && !eventFeedItems.isEmpty()) {
                            for (int i = 0; i < eventFeedItems.size(); i++) {
                                if (eventFeedItems != null && eventFeedItems.get(i) != null
                                        && ((eventFeedItems.get(i).getType() != null && eventFeedItems.get(i).getType().equalsIgnoreCase("EVENT"))
                                        || (eventFeedItems.get(i).getActivityRequestType() != null && eventFeedItems.get(i).getActivityRequestType().equalsIgnoreCase(EventConstants.TRENDING_ACTIVITY)))) {
                                    if (eventFeedItems.get(i).getActivityRequestType() != null && !eventFeedItems.get(i).getActivityRequestType().equalsIgnoreCase("")) {
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), eventFeedItems.get(i).getActivityRequestType(), null));
                                    } else if (eventFeedItems.get(i).getRequestType() != null && !eventFeedItems.get(i).getRequestType().equalsIgnoreCase("")) {
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), eventFeedItems.get(i).getRequestType(), null));
                                    } else
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), EventConstants.ATTENDEE_POST, null));
                                }
                            }
                        }
                        Log.d("calculations","Completed");
                        return "";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        feed_list.setLayoutManager(mLayoutManager);
                        feedListAdapter.notifyDataSetChanged();
                        Log.d("Adapter", "Set");
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("Exception", "oh! fish...");
                        throwable.printStackTrace();
                    }
                }); 

Since the ArrayList eventFeedItems holds more than 300 items, the UI still freezes with the code above. I’m new to RxJava. Please help me.

Solution

The map operator does not give you concurrency.

The first subscribeOn moves the whole chain, from subscription onwards, to the IO scheduler. No concurrency happens here; the work just runs on a different single thread.

.subscribeOn(Schedulers.io())

The map operator is called synchronously on whatever thread the previous step ran on. In your case, that is a thread from the IO thread pool.

.map(new Func1<String, String>() {

After the map operator has run, the following call moves the values from the IO thread to the Android UI event loop:

.observeOn(AndroidSchedulers.mainThread())

After a value has been handed over from the IO thread to the UI thread, the next value from the initial observable is processed.

Observable.just("true")

In your example there is no next value, because the observable emits only one.

For concurrency, use flatMap instead of map, and call subscribeOn() inside the flatMap so that each inner stream is subscribed on another thread.

Consider this example to see how concurrency occurs. Every inner observable is subscribed to immediately, so the whole test takes at most about 5 seconds (the longest delay). Without concurrency it would take 1+2+3+4+5 seconds plus execution time.

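// RxJava 2 with JUnit 4: Observable and Schedulers come from io.reactivex
@Test
public void name1() throws Exception {
    Observable<Integer> value = Observable.just(1_000, 2_000, 3_000, 4_000, 5_000)
            // each inner observable is subscribed on a separate IO-scheduler worker
            .flatMap(i -> Observable.fromCallable(() -> doWork(i)).subscribeOn(Schedulers.io()))
            .doOnNext(integer -> System.out.println("value"));

    // blocks until all five inner observables have completed
    value.test().awaitTerminalEvent();
}

// simulates a long calculation by sleeping for the given number of milliseconds
private int doWork(int sleepMilli) {
    try {
        Thread.sleep(sleepMilli);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    return -1;
}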

If you want to learn more about how concurrency occurs with flatMap, consider reading http://tomstechnicalblog.blogspot.de/2015/11/rxjava-achieving-parallelization.html
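The timing claim above (about the longest delay when the work runs concurrently, the sum of all delays when it does not) can be checked without RxJava. The following is only a plain-JDK analogue using an ExecutorService, not the RxJava code from this answer; the class name ParallelTimingSketch and its method names are made up for illustration, and the delays are shortened so it runs quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK analogue of the flatMap + subscribeOn test (hypothetical names, not RxJava).
class ParallelTimingSketch {

    // Simulates a long calculation by sleeping, like doWork(...) in the answer.
    static int doWork(int sleepMillis) throws InterruptedException {
        Thread.sleep(sleepMillis);
        return sleepMillis;
    }

    // Runs every task on its own pool thread; returns elapsed wall-clock time in ms.
    static long runConcurrently(int[] delays) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(delays.length);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int delay : delays) {
                tasks.add(() -> doWork(delay));
            }
            long start = System.nanoTime();
            // invokeAll returns once every task has finished
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                result.get(); // rethrows any failure from the task
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    // Runs the same tasks one after another on the calling thread.
    static long runSequentially(int[] delays) throws InterruptedException {
        long start = System.nanoTime();
        for (int delay : delays) {
            doWork(delay);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        int[] delays = {100, 200, 300};
        System.out.println("concurrent ms: " + runConcurrently(delays));
        System.out.println("sequential ms: " + runSequentially(delays));
    }
}
```

The concurrent run should finish in roughly the time of the longest delay, while the sequential run needs roughly the sum of all delays. Exact numbers depend on the machine.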

Regarding your code, I suggest:

  • Move the anonymous implementations of the interfaces to private inner classes and use one instance of each. You’ll get a more readable observable chain.
  • Do not use side effects on global variables inside operators. If concurrency is involved, you will get race conditions.

    List<FeedsListModel> eventFeedItems = Arrays.asList(new FeedsListModel(), new FeedsListModel());

    // run each calculation on the computation scheduler, concurrently
    Observable<FeedsListModel> feedsListModelObservable = Observable.fromIterable(eventFeedItems)
            .flatMap(feedsListModel -> Observable.fromCallable(() -> calculation(feedsListModel))
                    .subscribeOn(Schedulers.computation()));

    feedsListModelObservable
            .toList()
            // UI work has to run on the Android main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(feedsListModels -> {
                // do UI stuff
            });
    

Helper method:

private FeedsListModel calculation(FeedsListModel model) {
    // do calculation here

    return new FeedsListModel();
}
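To make the "no side effects" advice concrete, here is a minimal sketch of the question's filtering logic written as pure functions: they read their input and return a new list instead of clearing and filling a shared feedlist field. The Item class, the constant values, and the method names are hypothetical stand-ins, since the real FeedsListModel and EventConstants are not shown in the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch only: Item and the constants are assumed stand-ins for the question's classes.
class FeedTypeSketch {

    static final String TRENDING_ACTIVITY = "TRENDING_ACTIVITY"; // assumed value
    static final String ATTENDEE_POST = "ATTENDEE_POST";         // assumed value

    // Holds only the three fields the question's loop reads.
    static final class Item {
        final String type;
        final String activityRequestType;
        final String requestType;

        Item(String type, String activityRequestType, String requestType) {
            this.type = type;
            this.activityRequestType = activityRequestType;
            this.requestType = requestType;
        }
    }

    static boolean isNonEmpty(String s) {
        return s != null && !s.isEmpty();
    }

    // Mirrors the outer condition of the loop: EVENT items or trending activities.
    static boolean isRelevant(Item item) {
        return item != null
                && ("EVENT".equalsIgnoreCase(item.type)
                || TRENDING_ACTIVITY.equalsIgnoreCase(item.activityRequestType));
    }

    // Mirrors the inner if/else chain that picks the type for one item.
    static String resolveType(Item item) {
        if (isNonEmpty(item.activityRequestType)) {
            return item.activityRequestType;
        }
        if (isNonEmpty(item.requestType)) {
            return item.requestType;
        }
        return ATTENDEE_POST;
    }

    // Pure: builds and returns a new list, touches no fields.
    static List<String> resolveAll(List<Item> items) {
        if (items == null) {
            return Collections.emptyList();
        }
        List<String> result = new ArrayList<>();
        for (Item item : items) {
            if (isRelevant(item)) {
                result.add(resolveType(item));
            }
        }
        return result;
    }
}
```

Because nothing here touches shared state, a function like resolveType can be called from any scheduler thread, and only the finished list needs to be handed to the adapter on the main thread.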
