회사에서 교육용으로 컨텐츠를 등록하고 나서 특정 문장이나 숫자들이 포함된 컨텐츠가 어디있는지 찾길 원하는 요구사항이 들어왔다. 단순히 제목 검색이 아니라 안에 담긴 내용을 검색해야해서 결과를 빠르게 찾기가 쉽지 않았다. 만약 DB에서 "When I started" 라는 문장이 포함된 컨텐츠를 찾으려 한다면 like %When I started% 로 검색을 해야하는데 이 같은 like 쿼리는 결과 조회가 매우 느리다. 이에 엘라스틱 서치하는 검색엔진을 이용해 빠른 조회를 할 수 있게 만들었다.
Elastic Search
Elasticsearch는 검색 및 분석을 위한 오픈소스 분산 검색 엔진이다. 분산 아키텍처를 기반으로 하며, RESTful API를 통해 데이터를 인덱싱하고 검색할 수 있다. 기존 데이터로 처리하기 힘든 대량의 비정형 데이터 검색이 가능하고, 전문검색과 구조 검색 모두 지원한다.
ES의 기본컨셉은 ‘역색인’ 이다. 일반적인 색인은 문서의 위치에 대한 index를 만들어서 빠르게 문서에 접근하는 것인데 역색인은 반대로 문서 내의 문자와 같은 내용물의 맵핑 정보를 색인 하는 것이다. 일반색인은 책에서 목차 같은 것이며, 역색인은 책 맨 뒤의 단어 별 색인 페이지라고 생각하면 된다. 한마디로 모든 정보에 인덱스가 달려서 빠른 검색을 보장하는 검색특화 DB 라고 생각하면 간단하다. ES에서는 하나의 테이블을 인덱스라고 부르고 한 row를 document 라는 단위로 부른다. 이 document id는 중복될 수 없고 자동으로 upsert를 지원한다.
Logstash
ES는 색인, 검색 기능만 제공하기 때문에 데이터 수집을 위한 도구로 Logstash를 사용한다. Logstash는 데이터를 수집-변환-저장을 해주는 서비스이다.
입력(input) > 필터(filter) > 출력(output)
입력에서 데이터를 받고 필터에서 데이터를 확장, 변경, 필터링 등 가공을 한 후에 출력에서 데이터를 ES로 전송해서 적재한다. 아래와 같이 conf 파일에서 input, filter, output을 정해주기만 하면 되서 간단하게 사용 할 수 있다.
.conf 파일
input {
jdbc {
jdbc_driver_library => "${WORKDIR}/lib/mysql-connector-java-8.0.26.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://${DB_USERNAME}:${DB_PASSWORD}@${DB_HOST}/${DB_DATABASE}"
jdbc_user => "${DB_USERNAME}"
jdbc_password => "${DB_PASSWORD}"
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
statement => "쿼리문"
schedule => "*/5 * * * * *"
last_run_metadata_path => "${WORKDIR}/.logstash_jdbc_last_run"
}
}
filter {
mutate {
copy => {"id" => "[@metadata][_id]"}
remove_field => ["auto_increase", "@version", "unix_ts_in_secs"]
}
}
output {
elasticsearch {
index => "${저장할인덱스이름}"
cloud_id => "${ELASTICSEARCH_CLOUD_ID}"
ssl => true
ilm_enabled => false
api_key => "${API_KEY}"
document_id => "%{[@metadata][_id]}"
}
}
입력에는 jdbc 옵션을 사용해 지정한 스케줄마다 쿼리를 실행할 수 있게 한다.
스케줄마다 쿼리를 실행하게 되면 어디까지 데이터를 넣었는지 어디서부터 새롭게 넣어야 할지 정보를 모르기 때문에 이를 tracking column을 이용해서 트래킹한다. 만약 트래킹 컬럼에 updated_at을 설정하고 order by updated_at 를 해준다면 로그스태시가 수집했던 데이터 중 가장 최근의 updated_at을 기억했다가(last_run_metadata_path에 저장됨) 그 이후로 업데이트가 일어나서 값이 변경된 컬럼부터 가져와서 적재하게 된다. statement에 가져올 데이터를 쿼리문으로 작성해넣으면 그게 로그스태시의 인풋 데이터가 된다.
이런 로그스태시 설정 파일들을 잘 작성해서 EKS 클러스터에 올리면 설정대로 알아서 주기마다 데이터를 끌어다 엘라스틱 서치에 넣는다.
이제 검색 작업시 메인 DB가 아닌 엘라스틱 서치api로 요청을 보내면 몇십초씩 걸려도 결과가 안나오던 느린 쿼리에서 몇초면 내가 원하는 값이 나오는 것을 볼 수 있을 것이다.
'데이터 엔지니어링' 카테고리의 다른 글
firebase + BigQuery 이벤트로그 쿼리 Tip (0) | 2023.06.19 |
---|---|
Airflow execution date와 실행시간에 대하여 (0) | 2023.05.10 |
통계데이터 파이프라인 구축기 (1) | 2023.03.03 |
m1 mac에 docker로 Airflow 실행시키기 (0) | 2023.02.06 |
ETL 파이프라인 구축기 (0) | 2023.01.21 |