본문 바로가기

SearchDeveloper/개발자를 위한 레디스

6장 레디스를 메시지 브로커로 사용하기

1.pub/sub(publish, subscribe), 2.메시징 큐(list), 3.이벤트 스트림(stream)

메시지 브로커

  • 메시지 브로커가 필요한 이유
    • 최근 서비스 아키텍쳐(MSA)에선 각 애플리케이션 간 느슨하면서 탄탄한 상호 작용이 필요하기 때문
  • 핵심 역할
    • 모듈 간 통신에서 커넥션 실패 같은 장애가 바로 전파되지 않게 비동기 통신으로 하는게 권장된다. 메시지 브로커를 사용하면 서비스 간 통신이 불가능하더라도 바로 장애로 이어지지 않게, 당장 메시지 처리하지 못해도 메시지를 쌓아두어 나중에 처리할 수 있는 채널을 만들어둔다.
  • 크게 두 형태: 메시징 큐, 이벤트 스트림

 

메시징큐와 이벤트 스트림

메시징 큐

생산자-소비자

이벤트 스트림

발행자-구독자

방향성 관점

  • 메시징 큐: 생산자는 소비자의 큐로 직접 push 한다. 같은 메시지를 2개 소비자에게 보내려면 소비자1 큐, 소비자2 큐로 중복 push해줘야한다.
  • 이벤트 스트림: 스트림의 특정 저장소에 메시지를 보내면 메시지가 필요한 여러 구독자들이 스트림에서 pull하면 돼서 구독자 별로 중복 푸시 안 해도 된다.

데이터 삭제 관점

  • 메시징 큐: 소비자가 데이터 읽으면 큐에서 삭제됨
  • 이벤트 스트림: 구독자가 데이터 읽어도 바로 삭제 안 됨. 지정한 기간 후에 삭제됨

소비자 추가 관점

  • 메시징 큐: 생산자가 소비자1 큐에게 메시지A 보내고 소비자2 큐가 추가됐을 때 이전 데이터 A는 볼 수 없음
  • 이벤트 스트림: 중간에 추가된 서비스도 이전 데이터 볼 수 있음

 

결론

  • 메시징 큐는 1:1 상황에서 한 서비스가 다른 서비스에게 동장 지시할 때 유용
  • 이벤트 스트림은 다대다 상황에서 유용 (이벤트 스트림이 카프카랑 비슷)

 

레디스를 메시지 브로커로 사용하기

       
비교 항목 List (Queue) Pub/Sub Stream
전달 방식 Point-to-Point (1:1) Fan-out (1:N) Consumer Groups (1:N or N:M)
데이터 보존 꺼내면 삭제됨 저장 안 함 (휘발성) 명시적으로 삭제할 때까지 보존
메시지 순서 보장됨 보장됨 보장됨 (ID 기반)
적합한 사례 단순 작업 큐 실시간 채팅, 알림 로그 분석, 복잡한 마이크로서비스 이벤트

Gemini's Tip: > 만약 "절대 놓치면 안 되는 중요 데이터"라면 Stream을, "잠깐 놓쳐도 상관없는 실시간 알림"이라면 Pub/Sub을, "단순히 순서대로 처리해야 할 작업"이라면 List를 선택하는 것이 국룰(?)입니다.

 

레디스의 pub/sub 으로 메시지 브로커 구현 가능

  • 발행자가 특정 채널에 데이터 전송
  • 채널 구독중인 소비자들은 바로 소비 가능
  • 데이터는 한 번 채널 전체에 전파된 뒤 삭제되는 일회성임
  • 메시지 잘 전달됐는지 보장하지 않음
  • fire-and-forget 패턴 서비스에서 유용

fire-and-forget 패턴?

  • 비동기 프로그래밍에서 사용되는 디자인 패턴
  • 응답 안 기다리고 바로 다음 코드 실행함
  • 로깅, 이벤트 발행, 통계 데이터 수집, 간단한 알림 발송처럼 작업 완료나 결과 확인이나 요류나도 괜찮은 서비스에서 유용하다.

 

