본문으로 바로가기

PostgreSQL logical Replication Streaming이란,

논리 복제를 사용하여 실시간으로 외부 시스템으로 스트리밍 가능한 형태를 말한다.

 

위의 기술을 이용하여 PostgreSQL에서 발생한 이벤트(테이블 Insert / delete / update) 캐치하는 코드를 작성했다.

 

[개발 환경]

- Windows 10

- PostgreSQL 12

- 자바 8

- 스프링부트 2.3.0.RELEASE

 

[참고 자료] 

PostgreSQL Doc에 나와있는 내용을 참고로 작성했다.

https://jdbc.postgresql.org/documentation/head/replication.html

 

[1. DB config 설정 변경]

논리 복제가 가능하도록 PostgreSQL config를 변경한다.

 

1.1 postgresql.conf 수정

# 동시 연결 최대 수 (복제 대상, 대기 서버) / 0 이면 복제를 비활성화
max_wal_senders = 10

# pg_xlog에 저장되는 과거 로그 파일 세그먼트의 최소 수
wal_keep_segments = 10

# 논리 복제 가능하도록 레벨 변경 (중요)
wal_level = logical

# 복제 슬롯 최대 수 (해당 슬롯을 통해 이벤트 캐치 로그가 왔다갔다 함 / 슬롯 분리 = 용도에 따라 이벤트 분리)
max_replication_slots = 10

1.2 pg_hba.conf 수정

host    all             all             [networkIP ?.?.?].0/24    trust

host    replication     replication [networkIP ?.?.?.?]/32    md5
host    replication     all             127.0.0.1/32            md5
host    replication     all             ::1/128                 md5

 

[2. 코드 작성 (일부 설명)]

2.1 복제 연결 생성

org.postgresql.replication.PGReplicationConnectionorg.postgresql.PGConnection#getReplicationAPI

/* Logical Replication 관련 설정 */
Properties props = new Properties();
props.setProperty("url", pg.getUrl());
PGProperty.USER.set(props, pg.getUser());
PGProperty.PASSWORD.set(props, pg.getPassword());
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, pg.getAssumeMinServerVersion());
PGProperty.REPLICATION.set(props, pg.getReplication());
PGProperty.PREFER_QUERY_MODE.set(props, pg.getPreferQueryMode());
PGProperty.TCP_KEEP_ALIVE.set(props, pg.getTcpKeepAlive());
PGProperty.SOCKET_TIMEOUT.set(props, pg.getSocketTimeout());

Connection con = DriverManager.getConnection(pg.getUrl(), props);
PGConnection replConnection = con.unwrap(PGConnection.class);

 

2.2 복제 슬롯 생성

/* replicationSlot 생성 */
replConnection.getReplicationAPI()
        .createReplicationSlot()
        .logical()
        .withSlotName(pg.getSlotName())
        .withOutputPlugin(pg.getOutputPlugin())
        .make();

con.setAutoCommit(pg.getAutoCommit());

 

2.3 복제 스트림 생성

/* 해당 replicationSlot을 통해 스트리밍으로 DB 변화 캐치 */
/* WAL EVENT */
PGReplicationStream stream =
        replConnection.getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName(pg.getSlotName())
                .withSlotOption("include-xids", pg.getIncludeXids())
                .withSlotOption("skip-empty-xacts", pg.getSkipEmptyXacts())
                .withSlotOption("include-rewrites", pg.getIncludeRewrites())
                .withSlotOption("include-timestamp", pg.getIncludeTimestamp())
                .withStatusInterval(pg.getStatusInterval(), TimeUnit.SECONDS)
                .start();

 

[전체 코드]

 

PostgreSQLRunner

기동 시점부터 적용 (이벤트 캐치 수신)

package com.example.test.service;

import com.example.test.config.PgConfiguration;
import com.example.test.util.CommonUtil;
import lombok.extern.log4j.Log4j2;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.ReplicationSlotInfo;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * {기동과 용도 작성}
 *
 * @author 
 * @version 1.0, {작업내용}
 * @since 2020-09-15
 */

@Log4j2
@Component
public class PostgreSQLRunner implements ApplicationRunner {

    private final PgConfiguration pg;
    private final CommonUtil commonUtil;

