1. 도입 배경과 시스템 설계 (Architecture)
도입 배경 : 사실 제 개인 프로젝트에 카프카가 반드시 필요한 만큼의 트래픽은 발생하지 않습니다. 하지만 실무에서 대규모 데이터 처리와 시스템 확장을 고민할 때 Kafka는 선택이 아닌 필수라는 것을 알게 되었습니다. 단순히 이론으로만 아는 것과, 실제 내 프로젝트의 아키텍처를 뜯어고치며 발생하는 네트워크 설정, 메모리 최적화, 비동기 데이터 정합성 문제를 직접 해결해 보는 것은 큰 차이가 있다고 판단했습니다.
그래서 기존의 단순한 구조를 과감히 버리고, **이벤트 기반 아키텍처(Event-Driven Architecture)**로 확장하는 도전을 시작했습니다.
해결책 : Kafka를 브로커로 두어 각 서비스 간의 **의존성을 제거(Decoupling)**하고 비동기 처리를 통해 시스템 안정성을 확보하는 구조를 설계했습니다.
2. 아키텍처의 변화: Before & After
- Before: Frontend → Backend (8080) → PostgreSQL / Redis
- 모든 로직이 하나의 서버에서 동기적으로 처리되는 구조

- After: Backend (Producer) → Kafka (Broker) → Consumer Server (8083)
- 기부 생성은 메인에서, 후속 처리는 컨슈머에서 담당하는 데이터 파이프라인 구축

3. Kafka 설정과 구현: Producer (Main Backend)
메인 백엔드에서는 기부 이벤트가 발생했을 때 이를 Kafka 브로커로 던져주는 Producer 역할을 수행합니다.
3-1. application.properties 설정
가장 먼저 Kafka 서버와 통신하기 위한 기본 설정을 진행합니다. 직렬화(Serialization) 방식으로 JsonSerializer를 선택하여 객체 데이터를 JSON 형태로 전송합니다.
# Kafka Bootstrap Server (우분투 서버 IP)
spring.kafka.bootstrap-servers=YOUR_UBUNTU_IP:9092
# Producer 핵심 설정
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.acks=all # 모든 복제본까지 복제 완료 확인 (데이터 안정성)
spring.kafka.producer.retries=3 # 전송 실패 시 재시도 횟수
# 대용량 메시지 대응을 위한 설정
spring.kafka.properties.max.request.size=10485760
3-2. KafkaProducerConfig (Java Configuration)
설정 파일을 기반으로 KafkaTemplate을 빈으로 등록하고, 프로젝트 실행 시 자동으로 토픽을 생성하도록 설정했습니다.
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(config));
}
@Bean
public NewTopic donationTopic() {
// 파티션 3개, 복제 1개로 토픽 생성 [cite: 6, 13]
return TopicBuilder.name("donation-topic")
.partitions(3)
.replicas(1)
.build();
}
}
3-3. 비즈니스 로직 적용 (DonateService)
기존의 기부 등록 로직 후에 Kafka로 이벤트를 발행하는 코드를 추가했습니다.
public Map<String, Object> donateInsert(DonateRequestDto donateRequestDto) {
// 1. DB에 기부 정보 저장 (PostgreSQL)
donateRepository.donateInsert(donateEntity);
// 2. 저장 성공 시 Kafka 메시지 발행
sendDonationToKafka(donateRequestDto);
return result;
}
private void sendDonationToKafka(DonateRequestDto dto) {
try {
Map<String, Object> message = new HashMap<>();
message.put("userId", dto.getUserId());
message.put("amount", dto.getAmount());
message.put("timestamp", System.currentTimeMillis());
// ... 생략 ...
kafkaTemplate.send("donation-topic", message);
log.info("Kafka 메시지 발행 완료: userId {}", dto.getUserId());
} catch (Exception e) {
log.error("Kafka 전송 중 오류 발생", e);
}
}
4. Kafka 설정과 구현: Consumer (Worker Server)
발행된 기부 이벤트를 실시간으로 감시하고 처리하는 Consumer 서버입니다. 메인 서버의 부하를 덜어주기 위해 별도의 프로젝트(Port 8083)로 구성했습니다.
4-1. application.properties 설정
데이터의 정합성을 위해 수동 커밋(manual) 모드를 선택했습니다.
server.port=8083
spring.kafka.bootstrap-servers=YOUR_UBUNTU_IP:9092
# Consumer 핵심 설정
spring.kafka.consumer.group-id=donation-consumer-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false # 수동 커밋 설정
# Trusted Packages 설정 (역직렬화 허용)
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.listener.ack-mode=manual
4-2. KafkaConsumerService 구현
@KafkaListener를 사용하여 메시지를 수신하고, 처리가 완료되면 ack.acknowledge()를 호출하여 메시지를 소비했음을 명시적으로 알립니다.
@Service
@Slf4j
public class KafkaConsumerService {
@KafkaListener(topics = "donation-topic", groupId = "donation-consumer-group")
public void consumeDonation(ConsumerRecord<String, Map<String, Object>> record, Acknowledgment ack) {
try {
log.info("메시지 수신 - Offset: {}", record.offset());
Map<String, Object> donationData = record.value();
// 비즈니스 로직 처리 (통계 저장 등)
processDonation(donationData);
// 처리 성공 시 수동 커밋!
ack.acknowledge();
} catch (Exception e) {
log.error("메시지 처리 실패", e);
}
}
}
'회고 > 개인프로젝트' 카테고리의 다른 글
| [개인프로젝트] CI/CD 연동하기 *gitHube Actions (0) | 2026.02.04 |
|---|