복사되었습니다.

FastAPI에서 사용자가 연결을 끊으면 어떻게 될까? - Middleware disconnection check 문제

Cover Image for FastAPI에서 사용자가 연결을 끊으면 어떻게 될까? - Middleware disconnection check 문제

FastAPI에서 middleware를 사용하면 클라이언트가 연결을 끊었다는 사실을 알 수 없게 된다. 사용자의 disconnection이 요청 취소를 의미하는 경우, 이는 여러가지 문제로 이어질 수 있다. 연결이 끊겼다는 것을 어떻게 감지해야 할까?

요약: FastAPI의 middleware를 사용하면 disconnection을 감지하기 어렵다!

이 글에서 다루고자 하는 문제는 간단하다. FastAPI를 통해 개발한 web server에 사용자가 요청을 날렸다가 연결을 끊어버리는 것을 제대로 감지할 수 없다는 것이다. 보통은 사용자의 요청을 받는 로직 부분에서 Request라는 객체를 인자로 받은 후, is_disconnected라는 빌트인 메서드를 실행해보면 disconnection 여부를 알 수 있다. 이 메서드는 bool 타입으로 사용자 연결의 끊김 여부를 반환해준다.

문제는 middleware를 사용하는 경우이다. Middleware가 가로챈 Request 객체는 최종적으로 web server의 로직에 전달되는 Request 객체와 다른 참조를 갖게 된다. Middleware들을 거친 뒤 web server의 로직 부분에서 매개변수로 전달된 Request 객체에 is_disconnected 메서드를 호출하면 제대로 동작하지 않는다. 사용자의 요청을 최초로 받은 middleware가 참조하는 Request 객체의 is_disconnected 메서드를 호출해야 정상적으로 disconnection check를 할 수 있다.

간단히 말해서, middleware를 사용한다면 FastAPI web server의 핵심 로직 부분에서 disconnection check를 할 수 없다는 뜻이다. 억지로라도 가능하게 하려면 어떻게 해야 할까? Request 객체들끼리 내부적으로 공유하는 scope라는 공간에 첫 middleware가 참조하는 is_disconnected 메서드 자체를 저장하는 등 꼼수가 필요하다. 만약 disconnection 감지가 조금 늦어도 상관없다면, 모든 로직이 수행되고 난 뒤 최초의 middleware에서 is_disconnected를 호출하면 된다.

문제 발생의 정확한 원인을 찾아본 결과, starlette에 일부 잘못된 코드가 있는 듯 하다. Github에 아직 close되지 않은 이슈가 존재하며, 다양한 사람들이 같은 문제를 겪고 있다. 문제되는 코드 부분을 대체할 수 있도록 커스텀한 is_disconnected 메서드를 정의해서 대신 사용하는 것도 하나의 방법이 될 수 있다. 해결 방법들은 실제 코드 예제와 함께 천천히 살펴보도록 하자.

문제 정의: 연결 끊김을 감지할 수 없다?

연결 끊김, 뭐가 문제일까?

Backend API를 개발할 때, 사용자에게 요구할 input과 내어줄 output을 정의해두고 필요한 기능만 만들면 끝일까? Basic flow만 완성했다고 끝은 아니다. 당연히 exceptional flow도 최대한 생각하고 개발해야 한다. Web application은 네트워크를 통해 서비스하기 때문에, 항상 다음 질문과 관련된 exceptional flow를 생각해봐야 한다.

"사용자가 요청해놓고 연결을 끊어버리면 어떡하지?"

GET처럼 app의 상태 변화 없이 단순 데이터 조회를 하는 경우라면 크게 신경쓰지 않아도 된다. 그런데, POST처럼 DB에 새로운 데이터를 추가하는 등 app의 상태를 변화시키는 동작은 어떨까? 심지어, DB를 업데이트하는 동작이 오래걸린다면? 다음과 같이 두 가지 시나리오를 고려해야 한다.

  • 사용자가 실수로 연결을 끊었으니 업데이트를 계속해야 한다.
  • 사용자가 일부러 연결을 끊었으니 업데이트를 취소해야 한다.

