본문 바로가기

SearchDeveloper/ElasticSearch

[ElasticSearch] update_by_query version_conflict_engine_exception & update 쿼리 비교

문제

운영 중인 인덱스 문서를 update_by_query 로 실시간으로 업데이트하는 배치를 만들었다. 운영 환경에서 첫 테스트를 하는데 몇 초 지나더니 version_conflict_engine_exception 에러가 간헐적으로 발생하기 시작했다.

해결

update_by_querybulk update 로 바꿨다.

[기존]

POST my_index/_update_by_query
{
  "query": {"term": {"_id": {"value": "1"}}}, 
  "script": {
    "source": "ctx._source.updated_by_query = true",
    "lang": "painless"
  }
}

 

[변경후]

POST _bulk
{"update": {"_index": "my_index", "_id": 1}}
{"doc": {"updated_by_query": true}}

원인

운영 환경의 인덱스는 변경 이벤트가 발생하면 실시간으로 업데이트 되고 있는 상황이다. update_by_query 실행중에 우연히 외부 상황으로 인해 업데이트 중인 문서와 쿼리로 인해 업데이트하려는 문서가 동일하면 변경 충돌이 발생해 에러가 나는 것이다.

자세한 이야기

먼저 상황 재현을 위한 환경을 만들어본다.

ID가 1 인 문서 1 건을 실시간으로 업데이트 하는 코드

더보기
from time import sleep

from elasticsearch import Elasticsearch, helpers
import json

es = Elasticsearch(hosts=["http://localhost:9201"])

documents = {
    1: {"version": 1, "updated_by_query": False},
}


def docs(version):
    return ({
        "_op_type": "index",
        "_index": "my_index",
        "_id": key,
        "_source": {**value, "version": value.get("version", 0) + version},
    }
        for key, value in documents.items())


try:
    # 이미 존재하는 문서 지우기
    if es.exists(index="my_index", id=1):
        es.delete(index="my_index", id=1)

    # version 필드 값 1 씩 더하기
    i = 0
    while True:
        i += 1
        helpers.bulk(es, docs(i))
        sleep(0.001)
        print(f"Indexed {i} versions")
except Exception as e:
    print(json.dumps(e.errors, indent=2))

이 파이썬 코드를 실행하면 0.001 초마다 문서가 업데이트 된다.

이 와중에 update_by_query 쿼리를 날려보면 에러가 난다.

POST my_index/_update_by_query
{
  "script": {
    "source": "ctx._source.updated_by_query = true",
    "lang": "painless"
  }
}

 

에러 내용

{
  "took" : 1,
  "timed_out" : false,
  "total" : 1,
  "updated" : 0,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 1,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [
    {
      "index" : "my_index",
      "type" : "_doc",
      "id" : "1",
      "cause" : {
        "type" : "version_conflict_engine_exception",
        "reason" : "[1]: version conflict, required seqNo [62396], primary term [3]. current document has seqNo [62477] and primary term [3]",
        "index_uuid" : "5MzBkNVhSjeGfya3_4be1g",
        "shard" : "0",
        "index" : "my_index"
      },
      "status" : 409
    }
  ]
}

에러 나는 이유

update_by_query 는 실행 시점에 문서 데이터 스냅샷을 찍는다. 그 후 검색 단계에서 업데이트할 문서들을 정한 다음 업데이트 단계 전에 최신 문서가 맞는지 버전 비교를 한다. 스냅샷 시점의 문서 버전과 현재 버전이 다르면 버전 충돌이 나 실패하는 것이다.

해결 방법 2가지

① conflict=proceed

POST my_index/_update_by_query?conflicts=proceed
{
  "query": {"term": {"_id": {"value": "1"}}},
  "script": {
    "source": "ctx._source.updated_by_query = true",
    "lang": "painless"
  }
}

_update_by_query 의 쿼리 파라미터로 conflicts=proceed 를 붙여주면 충돌이 감지됐을 때 무시하고 업데이트한다.

② bulk update

POST _bulk
{"update": {"_index": "my_index", "_id": 1}}
{"doc": {"updated_by_query": true}}

ID 를 특정해서 업데이트를 한다. 그래서 업데이트할 문서만 사전에 필터링해서 bulk 쿼리를 날려야한다.

