Search

데이터 다루기

목차

데이터 다루기

인덱스에 실제로 문서를 색인, 조회, 업데이트, 삭제하는 방법에 대한 학습
다양한 검색쿼리, 페이지네이션, 집계에 대해서 학습
JVM 서비스 코드에서 ES Client를 이용해 ES API를 호출하는 방법 학습

색인

PUT [인덱스 이름]/_doc/[_id값] POST [인덱스 이름]/_doc PUT [인덱스 이름]/_create/[_id값] CREATE [인덱스 이름]/_create/[_id값]
Bash
복사
ID값이 있다면 PUT, 이미 해당 _id값으로 문서가 있다면 덮어씌운다
ID값이 없다면 POST, _id는 ES에서 임의 지정
_create는 항상 새 문서를 생성하며 _id가 이미 존재하면 덮어쓰기 하지 않고 실패한다.

refresh

색인 시 refresh 매개변수를 통해 색인 직후 샤드 refresh를 수행해 즉시 검색 가능 여부를 설정할 수 있다.
refresh 속성
true: 색인 직후 문서가 색인된 shard refresh하고 응답 반환
wait_for: 색인 이후 refresh 될 때까지 기다린 후 응답 반환
false(default): refresh 동작을 수행하지 않는다.
잦은 refresh는 비용낭비일뿐 아니라 크기가 작은 세그먼트가 많이 생성될 수 있어 검색 성능도 떨어진다.
문서 색인 요청 결과가 검색 역색인에 즉시 동시적으로 반영되야 하는 요청을 최소화하도록 설계하자.
즉시 확인이 필요한 경우 검색보단 조회 API를 활용하자.
대량의 색인이 필요한 경우 bulk API를 사용하자

조회

GET [인덱스 이름]/_doc/[_id값] GET [인덱스 이름]/_source/[_id값]
Bash
복사
_doc은 인덱스나 _id와 같은 메타데이터를 같이 조회한다.
_source는 문서의 본문만을 반환한다.

조회 - 필터링

_source_includes, _source_excludes 옵션으로 필드 필터링을 할 수 있다.
GET my_index2/_source/1?_source_includes=p*,views # 검색 결과 #{ # "views": 1234, # "public": true, # "point": 4.5 #} GET my_index2/_source/1?_source_includes=p*,views&_source_excludes=public # 검색 결과 #{ # "views": 1234, # "point": 4.5 #}
Bash
복사
_source_includes 만 사용할때는 _source로 줄여서 사용할 수 있다.

업데이트

POST [인덱스 이름]/_update/[_id값]
Bash
복사
doc을 이용해 업데이트하기
POST [인덱스 이름]/_update/[_id값] { "doc": { [업데이트할 내용] } }
Bash
복사
detect_noop 속성을활용하자
변경 요청은 했지만 실제로 변경된게 없는 경우를 noop이라 하는데, 데이터 특성상 noop이 발생하 ㄹ가능성이 없는 경우 noop 설정을 비활성화해서 약간의 성능 향상을 할 수 있다.
doc_as_upsert를 활용하자
SQL의 upsert와 같은 동작을 하도록 한다. default는 false이다.
script를 이용해 업데이트하기
put update_test/_doc/1 { "title":"hello world", "views": 35, "created":"2019-01-17T14:05:01.234Z" } POST update_test/_update/1 { "script":{ "source": "ctx._source.views += params.amount", "lang":"painless", "params":{ "amount":1 } }, "scripted_upsert": false } get update_test/_doc/1
Bash
복사
ES 자체 스크립트 painless를 이용해 스크립트를 작성해 업데이트 할 수 있다.

삭제

DELETE [인덱스 이름]/_doc/[_id값]
Bash
복사

복수 문서 API

bulk API

여러 색인, 업데이트, 삭제 작업을 한 번의 요청에 담아서 보내는 API
JSON이아니라 NDJSON형태로 데이터를 만들어 보낸다.
Content-Type도 application/x-ndjson을 사용해야 한다.
마지막 줄도 줄바꿈 문자 \n으로 끝나야 한다.
POST _bulk {"index":{"_index":"bulk_test","_id":"1"}} {"field1":"value1"} {"delete":{"_index":"bulk_test","_id":"2"}} {"create":{"_index":"bulk_test","_id":"3"}} {"field1":"value3"} {"update":{"_index":"bulk_test","_id":"1"}} {"doc":{"field2":"value2"}} {"index":{"_index":"bulk_test","_id":"4", "routing":"a"}} {"field1":"value4"}
Bash
복사
bulk API 는 기본적으로 순서를 보장하지 않는다. 순서를 가지도록 하고 싶을 경우 인덱스, _id, 라우팅 조합을 동일하게 가져간다면 요청들은 모두 동일한 주 샤드로 넘어갈테고 순서대로 수행될 수 있다.

