티스토리 뷰

💡 최상용님의 실습으로 배우는 선착순 이벤트 시스템 강의를 듣고 정리한 내용입니다.

 

목차

     
    글 목록

     

    배경

    지난 [시스템 디자인] 실습으로 배우는 선착순 이벤트 시스템 (3/3) - 요구사항 변경과 쿠폰 발급 실패 예외처리에서 쿠폰 발급 Consumer에서 처리에 실패하는 경우 재처리 방안을 소개하였다. 재처리 방안으로는 실패 이벤트 테이블을 추가하여 데이터를 적재하고 배치 프로그램을 돌리는 방안과 DLT(Dead Letter Topic)에 발행하여 재처리하는 방안을 소개했었는데, 이번에는 그 중 두번째 방법인 DLT를 이용한 쿠폰 발급 재처리 방안을 직접 구현하여 소개한다.

     

    DLQ(Dead Letter Queue)

    오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 특수한 유형의 역할을 하는 큐(Queue)이다. 쉽게 말해서 메시지 대기열은 분산 시스템에서 비동기 통신을 지원하는 소프트웨어 구성 요소 중 하나인데, 이 중 DLQ(Dead Letter Queue)는 대상이 없거나 처리할 수 없는 잘못된 메시지를 보관하는 특수한 역할을 한다. (출처 : https://aws.amazon.com/ko/what-is/dead-letter-queue/)

     

    이 때 Kafka는 메시지를 Queue가 아닌 Topic에 보관하므로, 이처럼 예외 처리를 위해 특수한 역할을 하는 Topic을 Dead Letter Topic(DLT)라 한다.

     

    DLT(Dead Letter Topic)를 사용하면 다음과 같이 쿠폰 발급에 실패한 메시지를 보내고 안전하게 실패한 메시지를 보관하면서 처리를 보장할 수 있다. 또한 일반 메시지 토픽(coupon-created)에서는 메시지 처리에 실패하면 곧바로 DLT에 보내고 이후에 발행되는 메시지를 처리함으로써 결과적으로 처리 속도와 처리량 향상에 기여할 수 있다. 마지막으로 DLQ에 적재된 메시지들을 분석하여 개발자는 오류 원인을 더 쉽게 파악하고 문제를 해결할 수 있다.

     

    Consumer 예외 발생

    먼저 DLT를 적용하기에 앞서, Consumer에서 메시지를 처리하다가 예외가 발생하면 어떻게 되는지 알아보자. CouponCreatedConsumer 로직을 다음과 같이 수정한다. 쿠폰을 발급한 userId 값이 홀수라면 RuntimeException을 던지고, 짝수라면 쿠폰 발급한다.

    package org.example.consumer.consumer;
    
    import org.example.consumer.domain.Coupon;
    import org.example.consumer.repository.CouponRepository;
    import org.example.consumer.repository.FailedEventRepository;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j // 에러 로깅을 위함
    @Component
    public class CouponCreatedConsumer {
    	private final CouponRepository couponRepository;
    	private final FailedEventRepository failedEventRepository;
    
    	public CouponCreatedConsumer(CouponRepository couponRepository, FailedEventRepository failedEventRepository) {
    		this.couponRepository = couponRepository;
    		this.failedEventRepository = failedEventRepository;
    	}
    
    	@KafkaListener(
    		topics = "coupon-create", 	// 토픽 명 지정
    		groupId = "group_1", 		// 컨슈머 그룹 ID 지정
    		containerFactory = "kafkaListenerContainerFactory" // Listener Container Factory 지정
    	)
    	public void listener(Long userId) {
    		System.out.println("userId = " + userId);
    		if(userId % 2 != 0) {
    			throw new RuntimeException("쿠폰 발급 실패 : " + userId);
    		}
    		couponRepository.save(new Coupon(userId));
    	}
    }

     

    다음 테스트 코드를 실행한다. userId를 1로 쿠폰 발급 요청을 보낸다.

    @Test
    void applyCoupon() throws Exception {
    	// 요청 파라미터
    	UserRequest request = new UserRequest(1L);
    
    	// API 요청 및 응답 확인
    	mockMvc.perform(post("/coupon/apply")
    			.contentType(MediaType.APPLICATION_JSON)
    			.content(objectMapper.writeValueAsString(request)))
    		.andExpect(status().isOk());
    
    	// 쿠폰 발급 확인
    	Assertions.assertThat(couponRepository.findById(request.getUserId()))
    		.isNotEmpty();
    }

     

    이후 에러 로그를 확인해보면 coupon-create 토픽의 coupon-create-0 파티션의 메시지를 처리하다가 예외가 발생하여 실패하고 KafkaException이 발생한 것을 볼 수 있다.

     

    • Seeking to offset 8 for partition coupon-create-0 : Kafka 컨슈머가 coupon-create-0 파티션의 offset 8로 이동해서 그 위치에 있는 메시지를 읽으려고 시도했다.
    • KafkaException: Seek to current after exception: 컨슈머가 예외 발생 후 현재 오프셋으로 다시 이동해 메시지를 재처리하려고 했지만, 그 과정에서 또 예외가 발생했다.

     

    로그를 확인해보면 의도대로 userId가 홀수라서 RuntimExcpetion이 발생한 것을 볼 수 있다.

     

     

    이후 로그를 보면 Spring Kafka로 구현한 Consumer의 예외 처리 방안을 볼 수 있다. 기본 정책으로는 예외가 발생하면 Spring Kafka의 기본 예외 핸들러인 DefaultErrorHandler가 동작하여 10번(SeekUtils.DEFAULT_BACK_OFF 참고) 재처리한다.

     

    또한 다음과 같은 로그를 확인할 수 있는데, 각각 interval = 재시도 사이에 텀, currentAttempts = 현재 재시도 횟수, maxAttempts = 최대 시도 횟수를 의미한다. 즉 최대 시도 횟수를 넘었으므로 재처리 로직이 종료된다.

     

     

    메시지 처리 중에 실패가 발생하고나서 다시 메시지를 요청하는 경우 어떻게 되는지 확인해보기 위해 이후 userId를 2로 변경하여 다시 메시지를 요청해보자.

    @Test
    void applyCoupon() throws Exception {
    	// 요청 파라미터
    	UserRequest request = new UserRequest(2L);
    
    	// API 요청 및 응답 확인
    	mockMvc.perform(post("/coupon/apply")
    			.contentType(MediaType.APPLICATION_JSON)
    			.content(objectMapper.writeValueAsString(request)))
    		.andExpect(status().isOk());
    
    	// 쿠폰 발급 확인
    	Assertions.assertThat(couponRepository.findById(request.getUserId()))
    		.isNotEmpty();
    }

     

    이번에는 메시지를 정상적으로 컨슘하고 테이블에 적재된 것을 볼 수 있다.

     

     

    즉, 현재 기본 설정인 DefaultErrorHandler는 메시지 처리 중에 에외가 발생하면 SeekUtils.DEFAULT_BACK_OFF 횟수 만큼 재처리를 시도하는 것을 알 수 있다.

    이후 설정된 횟수 만큼 재처리를 하고나서도 메시지 처리에 실패하면 다음 오프셋부터 메시지를 읽어온다. 다시 말해 재처리에 실패한 메시지는 버려지고(= 처리에 실패한 메시지는 읽었다고 간주한다.) 다음 메시지부터 처리한다.

     

    DeadLetterPublishingRecoverer

    이번에는 메시지 처리에 실패한 경우 DLT를 통해 재처리하도록 KafkaConsumerConfig를 수정한다. 먼저 DLT에 실패한 메시지를 발행하기 위해서는 DeadLetterPublishingRecoverer를 사용해야 한다.

     

    DeadLetterPublishingRecoverer는 Consumer에서 Kafka 메시지 처리를 실패했을때, 해당 메시지를 DLT에 전달하는 역할을 한다. 이때 DeadLetterPublishingRecoverer는 KafkaTemplate에 작성한 템플릿에 맞춰 메시지를 DLT에 전송하는데, 이때 DLT의 이름은 topicName.DLT를 따른다.(postifx : .DLT)

     

    이 DeadLetterPublishingRecovere를 에러 핸들러 로직을 정의하는 CouponErrorHandler의 생성자에 전달할 것이다.

     

    이때 DLT은 자동으로 생성되지 않는다, 개발자가 토픽을 따로 생성해줘야한다. 토픽을 생성할때 파티션도 지정해주기 때문에 DLT 파티션과 기존 토픽의 파티션 개수가 다르다면 잘 고려해야한다. (참고 : https://junuuu.tistory.com/795)

    docker exec -it kafka kafka-topics.sh \\
    --bootstrap-server localhost:9092 \\
    --create \\
    --topic coupon-create.DLT

     

    CouponErrorHandler

    DeadLetterPublishingRecoverer를 사용하여 예외 처리 정책을 정의하기 위해 couponErrorHandler 빈을 등록한다. couponErrorHandler는 Spring Kafka에서 제공하는 예외 처리 핸들러인 DefaultErrorHandler로 구현한다.

     

    이를 통해 Consumer에서 메시지 처리 중에 예외가 발생했을때 어떻게 처리할지 정의할 수 있다. 여기서는 메시지 처리에 실패하면 1초 간격으로 최대 2번 재시도한 후에도 실패하면 그 메시지를 Dead Letter Topic에 보내도록 설정하였다.

    // 에러 핸들러 구현
    @Bean
    public CommonErrorHandler couponErrorHandler() {
    	DeadLetterPublishingRecoverer recover = new DeadLetterPublishingRecoverer(kafkaTemplate(producerFactory()));
    
    	// 1초(1000MS) 간격으로 2회 재시도
    	FixedBackOff fixedBackOff = new FixedBackOff(1000L, 2L);
    	return new DefaultErrorHandler(recover, fixedBackOff);
    }

     

    KafkaTemplate

    예외가 발생하면 DLT에 메시지를 발행(Produce)하는데, 이때 필요한 메시지 템플릿과 Producer 설정 정보를 세팅한다. 정리하자면 컨슈머에서 예외가 발생하면 다시 예외 처리용 토픽에 메시지를 발행(Produce)하여 실패한 메시지를 안전하게 보관하고 처리하는 것이다.

    // DLT(Dead Letter Topic)에 발행할 메시지 템플릿
    @Bean
    public KafkaTemplate<String, Long> kafkaTemplate(ProducerFactory<String, Long> producerFactory) {
    	return new KafkaTemplate<>(producerFactory);
    }
    
    // DLT(Dead Letter Topic)에 메시지 발행시 사용할 Producer 설정 세팅
    @Bean
    public ProducerFactory<String, Long> producerFactory() {
    	Map<String, Object> config = new HashMap<>();
    	config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    	config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    	config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    	return new DefaultKafkaProducerFactory<>(config);
    }

     

    KafkaConsumerConfig

    위 수정 사항을 모두 반영한 KafkaConsumerConfig 코드는 다음과 같다.

    package org.example.consumer.config;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.listener.CommonErrorHandler;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.DefaultErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;
    
    @Configuration
    public class KafkaConsumerConfig {
    	// Consumer 인스턴스를 생성하는데 필요한 설정값들을 정의
    	// 스프링은 간편하게 설정값들을 세팅할 수 있도록 ConsumerFactory 인터페이스를 제공한다.
    
    	// 컨슈머 설정 정보를 세팅하는 ConsumerFactory 빈 등록
    	@Bean
    	public ConsumerFactory<String, Long> consumerFactory() {
    		// 컨슈머 설정 값들을 담을 Map 선언
    		Map<String, Object> config = new HashMap<>();
    
    		// 컨슈머 설정 정보 세팅
    		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 				// 서버 정보
    		config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1"); 								// 컨슈머 그룹 ID
    		config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // key 역직렬화 클래스
    		config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); // value 역직렬화 클래스
    
    		return new DefaultKafkaConsumerFactory<>(config);
    	}
    
    	// 토픽으로부터 메시지를 수신해올 KafkaListener를 만드는 KafkaListenerContainerFactory 빈 등록
    	@Bean
    	public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
    		ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
    
    		// 앞서 세팅한 설정 값을 담는 consuemerFactory를 설정해준다.
    		factory.setConsumerFactory(consumerFactory());
    
    		// 에러 핸들러 지정
    		factory.setCommonErrorHandler(couponErrorHandler());
    
    		return factory;
    	}
    
    	// DLT(Dead Letter Topic)에 발행할 메시지 템플릿
    	@Bean
    	public KafkaTemplate<String, Long> kafkaTemplate(ProducerFactory<String, Long> producerFactory) {
    		return new KafkaTemplate<>(producerFactory);
    	}
    
    	// DLT(Dead Letter Topic)에 메시지 발행시 사용할 Producer 설정 세팅
    	@Bean
    	public ProducerFactory<String, Long> producerFactory() {
    		Map<String, Object> config = new HashMap<>();
    		config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    		config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    		config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    		return new DefaultKafkaProducerFactory<>(config);
    	}
    
    	// 에러 핸들러 구현
    	@Bean
    	public CommonErrorHandler couponErrorHandler() {
    		DeadLetterPublishingRecoverer recover = new DeadLetterPublishingRecoverer(kafkaTemplate(producerFactory()));
    
    		// 1초(1000MS) 간격으로 2회 재시도
    		FixedBackOff fixedBackOff = new FixedBackOff(1000L, 2L);
    		return new DefaultErrorHandler(recover, fixedBackOff);
    	}
    }

     

    DeadLetterTopicConsumer

    이후 DLT에 보관된 실패 메시지를 처리하기 위한 Consumer를 구현한다. 여기서는 실패 이벤트 이력 테이블(FailedEvent)에 적재하고 나서 쿠폰 발급 로직을 진행하도록 구현하였다.

    package org.example.consumer.consumer;
    
    import org.example.consumer.domain.Coupon;
    import org.example.consumer.domain.FailedEvent;
    import org.example.consumer.repository.CouponRepository;
    import org.example.consumer.repository.FailedEventRepository;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DeadLetterTopicConsumer {
    	private final CouponRepository couponRepository;
    	private final FailedEventRepository failedEventRepository;
    
    	public DeadLetterTopicConsumer(CouponRepository couponRepository, FailedEventRepository failedEventRepository) {
    		this.couponRepository = couponRepository;
    		this.failedEventRepository = failedEventRepository;
    	}
    
    	@KafkaListener(
    		topics = "coupon-create.DLT", 	// 토픽 명 지정
    		groupId = "group_1", 			// 컨슈머 그룹 ID 지정
    		containerFactory = "kafkaListenerContainerFactory" // Listener Container Factory 지정
    	)
    	public void listener(Long userId) {
    		System.out.println("DeadLetterTopicConsumer.listener");
    		System.out.println("failed to create coupon = " + userId);
    		failedEventRepository.save(new FailedEvent(userId));
    		couponRepository.save(new Coupon(userId));
    
    	}
    }

     

    쿠폰 발급 실패 테스트

    다시 다음 테스트 코드를 실행하여 coupon-create 토픽에 userId를 1로 메시지를 발행해보자.

    @Test
    void applyCoupon() throws Exception {
    	// 요청 파라미터
    	UserRequest request = new UserRequest(1L);
    
    	// API 요청 및 응답 확인
    	mockMvc.perform(post("/coupon/apply")
    			.contentType(MediaType.APPLICATION_JSON)
    			.content(objectMapper.writeValueAsString(request)))
    		.andExpect(status().isOk());
    
    	// 쿠폰 발급 확인
    	Assertions.assertThat(couponRepository.findById(request.getUserId()))
    		.isNotEmpty();
    }

     

    동일하게 coupon-create 토픽의 coupon-create-0 파티션의 메시지를 처리하다가 예외가 발생하여 실패하고 KafkaException이 발생한 것을 볼 수 있다.

     

     

    하지만 이번에는 2번만 재처리를 시도하고 그 후에도 실패하면 DLT에 메시지를 발행하는 것을 볼 수 있다.

     

     

    그 후 DeadLetterTopicConsumer가 동작하여 DLT에 발행된 토픽을 처리하여 coupon 테이블과 실패 이력을 보관할 failed_event 테이블에 적재되는 것을 볼 수 있다.

     

    주의할 점

    이로써 Consumer에서 쿠폰 발급 중 예외가 발생하면 DLT에 처리에 실패한 메시지를 적재하고, 이후 쿠폰 재발급을 하도록 예외 처리 방안을 구현하였다. 하지만 DLT로 메시지를 재처리하는 경우 순서가 보장되지 않을 수 있다. 따라서 순서가 중요한 경우 사용에 주의 해야햔다.

     

    예시에서는 Redis를 통해 발급된 쿠폰의 개수를 조회하고 있으므로([시스템 디자인] 실습으로 배우는 선착순 이벤트 시스템 (1/3) - 동시성 이슈와 Redis로 해결하기 글 참고) DLT로 재처리를 진행해도 쿠폰 발급 수량을 제한할 수 있다. 하지만 Consumer에서 쿠폰 발급 개수도 체크하고 있다면 쿠폰이 정해진 수량보다 더 많이 발급될 수 있다.

     

    또한 DLT에 보관된 메시지는 즉시 처리되지 않고 이후에 재처리될 수 있다. 따라서 실시간으로 쿠폰 발급이 중요한 경우 유의해야한다. 마지막으로 발급에 실패한 이벤트를 장기적으로 보관해야하는 경우 강의에서 소개한 바와 같이 데이터베이스처럼 영구적인 스토리지를 적용하는게 더 편리할 수 있다.

    참고한 곳

    https://aws.amazon.com/ko/what-is/dead-letter-queue/

    https://junuuu.tistory.com/795

    Comments