사실 첫 번째 시나리오는 딱히 신경쓸 것이 없다. 업데이트 도중에 사용자 연결이 끊겼더라도, 원래 하던 작업은 계속 수행되기 때문이다. 사용자에게 응답만 전달되지 않을 뿐이다. 문제는 두 번째 시나리오다. Disconnection을 일종의 "취소" 동작이라고 생각한다면, DB transaction에 rollback을 하는 등 상태 업데이트 작업을 중단하고 원래 상태로 되돌려놔야 한다.

이 글에서는 두 번째 시나리오를 FastAPI로 구현하는 과정에서 만나는 문제들에 대해 다루고자 한다. 결론부터 말하면 완벽한 해답은 없다. 그래도 여러가지 고민을 해 볼 가치는 있다고 믿는다. 테스트를 해볼 수 있는 코드 예제와 함께 두 번째 시나리오에서 어떤 문제가 발생하는지, 그리고 어떻게 해결해야 하는지 살펴보자.

Client disconnection 확인 방법

애초에 web app에서 사용자의 연결 해제를 감지할 수 있긴 할까? 기본적인 FastAPI 코드 예제를 통해 연결이 해제되는 상황을 시뮬레이션해보자. 우선, 클라이언트 코드는 다음과 같다.

import requests

def send_request() -> None:
  try:
    response = requests.get('http://localhost:8000', timeout=3)
    print("Good:", response.text)
  except requests.exceptions.Timeout as e:
    print("Time out:", e)
  except requests.exceptions.RequestException as e:
    print("Bad:", e)

if __name__ == "__main__":
  send_request()

클라이언트 동작은 매우 간단하다. http://localhost:8000에 요청을 날린 뒤, 3초 동안 응답이 없으면 연결을 해제한다.

이어서 서버 코드도 간단히 작성해보자. 요청이 들어오면 10초 동안 시간을 질질 끌다가 사용자의 연결 해제가 감지되면 동작을 끝내는 코드다.

from fastapi import FastAPI
from starlette.requests import Request
import uvicorn
import asyncio
from typing import Optional, Dict, Any

app = FastAPI()

@app.get("/")
async def hello(request: Request) -> Optional[Dict[str, Any]]:
  for _ in range(10):
    if await request.is_disconnected():
      print("Client is disconnected.")
      return None
    else:
      print("Client is connected.")
    await asyncio.sleep(1)
  return {"Hello": "World"}

if __name__ == '__main__':
  uvicorn.run(app, host="0.0.0.0", port=8000)

Request 타입의 객체를 인자로 받은 후, is_disconnected라는 메서드를 통해 client disconnection을 감지하고 있음을 알 수 있다.

이제 클라이언트와 서버 코드가 모두 준비되었으니 다음 명령어들을 실행해보자.

python server.py
python client.py
INFO:     Started server process [72759]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
Client is connected.
Client is connected.
Client is connected.
Client is disconnected.

예상대로 3초 후에 사용자가 연결을 해제했다는 것을 서버에서 정상적으로 확인할 수 있다. 그럼 문제 없는 것 아닌가? 아쉽게도 middleware를 사용하면 문제가 생긴다. 이어서 middleware를 추가할 경우 발생하는 문제에 대해서 살펴보자.

Middleware가 문제다!

Middleware는 사용자의 요청이 web app에 구현된 특정 메서드에 전달되기 전에 요청을 가로챈다. 가로챈 요청에 여러가지 처리를 한 후 원래의 목적지로 보내준다. 이러한 전처리뿐만 아니라 사용자에게 응답을 되돌려주기 전 후처리를 담당하기도 한다. HTTPS 사용을 강제하기 위한 HTTPSRedirectMiddleware, 또는 CORS 처리를 위한 CORSMiddleware 등 web 서버에는 거의 필수적으로 다양한 middleware들이 사용된다고 보면 된다.