    @Autowired
    public PostgreSQLRunner(PgConfiguration pg, CommonUtil commonUtil) {
        this.pg = pg;
        this.commonUtil = commonUtil;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {

        /* Logical Replication 관련 설정 */
        Properties props = new Properties();
        props.setProperty("url", pg.getUrl());
        PGProperty.USER.set(props, pg.getUser());
        PGProperty.PASSWORD.set(props, pg.getPassword());
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, pg.getAssumeMinServerVersion());
        PGProperty.REPLICATION.set(props, pg.getReplication());
        PGProperty.PREFER_QUERY_MODE.set(props, pg.getPreferQueryMode());
        PGProperty.TCP_KEEP_ALIVE.set(props, pg.getTcpKeepAlive());
        PGProperty.SOCKET_TIMEOUT.set(props, pg.getSocketTimeout());

        Connection con = DriverManager.getConnection(pg.getUrl(), props);
        PGConnection replConnection = con.unwrap(PGConnection.class);

        /* 재기동 시, replicationSlot 지우고 시작 */
        // replConnection.getReplicationAPI().dropReplicationSlot(pg.getSlotName());

        /* replicationSlot 생성 */
        try {
            replConnection.getReplicationAPI()
                    .createReplicationSlot()
                    .logical()
                    .withSlotName(pg.getSlotName())
                    .withOutputPlugin(pg.getOutputPlugin())
                    .make();
        } catch (Exception e) {
                log.error(e);
        }

        con.setAutoCommit(pg.getAutoCommit());

        /* 해당 replicationSlot을 통해 스트리밍으로 DB 변화 캐치 */
        /* WAL EVENT */
        PGReplicationStream stream =
                replConnection.getReplicationAPI()
                        .replicationStream()
                        .logical()
                        .withSlotName(pg.getSlotName())
                        .withSlotOption("include-xids", pg.getIncludeXids())
                        .withSlotOption("skip-empty-xacts", pg.getSkipEmptyXacts())
                        .withSlotOption("include-rewrites", pg.getIncludeRewrites())
                        .withSlotOption("include-timestamp", pg.getIncludeTimestamp())
                        .withStatusInterval(pg.getStatusInterval(), TimeUnit.SECONDS)
                        .start();


        while (true) {
            try {
                /* 이벤트 캐치(버퍼 수신) */
                ByteBuffer msg = stream.readPending();

                if (msg == null) {
                    TimeUnit.MILLISECONDS.sleep(10L);
                    continue;
                }

                int offset = msg.arrayOffset();
                byte[] source = msg.array();
                int length = source.length - offset;

                /* String str : 이벤트 발생 로그
                  형식 : [BEGIN]  [table / action / data]  [COMMIT (날짜_시간)]
                */
                String str = new String(source, offset, length);
                /* 실제 데이터 부분만 파싱 */
                if (!str.contains("COMMIT") && !str.contains("BEGIN")) {

                    String[] temp = str.split(":", 3);

                    /* temp[0] : table "스키마"."테이블명"
                     *  따라서, table 문자열 삭제
                     *  */
                    temp[0] = temp[0].replace("table ", "");

                    log.info("Total str = {}", str);
                    log.info("Table = {}", temp[0]);
                    log.info("Action = {}", temp[1]);
                    log.info("Data = {}", temp[2]);

                    // Data 부분 - 컬럼값만 추출
                    commonUtil.parsingParameter(temp[2]);
                }
                /* ByteBuffer 초기화 */
                if (msg.hasRemaining()) {
                    msg.clear();
                }

            } catch (Exception e) {
                log.error("ERROR = ", e);
            } finally {
                /* 버퍼의 실제 위치 값 업데이트 */
                stream.setAppliedLSN(stream.getLastReceiveLSN());
                stream.setFlushedLSN(stream.getLastReceiveLSN());
            }
        }
    }
}

 

PgConfiguration 

논리 복제 연결, 스트림 생성에 필요한 설정 정보

@ConfigurationProperties(prefix = "postgres")
public class PgConfiguration {
    // DB 설정 정보
    private String url;
    private String user;
    private String password;
    // Logical Replication 설정 정보
    private String assumeMinServerVersion;
    private String replication;
    private String preferQueryMode;
    private Boolean tcpKeepAlive;
    private int socketTimeout;
    private Boolean autoCommit;
    // Streaming Slot 설정 정보
    private String slotName;
    private Boolean includeXids;
    private Boolean skipEmptyXacts;
    private Boolean includeRewrites;
    private String includeTimestamp;
    private int statusInterval;
    private String outputPlugin;
}

 

CommonUtil > parsingParameter

정규표현식을 사용해 필요한 데이터 추출

package com.example.test.util;

import lombok.extern.log4j.Log4j2;
import org.apache.http.util.TextUtils;
import org.springframework.stereotype.Component;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * {기동과 용도 작성}
 *
 * @author 
 * @version 1.0, {작업내용}
 * @since 2020-09-15
 */

@Log4j2
@Component
public class CommonUtil {

    /**
     * .
     *
     * @param parameter Data 부분 문자열
     * @return void
     * @throws
     */
    public void parsingParameter(String parameter) {

        if (TextUtils.isEmpty(parameter)) {
            log.error("parameter is NULL {}", parameter);
            return;
        }

        Pattern pattern = Pattern.compile("['](.*?)[']");

        Matcher matcher = pattern.matcher(parameter);
        while (matcher.find()) {
            log.info("Data = {}", matcher.group(1));

            if(matcher.group(1) == null)
                break;
        }
    }
}

 

application.yml

postgres:
  # DB 정보
  url: jdbc:postgresql://localhost:5432/postgres
  user: 
  password: 
  # Logical Replication 설정 정보
  assume-min-server-version: 11
  replication: database
  prefer-query-mode: simple
  tcp-keep-alive: true
  socket-timeout: 0
  autoCommit: true
  # Streaming Slot 설정 정보
  slot-name: demo_logical_slot
  skip-empty-xacts: true
  include-rewrites: true
  include-timestamp: on
  include-xids: false
  status-interval: 20
  output-plugin: test_decoding

#SPRING 설정
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/postgres
    driver-class-name: org.postgresql.Driver
    username: 
    password: 

 

[결과]

해당 데이터 INSERT 시, log를 통해 스프링에서 INSERT된 로그를 받는다.

INSERT INTO public.eventlog
(tname, act, col, crtdt, crttm)
VALUES('testName', 'testAct', 'testCol', 'crtdt', 'crttm');

 

[기타]

# wal_level 확인

show wal_level;

# 복제 슬롯 모두 보기

SELECT * FROM pg_replication_slots;

# 특정 복제 슬롯 삭제

SELECT pg_drop_replication_slot('demo_logical_slot');

# 복제 슬롯 최대 수 확인

show max_replication_slots ;