Propertiesprops=newProperties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG,"order-service");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 수동 커밋KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);consumer.subscribe(List.of("order-events"));while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String>record:records){processOrder(record.value());}// 처리 완료 후 오프셋 커밋consumer.commitSync();}
2.3 Commit 방식
방식
설정
특징
Auto Commit
enable.auto.commit=true
주기적 자동 커밋, 중복/유실 가능
Sync Commit
commitSync()
커밋 완료까지 블로킹, 안전
Async Commit
commitAsync()
논블로킹, 실패 시 재시도 어려움
1
2
3
4
5
6
7
8
9
// 레코드 단위 커밋 (가장 정밀)for(ConsumerRecord<String,String>record:records){processOrder(record.value());consumer.commitSync(Map.of(newTopicPartition(record.topic(),record.partition()),newOffsetAndMetadata(record.offset()+1)));}
3. 메시지 보장 수준 비교
3.1 At-least-once, At-most-once, Exactly-once
보장 수준
설명
중복
유실
설정
At-most-once
최대 한 번 전달
✗
✓
처리 전 커밋
At-least-once
최소 한 번 전달
✓
✗
처리 후 커밋
Exactly-once
정확히 한 번 전달
✗
✗
트랜잭션 + 멱등성
3.2 At-most-once 구현
1
2
3
4
5
6
// 먼저 커밋 → 처리 중 실패하면 메시지 유실ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(1000));consumer.commitSync();// 먼저 커밋!for(ConsumerRecord<String,String>record:records){processOrder(record.value());// 여기서 실패하면 유실}
3.3 At-least-once 구현
1
2
3
4
5
6
7
// 처리 후 커밋 → 커밋 전 실패하면 재처리(중복)ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String>record:records){processOrder(record.value());// 먼저 처리!}consumer.commitSync();// 처리 완료 후 커밋// 여기서 실패하면 → 다음 poll에서 같은 메시지 다시 수신
멱등성(Idempotency) 으로 중복 처리 방어:
1
2
3
4
5
6
7
8
9
publicvoidprocessOrder(OrderEventevent){// 이미 처리한 이벤트인지 확인if(processedEventRepository.existsById(event.getEventId())){log.info("이미 처리된 이벤트: {}",event.getEventId());return;}orderService.createOrder(event);processedEventRepository.save(newProcessedEvent(event.getEventId()));}
3.4 Exactly-once (Kafka Transactions)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Producer: 트랜잭션 설정props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"order-tx-1");KafkaProducer<String,String>producer=newKafkaProducer<>(props);producer.initTransactions();try{producer.beginTransaction();producer.send(newProducerRecord<>("order-events",key,value));producer.send(newProducerRecord<>("payment-events",key,paymentValue));producer.commitTransaction();// 두 메시지 모두 성공 또는 모두 실패}catch(Exceptione){producer.abortTransaction();throwe;}
@Service@RequiredArgsConstructorpublicclassOrderEventProducer{privatefinalKafkaTemplate<String,OrderEvent>kafkaTemplate;publicvoidpublishOrderCreated(Orderorder){OrderEventevent=newOrderEvent(UUID.randomUUID().toString(),order.getId(),"ORDER_CREATED",order);CompletableFuture<SendResult<String,OrderEvent>>future=kafkaTemplate.send("order-events",order.getId(),event);future.whenComplete((result,ex)->{if(ex!=null){log.error("메시지 전송 실패: {}",ex.getMessage());}else{log.info("메시지 전송 성공: topic={}, partition={}, offset={}",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());}});}}
@Service@RequiredArgsConstructorpublicclassOrderEventConsumer{privatefinalOrderServiceorderService;@KafkaListener(topics="order-events",groupId="order-service",containerFactory="kafkaListenerContainerFactory")publicvoidhandleOrderEvent(@PayloadOrderEventevent,@Header(KafkaHeaders.RECEIVED_PARTITION)intpartition,@Header(KafkaHeaders.OFFSET)longoffset,Acknowledgmentack){log.info("수신: event={}, partition={}, offset={}",event.getType(),partition,offset);try{switch(event.getType()){case"ORDER_CREATED"->orderService.handleCreated(event);case"ORDER_PAID"->orderService.handlePaid(event);case"ORDER_CANCELLED"->orderService.handleCancelled(event);default->log.warn("알 수 없는 이벤트: {}",event.getType());}ack.acknowledge();// 수동 커밋}catch(Exceptione){log.error("이벤트 처리 실패: {}",e.getMessage());// ack 하지 않음 → 재시도 또는 DLQ로 이동}}}
@ConfigurationpublicclassKafkaConfig{@AutowiredprivateKafkaTemplate<String,OrderEvent>kafkaTemplate;@BeanpublicConcurrentKafkaListenerContainerFactory<String,OrderEvent>kafkaListenerContainerFactory(ConsumerFactory<String,OrderEvent>consumerFactory){ConcurrentKafkaListenerContainerFactory<String,OrderEvent>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setConcurrency(3);// 3개 스레드로 병렬 소비factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);// 에러 핸들러 + DLQfactory.setCommonErrorHandler(newDefaultErrorHandler(newDeadLetterPublishingRecoverer(kafkaTemplate),newFixedBackOff(1000L,3)// 1초 간격, 3번 재시도));returnfactory;}}
5. Consumer Group Rebalancing
5.1 Rebalancing이 발생하는 시점
컨슈머가 그룹에 새로 참가하거나 이탈할 때
컨슈머가 heartbeat 타임아웃 (세션 만료)
토픽의 파티션 수가 변경될 때
1
2
3
4
5
6
7
8
9
10
Before Rebalancing:
Consumer A ← P0, P1
Consumer B ← P2
Consumer C 참가 → Rebalancing 발생
After Rebalancing:
Consumer A ← P0
Consumer B ← P1
Consumer C ← P2
spring:kafka:consumer:properties:# Heartbeat 간격 (기본 3초)heartbeat.interval.ms:3000# 세션 타임아웃 (기본 45초)session.timeout.ms:45000# poll 최대 간격 — 이 시간 내 poll() 호출 필수max.poll.interval.ms:300000# 한 번에 가져오는 레코드 수max.poll.records:500
Tip: max.poll.records를 줄이면 처리 시간이 max.poll.interval.ms를 넘기는 것을 방지할 수 있다.
6. 실전 패턴: Dead Letter Queue & Retry 전략
6.1 Dead Letter Queue (DLQ)
처리에 반복적으로 실패한 메시지를 별도 토픽으로 이동시켜 메인 처리 흐름을 보호한다.
1
2
3
4
order-events → Consumer (처리 시도)
├── 성공 → ack
└── 3회 실패 → order-events.DLT (Dead Letter Topic)
└── DLQ 모니터링 & 수동 처리
@ConfigurationpublicclassKafkaConfig{@BeanpublicDefaultErrorHandlererrorHandler(KafkaTemplate<String,Object>template){// DLQ로 보내는 RecovererDeadLetterPublishingRecovererrecoverer=newDeadLetterPublishingRecoverer(template,(record,ex)->newTopicPartition(record.topic()+".DLT",record.partition()));// 1초, 2초, 4초 간격으로 3번 재시도 후 DLQExponentialBackOffbackOff=newExponentialBackOff(1000L,2.0);backOff.setMaxElapsedTime(10000L);DefaultErrorHandlerhandler=newDefaultErrorHandler(recoverer,backOff);// 특정 예외는 재시도 없이 바로 DLQ로handler.addNotRetryableExceptions(InvalidMessageException.class,DeserializationException.class);returnhandler;}}
6.2 DLQ 메시지 모니터링 & 재처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@KafkaListener(topics="order-events.DLT",groupId="dlq-handler")publicvoidhandleDlq(ConsumerRecord<String,OrderEvent>record,@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE)StringerrorMessage,@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC)StringoriginalTopic){log.error("DLQ 수신: key={}, error={}, originalTopic={}",record.key(),errorMessage,originalTopic);// 알림 발송 (Slack, PagerDuty 등)alertService.sendDlqAlert(record,errorMessage);// 필요 시 수동 재처리 큐에 저장dlqRepository.save(newDlqRecord(record,errorMessage,originalTopic));}
상태를 직접 저장하는 대신 상태 변경 이벤트를 순차적으로 저장하고, 이벤트를 재생하여 현재 상태를 도출한다.
1
2
3
4
5
6
7
주문 #123의 이벤트 로그:
1. OrderCreated {items: [...], total: 50000}
2. PaymentCompleted {paymentId: "pay-1"}
3. ItemAdded {item: "keyboard", amount: 30000}
4. OrderShipped {trackingNo: "T-456"}
현재 상태 = 이벤트 1~4를 순차 적용한 결과
1
2
3
4
5
6
7
8
9
10
11
// Kafka를 이벤트 저장소로 활용@ServicepublicclassOrderEventStore{privatefinalKafkaTemplate<String,OrderEvent>kafkaTemplate;publicvoidappend(StringorderId,OrderEventevent){// 주문 ID를 키로 → 같은 파티션에 순서대로 저장kafkaTemplate.send("order-event-store",orderId,event);}}