본문 바로가기
Spring/Cloud

[Spring Cloud] Kafka를 활용한 이벤트 기반 아키텍처(EDA) 설계 패턴

by coding_whale 2026. 5. 4.
반응형

마이크로서비스 아키텍처(MSA)에서 각 서비스는 독립적인 데이터베이스를 가진다. 이로 인해 발생하는 가장 큰 고민은 "어떻게 서비스 간 데이터 일관성을 유지할 것인가?"이다. 주문이 들어왔을 때 상품의 재고를 줄여야 하고, 사용자의 포인트를 적립해야 한다. 이를 단순히 HTTP API 호출로 해결하려 하면 서비스 간 결합도가 높아지고 시스템 전체의 가용성이 떨어진다.
오늘은 이 문제를 해결하는 만능 열쇠, Apache KafkaKafka Connect에 대해 심층적으로 정리해 본다.

 

1. Kafka Client: 애플리케이션의 목소리와 귀

Kafka Client는 Java 애플리케이션이 Kafka 브로커와 대화하기 위해 사용하는 필수 라이브러리다. 단순히 데이터를 보내고 받는 것을 넘어, 시스템 관리와 실시간 스트림 처리까지 담당하는 강력한 API들을 제공한다.

핵심 구성 요소

  • Producer (발행자): 데이터를 특정 토픽(Topic)으로 전송한다. 주문 서비스에서 "새로운 주문이 발생함"이라는 이벤트를 던지는 역할이다.
  • Consumer (구독자): 토픽에 쌓인 이벤트를 읽어 후속 작업을 처리한다. 카탈로그 서비스에서 주문 이벤트를 받아 실시간으로 재고를 차감하는 것이 대표적이다.
  • Admin API: 애플리케이션 코드 내에서 토픽을 생성하거나 설정을 조회하는 등 관리 기능을 수행한다.
  • Kafka Streams: 여러 토픽의 데이터를 실시간으로 조합, 변환, 집계하여 새로운 데이터를 만들어내는 처리에 특화되어 있다.

 

2. Kafka 인프라 준비: 서버 기동과 토픽 관리

Kafka는 단순한 메시지 큐가 아니라 '분산 커밋 로그' 시스템이다. 데이터를 파티션에 나누어 저장하고 복제(Replication)를 통해 안정성을 보장한다.

2.1 서버 기동 및 논리적 흐름

Kafka는 분산 환경 관리를 위해 Zookeeper를 사용하므로 실행 순서가 중요하다.

  1. Zookeeper 실행: 클러스터 노드 관리 및 리더 선출을 담당한다.
  2. Kafka Broker 실행: 실제 메시지가 저장되는 서버다. 하나의 클러스터에 여러 브로커를 두어 확장성을 높인다.
  3. Topic 생성: 데이터를 구분하는 단위다. 각 토픽은 여러 개의 Partition으로 나뉘어 병렬 처리가 가능해진다.

2.2 핵심 명령어 정리

  • 생성: bin/kafka-topics.sh --create --topic example-topic --bootstrap-server localhost:9092 --partitions 3 (병렬 처리를 위해 파티션 설정 가능)
  • 목록 확인: --list 옵션 사용
  • 상세 정보: --describe 옵션으로 어떤 브로커가 리더 파티션을 가지고 있는지(ISR 상태) 확인 가능

 

3. Kafka Connect: "코드 없는" 데이터 파이프라인

개발자가 직접 Producer/Consumer 코드를 짜는 것은 유연하지만, 단순 DB 동기화나 로그 수집 같은 반복 작업에는 비효율적이다. Kafka Connect는 이를 설정(Configuration)만으로 해결한다.

Kafka Connect의 핵심 특징

  • Source Connector: 외부 시스템(MySQL, S3 등)의 변경 사항을 감지해 Kafka 토픽으로 밀어 넣는다.
  • Sink Connector: Kafka 토픽의 데이터를 외부 시스템(Elasticsearch, DB 등)으로 내보낸다.
  • Converter: JSON, Avro 등 데이터 포맷 변환을 자동으로 처리한다.

 

4. 실전 가이드: 코드 기반 데이터 동기화 (Order -> Catalog)

이제 주문(Order) 서비스와 카탈로그(Catalog) 서비스가 어떻게 이벤트를 주고받으며 재고 정합성을 맞추는지 코드로 살펴보자.

4.1 Consumer 설정 및 로직 (Catalog Service)

카탈로그 서비스는 주문 토픽을 구독하여 재고를 실시간으로 업데이트해야 한다.