multi get

한 번에 여러 문서를 조회할 수 있다.
GET _mget GET [인덱스 이름]/_mget
Bash
복사
GET _mget { "docs": [ { "_index":"bulk_test", "_id":1 }, { "_index": "bulk_test", "_id": 4, "routing":"a" }, { "_index":"my_index2", "_id":"1", "_source": { "include": [ "p*" ], "exclude": [ "point" ] } } ] } GET bulk_test/_mget { "ids": ["1", "3"] }
Bash
복사

update by query

검색 쿼리 조건에 맞는 복수 문서를 찾아 업데이트 및 삭제 작업을 수행할 수 있다.
script를 통한 업데이트만을 지원한다.
검색 결과 문서들에 대해 스냅샷을 찍고 업데이트를 진행하는데, 업데이트 작업 중 스냅샷을 찍웠던 문서에서 다른 요청으로 인해 변경이 생겨 버전 충돌 문제가 생기면 그만두거나 다음 작업으로 넘어가거나 선택할 수 있다.
conflicts 매개변수를 지정해 동작을 지정할 수 있다.
abort (충돌 시 중단), proceed (다음 작업으로 진행)
POST [인덱스 이름]/_update_by_query { "script": { "source": " //..", }, "query": { //... } }
Bash
복사

update by query - 스로틀링

update 작업은 운영 중인 서비스에 영향을 줄 수 있기에 스로틀링을 통해 작업 속도를 조정할 수 있다.
클러스터 부하와 서비스 영향을 최소화하는 목적
POST [인덱스 이름]/_update_by_query ?scroll_size=1000 &scroll=1m &requests_per_second=500 { //... }
Bash
복사
scroll_size: 한번에 가져올 업데이트 대상 문서의 갯수
scroll: 검색 조건을 만족한 모든 문서에 대한 상태를 검색 문맥(search context)를 보존할 시간
requests_per_second: 초당 수행할 작업의 갯수
위 설정(1000, 500)기준으로는 ES는 2초마다 스크롤 한 번 분량만큼 업데이트 한다.
1000개에 문서에 대한 업데이트 작업 수행 시간이 0.5초일 경우 작업 수행 후 1.5초를 대기하면 총 2초간 1000개를 업데이트 한 것이 되어 초당 500개의 작업을 수행한 것이 된다.

비동기적 요청과 tasks API

update by query 작업을 wait_for_completion 매개변수를 false로 지정해 비동기로 처리할 수 있다.
ES에선 작업을 task로 등록하고 task의 id를 응답에 포함시켜 반환한다.
클라이언트는 이 taskId를 통해 작업 진행상황을 조회하거나 취소할 수 있다.
POST [인덱스 이름]/_update_by_query?wait_for_completion=false { //... } # 응답 { "task": "Iqb0AujZQViife0-AOlyPg:1650433" } # task 상태 확인 GET .tasks/_doc/[task id] GET _tasks/[task id] # task 작업 취소 POST _tasks/[task id]/_cancel # task 결과 삭제 DELETE .tasks/_doc/[task id]
Bash
복사

슬라이싱

스로틀링으로 부하를 줄이는 대신 업데이트 성능을 올려 빠른 시간내에 작업을 끝내고자 할 때 선택
POST [인덱스 이름]/_update_by_query?slices=auto { //... }
Bash
복사
기본값은 1로 작업을 병렬로 쪼개지 않는다는 의미
auto로 지정하면 보통 지정한 인덱스의 주 샤드 수만큼 지정된다.

delete by query

update by query처럼 지정한 검색 쿼리로 삭제할 대상을 지정한 뒤 삭제를 수행한다.
동작 방식이나 충돌 처리, 비동기 처리등에 대한 속성은 update by query와 동일하다.
POST [인덱스 이름]/_delete_by_query { "query":{ //... } }
Bash
복사

검색

GET [인덱스 이름]/_search POST [인덱스 이름]/_search GET _search POST _search
Bash
복사
index 이름은 하나 이상을 작성할 수 있고 와일드 카드(*)사용도 가능하다.
GET my_index*,analyzer_test*,mapping_test/_search

쿼리 DSL 검색, 쿼리 문자열 검색

쿼리 DSL 검색 : query 필드에 원하는 쿼리와 질의를 작성해서 호출할 수 있다.
GET my_index/_search { "query": { "match": { "title": "hello" } } }
Bash
복사
쿼리 문자열 검색 : 루씬 쿼리 문자열을 지정하는 방법으로 수행하는 방법
GET my_index/_search?q=title:hello
Bash
복사
기타 문법
문법
설명
예시
질의어만 기술
전체 필드 검색
hello
필드이름:질의어
지정 필드 대상 검색
title:hello title:(hello OR world)
_exists_:필드이름
특정 필드가 존재하는 문서 검색
_exists_:title
필드이름:[시작 to 끝] 필드이름:{시작 to 끝}
범위 검색, 경계값이 없으면 *을 사용한다.
data:[2021-05-03 to 2021-09-03] count:{10 to 20} date:{* to 2021-09-03} count:[20 to *]
질의어 * or ?
와일드 카드를 포함한 쿼리는 매우 느리고 위험(사용 비권장)
title:hello*
AND, OR, NOT, ( )
쿼리에 연산 적용, ( ) 로 연산 순서 지정
(title:(hello OR world)) AND (contents: NOT bye) OR count: [1 to 3]

match 쿼리

지정한 필드의 내용이 질의어와 매치되는 문서를 찾는 쿼리
필드가 text라면 필드의 값과 질의어 모두 애널라이저로 분석된다.
GET my_index/_search { "query": { "match": { "fieldName": { "query": "test query sentence" } } } }
Bash
복사
fieldName 필드가 text타입이고 기본 애널라이저(standard)라면 질의어(test query sentence)는 test, query, sentence 총 3개의 토큰으로 분석될 것이다.
default는 OR 검색이다. AND로 바꾸고 싶다면 operator 속성을 and로 지정하면 된다.

term 쿼리

지정한 필드의 값이 질의어와 일치하는 문서를 찾는 쿼리
GET my_index/_search { "query": { "term": { "fieldName": { "value": "hello" } } } }
Bash
복사
지정할 필드가 하나 이상이라면 term대신 terms를 사용하면 된다.
GET my_index/_search { "query": { "terms": { "fieldName": ["hello", "world"] } } }
Bash
복사

range 쿼리

범위 검색 쿼리
GET [인덱스 이름]/_search { "query": { "range": { "fieldName": { "gte": 100, "lt": 200 } } } }
Bash
복사
gt(greater than) lt(less than) gte(greater than or equal to) lte(less than or equal to)
역시 부하가 크기에 부담이 없는 상황에서만 사용해야 한다.

prefix 쿼리

필드의 값이 지정한 질의어로 시작하는 문서 검색
GET [인덱스 이름]/_search { "query": { "prefix": { "fieldName": { "value": "hello" } } } }
Bash
복사
무거운 쿼리로 분류되며 단발성 쿼리로 가끔 수행한다면 괜찮지만 일상 쿼리로는 적절치 않다.
매핑에 index_prefixs 설정을 추가해 길이 제한을 해서 성능을 높힐 수 있다.
PUT prefix_mapping_test { "mappings": { "properties": { "prefixField": { "type": "text" "index_prefixes": { "min_chars":3, "max_chars":5, } } } } }
Bash
복사

exists 쿼리

필드가 존재하는 문서를 검색한다.
GET [인덱스 이름]/_search { "query": { "exists": { "field": "fieldName" } } }
Bash
복사

bool 쿼리

쿼리 조합 검색 쿼리
GET _search { "query": { "bool": { "must": [ {"term": {"field1": {"value": "hello"}}}, {"term": {"field2": {"value": "world"}}} ], "must_not": [ {"term": {"field4": {"value": "elasticsearch-test"}}} ], "filter":[ {"term": {"field3": {"value": true}}} ], "should": [ {"match": {"field4": {"query": "elasticsearch"}}}, {"match": {"field5": {"query": "lucene"}}} ], "minimum_should_match": 1 } } }
Bash
복사
must, must_not, filter, shoud 조건절을 이용해 다른 쿼리와 조합할 수 있다.
must, filter 조건절에 들어간 하위 쿼리는 AND 조건으로 만족해야 한다.
must_not 조건절을 만족하는 쿼리는 결과에서 제외된다.
should 조건절에 들어간 쿼리는 minimum_should_match 에 지정한 개수 이상의 하위 쿼리를 만족하는 문서가 포함된다.

검색 결과 정렬

검색 요청 payload에 sort를 지정해 검색 결과 정렬이 가능하다.
GET [인덱스 이름]/_search { "query": { ... }, "sort": [ {"field1": { "order":"desc"} }, {"field2": { "order":"asc"} }, "field3" } }
Bash
복사
정렬 필드는 여럿 지정이 가능하며 필드명만 지정해도 내림차순 정렬을 한다.
text 타입은 정렬대상으로 하지 않도록 하자.
_score를 정렬 순서에 포함시켜 유사도 정렬도 가능하다.
_doc을 정렬 순서에 포함시켜 문서 번호로 정렬할 수 있다.

페이지네이션

from과 size
GET [인덱스 이름]/_search { "from": 10, # 11번째부터 (default: 0) "size": 5, # 16번째까지 (default: 10) "query": { //... } }
Bash
복사
from 15 / size 5로 지정하면, 20개의 문서를 검색한 뒤 마지막 5개를 잘라서 반환한다.
from절이 커질수록 검색해야하는 범위가 커진다. (무거워진다.)
다음 페이지를 검색하는사이에 새로운 값이 추가되거나 삭제될 수 있어 일관성이 떨어진다.
정확한 페이지네이션을 위해서는 from/size는 권장하지 않는다.
ES에선 from + size가 1만이 넘는 검색은 수행이 거부 된다.
scroll
: 검색 조건에 매칭되는 전체 문서 순회시 사용하는 방법으로 스크롤 순회도중에는 최초 검색시의 문맥(search context)가 유지된다.
GET [인덱스 이름]/_search?scroll=1m { "size": 1000, "query": { //... } }
Bash
복사
scroll을 사용하면 검색 결과에서 scroll_id를 확인할 수 있는데, 두 번째 검색부터는 이 scroll_id를 지정해서 scroll 검색을 수행하면 된다.
GET [인덱스 이름]/_search/scroll { "scroll_id": "FGluY2x1ZGVfY29....", "scroll": "1m" }
Bash
복사
scroll 검색을 수행할 때마다 검색 문맥은 연장된다.
검색 문맥이 유지되고 있지만 더 빠르게 자원반납을 하고 싶다면 delete로 제거해줄수도 있다.
scroll은 보통 정렬 여부가 상관없는 경우에 사용하기에 정렬을 _doc으로 지정하는게 좋다.
유사도 점수 계산도 하지 않고 정렬을 위한 별도의 자원도 사용하지 않기 때문
GET [인덱스 이름]/_search?scroll=1m { "size": 1000, "query": { //... }, "sort":["_doc"] }
Bash
복사
search_after
서비스에서 사용자에게 제공하는 페이지네이션은 search_after를 사용하는게 좋다.
sort를 지정하며 동일한 정렬 값이 나오지 않게 1개 이상의 동점 제거(tiebreaker)용 필드를 지정해야 한다.
GET kibana_sample_data_ecommerce/_search { "size":20, "query": { "term": { "currency": { "value":"EUR" } } }, "search_after": [1706992790000,"591924"], "sort": [ { "order_date":"desc" }, { "order_id":"asc" } ] } # response { ... "sort": [1706992790000,"591924"] ... }
Bash
복사
동점 제거용 필드는 문서를 고유하게 특정할 수 있는 값이 들어가야 한다.
id는 doc_values가 꺼져있기에 비권장
차라리 id와 동일한 값을 별도로 필드에 저장해서 동점제거용으로 쓰는게 낫다.
인덱스 상태가 변하는 도중 페이지네이션은 문서 누락이 발생할 수 있기에point in time API를 조합해 사용한다.
search_after - point in time API
keep_alive 매개변수로 지정한 시간만큼 검색 대상의 상태를 고정한다.
POST kibana_sample_data_ecommerce/_pit?keep_alive=1m # response { "id": "gcSHBAEca2liYW5hX3NhbXBsZV9kYXRhX2Vjb21tZXJjZRYzVktwbzJBU1M0V3JTd29remI0YWRBABZYVWFUTmlwOVE3ZTV2NlRlRmpPZnFnAAAAAAAAAXuDFjJhbTVpOXhOUXphUk5XZTI2RXZya2cAARYzVktwbzJBU1M0V3JTd29remI0YWRBAAA=" }
Bash
복사
이 pit id를 search_after에 활용할 수 있다.
GET _search { "size":20, "query": { "term": { "currency": { "value":"EUR" } } }, "pit": { "id": "gcSHBAEca2liYW5hX3NhbXBsZV9kYXRhX2Vjb21tZXJjZRYzVktwbzJBU1M0V3JTd29remI0YWRBABZYVWFUTmlwOVE3ZTV2NlRlRmpPZnFnAAAAAAAAAXuDFjJhbTVpOXhOUXphUk5XZTI2RXZya2cAARYzVktwbzJBU1M0V3JTd29remI0YWRBAAA", "keep_alive": "1m" }, "search_after": [1706992790000,"591924"], "sort": [ { "order_date":"desc" } ] }
Bash
복사
pit id로 검색 대상이 지정되서 인덱스 지정이 불필요하다.

집계

aggs 속성을 이용해 집계를 진행할 수 있다.
: 집계에서는 모든 문서가 사용되기에 size 속성은 0으로 설정하도록 하자.
GET kibana_sample_data_ecommerce/_search { "size":0, "query": { "term": { "currency": { "value":"EUR" } } }, "aggs": { "my-sum-aggregation-name": { "sum": { "field": "taxless_total_price" } } } }
Bash
복사
# response { ... "aggregations": { "my-sum-aggregation-name": { "value": 350884.12890625 } } }
Bash
복사
집계는 검색 쿼리에 매칭되는 모든 문서를 대상으로 하기에 과도한 집계는 성능 저하를 유발한다.
ES 집계에는 메트릭, 버킷, 파이프라인 집계로 분류된다.

집계 - 메트릭 집계

avg, max, min, sum 집계로 대상 필드의 집계를 구할 수 있다.
stats 집계로 대상 필드의 평균,최댓값,최솟값,합,개수를 모두 계산할 수 있다. (한번에)
cadinality 집계로 해당 필드의 카디널리티를 집계할 수 있다.
precision_threshold(default: 3000) 속성을 추가해 정확도를 조절할 수 있다.
정확도를 올릴 수록 메모리를 사용하기에 최종 카디널리티보다 높은 수준으로 유지하는게 좋다.

집계 - 버킷 집계

문서를 특정 기준으로 분리해 부분집하으로 만들고, 이에 대해 하위 집계(sub-aggregation)을 수행한다.
range 집계로 원하는 버킷 구간을 직접 지정할 수 있다.
{ ... "aggs": { "distance-kilometers-range": { "range": { "field": "DistanceKilometers", "ranges": [ { "to": 5000 }, { "from": 5000, "to", 10000 }, { "from": 10000 } ] } }, "aggs": { "average-ticket-price": { "avg": { "field": "AvgTicketPrice" } } } }
Bash
복사
각각의 버킷 내에서 AvgTicketPrice라는 필드의 평균값을 구한다.
data_range 집계로 date 타입에 대한 집계도 가능하다.
시간 계산식을 사용할 수 있다.
now와 같이 호출시점에 값이 달라지는 집계 요청은 샤드 요청 캐시에 캐싱되지 않는다.
histogram 집계는 범위의 시작-끝이 아니라 간격을 지정해서 버킷을 만들어 집계한다.
지정 필드의 최소값-최대값을 구한뒤 interval 간격으로 버킷을 분리한다.
offset을 이용해 위치 조정도 가능하다.
GET _search { "size":0, "query": { "match_all": {}}, "aggs": { "my-histogram": { "histogram": { "field": "DistanceKilometers", "interval": 1000 } } } } # response "aggregations": { "my-histogram": { "buckets": [ { "key": 0, "doc_count": 1799 }, { "key": 1000, "doc_count": 1148 }, { "key": 2000, "doc_count": 529 }, ... } }
Bash
복사
GET _search { "size":0, "query": { "match_all": {}}, "aggs": { "my-histogram": { "histogram": { "field": "DistanceKilometers", "interval": 1000, "offset": 50 } } } } # response "aggregations": { "my-histogram": { "buckets": [ { "key": -950, "doc_count": 641 }, { "key": 50, "doc_count": 1226 }, { "key": 1050, "doc_count": 1106 }, ... ] } }
Bash
복사
offset이 50이면 [0, 50] 구간이 포함될 버킷이 필요해 [-950, 50] 구간이 생긴다.
date 타입도 data_histogram으로 버킷 집계가 가능하다.
구간 설정 속성은 interval이 아닌 calendar_interva | fixed_interval을 사용한다.
terms 집계로 지정 필드에 대해 가장 빈도수가 높은 term 순서대로 버킷을 생성한다.
size속성으로 버킷 생성 갯수를 지정한다.
GET kibana_sample_data_logs/_search { "size": 0, "query": { "match_all": {}}, "aggs": { "my-terms-aggs": { "terms": { "field": "host.keyword", "size": 10 } } } } # response { ... "aggregations": { "my-terms-aggs": { "doc_count_error_upper_bound": 0, #doc_count의 오차 상한선 "sum_other_doc_count": 0, # 버킷에 포함되지 않은 문서 수 "buckets": [ { "key": "artifacts.elastic.co", "doc_count": 6488 }, { "key": "www.elastic.co", "doc_count": 4779 }, { "key": "cdn.elastic-elastic-elastic.org", "doc_count": 2255 }, { "key": "elastic-elastic-elastic.org", "doc_count": 552 } ] } } }
Bash
복사
모든 term에 대한 페이지네이션으로 전체 순회하며 집계를 하고 싶다면 composite 집계 사용이 좋다.
composite 집계는 sources로 지정된 하위 집계 버킷 전부 페이지네이션을 이용해 순회하는 집계
GET kibana_sample_data_logs/_search { "size": 0, "query": { "match_all": {}}, "aggs": { "composite_aggs": { "composite": { "size": 100, # 페이지네이션에서 한 번에 반환할 버킷 갯수 "sources": [ { "terms-aggs": { "terms": { "field": "host.keyword" } } }, { "data-histogram-aggs": { "date_histogram": { "field": "@timestamp", "calendar_interval": "day" } } } ] } } } }
Bash
복사

집계 - 파이프라인 집계

집계 결과를 집계 대상으로 수행한다.
buckets_path 속성으로 다른 집계의 결과를 가져오며 상대 경로로 지정된다.
>: 하위 집계로 이동하는 구분자
.: 하위 메트릭으로 이동하는 구분자
집계 이름
메트릭 이름
cumulative_sum 집계로 다른 집계의 값을 누적해 합산한다.
GET kibana_sample_data_ecommerce/_search { "size": 0, "query": { "match_all": {}}, "aggs": { "daily-timestamp-bucket": { "date_histogram": { "field":"order_date", "calendar_interval": "day" }, "aggs": { "daily-total-quantity-average": { "avg": { "field": "total_quantlty" } }, "pipeline-sum": { "cumulative_sum": { "buckets_path": "daily-total-quantity-average" } } } } } }
Bash
복사
max_bucket 집계로 가장 값이 큰 버킷의 key와 결과값을 구할 수 있다.
GET kibana_sample_data_ecommerce/_search { "size": 0, "query": { "match_all": {}}, "aggs": { "daily-timestamp-bucket": { "date_histogram": { "field":"order_date", "calendar_interval": "day" }, "aggs": { "daily-total-quantity-average": { "avg": { "field": "total_quantity" } } } }, "max-total-quantity": { "max_bucket": { "buckets_path": "daily-timestamp-bucket>daily-total-quantity-average" } } } } # response { ... "max-total-quantity": { "value": 2.289473684210526, "keys": [ "2024-02-02T00:00:00.000Z" ] } }
Bash
복사

ES Client 사용하기

JVM용 클라이언트
저수준 REST 클라이언트
HTTP 기반 ES 통신 클라이언트
Request, Response 데이터에 대한 가공 작업은 사용자가 직접 해야 한다.
고수준 REST 클라이언트 | 자바 클라이언트
둘 다 저수준 REST 클라이언트를 래핑해서 만들었다.
고수준 클라이언트는 7.15부터 지원 중단
자바클라이언트는 개발초기라서 고수준 클라이언트와 혼용해서 사용한다.
즉, ES 8이상이라면 그냥 자바 클라이언트를 사용하도록 하자
1.
의존성 추가
implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.1' implementation 'co.elastic.clients:elasticsearch-java:8.11.4' implementation 'com.fasterxml.jackson.module:jackson-module-kotlin:2.16.1' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.1'
Groovy
복사
jackson-databind : 자바 객체 to JSON 직렬화 및 역직렬화 라이브러리
elasticsearch-java: ES 자바 클라이언트 라이브러리
jackson-module-kotlin: jackson 코틀린 지원 라이브러리
jackson-datatype-jsr310: ZonedDateTime, LocalDateTime과 같은 JSR-310 날짜 객체 처리용 라이브러리
2.
클라이언트 초기화
private fun getEsClient(): ElasticsearchClient { // Elasticsearch REST 클라이언트 빌더 생성 val restClientBuilder = RestClient.builder( HttpHost("localhost", 9200, "http") ) // 빌더를 사용하여 낮은 수준의 REST 클라이언트 인스턴스 생성 val lowLevelRestClient = restClientBuilder.build() // Jackson JSON 매퍼 구성. Java 8 날짜/시간 API 모듈 추가 및 몇 가지 옵션 설정 val mapper = jacksonMapperBuilder() .addModule(JavaTimeModule()) // Java 8 날짜/시간 모듈 추가 .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) // 날짜를 타임스탬프로 쓰지 않도록 설정 .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) // 알 수 없는 속성에 대해 실패하지 않도록 설정 .build() // Jackson JSON 매퍼를 사용하여 REST 클라이언트 트랜스포트 설정 val transport = RestClientTransport(lowLevelRestClient, JacksonJsonpMapper(mapper)) // 최종적으로 Elasticsearch 클라이언트 인스턴스 반환 return ElasticsearchClient(transport) }
Kotlin
복사
3.
테스트용 객체 생성
data class MyIndexClass( val fieldOne: String, val fieldTwo: Long, val fieldThree: ZonedDateTime ) data class MyPartialIndexClass( val fieldOne: String )
Kotlin
복사
4.
문서 색인 및 조회해보기
// 색인 private fun indexExample(client: ElasticsearchClient) { val indexResponse = client.index { builder -> builder .index("my-index") //색인 요청 함수 index .id("my-id-1") .routing("my-routing-1") .document(MyIndexClass("hello", 2L, ZonedDateTime.now(ZoneOffset.UTC))) } val result = indexResponse.result() println("result: $result") } // 조회 private fun getExample(client: ElasticsearchClient) { val request = GetRequest.Builder() .index("my-index") .id("my-id-1") .routing("my-routing-1") .build() client.get(request, MyIndexClass::class.java) .run { println("result: $this") } }
Kotlin
복사
5.
bulk API 사용해보기
// Builder Pattern을 사용한 bulk API private fun bulkExampleOne(client: ElasticsearchClient) { val createOperation = CreateOperation.Builder<MyIndexClass>() .index("my-index") .id("my-id-2") .routing("my-routing-2") .document(MyIndexClass("world", 2L, ZonedDateTime.now(ZoneOffset.UTC))) .build() val indexOperation = IndexOperation.Builder<MyIndexClass>() .index("my-index") .id("my-id-3") .routing("my-routing-3") .document(MyIndexClass("world", 4L, ZonedDateTime.now(ZoneOffset.UTC))) .build() val updateAction = UpdateAction.Builder<MyIndexClass, MyPartialIndexClass>() .doc(MyPartialIndexClass("world updated")) .build() val updateOperation = UpdateOperation.Builder<MyIndexClass, MyPartialIndexClass>() .index("my-index") .id("my-id-1") .routing("my-routing-1") .action(updateAction) .build() val bulkOne = BulkOperation.Builder().create(createOperation).build() val bulkTwo = BulkOperation.Builder().index(indexOperation).build() val bulkThree = BulkOperation.Builder().update(updateOperation).build() val operations = listOf(bulkOne, bulkTwo, bulkThree) val response = client.bulk { it.operations(operations) } response.items().forEach { println("result: ${it.result()}, error: ${it.error()}") } } // Functional Call Pattern을 사용한 bulk API private fun bulkExampleTwo(client: ElasticsearchClient) { val bulkResponse = client.bulk { _0 -> _0 .operations { _1 -> _1 .index { _2: IndexOperation.Builder<MyIndexClass> -> _2 .index("my-index") .id("my-id-4") .routing("my-routing-4") .document(MyIndexClass("world", 4L, ZonedDateTime.now(ZoneOffset.UTC))) } } .operations { _1 -> _1 .update { _2: UpdateOperation.Builder<MyIndexClass, MyPartialIndexClass> -> _2 .index("my-index") .id("my-id-2") .routing("my-routing-2") .action { _3 -> _3 .doc(MyPartialIndexClass("world updated")) } } } } bulkResponse.items().forEach { println("result: ${it.result()}, error: ${it.error()}") } }
Kotlin
복사
Functional Call Pattern을 사용한 함수에서 _0, _1과 같은 이름을 사용했는데 이는 스칼라 공식 문서에서 람다의 깊이에 따른 네이밍 권장사항이다.
6.
검색 API 사용해보기
private fun searchExample(client: ElasticsearchClient) { val response = client.search({ builder -> builder .index("my-index") .from(0) .size(10) .query { query -> query .term { term -> term .field("fieldOne") .value { value -> value.stringValue("world") } } } }, MyIndexClass::class.java) response.hits().hits().forEach { println("result: $it") } }
Kotlin
복사
7.
BulkIngester 사용하기
: ES 8.7부터 자바 클라이언트에 추가된 BulkIngester는 고수준 REST Client의 BulkProcessor를 대체하는 기능으로 여러 문서에 관련된 작업들을 한 번에 ES로 요청하는 기능
// 대량(bulk) 작업의 생명 주기를 관리하는 리스너 class BulkIngestListener<Context> : BulkListener<Context> { override fun beforeBulk(executionId: Long, request: BulkRequest?, contexts: MutableList<Context>?) { println("[${LocalDateTime.now()}] beforeBulk - executionId: $executionId") } override fun afterBulk( executionId: Long, request: BulkRequest?, contexts: MutableList<Context>?, response: BulkResponse? ) { println( "[${LocalDateTime.now()}] " + "afterBulk - executionId: $executionId, " + "itemSize: [${response?.items()?.size}], " + "took : ${response?.took()}, " + "errors: ${response?.errors()}" ) } // 작업이 완료되기전 혹은 오류가 발생한 후 호출하는 메서드 override fun afterBulk( executionId: Long, request: BulkRequest?, contexts: MutableList<Context>?, failure: Throwable? ) { System.err.println(failure) } } private fun bulkIngesterExample(client: ElasticsearchClient) { val listener = BulkIngestListener<String>() val ingester = BulkIngester.of<String> { it.client(client) // Elasticsearch 클라이언트를 설정합니다. .maxOperations(200) // 대량 작업에서 처리할 최대 작업 수를 설정합니다. 이 값이 너무 낮으면 오버헤드가 증가하고, 너무 높으면 메모리 문제가 발생할 수 있습니다 .maxConcurrentRequests(1) // 동시에 처리할 최대 요청 수를 설정합니다. 이 값은 Elasticsearch 클러스터의 용량과 원하는 병렬 수준에 따라 설정해야 합니다 .maxSize(5242880L) // 대량 요청의 최대 크기를 바이트 단위로 설정합니다. 이 값은 사용 가능한 메모리와 문서의 크기에 따라 설정해야 합니다 .flushInterval(5L, TimeUnit.SECONDS) // 대량 요청이 실행되는 간격을 설정합니다. 이 간격은 작업 수나 요청 크기에 상관없이 적용됩니다 .listener(listener) // 대량 작업의 생명주기를 관리하는 리스너를 설정합니다. } for (number in 0L until 1100L) { val bulkOperation = BulkOperation.of { bulkBuilder -> bulkBuilder.index { indexOpBuilder: IndexOperation.Builder<MyIndexClass> -> indexOpBuilder .index("my-index") .id("my-id-$number") .routing("my-routing-$number") .document(MyIndexClass("world", number, ZonedDateTime.now(ZoneOffset.UTC))) } } ingester.add(bulkOperation) } println("[${LocalDateTime.now()}] sleep 10 seconds ...") Thread.sleep(10000L) for (number in 1100L until 1200L) { val bulkOperation = BulkOperation.of { bulkBuilder -> bulkBuilder.index { indexOpBuilder: IndexOperation.Builder<MyIndexClass> -> indexOpBuilder .index("my-index") .id("my-id-$number") .routing("my-routing-$number") .document(MyIndexClass("world", number, ZonedDateTime.now(ZoneOffset.UTC))) } } ingester.add(bulkOperation) } println("[${LocalDateTime.now()}] sleep 10 seconds ...") Thread.sleep(10000L) ingester.close() }
Kotlin
복사
.maxOperations(200)
대량 작업에서 처리할 최대 작업 수를 설정합니다.
이 값이 너무 낮으면 오버헤드가 증가하고, 너무 높으면 메모리 문제가 발생할 수 있습니다
.maxConcurrentRequests(1)
동시에 처리할 최대 요청 수를 설정합니다.
이 값은 Elasticsearch 클러스터의 용량과 원하는 병렬 수준에 따라 설정해야 합니다
.maxSize(5242880L)
대량 요청의 최대 크기를 바이트 단위로 설정합니다.
이 값은 사용 가능한 메모리와 문서의 크기에 따라 설정해야 합니다
.flushInterval(5L, TimeUnit.SECONDS)
대량 요청이 실행되는 간격을 설정합니다.
이 간격은 작업 수나 요청 크기에 상관없이 적용됩니다
listener(listener)
대량 작업의 생명주기를 관리하는 리스너를 설정합니다.