Kafka를 활용한 MSA 이벤트 기반 아키텍처 구축

Kafka를 활용하여 마이크로서비스 아키텍처(MSA) 기반의 회원 가입 및 이메일 발송 시스템을 구축하는 방법을 알아보자. Kafka의 메시지 브로커 역할을 통해 서비스 간 비동기 통신을 구현하며, 이벤트 기반 아키텍처의 핵심 개념을 실습한다

프로젝트 목표

  • MSA 구조에서 Kafka의 실제 활용 방법 이해
  • 이벤트 기반 아키텍처 패턴 실습
  • Producer-Consumer 모델 구현
  • 장애 처리 및 재시도 메커니즘 구현

시스템 아키텍처

전체 구조

[사용자] → [User Service] → [Kafka Cluster] → [Email Service]
              ↓                                      ↓
           [User DB]                             [Email DB]

서비스 구성

서비스역할데이터베이스포트
User Service회원 가입 처리 및 Kafka 메시지 발생(Producer)User DB (H2)8080
Email Service이메일 발송 처리 및 Kafka 메시지 소비(Consumer)Email DB (H2)8081

구현 기능

User Service
  • 회원 가입 API 제공
  • 사용자 정보를 User DB에 저장
  • 회원 가입 완료 이벤트를 Kafka에 발행
Email Service
  • Kafka로부터 회원 가입 이벤트 수신
  • 환영 이메일 발송(시뮬레이션)
  • 이메일 발송 로그를 Email DB에 저장

User Service 구현

프로젝트 초기 설정 – 필수 의존성

  • Spring Boot DevTools
  • Spring Web
  • Spring for Apache Kafka
  • H2 Database
  • Spring Data JPA

애플리케이션 설정

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers:
      - {KAFKA_SERVER_IP}:9092
      - {KAFKA_SERVER_IP}:19092
      - {KAFKA_SERVER_IP}:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        partitioner.class: org.apache.kafka.clients.producer.RoundRobinPartitioner
  h2:
    console:
      enabled: true
  datasource:
    url: jdbc:h2:mem:userDB
    driver-class-name: org.h2.Driver
    username: sa
    password:
주요 설정 항목
  • bootstrap-servers: Kafka 클러스터의 브로커 주소 목록 (고가용성 확보)
  • key-serializer / value-serializer: 메시지를 문자열로 직렬화
  • partitioner.class: RoundRobinPartitioner를 사용하여 메시지를 파티션에 균등 분배

도메인 모델 구현

User Entity
@Entity
@Table(name = "users")
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String email;
    private String name;
    private String password;

    public User() {
    }

    public User(String email, String name, String password) {
        this.email = email;
        this.name = name;
        this.password = password;
    }

    // Getters
    public Long getId() {
        return id;
    }

    public String getEmail() {
        return email;
    }

    public String getName() {
        return name;
    }

    public String getPassword() {
        return password;
    }
}
User Repository
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
}

이벤트 모델 정의

UserSignedUpEvent
public record UserSignedUpEvent(
    Long userId,
    String email,
    String name
) {
}

설계 원칙: 패스워드와 같은 민감 정보나 다른 서비스에서 불필요한 데이터는 이벤트에 포함하지 않는다. 이는 보안과 네트워크 효율성을 고려한 설계이다

컨트롤러 구현

SignUpRequestDto
public record SignUpRequestDto(
    String email,
    String name,
    String password
) {
}
UserController
@RestController
@RequestMapping("/api/users")
public class UserController {
    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    @PostMapping
    public ResponseEntity<String> signUp(
            @RequestBody SignUpRequestDto signUpRequestDto
    ) {
        userService.signUp(signUpRequestDto);
        return ResponseEntity.ok("회원가입 성공");
    }
}

비즈니스 로직 및 Kafka Producer 구현

UserService
@Service
public class UserService {
    private final UserRepository userRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public UserService(UserRepository userRepository, KafkaTemplate<String, String> kafkaTemplate) {
        this.userRepository = userRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void signUp(SignUpRequestDto signUpRequestDto) {
        // 1. 사용자 정보 저장
        User user = new User(
                signUpRequestDto.email(),
                signUpRequestDto.name(),
                signUpRequestDto.password()
        );
        User savedUser = userRepository.save(user);

        // 2. 이벤트 객체 생성
        UserSignedUpEvent userSignedUpEvent = new UserSignedUpEvent(
                savedUser.getId(),
                savedUser.getEmail(),
                savedUser.getName()
        );

        // 3. Kafka에 이벤트 발행
        this.kafkaTemplate.send("user.signed-up", toJsonString(userSignedUpEvent));
    }

    private String toJsonString(Object object) {
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("JSON 직렬화 실패", e);
        }
    }
}
핵심 로직
  • 사용자 정보를 데이터베이스에 저장한다
  • 저장된 사용자 정보로 이벤트 객체를 생성한다
  • 이벤트를 JSON 문자열로 변환하여 Kafka 토픽에 발행한다