📌 KafkaConsumerConfig.java (역직렬화 및 그룹 설정)

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        
        // 1. 브로커 주소: Consumer가 데이터를 읽어올 브로커 정보
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        // 2. 그룹 ID: 같은 그룹 멤버끼리는 파티션 소유권을 나눠 가짐 (Scalability)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "catalog-service-group");
        
        // 3. 오프셋 초기화: 처음 연결 시 가장 처음(earliest) 데이터부터 읽을지 결정
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // 4. 역직렬화: Kafka 브로커의 바이트 배열 데이터를 Java String으로 복원
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

📌 KafkaConsumer.java (비즈니스 로직 처리)

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
    private final CatalogRepository repository;

    // 설정된 토픽에서 메시지가 도착하면 이 메서드가 호출됨
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage) {
        log.info("수신된 메시지: " + kafkaMessage);

        try {
            // 1. JSON 문자열 파싱 (실무에선 전용 DTO 객체 사용 권장)
            Map<Object, Object> map = new ObjectMapper().readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
            
            String productId = (String) map.get("productId");
            Integer qty = (Integer) map.get("qty");

            // 2. 해당 상품의 최신 재고 조회
            CatalogEntity entity = repository.findByProductId(productId);
            if (entity != null) {
                // 3. 재고 차감 및 반영 (동기화 핵심)
                entity.setStock(entity.getStock() - qty);
                repository.save(entity);
                log.info("상품 [{}] 재고 차감 완료. 남은 수량: {}", productId, entity.getStock());
            }
        } catch (JsonProcessingException e) {
            log.error("데이터 변환 중 에러 발생", e);
        }
    }
}

4.2 Producer 설정 및 로직 (Order Service)

주문이 발생하면 컨트롤러는 이벤트를 발행하여 전파한다.

📌 OrderProducerService.java (이벤트 전송 전담)

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducerService {
    // 스프링 부트가 자동 설정해주는 Kafka 송신 템플릿
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public void send(String topic, OrderDto orderDto) {
        try {
            // 1. 주문 데이터를 JSON 문자열로 직렬화
            String payload = objectMapper.writeValueAsString(orderDto);
            
            // 2. 특정 토픽으로 메시지 전송 (비동기 처리)
            kafkaTemplate.send(topic, payload);
            
            log.info("이벤트 발행 성공: " + orderDto);
        } catch (JsonProcessingException e) {
            log.error("직렬화 에러", e);
            throw new RuntimeException("이벤트 전송 실패");
        }
    }
}

 

5. 아키텍처의 진화: Kafka Connect를 이용한 단일 DB 동기화

가장 고도화된 패턴은 서비스가 DB 적재 책임을 버리는 것이다. 주문 서비스(Order Service)가 오직 Kafka로 이벤트만 발행하고, 이를 Kafka Sink Connector가 받아서 DB에 대신 저장하는 방식이다.

[진화된 아키텍처 흐름]

  1. Order Service: 주문 API 요청을 받으면 비즈니스 검증 후 즉시 Kafka Topic으로 이벤트를 던진다. (자체 DB에 안 쓸 수도 있음)
  2. Kafka Sink Connector: 토픽을 리스닝하다가 데이터가 들어오면 최종 통합 DB에 Insert를 수행한다.

이 아키텍처가 강력한 이유

  • 관심사 분리: 서비스는 로직에만 집중하고, 데이터 영속성 처리는 인프라(Connect)에 맡긴다.
  • 장애 내성: DB가 일시적으로 다운되어도 Kafka 토픽에는 데이터가 쌓여 있다. DB 복구 시 커넥터가 쌓인 데이터를 한 번에 밀어 넣으므로 데이터 유실이 전혀 없다.
  • 다중 저장소 지원: 동일한 주문 이벤트를 S3(백업용), Elasticsearch(검색용) 등에 각각의 커넥터로 동시에 쏠 수 있다.

 

6. 마무리하며

Kafka는 단순한 메시지 브로커를 넘어 분산 시스템의 **'중추 신경계'**와 같다. 오늘 정리한 Producer/Consumer 패턴과 Kafka Connect의 조화는 MSA 환경에서 발생하는 복잡한 데이터 정합성 문제를 해결하는 핵심 전략이다. 

🏷️ 관련 태그 #ApacheKafka #SpringCloud #MSA #KafkaConnect #MessageBroker #SpringKafka #데이터동기화 #이벤트기반아키텍처 #백엔드개발 #KafkaConsumer

반응형