-
Notifications
You must be signed in to change notification settings - Fork 7.6k
1.x: PublishSubject fail-fast when backpressured #4225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
1.x: PublishSubject fail-fast when backpressured #4225
Conversation
Current coverage is 84.12% (diff: 100%)@@ 1.x #4225 diff @@
==========================================
Files 265 265
Lines 17305 17377 +72
Methods 0 0
Messages 0 0
Branches 2624 2643 +19
==========================================
+ Hits 14559 14619 +60
+ Misses 1893 1891 -2
- Partials 853 867 +14
|
pp.onError(e); | ||
} catch (Throwable ex) { | ||
if (errors == null) { | ||
errors = new ArrayList<Throwable>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: (1)
👍 // I'm afraid that this will break some amount of user code, but this has to be done… |
To paraphrase Godzilla (2014): Let them fail (early)! :) |
👍 |
The recent changes to ReplaySubject<String> subject = ReplaySubject.create();
TestSubscriber subscriber = new TestSubscriber();
Subscription subscription = subject.subscribe(subscriber);
subscription.unsubscribe();
subject.onNext("foo");
subject.onCompleted();
subject.subscribe(subscriber);
// Assertions below pass with v1.1.5 but fail with 1.1.6 and above
subscriber.assertCompleted();
subscriber.assertValue("foo"); Questions:
Thanks! |
Alright apparently using |
Test changes were required since RxJava 1.1.6 changed the way ReplaySubject in a way that broke reusing the same `TestSubscriber` more than once. (See comments here: ReactiveX/RxJava#4225 (comment)) Fixed tests by using a `TestObserver` that can be reused multiple times.
You are not supposed to reuse |
* Bump buildTools, sdkVersion and dependencies Test changes were required since RxJava 1.1.6 changed the way ReplaySubject in a way that broke reusing the same `TestSubscriber` more than once. (See comments here: ReactiveX/RxJava#4225 (comment)) Fixed tests by using a `TestObserver` that can be reused multiple times.
My reading of this change is that it's no longer possible to emit on a PublishSubject that has no observers (yet). If so, isn't that a rather large behaviour/API break - a lot more than just failing early? This change broke our tests, but I don't see any discussion of this possibility above. Was it considered? I don't see anything in the javadocs that suggests that emitting on an unsubscribed subject is a problem. |
PublishSubject considers each of the subscribed Subscribers individually and sending an onError if that particular Subscriber is not ready to receive
Without seeing what your unit tests did it's hard to say what's wrong with them. |
Indeed, my mistake, sorry, I got confused reading the diff and thought it was going straight from the outer onNext to the inner producer class onNext, d'oh. I guess something subscribed has missing backpressure somewhere. It's hard to tell from the error where the root cause is though, as the exception is thrown by the PublishSubject and it doesn't say much about which subscriber had problems. |
Enable assembly tracking with |
This PR modifies the
PublishSubject
to fail fast if the child Subscriber can't keep up. Therefore, instead of some other operator failing somewhere downstream, theMissingBackpressureException
now points to thePublishSubject
instead.In addition, there were complaints in #3850 that cross-unsubscription doesn't stop another Subscriber from receiving events if it comes after the unsubscribe() call in the dispatch loop. Since
PublishSubject
now tracks request - which is the main extra overhead - it is trivial to add the necessary eager check for the unsubscribed state.Benchmark comparison (i7 4790, Windows 7 x64, Java 8u102)
As expected, this adds some overhead although most noticeably for the mid-range only. Short-lived publishing is now slightly faster even.
Interestingly, many benchmarks behave oddly in these 1000s range - we could be hitting some JIT threshold. While in other benchmarks, the warmup iteration numbers keep increasing as JIT does its work but here, it starts out quite nicely then drops 25% and stays that way. I'm on windows so JMH -perfasm doesn't work.
If this direction is accepted, I'll update
BehaviorSubject
,timer()
andinterval()
do do the same tracking.