알림서버 개발기

알림서버 개발기 V2

dltmd202 2024. 5. 10. 02:05

 알림서버 V2는 예약알림 전송이 가능하도록 세팅했다.

  • 예약 알림 정보는 mysql에 영속화 함
  • Spring의 scheduling을 이용해서 5분 마다 처리되지 않은 예약 전송 정보를 모아 브로커에 produce 해준다.

HTTP API 처리하기

Controller

@RestController
@RequestMapping("/alert")
class PushRegistrationController(
    val pushRegisterService: PushRegisterService
) {
    val log = LoggerFactory.getLogger(PushRegistrationController::class.java)

    // ...

    @PostMapping("/reserved")
    fun registerReserved(@RequestBody reservedEmailForm: ReservedEmailForm) : ResponseEntity<Boolean> {
        log.info("reserved push alert at {}", reservedEmailForm.atTime)
        return ResponseEntity.ok(pushRegisterService.reservePushAlert(reservedEmailForm))
    }
}

Service

@Service
class PushRegisterService(
    val kafkaTemplate: KafkaTemplate<String, String>,
    val reservedPushRegisterRepository: PushRegisterRepository
) {
    val log = LoggerFactory.getLogger(KafkaEventListener::class.java)

    //...

    @Transactional
    fun reservePushAlert(reservedEmailForm: ReservedEmailForm): Boolean? {
        val reservedPush: ReservedPushRegister = reservedPushRegisterRepository.save(reservedEmailForm.toEntity())
        return true
    }

    @Transactional
    @Scheduled(fixedDelay = 300_000)
    fun enqueue(){
        val periodReservation: List<ReservedPushRegister> =
            reservedPushRegisterRepository.getNotCommitedPeriodReservation(
                LocalDateTime.now(),
                LocalDateTime.now().plusMinutes(5))

        periodReservation
            .forEach{ r ->
                log.info("record {}", r)
                kafkaTemplate.send("reserved", r.toEmailForm().toString())
                r.commit()
            }

        log.info("records {}", periodReservation)
    }
}
  • reservePushAlert는 예약 정보를 영속화 하는 메서드이다.
  • enqueue는 최근 5분 간에 전송되지 않은 예약 정보를 브로커에 produce 해주는 메서드이다.
    • 5분 마다 스케줄링 되어 실행된다.

인프라 세팅

기존에 즉시 전송을 위한 토픽이 아닌 예약 전송을 위한 토픽을 새로 만들었다.

Kafka 구동하기

mysql이나 kafka와 같이 필요한 인프라를 간단하게 docker-compose로 구성해 두었다.

docker-compose up -d

 

이렇게 3개의 컨테이너가 잘 구동되고 있다면 정상적으로 실행되고 있는 것이다.

Kafka Topic 추가하기

 docker-compose exec kafka kafka-topics \
                                         --bootstrap-server 127.0.0.1:9092 \
                                         --create \
                                         --topic reserved \
                                         --partitions 2
  • reserved라는 이름의 토픽 생성
  • 단일 브로커에 파티션을 두 개 생성

Kafka Consumer

@Component
class KafkaEventListener (
    val emailSender: EmailSender
) {
    val log = LoggerFactory.getLogger(KafkaEventListener::class.java)

    @KafkaListener(groupId = "group001", topics = ["urgent", "reserved"])
    fun urgentConsumer1(
        record: ConsumerRecord<String, EmailForm>,
        ack: Acknowledgment,
        consumer: Consumer<String, EmailForm>
    ) {
        log.info("{}", record.value())

        emailSender.send(record.value().email, record.value().title, record.value().message)
        ack.acknowledge()
    }

    @KafkaListener(groupId = "group001", topics = ["urgent", "reserved"])
    fun urgentConsumer2(
        record: ConsumerRecord<String, EmailForm>,
        ack: Acknowledgment,
        consumer: Consumer<String, EmailForm>
    ) {
        log.info("{}", record.value())

        emailSender.send(record.value().email, record.value().title, record.value().message)
        ack.acknowledge()
    }

}
  • 각각 urgent, reserved 토픽이 두 개의 파티션을 가지고 있기 때문에, 두 개의 컨슈머를 구성했다.

WAS 실행

예약 알림을 위한 요청을 여러 개 전송해보았다.

레코드가 잘 생성되었고

 

 

5분 간격으로 예약된 메일이 잘 전송된 것을 볼 수 있다.