Subversion Repositories SE.SVN

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
6 7u83 1
%% Poolboy - A hunky Erlang worker pool factory
2
 
3
-module(poolboy).
4
-behaviour(gen_server).
5
 
6
-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
7
         transaction/3, child_spec/2, child_spec/3, start/1, start/2,
8
         start_link/1, start_link/2, stop/1, status/1]).
9
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
10
         code_change/3]).
11
-export_type([pool/0]).
12
 
13
-define(TIMEOUT, 5000).
14
 
15
-ifdef(pre17).
16
-type pid_queue() :: queue().
17
-else.
18
-type pid_queue() :: queue:queue().
19
-endif.
20
 
21
-ifdef(OTP_RELEASE). %% this implies 21 or higher
22
-define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace).
23
-define(GET_STACK(Stacktrace), Stacktrace).
24
-else.
25
-define(EXCEPTION(Class, Reason, _), Class:Reason).
26
-define(GET_STACK(_), erlang:get_stacktrace()).
27
-endif.
28
 
29
-type pool() ::
30
    Name :: (atom() | pid()) |
31
    {Name :: atom(), node()} |
32
    {local, Name :: atom()} |
33
    {global, GlobalName :: any()} |
34
    {via, Module :: atom(), ViaName :: any()}.
35
 
36
% Copied from gen:start_ret/0
37
-type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}.
38
 
39
-record(state, {
40
    supervisor :: undefined | pid(),
41
    workers = [] :: [pid()],
42
    waiting :: pid_queue(),
43
    monitors :: ets:tid(),
44
    size = 5 :: non_neg_integer(),
45
    overflow = 0 :: non_neg_integer(),
46
    max_overflow = 10 :: non_neg_integer(),
47
    strategy = lifo :: lifo | fifo
48
}).
49
 
50
-spec checkout(Pool :: pool()) -> pid().
51
checkout(Pool) ->
52
    checkout(Pool, true).
53
 
54
-spec checkout(Pool :: pool(), Block :: boolean()) -> pid() | full.
55
checkout(Pool, Block) ->
56
    checkout(Pool, Block, ?TIMEOUT).
57
 
58
-spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout())
59
    -> pid() | full.
60
checkout(Pool, Block, Timeout) ->
61
    CRef = make_ref(),
62
    try
63
        gen_server:call(Pool, {checkout, CRef, Block}, Timeout)
64
    catch
65
        ?EXCEPTION(Class, Reason, Stacktrace) ->
66
            gen_server:cast(Pool, {cancel_waiting, CRef}),
67
            erlang:raise(Class, Reason, ?GET_STACK(Stacktrace))
68
    end.
69
 
70
-spec checkin(Pool :: pool(), Worker :: pid()) -> ok.
71
checkin(Pool, Worker) when is_pid(Worker) ->
72
    gen_server:cast(Pool, {checkin, Worker}).
73
 
74
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()))
75
    -> any().
76
transaction(Pool, Fun) ->
77
    transaction(Pool, Fun, ?TIMEOUT).
78
 
79
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()),
80
    Timeout :: timeout()) -> any().
81
transaction(Pool, Fun, Timeout) ->
82
    Worker = poolboy:checkout(Pool, true, Timeout),
83
    try
84
        Fun(Worker)
85
    after
86
        ok = poolboy:checkin(Pool, Worker)
87
    end.
88
 
89
-spec child_spec(PoolId :: term(), PoolArgs :: proplists:proplist())
90
    -> supervisor:child_spec().
91
child_spec(PoolId, PoolArgs) ->
92
    child_spec(PoolId, PoolArgs, []).
93
 
94
-spec child_spec(PoolId :: term(),
95
                 PoolArgs :: proplists:proplist(),
96
                 WorkerArgs :: proplists:proplist())
97
    -> supervisor:child_spec().
98
child_spec(PoolId, PoolArgs, WorkerArgs) ->
99
    {PoolId, {poolboy, start_link, [PoolArgs, WorkerArgs]},
100
     permanent, 5000, worker, [poolboy]}.
101
 
102
-spec start(PoolArgs :: proplists:proplist())
103
    -> start_ret().
104
start(PoolArgs) ->
105
    start(PoolArgs, PoolArgs).
106
 
107
-spec start(PoolArgs :: proplists:proplist(),
108
            WorkerArgs:: proplists:proplist())
109
    -> start_ret().
110
start(PoolArgs, WorkerArgs) ->
111
    start_pool(start, PoolArgs, WorkerArgs).
112
 
113
-spec start_link(PoolArgs :: proplists:proplist())
114
    -> start_ret().
115
start_link(PoolArgs)  ->
116
    %% for backwards compatability, pass the pool args as the worker args as well
117
    start_link(PoolArgs, PoolArgs).
118
 
