Skip to content

Commit

Permalink
Add some accessors #30
Browse files Browse the repository at this point in the history
  • Loading branch information
lempiji committed May 31, 2019
1 parent f43152a commit 0a66504
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
8 changes: 8 additions & 0 deletions source/rx/observer.d
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,19 @@ private:
}

public:
///
this(Observer!E[] observers)
{
_observers = observers;
}

public:
///
Observer!E[] observers() @property
{
return _observers;
}

public:
///
void put(E obj)
Expand Down
68 changes: 67 additions & 1 deletion source/rx/subject.d
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public:
while (!cas(&_observer, oldObserver, newObserver));
}

protected:
Observer!E currentObserver() @property
{
return assumeThreadLocal(atomicLoad(_observer));
}

private:
shared(Observer!E) _observer;
}
Expand Down Expand Up @@ -261,6 +267,65 @@ unittest
assert(observer.lastException is ex);
}

unittest
{
// MyFilterSubject puts a value only on MyCustomObserver.

static class MyCustomObserver : Observer!int
{
int[] buf;

void put(int obj)
{
buf ~= obj;
}

void completed()
{
}

void failure(Exception ex)
{
}
}

static class MyFilterSubject : SubjectObject!int
{
override void put(int obj)
{
if (auto current = cast(CompositeObserver!int) currentObserver)
{
/// write a own filter, map, order and more
foreach (observer; current.observers)
{
if (auto myObserver = cast(MyCustomObserver) observer)
{
myObserver.put(obj);
}
}
}
}
}

import std.array : appender;

auto myObserver = new MyCustomObserver;
auto buffer = appender!(int[]);

auto sub = new MyFilterSubject;
.put(sub, -1);

sub.subscribe(myObserver);
sub.subscribe(buffer);

.put(sub, 0);
.put(sub, 1);
.put(sub, 2);

assert(myObserver.buf.length == 3);
assert(buffer.data.length == 0);
}

private class Subscription(TSubject, TObserver) : Disposable
{
public:
Expand Down Expand Up @@ -834,7 +899,8 @@ public:
///
void put(E obj)
{
if (_completed) return;
if (_completed)
return;
.put(_buffer, obj);
.put(_subject, obj);
}
Expand Down

0 comments on commit 0a66504

Please sign in to comment.