fastapi middleware client request interception

문제는 이러한 middleware를 쓸 경우, 앞서 살펴본 것처럼 간단하게 client의 disconnection을 확인할 수 없다는 것이다. 서버 코드를 다음과 같이 변경해보자.

from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
import uvicorn
import asyncio
from typing import Optional, Dict, Any

app = FastAPI()

class MyMiddleware(BaseHTTPMiddleware):
  async def dispatch(
    self, request: Request, call_next: RequestResponseEndpoint
  ) -> Response:
    print('[Middleware] Request id:', id(request))
    print('[Middleware] Disconnection check1:', await request.is_disconnected())
    response = await call_next(request)
    print('[Middleware] Disconnection check2:', await request.is_disconnected())
    return response

app.add_middleware(MyMiddleware)

@app.get("/")
async def hello(request: Request) -> Optional[Dict[str, Any]]:
  print('[API] Request id:', id(request))
  for _ in range(10):
    if await request.is_disconnected():
      print("Client is disconnected.")
      return None
    else:
      print("Client is connected.")
    await asyncio.sleep(1)
  return {"Hello": "World"}


if __name__ == '__main__':
  uvicorn.run(app, host="0.0.0.0", port=8000)

MyMiddleware라는 이름의 커스텀 middleware를 정의하였다. 이 middleware는 클라이언트와 hello라는 메서드 사이에서 요청을 가로채어 전처리와 후처리를 수행한다. 전처리와 후처리 로직은 간단하게 is_disconnected 메서드를 호출한 뒤 결과를 출력하도록 정의했다. call_next라는 메서드 전에 정의한 동작들은 전처리 로직, 이후에 정의한 동작들은 후처리 로직으로 동작한다고 이해하면 된다. 이제 이 코드와 클라이언트 코드를 함께 실행시켜보자.

INFO:     Started server process [41956]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
[Middleware] Request id: 4388243584
[Middleware] Disconnection check1: False
[API] Request id: 4388535456
Client is connected.
Client is connected.
Client is connected.
Client is connected.
Client is connected.
Client is connected.
Client is connected.
Client is connected.
Client is connected.
Client is connected.
[Middleware] Disconnection check2: True

아까와는 달리, hello 메서드가 요청이 들어온 후 3초 뒤에 연결이 해제된 것을 감지하지 못하는 모습을 확인할 수 있다. 오히려 middleware의 후처리 로직에서 뒤늦게 연결이 해제되었음을 감지하고 있다. 7초를 낭비한 셈이다.

왜 그런 것일까? 결과를 잘 보면, middleware가 받은 request 객체와 hello 메서드가 받은 request 객체가 다른 것임을 알 수 있다. Disconnection check는 최초 클라이언트의 요청을 받은 middleware만 가능하다는 것을 짐작할 수 있다.

Disconnection check 시점에 따른 해결 방법들

고려사항

앞서 살펴본 문제 상황을 정리하자면, middleware를 쓰는 순간 web app의 실제 기능이 구현된 메서드에서 disconnection check가 불가능해진다는 것이다. 아무래도 이상하다고 생각하여 github의 issue들을 찾아보았는데, 실제로 동일한 문제를 경험하고 있는 사람들이 많았다. 그리고 2024년 4월 기준 여전히 해결되지 않은 문제임을 확인했다. 따라서, 완벽한 솔루션은 아니지만 우회할 수 있는 방법을 적용해야 한다.