Email Service 구현

프로젝트 초기 설정

User Service와 동일한 의존성을 사용한다

애플리케이션 설정

server:
  port: 8081  # User Service와 포트 충돌 방지

spring:
  kafka:
    bootstrap-servers:
      - {KAFKA_SERVER_IP}:9092
      - {KAFKA_SERVER_IP}:19092
      - {KAFKA_SERVER_IP}:29092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
  h2:
    console:
      enabled: true
  datasource:
    url: jdbc:h2:mem:emailDB
    driver-class-name: org.h2.Driver
    username: sa
    password:
주요 설정 항목
  • key-serializer / value-serializer: 수신한 메시지를 문자열로 역직렬화
  • auto-offset-reset: earliest: 컨슈머 그룹이 처음 시작할 때 가장 오래된 메시지부터 처리

이벤트 모델 정의

UserSignedUpEvent
public class UserSignedUpEvent {
    private Long userId;
    private String email;
    private String name;

    // 역직렬화를 위한 기본 생성자 필수
    public UserSignedUpEvent() {
    }

    public UserSignedUpEvent(Long userId, String email, String name) {
        this.userId = userId;
        this.email = email;
        this.name = name;
    }

    // JSON 문자열을 객체로 변환하는 정적 팩토리 메서드
    public static UserSignedUpEvent fromJson(String json) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.readValue(json, UserSignedUpEvent.class);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("JSON 파싱 실패", e);
        }
    }

    // Getters
    public Long getUserId() {
        return userId;
    }

    public String getEmail() {
        return email;
    }

    public String getName() {
        return name;
    }
}
  • 설계 참고: Record 타입은 기본 생성자를 제공하지 않으므로 역직렬화가 필요한 경우 일반 클래스를 사용한다

도메인 모델 구현

EmailLog Entity
@Entity
@Table(name = "email_logs")
public class EmailLog {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long receiverUserId;  // 수신자 사용자 ID
    private String receiverEmail; // 수신자 이메일
    private String subject;       // 이메일 제목

    public EmailLog() {
    }

    public EmailLog(Long receiverUserId, String receiverEmail, String subject) {
        this.receiverUserId = receiverUserId;
        this.receiverEmail = receiverEmail;
        this.subject = subject;
    }

    // Getters
    public Long getId() {
        return id;
    }

    public Long getReceiverUserId() {
        return receiverUserId;
    }

    public String getReceiverEmail() {
        return receiverEmail;
    }

    public String getSubject() {
        return subject;
    }
}
EmailLogRepository
@Repository
public interface EmailLogRepository extends JpaRepository<EmailLog, Long> {
}

Kafka Consumer 구현

UserSignupEventConsumer
@Service
public class UserSignupEventConsumer {
    private final EmailLogRepository emailLogRepository;

    public UserSignupEventConsumer(EmailLogRepository emailLogRepository) {
        this.emailLogRepository = emailLogRepository;
    }

    @KafkaListener(
            topics = "user.signed-up",
            groupId = "email-service",
            concurrency = "3"  // 3개의 스레드로 병렬 처리
    )
    @RetryableTopic(
            attempts = "5",  // 최대 5번 재시도
            backoff = @Backoff(delay = 1000, multiplier = 2),  // 지수 백오프: 1초, 2초, 4초, 8초, 16초
            dltTopicSuffix = ".dlt"  // Dead Letter Topic 접미사
    )
    public void consume(String message) throws InterruptedException {
        // 1. JSON 메시지를 객체로 변환
        UserSignedUpEvent userSignedUpEvent = UserSignedUpEvent.fromJson(message);

        // 2. 이메일 발송 (실제 구현은 생략, 시뮬레이션)
        String receiverEmail = userSignedUpEvent.getEmail();
        String subject = userSignedUpEvent.getName() + "님, 회원 가입을 축하드립니다!";
        
        Thread.sleep(3000);  // 이메일 발송 시간 시뮬레이션 (3초)
        System.out.println("이메일 발송 완료");

        // 3. 이메일 발송 로그 저장
        EmailLog emailLog = new EmailLog(
                userSignedUpEvent.getUserId(),
                receiverEmail,
                subject
        );
        emailLogRepository.save(emailLog);
    }
}
주요 기능
  • @KafkaListener: Kafka 토픽으로부터 메시지를 수신
  • concurrency = “3”: 멀티쓰레드를 활용한 병렬 처리로 처리량 향상
  • @RetryableTopic: 메시지 처리 실패 시 자동 재시도
    • 지수 백오프 전략: 재시도 간격을 점진적으로 증가
    • 최종 실패 시 DLT(Dead Letter Topic)로 전송