그런데 문서 한 건을 0.001 초마다 업데이트 하는 환경에서 bulk update 요청을 날렸을 때 간헐적으로 version_conflict_engine_exception 에러가 또 발생하는 것이다!

{
  "took" : 0,
  "errors" : true,
  "items" : [
    {
      "update" : {
        "_index" : "my_index",
        "_type" : "_doc",
        "_id" : "1",
        "status" : 409,
        "error" : {
          "type" : "version_conflict_engine_exception",
          "reason" : "[1]: version conflict, required seqNo [32418], primary term [3]. current document has seqNo [32419] and primary term [3]",
          "index_uuid" : "5MzBkNVhSjeGfya3_4be1g",
          "shard" : "0",
          "index" : "my_index"
        }
      }
    }
  ]
}

그 이유는 bulk update 도 문서를 업데이트하기 전에 버전 비교를 한다. 그래서 업데이트 직전의 _version 값이 현재 _version 값과 다르다면 version_conflict_engine_exception 를 내뱉는 것이다.

그래서 bulk update 도 완벽한 해결책이라고 할 수는 없지만 bulk update 도입 후 운영 환경에서 에러가 나지 않았고 0.001 초 마다 처럼 심하게 문서가 업데이트되는 환경도 아니라고 판단해서 bulk update 로 수정하였다.

어 그러면 둘 다 버전 비교를 하는데 update_by_query 에서 왜 exception 이 더 자주 나는 것일까?

버전값을 찍는 시점이 다르기 때문이다. update_by_query 는 스냅샷을 찍고 검색 쿼리를 실행한 후에 버전 비교를 하지만 update 는 문서 ID 조회 후 바로 버전 비교를 한다. 그래서 현재 시점과의 차이가 update_by_query 에서 더 크기 때문에 더 자주 나는 것이다.

또 궁금한거. update_by_query 는 스냅샷을 찍는데 bulk update 는 찍지 않는 이유?

쿼리의 역할이 달라서이다. update_by_query 는 검색 쿼리를 실행해서 업데이트 할 문서를 필터링하는 로직이 들어있다. 그래서 스냅샷을 찍고 그 상태에서 검색 쿼리를 실행해 쿼리 결과의 일관성을 유지한다. 그에 반해 bulk update 는 update 요청 한 건씩 ID 기준으로 독립적으로 업데이트되므로 굳이 스냅샷을 찍을 필요가 없는 것이다.

데이터 중심 애플리케이션 관점에서의 ElasticSearch 구조

[일관성을 유지하는 방법 - 스냅샷]

update_by_query검색 단계업데이트 단계가 있다. 만약 검색 단계 중 문서가 업데이트 돼버리면 업데이트 단계 시점에서의 문서 내용과 달라 일관성이 깨질 수 있다. 이를 해결하기 위해 스냅샷 방식을 활용한다.

※ [데이터 중심 애플리케이션 설계] 책 7장의 트랜잭션 > 스냅숏 격리 내용과 비슷한 것 같음

[동시성을 제어하는 방법]

update_by_query 혹은 update 쿼리에서 문서를 변경하려할 때 동시에 외부로부터 문서가 변경되는 경우 동시성 제어가 필요하다. ElasticSearch는 낙관적 동시성 제어이다. 업데이트할 때 문서가 변경되지 못하게 잠그는게 아니라 문서 조회할 때 버전과 현재 버전을 비교해서 같다면 진행하고, 다르다면 충돌을 감지하고 실패 처리하거나 요청 방식에 따라 재시도를 진행한다.

※ [데이터 중심 애플리케이션 설계] 책 p.261 : 7장 트랜잭션 > 비관적 동시성 제어 대 낙관적 동시성 제어

[동시성이 깨졌을 때 이미 업데이트된 문서들은?]

update_by_query 혹은 bulk update 쿼리에서 버전 충돌이 발생해 version_conflict_engine_exception 이 발생했다하더라도 이미 업데이트된 다른 문서들까지 롤백되지 않는다. 요청 자체가 하나의 트랜잭션이 아니기 때문이다. 대신 쿼리 결과 response 를 통해 몇 개 문서가 업데이트에 실패했는지 오류 보고를 한다.