RxJava Adding Events to Stream based on callback




Adding some code to clear up the question


    // Generates a sequence in the range from (value + 1) to (value + 9)
    Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) {
        return Observable.range(value + 1, 9)
            .map(i -> {
                Log.d(TAG, "Value " + i
                    + " evaluating on " + Thread.currentThread().getName()
                    + " emitting item at " + System.currentTimeMillis());
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    // Keep the interrupt flag set instead of swallowing it
                    Thread.currentThread().interrupt();
                }
                return new ColoredIntegerModel(i, color);
            });
    }

    // For an input of 2, creates a stream of 20 numbers (input * 10):
    // 0 (red), 1-9 (green), 10 (red), 11-19 (green)
    Observable<ColoredIntegerModel> getEventStream(int value) {
        return Observable.create(new ObservableOnSubscribe<ColoredIntegerModel>() {
            @Override
            public void subscribe(ObservableEmitter<ColoredIntegerModel> emitter) throws Exception {
                for (int i = 0; i < value; ++i) {
                    ColoredIntegerModel model = new ColoredIntegerModel(i * 10, Color.RED);
                    emitter.onNext(model);
                    Observable<ColoredIntegerModel> more = getSequenceObservable(i * 10, 100, Color.GREEN);
                    // Inner subscription: forwards every green item to the outer emitter
                    more.subscribe(new Consumer<ColoredIntegerModel>() {
                        @Override
                        public void accept(ColoredIntegerModel coloredIntegerModel) throws Exception {
                            emitter.onNext(coloredIntegerModel);
                        }
                    });
                }
            }
        });
    }

The code above works: for an input of 2 it emits 0 (red), 1-9 (green), 10 (red), 11-19 (green). However, I would like a much cleaner solution. I am also not sure when the inner subscription in getEventStream() can be disposed of.



The question, basically, is that getEventStream calls a function for each emission, and that function returns an Observable as well. This is analogous to a chain of Promises in which each individual Promise can return a series of other Promises. I hope this clears up any confusion about the original question.






So, two clarifications required - are you trying to solve a concrete problem (in which case, what is it?), and why are you using nested observables, when you could just use Observable.fromIterable or similar?

– Tassos Bassoukos
Sep 9 '18 at 7:51








Yes, I am trying to resolve a concrete internal issue on Android. Our API level requirement is API < 17. The issue is resolved by promises in the JS implementation. The main issue is that we have a series of async promises in a then chain, and each promise can result in a further series of promises that need to be resolved first. So if we initially had [promise1, promise2, promise3] and promise1 returned a promise chain [promise1a, promise1b], then the order of resolution is [promise1a, promise1b, promise1, promise2, promise3]. I could not find a good promise library for Android to achieve this.

– rOrlig
Sep 10 '18 at 12:31





2 Answers



If what you want is to simplify the code above so that all subscription handling is left to the final subscriber, while maintaining the order of the subsequences' emissions, you can do it like this:


    Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) {
        return Observable.range(value + 1, 9)
            .flatMap(i -> Observable
                .just(new ColoredIntegerModel(i, color))
                .delay(delay * (i + 1), TimeUnit.MILLISECONDS)
            );
    }

    Observable<ColoredIntegerModel> getEventStream(int value) {
        return Observable.range(0, value)
            .concatMap(i ->
                getSequenceObservable(i * 10, 100, Color.GREEN)
                    .startWith(new ColoredIntegerModel(i * 10, Color.RED))
            );
    }



That is, if you really need the manual delay. If you don't, just replace the above getSequenceObservable with:


    Observable<ColoredIntegerModel> getSequenceObservable(int value, int color) {
        return Observable.range(value + 1, 9)
            .map(i -> new ColoredIntegerModel(i, color));
    }
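
As a dependency-free way to check the emission order this answer aims for, the sketch below models it with plain java.util.stream instead of RxJava. A sequential Stream.flatMap keeps source order, which is the property concatMap provides; it is only an analogy, not the RxJava API. The class name EmissionOrderSketch and the "value:COLOR" strings are hypothetical stand-ins for ColoredIntegerModel and Color.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class EmissionOrderSketch {

    // Hypothetical stand-in for ColoredIntegerModel: a value plus a color label.
    static String item(int value, String color) {
        return value + ":" + color;
    }

    // Mirrors getSequenceObservable(value, color): (value + 1) .. (value + 9), all green.
    static Stream<String> sequence(int value) {
        return IntStream.rangeClosed(value + 1, value + 9)
                .mapToObj(i -> item(i, "GREEN"));
    }

    // Mirrors getEventStream(value): each red head item is followed by its green sequence.
    // Stream.concat plays the role of startWith, flatMap the role of concatMap.
    static List<String> eventOrder(int value) {
        return IntStream.range(0, value)
                .boxed()
                .flatMap(i -> Stream.concat(Stream.of(item(i * 10, "RED")), sequence(i * 10)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // For value = 2 the list holds 20 items, starting with 0:RED and 1:GREEN.
        System.out.println(eventOrder(2));
    }
}
```

For an input of 2 the list starts with the red 0, continues with the green 1 to 9, then the red 10, and ends with the green 11 to 19.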






Sorry, the above is just an illustration of the real problem. It has nothing to do with delays. The question is: if a subscriber is subscribing to sources of events, how can each event handler add things to this sequence?

– rOrlig
Sep 10 '18 at 18:35






I think you need to state the problem you're trying to solve more clearly. A subscriber cannot add events to an upstream observable unless it is also a subject; in that case, you should take a look at subjects: reactivex.io/documentation/subject.html.

– Jans
Sep 10 '18 at 18:55







Sorry for not being clear; I tried to be as clear as I could. On the question of subjects: how would you change the events inside one?

– rOrlig
Sep 10 '18 at 19:29






Don't worry. You can check these links; they may give you a sense of subjects: medium.com/@bishoy_abd/… and tech.instacart.com/…

– Jans
Sep 10 '18 at 19:51







Thanks for this. I am still not sure how subjects resolve the above question, though.

– rOrlig
Sep 11 '18 at 11:45



You should take a look at the FlatMap operator.

In short, it transforms each element of an Observable into its own Observable and merges all of them into a single stream.



The simplest solution for your problem could be something like:


    getEventStream()
        .flatMap(it -> getSequenceObservable(it))
        .doOnNext(System.out::print)
        .blockingSubscribe();



where the helper functions are:


    static Observable<ColoredIntegerModel> getEventStream() {
        return Observable.fromArray(
            new ColoredIntegerModel(10, Color.RED),
            new ColoredIntegerModel(20, Color.RED)
        );
    }

    static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
        return Observable.range(1, 10)
            // timer emits a single 0L, so the model uses the range value `it`, not the tick
            .flatMap(it -> Observable.timer(it, TimeUnit.SECONDS)
                .map(time -> new ColoredIntegerModel(it, Color.GREEN))
            );
    }



In case you want to keep the original value from getEventStream(), you can use something like this instead of the getSequenceObservable above:


    static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
        return Observable.range(1, 10)
            // timer emits a single 0L, so the model uses the range value `it`, not the tick
            .flatMap(it -> Observable.timer(it, TimeUnit.MILLISECONDS)
                .map(time -> new ColoredIntegerModel(it, Color.GREEN)))
            // append the original item after its generated sequence
            .concatWith(Observable.just(color));
    }



In case the ordering of emissions is important, use the flatMap overload that takes maxConcurrency, so that only one inner Observable is subscribed at a time:


    getEventStream()
        .flatMap(it -> getSequenceObservable(it), true, 1)
        .doOnNext(System.out::println)
        .blockingSubscribe();
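
To illustrate why the concurrency limit affects ordering, here is a small virtual-time simulation in plain Java (no RxJava involved). It assumes each inner sequence emits item k exactly k ticks after it is subscribed, loosely mirroring the timer-based getSequenceObservable above. The class MergeVsConcatSketch, its Timed type, and the "A"/"B" tags are hypothetical names for this sketch. Ties are broken by source order here, whereas real timers give no such guarantee.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class MergeVsConcatSketch {

    // An item together with the virtual tick at which it would be emitted.
    static final class Timed {
        final long time;
        final String label;
        Timed(long time, String label) { this.time = time; this.label = label; }
    }

    // Inner sequence for one outer element: item k is due k ticks after subscription.
    static List<Timed> inner(String tag, int count, long subscribedAt) {
        List<Timed> items = new ArrayList<>();
        for (int k = 1; k <= count; k++) {
            items.add(new Timed(subscribedAt + k, tag + k));
        }
        return items;
    }

    static List<String> labels(List<Timed> items) {
        List<String> out = new ArrayList<>();
        for (Timed t : items) out.add(t.label);
        return out;
    }

    // Unbounded flatMap: every inner is subscribed at tick 0 and items arrive by due time.
    static List<String> merged(List<String> tags, int count) {
        List<Timed> all = new ArrayList<>();
        for (String tag : tags) all.addAll(inner(tag, count, 0));
        all.sort(Comparator.comparingLong((Timed t) -> t.time)); // stable sort: ties keep source order
        return labels(all);
    }

    // maxConcurrency = 1: the next inner is subscribed only after the previous one completes.
    static List<String> oneAtATime(List<String> tags, int count) {
        List<Timed> all = new ArrayList<>();
        long now = 0;
        for (String tag : tags) {
            all.addAll(inner(tag, count, now));
            now += count; // the inner completes with its last item
        }
        return labels(all);
    }
}
```

With tags A and B and three items each, merged yields A1, B1, A2, B2, A3, B3, while oneAtATime yields A1, A2, A3, B1, B2, B3.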






Wouldn't blockingSubscribe() block the main Android thread?

– rOrlig
Sep 16 '18 at 15:20






That's just an example; you obviously shouldn't call blockingSubscribe() on the main thread in production. It would work if you called it on some other thread, though. Another thing missing is the calls to subscribeOn().

– Daniil
Sep 17 '18 at 4:56