해결 방법을 바로 살펴보기에 앞서, 사용자의 연결 끊김 현상에 어떻게 대응할 것인지 한 번 고민해볼 필요가 있다. 크게 두 가지 정도로 방향을 잡아서 이름을 붙여보자.

  • Lazy: 연결이 끊겨도 작업하던 것을 중단하지 않는다. 모든 작업이 종료된 후, disconnection check을 하여 후속조치를 취한다.
  • Immediate: 작업 중간중간에 수시로 disconnection check를 한다. Disconnection이 감지되자마자 작업하던 것을 중단하고 에러를 발생킨 뒤 곧바로 후속조치를 취한다.

이 두 가지 방식 중 정답은 없다. 구체적으로 어떻게 구현할지 살펴보고, 장단점을 고려하여 상황에 맞는 해결 방법을 채택해야 한다. 예제 코드와 함께 하나씩 살펴보자.

Lazy 방식

Lazy 방식은 말그대로 모든 일이 끝나고 나서 느긋하게 disconnection check를 하는 방법이다. 앞서 언급했듯이, middleware는 클라이언트와 web server api에 매핑된 메서드 사이에서 전처리와 후처리를 담당한다. 후처리 부분은 로직이 담긴 메서드가 끝나고 난 뒤에 실행된다.

class MyMiddleware(BaseHTTPMiddleware):
  async def dispatch(
    self, request: Request, call_next: RequestResponseEndpoint
  ) -> Response:
    print('[Middleware] Preprocessing:', await request.is_disconnected())
    response = await call_next(request)  # business logic
    print('[Middleware] Postprocessing:', await request.is_disconnected())
    return response

앞서 2-3에서 call_next 메서드 다음, 즉, 후처리 부분에 작성한 코드에서 is_disconnected를 호출할 경우 disconnection check가 정상적으로 동작하는 것을 확인했다. 즉, lazy 방식대로 "연결이 끊겼을 때의 대응"에 해당하는 후속조치 코드만 후처리 부분에 작성하면 되는 것이다.

이렇게 구현하면 실제 business logic을 작성하는 부분에서 연결이 끊기는 시나리오에 대해 고민할 필요가 없다는 장점이 있다. 연결이 끊기더라도 작업하던 것은 끝까지 마무리하고, 후속조치는 최초의 middleware에게 위임하는 것이다. 연결이 끊겼다는 로그를 남기거나, admin에게 알림을 전송하는 등 간단하고 일반적인 후속조치만 필요하다면, 이 방법을 채택하여 간단하게 해결할 수 있다.

그런데, 이 방법에는 크게 두 가지 문제가 있다.

  • 자원 낭비
  • 복잡한 후속조치 구현의 어려움

첫 번째는 자원 낭비 문제이다. 이 문제는 business logic 완료에 소요되는 시간이 길수록 극대화된다. 앞서 2-3의 코드 예제에서 이미 3초가 지난 시점에 연결이 끊겼음에도 남은 7초 동안 로직을 계속 수행하는 것을 확인했다. 만약 business logic이 DB에 무언가를 저장하는 작업이라면, 시간뿐만 아니라 디스크 자원도 낭비하게 되는 셈이다.

두 번째는 복잡한 후속조치를 구현하기 어렵다는 것이다. Middleware에게 모든 후속조치를 위임하기 때문에 당연한 결과다. Business logic을 수행하는 메서드는 매우 다양하다. 각각의 메서드의 "취소" 동작은 저마다 고유한 로직이 필요할 것이다. 그런데, 수많은 메서드들이 하나의 middleware를 공유한다. 모든 메서드들이 후속조치를 middleware에 위임한다면 어떻게 될까? Middleware의 후처리 부분에 수많은 if, elif, else가 난무하는 모습을 상상해보자. 생각만 해도 끔찍하다.

그렇다고 아예 못쓸 정도는 아니다. 대부분의 경우 backend API의 작업은 DB와 여러가지 상호작용을 하는 것이 전부이다. 따라서, 다음과 같은 방법을 사용하면 어느정도 통일성있는 후속조치 로직이 탄생한다.

  • Request마다 DB transaction을 1:1 매핑시킨다.
  • Middleware의 후속조치 부분에서 disconnection이 감지되면 참조 중인 request의 DB transaction에 rollback을 호출한다.

