Java – Take elements up to a certain character and group them in RxJava

Take elements up to a certain character and group them in RxJava… here is a solution to the problem.

Take elements up to a certain character and group them in RxJava

I have a simple setup for a problem, but the solution seems more complicated.

Setup: I have a hot observable that originates from a scanner that emits each number as a different element and emits an R when the code is complete.

Question: According to this, I want a hot observable that emits each complete code as 1 element.

RxMarbles example

I tried using different flatMap, takeWith, and groupBy operators, but failed to find a solution.

Solution

You can use the buffer operator.

PublishSubject<Token<Integer>> s = PublishSubject.create();

Observable<Token<Integer>> markers = s.filter(x->x.isMarker());

s.buffer(markers).subscribe(
    v->{
        Optional<Integer> reduce = v.stream()
            .filter(t->!t.isMarker())
            .map(t->(ValueToken<Integer>)t)
            .map(ValueToken::get)
            .reduce((a,b)->a+b);
        reduce.ifPresent(System.out::println);
    }
);

s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker());  will emit 25

s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker());  will emit 17

s.onNext(value(10));
s.onNext(value(7));  Not emitting yet

I created a class to wrap both values and tags in a stream.

public abstract class Token<T> {
    private static final MarkerToken MARKER = new MarkerToken<>();

public boolean isMarker() {
        return false;
    }

public static <T> MarkerToken<T> marker() {
        return MARKER;
    }

public static <T> ValueToken<T> value(T o) {
        return new ValueToken<>(o);
    }

public static class ValueToken<T> extends Token<T> {
        T value;

public ValueToken(T value) {
            this.value = value;
        }

public T get() {
            return value;
        }
    }

public static class MarkerToken<T> extends Token<T> {
        public boolean isMarker() {
            return true;
        }
    }

}

Update (using scanning).

The previous method is also emitted when the stream is closed, and with this solution you can emit only the full buffer.

The message class acts as an accumulator that accumulates tokens until it accumulates the closing token.

When this happens, the next message starts from the beginning.

The closing tag is the last element of the presence token message is complete.

public static class Message<T> {
    List<Token<T>> tokens = new ArrayList<>();

public Message<T> append(Token<T> t) {

Message<T> mx = new Message<T>();
        if(!isComplete()) {
            mx.tokens.addAll(tokens);
        }
        mx.tokens.add(t);
        return mx;
    }

public boolean isComplete() {
        int n = tokens.size();
        return n>0 && tokens.get(n-1).isMarker();
    }

public Optional<List<Token<T>>> fullMessage(){
        return isComplete() ? Optional.of(tokens):Optional.empty(); 
    }
}

Scan the source from which you send a message for each tag that is issued, and then filter out incomplete messages and issue only messages marked as complete.

    s.scan(new Message<Integer>(), (a, b) -> a.append(b))
        .filter(Message::isComplete)
        .map(Message::fullMessage)
        .map(Optional::get).subscribe(v -> {
            System.out.println(v);
        });

s.onNext(value(12));
    s.onNext(value(13));
    s.onNext(marker());// [V(12), V(13), MARKER]

s.onNext(value(10));
    s.onNext(value(7));
    s.onNext(marker());  [V(10), V(7), MARKER]

s.onNext(value(10));
    s.onNext(value(127));

s.onComplete();  Not emitting incomplete messages on the closing of the subject.

Related Problems and Solutions