FastAPI WebSockets 即時通訊
WebSockets 是一種在單個 TCP 連線上進行全雙工通訊的協定。它允許伺服器主動向客戶端推播資訊,非常適合聊天室、即時通知、遊戲等應用。
FastAPI 對 WebSockets 有著原生的支持。
建立 WebSocket Endpoint
你需要使用 @app.websocket 裝飾器,並在函數中接收 WebSocket 參數。
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
# 1. 接受連線
await websocket.accept()
while True:
# 2. 接收資料 (字串)
data = await websocket.receive_text()
# 3. 處理資料並回傳
await websocket.send_text(f"Message text was: {data}")
關鍵步驟
await websocket.accept(): 建立 WebSocket 連線。await websocket.receive_text(): 等待並接收客戶端傳來的字串訊息。await websocket.send_text(data): 發送字串訊息給客戶端。
通常我們會將接收邏輯放在一個 while True 迴圈中,以保持連線並持續處理訊息。
處理斷線
當客戶端關閉連線時,receive_text 會拋出 WebSocketDisconnect 例外。我們應該捕捉這個例外來優雅地處理斷線。
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
print(f"Client #{client_id} left the chat")
Connection Manager (聊天室範例)
如果要實作多人聊天室(廣播訊息),我們需要一個管理器來追蹤所有連線中的 Websocket。
from typing import List
class ConnectionManager:
def __init__(self):
# 儲存所有活躍的連線
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
# 對所有連線發送訊息
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
總結
- 使用
WebSocket處理即時雙向通訊。 - 記得使用
await websocket.accept()接受連線。 - 使用
try...except WebSocketDisconnect處理斷線。 - 透過自定義
ConnectionManager可以輕鬆實作廣播功能。