Synchronous methods of ISubscriber #44

Open
rogeralsing opened this issue Apr 7, 2018 · 7 comments

Comments

@rogeralsing

Question, why are the methods in reactive streams synchronous?

https://github.com/reactive-streams/reactive-streams-dotnet/blob/master/src/api/Reactive.Streams/ISubscriber.cs#L51

What if the subscriber is async?
If the methods are forced to be synchronous, you need a buffer (e.g. an actor mailbox).

This buffer can either:

  • Block when full (breaking the idea of reactive streams)
  • Grow to infinity (causing OOM)
  • Drop at head or tail (also breaking the idea of reactive streams)

What am I missing in this equation?
The only way I can see this working out without issues is if the subscriber signals demand for every single element. My impression is/was that this is not the case?

@marcpiechura
Contributor

A detailed explanation can be found in #19.

The TL;DR version: think of them as signals from which you can start your async work. And yes, the synchronization is enforced via demand, i.e. the publisher is only allowed to call OnNext after the subscriber has signaled demand. The subscriber is allowed to signal demand > 1, though, so a buffer can send an initial demand equal to its size, and the publisher can then act push-based until that limit is reached.
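A minimal sketch of that demand rule, written in Python for brevity rather than C#. All names here (`RangeSubscription`, `BufferingSubscriber`, `process_one`) are illustrative assumptions and not the Reactive.Streams API; the sketch only shows a subscriber whose initial demand equals its buffer size and which asks for one more element each time a slot frees up.

```python
from collections import deque


class RangeSubscription:
    """Emits 0..count-1, but never more items than have been requested."""

    def __init__(self, subscriber, count):
        self._subscriber = subscriber
        self._source = iter(range(count))
        self._demand = 0
        self._draining = False
        self._done = False
        self.emitted = 0

    def request(self, n):
        if n <= 0:
            raise ValueError("demand must be positive")
        if self._done:
            return
        self._demand += n              # outstanding demand is additive
        if self._draining:             # re-entrant call: the outer loop picks it up
            return
        self._draining = True
        try:
            while self._demand > 0:
                item = next(self._source, None)
                if item is None:       # source exhausted
                    self._done = True
                    self._subscriber.on_complete()
                    return
                self._demand -= 1
                self.emitted += 1
                self._subscriber.on_next(item)
        finally:
            self._draining = False


class BufferingSubscriber:
    """Subscriber whose outstanding demand never exceeds its free buffer slots."""

    def __init__(self, buffer_size):
        self.buffer = deque()
        self.buffer_size = buffer_size
        self.subscription = None
        self.completed = False
        self.max_buffered = 0

    def on_subscribe(self, subscription):
        self.subscription = subscription
        subscription.request(self.buffer_size)   # initial demand = buffer size

    def on_next(self, item):
        # Returns immediately; the actual work happens later in process_one.
        self.buffer.append(item)
        self.max_buffered = max(self.max_buffered, len(self.buffer))

    def on_complete(self):
        self.completed = True

    def process_one(self):
        item = self.buffer.popleft()
        self.subscription.request(1)             # one slot is free again
        return item


subscriber = BufferingSubscriber(buffer_size=4)
subscriber.on_subscribe(RangeSubscription(subscriber, count=10))
processed = []
while subscriber.buffer:
    processed.append(subscriber.process_one())
print(processed, subscriber.max_buffered)
```

Because `on_next` only enqueues and returns, the call itself stays synchronous while the processing can happen whenever the consumer gets to it; the buffer cannot overflow because the publisher is never granted more demand than there are free slots.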

@akarnokd
Contributor

akarnokd commented Apr 7, 2018

A void return allows it to be asynchronous, as it doesn't require the caller to wait around, unlike with IEnumerator.MoveNext(). The most apparent problem with a non-void return was in the original Rx.NET, where IObservable.Subscribe returns an IDisposable synchronously and it was often not possible to stop a sequence unless the stages were on different threads, which adds overhead and delays.

@rogeralsing
Author

rogeralsing commented Apr 7, 2018

@marcpiechura
In the case of actors, where in the flow would the subscriber actually sit?
On the outside of an actor, monitoring mailbox length?

E.g. let's say we have an actor and a bounded mailbox with a maximum of 100 messages.

  1. Mailbox starts empty.
  2. Consumer triggers demand? How much? 100?
  3. Some work is produced and sent to consumer via OnNext.
  4. Consumer signals demand for 1 for each successfully consumed message?

And as long as there is a positive demand known by the producer, the producer can safely push downstream?

@marcpiechura
Contributor

marcpiechura commented Apr 8, 2018

@rogeralsing
In a very simplified version that’s how it would work, yes.
The problem with the mailbox is that it only shows whether a message has been consumed (as you know ofc ;-) ) but not whether the work is done; think of PipeTo(Self) or SelectAsync from an API point of view. Also, anyone could call Tell or Ask, and then your demand doesn't match your mailbox anymore.
So a more realistic attempt would be to hide the actor in the subscriber, pass in the subscription as a constructor parameter, and signal demand from within the actor itself when work is done. The subscriber would simply use Tell to feed messages into the actor.

Or your actor itself could implement one of the interfaces; I'm not sure what the best option would be, and off the top of my head I don't know how we've done it in Akka.Streams.
But in the end the actor is used as processing unit and the actual communication happens via the methods from the interfaces instead of Tell/Ask.
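A rough sketch of the "actor hidden in the subscriber" idea, using an asyncio task as a stand-in for an actor. This is not how Akka.Streams does it; `WorkerActor`, `ActorSubscriber`, `ListSubscription` and `tell` are hypothetical names chosen for illustration. The point is only that new demand is signaled when the work is finished, not when the message leaves the mailbox.

```python
import asyncio
from collections import deque


class ListSubscription:
    """Toy subscription: hands out items from a list, one per unit of demand."""

    def __init__(self, items, subscriber):
        self._items = deque(items)
        self._subscriber = subscriber

    def request(self, n):
        for _ in range(n):
            if not self._items:
                return
            self._subscriber.on_next(self._items.popleft())


class WorkerActor:
    """Toy actor: a mailbox plus a loop that processes one message at a time."""

    def __init__(self, subscription, results):
        self._subscription = subscription
        self._mailbox = asyncio.Queue()
        self._results = results
        self.max_mailbox = 0

    def tell(self, message):
        self._mailbox.put_nowait(message)
        self.max_mailbox = max(self.max_mailbox, self._mailbox.qsize())

    async def run(self, expected):
        for _ in range(expected):
            message = await self._mailbox.get()
            await asyncio.sleep(0)             # stands in for real async work
            self._results.append(message * 2)
            self._subscription.request(1)      # demand only after the work is done


class ActorSubscriber:
    """Subscriber that forwards every element to the hidden actor via tell."""

    def __init__(self, initial_demand, results):
        self._initial_demand = initial_demand
        self._results = results
        self.actor = None

    def on_subscribe(self, subscription):
        # The subscription is passed to the actor as a constructor parameter.
        self.actor = WorkerActor(subscription, self._results)
        subscription.request(self._initial_demand)

    def on_next(self, item):
        self.actor.tell(item)


async def main():
    results = []
    subscriber = ActorSubscriber(initial_demand=2, results=results)
    subscription = ListSubscription([1, 2, 3, 4, 5], subscriber)
    subscriber.on_subscribe(subscription)
    await subscriber.actor.run(expected=5)
    return results, subscriber.actor.max_mailbox


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch the mailbox never holds more than the initial demand, because nobody else calls `tell` on the actor; that is exactly the property that breaks once other senders can reach it directly.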

@rogeralsing
Author

@marcpiechura thanks! This all makes sense.

I don't quite see how this can be done safely in a distributed environment.
Even if the reactive streams API is for local use only, the backing implementation must take some form of action to signal the need for more work if the streams involve remote parts.

from the JVM API:

It can be called however often and whenever needed—but the outstanding cumulative demand
public void request(long n);

In a guaranteed 1-1 delivery world, I see how this can work.
But, add some distributed complexity on-top of this and:

At most once delivery

A request could get lost, and producer and consumer both go idle.

At least once delivery

Multiple copies of the same request could arrive at the producer, and the producer then produces too much work, filling and thus blocking the consumer mailbox.

So for this to work properly in a distributed env there has to be some sort of guaranteed delivery with de-duplication.
e.g. incrementing version number per unique request.

Is that part an implementation detail left up to the implementor?
(I'm just thinking, if it is, and the implementor misses this, one implementation could break another implementation by sending too much work, flooding buffers)

@marcpiechura
Contributor

@rogeralsing
Yes, it's up to the implementor; the SPI contains only the bare minimum for local processing.

Regarding safe interop, which is important even for local scenarios, the repository contains a TCK which all libraries should pass in order to guarantee a correct implementation. That can probably also be used for a distributed implementation.

Regarding the how, I only know that it’s possible but don’t know how it’s implemented.
I guess @Horusiath knows, since he has ported StreamRefs, which provide Akka.Streams over the network.
Another option would be RSocket which provides RS semantics at the application level.

@ktoso

ktoso commented Apr 9, 2018

@rogeralsing, yes, sure -- in an over-the-network implementation you will need to implement redelivery, deduplication, or both, depending on what guarantees you're after. Any RS-over-network will need to implement some form of redelivery and ACKs, as otherwise one does not know whether the element (or demand!) has been sent over or accidentally lost.

> A request could get lost, and producer and consumer both go idle.

Lost demand is simple to solve by redelivery: one can change the demand signaled from additive to cumulative, which makes those messages idempotent, so the rule becomes "if not sure, just re-deliver" (a.k.a. sequence numbers).
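To make the additive vs. cumulative difference concrete, here is a small sketch. The two producer classes and the `on_request` method are hypothetical names for illustration, not part of any RS implementation. The consumer asks for 5 elements and later for 3 more, and the transport duplicates and reorders the demand messages.

```python
class AdditiveProducer:
    """Treats every demand message as 'n more', like a local request(n)."""

    def __init__(self):
        self.allowed = 0

    def on_request(self, n):
        self.allowed += n


class CumulativeProducer:
    """Treats every demand message as 'total requested so far'."""

    def __init__(self):
        self.allowed = 0

    def on_request(self, total_requested):
        # Duplicates and stale (reordered) messages are harmless: max is idempotent.
        self.allowed = max(self.allowed, total_requested)


# The consumer really asked for 5 + 3 = 8 elements.
additive_wire = [5, 5, 3, 3]        # each message delivered twice
cumulative_wire = [5, 8, 5, 8]      # delivered twice and out of order

additive = AdditiveProducer()
for n in additive_wire:
    additive.on_request(n)

cumulative = CumulativeProducer()
for total in cumulative_wire:
    cumulative.on_request(total)

print(additive.allowed, cumulative.allowed)
```

The additive producer ends up believing it may send 16 elements, twice what was asked for, while the cumulative one settles on 8 regardless of how often or in which order the messages arrive.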

> Is that part an implementation detail left up to the implementor?

As mentioned before, RS is about the happy within-same-JVM case; the other over-the-network details depend on the transport used and on an implementation's decisions about what to provide and how. The high-level semantics must remain true though, so testing with the TCK also makes sense for those distributed implementations.

> Multiple copies of the same request could arrive at the producer, and the producer then produces too much work, filling and thus blocking the consumer mailbox.

Again, I'd rather propose switching to sequence numbers as demand for over-the-network use, as then this entire problem goes away. Otherwise you would have to do demand + sequence number, which ends up being the same amount of work without much gain.

> (I'm just thinking, if it is, and the implementor misses this, [...]

Firstly, if talking about across-the-network, then the wire format would have to be the same for all implementations in order for them to be able to even run into this problem :-) We were in touch about RSocket with Ben and Todd when it was started but sadly have not yet implemented / used it. I think it could become the reference for the network semantics if minimized somewhat.


Note also that the same can be said about elements: they could be lost as well. In Stream Refs we opted to do the simplest thing possible, which also has the upside that the sender does not have to buffer while waiting for ACKs: we add sequence numbers to signals, and if a missing one is detected we fail. That said, since that implementation rides on Akka's actor transport, i.e. TCP or Aeron, there are transport-level redeliveries at play already, and a single missing signal is usually a "catastrophic" event, so failing is the right thing there.
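The "sequence numbers, fail on a gap" approach can be sketched roughly as follows. This is not the actual Stream Refs code; `SequencedReceiver`, `SequenceGap` and `on_signal` are made-up names, and for simplicity this version treats any unexpected sequence number (including a duplicate) as a failure.

```python
class SequenceGap(Exception):
    """Raised when a signal arrives with an unexpected sequence number."""


class SequencedReceiver:
    """Accepts signals only in strict sequence order; anything else fails the stream."""

    def __init__(self):
        self.expected_seq_nr = 0
        self.received = []

    def on_signal(self, seq_nr, payload):
        if seq_nr != self.expected_seq_nr:
            # No buffering and no redelivery request: just fail.
            raise SequenceGap(
                f"expected seq_nr {self.expected_seq_nr}, got {seq_nr}"
            )
        self.expected_seq_nr += 1
        self.received.append(payload)


receiver = SequencedReceiver()
receiver.on_signal(0, "a")
receiver.on_signal(1, "b")
try:
    receiver.on_signal(3, "d")      # seq_nr 2 was lost somewhere
except SequenceGap as gap:
    print("stream failed:", gap)
```

The sender side needs no retained buffer at all in this scheme, which is the upside mentioned above; the cost is that a single lost signal ends the stream.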

I forgot what RSocket does for elements; it would be good to check. But in general the ACK style is not too hard either, as one would piggyback the "receivedUntilSeqNr" in the demand signal, which would then allow the sending side to flush its still-retained elements or attempt to redeliver them (this also needs envelopes with sequence numbers, though).
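A sketch of that ACK style, under several assumptions that are not from any real implementation: demand is cumulative, sequence numbers start at 0, `received_until_seq_nr` is the highest sequence number the receiver has seen without gaps (-1 for none), and the sender redelivers everything still retained on every demand signal. `RetainingSender` and `on_demand` are hypothetical names.

```python
class RetainingSender:
    """Keeps sent elements until the receiver acknowledges them via the demand signal."""

    def __init__(self, items):
        self._source = iter(items)
        self._next_seq_nr = 0
        self._granted = 0        # cumulative demand seen so far
        self.retained = {}       # seq_nr -> element, sent but not yet acknowledged

    def on_demand(self, cumulative_demand, received_until_seq_nr):
        # 1. Flush everything the receiver has confirmed.
        for seq_nr in [s for s in self.retained if s <= received_until_seq_nr]:
            del self.retained[seq_nr]

        # 2. Redeliver whatever is still unconfirmed (the receiver dedups by seq_nr).
        envelopes = sorted(self.retained.items())

        # 3. Send new elements up to the cumulative demand.
        self._granted = max(self._granted, cumulative_demand)
        while self._next_seq_nr < self._granted:
            try:
                element = next(self._source)
            except StopIteration:
                break
            self.retained[self._next_seq_nr] = element
            envelopes.append((self._next_seq_nr, element))
            self._next_seq_nr += 1
        return envelopes


sender = RetainingSender("abcdef")
first = sender.on_demand(cumulative_demand=3, received_until_seq_nr=-1)
# Suppose the envelope with seq_nr 2 was lost: the receiver has only seen 0 and 1.
second = sender.on_demand(cumulative_demand=5, received_until_seq_nr=1)
print(first, second)
```

Compared with the fail-on-gap approach, the sender now has to hold on to elements until they are confirmed, which is the buffering cost mentioned above.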


> one implementation could break another implementation by sending too much work, flooding buffers

This would be illegal under RS rules, regardless of the fact that it's an over-the-network implementation. As such it should be caught by the TCK, or it is the result of having wrongly implemented some rule, which is then a major bug in the given implementation. There is nothing we can do to stop people from writing wrong code, but who would want to use an incorrect library? :-)
