RxJava Adding Events to Stream based on callback
RxJava Adding Events to Stream based on callback
Adding some code to clear up the question
//generates a sequence in the range from input value (+1) to input value (+9)
Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color)
return Observable.range(value+1,9)
.map(i ->
Log.d(TAG, "Value " + i
+ " evaluating on " + Thread.currentThread().getName()
+ " emitting item at " + System.currentTimeMillis());
try
Thread.sleep(delay);
catch (InterruptedException e)
return new ColoredIntegerModel(i, color);
);
//creates a stream if say input =2 of numbers from 1-20 (input*2) such that the output is 1 (Red color) 2-10(green color) 11 (Red color) 11-20 (Green Color)
Observable<ColoredIntegerModel> getEventStream(int value)
return Observable.create(new ObservableOnSubscribe<ColoredIntegerModel>()
@Override
public void subscribe(ObservableEmitter<ColoredIntegerModel> emitter) throws Exception
for (int i = 0; i < value; ++i)
ColoredIntegerModel model = new ColoredIntegerModel(i*10, Color.RED);
emitter.onNext(model);
Observable<ColoredIntegerModel> more = getSequenceObservable(i*10, 100, Color.GREEN);
more.subscribe(new Consumer<ColoredIntegerModel>()
@Override
public void accept(ColoredIntegerModel coloredIntegerModel) throws Exception
emitter.onNext(coloredIntegerModel);
);
);
The code above works. It prints 1(Red) 2-10(Green) 11(Red), 12-20, but I would like a much cleaner solution. I am also not sure when the inner subscription in getEventStream() can be disposed off.
The question basically is that getEventStream is calling a function for each emission that returns an Observable as well. This is analogous to a chain of Promise, in which each individual Promise can return a Series of other Promises. Hope this clarifies any confusion on the original question.
Observable.fromIterable
Yes, I am trying to resolve concrete internal issue on Android. Our API level requirements API < 17. The issue is resolved by promises in js implementation. The main issue is that we have series of async promises in then chain, each promise can result in series of promise which need to be resolved before. so if initially we had [promise1, promise2, promise3] and promise1 returned a promise chain [promise1a, promise1b] then order of resolution is [promise1a, promise1b, promise1, promise2, promise3]. I could not find a good promise lib in android to achieve this.
– rOrlig
Sep 10 '18 at 12:31
2 Answers
2
If what you want is simplify the code above so that all subscriptions handling are left to the final subscriber and maintain the order to subsequences emissions, you can do it like this:
Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color)
return Observable.range(value+1, 9)
.flatMap( i -> Observable
.just(new ColoredIntegerModel(i, color))
.delay(delay * (i + 1), TimeUnit.MILLISECONDS)
)
;
Observable<ColoredIntegerModel> getEventStream(int value)
return Observable.range(0, value)
.concatMap(i ->
getSequenceObservable(i * 10,100, Color.GREEN)
.startWith(new ColoredIntegerModel(i*10, Color.RED))
)
;
that is if you really need the manual delay, if you don't, just replace the above getSequenceObservable
with:
getSequenceObservable
Observable<ColoredIntegerModel> getSequenceObservable(int value, int color)
return Observable.range(value+1, 9)
.map(i -> new ColoredIntegerModel(i, color))
;
sorry above is just an illustration of the real problem. it has nothing to do with delays. the question is if a subscriber is subscribing to sources of events, how can each event handler add things to this sequence.
– rOrlig
Sep 10 '18 at 18:35
I think you need to state better the problem you're trying to solve. a subscriber cannot add events to an upstream observable unless it's a representative of a subject, in that's case you should take a look at subjects reactivex.io/documentation/subject.html.
– Jans
Sep 10 '18 at 18:55
Sorry not to be clear, I tried to as clear as I could be. On the questions of subjects also how will you change the events inside it.
– rOrlig
Sep 10 '18 at 19:29
Don't worry. you can check these links, they may give you a sense on subjects. medium.com/@bishoy_abd/… and tech.instacart.com/…
– Jans
Sep 10 '18 at 19:51
Thanks for this. I am not sure how subjects resolve the above question as well
– rOrlig
Sep 11 '18 at 11:45
You should take a look at the FlatMap operator
In short words it transforms each of the elements in an Observable
into it's own Observable
and joins of them.
Observable
Observable
The simplest solution for your problem could be something like:
getEventStream()
.flatMap(it -> getSequenceObservable(it))
.doOnNext(System.out::print)
.blockingSubscribe();
Where helper functions are
static Observable<ColoredIntegerModel> getEventStream()
return Observable.fromArray(
new ColoredIntegerModel(10, Color.RED),
new ColoredIntegerModel(20, Color.RED)
);
static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color)
return Observable.range(1, 10)
.flatMap(it -> Observable.timer(it, TimeUnit.SECONDS)
.map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN))
);
In case you want to keep original value from getEventStream()
you can use something like this instead of getSequenceObservable
getEventStream()
getSequenceObservable
static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color)
return Observable.range(1, 10)
.flatMap(it -> Observable.timer(it, TimeUnit.MILLISECONDS)
.map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN)))
.concatWith(Observable.just(color));
In case ordering of emissions is important, use flatMap version with maxConcurrency:
getEventStream()
.flatMap(it -> getSequenceObservable(it), true, 1)
.doOnNext(System.out::println)
.blockingSubscribe();
wouldn't blockingSubscribe() block the main android thread ?
– rOrlig
Sep 16 '18 at 15:20
That's just an example, you obviously shouldn't call
blockingSubscribe()
on the main thread in production. This would work if you'd call it in on some other thread through. Another thing missing are calls to subscribeOn()
.– Daniil
Sep 17 '18 at 4:56
blockingSubscribe()
subscribeOn()
Thanks for contributing an answer to Stack Overflow!
But avoid …
To learn more, see our tips on writing great answers.
Required, but never shown
Required, but never shown
By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.
So, two clarifications required - are you trying to solve a concrete problem (in which case, what is it?), and why are you using nested observables, when you could just use
Observable.fromIterable
or similar?– Tassos Bassoukos
Sep 9 '18 at 7:51