Java – Rxjava2 + Retrofit2 + Android. The best way to make hundreds of network calls

Rxjava2 + Retrofit2 + Android. The best way to make hundreds of network calls… here is a solution to the problem.

Rxjava2 + Retrofit2 + Android. The best way to make hundreds of network calls

I have an app. I have a big button that allows users to sync all their data to the cloud at once. The resync feature allows them to send all the data again. (more than 300 entries).

I’m using RXjava2 and retrofit2. I have my unit tests working with a phone. But I need to make N network calls.

What I want to avoid is having the observable call the next item in the queue. I’m at the point where I need to implement my runnable object. I’ve seen some information about map, but I haven’t seen anyone using it as a queue. I also want to avoid one project failing and report that all projects fail, just like the Zip feature. Should I only do the nasty manager class that tracks queues? Or is there a more concise way to send hundreds of elements?

NOTE: SOLUTIONS CANNOT RELY ON JAVA8/LAMBDAS. This turned out to be much more than a reasonable amount of work.

Note that all items are the same object.

    @Test
public void test_Upload() {
    TestSubscriber<Record> testSubscriber = new TestSubscriber<>();
    ClientSecureDataToolKit clientSecureDataToolKit = ClientSecureDataToolKit.getClientSecureDataKit();
    clientSecureDataToolKit.putUserDataToSDK(mPayloadSecureDataToolKit).subscribe(testSubscriber);

testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNoErrors();
    testSubscriber.assertValueCount(1);
    testSubscriber.assertCompleted();
}

Help me collect and send all my elements

