rx java - RxJava: How to implement a slow consumer with buffer and a fast consumer to the same source?
While playing with various backpressure scenarios, I implemented a case where one subscriber is slow and has a buffer, while another consumes whatever is thrown at it. That was done using Scala and Akka Streams. You can see the code here if you want, and the test that runs it here.
I usually try to develop an RxJava version for comparison, but I got stuck on this one. In Akka Streams, I can build a graph with one source that broadcasts on two channels, and have a slow sink and a fast sink consume from those channels. Each channel can independently apply buffering and throttling. In RxJava, there is the share operator for broadcasting, but the buffering and throttling logic is not on the Subscriber, but on the Observable. So I'm not sure how to apply buffering and throttling without affecting both subscribers. With both Akka Streams and RxJava being implementations of Rx, I'm hoping there's a way to get what I want.
Here's a pictorial version of what I'm trying to do.
Something like this?
    import rx.Observable;
    import rx.observables.ConnectableObservable;

    import java.util.concurrent.TimeUnit;

    public class Test {
        public static void main(String[] args) {
            // Emits a Long every 100 milliseconds and publishes it to all
            // subscribers simultaneously through a ConnectableObservable
            ConnectableObservable<Long> source =
                    Observable.interval(100, TimeUnit.MILLISECONDS).publish();

            // Buffers emissions into lists over 1-second windows;
            // the first subscriber prints each list
            source.buffer(1, TimeUnit.SECONDS).subscribe(System.out::println);

            // No buffering: emissions are pushed directly to the second
            // subscriber, which prints them
            source.subscribe(System.out::println);

            // Start firing emissions now that both subscribers are connected
            source.connect();

            // Sleep to keep the program alive for 10 seconds
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
The Subscriber has no notion of throttling or other operators. Those are applied on the Observable side through various operators, each of which yields a different Observable implementation. The Subscriber is pretty dumb and simply consumes the emissions as the final step in an Observable chain. It is agnostic to which thread the emissions come on, much less whether they have been throttled by the Observable passing items to it.
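
To make the point about operators yielding new Observable implementations more concrete, here is a toy model in plain JDK Java, with no RxJava involved. The names Source, Hub, buffer and BranchSketch are hypothetical stand-ins invented for this sketch; they are not RxJava types and do not reflect how RxJava is implemented internally. The model is synchronous and buffers by count rather than by time, purely so the result is deterministic. It only illustrates why an operator applied to one branch leaves the other branch untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BranchSketch {

    /** Hypothetical stand-in for an Observable: something a consumer can subscribe to. */
    interface Source<T> {
        void subscribe(Consumer<T> consumer);
    }

    /** Hypothetical stand-in for a published/shared source: one upstream, many consumers. */
    static final class Hub<T> implements Source<T> {
        private final List<Consumer<T>> consumers = new ArrayList<>();

        @Override
        public void subscribe(Consumer<T> consumer) {
            consumers.add(consumer);
        }

        /** Pushes one item to every registered consumer, in subscription order. */
        void emit(T item) {
            for (Consumer<T> consumer : consumers) {
                consumer.accept(item);
            }
        }
    }

    /**
     * Toy count-based "buffer" operator: wraps the upstream and returns a NEW
     * source that emits lists of `size` items. The upstream itself is unchanged.
     */
    static <T> Source<List<T>> buffer(Source<T> upstream, int size) {
        return downstream -> {
            List<T> pending = new ArrayList<>();
            upstream.subscribe(item -> {
                pending.add(item);
                if (pending.size() == size) {
                    // Hand over a copy so clearing `pending` does not affect the batch
                    downstream.accept(new ArrayList<>(pending));
                    pending.clear();
                }
            });
        };
    }

    /** Wires one hub to a buffered branch and a direct branch, then emits 0..5. */
    public static List<List<String>> run() {
        Hub<Long> hub = new Hub<>();
        List<String> bufferedSaw = new ArrayList<>();
        List<String> directSaw = new ArrayList<>();

        // Branch 1: the consumer sits behind the buffer operator
        buffer(hub, 3).subscribe(batch -> bufferedSaw.add(batch.toString()));
        // Branch 2: the consumer subscribes to the hub directly
        hub.subscribe(item -> directSaw.add(item.toString()));

        for (long i = 0; i < 6; i++) {
            hub.emit(i);
        }
        return List.of(bufferedSaw, directSaw);
    }

    public static void main(String[] args) {
        List<List<String>> seen = run();
        System.out.println("buffered branch: " + seen.get(0));
        System.out.println("direct branch:   " + seen.get(1));
    }
}
```

Both consumers are equally "dumb" lambdas; the only difference is which source each one subscribed to. The buffered branch receives two batches of three items, while the direct branch receives all six items individually.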