Dead Letter Topic(DLT) 처리

UserSignedUpEventDltConsumer
@Service
public class UserSignedUpEventDltConsumer {

    @KafkaListener(
            topics = "user.signed-up.dlt",
            groupId = "email-service"
    )
    public void consume(String message) {
        // 로그 시스템에 전송
        System.out.println("로그 시스템에 전송: " + message);

        // 알림 발송 (Slack, Discord, Telegram 등)
        System.out.println("알림 발송: " + message);
    }
}
DLT 처리 전략
  • 모든 재시도가 실패한 메시지를 별도 토픽에 저장
  • 운영팀에게 알림을 전송하여 수동 개입 유도
  • 실무에서를 로그수집 시스템 (ELK Stack 등)과 연동하여 모니터링

Kafka 클러스터 설정

Kafka 서버 실행

백그라운드에서 3대의 Kafka 서버를 실행한다

bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server2.properties
bin/kafka-server-start.sh -daemon config/server3.properties

실행 상태 확인

lsof -i:9092
lsof -i:19092
lsof -i:29092

기존 토픽 정리

테스트를 위해 기존 토픽을 삭제한다

# 토픽 목록 조회
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# 토픽 삭제
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic email.send

토픽 생성

메인 토픽 생성 (user.signed-up)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic user.signed-up \
  --partitions 3 \
  --replication-factor 3
설정 근거
  • partitions 3 – 병렬 처리량 향상 (Consumer의 concurrency)
  • replication-factor: 3 – 고가용성 확보 (데이터 손실 방지)
토픽 상세 정보 확인
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe \
  --topic user.signed-up
예상 출력
Topic: user.signed-up   TopicId: tJ28bBqYSoiEh0GcmHmjVg
PartitionCount: 3       ReplicationFactor: 3
Configs: segment.bytes=1073741824

Topic: user.signed-up   Partition: 0    Leader: 3
Replicas: 3,1,2         Isr: 3,1,2

Topic: user.signed-up   Partition: 1    Leader: 1
Replicas: 1,2,3         Isr: 1,2,3

Topic: user.signed-up   Partition: 2    Leader: 2
Replicas: 2,3,1         Isr: 2,3,1
DLT 토픽 생성 (user.signed-up.dlt)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic user.signed-up.dlt \
  --partitions 1 \
  --replication-factor 3
설정 근거
  • partitions 1 – DLT 메시지는 빠른 처리가 불필요하므로 단일 파티션 사용
  • replication-factor: 3 – 장애 메시지 손실 방지를 위한 복제 유지

통합 테스트

서버 실행

  • Kafka 클러스터 (이미 실행 중)
  • User Service (포트 8080)
  • Email Service (포트 8081)

회원 가입 API 호출

POST http://localhost:8080/api/users
Content-Type: application/json

{
  "email": "sender@test.com",
  "name": "김이름",
  "password": "1234"
}
응답
회원가입 성공
Email Service 콘솔 출력
이메일 발송 완료

데이터 검증

User DB 확인 (localhost:8080/h2-console)
SELECT * FROM users;
결과
id | email             | name   | password
1  | sender@test.com   | 김이름 | 1234
Email DB 확인 (localhost:8081/h2-console)
id | receiver_user_id | receiver_email    | subject
1  | 1                | sender@test.com   | 김이름님, 회원 가입을 축하드립니다!
결과
id | receiver_user_id | receiver_email    | subject
1  | 1                | sender@test.com   | 김이름님, 회원 가입을 축하드립니다!

검증 결과

  • User Service가 회원 정보를 User DB에 저장
  • User Service가 Kafka에 이벤트 발행
  • Email Service가 Kafka로부터 이벤트 수신
  • Email Service가 이메일 발송 처리 (시뮬레이션)
  • Email Service가 발송 로그를 Email DB에 저장

결론

  • Kafka 기본 개념: Producer, Consumer, Topic, Partition의 실제 동작 방식
  • MSA 패턴: 서비스 간 비동기 통신 구현
  • 이벤트 기반 아키텍처: 느슨한 결합과 확장성의 이점
  • 장애 처리: 재시도 메커니즘과 DLT를 통한 복구 전략

Kafka는 현대 MSA 환경에서 서비스 간 통신의 핵심 인프라로 자리잡았다. Kafka의 강력한 메시지 브로커 기능과 Spring의 편리한 추상화를 결합하면, 안정적으로 확장 가능한 분산 시스템을 효율적으로 구축할 수 있다

인프런 강의 중 ‘실전에서 바로 써먹는 Kafka 입문’ 강의 참고