public class SecureDataToolKitHelper {
private final static String TAG = "SecureDataToolKitHelper";
private final static SimpleDateFormat timeStampSimpleDateFormat =
        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void uploadAll(Context context, RuntimeExceptionDao<EventModel, UUID> eventDao) {
    List<EventModel> eventModels = eventDao.queryForAll();

QueryBuilder<EventModel, UUID> eventsQuery = eventDao.queryBuilder();
    String[] columns = {...};

eventsQuery.selectColumns(columns);

try {
        List<EventModel> models;

models = eventsQuery.orderBy("timeStamp", false).query();
        if (models == null || models.size() == 0) {
            return;
        }

ArrayList<PayloadSecureDataToolKit> toSendList = new ArrayList<>();
        for (EventModel eventModel : models) {
            try {
                PayloadSecureDataToolKit payloadSecureDataToolKit = new PayloadSecureDataToolKit();

if (eventModel != null) {

 map my items ... not shown

toSendList.add(payloadSecureDataToolKit);
                }
            } catch (Exception e) {
                Log.e(TAG, "Error adding payload! " + e + " ..... Skipping entry");
            }
        }

doAllNetworkCalls(toSendList);

} catch (SQLException e) {
        e.printStackTrace();
    }

}

My Retrofit stuff

public class ClientSecureDataToolKit {

private static ClientSecureDataToolKit mClientSecureDataToolKit;
    private static Retrofit mRetrofit;

private ClientSecureDataToolKit(){
        mRetrofit = new Retrofit.Builder()
        .baseUrl(Utilities.getSecureDataToolkitURL())
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
        .addConverterFactory(GsonConverterFactory.create())
        .build();
    }

public static ClientSecureDataToolKit getClientSecureDataKit(){
        if(mClientSecureDataToolKit == null){
            mClientSecureDataToolKit = new ClientSecureDataToolKit();
        }
        return mClientSecureDataToolKit;
    }

public Observable<Record> putUserDataToSDK(PayloadSecureDataToolKit payloadSecureDataToolKit){
        InterfaceSecureDataToolKit interfaceSecureDataToolKit = mRetrofit.create(InterfaceSecureDataToolKit.class);
        Observable<Record> observable = interfaceSecureDataToolKit.putRecord(NetworkUtils.SECURE_DATA_TOOL_KIT_AUTH, payloadSecureDataToolKit);
        return observable;
    }

}

public interface InterfaceSecureDataToolKit {

@Headers({
        "Content-Type: application/json"
})

@POST("/api/create")
Observable<Record> putRecord(@Query("api_token") String api_token, @Body PayloadSecureDataToolKit payloadSecureDataToolKit);
 }

Update. I’ve been trying to apply this answer to bad luck. I’m exhausted tonight. I’m trying to implement this as a unit test, like I did with my initial call to a project. It looks like there might be something wrong with using lambda….

public class RxJavaBatchTest {
    Context context;
    final static List<EventModel> models = new ArrayList<>();

@Before
    public void before() throws Exception {
        context = new MockContext();
        EventModel eventModel = new EventModel();
        manually set all my eventmodel data here.. not shown 

eventModel.setSampleId("SAMPLE0");
        models.add(eventModel);
        eventModel.setSampleId("SAMPLE1");
        models.add(eventModel);
        eventModel.setSampleId("SAMPLE3");
        models.add(eventModel);

}

@Test
    public void testSetupData() {
        Assert.assertEquals(3, models.size());
    }

@Test
    public void testBatchSDK_Upload() {

Callable<List<EventModel> > callable = new Callable<List<EventModel> >() {

@Override
            public List<EventModel> call() throws Exception {
                return models;
            }
        };

Observable.fromCallable(callable)
                .flatMapIterable(models -> models)
                .flatMap(eventModel -> {
                    PayloadSecureDataToolKit payloadSecureDataToolKit = new PayloadSecureDataToolKit(eventModel);
                    return doNetworkCall(payloadSecureDataToolKit) // I assume this is just my normal network call.. I am getting incompatibility errors when I apply a testsubscriber...
                            .subscribeOn(Schedulers.io());
                }, true, 1);
    }

private Observable<Record> doNetworkCall(PayloadSecureDataToolKit payloadSecureDataToolKit) {

ClientSecureDataToolKit clientSecureDataToolKit = ClientSecureDataToolKit.getClientSecureDataKit();
        Observable observable = clientSecureDataToolKit.putUserDataToSDK(payloadSecureDataToolKit);//.subscribe((Observer<? super Record>) testSubscriber);
        return observable;
    }

The result is…

An exception has occurred in the compiler (1.8.0_112-release). Please file a bug against the Java compiler via the Java bug reporting page (http://bugreport.java.com) after checking the Bug Database (http://bugs.java.com) for duplicates. Include your program and the following diagnostic in your report. Thank you.
com.sun.tools.javac.code.Symbol$CompletionFailure: class file for java.lang.invoke.MethodType not found

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':app:compile<MyBuildFlavorhere>UnitTestJavaWithJavac'.
> Compilation failed; see the compiler error output for details.

Edit. Don’t try Lambdas again. Even with the path set on my mac, javahome points to 1.8 etc. I can’t get it to work. If this were a newer project, I would have worked harder. However, since this is an inherited Android app written by a web developer trying to use Android, it is not a great option. It’s also not worth the time to get it to work. Already into the days of this task instead of the half day it should take.

I can’t find good examples of non-lambda floor plans. I tried it myself and it got more and more messy.

Solution

If I’m not mistaken, do you want to call in parallel?

So the rx-y way is this:

    Observable.fromCallable(() -> eventsQuery.orderBy("timeStamp", false).query())
            .flatMapIterable(models -> models)
            .flatMap(model -> {
                 map your model

avoid throwing exceptions in a chain, just return Observable.error(e) if you really need to
                try to wrap your methods that throw exceptions in an Observable via Observable.fromCallable()

return doNetworkCall(someParameter)
                        .subscribeOn(Schedulers.io());
            }, true /*because you don't want to terminate a stream if error occurs*/, maxConcurrent /* specify number of concurrent calls, typically available processors + 1 */)
            .subscribe(result -> {/* handle result */}, error -> {/* handle error */});

Move this section into the constructor in your ClientSecureDataToolKit

    InterfaceSecureDataToolKit interfaceSecureDataToolKit = mRetrofit.create(InterfaceSecureDataToolKit.class);

Related Problems and Solutions