그런데, 이렇게 구현하더라도 결국 DB 업데이트와 관련되지 않은 로직들은 여전히 "취소"라는 행동을 정의하기 어렵다. 따라서, 이 lazy 방법은 후속조치가 비교적 간단한 경우에만 제한적으로 사용해야 한다. 사실 실제로는 어떤 business logic이 추가 정의될지 모르기 때문에, 이 방법은 아예 채택하지 않는 것이 좋아보이기도 한다.

추후 event-driven architecture로 전체 구조를 리팩토링한다면 얘기가 달라질 수 있다. 그런데, EDA까지 다루려면 글의 분량이 지나치게 길어질 수 있기 때문에 자세한 설명은 생략하도록 하겠다. 간략하게만 짚고 넘어가자면, "취소" 동작을 별도로 구현해두고 disconnection이 감지될 때마다 이벤트를 발생시켜서 취소 동작을 trigger하는 식으로 구성하는 것이다.

Immediate 방식

Immediate 방식은 작업 도중 수시로 disconnection check를 하는 방식이다. 앞서 middleware를 거칠 때 Request 객체가 달라지는 것을 확인했다. 이 request 객체들 사이에 공유되는 자원은 없을까? Github에서 starlette 코드를 까보면 __call__ 메서드의 구현 부분에서 힌트를 얻을 수 있다.

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
  if scope["type"] != "http":
    await self.app(scope, receive, send)
    return

  request = _CachedRequest(scope, receive)
  wrapped_receive = request.wrapped_receive
  response_sent = anyio.Event()

  async def call_next(request: Request) -> Response:

  ...

  with collapse_excgroups():
    async with anyio.create_task_group() as task_group:
      response = await self.dispatch_func(request, call_next)
      await response(scope, wrapped_receive, send)
      response_sent.set()

