프로젝트/DevLink

[프로젝트] 채팅서비스에 Kafka 연동하기 ( 2023.12.07(목) )

IT록흐 2023. 12. 8. 00:32
반응형

 

작업한 것

- 개발 

 

 

 

 

 

 

 

SpringBoot에서 제공하는 SimpMessageBroker를 활용하여 채팅서비스를 구현하였다.  그러나 한 가지 문제가 있다. 쿠버네티스 환경에 띄울 것이라 여러 파드에 해당 서버가 띄어지는데, 토픽이 서버 내부에 있다보니 유저의 구독요청이  여러 서버로 로드밸런싱 되어 분산된다. 그러면 어느 한 서버의 토픽에 메시지가 들어가도 다른 서버의 토픽에 구독한 유저는 메시지를 받지 못한다. 

 

이런 문제를 해결하기 위해 처리량이 높고 확장성이 좋은 Kafka를 사용하였다. 

 

1. 채팅방 구독하기 ( STOMP )
2. 채팅방 입장하기 ( Kafka )
3. 메시지 전송하기 ( Kafka )
4. 채팅방 벗어나기 ( STOMP )
5. 채팅방 구독 취소하기 ( STOMP )
6. STOMP 세션 끊기 ( STOMP )

 

세션이 연결되어 있는 서버와 관련있는 기능은 STOMP 프로토콜로,  여러 서버와 메시지를 공유해야 하는 기능은 Kafka를 활용하였다.

 

배운 것

 

- SimpMessageBroker 한계

- kafka가 필요한 이유

 

이슈

This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer 

 

SpringBoot에서 Kafka 모듈로 Consumer를 생성하는 과정에서 위와 같은 오류가 발생했다. Producer에서 직렬화를 하고 Consumenr에서 역직렬화를 담당하다.  spring-kafka 2.1 이후 부터는 Consumer가 역직렬화를 할 때, 동일한 패키지가 아닌 Producer에서 직렬화 된 데이터는 신뢰하지 못하는 데이터라 판단하여 에러를 뱉어낸다. 이런 검사 과정이 필요한 이유는 잘 모르지만 당장은 불필요하므로 검사가 진행되지 못하도록 false 처리 했다. 

 

@Bean
public ConsumerFactory<String,ChatDto> consumerFactory(){
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),null,new JsonDeserializer<>(ChatDto.class,false)); // 파라미터로 false 추가
}

 

 

- 참고자료 

 

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition t

스프링에서 카프카 컨슈밍할 경우 아래와 같은 에러가 발생하였습니다. org.apache.kafka.common.errors.S...

blog.naver.com

 

Spring Kafka The class is not in the trusted packages

In my Spring Boot/Kafka application before the library update, I used the following class org.telegram.telegrambots.api.objects.Update in order to post messages to the Kafka topic. Right now I use ...

stackoverflow.com

 

마무리

 

대화내역 저장 프로세스 만들고!

DB도 H2에서 MySQL로 바꾸고!

서비스 간 통신도 구현해보자!

 

반응형