알림서버 V2는 예약알림 전송이 가능하도록 세팅했다.
- 예약 알림 정보는 mysql에 영속화 함
- Spring의 scheduling을 이용해서 5분 마다 처리되지 않은 예약 전송 정보를 모아 브로커에 produce 해준다.
HTTP API 처리하기
Controller
@RestController
@RequestMapping("/alert")
class PushRegistrationController(
val pushRegisterService: PushRegisterService
) {
val log = LoggerFactory.getLogger(PushRegistrationController::class.java)
// ...
@PostMapping("/reserved")
fun registerReserved(@RequestBody reservedEmailForm: ReservedEmailForm) : ResponseEntity<Boolean> {
log.info("reserved push alert at {}", reservedEmailForm.atTime)
return ResponseEntity.ok(pushRegisterService.reservePushAlert(reservedEmailForm))
}
}
Service
@Service
class PushRegisterService(
val kafkaTemplate: KafkaTemplate<String, String>,
val reservedPushRegisterRepository: PushRegisterRepository
) {
val log = LoggerFactory.getLogger(KafkaEventListener::class.java)
//...
@Transactional
fun reservePushAlert(reservedEmailForm: ReservedEmailForm): Boolean? {
val reservedPush: ReservedPushRegister = reservedPushRegisterRepository.save(reservedEmailForm.toEntity())
return true
}
@Transactional
@Scheduled(fixedDelay = 300_000)
fun enqueue(){
val periodReservation: List<ReservedPushRegister> =
reservedPushRegisterRepository.getNotCommitedPeriodReservation(
LocalDateTime.now(),
LocalDateTime.now().plusMinutes(5))
periodReservation
.forEach{ r ->
log.info("record {}", r)
kafkaTemplate.send("reserved", r.toEmailForm().toString())
r.commit()
}
log.info("records {}", periodReservation)
}
}
reservePushAlert
는 예약 정보를 영속화 하는 메서드이다.enqueue
는 최근 5분 간에 전송되지 않은 예약 정보를 브로커에 produce 해주는 메서드이다.- 5분 마다 스케줄링 되어 실행된다.
인프라 세팅
기존에 즉시 전송을 위한 토픽이 아닌 예약 전송을 위한 토픽을 새로 만들었다.
Kafka 구동하기
mysql
이나 kafka
와 같이 필요한 인프라를 간단하게 docker-compose로 구성해 두었다.
docker-compose up -d
이렇게 3개의 컨테이너가 잘 구동되고 있다면 정상적으로 실행되고 있는 것이다.
Kafka Topic 추가하기
docker-compose exec kafka kafka-topics \
--bootstrap-server 127.0.0.1:9092 \
--create \
--topic reserved \
--partitions 2
reserved
라는 이름의 토픽 생성- 단일 브로커에 파티션을 두 개 생성
Kafka Consumer
@Component
class KafkaEventListener (
val emailSender: EmailSender
) {
val log = LoggerFactory.getLogger(KafkaEventListener::class.java)
@KafkaListener(groupId = "group001", topics = ["urgent", "reserved"])
fun urgentConsumer1(
record: ConsumerRecord<String, EmailForm>,
ack: Acknowledgment,
consumer: Consumer<String, EmailForm>
) {
log.info("{}", record.value())
emailSender.send(record.value().email, record.value().title, record.value().message)
ack.acknowledge()
}
@KafkaListener(groupId = "group001", topics = ["urgent", "reserved"])
fun urgentConsumer2(
record: ConsumerRecord<String, EmailForm>,
ack: Acknowledgment,
consumer: Consumer<String, EmailForm>
) {
log.info("{}", record.value())
emailSender.send(record.value().email, record.value().title, record.value().message)
ack.acknowledge()
}
}
- 각각
urgent
,reserved
토픽이 두 개의 파티션을 가지고 있기 때문에, 두 개의 컨슈머를 구성했다.
WAS 실행
예약 알림을 위한 요청을 여러 개 전송해보았다.
레코드가 잘 생성되었고
5분 간격으로 예약된 메일이 잘 전송된 것을 볼 수 있다.
'알림서버 개발기' 카테고리의 다른 글
알림서버 개발기 1주차 (0) | 2024.05.08 |
---|