PostgreSQL logical Replication Streaming이란,
논리 복제를 사용하여 실시간으로 외부 시스템으로 스트리밍 가능한 형태를 말한다.
위의 기술을 이용하여 PostgreSQL에서 발생한 이벤트(테이블 Insert / delete / update) 캐치하는 코드를 작성했다.
[개발 환경]
- Windows 10
- PostgreSQL 12
- 자바 8
- 스프링부트 2.3.0.RELEASE
[참고 자료]
PostgreSQL Doc에 나와있는 내용을 참고로 작성했다.
https://jdbc.postgresql.org/documentation/head/replication.html
[1. DB config 설정 변경]
논리 복제가 가능하도록 PostgreSQL config를 변경한다.
1.1 postgresql.conf 수정
# 동시 연결 최대 수 (복제 대상, 대기 서버) / 0 이면 복제를 비활성화
max_wal_senders = 10
# pg_xlog에 저장되는 과거 로그 파일 세그먼트의 최소 수
wal_keep_segments = 10
# 논리 복제 가능하도록 레벨 변경 (중요)
wal_level = logical
# 복제 슬롯 최대 수 (해당 슬롯을 통해 이벤트 캐치 로그가 왔다갔다 함 / 슬롯 분리 = 용도에 따라 이벤트 분리)
max_replication_slots = 10
1.2 pg_hba.conf 수정
host all all [networkIP ?.?.?].0/24 trust
host replication replication [networkIP ?.?.?.?]/32 md5
host replication all 127.0.0.1/32 md5
host replication all ::1/128 md5
[2. 코드 작성 (일부 설명)]
2.1 복제 연결 생성
org.postgresql.replication.PGReplicationConnectionorg.postgresql.PGConnection#getReplicationAPI
/* Logical Replication 관련 설정 */
Properties props = new Properties();
props.setProperty("url", pg.getUrl());
PGProperty.USER.set(props, pg.getUser());
PGProperty.PASSWORD.set(props, pg.getPassword());
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, pg.getAssumeMinServerVersion());
PGProperty.REPLICATION.set(props, pg.getReplication());
PGProperty.PREFER_QUERY_MODE.set(props, pg.getPreferQueryMode());
PGProperty.TCP_KEEP_ALIVE.set(props, pg.getTcpKeepAlive());
PGProperty.SOCKET_TIMEOUT.set(props, pg.getSocketTimeout());
Connection con = DriverManager.getConnection(pg.getUrl(), props);
PGConnection replConnection = con.unwrap(PGConnection.class);
2.2 복제 슬롯 생성
/* replicationSlot 생성 */
replConnection.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(pg.getSlotName())
.withOutputPlugin(pg.getOutputPlugin())
.make();
con.setAutoCommit(pg.getAutoCommit());
2.3 복제 스트림 생성
/* 해당 replicationSlot을 통해 스트리밍으로 DB 변화 캐치 */
/* WAL EVENT */
PGReplicationStream stream =
replConnection.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName(pg.getSlotName())
.withSlotOption("include-xids", pg.getIncludeXids())
.withSlotOption("skip-empty-xacts", pg.getSkipEmptyXacts())
.withSlotOption("include-rewrites", pg.getIncludeRewrites())
.withSlotOption("include-timestamp", pg.getIncludeTimestamp())
.withStatusInterval(pg.getStatusInterval(), TimeUnit.SECONDS)
.start();
[전체 코드]
PostgreSQLRunner
기동 시점부터 적용 (이벤트 캐치 수신)
package com.example.test.service;
import com.example.test.config.PgConfiguration;
import com.example.test.util.CommonUtil;
import lombok.extern.log4j.Log4j2;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.ReplicationSlotInfo;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* {기동과 용도 작성}
*
* @author
* @version 1.0, {작업내용}
* @since 2020-09-15
*/
@Log4j2
@Component
public class PostgreSQLRunner implements ApplicationRunner {
private final PgConfiguration pg;
private final CommonUtil commonUtil;
@Autowired
public PostgreSQLRunner(PgConfiguration pg, CommonUtil commonUtil) {
this.pg = pg;
this.commonUtil = commonUtil;
}
@Override
public void run(ApplicationArguments args) throws Exception {
/* Logical Replication 관련 설정 */
Properties props = new Properties();
props.setProperty("url", pg.getUrl());
PGProperty.USER.set(props, pg.getUser());
PGProperty.PASSWORD.set(props, pg.getPassword());
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, pg.getAssumeMinServerVersion());
PGProperty.REPLICATION.set(props, pg.getReplication());
PGProperty.PREFER_QUERY_MODE.set(props, pg.getPreferQueryMode());
PGProperty.TCP_KEEP_ALIVE.set(props, pg.getTcpKeepAlive());
PGProperty.SOCKET_TIMEOUT.set(props, pg.getSocketTimeout());
Connection con = DriverManager.getConnection(pg.getUrl(), props);
PGConnection replConnection = con.unwrap(PGConnection.class);
/* 재기동 시, replicationSlot 지우고 시작 */
// replConnection.getReplicationAPI().dropReplicationSlot(pg.getSlotName());
/* replicationSlot 생성 */
try {
replConnection.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(pg.getSlotName())
.withOutputPlugin(pg.getOutputPlugin())
.make();
} catch (Exception e) {
log.error(e);
}
con.setAutoCommit(pg.getAutoCommit());
/* 해당 replicationSlot을 통해 스트리밍으로 DB 변화 캐치 */
/* WAL EVENT */
PGReplicationStream stream =
replConnection.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName(pg.getSlotName())
.withSlotOption("include-xids", pg.getIncludeXids())
.withSlotOption("skip-empty-xacts", pg.getSkipEmptyXacts())
.withSlotOption("include-rewrites", pg.getIncludeRewrites())
.withSlotOption("include-timestamp", pg.getIncludeTimestamp())
.withStatusInterval(pg.getStatusInterval(), TimeUnit.SECONDS)
.start();
while (true) {
try {
/* 이벤트 캐치(버퍼 수신) */
ByteBuffer msg = stream.readPending();
if (msg == null) {
TimeUnit.MILLISECONDS.sleep(10L);
continue;
}
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
/* String str : 이벤트 발생 로그
형식 : [BEGIN] [table / action / data] [COMMIT (날짜_시간)]
*/
String str = new String(source, offset, length);
/* 실제 데이터 부분만 파싱 */
if (!str.contains("COMMIT") && !str.contains("BEGIN")) {
String[] temp = str.split(":", 3);
/* temp[0] : table "스키마"."테이블명"
* 따라서, table 문자열 삭제
* */
temp[0] = temp[0].replace("table ", "");
log.info("Total str = {}", str);
log.info("Table = {}", temp[0]);
log.info("Action = {}", temp[1]);
log.info("Data = {}", temp[2]);
// Data 부분 - 컬럼값만 추출
commonUtil.parsingParameter(temp[2]);
}
/* ByteBuffer 초기화 */
if (msg.hasRemaining()) {
msg.clear();
}
} catch (Exception e) {
log.error("ERROR = ", e);
} finally {
/* 버퍼의 실제 위치 값 업데이트 */
stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());
}
}
}
}
PgConfiguration
논리 복제 연결, 스트림 생성에 필요한 설정 정보
@ConfigurationProperties(prefix = "postgres")
public class PgConfiguration {
// DB 설정 정보
private String url;
private String user;
private String password;
// Logical Replication 설정 정보
private String assumeMinServerVersion;
private String replication;
private String preferQueryMode;
private Boolean tcpKeepAlive;
private int socketTimeout;
private Boolean autoCommit;
// Streaming Slot 설정 정보
private String slotName;
private Boolean includeXids;
private Boolean skipEmptyXacts;
private Boolean includeRewrites;
private String includeTimestamp;
private int statusInterval;
private String outputPlugin;
}
CommonUtil > parsingParameter
정규표현식을 사용해 필요한 데이터 추출
package com.example.test.util;
import lombok.extern.log4j.Log4j2;
import org.apache.http.util.TextUtils;
import org.springframework.stereotype.Component;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* {기동과 용도 작성}
*
* @author
* @version 1.0, {작업내용}
* @since 2020-09-15
*/
@Log4j2
@Component
public class CommonUtil {
/**
* .
*
* @param parameter Data 부분 문자열
* @return void
* @throws
*/
public void parsingParameter(String parameter) {
if (TextUtils.isEmpty(parameter)) {
log.error("parameter is NULL {}", parameter);
return;
}
Pattern pattern = Pattern.compile("['](.*?)[']");
Matcher matcher = pattern.matcher(parameter);
while (matcher.find()) {
log.info("Data = {}", matcher.group(1));
if(matcher.group(1) == null)
break;
}
}
}
application.yml
postgres:
# DB 정보
url: jdbc:postgresql://localhost:5432/postgres
user:
password:
# Logical Replication 설정 정보
assume-min-server-version: 11
replication: database
prefer-query-mode: simple
tcp-keep-alive: true
socket-timeout: 0
autoCommit: true
# Streaming Slot 설정 정보
slot-name: demo_logical_slot
skip-empty-xacts: true
include-rewrites: true
include-timestamp: on
include-xids: false
status-interval: 20
output-plugin: test_decoding
#SPRING 설정
spring:
datasource:
url: jdbc:postgresql://localhost:5432/postgres
driver-class-name: org.postgresql.Driver
username:
password:
[결과]
해당 데이터 INSERT 시, log를 통해 스프링에서 INSERT된 로그를 받는다.
INSERT INTO public.eventlog
(tname, act, col, crtdt, crttm)
VALUES('testName', 'testAct', 'testCol', 'crtdt', 'crttm');
[기타]
# wal_level 확인
show wal_level;
# 복제 슬롯 모두 보기
SELECT * FROM pg_replication_slots;
# 특정 복제 슬롯 삭제
SELECT pg_drop_replication_slot('demo_logical_slot');
# 복제 슬롯 최대 수 확인
show max_replication_slots ;
'Computer Science > Java' 카테고리의 다른 글
# 전자정부 프레임워크 세부 적용 규칙 - Intellij + SpringBoot 적용 (0) | 2020.11.01 |
---|---|
[intelliJ] 전자정부 프레임워크 구축(세팅)하기 (0) | 2020.10.14 |
[eclipse]전자정부 표준프레임워크 개발환경 구축하기 (0) | 2020.10.14 |
# Live templates(주석 자동완성) + File & Code template (0) | 2020.07.14 |
# Springboot + Mybatis + PostgreSQL DB 연동 (1) | 2020.06.28 |