async def dispatch(
  self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
  raise NotImplementedError()  # pragma: no cover

하나의 middleware에서 call_next 메서드를 통해 다음 middleware로 작업을 넘기기 전에, _CachedRequest 타입의 새로운 request를 생성하는 것을 확인할 수 있다. 이때, scopereceive라는 것이 그대로 전달된다. 둘 중 우선 scope에 초점을 맞춰보자. Scope는 요청에 대한 여러가지 메타 정보들을 담고 있는 dictionary 형식의 객체이다. 예를 들어, 클라이언트의 IP, port 정보, 요청에 포함된 parameter 정보 등을 담고 있다.

ASGI 문서를 보면 scope에 어떤 정보들이 포함되어 있는지 자세히 나와있다. 이 중 state에 대한 설명을 잘 읽어보자.

  • state Optional(dict[Unicode string, Any]) – A copy of the namespace passed into the lifespan corresponding to this request. (See Lifespan Protocol). Optional; if missing the server does not support this feature.

이어서 lifespan에 대한 문서에서 state에 대한 설명들을 살펴보자.

  • state Optional(dict[Unicode string, Any]) – An empty namespace where the application can persist state to be used when handling subsequent requests. Optional; if missing the server does not support this feature.

Applications often want to persist data from the lifespan cycle to request/response handling. For example, a database connection can be established in the lifespan cycle and persisted to the request/response cycle.

정리해보면, state는 사용자의 요청에 따라 발생하는 여러 request 객체들간에 정보를 공유하고 싶을 때 사용하는 것이다. Key는 str 타입, value를 Any 타입인 dictionary라고 가이드하고 있다. 즉, 원하는 key값과 is_disconnected 메서드를 value로 전달하면 다른 request 객체에도 그대로 전달된다는 의미다. ASGI 서버의 구현체가 지원해야 사용할 수 있다고 하지만, FastAPI에서는 사용 가능하니 안심해도 된다. 다음과 같이 web server 코드를 변경해서 테스트해보자.

class MyMiddleware(BaseHTTPMiddleware):
  async def dispatch(
    self, request: Request, call_next: RequestResponseEndpoint
  ) -> Response:
    print('[Middleware] Scope id:', id(request.scope))
    print('[Middleware] Receive id:', id(request.receive))
    request.state.is_disconnected = request.is_disconnected
    response = await call_next(request)
    return response

app.add_middleware(MyMiddleware)

@app.get("/")
async def hello(request: Request) -> Optional[Dict[str, Any]]:
  print('[API] Scope id:', id(request.scope))
  print('[API] Receive id:', id(request.receive))
  for _ in range(10):
    if await request.state.is_disconnected():
      print("Client is disconnected.")
      return None
    else:
      print("Client is connected.")
    await asyncio.sleep(1)
  return {"Hello": "World"}

request.state는 request 객체의 scope에 들어있는 state를 지름길처럼 참조하는 property이다. 따라서, request.state.is_disconnected = request.is_disconnected는 request 객체의 state에 is_disconnected 메서드를 저장하라는 의미이다. 이렇게 state에 저장된 메서드는 request 객체들 사이에 공유되므로, hello 메서드에서 접근할 수 있게 된다. 클라이언트 코드와 함께 실행하여 결과를 확인해보자.

INFO:     Started server process [60267]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
[Middleware] Scope id: 4424311424
[Middleware] Receive id: 4424397056
[API] Scope id: 4424311424
[API] Receive id: 4424383504
Client is connected.
Client is connected.
Client is connected.
Client is disconnected.

Middleware와 API에 각각 전달된 request 객체가 동일한 scope를 가지고 있는 것이 확인된다. 그리고 middleware를 쓰지 않았을 때와 똑같이 3초 후에 정상적으로 disconnection check가 된 것을 알 수 있다. 이로써 middleware를 사용하더라도 API 로직 부분에서 큰 코드 변화 없이 disconnection check가 정상 동작하게 되었다.

이 방법은 원할 때마다 API 로직에서 곧바로 disconnection check를 할 수 있기 때문에, lazy 방식에 비해 상대적으로 자원 낭비가 덜하다는 장점이 있다. 따라서, 특별한 이유가 없다면 이 immediate 방식을 사용하는 것이 편리할 것이다. 어떤 side effect가 있을지는 아직 파악하지 못했지만, 실제 프로젝트에 적용했을 때도 잘 동작하긴 한다.

당장의 문제 해결은 된 것처럼 보이지만, 한 가지 찝찝함이 남아있다. 위에서 살펴봤던 starlette 코드에 의하면, request 객체를 생성할 때 scope말고도 receive를 함께 전달한다. 그런데, 결과를 보면 middleware와 API에 전달된 각 receive의 참조가 다르다는 것이다. 이에 대한 내용은 뒤에서 좀 더 자세히 살펴보겠다.

마치며

해결 방법 요약 및 평가

Middleware 사용 시 disconnection check를 위해 두 가지 해결 방법을 알아보았다. 두 방식 모두 최초의 middleware가 참조하는 request 객체의 is_disconnected 메서드를 사용하기 위한 몸부림이라고 요약할 수 있다. Lazy 방식은 모든 작업이 끝난 후 최초의 middleware의 후처리 부분에서 is_disconnected를 호출하는 방식이다. 그리고 immediate 방식은 request 객체의 scope에 저장된 state를 통해 is_disconnected 메서드를 API에 전달하는 방식이다.

Lazy 방식은 사실 모든 작업이 끝난 후 disconnection check를 하는 것이기 때문에 효용성이 별로 없다고 볼 수 있다. 물론 없는 것보다는 낫겠지만, API 로직이 오래걸리거나 많은 자원을 요구할수록 가치가 떨어진다. 반면, immediate 방식은 ASGI 서버가 state를 지원하는 이상 간편하고 효율적으로 disconnection에 대한 대응을 할 수 있게 된다. 따라서, 별다른 이유가 없다면 immediate 방식을 채택하는 것을 권장한다.

어느 방식을 선택하든, 핵심은 "최초로 request를 받는 middleware"의 is_disconnected 메서드를 호출하는 것이다. 따라서, middleware의 추가 순서에 유의해야 한다. FastAPI 객체에 middleware를 추가할 때, 반드시 disconnection check와 관련된 middleware를 가장 나중에 추가하는 것을 잊지 말자. 다른 개발자들을 위해 주석으로 안내문구를 추가해주는 것이 인지상정이다.

app = FastAPI()
app.add_middleware(MyMiddleware3)
app.add_middleware(MyMiddleware2)
app.add_middleware(MyMiddleware1)  # disconnection check

근본 원인은 무엇일까?

이것저것 해결 방법은 살펴봤지만, 사실 문제 자체가 이상하다고 생각하지 않는가? 원래는 잘 되던 disconnection check가 middleware 하나 추가한다고 동작하지 않다니... 심오한 뜻이 있다기보단 버그에 가까운 현상이라고 생각한다.

앞서 살펴본 바에 따르면, request 객체를 생성할 때 scope뿐만 아니라 receive라는 것도 인자로 전달한다는 것을 알 수 있었다. 그런데, immediate 방식 설명 부분에서 receive의 id가 request 객체마다 다르다는 것을 확인했다. 뭔가 이상하지 않은가? 코드만 보면 receivescope와 마찬가지로 id가 같아야 할 것 같은데 말이다.

의문이 생겨서 starlette 코드를 분석하다보니, 결국 이 receive가 문제였다는 것을 깨달았다.

async def is_disconnected(self) -> bool:
  if not self._is_disconnected:
    message: Message = {}

    # If message isn't immediately available, move on
    with anyio.CancelScope() as cs:
      cs.cancel()
      message = await self._receive()

    if message.get("type") == "http.disconnect":
      self._is_disconnected = True

  return self._is_disconnected

is_disconnected 메서드 안에서 request 객체 안에 있는 _receive를 호출해서 message를 얻는 부분을 확인할 수 있다. 반환된 메시지의 내용을 통해 disconnection 여부를 알 수 있는 것이다. 결국 이 _receive가 request 객체마다 다르다보니, disconnection을 알리는 메시지를 제대로 받을 수 없는 것은 아닌지 의심된다.

앞서 언급했듯이 동일한 문제를 겪고 있는 사람들이 있는 것으로 보인다. 현재까지 파악된 바로는 cs.cancel()이 실행될 때 아예 기다려주지 않기 때문에 message가 항상 {}로 남게 되는 문제가 있다고 한다. 여전히 첫 번째 middleware가 받은 receive는 왜 정상적으로 동작하는지 의문이다.

지속 모니터링하며 patch가 될 때까지 기다리거나, 기회가 된다면 contribution을 해보는 것도 좋을 것 같다. 2024년 4월 기준 현재까지 제안된 여러 해결 방법들 중 적용해 볼 만한 꼼수는 다음과 같다.

async def my_is_disconnected(request: Request) -> bool:
  if request._is_disconnected:
    return True
  message = {}
  with anyio.move_on_after(0.01):  # timeout
    message = await request.receive()
  if message.get("type") == "http.disconnect":
    request._is_disconnected = True
  return request._is_disconnected

cs.cancel() 대신 anyio.move_on_after(0.01)을 통해 일정 시간 동안 기다려주는 방법이다. Timeout을 넣지 않으면 메시지가 도착할 때까지 계속 기다리기 때문에, disconnection check로써는 비효율적이다. 아무튼 이렇게 메서드를 정의해두고 API 코드에서 is_disconnected 메서드 대신 사용하면, middleware 순서를 신경쓸 필요없이 API에서 disconnection check를 수행할 수 있게 된다.

API 설계를 잘못한 것은 아닐까?

사실 이 글에서 살펴본 문제에 대해 심각하게 고민을 하면서 읽었다면, API 설계가 잘못된 것은 아닌지 고민해볼 필요가 있다. 오래 걸리는 작업에 대해 사용자가 완료될 때까지 기다려야 하는 것이 과연 바람직할까? 당연히 지양해야 한다. 애초에 오랜 시간이 걸릴만 한 작업을 API로 설계할 때는, 사용자에게 "요청이 완료되었다"는 메시지만 곧바로 반환해준 뒤 백그라운드에서 무거운 작업을 진행하도록 하는 것이 좋다. 따라서, 앞서 언급했던 EDA를 적용하거나, state 정보를 DB에 저장해서 polling을 통해 작업 진행 상황을 사용자에게 알리도록 해야 한다.

내가 이 문제를 고민하게 된 계기는 참여중인 프로젝트에서 특수한 시나리오에 대한 일종의 "땜빵" 작업이 있었기 때문이었다. UI에서 DB 접속 정보를 입력했을 때, 연결이 잘 되는지 connection test를 해보고 DB 정보를 등록하는 버튼이 있었다. 사용자가 버튼을 눌렀을 때 DB에 접속이 가능하다면 곧바로 초록색으로 표시를 해주고, connection test가 성공적으로 완료되어야 DB 정보가 서버에 정상 등록되도록 제한되어 있었다.

문제는 네트워크 문제로 DB connection test가 느린 경우였다. 한 번 요청을 하면, 사용자는 connection test가 완료될 때까지 빙글빙글 아이콘을 마주하게 된다. 따라서, frontend 개발자분이 임시로 취소 버튼을 만드셨고, 사용자가 취소 버튼을 누르면 connection test를 위한 연결을 끊겠다는 전략이었다. 당장 구조 리팩토링을 하기에는 일정이 빠듯하니 어쩔 수 없이 disconnection check와 취소 동작에 대한 고민을 하게 된 것이다.

만약 비슷한 고민이 필요한 상황이고, 시간의 여유가 있다면 구조 개선을 고민해볼 것을 추천한다. Starlette 코드의 문제로 인해 일종의 꼼수같은 해결책을 쓰는 것도 찝찝하지만, 오래 걸릴 수도 있는 작업에 대해 사용자가 응답을 기다려야 하는 시나리오 자체가 바람직하지 않다고 생각한다. DB 접속이나 외부 API 호출 등 네트워크 작업이 포함되어 있다면 더더욱 state 관리 방법을 도입하는 것을 적극 검토하도록 하자.

Comments

    More Posts

    Python으로 kubernetes 환경에 app 배포하기 - Kubernetes deployment with python

    Kubectl 명령어를 통해 kubernetes 환경에 다양한 구성요소들을 배포할 수 있지만, python으로 템플릿 코드를 짜두면 개발자들도 편리하게 배포를 자동화할 수 있다. Jinja2와 kubernetes 패키지를 통해 사용자 요청에 따라 kubernetes 환경에 pod들을 띄우는 기능을 만들어보자.

    Domain entity와 ORM을 따로 구분해서 정의하는 방법 - Domain entity with imperative ORM

    Domain entity를 정의할 때 declarative 방식의 ORM을 사용하면 domain model에 DB 관련 정보가 노출되는 문제가 있다. Python sqlalchemy를 활용하여 imperative mapping style로 domain entity로부터 ORM을 분리해보자.

    도메인 entity와 ORM을 동시에 추구하면 안 되는 걸까? - Domain entity with declarative ORM

    ORM을 사용하는 환경에서 DDD를 따르는 코드를 작성하다보면, domain entity를 어떤식으로 정의해야 하는지 혼란스러울 때가 있다. Python sqlchemy를 통해 declarative mapping 방식으로 domain entity를 어떻게 구현하면 좋은지 알아보자.

    Font Size