rx java - RxJava: How to implement a slow consumer with buffer and a fast consumer to the same source? -


while playing various backpressure scenarios, implemented case 1 subscriber slow buffer, while 1 consumes whatever thrown it. using scala , akka streams. can see code here if want , test runs here.

i try develop rxjava version comparison got stuck on one. in akka streams, can build graph 1 source broadcasts on 2 channels, , have slow sink , fast sink consume channels. each channel can independently apply buffering , throttling. in rxjava, there share operator broadcasting buffering , throttling logic not on subscriber, on observable. i'm not sure how apply buffering , throttling , not have both subscribers affected. both akka streams , rxjava being implementation of rx, i'm hoping there's way want.

here's pictorial version of i'm trying do.

something this?

import rx.observable; import rx.observables.connectableobservable;  import java.util.concurrent.timeunit;  public class test {     public static void main(string[] args) {          //emits long every 100 milliseconds, , publishes subscribers simultaneously through connectableobservable         connectableobservable<long> source = observable.interval(100, timeunit.milliseconds).publish();          //buffers emissions lists within 1 second durations, , first subscriber prints them         source.buffer(1,timeunit.seconds).subscribe(system.out::println);          //no buffering, push emissions directly second subscriber prints them         source.subscribe(system.out::println);          //start firing emissions both subscribers connected         source.connect();          //sleep keep program alive 10 seconds         try {             thread.sleep(10000);         } catch (interruptedexception e) {             e.printstacktrace();         }     } } 

the subscriber has no notion of throttling or operators. done on observable side through various operators yield different observable implementations. subscriber pretty dumb , consumes emission final step in observable chain. agnostic thread emission comes on, less whether been throttled or not observable passing items it.


Comments