Subversion Repositories SE.SVN

Rev

Blame | Last modification | View Log | RSS feed

-module(epgsql_replication_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("epgsql.hrl").

-export([
    init_per_suite/1,
    all/0,
    end_per_suite/1,

    connect_in_repl_mode/1,
    create_drop_replication_slot/1,
    replication_sync/1,
    replication_async/1,

    %% Callbacks
    handle_x_log_data/4
]).

%% Suite setup: puts {module, epgsql} at the front of Config so that test
%% cases can look up the client module with ?config(module, Config).
init_per_suite(Config) ->
    ModuleEntry = {module, epgsql},
    [ModuleEntry | Config].

%% Suite teardown: nothing to release, so the configuration is ignored.
end_per_suite(_) ->
    ok.

%% Lists the test cases of this suite, in the order they appear here.
all() ->
    SlotCases = [connect_in_repl_mode, create_drop_replication_slot],
    StreamingCases = [replication_async, replication_sync],
    SlotCases ++ StreamingCases.

%% Opens a connection to "epgsql_test_db1" with the `replication' option set
%% to "database", delegating the actual connect to epgsql_ct:connect_only/2.
connect_in_repl_mode(Config) ->
    ConnectOpts = [{database, "epgsql_test_db1"}, {replication, "database"}],
    %% NOTE(review): the two leading strings are presumably user name and
    %% password -- confirm against epgsql_ct:connect_only/2.
    ConnectArgs = ["epgsql_test_replication", "epgsql_test_replication", ConnectOpts],
    epgsql_ct:connect_only(Config, ConnectArgs).

%% Creates the logical replication slot "epgsql_test" (output plugin
%% "test_decoding") over a connection opened with {replication, "database"},
%% checks the column names and the single row of the reply, then drops the
%% slot again on the same connection.
create_drop_replication_slot(Config) ->
    Module = ?config(module, Config),
    %% Each command is one string literal. Erlang has no `""` quote escape:
    %% doubled quotes are adjacent string literals without white space, which
    %% OTP 26.1+ warns about and OTP 27+ rejects. The text sent to the server
    %% is the same as what those concatenated literals produced.
    CreateSlot = "CREATE_REPLICATION_SLOT epgsql_test LOGICAL test_decoding",
    DropSlot = "DROP_REPLICATION_SLOT epgsql_test",
    epgsql_ct:with_connection(
        Config,
        fun(C) ->
            {ok, Cols, Rows} = Module:squery(C, CreateSlot),
            [#column{name = <<"slot_name">>}, #column{name = <<"consistent_point">>},
                #column{name = <<"snapshot_name">>}, #column{name = <<"output_plugin">>}] = Cols,
            [{<<"epgsql_test">>, _, _, <<"test_decoding">>}] = Rows,
            %% The drop command is expected to answer with two result tuples.
            [{ok, _, _}, {ok, _, _}] = Module:squery(C, DropSlot)
        end,
        "epgsql_test_replication",
        [{replication, "database"}]).

%% Runs the shared replication scenario with the test process itself as the
%% callback, so replication data is expected as messages in its mailbox.
replication_async(Config) ->
    Receiver = self(),
    replication_test_run(Config, Receiver).

%% Runs the shared replication scenario with this module as the callback;
%% the module exports handle_x_log_data/4 for that purpose.
replication_sync(Config) ->
    CallbackModule = ?MODULE,
    replication_test_run(Config, CallbackModule).

%% Shared body of the replication test cases.
%%
%% Steps:
%%   1. create the logical slot "epgsql_test" on a replication connection;
%%   2. insert and delete one row of test_table1 over a second connection;
%%   3. start replication on the first connection with `Callback';
%%   4. wait for the INSERT and DELETE change messages;
%%   5. drop the slot over a fresh replication connection.
%%
%% `Callback' is passed unchanged to Module:start_replication/5; the suite
%% calls this with either self() or ?MODULE.
replication_test_run(Config, Callback) ->
    Module = ?config(module, Config),
    %% Each command is one string literal. Erlang has no `""` quote escape:
    %% doubled quotes are adjacent string literals without white space, which
    %% OTP 26.1+ warns about and OTP 27+ rejects. The text sent to the server
    %% is the same as what those concatenated literals produced.
    CreateSlot = "CREATE_REPLICATION_SLOT epgsql_test LOGICAL test_decoding",
    DropSlot = "DROP_REPLICATION_SLOT epgsql_test",
    try
        epgsql_ct:with_connection(
            Config,
            fun(C) ->
                {ok, _, _} = Module:squery(C, CreateSlot),

                %% new connection because the main one is in replication mode
                epgsql_ct:with_connection(
                    Config,
                    fun(C2) ->
                        [{ok, 1},{ok, 1}] = Module:squery(C2,
                            "insert into test_table1 (id, value) values (5, 'five');delete from test_table1 where id = 5;")
                    end),

                %% Fail here, with the server's reply in the badmatch, instead
                %% of waiting for the receive timeout when streaming did not start.
                ok = Module:start_replication(C, "epgsql_test", Callback, {C, self()}, "0/0"),
                ok = receive_replication_msgs(
                    [<<"table public.test_table1: INSERT: id[integer]:5 value[text]:'five'">>,
                        <<"table public.test_table1: DELETE: id[integer]:5">>], C, [])
            end,
            "epgsql_test_replication",
            [{replication, "database"}])
    after
        %% cleanup: runs even when the body above crashed, so a failed run
        %% does not leave the slot behind for the next test case
        epgsql_ct:with_connection(
            Config,
            fun(C) ->
                [{ok, _, _}, {ok, _, _}] = Module:squery(C, DropSlot)
            end,
            "epgsql_test_replication",
            [{replication, "database"}])
    end.

%% Consumes {epgsql, Pid, {x_log_data, _, _, Data}} messages from the mailbox
%% until a COMMIT message arrives or nothing is received for 60 seconds.
%%
%% Pattern      - payloads still expected, in order; each non-BEGIN/COMMIT
%%                message must equal the head of this list (badmatch otherwise).
%% Pid          - second element that incoming messages must carry.
%% ReceivedMsgs - accumulator of begin_msg / row_msg markers, newest first.
%%
%% Returns `ok' when the COMMIT arrives after a BEGIN followed by at least one
%% row and every expected payload has been consumed,
%% `error_replication_messages_not_received' for any other state at COMMIT,
%% and `error_timeout' on timeout.
receive_replication_msgs(Pattern, Pid, ReceivedMsgs) ->
    receive
        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"BEGIN", _/binary>>}} ->
            receive_replication_msgs(Pattern, Pid, [begin_msg | ReceivedMsgs]);
        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, <<"COMMIT", _/binary>>}} ->
            %% Leftover entries in Pattern mean the transaction committed
            %% before every expected change was seen, so that is not a success.
            case {Pattern, lists:reverse(ReceivedMsgs)} of
                {[], [begin_msg, row_msg | _]} -> ok;
                _ -> error_replication_messages_not_received
            end;
        {epgsql, Pid, {x_log_data, _StartLSN, _EndLSN, Msg}} ->
            %% Msg is already bound, so this match asserts that it is the
            %% next expected payload.
            [Msg | T] = Pattern,
            receive_replication_msgs(T, Pid, [row_msg | ReceivedMsgs])
    after
        60000 ->
            error_timeout
    end.

%% Replication callback used when this module is given as the callback.
%% CbState is the {Connection, Pid} pair; the data is forwarded to Pid as
%% {epgsql, Connection, {x_log_data, StartLSN, EndLSN, Data}}, the same shape
%% receive_replication_msgs/3 matches on. EndLSN is returned in both LSN
%% positions of the reply and CbState is handed back unchanged.
handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
    {Conn, Receiver} = CbState,
    Notification = {epgsql, Conn, {x_log_data, StartLSN, EndLSN, Data}},
    Receiver ! Notification,
    {ok, EndLSN, EndLSN, CbState}.