list로 메시징 큐, stream으로 이벤트 스트림 구현 가능

  • list로 새로운 데이터 들어오면 바로 읽어가능 블로킹도 가능
  • stream은 카프카에서 영감 받은 것. append-only 방식으로 저장된다. 소비자, 소비자 그룹으로 데이터 분산 처리도 되고 데이터 실시간 읽기, 시간대 별 검색도 가능

 

레디스의 pub/sub

아주 가벼운 pub/sub

  • 발행자는 메시지를 채널로 보내기만 함, 어떤 구독자가 메시지 읽어가는지, 정상적으로 전달됐는지 확인 못함
  • 구독자는 메시지가 언제, 어떤 방행자에 의해 생성됐는지 메타데이터는 모른다.
  • 데이터는 레디스에 저장되지 않은다.
  • 정합성 중요한 데이터 전달하기에 적합하지 않음

 

PUBLISH, SUBSCRIBE, PSUBSCRIBE

  • PUBLISH CH1 hello 채널(CH1)에 메시지 보내기
  • SUBSCRIBE CH1 CH2 채널 2개(CH1, CH2) 구독하기
  • PSUBSCRIBE CH* 채널 glob 패턴으로 구독하기. pmessage타입으로 메시지가 온다.
    • 만약 SUBSCRIBE , PSUBSCRIBE 둘 다했으면 메시지 중복으로 온다. message, pmessage 타입 구분해서

 

클러스터에서의 pub/sub

클러스터: 레디스가 자체적으로 제공하는 데이터 분산 형태 구조

  • pub/sub으로 메시지를 발행하면 클러스터의 모든 노드에 자동으로 전달된다.
  • 그래서 아무 노드에서 SUBSCRIBE 로 데이터 읽을 수 있다.

→ 근데 클러스터는 대규모 서비스에서 분산 저장용으로 설계된건데 모든 노드에 저장되는건 핵심 목표에 맞지도 않고 불필요한 리소스, 네트워크 부하 생김 (해결: shared pub/sub)

 

shared pub/sub

  • 레디스 7.0부터 도입됨
  • 각 채널이 슬롯에 매핑되고, 같은 슬롯 가지고 있는 노드 간에만 메시지 전파한다.

 

SPUBLISH, SSUBSCRIBE

127.0.0.1:6379> SPUBLISH ch1 v
(integer) 1

(다른 세션)
127.0.0.1:6379> SSUBSCRIBE ch1 
Reading messages... (press Ctrl-C to quit)
1) "ssubscribe"
2) "ch1"
3) (integer) 1
1) "smessage"
2) "ch1"
3) "v"

해당 데이터가 있는 슬롯의 노드로 redirect됨

 

레디스의 list로 메시징 큐 구현하기

list는 head, tail에서 넣고 뺼 수 있는 LPUSH, LPOP, RPUSH, RPOP 있어서 구현하기 좋음

 

용례 (RPUSHX)

트위터(X)에서 팔로우한 계정의 게시물을 타임라인 피드에 올려주기 위헤 레디스 list 를 사용한다고 한다.

UserD는 자주 안 들어오기 때문에 굳이 팔로우 게시물있다고 UserD 캐시에 안 넣어도 된다. RPUSHX로 캐시 존재하는 유저에만 넣어줄 수 있다.

 

RPUSHX

127.0.0.1:6379> RPUSHX Timelinechace:userA data1
(integer) 0 # 값 안 넣어짐

# 키 생성해야 PRUSHX로 넣을 수 있음
127.0.0.1:6379> RPUSH Timelinechace:userA data
(integer) 1
127.0.0.1:6379> RPUSHX Timelinechace:userA data1
(integer) 2

127.0.0.1:6379> LRANGE Timelinechace:userA 0 -1
1) "data"
2) "data1"
  • RPUSHX Timelinechace:userA data1 X: exist. Timelinechace:userA 키 있으면 넣고 없으면 안 넣고 0 반환