119
-spec start_link(PoolArgs :: proplists:proplist(),
120
                 WorkerArgs:: proplists:proplist())
121
    -> start_ret().
122
start_link(PoolArgs, WorkerArgs)  ->
123
    start_pool(start_link, PoolArgs, WorkerArgs).
124
 
125
-spec stop(Pool :: pool()) -> ok.
126
stop(Pool) ->
127
    gen_server:call(Pool, stop).
128
 
129
-spec status(Pool :: pool()) -> {atom(), integer(), integer(), integer()}.
130
status(Pool) ->
131
    gen_server:call(Pool, status).
132
 
133
init({PoolArgs, WorkerArgs}) ->
134
    process_flag(trap_exit, true),
135
    Waiting = queue:new(),
136
    Monitors = ets:new(monitors, [private]),
137
    init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}).
138
 
139
init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) ->
140
    {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs),
141
    init(Rest, WorkerArgs, State#state{supervisor = Sup});
142
init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) ->
143
    init(Rest, WorkerArgs, State#state{size = Size});
144
init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) ->
145
    init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow});
146
init([{strategy, lifo} | Rest], WorkerArgs, State) ->
147
    init(Rest, WorkerArgs, State#state{strategy = lifo});
148
init([{strategy, fifo} | Rest], WorkerArgs, State) ->
149
    init(Rest, WorkerArgs, State#state{strategy = fifo});
150
init([_ | Rest], WorkerArgs, State) ->
151
    init(Rest, WorkerArgs, State);
152
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
153
    Workers = prepopulate(Size, Sup),
154
    {ok, State#state{workers = Workers}}.
155
 
156
handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
157
    case ets:lookup(Monitors, Pid) of
158
        [{Pid, _, MRef}] ->
159
            true = erlang:demonitor(MRef),
160
            true = ets:delete(Monitors, Pid),
161
            NewState = handle_checkin(Pid, State),
162
            {noreply, NewState};
163
        [] ->
164
            {noreply, State}
165
    end;
166
 
167
handle_cast({cancel_waiting, CRef}, State) ->
168
    case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of
169
        [[Pid, MRef]] ->
170
            demonitor(MRef, [flush]),
171
            true = ets:delete(State#state.monitors, Pid),
172
            NewState = handle_checkin(Pid, State),
173
            {noreply, NewState};
174
        [] ->
175
            Cancel = fun({_, Ref, MRef}) when Ref =:= CRef ->
176
                             demonitor(MRef, [flush]),
177
                             false;
178
                        (_) ->
179
                             true
180
                     end,
181
            Waiting = queue:filter(Cancel, State#state.waiting),
182
            {noreply, State#state{waiting = Waiting}}
183
    end;
184
 
185
handle_cast(_Msg, State) ->
186
    {noreply, State}.
187
 
188
handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
189
    #state{supervisor = Sup,
190
           workers = Workers,
191
           monitors = Monitors,
192
           overflow = Overflow,
193
           max_overflow = MaxOverflow} = State,
194
    case Workers of
195
        [Pid | Left] ->
196
            MRef = erlang:monitor(process, FromPid),
197
            true = ets:insert(Monitors, {Pid, CRef, MRef}),
198
            {reply, Pid, State#state{workers = Left}};
199
        [] when MaxOverflow > 0, Overflow < MaxOverflow ->
200
            {Pid, MRef} = new_worker(Sup, FromPid),
201
            true = ets:insert(Monitors, {Pid, CRef, MRef}),
202
            {reply, Pid, State#state{overflow = Overflow + 1}};
203
        [] when Block =:= false ->
204
            {reply, full, State};
205
        [] ->
206
            MRef = erlang:monitor(process, FromPid),
207
            Waiting = queue:in({From, CRef, MRef}, State#state.waiting),
208
            {noreply, State#state{waiting = Waiting}}
209
    end;
210
 
211
handle_call(status, _From, State) ->
212
    #state{workers = Workers,
213
           monitors = Monitors,
214
           overflow = Overflow} = State,
215
    StateName = state_name(State),
216
    {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State};
217
handle_call(get_avail_workers, _From, State) ->
218
    Workers = State#state.workers,
219
    {reply, Workers, State};
220
handle_call(get_all_workers, _From, State) ->
221
    Sup = State#state.supervisor,
222
    WorkerList = supervisor:which_children(Sup),
223
    {reply, WorkerList, State};
224
handle_call(get_all_monitors, _From, State) ->
225
    Monitors = ets:select(State#state.monitors,
226
                          [{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]),
227
    {reply, Monitors, State};
228
handle_call(stop, _From, State) ->
229
    {stop, normal, ok, State};
230
handle_call(_Msg, _From, State) ->
231
    Reply = {error, invalid_message},
232
    {reply, Reply, State}.
233
 
234
handle_info({'DOWN', MRef, _, _, _}, State) ->
235
    case ets:match(State#state.monitors, {'$1', '_', MRef}) of
236
        [[Pid]] ->
237
            true = ets:delete(State#state.monitors, Pid),
238
            NewState = handle_checkin(Pid, State),
239
            {noreply, NewState};
240
        [] ->
241
            Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting),
242
            {noreply, State#state{waiting = Waiting}}
243
    end;
244
handle_info({'EXIT', Pid, _Reason}, State) ->
245
    #state{supervisor = Sup,
246
           monitors = Monitors} = State,
247
    case ets:lookup(Monitors, Pid) of
248
        [{Pid, _, MRef}] ->
249
            true = erlang:demonitor(MRef),
250
            true = ets:delete(Monitors, Pid),
251
            NewState = handle_worker_exit(Pid, State),
252
            {noreply, NewState};
253
        [] ->
254
            case lists:member(Pid, State#state.workers) of
255
                true ->
256
                    W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers),
257
                    {noreply, State#state{workers = [new_worker(Sup) | W]}};
258
                false ->
259
                    {noreply, State}
260
            end
261
    end;
262
 
263
handle_info(_Info, State) ->
264
    {noreply, State}.
265
 
266
terminate(_Reason, State) ->
267
    ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers),
268
    true = exit(State#state.supervisor, shutdown),
269
    ok.
270
 
271
code_change(_OldVsn, State, _Extra) ->
272
    {ok, State}.
273
 
274
start_pool(StartFun, PoolArgs, WorkerArgs) ->
275
    case proplists:get_value(name, PoolArgs) of
276
        undefined ->
277
            gen_server:StartFun(?MODULE, {PoolArgs, WorkerArgs}, []);
278
        Name ->
279
            gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, [])
280
    end.
281
 
282
new_worker(Sup) ->
283
    {ok, Pid} = supervisor:start_child(Sup, []),
284
    true = link(Pid),
285
    Pid.
286
 
287
new_worker(Sup, FromPid) ->
288
    Pid = new_worker(Sup),
289
    Ref = erlang:monitor(process, FromPid),
290
    {Pid, Ref}.
291
 
292
dismiss_worker(Sup, Pid) ->
293
    true = unlink(Pid),
294
    supervisor:terminate_child(Sup, Pid).
295
 
296
prepopulate(N, _Sup) when N < 1 ->
297
    [];
298
prepopulate(N, Sup) ->
299
    prepopulate(N, Sup, []).
300
 
301
prepopulate(0, _Sup, Workers) ->
302
    Workers;
303
prepopulate(N, Sup, Workers) ->
304
    prepopulate(N-1, Sup, [new_worker(Sup) | Workers]).
305
 
306
handle_checkin(Pid, State) ->
307
    #state{supervisor = Sup,
308
           waiting = Waiting,
309
           monitors = Monitors,
310
           overflow = Overflow,
311
           strategy = Strategy} = State,
312
    case queue:out(Waiting) of
313
        {{value, {From, CRef, MRef}}, Left} ->
314
            true = ets:insert(Monitors, {Pid, CRef, MRef}),
315
            gen_server:reply(From, Pid),
316
            State#state{waiting = Left};
317
        {empty, Empty} when Overflow > 0 ->
318
            ok = dismiss_worker(Sup, Pid),
319
            State#state{waiting = Empty, overflow = Overflow - 1};
320
        {empty, Empty} ->
321
            Workers = case Strategy of
322
                lifo -> [Pid | State#state.workers];
323
                fifo -> State#state.workers ++ [Pid]
324
            end,
325
            State#state{workers = Workers, waiting = Empty, overflow = 0}
326
    end.
327
 
328
handle_worker_exit(Pid, State) ->
329
    #state{supervisor = Sup,
330
           monitors = Monitors,
331
           overflow = Overflow} = State,
332
    case queue:out(State#state.waiting) of
333
        {{value, {From, CRef, MRef}}, LeftWaiting} ->
334
            NewWorker = new_worker(State#state.supervisor),
335
            true = ets:insert(Monitors, {NewWorker, CRef, MRef}),
336
            gen_server:reply(From, NewWorker),
337
            State#state{waiting = LeftWaiting};
338
        {empty, Empty} when Overflow > 0 ->
339
            State#state{overflow = Overflow - 1, waiting = Empty};
340
        {empty, Empty} ->
341
            Workers =
342
                [new_worker(Sup)
343
                 | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)],
344
            State#state{workers = Workers, waiting = Empty}
345
    end.
346
 
347
state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
348
    #state{max_overflow = MaxOverflow, workers = Workers} = State,
349
    case length(Workers) == 0 of
350
        true when MaxOverflow < 1 -> full;
351
        true -> overflow;
352
        false -> ready
353
    end;
354
state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
355
    full;
356
state_name(_State) ->
357
    overflow.