본문 바로가기

SearchDeveloper/엘라스틱서치 바이블

[9] 커스텀 플러그인을 이용한 ES 커스터마이징과 실전 운영 (끝)

플러그인을 elasticsearch-plugin install 로 설치하면 플러그인.zip 파일을 풀어 plugins/[플러그인명] 디렉토리에 생성한다.

플러그인 최소 3요소

  1. 본체 jar 파일
  2. plugin-descriptor.properties elasticsearch.version :  마이너까지 일치해야 한다.
  3. plugin-security.policy : 플러그인이 요구하는 추가적인 권한

(이외 다른 파일은 메인 플러그인과 의존 관계 파일이다)

플러그인 개발 시 유의 사항

  • 버전에 종속적이라 버전 올리면 호환되게 맞춰야 한다.
  • 플러그인 문서는 적은 편이라 ES 코드 읽으면서 개발해야 한다.
  • 라이선스 유의해햐 한다. X-pack 유료 플러그인 코드를 복붙해서 쓰면 위반이다

플러그인 제작

아래처럼 동작하는 HelloWorldPlugin 을 만든다.

  • GET /_hello 호출하면 “hello, world” 응답한다.
  • elasticsearch.ymlhello.greetings 설정 지정하면 hello 대신 지정한 문자열로 응답한다.
  • GET /_hello?name=plu 호출하면 “hello, plu” 응답한다.

Plugin

Plugin 상속 후 getSettings() 오버라이드

: ES 설정은 모두 getSettings() 안에 담는다. (hello.greetings)

설정 속성

-Property.Dynamic: 동적으로 업데이트 가능

-Property.Final: 동적으로 업데이트 불가

-Property.NodeScope: 설정 범위가 노드 레벨

-Property.IndexScope: 설정 범위가 인덱스 레벨

-Property.Deprecated: 지원 중단

-Property.Filtered: 비번, 인증 정보 등 필터링되도록 의도한 설정

ActionPlugin 인터페이스 구현

getActions(), getRestHandlers() 구현

나머지는 나의 깃 코드 참고

테스트 코드

build.gradle

dependencies {
    testImplementation 'org.elasticsearch.test:framework:${elasticVersion}'
}
test {
    useJUnitPlatform()
    // test 위한 policy 파일 경로 지정
    systemProperty("java.security.policy", "${projectDir}/src/test/resources/test-plugin-security.policy") 
}

테스트 코드

class BasicTest: ESTestCase() {
    @Test
    fun testBasic() {
        logger.info("test")
        Assert.assertEquals(1, 1)
    }
}

ESTestCase 역할

ESTestCase 확장한 테스트는 랜덤 테스트로 수행한다.

  • randomNonNegativeLong() 등을 사용해 테스트 돌릴 때마다 노드 수, 클러스터 구조, 로케일, 타임존 등이 다르다.
  • 랜덤 조건 출력도 해준다.
  • @Seed 를 붙여 특정 시드로 테스트할 수 있다.

스레드 누수 체크 기능이 있다.

  • @ThreadLeakScope(ThreadLeakScope.Scope.TEST)

접두어가 test 인 메소드는 @Test 붙이지 않아도 테스트로 동작한다.

ESSinglNodeTestCase

노드 하나 띄워서 테스트한다.

ESIntegTestCase

여러노드로 구성된 클러스터 기동하고 통합 테스트 수행한다.

  • 데이터, 마스터 후보, 조정 노드 구성이 매번 달라진다.

REST 레이어 테스트

3가지 방법

  1. ESRestTestCase + javaRestTest
  2. ESClientYamlSuiteTestCase + yamlRestTest
  3. ESIntegTestCase + 저수준 RestClient

상세 내용은 책 참조

ActionFilter 로 위험한 액션 차단하기

: 전체 인덱스 대상으로한 요청이나 과도한 뎁스 가진 집계 요청하거나 다른 인덱스를 삭제하는 등의 액션을 막기 위해 ActionFilter 를 구현해서 차단하는 플러그인을 만들 수 있다.

ActionFilter

