일반적으로 푸시 알림이 필요할 땐 FCM을 사용했다.
현업에서 일하시는 분들한테 여쭤봐도 푸시 알림 서버를 직접 만들기보다는 FCM을 만든다고 하셨다.
그 이유는 알림 서버는 그때그때 필요한 성능이 다르기 때문에(특정 이벤트 기간에 알림 가용한 알림서버가 아주 많아야 한다.) 스케일 아웃이 편한 FCM을 사용하는게 더 좋기 때문이다.
그래서 그런지 직접 푸시 알림서버를 만드는 레퍼런스도 많지 않았고, 때문에 어떻게 만들어야 할지 더 궁금했던 것같다.
때문에 푸시 알림서버를 직접 구현해보기로 했다.
요구사항
요구사항을 정해놓고 이에 맞게 개발을 하려하는데 한 번에 구현하기보다 단계를 나눠서 개발하려한다.
- 특정 인원에게 즉시/예약 발생
- ex) A가 B에게 즉시/예약 발송을 보낼 수 있어야 함
- API Request/Response Body 포맷은 자유롭게 정의
- 알림 타입
- 최소한 Email 발송이 가능해야 함
- 필요 시 다른 타입(ex 알림, SNS, 카톡 등등)도 지원해도 됨
- 테스트 코드 작성
- [구독형 서비스] 특정 그룹에 속해 있는 n명에게 즉시/예약 발송을 지원하는 API 제공
- ex) A가 위 API 호출 시 특정 그룹에 속해 있는 n명에게 즉시 혹은 예약 발송이 가능해야 함
단계별 구현
버전 1
단순히 HTTP API로 받아온 데이터를 카프카와 연결해서 email을 전송해주는 consumer와 연결하는 구조이다.
- 파티션은 N개를 둘 수 있다고 가정하고, 여기선 2개만 만들어 두었다.
- 컨슈머도 2개를 사용했는데 HTTP API를 핸들링하는 서버와 일단 같이 띄웠다.
- 추후에 분리할 수 있도록 변경할 예정이다.
즉시 알림 전송 과정
- 알림 전송자는 HTTP로 알림 전송을 요청한다.
- was에서
kafka
의urgent
토픽으로 알림 전송을 위한 이벤트를produce
한다. - 컨슈머에서
consumer
가 해당 요청을consume
하여 이메일을 전송한다.
HTTP API 처리
푸시 알림을 위한 클라이언트의 요청을 핸들링하는 컨트롤러를 만들려고 한다.
Controller
@RestController
@RequestMapping("/alert")
class PushRegistrationController(
val pushRegisterService: PushRegisterService
) {
val log = LoggerFactory.getLogger(PushRegistrationController::class.java)
@PostMapping("/urgent")
fun register(@RequestBody emailForm: EmailForm) : ResponseEntity<Boolean> {
log.info("push alert to {}", emailForm.email)
return ResponseEntity.ok(pushRegisterService.sendUrgently(emailForm))
}
}
Service
@Service
class PushRegisterService(
val kafkaTemplate: KafkaTemplate<String, String>
) {
fun sendUrgently(emailForm: EmailForm) : Boolean {
kafkaTemplate.send("urgent", emailForm.toString())
return true
}
}
- 컨트롤러가 너무 Email에 초점이 맞춰져있는 것을 볼 수 있다.
- 사실 아직
Android
나APNs
에 push 알림을 보내기 위해 어떻게 처리해야하는지 잘 모르기 때문에 그냥 단순히 이렇게 작성했다.
- 사실 아직
- 전체적으로 로직이 매우 간단하다. 아직 디테일을 고민하기 보단 전체적인 틀을 먼저 짜놓고 싶었다.
이벤트 브로커(Kafka) 세팅
docker-compose.yml
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ports:
- "22181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
- 이미지는
confluent
의 주키퍼와 카프카를 사용했다. - v1이니 카프카도 심플하게 세팅했다. 단일 주키퍼에 단일 브로커이다.
컨테이너 실행
- 컨테이너를 실행시키면 잘 뜨는 것을 볼 수 있고 프로커에 대한 요청은
29092
포트로 포워딩되어 있다.
KafkaConfig
@Configuration
class KafkaConfig {
val log = LoggerFactory.getLogger(KafkaConfig::class.java)
@Value("\${spring.kafka.bootstrap-servers[0]}")
lateinit var bootstrapServer: String
@Bean
fun consumerConfig() : Map<String, Any> {
val props: HashMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServer
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = EmailFormDeserializer::class.java
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
return props
}
@Bean
fun consumerFactory() : ConsumerFactory<String, String> {
return DefaultKafkaConsumerFactory(consumerConfig())
}
@Bean
fun producerConfig() : Map<String, Any> {
val props: HashMap<String, Any> = HashMap()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServer
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return props
}
@Bean
fun producerFactory() : ProducerFactory<String, String> {
return DefaultKafkaProducerFactory(producerConfig())
}
@Bean
fun kafkaTemplate() : KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
@Bean
fun kafkaListenerContainerFactory() :
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
{
val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()
factory.consumerFactory = consumerFactory()
factory.setConcurrency(2)
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
return factory
}
}
consumerConfig
- 어플리케이션에서 사용할 Consumer의 프로퍼티를 설정하는 코드이다.
- consumer에서 소비하는 KafkaRecord의 key는
String
으로 deserialize 한다. value
는 바로 사용하기 편한 자바 객체로 이용하기 위해 간단히EmailFormDeserializer
를 만들어서 적용했다.
producerConfig
- HTTP API로 요청받는 알림을 kafka에 이벤트를 넣어주는
Producer
의 프로퍼티를 설정하는 코드이다. - producer에서 생산하는 KafkaRecord의 key는
String
으로 serialize 한다. - producer에서 produce하는 카프카 레코드는 HTTP 메시지에서 받은 메시지를 그대로 json 형식의
String
으로 serialize 한다.
- HTTP API로 요청받는 알림을 kafka에 이벤트를 넣어주는
KafkaEventListener
@Component
class KafkaEventListener (
val emailSender: EmailSender
) {
val log = LoggerFactory.getLogger(KafkaEventListener::class.java)
@KafkaListener(groupId = "group001", topics = ["urgent"])
fun consumer(
record: ConsumerRecord<String, EmailForm>,
ack: Acknowledgment,
consumer: Consumer<String, EmailForm>
) {
log.info("{}", record.value())
emailSender.send(record.value().email, record.value().title, record.value().message)
ack.acknowledge()
}
}
- 읽은 레코드를 이메일로 전송해준다.
- ack를 처리해준다.
실행
실행시킨후 요청을 보내서 메일이 잘 오는지 확인해봤다.
메일이 잘 오는 것을 볼 수 있었다.
'알림서버 개발기' 카테고리의 다른 글
알림서버 개발기 V2 (0) | 2024.05.10 |
---|