Spring Boot 애플리케이션에서 Kafka를 사용하려면 의존성을 추가하고 설정 파일을 구성해야 한다
Gradle (build.gradle) implementation 'org.springframework.kafka:spring-kafka'
Spring Boot로 Kafka에 메시지 넣기 (Producer)
application.yaml 설정
Kafka 서버 연결 및 Producer/Consumer의 직렬화/역직렬화 방식을 설정한다
application.yaml (Producer)
spring:
kafka:
bootstrap-servers: {ec2-public ip}:9092 # Kafka 서버 주소 (AWS EC2 Public IP 사용)
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
- bootstrap-servers: 연결할 Kafka 브로커(서버)의 주소와 포트를 지정한다. 여러 브로커가 있을 경우 쉼표로 구분하여 나열할 수 있다.
- producer.key-serializer / producer.value-serializer: 프로듀서가 Kafka로 메시지를 보낼 때, 자바 객체를 네트워크를 통해 전송 가능한 바이트 배열로 변환(직렬화)하는 방식을 정의한다. 여기서는 키와 값 모두 String 타입으로 직렬화하도록 설정했다. 즉, 자바의 String 객체를 바이트 배열로 변환하여 Kafka에 전달한다
주의사항
Kafka 서버를 백그라운드로 실행하고 email.send 토픽을 미리 생성해 두어야 한다 (CLI를 통해 생성)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic email.send
Spring Boot에서 Kafka에 메시지를 보내는 프로듀서는 KafkaTemplate을 사용하여 구현한다. 메시지 객체는 JSON 형태로 직렬화하여 전송하는 것이 일반적이다
메시지 데이터 구조 정의
public record EmailSendMessage(
String from,
String to,
String subject,
String body
) {
}
EmailService (Producer 로직) 구현
KafkaTemplate을 주입받아 메시지를 전송하는 서비스를 구현한다
@Service
public class EmailService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper; // ObjectMapper 인스턴스 추가
// 생성자 주입
public EmailService(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper; // ObjectMapper 주입
}
public void sendEmail(SendEmailRequestDto request) {
// Request DTO를 EmailSendMessage 레코드로 변환
EmailSendMessage emailSendMessage = new EmailSendMessage(
request.from(), request.to(), request.subject(), request.body()
);
// "email.send" 토픽으로 JSON 문자열 메시지 전송
this.kafkaTemplate.send("email.send", toJsonString(emailSendMessage));
}
// 객체를 JSON 형태의 String 타입으로 변환하는 유틸리티 메서드
private String toJsonString(Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
// 직렬화 실패 시 RuntimeException 발생
throw new RuntimeException("Json 직렬화 실패", e); // 예외 체인 추가
}
}
}
- KafkaTemplate<String, String>: Kafka에 메시지를 보내기 위한 Spring Kafka의 핵심 컴포넌트이다. 제네릭 타입을 메시지의 Key와 Value의 타입을 나타낸다
- kafkaTemplate.send(“email.send”, toJsonString(emailSendMessage)): email.send 토픽으로 emailSendMessage 객체를 JSON 문자열로 변환하여 전송한다
- ObjectMapper: Jackson 라이브러리의 ObjectMapper를 사용하여 자바 객체를 JSON 문자열로 직렬화한다. 이는 Kafka 메시지의 value가 StringSerializer에 의해 처리되도록 하기 위함이다
Producer 테스트 (API 엔트포인트)
SendEmailRequestDto와 EmailController
// SendEmailRequestDto는 EmailSendMessage와 유사하게 from, to, subject, body를 가지는 record 또는 class로 정의
public record SendEmailRequestDto(
String from,
String to,
String subject,
String body
) {
}
@RestController
@RequestMapping("/api/emails")
public class EmailController {
private final EmailService emailService;
public EmailController(EmailService emailService) {
this.emailService = emailService;
}
@PostMapping
public ResponseEntity<Void> sendEmail(@RequestBody SendEmailRequestDto request) {
emailService.sendEmail(request);
return ResponseEntity.ok().build();
}
}
Postman이나 curl 등으로 다음 API 요청을 보내면 Kafka로 메시지가 전송된다
POST http://localhost:8080/api/emails
Content-Type: application/json
{
"from": "sender@test.com",
"to": "receiver@test.com",
"subject": "제목",
"body": "내용"
}
확인: Kafka CLI consumer를 통해 email.send 토픽의 메시지를 조회하면, 전송된 JSON 형태의 메시지를 확인할 수 있다
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic email.send --from-beginning
출력: {“from”:”sender@test.com”,”to”:”receiver@test.com”,”subject”:”제목”,”body”:”내용”}
Spring Boot가 Kafka에서 메시지 조회하기 (Consumer)
Kafka 메시지를 소비하는 Consumer 애플리케이션을 구현한다. 일반적으로 Producer와 Consumer는 서로 다른 서비스(마이크로서비스)로 분리하여 배포한다
application.yaml 설정
server:
port: 0
spring:
kafka:
bootstrap-servers: 13.125.45.117:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
- server.port: 8080 포트는 Producer 서버에서 이미 사용 중이므로 충돌을 피하기 위해
0으로 설정한다. 이렇게 하면 스프링 부트가 사용 가능한 임의의 포트를 자동으로 할당해 서버를 실행한다. - consumer.key-deserializer / consumer.value-deserializer: 컨슈머가 Kafka로부터 바이트 배열 형태의 메시지를 받아 자바 객체로 변환(역직렬화)하는 방식을 정의한다. 프로듀서와 동일하게 String 타입으로 역직렬화하도록 설정한다
- consumer.group-id: 컨슈머가 속할 선큐머 그룹의 기본 ID를 저장한다. 이 ID는 @KafkaListener에서 재정의할 수 있다
- consumer.auto-offset-reset: earliest: 컨슈머 그룹이 해당 토픽의 오프셋 기록을 가지고 있지 않을 떄(즉, 처음 메시지를 소비할 떄), 메시지를 어디서부터 읽을 지 결정하는 옵션이다
- earliest: 토픽이 존재하는 가장 오래된 메시지부터 읽는다 (권장)
- latest: 컨슈머가 시작된 시점 이후에 도착하는 새로운 메시지부터 읽는다 (기존 메시지 무시) 이 옵션은 컨슈머 그룹의 오프셋 기록이 존재하면 무시되고, 기록된 오프셋 다음부터 읽는다. earliest 설정을 통해 컨슈머 그룹이 처음 생성될 때 과거에 누락된 메시지 없이 모든 메시지를 처리할 수 있도록 보장한다
메시지 데이터 구조 (Consumer)
Consumer에서 JSON 문자열을 Java 객체로 역직렬화하려면, 해당 클래스에 기본 생성자(no-argument constructor)가 필수적이다. Java Record는 기본 생성자를 자동으로 제공하지 않으므로, 이 경우에는 class를 사용하는 것이 좋다
public class EmailSendMessage {
private String from;
private String to;
private String subject;
private String body;
// 기본 생성자 (필수)
public EmailSendMessage() {
}
// 모든 필드를 포함하는 생성자
public EmailSendMessage(String from, String to, String subject, String body) {
this.from = from;
this.to = to;
this.subject = subject;
this.body = body;
}
// Getter 메서드
public String getFrom() {
return from;
}
public String getTo() {
return to;
}
public String getSubject() {
return subject;
}
public String getBody() {
return body;
}
// Setter 메서드 (필요시 추가, 역직렬화 시 사용될 수 있음)
public void setFrom(String from) { this.from = from; }
public void setTo(String to) { this.to = to; }
public void setSubject(String subject) { this.subject = subject; }
public void setBody(String body) { this.body = body; }
/*
JSON 값을 EmailSendMessage 객체로 역직렬화하는 정적 팩토리 메서드
*/
public static EmailSendMessage fromJson(String json) {
try {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(json, EmailSendMessage.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("Json 파싱 실패", e);
}
}
}
EmailSendConsumer (Consumer 로직) 구현
@KafkaListener 어노테이션을 사용하여 Kafka 토픽에서 메시지를 받아 처리하는 로직을 구현한다
@Service
public class EmailSendConsumer {
private final ObjectMapper objectMapper; // ObjectMapper 주입
public EmailSendConsumer(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
/*
@KafkaListener 어노테이션을 사용하여 Kafka 메시지를 구독하고 처리합니다.
- topics: 구독할 토픽 이름을 지정합니다.
- groupId: 이 리스너가 속할 컨슈머 그룹의 ID를 지정합니다.
이 그룹 ID를 통해 Kafka는 이 컨슈머가 어디까지 메시지를 읽었는지 추적합니다.
*/
@KafkaListener(
topics = "email.send",
groupId = "email-send-group" // 컨슈머 그룹 ID 지정
)
public void consume(String message) {
System.out.println("Kafka로부터 받아온 메시지: " + message);
EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message); // JSON 파싱
// 실제 이메일 발송 로직은 생략한다.
// 예를 들어, 외부 이메일 서비스 API 호출 등을 수행할 수 있다.
System.out.println("이메일 발송 완료: " + emailSendMessage.getSubject() + " to " + emailSendMessage.getTo());
}
}
- @KafkaListener: 이 메서드가 Kafka 컨슈머 역할을 수행하며, 지정된 토픽에서 메시지를 리슨하도록 한다
- topics = “email.send”: email.send 토픽의 메시지를 구독한다
- groupId = “email-send-group”: 이 컨슈머는 email-send-group 이라는 컨슈머 그룹에 속한다. 이 그룹 ID를 통해 Kafka는 이 컨슈머가 마지막으로 처리한 메시지의 오프셋을 추적하고, 중복 없이 메시지를 순차적으로 소비할 수 있도록 돕는다
테스트
Producer 서버와 Consumer 서버를 각각 실행한 후 Producer의 API를 통해 메시지를 전송한다. Consumer 서버의 로그에 메시지가 수신되어 처리되는 것을 확인할 수 있다
Kafka로부터 받아온 메시지: {"from":"sender@test.com","to":"receiver@test.com","subject":"제목","body":"내용"}
이메일 발송 완료: 제목 to receiver@test.com
또한, CLI를 통해 email-send-group의 오프셋 정보를 확인하면 CURRENT-OFFSET이 증가된 것을 볼 수 있다
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group email-send-group --describe
CURRENT-OFFSET이 처리된 메시지 수만큼 증가하여, 다음에 읽을 메시지의 오프셋을 정확히 가리키고 있을 것이다
Kafka 비동기 처리의 장점과 한계점, 그리고 보완 전략
Kafka와 같은 메시지 큐를 활용한 비동기 처리는 시스템 아키텍처에 많은 이점을 가져다주지만, 동시에 고려해야 할 한계점도 존재한다
장점
- 빠른 응답 속도: 프로듀서는 메시지를 Kafka에 전달하자마자 사용자에게 응답을 반환할 수 있으므로, 최종 작업이 완료될 때까지 기다릴 필요가 없어 사용자 경험이 향상된다 (예: 이메일 발송처럼 시간이 오래 걸리는 작업)
- 시스템 부하 분산 및 확장성: 프로듀서와 컨슈머가 독립적으로 작동하여, 특정 작업이 지연되더라도 전체 시스템에 미치는 영향을 최소화한다. 트래픽이 급증해도 Kafka가 메시지를 일시적으로 버퍼링하므로, 컨슈머는 자신의 처리 능력에 맞춰 메시지를 소비할 수 있다
- 내결함성 및 안정성: Kafka는 메시지를 영구적으로 저장하고, 컨슈머 그룹과 오프셋을 통해 메시지 처리 상태를 관리하므로, 컨슈머에 장애가 발생하더라도 재시작 시점에 중단된 지점부터 다시 메시지를 처리할 수 있다
한계점
- 작업 성공 여부 확인이 어려움: 프로듀서가 사용자에게 빠른 응답을 보내는 시점은 메시지가 Kafka에 성공적으로 전달되었다는 것만을 의미한다. 컨슈머가 실제 작업을 성공적으로 완료했는지 여부는 사용자에게 직접적으로 알리기 어렵다
- 예시: 이메일 발송 요청을 Kafka에 넣고 성공 응답을 보냈지만, 이후 컨슈머에서 잘못된 이메일 주소 등으로 인해 발송이 실패할 수 있다. 이미 사용자에게 성공 응답을 보냈기 때문에 이러한 실제 실패를 사용자에게 다시 알릴 방법이 마땅치 않다
보완 전략
메시지 재시도 (Retry)
- 컨슈머가 메시지 처리 중 일시적인 오류로 실패했을 경우, 정해진 횟수만큼 메시지 처리를 자동으로 재시도하도록 로직을 구현한다. 이는 네트워크 문제, 일시적인 외부 시스템 장애 등에 효과적이다.
데드 레터 토픽 (Dead Latter Topic, DLT)
- 여러 번의 재시도에서 불구하고 메시지 처리에 계속 실패하는 경우, 해당 메시지를 별도의 “데드 레터 토픽”으로 전송하여 보관하는 전략이다
- DLT에 모인 메시지들은 나중에 개발자가 수동으로 확인하거나, 별도의 모니터링 시스템을 통해 분석하여 문제의 원인을 파악하고 해결한 후 다시 처리할 수 있다. 이는 시스템의 견고성을 높이고 메시지 손실을 방지하는 데 매우 중요하다