Home Kafka DLQ(Dead Letter Queue) 구현 - 메시지 유실 없이 에러 처리하기
Post
Cancel

Kafka DLQ(Dead Letter Queue) 구현 - 메시지 유실 없이 에러 처리하기

Kafka Consumer에서 에러가 발생했을 때 그냥 로그만 남기고 넘어가면 해당 메시지는 영원히 유실된다. DLQ(Dead Letter Queue)는 처리 실패한 메시지를 별도 토픽에 보관해 나중에 재처리할 수 있게 해주는 패턴이다.


기존 코드의 문제점

1
2
3
4
5
6
7
8
9
10
@KafkaListener(topics = "${gateway.kafka.topic}")
public void consume(String message) {
    try {
        GatewayDataDocument document = objectMapper.readValue(message, GatewayDataDocument.class);
        repository.save(document);
    } catch (Exception e) {
        log.error("Failed to consume message. error={}", e.getMessage(), e);
        // 여기서 그냥 넘어감 → offset 커밋 → 메시지 유실
    }
}

MongoDB가 다운되거나 역직렬화 실패 시 메시지를 그냥 버린다. Kafka에서 offset이 이미 커밋되어 사실상 데이터 유실이다.


DLQ 흐름

1
2
3
4
5
6
7
8
9
10
11
12
13
메시지 수신
    ↓
처리 실패
    ↓
재시도 3회 (1초 간격)
    ↓
재시도 모두 실패
    ↓
gateway-data-dlq 토픽으로 전송
    ↓
DLQ Consumer가 수신 → 로그/알림
    ↓
나중에 원인 파악 후 재처리

구현

1. DLQ 토픽 생성 (docker-compose)

1
2
3
4
5
6
7
8
9
10
11
12
13
kafka-init:
  command: >
    "sleep 10 &&
    kafka-topics --create --if-not-exists
    --topic gateway-data
    --bootstrap-server kafka:29092
    --partitions 5
    --replication-factor 1 &&
    kafka-topics --create --if-not-exists
    --topic gateway-data-dlq
    --bootstrap-server kafka:29092
    --partitions 5
    --replication-factor 1"

DLQ 토픽도 원본 토픽과 동일한 파티션 수로 생성한다. 원본 파티션과 동일한 파티션으로 DLQ에 전송하기 때문이다.


2. application.yml

1
2
3
4
gateway:
  kafka:
    topic: gateway-data
    dlq-topic: gateway-data-dlq

3. KafkaErrorHandlerConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
@Configuration
public class KafkaErrorHandlerConfig {

    @Value("${gateway.kafka.dlq-topic}")
    private String dlqTopic;

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {

        // 실패한 메시지를 DLQ 토픽으로 전송
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (record, ex) -> {
                    log.error("Message sent to DLQ. topic={} partition={} offset={} error={}",
                            record.topic(), record.partition(), record.offset(), ex.getMessage());
                    return new TopicPartition(dlqTopic, record.partition());
                }
        );

        // 1초 간격으로 3회 재시도 후 DLQ 전송
        FixedBackOff backOff = new FixedBackOff(1000L, 3);

        DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);

        // 역직렬화 에러는 재시도 없이 즉시 DLQ 전송 (재시도해도 의미 없음)
        errorHandler.addNotRetryableExceptions(
                JsonParseException.class,
                InvalidFormatException.class
        );

        return errorHandler;
    }
}

DefaultErrorHandler는 Spring Kafka가 제공하는 에러 핸들러다. 재시도 횟수와 간격을 설정하고, 모두 실패하면 DeadLetterPublishingRecoverer가 DLQ 토픽으로 메시지를 전송한다.

역직렬화 에러는 addNotRetryableExceptions로 즉시 DLQ로 보낸다. JSON 파싱 오류는 재시도해도 결과가 같기 때문이다.


4. GatewayDataConsumer 수정

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@KafkaListener(topics = "${gateway.kafka.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void consume(String message) throws Exception {
    // try-catch 제거 → 에러 발생 시 ErrorHandler가 처리
    GatewayDataDocument document = objectMapper.readValue(message, GatewayDataDocument.class);
    repository.save(document);

    String batchId = document.getBatchId();
    int batchSize = document.getBatchSize();

    long[] state = batchTracker.computeIfAbsent(batchId, k -> new long[]{0L, System.currentTimeMillis()});

    long count;
    synchronized (state) {
        state[0]++;
        count = state[0];
    }

    if (count == batchSize) {
        long elapsed = System.currentTimeMillis() - state[1];
        log.info("Batch complete. batchId={} count={} elapsed={}ms", batchId, batchSize, elapsed);
        batchTracker.remove(batchId);
    }
}

try-catch를 제거하고 throws Exception으로 에러를 그대로 던진다. DefaultErrorHandler가 받아서 재시도 후 DLQ로 보낸다.


5. GatewayDataDlqConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@Component
public class GatewayDataDlqConsumer {

    @KafkaListener(topics = "${gateway.kafka.dlq-topic}", groupId = "${spring.kafka.consumer.group-id}-dlq")
    public void consume(ConsumerRecord<String, String> record) {
        log.error("DLQ message received. topic={} partition={} offset={} key={} value={}",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());

        // 운영 환경에서는 여기에 슬랙/이메일 알림 추가
    }
}

DLQ Consumer는 실패한 메시지를 수신해서 로그를 남긴다. 운영에서는 슬랙 알림 등을 연동해 즉시 인지할 수 있게 한다.


에러 유형별 처리 흐름

에러 유형처리 방식이유
MongoDB 일시 장애재시도 3회 → DLQ재시도 후 복구 가능
MongoDB 완전 다운재시도 3회 → DLQ모두 실패 후 DLQ 보관
JSON 파싱 오류즉시 DLQ재시도해도 동일하게 실패
역직렬화 오류즉시 DLQ재시도해도 동일하게 실패

DLQ 메시지 재처리 방법

DLQ에 쌓인 메시지는 원인 파악 후 수동으로 재처리한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# DLQ 토픽 메시지 확인
kafka-console-consumer \
  --topic gateway-data-dlq \
  --bootstrap-server localhost:9092 \
  --from-beginning

# DLQ 메시지를 원본 토픽으로 복사해서 재처리
kafka-console-consumer \
  --topic gateway-data-dlq \
  --bootstrap-server localhost:9092 \
  --from-beginning | \
kafka-console-producer \
  --topic gateway-data \
  --bootstrap-server localhost:9092

기존 방식 vs DLQ 방식 비교

항목기존 (try-catch 무시)DLQ 방식
처리 실패 시메시지 유실DLQ에 보관
재처리 가능 여부불가가능
에러 인지로그만로그 + DLQ Consumer 알림
MongoDB 일시 장애해당 메시지 유실재시도 후 DLQ 보관
운영 안정성낮음높음

정리

DLQ 도입으로 얻은 것:

  • 데이터 무손실: 처리 실패해도 DLQ에 보관되어 유실 없음
  • 재처리 가능: 원인 파악 후 DLQ 메시지를 원본 토픽으로 복사해 재처리
  • 즉시 인지: DLQ Consumer가 수신 즉시 로그/알림으로 에러 감지
  • 재시도 내장: MongoDB 일시 장애 시 3회 재시도로 자동 복구 가능
This post is licensed under CC BY 4.0 by the author.