-
Notifications
You must be signed in to change notification settings - Fork 77
/
Api.cs
143 lines (115 loc) · 3.56 KB
/
Api.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
using System;
using System.Threading.Tasks;
using System.Runtime.Serialization;
using Orleankka;
using Orleankka.Meta;
using Orleans;
namespace Demo
{
[Serializable, GenerateSerializer]
public class Search : Query<int>
{
[Id(0)] public readonly string Subject;
public Search(string subject)
{
Subject = subject;
}
}
[Serializable, GenerateSerializer]
public class Subscribe : Command
{
[Id(0)] public readonly ObserverRef Observer;
public Subscribe(ObserverRef observer)
{
Observer = observer;
}
}
[Serializable, GenerateSerializer]
public class AvailabilityChanged : Event
{
[Id(0)] public readonly ActorRef Api;
[Id(1)] public readonly bool Available;
public AvailabilityChanged(ActorRef api, bool available)
{
Api = api;
Available = available;
}
}
[Serializable, GenerateSerializer]
public class ApiUnavailableException : ApplicationException
{
public ApiUnavailableException(string api)
: base(api + " api is unavailable. Try later!")
{}
}
public interface IApi : IActorGrain, IGrainWithStringKey
{}
public class Api : DispatchActorGrain, IApi
{
const int FailureThreshold = 3;
IObserverCollection observers;
IApiWorker worker;
int failures;
bool available = true;
public Api(
IApiWorker worker = null,
IObserverCollection observers = null,
IActorRuntime runtime = null)
: base(runtime)
{
this.worker = worker;
this.observers = observers;
}
void On(Activate _)
{
observers = observers ?? new ObserverCollection();
worker = worker ?? ApiWorkerFactory.Create(Id);
}
public void Handle(Subscribe cmd) => observers.Add(cmd.Observer);
public async Task<int> Handle(Search search)
{
if (!available)
throw new ApiUnavailableException(Id);
try
{
var result = await worker.Search(search.Subject);
ResetFailureCounter();
return result;
}
catch (HttpException)
{
IncrementFailureCounter();
if (!HasReachedFailureThreshold())
throw new ApiUnavailableException(Id);
Lock();
Notify();
ScheduleAvailabilityCheck();
throw new ApiUnavailableException(Id);
}
}
bool HasReachedFailureThreshold() => failures == FailureThreshold;
void IncrementFailureCounter() => failures++;
void ResetFailureCounter() => failures = 0;
void ScheduleAvailabilityCheck()
{
var due = TimeSpan.FromSeconds(1);
var period = TimeSpan.FromSeconds(1);
Timers.Register("check", due, period, CheckAvailability);
}
public async Task CheckAvailability()
{
try
{
await worker.Search("test");
Timers.Unregister("check");
Unlock();
Notify();
}
catch (HttpException)
{}
}
void Lock() => available = false;
void Unlock() => available = true;
void Notify() => observers.Notify(new AvailabilityChanged(Self, available));
}
}