→ 애플리케이션에서 캐시 유무 확인하지 않아도 돼서 좋음!

 

list의 블로킹 기능

레디스를 이벤트 큐로 사용할 때 블로킹 유용하게 사용할 수 있다.

  • 이벤트 기반 구조에서 시스템은 이벤트 루프를 폴링 방식으로 돌며 이벤트 큐에 새 이벤트 있는지 체크한다.

→ list의 블로킹 사용하면 polling이 아니라 기다리기만 하면 이벤트 들어오게 할 수 있다.

 

BRPOP

(컨슈머 세션)
127.0.0.1:6379> BRPOP Q:a Q:b 1000

(프로듀서 세션)
127.0.0.1:6379> LPUSH Q:a 1
(integer) 1

(컨슈머 세션)
127.0.0.1:6379> BRPOP Q:a Q:b 1000
1) "Q:a"
2) "1"
(13.12s)
127.0.0.1:6379> 
  • BRPOP Q:a Q:b 1000 큐 여러 개 블로킹할 수 있다. 1000은 타임아웃, 0이면 무한 대기
    • 이벤트 하나 받으면 바로 커맨드 종료된다. 한 번만 받을 수 있는 듯

 

list로 원형 큐 구현

시나리오: 클라이언트 특정 아이템을 반복해서 접근해야할 때, 여러 클라이언트가 병렬적으로 같은 아이템 접근해야할 때

RPOPLPUSH

127.0.0.1:6379> LPUSH circle a
(integer) 1
127.0.0.1:6379> LPUSH circle b
(integer) 2
127.0.0.1:6379> LPUSH circle c
(integer) 3
127.0.0.1:6379> LRANGE circle 0 -1
1) "c"
2) "b"
3) "a"

127.0.0.1:6379> RPOPLPUSH circle circle
"a"
127.0.0.1:6379> LRANGE circle 0 -1
1) "a"
2) "c"
3) "b"
  • RPOPLPUSH circle circle source dest. circle 에서 꺼내서 circle 에 넣는다.

 

참고: Redis 6.2 버전부터는 RPOPLPUSH가 Deprecated(권장되지 않음) 되고, 더 범용적인 LMOVE 커맨드로 대체되었습니다.

# RPOPLPUSH circle circle와 동일한 동작
LMOVE circle circle RIGHT LEFT
     
인자 의미 설명
circle (1번) Source 데이터를 꺼내올 리스트의 키 이름입니다.
circle (2번) Destination 데이터를 집어넣을 리스트의 키 이름입니다. (출발지와 같으면 원형 큐가 됩니다.)
RIGHT Where to Pop 소스 리스트의 어느 쪽에서 꺼낼지 결정합니다. (LEFT 또는 RIGHT)
LEFT Where to Push 목적지 리스트의 어느 쪽으로 넣을지 결정합니다. (LEFT 또는 RIGHT)

RPOPLPUSH, LMOVE는 원자적(Atomic)으로 실행됩니다. 즉, "오른쪽에서 뽑는 행위"와 "왼쪽에 넣는 행위" 사이에 다른 클라이언트가 끼어들어 데이터를 가로챌 수 없습니다. 데이터가 공중에서 사라질 염려가 없는 안전한 이동을 보장합니다.

 

Stream

  • 레디스 5.0에 추가된 자료구조
  • 대규모 메시징 빠르게 처리하는 목표, append-only 방식
  • 크게 2가지 방식으로 사용
    • 대규모 데이터 효율적으로 처리하기
    • 여러 생산자가 생성한 데이터를 여러 소비자가 처리할 수 있게 하는 데이터 저장소/중간 큐잉 시스템 (위에 나온 이벤트 스트림인 듯)

 

스트림이란?

  • 연속적인 데이터의 흐름, 일정한 데이터 조각의 연속

 

