logstash jdbc 플러그인 사용하기 (mariadb)
jdbc 플러그인일 이용하여 데이터베이스 데이터를 elasticsearch 에 주기적으로 갱신한다.
예를 들어 관리자의 로그인 기록을 elasticsearch 에 동기화 하는 상황일때.
테이블 구조
CREATE TABLE `login_log` (
`login_log_id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '관리자 로그인 로그 시퀀스',
`account_id` INT(10) UNSIGNED NOT NULL COMMENT '로그인 계정 시퀀스',
`account_type_id` INT(10) UNSIGNED NOT NULL COMMENT '로그인 관리자 타입 시퀀스',
`login_action_type` VARCHAR(1) NOT NULL COMMENT '액션 타입(0: 로그인, 1:로그아웃, 2:세션 타임아웃)',
`login_device_type` VARCHAR(1) NOT NULL COMMENT '기기 타입(0: PC, 1: 모바일, 2:테블릿, 3:기타..)',
`company_id` INT(10) UNSIGNED NOT NULL COMMENT '가맹점 아이디',
`login_ip` VARCHAR(50) NOT NULL COMMENT '마지막 로그인 아이피',
`register_date` DATETIME NOT NULL COMMENT '로그인 로그 등록 일시',
`update_date` DATETIME NULL DEFAULT NULL COMMENT '로그인 로그 수정 일시',
`use_yn` VARCHAR(1) NOT NULL DEFAULT 'Y' COMMENT '노출 여부( Y: 노출, N: 미노출)',
PRIMARY KEY (`login_log_id`),
INDEX `login_action_type` (`login_action_type`),
INDEX `login_device_type` (`login_device_type`),
INDEX `register_date` (`register_date`),
INDEX `FK2_LOGIN_LOG_COMPANY_ID` (`company_id`),
INDEX `FK2_LOGIN_LOG_ACCOUNT_TYPE_ID` (`account_type_id`),
INDEX `FK1_LOGIN_LOG_ACCOUNT_ID` (`account_id`),
CONSTRAINT `FK1_LOGIN_LOG_ACCOUNT_ID` FOREIGN KEY (`account_id`) REFERENCES `account` (`account_id`) ON UPDATE CASCADE ON DELETE CASCADE,
CONSTRAINT `FK2_LOGIN_LOG_ACCOUNT_TYPE_ID` FOREIGN KEY (`account_type_id`) REFERENCES `account_type` (`account_type_id`)
)
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB
AUTO_INCREMENT=3207
;
- 인덱스 생성및 매핑 설정
인덱스명은 그냥 dbms schema 명으로 만듦
PUT /advertise
{
"settings": {
"number_of_replicas": 1,
"number_of_shards": 2
},
"mappings": {
"properties": {
"login_log_id" : {
"type": "integer"
},
"account_id" : {
"type": "integer"
},
"account__type_id" : {
"type": "integer"
},
"login_action_type" : {
"type": "keyword"
},
"login_device_type" : {
"type": "keyword"
},
"company_id" : {
"type": "integer"
},
"login_ip" : {
"type": "text"
},
"register_date" : {
"type": "date"
},
"update_date" : {
"type": "date"
},
"use_yn" : {
"type": "keyword"
}
}
}
}
- logstash 설정 파일 생성
이전 포스팅에서 만들어뒀던 files 디렉토리 그대로 사용.
cd /usr/share/logstash/files
vi jdbc.conf
jdbc.conf 파일 내용
input{
jdbc {
jdbc_driver_library => "/usr/local/server/advertise-apache-tomcat-8.5.43/webapps/advertise/WEB-INF/lib/mariadb-java-client-2.4.0.jar"
jdbc_driver_class => "org.mariadb.jdbc.Driver"
jdbc_connection_string => "jdbc:mariadb://localhost:3306/advertise?characterEncoding=UTF-8&allowMultiQueries=true"
jdbc_user => "<디비계정>"
jdbc_password => "<비밀번호>"
schedule => "* * * * *"
statement => "SELECT login_log_id, account_id, account_type_id, login_action_type, login_device_type, login_ip, register_date, update_date, use_yn FROM login_log WHERE 1=1 AND company_id = ? and login_log_id > ? order by login_log_id asc"
prepared_statement_bind_values => [318, ":sql_last_value"]
prepared_statement_name => "foobar"
use_prepared_statements => true
use_column_value => true
tracking_column_type => "numeric"
tracking_column => "login_log_id"
record_last_run => true
}
}
output {
elasticsearch {
index => "advertise"
hosts => ["localhost:9200"]
user => "<elasticsearch 계정>"
password => "<elasticsearch 비밀번호>"
}
}
옵션 설명
jdbc 관련된건 생략…
use_column_value 설정한컬럼을 sql_last_value 에 할당해서 쓰겟다는 설정
tracking_column_type 설정될 컬럼의 타입
tracking_column sql_last_value 로 설정할 컬럼명
record_last_run 기본값이 true 마지막 쿼리 기록 여부
prepared_statement_bind_values 준비된 쿼리 ? 파라미터에 순서대로 들어갈 값 배열로 :sql_last_value 는 동적 변수를 의미함.
위 설정에 대한 프로세스 를 설명하자면 매 분마다
로그인 로그 데이터를 가져와서 elasticsearch 에 인덱싱하는데 마지막 login_log_id 값을 내부적으로 저장하고 쿼리시 동적 파라미터로 전달해준다.
별도의 설정이 없으면 마지막 login_log_id 값은 홈경로에 .logstash_jdbc_last_run 파일에 기록된다.
주의할점 일반적으로 쿼리시 최근순서로 데이터를 뽑아온다고하면 login_log_id 값을 desc 로 출력하지만 elasticsearch 에 인덱싱할때는 asc 로 가져와야 제일 큰 login_log_id 값이 저장되어 다음 쿼리시 중복 저장되는거를 막을수 있다.
- 실행
cd /usr/share/logstash
bin/logstash -f files/jdbc.conf
정상작동
[INFO ] 2021-01-15 09:55:00.692 [Ruby-0-Thread-38: :1] jdbc - (0.015989s) PREPARE foobar: SELECT login_log_id, account_id, account_type_id, login_action_type, login_device_type, login_ip, register_date, update_date, use_yn FROM login_log WHERE 1=1 AND company_id = ? and login_log_id > ? order by login_log_id asc
[INFO ] 2021-01-15 09:55:00.711 [Ruby-0-Thread-38: :1] jdbc - (0.010707s) EXECUTE foobar; [318, 3223]
댓글 없음:
댓글 쓰기