Java – Couchbase Error: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value


I have a Spring-boot application that saves data to a Couchbase server.

I want to save about 500,000 students with this code:

for (String student: students) {
  ...
  bucket.upsert(JsonDocument.create(<unique key here>, TTL, JsonObject.fromJson(studentAsJson)));                
}

This works for a small number of students (up to roughly 100K).

But I get this error whenever the app reaches about 110K-140K students:

2018-12-23 16:20:57.804 ERROR 17468 --- [nio-8989-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.couchbase.client.java.error.TemporaryFailureException] with root cause

rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73) ~[rxjava-1.3.8.jar:1.3.8]
at rx.observers.Subscribers$5.onNext(Subscribers.java:235) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.SingleProducer.request(SingleProducer.java:65) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber.setProducer(OnSubscribeTimeoutTimedWithFallback.java:155) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:103) ~[rxjava-1.3.8.jar:1.3.8]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:508) ~[core-io-1.7.1.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.access$000(AbstractGenericHandler.java:86) ~[core-io-1.7.1.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:526) ~[core-io-1.7.1.jar:na]
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.3.8.jar:1.3.8]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_161]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_161]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

My Couchbase is running in Docker on localhost, and I'm wondering whether this is a real issue or just a limitation of Docker and localhost.

Solution

I have never used Couchbase, but this looks like a backpressure problem (https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0)).

So basically your consumer (in this case, Couchbase) can't consume the data as quickly as you produce it. The stack trace shows that you are using RxJava 1 (rxjava-1.3.8). I recommend upgrading to RxJava 2. This will take some time because of the API changes, but you'll get access to Flowable, which provides backpressure out of the box (http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html).

With a Flowable and backpressure you can deal with a slow consumer: the producer waits until the consumer has processed the data it already has before sending it more.
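To make the "wait for the consumer before sending more" idea concrete, here is a minimal plain-Java sketch that does not depend on RxJava or the Couchbase SDK. It caps the number of writes in flight with a Semaphore. The names BoundedWriter, writeAll, maxInFlight and asyncWrite are made up for this illustration, and asyncWrite stands in for whatever asynchronous write call you actually use.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of RxJava or the Couchbase SDK.
final class BoundedWriter {

    /**
     * Passes every item to asyncWrite while keeping at most maxInFlight
     * writes outstanding. Returns how many writes completed without error.
     */
    static <T> int writeAll(List<T> items, int maxInFlight,
                            Function<T, CompletableFuture<Void>> asyncWrite) {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger succeeded = new AtomicInteger();

        for (T item : items) {
            // The producer blocks here while the consumer is still busy.
            permits.acquireUninterruptibly();
            CompletableFuture<Void> pending;
            try {
                pending = asyncWrite.apply(item);
            } catch (RuntimeException e) {
                permits.release(); // do not leak the permit if the call fails at once
                throw e;
            }
            pending.whenComplete((ignored, error) -> {
                if (error == null) {
                    succeeded.incrementAndGet();
                }
                permits.release(); // frees a slot for the next item
            });
        }

        // Taking every permit back means all outstanding writes have finished.
        permits.acquireUninterruptibly(maxInFlight);
        return succeeded.get();
    }
}
```

A call such as BoundedWriter.writeAll(students, 64, s -> yourAsyncUpsert(s)) would then never have more than 64 upserts pending, where yourAsyncUpsert is a placeholder for your own code and 64 is an arbitrary starting value to tune.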

If you can’t update the version of RxJava, this article by David Karnok is worth a look because it shows how to handle this chain without backpressure: https://github.com/ReactiveX/RxJava/wiki/Backpressure
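Whichever RxJava version you end up on, the log in the question names TemporaryFailureException as the root cause, which suggests the server was only briefly unable to accept the write. A common way to cope with that kind of error is to retry after a growing pause. The following is only a sketch under that assumption: BackoffRetry and TransientWriteException are invented names, and TransientWriteException is a stand-in so the example compiles without the Couchbase SDK. In the real application you would catch the SDK's exception instead.

```java
import java.util.function.Supplier;

// Stand-in for a "try again later" error such as the one in the question's log.
final class TransientWriteException extends RuntimeException {
    TransientWriteException(String message) {
        super(message);
    }
}

// Hypothetical helper for illustration; not part of any library.
final class BackoffRetry {

    /**
     * Runs operation, retrying on TransientWriteException up to maxAttempts
     * times in total and doubling the pause after each failed attempt.
     */
    static <T> T call(Supplier<T> operation, int maxAttempts, long initialDelayMs) {
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.get();
            } catch (TransientWriteException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last failure
                }
                pause(delayMs);
                delayMs *= 2; // exponential backoff
            }
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }
}
```

Wrapping each write in BackoffRetry.call(..., 5, 50) would give the server up to five attempts with pauses of 50, 100, 200 and 400 ms in between. Both numbers are arbitrary starting points, not recommended values.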

Hope this helps you.