1) 파일에서의 바이트 스트림

  • 앺에서 10GB 텍스트 파일을 읽을 때 데이터를 잘게 쪼개서 바이트 스트림으로 처리함

2) 채팅에서의 스트림 처리

  • 계속되는 불규칙한 데이터를 연속적으로 반복 처리할 때
  • JSON 형태의 채팅 데이터를 끊임없이 처리하는 경우, 서버는 JSON 스트림을 처리한다라고 할 수 있다.

 

3) 앺 내부에서 이벤트 스트림

  • 웹서버/상품관련/결제관련 여러 프로듀서로부터 이벤트가 발행된다.
  • 서버에서 다양한 프로세스를 거쳐
  • 웹 서버 이벤트는 DB, 캐시에 저장 / 상품관련은 DB, DW에 저장될 수 있다.

→ 이런 이벤트 스트리밍을 지원해주는 스트리밍 플랫폼인 카프카와 레디스 stream을 비교해서 설명할 예정

메시지 저장과 식별

  • 카프카: 토픽의 파티션 단위로 유니크ID가 생성된다. 토픽이 1개 이상 파티션이면 ID는 유니크하지 않다.
  • 레디스: stream(=카프카의 토픽) 단위로 유니크ID가 생성된다.
    • 레디스 ID 포맷: <밀리세컨드시간>-<시퀀스넘버>
      • 밀리세컨드 시간: 아이템이 저장될 시점의 레디스 노드의 로컬 시간
      • 시퀀스넘버: 같은 시간에 여러 메시지 저장될 수 있으므로 같은 시간 내 데이터 순서. (64bit라 거의 제한 없음)
    • 레디스 stream 단위로 모두 유니크 ID 가지고 ID가 시간이라 시간 검색도 가능

 

stream 생성과 데이터 입력

  • 카프카: 토픽 생성, 프로듀서가 토픽에 데이터 푸시, 컨슈머가 읽어감
  • 레디스: stream 생성 필요 없음, 데이터 처음 넣을 때 stream 자료구조 생성됨

 

XADD

127.0.0.1:6379> XADD Email * k1 v k2 v2
"1769843435725-0"

127.0.0.1:6379> XADD Email 0-0 k1 v k2 v2
(error) ERR The ID specified in XADD must be greater than 0-0
127.0.0.1:6379> XADD Email 0-1 k1 v k2 v2
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

127.0.0.1:6379> XADD Email 1769843435725-1 k1 v k2 v2
"1769843435725-1"
  • XADD Email * k1 v k2 v2 Email를 키로 하는 stream 자료구조 생성.
    • * ID는 자동생성한다.
    • hash 처럼 k, v 넣어주면 됨
  • XADD Email 1769843435725-1 k1 v k2 v2
    • id 수동 생성하려면 0-0보다 크고, 직전 ID보다 큰 값이어야한다.

 

데이터 조회

  • 카프카: 컨슈머는 토픽을 실시간 리스닝함. 리스닝 시작한 시점 부터 읽거나 --from-beginning 으로 첫 메시지부터 읽을 수 있음. 토픽에 데이터 없어도 계속 리스닝함
  • 레디스: 1) 실시간 리스닝하거나 2) ID로 데이터 검색할 수 있음

 

1) 레디스 steram 실시간 리스닝

XREAD

127.0.0.1:6379> XREAD BLOCK 0 STREAMS Email 0
1) 1) "Email"
   2) 1) 1) "1769843435725-0"
         2) 1) "k1"
            2) "v"
            3) "k2"
            4) "v2"
      2) 1) "1769843435725-1"
         2) 1) "k1"
            2) "v"
            3) "k2"
            4) "v2"
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 500 STREAMS Email $     # 지금부터 들어오는 데이터 500ms만 기다림
(nil)
(0.51s)
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 0 STREAMS Email 1769843435725 # 특정 ID 이후만 반환하라
1) 1) "Email"
   2) 1) 1) "1769843435725-1"
         2) 1) "k1"
            2) "v"
            3) "k2"
            4) "v2"

