Java – Write synchronous unit tests for Rx v2 Flowable

I’m converting my project from Rx v1 to Rx v2, and I’m currently changing some v1 Observables to v2 Flowables.

(It’s in an Android project, using Spock to write unit tests in Groovy.)

Usually I override the schedulers with hooks. I can still do this in v2 by registering scheduler handlers. This makes an Observable always run synchronously on the (new?) Schedulers.single(). However, a Flowable is still asynchronous, possibly because of the backpressure mechanism.

I tried to fix the issue using:

Flowable<LogEntry> flowable = Flowable.create(new FlowableOnSubscribe<LogEntry>() {
    @Override
    void subscribe(FlowableEmitter<LogEntry> emitter) throws Exception {
        for (def log : logs) {
            emitter.onNext(log)
        }

        emitter.onComplete()
    }
}, FlowableEmitter.BackpressureMode.NONE);

But this still makes them asynchronous.

I’ve overridden the schedulers like this:

RxJavaPlugins.reset()
RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.single()
    }
})

RxAndroidPlugins.reset()
RxAndroidPlugins.setMainThreadSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.from(new Executor() {
            @Override
            void execute(Runnable command) {
                command.run()
            }
        })
    }
})

I can’t figure out why an Observable becomes synchronous with this setup while a Flowable doesn’t (other than the backpressure mechanism).

Solution

Schedulers.single() is a single-threaded asynchronous scheduler. You need Schedulers.trampoline() to stay on the same thread.
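
As a rough sketch of that change, the handlers from the question could return Schedulers.trampoline() instead. This assumes the RxJava 2.x and RxAndroid 2.x package names shown in the imports and that both libraries are on the test classpath. The Flowable.just(...) check at the end is only an illustrative stand-in for the real stream, not part of the original question.

```groovy
import io.reactivex.Flowable
import io.reactivex.Scheduler
import io.reactivex.android.plugins.RxAndroidPlugins
import io.reactivex.functions.Function
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers

// Replace the io() scheduler with trampoline(), which runs work
// on the calling thread instead of handing it to another thread.
RxJavaPlugins.reset()
RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.trampoline()
    }
})

// Do the same for the Android main-thread scheduler.
RxAndroidPlugins.reset()
RxAndroidPlugins.setMainThreadSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.trampoline()
    }
})

// Illustrative check (hypothetical stream): with the handler above,
// subscribeOn(Schedulers.io()) resolves to trampoline(), so the
// TestSubscriber should already hold every item when test() returns.
Flowable.just(1, 2, 3)
        .subscribeOn(Schedulers.io())
        .test()
        .assertResult(1, 2, 3)
```

If the code under test also uses Schedulers.computation() or Schedulers.newThread(), those would presumably need their own handlers (RxJavaPlugins.setComputationSchedulerHandler and RxJavaPlugins.setNewThreadSchedulerHandler) registered the same way.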
