SseEmitterRepository.java
package com.newbit.notification.command.infrastructure;
import com.newbit.notification.command.application.dto.response.NotificationSendResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Repository
public class SseEmitterRepository {
// key: emitterId (userId_UUID), value: SseEmitter
private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();
// key: userId, value: List<emitterId>
private final Map<Long, List<String>> userEmitterMap = new ConcurrentHashMap<>();
public void save(String emitterId, Long userId, SseEmitter emitter) {
emitterMap.put(emitterId, emitter);
userEmitterMap.computeIfAbsent(userId, id -> new ArrayList<>()).add(emitterId);
}
public void deleteById(String emitterId) {
emitterMap.remove(emitterId);
// userEmitterMap에서도 emitterId 제거
userEmitterMap.forEach((userId, emitterIds) -> emitterIds.remove(emitterId));
}
public void send(Long userId, NotificationSendResponse response) {
List<String> emitterIds = userEmitterMap.getOrDefault(userId, new ArrayList<>());
for (String emitterId : emitterIds) {
SseEmitter emitter = emitterMap.get(emitterId);
if (emitter == null) continue;
try {
emitter.send(SseEmitter.event()
.name("notification")
.data(response));
} catch (IOException e) {
log.warn("SSE 연결 실패 -> emitterId: {}, 이유: {}", emitterId, e.getMessage());
emitter.completeWithError(e);
deleteById(emitterId);
}
}
}
}