Propertiesprops=newProperties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG,"order-service");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 수동 커밋KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);consumer.subscribe(List.of("order-events"));while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String>record:records){processOrder(record.value());}// 처리 완료 후 오프셋 커밋consumer.commitSync();}
2.3 Commit 방식
방식
설정
특징
Auto Commit
enable.auto.commit=true
주기적 자동 커밋, 중복/유실 가능
Sync Commit
commitSync()
커밋 완료까지 블로킹, 안전
Async Commit
commitAsync()
논블로킹, 실패 시 재시도 어려움
1
2
3
4
5
6
7
8
9
// 레코드 단위 커밋 (가장 정밀)for(ConsumerRecord<String,String>record:records){processOrder(record.value());consumer.commitSync(Map.of(newTopicPartition(record.topic(),record.partition()),newOffsetAndMetadata(record.offset()+1)));}
3. 메시지 보장 수준 비교
3.1 At-least-once, At-most-once, Exactly-once
보장 수준
설명
중복
유실
설정
At-most-once
최대 한 번 전달
✗
✓
처리 전 커밋
At-least-once
최소 한 번 전달
✓
✗
처리 후 커밋
Exactly-once
정확히 한 번 전달
✗
✗
트랜잭션 + 멱등성
3.2 At-most-once 구현
1
2
3
4
5
6
// 먼저 커밋 → 처리 중 실패하면 메시지 유실ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(1000));consumer.commitSync();// 먼저 커밋!for(ConsumerRecord<String,String>record:records){processOrder(record.value());// 여기서 실패하면 유실}
3.3 At-least-once 구현
1
2
3
4
5
6
7
// 처리 후 커밋 → 커밋 전 실패하면 재처리(중복)ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String>record:records){processOrder(record.value());// 먼저 처리!}consumer.commitSync();// 처리 완료 후 커밋// 여기서 실패하면 → 다음 poll에서 같은 메시지 다시 수신
멱등성(Idempotency) 으로 중복 처리 방어:
1
2
3
4
5
6
7
8
9
publicvoidprocessOrder(OrderEventevent){// 이미 처리한 이벤트인지 확인if(processedEventRepository.existsById(event.getEventId())){log.info("이미 처리된 이벤트: {}",event.getEventId());return;}orderService.createOrder(event);processedEventRepository.save(newProcessedEvent(event.getEventId()));}
3.4 Exactly-once (Kafka Transactions)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Producer: 트랜잭션 설정props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"order-tx-1");KafkaProducer<String,String>producer=newKafkaProducer<>(props);producer.initTransactions();try{producer.beginTransaction();producer.send(newProducerRecord<>("order-events",key,value));producer.send(newProducerRecord<>("payment-events",key,paymentValue));producer.commitTransaction();// 두 메시지 모두 성공 또는 모두 실패}catch(Exceptione){producer.abortTransaction();throwe;}
@Service@RequiredArgsConstructorpublicclassOrderEventProducer{privatefinalKafkaTemplate<String,OrderEvent>kafkaTemplate;publicvoidpublishOrderCreated(Orderorder){OrderEventevent=newOrderEvent(UUID.randomUUID().toString(),order.getId(),"ORDER_CREATED",order);CompletableFuture<SendResult<String,OrderEvent>>future=kafkaTemplate.send("order-events",order.getId(),event);future.whenComplete((result,ex)->{if(ex!=null){log.error("메시지 전송 실패: {}",ex.getMessage());}else{log.info("메시지 전송 성공: topic={}, partition={}, offset={}",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());}});}}
@Service@RequiredArgsConstructorpublicclassOrderEventConsumer{privatefinalOrderServiceorderService;@KafkaListener(topics="order-events",groupId="order-service",containerFactory="kafkaListenerContainerFactory")publicvoidhandleOrderEvent(@PayloadOrderEventevent,@Header(KafkaHeaders.RECEIVED_PARTITION)intpartition,@Header(KafkaHeaders.OFFSET)longoffset,Acknowledgmentack){log.info("수신: event={}, partition={}, offset={}",event.getType(),partition,offset);try{switch(event.getType()){case"ORDER_CREATED"->orderService.handleCreated(event);case"ORDER_PAID"->orderService.handlePaid(event);case"ORDER_CANCELLED"->orderService.handleCancelled(event);default->log.warn("알 수 없는 이벤트: {}",event.getType());}ack.acknowledge();// 수동 커밋}catch(Exceptione){log.error("이벤트 처리 실패: {}",e.getMessage());// ack 하지 않음 → 재시도 또는 DLQ로 이동}}}
@ConfigurationpublicclassKafkaConfig{@AutowiredprivateKafkaTemplate<String,OrderEvent>kafkaTemplate;@BeanpublicConcurrentKafkaListenerContainerFactory<String,OrderEvent>kafkaListenerContainerFactory(ConsumerFactory<String,OrderEvent>consumerFactory){ConcurrentKafkaListenerContainerFactory<String,OrderEvent>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setConcurrency(3);// 3개 스레드로 병렬 소비factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);// 에러 핸들러 + DLQfactory.setCommonErrorHandler(newDefaultErrorHandler(newDeadLetterPublishingRecoverer(kafkaTemplate),newFixedBackOff(1000L,3)// 1초 간격, 3번 재시도));returnfactory;}}
5. Consumer Group Rebalancing
5.1 Rebalancing이 발생하는 시점
컨슈머가 그룹에 새로 참가하거나 이탈할 때
컨슈머가 heartbeat 타임아웃 (세션 만료)
토픽의 파티션 수가 변경될 때
1
2
3
4
5
6
7
8
9
10
Before Rebalancing:
Consumer A ← P0, P1
Consumer B ← P2
Consumer C 참가 → Rebalancing 발생
After Rebalancing:
Consumer A ← P0
Consumer B ← P1
Consumer C ← P2
spring:kafka:consumer:properties:# Heartbeat 간격 (기본 3초)heartbeat.interval.ms:3000# 세션 타임아웃 (기본 45초)session.timeout.ms:45000# poll 최대 간격 — 이 시간 내 poll() 호출 필수max.poll.interval.ms:300000# 한 번에 가져오는 레코드 수max.poll.records:500
Tip: max.poll.records를 줄이면 처리 시간이 max.poll.interval.ms를 넘기는 것을 방지할 수 있다.
6. 실전 패턴: Dead Letter Queue & Retry 전략
6.1 Dead Letter Queue (DLQ)
처리에 반복적으로 실패한 메시지를 별도 토픽으로 이동시켜 메인 처리 흐름을 보호한다.
1
2
3
4
order-events → Consumer (처리 시도)
├── 성공 → ack
└── 3회 실패 → order-events.DLT (Dead Letter Topic)
└── DLQ 모니터링 & 수동 처리
@ConfigurationpublicclassKafkaConfig{@BeanpublicDefaultErrorHandlererrorHandler(KafkaTemplate<String,Object>template){// DLQ로 보내는 RecovererDeadLetterPublishingRecovererrecoverer=newDeadLetterPublishingRecoverer(template,(record,ex)->newTopicPartition(record.topic()+".DLT",record.partition()));// 1초, 2초, 4초 간격으로 3번 재시도 후 DLQExponentialBackOffbackOff=newExponentialBackOff(1000L,2.0);backOff.setMaxElapsedTime(10000L);DefaultErrorHandlerhandler=newDefaultErrorHandler(recoverer,backOff);// 특정 예외는 재시도 없이 바로 DLQ로handler.addNotRetryableExceptions(InvalidMessageException.class,DeserializationException.class);returnhandler;}}
6.2 DLQ 메시지 모니터링 & 재처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@KafkaListener(topics="order-events.DLT",groupId="dlq-handler")publicvoidhandleDlq(ConsumerRecord<String,OrderEvent>record,@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE)StringerrorMessage,@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC)StringoriginalTopic){log.error("DLQ 수신: key={}, error={}, originalTopic={}",record.key(),errorMessage,originalTopic);// 알림 발송 (Slack, PagerDuty 등)alertService.sendDlqAlert(record,errorMessage);// 필요 시 수동 재처리 큐에 저장dlqRepository.save(newDlqRecord(record,errorMessage,originalTopic));}
상태를 직접 저장하는 대신 상태 변경 이벤트를 순차적으로 저장하고, 이벤트를 재생하여 현재 상태를 도출한다.
1
2
3
4
5
6
7
주문 #123의 이벤트 로그:
1. OrderCreated {items: [...], total: 50000}
2. PaymentCompleted {paymentId: "pay-1"}
3. ItemAdded {item: "keyboard", amount: 30000}
4. OrderShipped {trackingNo: "T-456"}
현재 상태 = 이벤트 1~4를 순차 적용한 결과
1
2
3
4
5
6
7
8
9
10
11
// Kafka를 이벤트 저장소로 활용@ServicepublicclassOrderEventStore{privatefinalKafkaTemplate<String,OrderEvent>kafkaTemplate;publicvoidappend(StringorderId,OrderEventevent){// 주문 ID를 키로 → 같은 파티션에 순서대로 저장kafkaTemplate.send("order-event-store",orderId,event);}}
// Command: 주문 생성@ServicepublicclassOrderCommandService{publicvoidcreateOrder(CreateOrderCommandcommand){Orderorder=Order.create(command);orderRepository.save(order);// 이벤트 발행kafkaTemplate.send("order-events",order.getId(),newOrderCreatedEvent(order));}}// Query: 읽기 모델 업데이트@ServicepublicclassOrderQueryProjector{@KafkaListener(topics="order-events",groupId="order-query")publicvoidproject(OrderEventevent){switch(event){caseOrderCreatedEvente->{OrderViewview=newOrderView(e.getOrderId(),e.getCustomerName(),e.getItems(),e.getTotal(),"CREATED");orderViewRepository.save(view);// Elasticsearch, MongoDB 등}caseOrderShippedEvente->{orderViewRepository.updateStatus(e.getOrderId(),"SHIPPED",e.getTrackingNo());}// ...}}}// Query: 읽기 전용 API@RestControllerpublicclassOrderQueryController{@GetMapping("/orders/{orderId}")publicOrderViewgetOrder(@PathVariableStringorderId){returnorderViewRepository.findById(orderId).orElseThrow();}@GetMapping("/orders/search")publicList<OrderView>searchOrders(@RequestParamStringkeyword){returnorderViewRepository.searchByKeyword(keyword);}}
7.3 조합의 장단점
장점
단점
이벤트 이력 완전 보존
시스템 복잡도 증가
읽기/쓰기 독립 확장
최종 일관성(Eventual Consistency)
다양한 읽기 모델 구축 가능
이벤트 스키마 버전 관리 필요
감사 로그 자동 확보
이벤트 재생 시간
정리
개념
핵심
Topic / Partition
논리적 채널과 물리적 분산 단위
Consumer Group
파티션 분담으로 병렬 처리
Offset Commit
메시지 처리 위치 관리, 보장 수준 결정
acks
Producer의 안정성-성능 트레이드오프
At-least-once + 멱등성
실전에서 가장 많이 사용되는 보장 전략
@KafkaListener
Spring Boot의 선언적 메시지 소비
Dead Letter Queue
실패 메시지 격리로 안정성 확보
Event Sourcing + CQRS
Kafka를 중심으로 한 이벤트 기반 아키텍처
Kafka는 단순한 메시지 전달이 아니라, 이벤트 기반 시스템의 근간이다. 메시지 보장 수준과 장애 처리 전략을 올바르게 설계하는 것이 안정적인 시스템의 핵심이다. MSA 환경에서 Kafka는 Saga 패턴의 이벤트 채널로, Redis 캐시 무효화의 CDC 파이프라인으로 활용된다. Kafka와 함께 자주 논의되는 시스템 안정성 주제로 API Rate Limiting도 참고하자.