데이터가 없을 때만 기다리지, 있으면 바로 끝낸다!

  • XREAD BLOCK 0 STREAMS Email 0 Email 스트림 읽기
    • BLOCK 0: 가져올 데이터 없어도 연결 끊지 말고 리스닝 하라
      • BLOCK 1000이면 1000ms 만 기다림
    • STREAMS Email 0 ID가 0보다 큰 메시지만, 즉 모두 읽으라
      • STREAMS Email $ 커맨드 실행한 후 부터 데이터만 가져온다. ($: 마지막 ID)

p.179??? XREAD해도 종료되던데
XREAD를 사용했을 때에는 기존 데이터를 모두 반환한뒤, 신규로 들어오는 메시지를 계속해서 반환하지만, XRANGE 커맨드는 커맨드를 수행하는 시점에 stream에 저장된 모든 데이터를 반환한 뒤 종료된다는 차이점이 존재한다.

 

2) 특정 데이터 조회

XRANGE, XREVRANGE

127.0.0.1:6379> XRANGE Email - +
1) 1) "1769843435725-0"
   2) 1) "k1"
      2) "v"
      3) "k2"
      4) "v2"
2) 1) "1769843435725-1"
   2) 1) "k1"
      2) "v"
      3) "k2"
      4) "v2"

127.0.0.1:6379> XREVRANGE Email + -
1) 1) "1769843435725-1"
   2) 1) "k1"
      2) "v"
      3) "k2"
      4) "v2"
2) 1) "1769843435725-0"
   2) 1) "k1"
      2) "v"
      3) "k2"
      4) "v2"

127.0.0.1:6379> XRANGE Email 1769843435725-1 +  # "이상"
1) 1) "1769843435725-1"
   2) 1) "k1"
      2) "v"
      3) "k2"
      4) "v2"

127.0.0.1:6379> XRANGE Email (1769843435725-0 + # ( 붙이면 "초과"
1) 1) "1769843435725-1"
   2) 1) "k1"
      2) "v"
      3) "k2"
      4) "v2"
  • -, + 로 최소 최대 지정 가능
  • ( 붙이면 초과의 의미를 가짐

 

소비자와 소비자 그룹

여러 소비자가 읽어가는 상황을 알아보자

  • 팬아웃(fan-out): 같은 데이터를 여러 소비자에게 전달하는 것
  • 카프카: 같은 토픽을 여러 소비자가 읽어가게 하면 됨
  • 레디스: XREAD 를 여러 소비자가 수행하면 됨

 

여러 소비자가 똑같은 데이터를 읽어가는 것 말고, 벙렬 처리 위해 데이터를 여러 소비자가 나눠 갖기 위해서는?

  • 메시지 순서 보장이 중요하다
  • 카프카: 토픽의 파티션 내에서만 순서 보장 됨, 여러 파티션에서 받으려면 순서 보장 안 됨 → 소비자 그룹으로 해결
    • 소비자 그룹 내 소비자 하나는 하나의 파티션과 매핑된다. 하나의 소비자 내에선 파티션이라 순서 보장된다.
     

  • 레디스: stream은 데이터가 고유한 ID라 순서 보장 가능
    • 레디스도 소비자 그룹이 있지만 메시지 전달 순서 신경 안 써도 됨.
    • 소비자 그룹 내 소비자는 다른 소비자가 아직 읽지 않은 데이터만 읽어감
      • stream에 쌓인 차례대로 소비자가 데이터를 읽어간다.

레디스 소비자 그룹 생성, 읽기

XGROUP, XREADGROUP

127.0.0.1:6379> XGROUP CREATE Email EmailGroup $
OK
127.0.0.1:6379> XREADGROUP GROUP EmailGroup 111 COUNT 1 STREAMS Email >
(nil)

(프로듀서 세션에서 발행)
127.0.0.1:6379> XADD Email * k1 v k2 v2
"1769847024499-0"

127.0.0.1:6379> XREADGROUP GROUP EmailGroup 111 COUNT 1 STREAMS Email >
1) 1) "Email"
   2) 1) 1) "1769847024499-0"
         2) 1) "k1"
            2) "v"
            3) "k2"
            4) "v2"
