-
Notifications
You must be signed in to change notification settings - Fork 2
/
multi_headed_greek_monster.rb
103 lines (92 loc) · 2.1 KB
/
multi_headed_greek_monster.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Fans work out to a pool of forked worker processes.
#
# One forked child hosts a ServiceManager (the shared work queue) over DRb on
# localhost; +worker_count+ further children poll that queue and run the block
# given to +new+ on every item they take.
#
# Typical use:
#
#   monster = MultiHeadedGreekMonster.new(progress, 3) do |item, work|
#     process(item)
#     work.tick
#   end
#   items.each { |item| monster.feed(item); monster.wait }
#   monster.finish
class MultiHeadedGreekMonster
  # Upper bound, in seconds, on how long the parent waits for the queue server
  # to start accepting connections before carrying on regardless.
  SERVICE_BOOT_TIMEOUT = 5

  # Forks the queue server and the workers.
  #
  # progress     - optional object responding to +tick+; handed to the
  #                ServiceManager created inside the forked server process.
  # worker_count - number of worker processes to fork.
  # on_port      - localhost TCP port the DRb queue server listens on.
  # block        - called in a worker as +block.call(item, work)+, where
  #                +work+ is the DRb proxy for the ServiceManager.
  def initialize(progress = nil, worker_count = 3, on_port = 23121, &block)
    @action = block
    @on_port = on_port
    @worker_count = worker_count
    @progress = progress
    start_service
    start_workers
  end

  # Adds +thing+ to the shared queue. Workers treat a nil/false item as
  # "nothing to do", so such values are never passed to the action block.
  def feed(thing)
    @service_manager.give(thing)
  end

  # Blocks while the queue holds more than +for_min_q_size+ items, re-checking
  # once per second. If a block is given it is yielded to after each sleep.
  def wait(for_min_q_size = 5, &block)
    while @service_manager.q_size > for_min_q_size
      sleep(1)
      yield if block_given?
    end
  end

  # Marks the queue as complete, waits for it to drain and for every worker to
  # exit, then stops the queue server.
  def finish
    @service_manager.done!
    sleep(1) until @service_manager.done?
    @worker_pids.each { |pid| Process.wait(pid) }
    Process.kill("KILL", @server_pid)
    # Reap the killed server; without this it stays behind as a zombie for the
    # remaining lifetime of the parent process.
    Process.wait(@server_pid)
  end

  # The queue object served over DRb. All state lives in the server process.
  class ServiceManager
    # progress - optional object responding to +tick+.
    def initialize(progress)
      @progress = progress
      @things = []
      @done = false
    end

    # Appends +thing+ to the queue.
    def give(thing)
      @things << thing
    end

    # Removes and returns the most recently given item (the queue is LIFO),
    # or nil when the queue is empty.
    def take
      @things.pop
    end

    # Number of items currently queued.
    def q_size
      @things.size
    end

    # True once done! has been called and the queue has been emptied.
    def done?
      @done && @things.empty?
    end

    # Signals that no further items will be given.
    def done!
      @done = true
    end

    # Forwards a tick to the progress object, if one was supplied. This runs
    # against the instance held by the process that owns this ServiceManager.
    def tick
      @progress.tick if @progress
    end
  end

  private

  # DRb URI shared by the queue server and every client proxy.
  def service_uri
    "druby://localhost:#{@on_port}"
  end

  # Forks the process that serves the ServiceManager over DRb and builds the
  # parent's proxy to it.
  def start_service
    require 'drb'
    @server_pid = fork do
      release_inherited_db_connections
      # exit! skips any remaining at_exit handlers; presumably this keeps the
      # child from running handlers it inherited from the parent.
      at_exit { exit! }
      DRb.start_service service_uri, ServiceManager.new(@progress)
      DRb.thread.join
    end
    @service_manager = DRbObject.new nil, service_uri
    await_service
  end

  # Polls the server port until it accepts a TCP connection instead of
  # sleeping for a fixed interval. A raw socket is used rather than a DRb call
  # so the parent holds no DRb connection when the workers are forked. Gives
  # up quietly after SERVICE_BOOT_TIMEOUT seconds; a server that never came up
  # then surfaces as a DRb connection error on first use.
  def await_service
    deadline = Time.now + SERVICE_BOOT_TIMEOUT
    begin
      TCPSocket.new('localhost', @on_port).close
    rescue SystemCallError
      return if Time.now >= deadline
      sleep 0.05
      retry
    end
  end

  # Forks +@worker_count+ worker processes and records their pids.
  def start_workers
    @worker_pids = []
    @worker_count.times do |i|
      @worker_pids << fork { run_worker(i) }
    end
  end

  # Body of one worker process: take items until the queue reports done.
  #
  # index - zero-based worker number; also the start-up delay in seconds, so
  #         workers come online one second apart.
  def run_worker(index)
    release_inherited_db_connections
    sleep(index)
    at_exit { exit! }
    work = DRbObject.new nil, service_uri
    idle_polls = 0
    until work.done?
      if (item = work.take)
        @action.call(item, work)
        idle_polls = 0
      else
        # Only report idling once it has lasted more than ten polls.
        puts "waiting on work to do from #{Process.pid} (#{idle_polls})" if idle_polls > 10
        sleep(1)
        idle_polls += 1
      end
    end
  end

  # Clears ActiveRecord connections in a freshly forked child. Skipped when
  # ActiveRecord is not loaded so the class also works outside Rails.
  def release_inherited_db_connections
    ActiveRecord::Base.clear_all_connections! if defined?(ActiveRecord::Base)
  end
end