app.py
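"""
Demo of dynamic PySyncObj cluster membership driven by SSDP discovery.

Two background threads are started: one runs the SSDP client/server that
discovers peers on the local network, the other feeds those peers into a
PySyncObj cluster once an initial discovery window has elapsed. The main
thread then exercises a replicated dictionary across the cluster.
"""
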
import asyncio
import concurrent.futures
import logging
import queue
import socket
import threading
import time
from datetime import datetime
from random import randint

from pysyncobj import SyncObj, SyncObjConf
from pysyncobj.batteries import ReplDict

import network_discovery
import util

logging.basicConfig(
    format=f'%(asctime)s:%(levelname)8s:{socket.gethostname():>32s}:%(process)8d:%(name)s: %(message)s',
    datefmt="%Y-%m-%dT%H:%M:%S%z",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


def ssdp_handler(*, peers_queue: queue.Queue, shutdown_event: threading.Event) -> None:
    """
    Start the SSDP client/server that handles node discovery and populate
    `peers_queue` with discovered nodes. Nodes are continually re-discovered,
    so the queue may yield nodes that have already been seen.

    :param peers_queue: receives connection strings for discovered nodes
    :param shutdown_event: event that signals the thread should stop
    :return: None
    """
    _loop = asyncio.new_event_loop()
    communicator = network_discovery.SsdpCommunicator(
        loop=_loop,
        discovered_peers=peers_queue,
        shutdown_event=shutdown_event,
    )
    _loop.run_until_complete(communicator.start())


def peer_handler(
    *,
    peers_set: util.ExpiringSet,
    peers_queue: queue.Queue,
    sync_obj: concurrent.futures.Future,
    shutdown_event: threading.Event,
    discovery_event: threading.Event,
    discovery_timeout: int = 15,
) -> None:
    """
    Updates PySyncObj with peer changes from the SSDP discovery server.

    The handler first runs in "discovery" mode, where it listens for peers
    without making any cluster updates. This allows initial cluster
    bootstrapping to complete successfully and gives new nodes time to notify
    the existing cluster and discover existing nodes before joining.

    :param peers_set: set of known peers; updated in-place as new peers are discovered
    :param peers_queue: queue of peer connection strings (discovered by SSDP)
    :param sync_obj: future that resolves to the PySyncObj cluster; new peers are added to it dynamically
    :param shutdown_event: event that signals when the thread should stop
    :param discovery_event: threading.Event that gets signaled once initial discovery is complete
    :param discovery_timeout: time in seconds to wait in discovery mode before making cluster updates
    :return: None
    """
    _current_host = util.current_sync_host()
    start_time = time.time()
    while not shutdown_event.is_set():
        try:
            # Use a short timeout so the shutdown event is re-checked
            # even when no peers are being discovered.
            peer = peers_queue.get(timeout=0.05)
        except queue.Empty:
            continue
        # The SSDP code will discover itself, but
        # PySyncObj treats `self` differently
        if peer == _current_host:
            continue
        if time.time() - start_time >= discovery_timeout:
            logger.debug("Discovery timeout met")
            discovery_event.set()
        if peer not in peers_set and discovery_event.is_set():
            logger.info("Dynamically adding peer %s", peer)
            try:
                sync_obj.result(timeout=0).addNodeToCluster(peer)
            except concurrent.futures.TimeoutError:
                pass
        # Refresh TTL regardless
        peers_set.add(peer)
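

# --- Demo driver -----------------------------------------------------------
# Wire the two worker threads together: the SSDP thread feeds discovered
# peers into `q`, and `peer_handler` promotes them into the cluster after the
# discovery window, using the SyncObj instance published through `s_future`.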
shutdown = threading.Event()
q = queue.Queue()

ssdp_server = threading.Thread(
    target=ssdp_handler,
    kwargs=dict(
        peers_queue=q,
        shutdown_event=shutdown,
    ),
)
ssdp_server.start()

current_host = util.current_sync_host()
peers = util.ExpiringSet()
logger.info("Discovering initial nodes")

# The cluster relies on an initial node-set, and cluster updates
# rely on the existence of the cluster. This future allows
# the regular discovery code to work before the cluster has
# been initially created.
s_future = concurrent.futures.Future()
discovery_complete = threading.Event()

node_manager = threading.Thread(
    target=peer_handler,
    kwargs=dict(
        peers_set=peers,
        peers_queue=q,
        sync_obj=s_future,
        discovery_event=discovery_complete,
        shutdown_event=shutdown,
    ),
)
node_manager.start()

discovery_complete.wait()
logger.info("Initial discovery complete, found peers %s", peers)

d = ReplDict()
s = SyncObj(
    selfNode=current_host,
    otherNodes=peers,
    consumers=[d],
    conf=SyncObjConf(dynamicMembershipChange=True),
)
s_future.set_result(s)

for _ in range(600):
    logger.info(s.getStatus())
    d.set(current_host, datetime.now().isoformat(), sync=True)
    d.set("shared_key", datetime.now().isoformat() + " " + current_host, sync=True)
    time.sleep(randint(0, 50) / 10)

for k, v in d.items():
    logger.info("%10s: %s", k, v)