127.0.0.1:6379> 
  • XGROUP CREATE Email EmailGroup $ Email stream을 읽는 EmailGroup 이라는 소비자 그룹 생성.
    • $: 현재 시점 이후 데이터부터 리스닝하겠다
  • XREADGROUP GROUP EmailGroup 111 COUNT 1 STREAMS Email > 소비자 그룹(EmailGroup) 으로 읽는다.
    • 111이라는 소비자는 자동생성된다.
    • COUNT 1: 1개만 읽을게
    • STREAMS Email > : Email stream에서 다른 소비자가 읽지 않은 새로운 메시지만 읽을게
      • > 가 아니라 0이나 ID면 새로운 메시지가 아닌 입력한 ID보다 큰 ID만 pending list에 속하던 메시지를 반환한다.
    • XREADGROUP 은 읽는게 소비자 그룹에 영향 미치기 때문에 쓰기 커맨드로 분류돼 마스터에서만 호출할 수 있다.

 

stream에서 소비자 그룹은 stream 상태를 나타내는 개념으로 간주

  • pending msg, last_delivered_id, 소비자에서 어떤 메시지 받았는지 알 수 있음

stream과 소비자 그룹은 독립적으로 동작해서 한 소비자 그룹에서 여러 stream 리스닝 가능

XREADGROUP GROUP BiGroup 111 COUNT 2 STREAMS Email PushStream > >

 

부하 분산 관점

  • 카프카: 파티션을 이용해 소비자 부하 분산
  • 레디스 stream: 파티션 없어도 소비자 그룹으로 소비자 부하 분산

 

ACK와 보류 리스트

  • 메시지 브로커가 장애로 인해 시스템 종료됐을 떄 인지하고 재처리할 수 있는 기능 필요
    • 어느 메시지가 어느 소비자에게 전달됐고 소비자가 잘 처리했는지 인지하고 있어야함
  • 카프카: 내부 토픽 __consumer_offsets 에 소비자가 읽은 파티션 메시지의 오프셋, 소비자 그룹, 토픽, 파티션 내용 저장됨. 어디까지 읽었는지 추적
  • 레디스: pending list, last_delivered_id

  • 마지막으로 읽어간 데이터 ID로 last_delivered_id 를 업데이트 한다.
    • 목적: 다른 소비자에게 같은 메시지 중복 전송하지 않기 위함
  • stream에서 소비자가 메시지를 읽어가면 소비자별로 메시지 pending list를 새로 생성함
    • pending list: 소비자가 ACK 주길 기다리고 있는 메시지들
      • 소비자가 ACK보내면 pending list에서 메시지 삭제한다.
    • 목적: 소비자쪽 장애로 재부팅해야할 때 미처리일지도 모를 pending list의 메시지를 다른 소비자가 처리하게 하기 위함
    • XREADGROUP 소비자 그룹으로 메시지 읽으면 XACK 로 전송해줘야한다. (XREAD 는 pending list 필요 없음)

 

XPENDING, XACK

# 소비자 그룹 내 pending 메시지 확인
127.0.0.1:6379> XPENDING Email EmailGroup
1) (integer) 2   # 소비자 그룹(EmailGroup)에서 pending 메시지 2개
2) "1769847024499-0" # 메시지 최소 ID
3) "1769851022266-0" # 메시지 최대 ID
4) 1) 1) "111"      # 111 소비자에서 2개 pending 중
      2) "2"

# 메시지 ACK
127.0.0.1:6379> XACK Email EmailGroup 1769847024499-0 
(integer) 1