모든 transport 액션 수행되기 전에 ActionFilter 가 액션 넘길지 판단하는 인터페이스. (액션에 대한 필터니까!) 요청을 변조해서 넘길 수도 있다.

  • order(): 액션 필터 우선순위. 여러 액션필터가 등록되면 order 값이 낮은 값부터 적용된다.
  • apply(): 실제 필터링 작업 수행
    • 뒤쪽 체인으로 넘기려면 true, 아니면 false 반환하게 구현해주면 됨
  • ActionFilter.Simple 추상 클래스를 상속받아 사용하는 게 편하다.

사용방법

  • 메인 플러그인 클래스에 ActionPlugin 클래스를 상속받아
    • getActionFilters() 를 오버라이드 해주면 된다.
    • createComponents() 로 의존성 주입할 인스턴스들을 넣는다.

ES 기동 과정

class Elasticsearch {
  public static void main(final String[] args) {
    initPhase1();
    initPhase2();
    initPhase3();
  }
}
  • initPhase1()
    • CLI 통해 넘겨진 인자나 환경 변수 확인
    • Bootstrap 같은 초기 기동 클래스 및 로깅 초기화
  • initPhase2()
    • security manager 초기화
  • initPhase3()
    • Node.start() 로 기동시키고 데몬으로 기동 여부 확인
    • Node 생성자에서 ActionModule 을 생성하는데, ActionModule 생성자에서 getActionFilters() 호출해서 필터링한다.

LifecycleComponent

: 초기화, 시작, 정지, 종료 등 생명 주기 가져야하는 구성요소 관리 인터페이스

LifeCycleComponent 등록하면 ES 노드가 시작, 정지 등 될 때 같이 시작, 정지 등 된다.

특정 조건 감지해여 알람 보내기

PersistentTaskPlugin 으로 지속적으로 동작하게 해 복잡한 탐지 조건을 구현할 수 있다.

  • persistent 타입 task 생성하면 마스터 노드가 노드 찾아 작업을 할당한다.
  • 작업 수행되는 동안 상태가 클러스터에 저장된다.
  • 만약 작업 수행하던 노드에 문제 생겨서 클러스터에서 빠지면 다른 노드에게 할당하고 클러스터에 저장된 데이터 불러와 이어서 수행한다.
  • _task API 로 관리 가능

<내부 데이터 검색한 뒤 특정 조건 만족하면 HTTP로 알람 보내기>

PersistentTasksExecutor

getAssignment(): task 가 어떤 노드에 할당될지 담당한다.

  • selectLeastLoadedNode() : 클러스터에서 메타 데이터 읽어 task 가 가장 적게 할당된 노드 반환한다.

AllocatedPersistentTask

노드에 할단된 persistent 타입 task

PersistentTaskParams

AllocatedPersistentTask 진행 상황 저장하는 용도. 클러스터에 저장된다.

그리고, plugin-security.policy 에 필요한 권한 추가하기

깃 소스 (따라치다 이해가 잘 안돼서 그만둠)

인덱스 내 일부 데이터만 덤프하기

scroll 검색

플러그인에 권한 강제 부여하기

FilePermission write 권한이 ES white list 에 없다.

jdk/lib/security/java.policy 에 작성하고 ES 기동시 인자 넣어주기

데이터 변경분을 카프카로 발급하기

큰 부하가 드는 대용량 통계 작업을 해야할 때, ES 같은 2차 저장소 구축은 필수다. 그럼 데이터 변경분을 2차로 넘길 수 있어야 싱크를 맞출 수 있다.

보통 ES 앞에 카프카 같은 메시지 큐를 둔다.

ES 를 원본 스토리지로 삼는 경우, 데이터 변경분을 카프카로 발급하고 다른 서비스를 카프카에 붙어서 변경분 구독하면 싱크 문제 해결할 수 있다.

→ ES 가 CDC 를 직접 발급

ES 가 발급한 CDC 메시지를 싱크에 사용하는 게 아니라 별도 저장소에 순차적으로 쌓으면 시계열로 추적할 수 있고, 로그를 다른 ES 에 쌓으면 (VOC 추적에도 활용할 수 있을 듯)

구현 방법

IndexingOperationListener 클래스에서 preIndex(), postIndex(), preDelete(), postDelete() 등 오버라이드해서 색인 전 후 로그 남길 수 있다.

 

레퍼런스

엘라스틱서치 바이블 9장