Blame | Last modification | View Log | RSS feed
%% Poolboy - A hunky Erlang worker pool factory
-module(poolboy).
-behaviour(gen_server).
-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
transaction/3, child_spec/2, child_spec/3, start/1, start/2,
start_link/1, start_link/2, stop/1, status/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export_type([pool/0]).
-define(TIMEOUT, 5000).
-ifdef(pre17).
-type pid_queue() :: queue().
-else.
-type pid_queue() :: queue:queue().
-endif.
-ifdef(OTP_RELEASE). %% this implies 21 or higher
-define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace).
-define(GET_STACK(Stacktrace), Stacktrace).
-else.
-define(EXCEPTION(Class, Reason, _), Class:Reason).
-define(GET_STACK(_), erlang:get_stacktrace()).
-endif.
-type pool() ::
Name :: (atom() | pid()) |
{Name :: atom(), node()} |
{local, Name :: atom()} |
{global, GlobalName :: any()} |
{via, Module :: atom(), ViaName :: any()}.
% Copied from gen:start_ret/0
-type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}.
-record(state, {
supervisor :: undefined | pid(),
workers = [] :: [pid()],
waiting :: pid_queue(),
monitors :: ets:tid(),
size = 5 :: non_neg_integer(),
overflow = 0 :: non_neg_integer(),
max_overflow = 10 :: non_neg_integer(),
strategy = lifo :: lifo | fifo
}).
-spec checkout(Pool :: pool()) -> pid().
checkout(Pool) ->
checkout(Pool, true).
-spec checkout(Pool :: pool(), Block :: boolean()) -> pid() | full.
checkout(Pool, Block) ->
checkout(Pool, Block, ?TIMEOUT).
-spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout())
-> pid() | full.
checkout(Pool, Block, Timeout) ->
CRef = make_ref(),
try
gen_server:call(Pool, {checkout, CRef, Block}, Timeout)
catch
?EXCEPTION(Class, Reason, Stacktrace) ->
gen_server:cast(Pool, {cancel_waiting, CRef}),
erlang:raise(Class, Reason, ?GET_STACK(Stacktrace))
end.
-spec checkin(Pool :: pool(), Worker :: pid()) -> ok.
checkin(Pool, Worker) when is_pid(Worker) ->
gen_server:cast(Pool, {checkin, Worker}).
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()))
-> any().
transaction(Pool, Fun) ->
transaction(Pool, Fun, ?TIMEOUT).
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()),
Timeout :: timeout()) -> any().
transaction(Pool, Fun, Timeout) ->
Worker = poolboy:checkout(Pool, true, Timeout),
try
Fun(Worker)
after
ok = poolboy:checkin(Pool, Worker)
end.
-spec child_spec(PoolId :: term(), PoolArgs :: proplists:proplist())
-> supervisor:child_spec().
child_spec(PoolId, PoolArgs) ->
child_spec(PoolId, PoolArgs, []).
-spec child_spec(PoolId :: term(),
PoolArgs :: proplists:proplist(),
WorkerArgs :: proplists:proplist())
-> supervisor:child_spec().
child_spec(PoolId, PoolArgs, WorkerArgs) ->
{PoolId, {poolboy, start_link, [PoolArgs, WorkerArgs]},
permanent, 5000, worker, [poolboy]}.
-spec start(PoolArgs :: proplists:proplist())
-> start_ret().
start(PoolArgs) ->
start(PoolArgs, PoolArgs).
-spec start(PoolArgs :: proplists:proplist(),
WorkerArgs:: proplists:proplist())
-> start_ret().
start(PoolArgs, WorkerArgs) ->
start_pool(start, PoolArgs, WorkerArgs).
-spec start_link(PoolArgs :: proplists:proplist())
-> start_ret().
start_link(PoolArgs) ->
%% for backwards compatability, pass the pool args as the worker args as well
start_link(PoolArgs, PoolArgs).
-spec start_link(PoolArgs :: proplists:proplist(),
WorkerArgs:: proplists:proplist())
-> start_ret().
start_link(PoolArgs, WorkerArgs) ->
start_pool(start_link, PoolArgs, WorkerArgs).
-spec stop(Pool :: pool()) -> ok.
stop(Pool) ->
gen_server:call(Pool, stop).
-spec status(Pool :: pool()) -> {atom(), integer(), integer(), integer()}.
status(Pool) ->
gen_server:call(Pool, status).
init({PoolArgs, WorkerArgs}) ->
process_flag(trap_exit, true),
Waiting = queue:new(),
Monitors = ets:new(monitors, [private]),
init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}).
init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) ->
{ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs),
init(Rest, WorkerArgs, State#state{supervisor = Sup});
init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) ->
init(Rest, WorkerArgs, State#state{size = Size});
init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) ->
init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow});
init([{strategy, lifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = lifo});
init([{strategy, fifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = fifo});
init([_ | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State);
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
Workers = prepopulate(Size, Sup),
{ok, State#state{workers = Workers}}.
handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
case ets:lookup(Monitors, Pid) of
[{Pid, _, MRef}] ->
true = erlang:demonitor(MRef),
true = ets:delete(Monitors, Pid),
NewState = handle_checkin(Pid, State),
{noreply, NewState};
[] ->
{noreply, State}
end;
handle_cast({cancel_waiting, CRef}, State) ->
case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of
[[Pid, MRef]] ->
demonitor(MRef, [flush]),
true = ets:delete(State#state.monitors, Pid),
NewState = handle_checkin(Pid, State),
{noreply, NewState};
[] ->
Cancel = fun({_, Ref, MRef}) when Ref =:= CRef ->
demonitor(MRef, [flush]),
false;
(_) ->
true
end,
Waiting = queue:filter(Cancel, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
end;
handle_cast(_Msg, State) ->
{noreply, State}.
handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
#state{supervisor = Sup,
workers = Workers,
monitors = Monitors,
overflow = Overflow,
max_overflow = MaxOverflow} = State,
case Workers of
[Pid | Left] ->
MRef = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
{reply, Pid, State#state{workers = Left}};
[] when MaxOverflow > 0, Overflow < MaxOverflow ->
{Pid, MRef} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
{reply, Pid, State#state{overflow = Overflow + 1}};
[] when Block =:= false ->
{reply, full, State};
[] ->
MRef = erlang:monitor(process, FromPid),
Waiting = queue:in({From, CRef, MRef}, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
end;
handle_call(status, _From, State) ->
#state{workers = Workers,
monitors = Monitors,
overflow = Overflow} = State,
StateName = state_name(State),
{reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State};
handle_call(get_avail_workers, _From, State) ->
Workers = State#state.workers,
{reply, Workers, State};
handle_call(get_all_workers, _From, State) ->
Sup = State#state.supervisor,
WorkerList = supervisor:which_children(Sup),
{reply, WorkerList, State};
handle_call(get_all_monitors, _From, State) ->
Monitors = ets:select(State#state.monitors,
[{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]),
{reply, Monitors, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(_Msg, _From, State) ->
Reply = {error, invalid_message},
{reply, Reply, State}.
handle_info({'DOWN', MRef, _, _, _}, State) ->
case ets:match(State#state.monitors, {'$1', '_', MRef}) of
[[Pid]] ->
true = ets:delete(State#state.monitors, Pid),
NewState = handle_checkin(Pid, State),
{noreply, NewState};
[] ->
Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
end;
handle_info({'EXIT', Pid, _Reason}, State) ->
#state{supervisor = Sup,
monitors = Monitors} = State,
case ets:lookup(Monitors, Pid) of
[{Pid, _, MRef}] ->
true = erlang:demonitor(MRef),
true = ets:delete(Monitors, Pid),
NewState = handle_worker_exit(Pid, State),
{noreply, NewState};
[] ->
case lists:member(Pid, State#state.workers) of
true ->
W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers),
{noreply, State#state{workers = [new_worker(Sup) | W]}};
false ->
{noreply, State}
end
end;
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State) ->
ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers),
true = exit(State#state.supervisor, shutdown),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
start_pool(StartFun, PoolArgs, WorkerArgs) ->
case proplists:get_value(name, PoolArgs) of
undefined ->
gen_server:StartFun(?MODULE, {PoolArgs, WorkerArgs}, []);
Name ->
gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, [])
end.
new_worker(Sup) ->
{ok, Pid} = supervisor:start_child(Sup, []),
true = link(Pid),
Pid.
new_worker(Sup, FromPid) ->
Pid = new_worker(Sup),
Ref = erlang:monitor(process, FromPid),
{Pid, Ref}.
dismiss_worker(Sup, Pid) ->
true = unlink(Pid),
supervisor:terminate_child(Sup, Pid).
prepopulate(N, _Sup) when N < 1 ->
[];
prepopulate(N, Sup) ->
prepopulate(N, Sup, []).
prepopulate(0, _Sup, Workers) ->
Workers;
prepopulate(N, Sup, Workers) ->
prepopulate(N-1, Sup, [new_worker(Sup) | Workers]).
handle_checkin(Pid, State) ->
#state{supervisor = Sup,
waiting = Waiting,
monitors = Monitors,
overflow = Overflow,
strategy = Strategy} = State,
case queue:out(Waiting) of
{{value, {From, CRef, MRef}}, Left} ->
true = ets:insert(Monitors, {Pid, CRef, MRef}),
gen_server:reply(From, Pid),
State#state{waiting = Left};
{empty, Empty} when Overflow > 0 ->
ok = dismiss_worker(Sup, Pid),
State#state{waiting = Empty, overflow = Overflow - 1};
{empty, Empty} ->
Workers = case Strategy of
lifo -> [Pid | State#state.workers];
fifo -> State#state.workers ++ [Pid]
end,
State#state{workers = Workers, waiting = Empty, overflow = 0}
end.
handle_worker_exit(Pid, State) ->
#state{supervisor = Sup,
monitors = Monitors,
overflow = Overflow} = State,
case queue:out(State#state.waiting) of
{{value, {From, CRef, MRef}}, LeftWaiting} ->
NewWorker = new_worker(State#state.supervisor),
true = ets:insert(Monitors, {NewWorker, CRef, MRef}),
gen_server:reply(From, NewWorker),
State#state{waiting = LeftWaiting};
{empty, Empty} when Overflow > 0 ->
State#state{overflow = Overflow - 1, waiting = Empty};
{empty, Empty} ->
Workers =
[new_worker(Sup)
| lists:filter(fun (P) -> P =/= Pid end, State#state.workers)],
State#state{workers = Workers, waiting = Empty}
end.
state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
#state{max_overflow = MaxOverflow, workers = Workers} = State,
case length(Workers) == 0 of
true when MaxOverflow < 1 -> full;
true -> overflow;
false -> ready
end;
state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
full;
state_name(_State) ->
overflow.