# pending 메시지 1개로 줄음
127.0.0.1:6379> XPENDING Email EmailGroup
1) (integer) 1
2) "1769851022266-0"
3) "1769851022266-0"
4) 1) 1) "111"
      2) "1"

<레디스에서 메시지 보증 전략>

  • at most once: 최대 1번 전송. 소비자는 메시지 받자마자 ACK를 먼저 보낸다. 메시지 일부 손실되더라도 빠른 응답 필요한 경우 선택
  • at least once: 최소 1번 전송. 소비자는 메시지 처리 한 뒤 ACK를 보낸다. 만약 ACK 못 보내고 소비자가 죽었다면 그 후에 pending list의 메시지를 재전송한다.
  • exactly once: 정확히 1번 전송. 레디스 set 자료구조로 이미 처리된 메시지인지 확인하는 추가 로직 필요

 

메시지의 재할당

  • 소비자가 장애 복구에 실패하면 그 소비자의 pending list를 다른 소비자가 대신 처리해야 한다.

XCLAIM

XCLAIM Email EmailGroup 222 1000000 1769851022266-0
  • 222: 넘길 소비자
  • 1000000: 최소 대기 시간. 메시지 보류 상태로 이만큼 시간이 흐른 후에 다른 소비자로 소유권 변경할 수 있다. 소비자 중복 할당 방지 위함
  • XPENDING으로 메시지 ID를 알아야 쓸 수 있다.

 

메시지의 자동 재할당

  • Redis 6.2에서 도입
  • 일정 시간 이상 방치된 메시지를 알아서 찾아서 뺏어오기
  • 메시지 ID 몰라도 재할당 가능

XAUTOCLAIM

XAYUTOCLAIM Email EmailGroup 222 1000000

Email 스트림의 EmailGroup 그룹에서, 1,000초 넘게 ACK가 안 오고 방치된 메시지가 있다면, 그걸 소비자 '222'가 가져가겠다

❗이런 자동 처리를 개발자가 수동으로 해야하나봄 (카프카는 리밸런싱으로 알아서 해주는데)

 

메시지의 수동 재할당

  • stream내 메시지는 counter 값을 갖고 있다. XREADGROUP으로 소비자에게 할당하거나 XCLAIM으로 재할당하면 1씩 증가한다.
  • 만약 문제가 있어 counter 가 계속 증가하면 다른 stream(dead letter)으로 보내 관리자가 따로 확인하는 로직 추가하는 게 좋을 수 있다.

 

stream 상태 확인

XINFO

127.0.0.1:6379> XINFO HELP
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3)     Show consumers of <groupname>.
4) GROUPS <key>
5)     Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7)     Show information about the stream.
8) HELP
9)     Print this help.

# CONSUMERS - 소비자 그룹에 속한 소비자들 확인 
127.0.0.1:6379> XINFO CONSUMERS Email EmailGroup
1) 1) "name"
   2) "111" ## 소비자
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 618270
   7) "inactive"
   8) (integer) 618270

# GROUPS - 소비자 그룹 확인
127.0.0.1:6379> XINFO GROUPS Email
1)  1) "name"
    2) "EmailGroup"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 1
    7) "last-delivered-id"
    8) "1769851022266-0"
    9) "entries-read"
   10) (integer) 4
   11) "lag"
   12) (integer) 0

# STREAM - stream 확인
127.0.0.1:6379> XINFO STREAM Email
 1) "length"
 2) (integer) 4
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1769851022266-0"
 9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 4
13) "recorded-first-entry-id"
14) "1769843435725-0"
15) "groups"
16) (integer) 1
17) "first-entry"
18) 1) "1769843435725-0"
    2) 1) "k1"
       2) "v"
       3) "k2"
       4) "v2"
19) "last-entry"
20) 1) "1769851022266-0"
    2) 1) "k1"
       2) "v"
       3) "k2"
       4) "v2"

 

레퍼런스

개발자를 위한 레디스 책

글 읽어주셔서 언제나 감사합니다. 좋은 피드백, 개선 피드백 너무나도 환영합니다.