SSE의 ReadableStream

1. ReadableStream이란 무엇인가
ReadableStream은 웹 표준 API로, 데이터를 한 번에 모두 보내는 것이 아니라 조금씩 나눠서 흘려보내는 통로입니다. 일반적인 HTTP 응답은 서버가 데이터를 전부 준비한 다음 한 번에 클라이언트로 보냅니다. 반면 ReadableStream을 사용하면 서버가 데이터 일부를 만들어 보내고, 또 시간이 흐른 뒤 다음 데이터를 보내는 식으로 연결을 계속 열어둘 수 있습니다.
이 코드는 그 통로를 만들어서 서버가 클라이언트에게 실시간으로 알림을 밀어넣는 구조를 구현합니다. 채팅 메시지가 도착했을 때, 결제가 완료됐을 때, 누군가 댓글을 달았을 때 등 서버에서 발생한 사건을 클라이언트가 폴링하지 않고 받아볼 수 있게 됩니다.
2. start 콜백의 역할
const stream = new ReadableStream({ start(controller) { // ... } });
ReadableStream 생성자에 객체를 넘기면 그 안의 start 메서드가 스트림이 만들어지는 순간 자동으로 실행됩니다. 매개변수로 받는 controller는 이 스트림을 조작할 수 있는 리모컨입니다. 컨트롤러를 통해 데이터를 밀어넣거나(enqueue), 스트림을 닫거나(close), 오류를 발생시킬(error) 수 있습니다.
start 함수는 한 번만 실행되므로 여기서 초기 설정, 이벤트 리스너 등록, 타이머 시작 같은 작업을 모두 처리합니다.
3. enqueue 헬퍼 함수
function enqueue(frame: string) { if (closed) return; controller.enqueue(encoder.encode(frame)); }
controller.enqueue는 스트림에 데이터 조각을 밀어넣는 메서드입니다. 다만 ReadableStream은 문자열이 아닌 바이트 데이터(Uint8Array)를 받기 때문에 encoder.encode로 문자열을 바이트로 변환합니다. encoder는 코드 바깥에서 new TextEncoder()로 만들어진 객체일 것입니다.
여기서 중요한 부분은 if (closed) return 체크입니다. 스트림이 이미 닫혔는데 데이터를 또 밀어넣으려고 하면 런타임 오류가 발생합니다. 클라이언트가 갑자기 탭을 닫았을 때 백그라운드 작업이 뒤늦게 enqueue를 시도하는 일이 흔하기 때문에 이 가드를 두는 것입니다.
4. 초기 연결 메시지 전송
enqueue(`event: ready\ndata: "connected"\n\n`);
이 한 줄은 SSE 프로토콜의 형식을 따릅니다. SSE는 텍스트 기반 프로토콜이고, 각 이벤트는 정해진 규칙으로 작성합니다. event: 줄은 이벤트 이름을 지정하고, data: 줄은 실제 데이터를 담습니다. 그리고 이벤트 하나의 끝은 빈 줄, 즉 \n\n으로 표시합니다.
클라이언트는 이걸 받아서 eventSource.addEventListener("ready", ...) 같은 식으로 처리합니다. 연결이 성공적으로 열렸음을 알리는 신호 역할을 합니다.
5. 사용자 등록과 알림 허브
const unregister = registerClient({ userId: user.id, enqueue });
이 줄이 이 코드의 핵심입니다. registerClient는 코드 바깥에 있는 함수로, 서버 메모리 어딘가에 사용자 ID와 enqueue 함수를 매핑해서 저장합니다. 예를 들면 다음과 같은 자료구조일 것입니다.
const clients = new Map(); // clients.set("user_123", [enqueueFn1, enqueueFn2, ...])
이렇게 등록해두면 서버의 다른 부분에서 notifyUser("user_123", message) 같은 함수를 호출했을 때, 그 사용자의 enqueue 함수를 찾아 메시지를 밀어넣을 수 있습니다. 결과적으로 이 스트림으로 데이터가 흘러나가고, 클라이언트는 실시간으로 알림을 받습니다.
registerClient는 등록 해제 함수를 반환합니다. 나중에 연결이 끊겼을 때 이 함수를 호출해서 명단에서 자신을 빼야 합니다.
6. 하트비트 처리
const ping = setInterval(() => enqueue(`: ping\n\n`), 25000);
25초마다 : ping\n\n을 보냅니다. 콜론으로 시작하는 줄은 SSE에서 주석으로 취급되어 클라이언트가 무시합니다. 그러면 왜 보내는가 하면, 중간에 있는 프록시 서버나 로드밸런서가 일정 시간 동안 데이터가 흐르지 않는 연결을 죽은 연결로 판단해서 끊어버리기 때문입니다.
예를 들어 Nginx의 기본 idle timeout은 60초이고, 클라우드 환경에 따라 30초인 곳도 있습니다. 25초 간격으로 빈 신호라도 흘려보내면 연결이 살아있다고 알릴 수 있습니다. 데이터를 받는 클라이언트 입장에서는 주석이라 아무 영향이 없습니다.
7. 정리 함수
function cleanup() { if (closed) return; closed = true; clearInterval(ping); unregister(); controller.close(); }
연결이 끝났을 때 뒷정리를 하는 함수입니다. 순서가 중요합니다.
먼저 closed = true로 플래그를 세웁니다. 이렇게 해야 앞서 본 enqueue의 가드가 작동해서 이후 들어오는 enqueue 호출이 무시됩니다. 그다음 clearInterval(ping)으로 하트비트 타이머를 멈춥니다. 멈추지 않으면 닫힌 스트림에 계속 ping을 쏘려 들어 오류가 납니다.
unregister()는 알림 허브의 명단에서 자신을 빼는 작업입니다. 주석에 "안 빼면 좀비"라고 적힌 이유가 여기 있습니다. 명단에 남아있으면 서버는 이미 끊어진 연결로 계속 메시지를 보내려고 시도하게 됩니다. 메모리도 새고, enqueue 호출마다 오류가 누적됩니다.
마지막으로 controller.close()로 스트림 자체를 닫습니다.
8. 연결 종료 감지
req.signal.addEventListener("abort", cleanup);
req.signal은 요청에 딸린 AbortSignal입니다. 클라이언트가 탭을 닫거나, 새로고침하거나, 네트워크가 끊기면 이 신호에서 abort 이벤트가 발생합니다. 그 순간 cleanup 함수가 실행되어 모든 자원이 정리됩니다.
이 부분이 없으면 서버는 클라이언트가 사라진 것을 모르고 계속 ping을 보내고, 알림도 보내고, 메모리에 정보도 들고 있게 됩니다. 사용자가 많을수록 좀비 연결이 쌓여 서버가 망가집니다.
9. 전체 흐름 정리
이 코드의 동작 순서를 시간순으로 보면 다음과 같습니다. 클라이언트가 SSE 엔드포인트로 요청을 보내면 ReadableStream이 만들어지고 start 함수가 실행됩니다. 즉시 ready 이벤트가 클라이언트에게 전달되고, 알림 허브에 사용자가 등록되며, 25초 간격의 하트비트 타이머가 돌기 시작합니다. 이후 서버 다른 곳에서 발생한 알림이 enqueue를 통해 클라이언트에 흘러갑니다. 클라이언트가 연결을 끊으면 abort 이벤트가 발생해 cleanup이 호출되고, 타이머가 멈추고, 허브에서 제거되고, 스트림이 닫힙니다.
10. closed 변수에 대한 주의
코드 안에서 closed 변수를 참조하는데 이 변수는 보여주신 스니펫 바깥에서 선언되어 있을 것입니다. 보통 이런 식으로 작성합니다.
let closed = false; const stream = new ReadableStream({ start(controller) { // ...closed를 여기서 사용 } });
start 함수가 클로저로 closed 변수를 캡처하므로 함수 안 어디서든 같은 값을 읽고 쓸 수 있습니다. 이 플래그는 cleanup이 여러 번 호출되거나 닫힌 후에 enqueue가 시도되는 경합 상황을 막는 안전장치입니다.



댓글
댓글을 작성하려면 이 필요합니다.