2021년 1월 14일 목요일

[elk] logstash jdbc 플러그인 사용하기

[elk] logstash jdbc 플러그인 사용하기

logstash jdbc 플러그인 사용하기 (mariadb)

jdbc 플러그인일 이용하여 데이터베이스 데이터를 elasticsearch 에 주기적으로 갱신한다.

예를 들어 관리자의 로그인 기록을 elasticsearch 에 동기화 하는 상황일때.

테이블 구조

CREATE TABLE `login_log` (
	`login_log_id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '관리자 로그인 로그 시퀀스',
	`account_id` INT(10) UNSIGNED NOT NULL COMMENT '로그인 계정 시퀀스',
	`account_type_id` INT(10) UNSIGNED NOT NULL COMMENT '로그인 관리자 타입 시퀀스',
	`login_action_type` VARCHAR(1) NOT NULL COMMENT '액션 타입(0: 로그인, 1:로그아웃, 2:세션 타임아웃)',
	`login_device_type` VARCHAR(1) NOT NULL COMMENT '기기 타입(0: PC, 1: 모바일, 2:테블릿, 3:기타..)',
	`company_id` INT(10) UNSIGNED NOT NULL COMMENT '가맹점 아이디',
	`login_ip` VARCHAR(50) NOT NULL COMMENT '마지막 로그인 아이피',
	`register_date` DATETIME NOT NULL COMMENT '로그인 로그 등록 일시',
	`update_date` DATETIME NULL DEFAULT NULL COMMENT '로그인 로그 수정 일시',
	`use_yn` VARCHAR(1) NOT NULL DEFAULT 'Y' COMMENT '노출 여부( Y: 노출, N: 미노출)',
	PRIMARY KEY (`login_log_id`),
	INDEX `login_action_type` (`login_action_type`),
	INDEX `login_device_type` (`login_device_type`),
	INDEX `register_date` (`register_date`),
	INDEX `FK2_LOGIN_LOG_COMPANY_ID` (`company_id`),
	INDEX `FK2_LOGIN_LOG_ACCOUNT_TYPE_ID` (`account_type_id`),
	INDEX `FK1_LOGIN_LOG_ACCOUNT_ID` (`account_id`),
	CONSTRAINT `FK1_LOGIN_LOG_ACCOUNT_ID` FOREIGN KEY (`account_id`) REFERENCES `account` (`account_id`) ON UPDATE CASCADE ON DELETE CASCADE,
	CONSTRAINT `FK2_LOGIN_LOG_ACCOUNT_TYPE_ID` FOREIGN KEY (`account_type_id`) REFERENCES `account_type` (`account_type_id`)
)
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB
AUTO_INCREMENT=3207
;
  1. 인덱스 생성및 매핑 설정
    인덱스명은 그냥 dbms schema 명으로 만듦
PUT /advertise
{
 "settings": {
   "number_of_replicas": 1,
   "number_of_shards": 2
 },
 "mappings": {
   "properties": {
     "login_log_id" : {
       "type": "integer"
     },
     "account_id" : {
       "type": "integer"
     },
     "account__type_id" : {
       "type": "integer"
     },
     "login_action_type" : {
       "type": "keyword"
     },
     "login_device_type" : {
       "type": "keyword"
     },
     "company_id" : {
       "type": "integer"
     },
     "login_ip" : {
       "type": "text"
     },
     "register_date" : {
       "type": "date"
     },
     "update_date" : {
       "type": "date"
     },
     "use_yn" : {
       "type": "keyword"
     }
   }
 }
}
  1. logstash 설정 파일 생성
    이전 포스팅에서 만들어뒀던 files 디렉토리 그대로 사용.
cd /usr/share/logstash/files
vi jdbc.conf

jdbc.conf 파일 내용

input{
  jdbc {
    jdbc_driver_library => "/usr/local/server/advertise-apache-tomcat-8.5.43/webapps/advertise/WEB-INF/lib/mariadb-java-client-2.4.0.jar"
    jdbc_driver_class => "org.mariadb.jdbc.Driver"
    jdbc_connection_string => "jdbc:mariadb://localhost:3306/advertise?characterEncoding=UTF-8&allowMultiQueries=true"
    jdbc_user => "<디비계정>"
    jdbc_password => "<비밀번호>"
    schedule => "* * * * *"

    statement => "SELECT login_log_id, account_id, account_type_id, login_action_type, login_device_type, login_ip, register_date, update_date, use_yn FROM login_log WHERE 1=1 AND company_id = ? and login_log_id > ? order by login_log_id asc"

    prepared_statement_bind_values => [318, ":sql_last_value"]
    prepared_statement_name => "foobar"
    use_prepared_statements => true
    use_column_value => true
    tracking_column_type => "numeric"
    tracking_column => "login_log_id"
    record_last_run => true
  }
}

output {
  elasticsearch {
    index => "advertise"
    hosts => ["localhost:9200"]
    user => "<elasticsearch 계정>"
    password => "<elasticsearch 비밀번호>"
  }
}

옵션 설명
jdbc 관련된건 생략…

use_column_value 설정한컬럼을 sql_last_value 에 할당해서 쓰겟다는 설정
tracking_column_type 설정될 컬럼의 타입
tracking_column sql_last_value 로 설정할 컬럼명
record_last_run 기본값이 true 마지막 쿼리 기록 여부
prepared_statement_bind_values 준비된 쿼리 ? 파라미터에 순서대로 들어갈 값 배열로 :sql_last_value 는 동적 변수를 의미함.

위 설정에 대한 프로세스 를 설명하자면 매 분마다
로그인 로그 데이터를 가져와서 elasticsearch 에 인덱싱하는데 마지막 login_log_id 값을 내부적으로 저장하고 쿼리시 동적 파라미터로 전달해준다.
별도의 설정이 없으면 마지막 login_log_id 값은 홈경로에 .logstash_jdbc_last_run 파일에 기록된다.

주의할점 일반적으로 쿼리시 최근순서로 데이터를 뽑아온다고하면 login_log_id 값을 desc 로 출력하지만 elasticsearch 에 인덱싱할때는 asc 로 가져와야 제일 큰 login_log_id 값이 저장되어 다음 쿼리시 중복 저장되는거를 막을수 있다.

  1. 실행
cd /usr/share/logstash
bin/logstash -f files/jdbc.conf

정상작동

[INFO ] 2021-01-15 09:55:00.692 [Ruby-0-Thread-38: :1] jdbc - (0.015989s) PREPARE foobar: SELECT login_log_id, account_id, account_type_id, login_action_type, login_device_type, login_ip, register_date, update_date, use_yn FROM login_log WHERE 1=1 AND company_id = ? and login_log_id > ? order by login_log_id asc
[INFO ] 2021-01-15 09:55:00.711 [Ruby-0-Thread-38: :1] jdbc - (0.010707s) EXECUTE foobar; [318, 3223]

댓글 없음:

댓글 쓰기

[oracle]백업및 복구

[oracle]백업및 복구 오라클 덤프 백업및 복구 윈도우 서버 기반 간단 정리 --디렉터리 조회 sqlplus 또는 dbtool 입력시작 SELECT * FROM DBA_DIRECTORIES ; --D:...