Blame | Last modification | View Log | RSS feed
-module(epgsql_replication_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("epgsql.hrl").
-export([
init_per_suite/1,
all/0,
end_per_suite/1,
connect_in_repl_mode/1,
create_drop_replication_slot/1,
replication_sync/1,
replication_async/1,
%% Callbacks
handle_x_log_data/4
]).
init_per_suite(Config) ->
[{module, epgsql}|Config].
end_per_suite(_Config) ->
ok.
all() ->
[
connect_in_repl_mode,
create_drop_replication_slot,
replication_async,
replication_sync
].
connect_in_repl_mode(Config) ->
epgsql_ct:connect_only(Config, ["epgsql_test_replication",
"epgsql_test_replication",
[{database, "epgsql_test_db1"}, {replication, "database"}]]).
create_drop_replication_slot(Config) ->
Module = ?config(module, Config),
epgsql_ct:with_connection(
Config,
fun(C) ->
{ok, Cols, Rows} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
[#column{name = <<"slot_name">>}, #column{name = <<"consistent_point">>},
#column{name = <<"snapshot_name">>}, #column{name = <<"output_plugin">>}] = Cols,
[{<<"epgsql_test">>, _, _, <<"test_decoding">>}] = Rows,
[{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
end,
"epgsql_test_replication",
[{replication, "database"}]).
replication_async(Config) ->
replication_test_run(Config, self()).
replication_sync(Config) ->
replication_test_run(Config, ?MODULE).
replication_test_run(Config, Callback) ->
Module = ?config(module, Config),
epgsql_ct:with_connection(
Config,
fun(C) ->
{ok, _, _} = Module:squery(C, "CREATE_REPLICATION_SLOT ""epgsql_test"" LOGICAL ""test_decoding"""),
%% new connection because main id in a replication mode
epgsql_ct:with_connection(
Config,
fun(C2) ->
[{ok, 1},{ok, 1}] = Module:squery(C2,
"insert into test_table1 (id, value) values (5, 'five');delete from test_table1 where id = 5;")
end),
Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
ok = receive_replication_msgs(
[<<"table public.test_table1: INSERT: id[integer]:5 value[text]:'five'">>,
<<"table public.test_table1: DELETE: id[integer]:5">>], C, [])
end,
"epgsql_test_replication",
[{replication, "database"}]),
%% cleanup
epgsql_ct:with_connection(
Config,
fun(C) ->
[{ok, _, _}, {ok, _, _}] = Module:squery(C, "DROP_REPLICATION_SLOT ""epgsql_test""")
end,
"epgsql_test_replication",
[{replication, "database"}]).
receive_replication_msgs(Pattern, Pid, ReceivedMsgs) ->
receive
{epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
receive_replication_msgs(Pattern, Pid, [begin_msg | ReceivedMsgs]);
{epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
case lists:reverse(ReceivedMsgs) of
[begin_msg, row_msg | _] -> ok;
_ -> error_replication_messages_not_received
end;
{epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
[Msg | T] = Pattern,
receive_replication_msgs(T, Pid, [row_msg | ReceivedMsgs])
after
60000 ->
error_timeout
end.
handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
{C, Pid} = CbState,
Pid ! {epgsql, C, {x_log_data, StartLSN, EndLSN, Data}},
{ok, EndLSN, EndLSN, CbState}.