6 |
7u83 |
1 |
%% Poolboy - A hunky Erlang worker pool factory
|
|
|
2 |
|
|
|
3 |
-module(poolboy).
|
|
|
4 |
-behaviour(gen_server).
|
|
|
5 |
|
|
|
6 |
-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
|
|
|
7 |
transaction/3, child_spec/2, child_spec/3, start/1, start/2,
|
|
|
8 |
start_link/1, start_link/2, stop/1, status/1]).
|
|
|
9 |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
|
10 |
code_change/3]).
|
|
|
11 |
-export_type([pool/0]).
|
|
|
12 |
|
|
|
13 |
-define(TIMEOUT, 5000).
|
|
|
14 |
|
|
|
15 |
-ifdef(pre17).
|
|
|
16 |
-type pid_queue() :: queue().
|
|
|
17 |
-else.
|
|
|
18 |
-type pid_queue() :: queue:queue().
|
|
|
19 |
-endif.
|
|
|
20 |
|
|
|
21 |
-ifdef(OTP_RELEASE). %% this implies 21 or higher
|
|
|
22 |
-define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace).
|
|
|
23 |
-define(GET_STACK(Stacktrace), Stacktrace).
|
|
|
24 |
-else.
|
|
|
25 |
-define(EXCEPTION(Class, Reason, _), Class:Reason).
|
|
|
26 |
-define(GET_STACK(_), erlang:get_stacktrace()).
|
|
|
27 |
-endif.
|
|
|
28 |
|
|
|
29 |
-type pool() ::
|
|
|
30 |
Name :: (atom() | pid()) |
|
|
|
31 |
{Name :: atom(), node()} |
|
|
|
32 |
{local, Name :: atom()} |
|
|
|
33 |
{global, GlobalName :: any()} |
|
|
|
34 |
{via, Module :: atom(), ViaName :: any()}.
|
|
|
35 |
|
|
|
36 |
% Copied from gen:start_ret/0
|
|
|
37 |
-type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}.
|
|
|
38 |
|
|
|
39 |
-record(state, {
|
|
|
40 |
supervisor :: undefined | pid(),
|
|
|
41 |
workers = [] :: [pid()],
|
|
|
42 |
waiting :: pid_queue(),
|
|
|
43 |
monitors :: ets:tid(),
|
|
|
44 |
size = 5 :: non_neg_integer(),
|
|
|
45 |
overflow = 0 :: non_neg_integer(),
|
|
|
46 |
max_overflow = 10 :: non_neg_integer(),
|
|
|
47 |
strategy = lifo :: lifo | fifo
|
|
|
48 |
}).
|
|
|
49 |
|
|
|
50 |
-spec checkout(Pool :: pool()) -> pid().
|
|
|
51 |
checkout(Pool) ->
|
|
|
52 |
checkout(Pool, true).
|
|
|
53 |
|
|
|
54 |
-spec checkout(Pool :: pool(), Block :: boolean()) -> pid() | full.
|
|
|
55 |
checkout(Pool, Block) ->
|
|
|
56 |
checkout(Pool, Block, ?TIMEOUT).
|
|
|
57 |
|
|
|
58 |
-spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout())
|
|
|
59 |
-> pid() | full.
|
|
|
60 |
checkout(Pool, Block, Timeout) ->
|
|
|
61 |
CRef = make_ref(),
|
|
|
62 |
try
|
|
|
63 |
gen_server:call(Pool, {checkout, CRef, Block}, Timeout)
|
|
|
64 |
catch
|
|
|
65 |
?EXCEPTION(Class, Reason, Stacktrace) ->
|
|
|
66 |
gen_server:cast(Pool, {cancel_waiting, CRef}),
|
|
|
67 |
erlang:raise(Class, Reason, ?GET_STACK(Stacktrace))
|
|
|
68 |
end.
|
|
|
69 |
|
|
|
70 |
-spec checkin(Pool :: pool(), Worker :: pid()) -> ok.
|
|
|
71 |
checkin(Pool, Worker) when is_pid(Worker) ->
|
|
|
72 |
gen_server:cast(Pool, {checkin, Worker}).
|
|
|
73 |
|
|
|
74 |
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()))
|
|
|
75 |
-> any().
|
|
|
76 |
transaction(Pool, Fun) ->
|
|
|
77 |
transaction(Pool, Fun, ?TIMEOUT).
|
|
|
78 |
|
|
|
79 |
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()),
|
|
|
80 |
Timeout :: timeout()) -> any().
|
|
|
81 |
transaction(Pool, Fun, Timeout) ->
|
|
|
82 |
Worker = poolboy:checkout(Pool, true, Timeout),
|
|
|
83 |
try
|
|
|
84 |
Fun(Worker)
|
|
|
85 |
after
|
|
|
86 |
ok = poolboy:checkin(Pool, Worker)
|
|
|
87 |
end.
|
|
|
88 |
|
|
|
89 |
-spec child_spec(PoolId :: term(), PoolArgs :: proplists:proplist())
|
|
|
90 |
-> supervisor:child_spec().
|
|
|
91 |
child_spec(PoolId, PoolArgs) ->
|
|
|
92 |
child_spec(PoolId, PoolArgs, []).
|
|
|
93 |
|
|
|
94 |
-spec child_spec(PoolId :: term(),
|
|
|
95 |
PoolArgs :: proplists:proplist(),
|
|
|
96 |
WorkerArgs :: proplists:proplist())
|
|
|
97 |
-> supervisor:child_spec().
|
|
|
98 |
child_spec(PoolId, PoolArgs, WorkerArgs) ->
|
|
|
99 |
{PoolId, {poolboy, start_link, [PoolArgs, WorkerArgs]},
|
|
|
100 |
permanent, 5000, worker, [poolboy]}.
|
|
|
101 |
|
|
|
102 |
-spec start(PoolArgs :: proplists:proplist())
|
|
|
103 |
-> start_ret().
|
|
|
104 |
start(PoolArgs) ->
|
|
|
105 |
start(PoolArgs, PoolArgs).
|
|
|
106 |
|
|
|
107 |
-spec start(PoolArgs :: proplists:proplist(),
|
|
|
108 |
WorkerArgs:: proplists:proplist())
|
|
|
109 |
-> start_ret().
|
|
|
110 |
start(PoolArgs, WorkerArgs) ->
|
|
|
111 |
start_pool(start, PoolArgs, WorkerArgs).
|
|
|
112 |
|
|
|
113 |
-spec start_link(PoolArgs :: proplists:proplist())
|
|
|
114 |
-> start_ret().
|
|
|
115 |
start_link(PoolArgs) ->
|
|
|
116 |
%% for backwards compatability, pass the pool args as the worker args as well
|
|
|
117 |
start_link(PoolArgs, PoolArgs).
|
|
|
118 |
|
|
|
119 |
-spec start_link(PoolArgs :: proplists:proplist(),
|
|
|
120 |
WorkerArgs:: proplists:proplist())
|
|
|
121 |
-> start_ret().
|
|
|
122 |
start_link(PoolArgs, WorkerArgs) ->
|
|
|
123 |
start_pool(start_link, PoolArgs, WorkerArgs).
|
|
|
124 |
|
|
|
125 |
-spec stop(Pool :: pool()) -> ok.
|
|
|
126 |
stop(Pool) ->
|
|
|
127 |
gen_server:call(Pool, stop).
|
|
|
128 |
|
|
|
129 |
-spec status(Pool :: pool()) -> {atom(), integer(), integer(), integer()}.
|
|
|
130 |
status(Pool) ->
|
|
|
131 |
gen_server:call(Pool, status).
|
|
|
132 |
|
|
|
133 |
init({PoolArgs, WorkerArgs}) ->
|
|
|
134 |
process_flag(trap_exit, true),
|
|
|
135 |
Waiting = queue:new(),
|
|
|
136 |
Monitors = ets:new(monitors, [private]),
|
|
|
137 |
init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}).
|
|
|
138 |
|
|
|
139 |
init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) ->
|
|
|
140 |
{ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs),
|
|
|
141 |
init(Rest, WorkerArgs, State#state{supervisor = Sup});
|
|
|
142 |
init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) ->
|
|
|
143 |
init(Rest, WorkerArgs, State#state{size = Size});
|
|
|
144 |
init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) ->
|
|
|
145 |
init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow});
|
|
|
146 |
init([{strategy, lifo} | Rest], WorkerArgs, State) ->
|
|
|
147 |
init(Rest, WorkerArgs, State#state{strategy = lifo});
|
|
|
148 |
init([{strategy, fifo} | Rest], WorkerArgs, State) ->
|
|
|
149 |
init(Rest, WorkerArgs, State#state{strategy = fifo});
|
|
|
150 |
init([_ | Rest], WorkerArgs, State) ->
|
|
|
151 |
init(Rest, WorkerArgs, State);
|
|
|
152 |
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
|
|
|
153 |
Workers = prepopulate(Size, Sup),
|
|
|
154 |
{ok, State#state{workers = Workers}}.
|
|
|
155 |
|
|
|
156 |
handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
|
|
|
157 |
case ets:lookup(Monitors, Pid) of
|
|
|
158 |
[{Pid, _, MRef}] ->
|
|
|
159 |
true = erlang:demonitor(MRef),
|
|
|
160 |
true = ets:delete(Monitors, Pid),
|
|
|
161 |
NewState = handle_checkin(Pid, State),
|
|
|
162 |
{noreply, NewState};
|
|
|
163 |
[] ->
|
|
|
164 |
{noreply, State}
|
|
|
165 |
end;
|
|
|
166 |
|
|
|
167 |
handle_cast({cancel_waiting, CRef}, State) ->
|
|
|
168 |
case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of
|
|
|
169 |
[[Pid, MRef]] ->
|
|
|
170 |
demonitor(MRef, [flush]),
|
|
|
171 |
true = ets:delete(State#state.monitors, Pid),
|
|
|
172 |
NewState = handle_checkin(Pid, State),
|
|
|
173 |
{noreply, NewState};
|
|
|
174 |
[] ->
|
|
|
175 |
Cancel = fun({_, Ref, MRef}) when Ref =:= CRef ->
|
|
|
176 |
demonitor(MRef, [flush]),
|
|
|
177 |
false;
|
|
|
178 |
(_) ->
|
|
|
179 |
true
|
|
|
180 |
end,
|
|
|
181 |
Waiting = queue:filter(Cancel, State#state.waiting),
|
|
|
182 |
{noreply, State#state{waiting = Waiting}}
|
|
|
183 |
end;
|
|
|
184 |
|
|
|
185 |
handle_cast(_Msg, State) ->
|
|
|
186 |
{noreply, State}.
|
|
|
187 |
|
|
|
188 |
handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
|
|
|
189 |
#state{supervisor = Sup,
|
|
|
190 |
workers = Workers,
|
|
|
191 |
monitors = Monitors,
|
|
|
192 |
overflow = Overflow,
|
|
|
193 |
max_overflow = MaxOverflow} = State,
|
|
|
194 |
case Workers of
|
|
|
195 |
[Pid | Left] ->
|
|
|
196 |
MRef = erlang:monitor(process, FromPid),
|
|
|
197 |
true = ets:insert(Monitors, {Pid, CRef, MRef}),
|
|
|
198 |
{reply, Pid, State#state{workers = Left}};
|
|
|
199 |
[] when MaxOverflow > 0, Overflow < MaxOverflow ->
|
|
|
200 |
{Pid, MRef} = new_worker(Sup, FromPid),
|
|
|
201 |
true = ets:insert(Monitors, {Pid, CRef, MRef}),
|
|
|
202 |
{reply, Pid, State#state{overflow = Overflow + 1}};
|
|
|
203 |
[] when Block =:= false ->
|
|
|
204 |
{reply, full, State};
|
|
|
205 |
[] ->
|
|
|
206 |
MRef = erlang:monitor(process, FromPid),
|
|
|
207 |
Waiting = queue:in({From, CRef, MRef}, State#state.waiting),
|
|
|
208 |
{noreply, State#state{waiting = Waiting}}
|
|
|
209 |
end;
|
|
|
210 |
|
|
|
211 |
handle_call(status, _From, State) ->
|
|
|
212 |
#state{workers = Workers,
|
|
|
213 |
monitors = Monitors,
|
|
|
214 |
overflow = Overflow} = State,
|
|
|
215 |
StateName = state_name(State),
|
|
|
216 |
{reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State};
|
|
|
217 |
handle_call(get_avail_workers, _From, State) ->
|
|
|
218 |
Workers = State#state.workers,
|
|
|
219 |
{reply, Workers, State};
|
|
|
220 |
handle_call(get_all_workers, _From, State) ->
|
|
|
221 |
Sup = State#state.supervisor,
|
|
|
222 |
WorkerList = supervisor:which_children(Sup),
|
|
|
223 |
{reply, WorkerList, State};
|
|
|
224 |
handle_call(get_all_monitors, _From, State) ->
|
|
|
225 |
Monitors = ets:select(State#state.monitors,
|
|
|
226 |
[{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]),
|
|
|
227 |
{reply, Monitors, State};
|
|
|
228 |
handle_call(stop, _From, State) ->
|
|
|
229 |
{stop, normal, ok, State};
|
|
|
230 |
handle_call(_Msg, _From, State) ->
|
|
|
231 |
Reply = {error, invalid_message},
|
|
|
232 |
{reply, Reply, State}.
|
|
|
233 |
|
|
|
234 |
handle_info({'DOWN', MRef, _, _, _}, State) ->
|
|
|
235 |
case ets:match(State#state.monitors, {'$1', '_', MRef}) of
|
|
|
236 |
[[Pid]] ->
|
|
|
237 |
true = ets:delete(State#state.monitors, Pid),
|
|
|
238 |
NewState = handle_checkin(Pid, State),
|
|
|
239 |
{noreply, NewState};
|
|
|
240 |
[] ->
|
|
|
241 |
Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting),
|
|
|
242 |
{noreply, State#state{waiting = Waiting}}
|
|
|
243 |
end;
|
|
|
244 |
handle_info({'EXIT', Pid, _Reason}, State) ->
|
|
|
245 |
#state{supervisor = Sup,
|
|
|
246 |
monitors = Monitors} = State,
|
|
|
247 |
case ets:lookup(Monitors, Pid) of
|
|
|
248 |
[{Pid, _, MRef}] ->
|
|
|
249 |
true = erlang:demonitor(MRef),
|
|
|
250 |
true = ets:delete(Monitors, Pid),
|
|
|
251 |
NewState = handle_worker_exit(Pid, State),
|
|
|
252 |
{noreply, NewState};
|
|
|
253 |
[] ->
|
|
|
254 |
case lists:member(Pid, State#state.workers) of
|
|
|
255 |
true ->
|
|
|
256 |
W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers),
|
|
|
257 |
{noreply, State#state{workers = [new_worker(Sup) | W]}};
|
|
|
258 |
false ->
|
|
|
259 |
{noreply, State}
|
|
|
260 |
end
|
|
|
261 |
end;
|
|
|
262 |
|
|
|
263 |
handle_info(_Info, State) ->
|
|
|
264 |
{noreply, State}.
|
|
|
265 |
|
|
|
266 |
terminate(_Reason, State) ->
|
|
|
267 |
ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers),
|
|
|
268 |
true = exit(State#state.supervisor, shutdown),
|
|
|
269 |
ok.
|
|
|
270 |
|
|
|
271 |
code_change(_OldVsn, State, _Extra) ->
|
|
|
272 |
{ok, State}.
|
|
|
273 |
|
|
|
274 |
start_pool(StartFun, PoolArgs, WorkerArgs) ->
|
|
|
275 |
case proplists:get_value(name, PoolArgs) of
|
|
|
276 |
undefined ->
|
|
|
277 |
gen_server:StartFun(?MODULE, {PoolArgs, WorkerArgs}, []);
|
|
|
278 |
Name ->
|
|
|
279 |
gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, [])
|
|
|
280 |
end.
|
|
|
281 |
|
|
|
282 |
new_worker(Sup) ->
|
|
|
283 |
{ok, Pid} = supervisor:start_child(Sup, []),
|
|
|
284 |
true = link(Pid),
|
|
|
285 |
Pid.
|
|
|
286 |
|
|
|
287 |
new_worker(Sup, FromPid) ->
|
|
|
288 |
Pid = new_worker(Sup),
|
|
|
289 |
Ref = erlang:monitor(process, FromPid),
|
|
|
290 |
{Pid, Ref}.
|
|
|
291 |
|
|
|
292 |
dismiss_worker(Sup, Pid) ->
|
|
|
293 |
true = unlink(Pid),
|
|
|
294 |
supervisor:terminate_child(Sup, Pid).
|
|
|
295 |
|
|
|
296 |
prepopulate(N, _Sup) when N < 1 ->
|
|
|
297 |
[];
|
|
|
298 |
prepopulate(N, Sup) ->
|
|
|
299 |
prepopulate(N, Sup, []).
|
|
|
300 |
|
|
|
301 |
prepopulate(0, _Sup, Workers) ->
|
|
|
302 |
Workers;
|
|
|
303 |
prepopulate(N, Sup, Workers) ->
|
|
|
304 |
prepopulate(N-1, Sup, [new_worker(Sup) | Workers]).
|
|
|
305 |
|
|
|
306 |
handle_checkin(Pid, State) ->
|
|
|
307 |
#state{supervisor = Sup,
|
|
|
308 |
waiting = Waiting,
|
|
|
309 |
monitors = Monitors,
|
|
|
310 |
overflow = Overflow,
|
|
|
311 |
strategy = Strategy} = State,
|
|
|
312 |
case queue:out(Waiting) of
|
|
|
313 |
{{value, {From, CRef, MRef}}, Left} ->
|
|
|
314 |
true = ets:insert(Monitors, {Pid, CRef, MRef}),
|
|
|
315 |
gen_server:reply(From, Pid),
|
|
|
316 |
State#state{waiting = Left};
|
|
|
317 |
{empty, Empty} when Overflow > 0 ->
|
|
|
318 |
ok = dismiss_worker(Sup, Pid),
|
|
|
319 |
State#state{waiting = Empty, overflow = Overflow - 1};
|
|
|
320 |
{empty, Empty} ->
|
|
|
321 |
Workers = case Strategy of
|
|
|
322 |
lifo -> [Pid | State#state.workers];
|
|
|
323 |
fifo -> State#state.workers ++ [Pid]
|
|
|
324 |
end,
|
|
|
325 |
State#state{workers = Workers, waiting = Empty, overflow = 0}
|
|
|
326 |
end.
|
|
|
327 |
|
|
|
328 |
handle_worker_exit(Pid, State) ->
|
|
|
329 |
#state{supervisor = Sup,
|
|
|
330 |
monitors = Monitors,
|
|
|
331 |
overflow = Overflow} = State,
|
|
|
332 |
case queue:out(State#state.waiting) of
|
|
|
333 |
{{value, {From, CRef, MRef}}, LeftWaiting} ->
|
|
|
334 |
NewWorker = new_worker(State#state.supervisor),
|
|
|
335 |
true = ets:insert(Monitors, {NewWorker, CRef, MRef}),
|
|
|
336 |
gen_server:reply(From, NewWorker),
|
|
|
337 |
State#state{waiting = LeftWaiting};
|
|
|
338 |
{empty, Empty} when Overflow > 0 ->
|
|
|
339 |
State#state{overflow = Overflow - 1, waiting = Empty};
|
|
|
340 |
{empty, Empty} ->
|
|
|
341 |
Workers =
|
|
|
342 |
[new_worker(Sup)
|
|
|
343 |
| lists:filter(fun (P) -> P =/= Pid end, State#state.workers)],
|
|
|
344 |
State#state{workers = Workers, waiting = Empty}
|
|
|
345 |
end.
|
|
|
346 |
|
|
|
347 |
state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
|
|
|
348 |
#state{max_overflow = MaxOverflow, workers = Workers} = State,
|
|
|
349 |
case length(Workers) == 0 of
|
|
|
350 |
true when MaxOverflow < 1 -> full;
|
|
|
351 |
true -> overflow;
|
|
|
352 |
false -> ready
|
|
|
353 |
end;
|
|
|
354 |
state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
|
|
|
355 |
full;
|
|
|
356 |
state_name(_State) ->
|
|
|
357 |
overflow.
|