# FastAPI WebSocket

***

**1. Simple Echo Server**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(data)
    except WebSocketDisconnect:
        pass
```

**2. Broadcast Messages to All Connected Clients**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

connected_clients = set()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    connected_clients.add(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            for client in connected_clients:
                await client.send_text(data)
    except WebSocketDisconnect:
        connected_clients.remove(websocket)
```

**3. Authentication with JWT**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(oauth2_scheme)):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(data)
    except WebSocketDisconnect:
        pass
```

**4. Real-Time Data Streaming**

```python
import asyncio

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        # Generate random data
        data = {"temperature": random.random(), "humidity": random.random()}
        await websocket.send_json(data)
        await asyncio.sleep(1)
```

**5. Subscription-Based Messaging**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

subscriptions = {}

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "subscribe" in data:
                subscriptions[websocket] = data["subscribe"]
            elif "unsubscribe" in data:
                subscriptions.pop(websocket, None)
            for subscribed_topic in subscriptions.get(websocket, []):
                await websocket.send_json({subscribed_topic: random.random()})
    except WebSocketDisconnect:
        for subscribed_topic in subscriptions.pop(websocket, []):
            pass
```

**6. File Upload and Download**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, File, UploadFile

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_bytes()
            if data[0] == 1:  # File upload
                file = await websocket.receive_file(data[1:])
                await websocket.send_bytes([2, file.filename.encode()])
            elif data[0] == 2:  # File download
                filename = data[1:].decode()
                file_data = open(filename, "rb").read()
                await websocket.send_bytes([3, file_data])
    except WebSocketDisconnect:
        pass
```

**7. Interactive Chat**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

users = {}

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    username = await websocket.receive_text()
    users[username] = websocket
    try:
        while True:
            data = await websocket.receive_text()
            for user in users.values():
                if user != websocket:
                    await user.send_text(f"{username}: {data}")
    except WebSocketDisconnect:
        users.pop(username, None)
```

**8. Real-Time Location Tracking**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            latitude = data["latitude"]
            longitude = data["longitude"]
            # Update user's location in the database
            await websocket.send_json({"status": "ok"})
    except WebSocketDisconnect:
        pass
```

**9. Remote Procedure Call (RPC)**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            method = data["method"]
            args = data["args"]
            result = await getattr(self, method)(*args)
            await websocket.send_json({"result": result})
    except WebSocketDisconnect:
        pass
```

**10. Game Loop**

```python
import asyncio

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        # Update game state
        game_state = {"players": [], "objects": []}
        await websocket.send_json(game_state)
        await asyncio.sleep(0.1)
```

**11. Real-Time Stock Market Data**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            if data == "subscribe":
                await websocket.send_json({"stocks": [{"symbol": "AAPL", "price": 100}]})
    except WebSocketDisconnect:
        pass
```

**12. Real-Time Social Media Feeds**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            if data == "subscribe":
                await websocket.send_json({"posts": [{"user": "johndoe", "content": "Hello world!"}]})
    except WebSocketDisconnect:
        pass
```

**13. Interactive Whiteboard**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "draw" in data:
                # Draw on whiteboard
                pass
            elif "clear" in data:
                # Clear whiteboard
                pass
    except WebSocketDisconnect:
        pass
```

**14. Remote Desktop**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_bytes()
            # Send screenshots back to client
            await websocket.send_bytes(b"image data")
    except WebSocketDisconnect:
        pass
```

**15. Real-Time Collaboration**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "edit" in data:
                # Collaborative editing
                pass
    except WebSocketDisconnect:
        pass
```

**16. Live Video Streaming**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Send video frames to clients
            await websocket.send_bytes(b"video data")
    except WebSocketDisconnect:
        pass
```

**17. Chat with Text and Emojis**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            # Send text and emoji messages
            await websocket.send_text(data)
    except WebSocketDisconnect:
        pass
```

**18. Real-Time Data Visualization**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Send real-time data updates (e.g., charts, graphs)
            await websocket.send_json({"data": [{"x": 1, "y": 2}]})
    except WebSocketDisconnect:
        pass
```

**19. Multiplayer Game**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "move" in data:
                # Handle player movements
                pass
            elif "attack" in data:
                # Handle player attacks
                pass
    except WebSocketDisconnect:
        pass
```

**20. Remote Code Execution**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            # Execute code on server and send results back
            await websocket.send_text(str(eval(data)))
    except WebSocketDisconnect:
        pass
```

**21. File Sharing**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, File, UploadFile

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_bytes()
            if data[0] == 1:  # File upload
                file = await websocket.receive_file(data[1:])
                # Save file on server
                pass
            elif data[0] == 2:  # File download
                # Send file to client
                await websocket.send_bytes(open("file.txt", "rb").read())
    except WebSocketDisconnect:
        pass
```

**22. Remote Desktop Control**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_bytes()
            # Control remote desktop
            pass
    except WebSocketDisconnect:
        pass
```

**23. Real-Time Data Visualizations**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "query" in data:
                # Execute data query on server and send results as JSON
                await websocket.send_json({"data": [{"x": 1, "y": 2}]})
    except WebSocketDisconnect:
        pass
```

**24. Collaborative Drawing**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "draw" in data:
                # Draw on collaborative canvas
                pass
            elif "clear" in data:
                # Clear canvas
                pass
    except WebSocketDisconnect:
        pass
```

**25. Real-Time Chat with Presence**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

connected_users = {}

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    username = await websocket.receive_text()
    connected_users[username] = websocket
    try:
        while True:
            data = await websocket.receive_text()
            for user, conn in connected_users.items():
                if user != username:
                    await conn.send_text(f"{username}: {data}")
    except WebSocketDisconnect:
        connected_users.pop(username, None)
```

**26. Remote Log Monitoring**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "query" in data:
                # Query logs on server and send results as JSON
                await websocket.send_json({"logs": [{"timestamp": 123456, "message": "Info: Server started."}]})
    except WebSocketDisconnect:
        pass
```

**27. Real-Time Error Monitoring**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "query" in data:
                # Query error logs on server and send results as JSON
                await websocket.send_json({"errors": [{"timestamp": 123456, "message": "Error: Server encountered an issue."}]})
    except WebSocketDisconnect:
        pass
```

**28. Real-Time Device Monitoring**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "device_id" in data:
                # Query device status from database and send as JSON
                await websocket.send_json({"status": "online"})
    except WebSocketDisconnect:
        pass
```

**29. Remote Control for Robots**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FASTAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "command" in data:
                # Execute command on robot (e.g., move forward, turn left)
                pass
    except WebSocketDisconnect:
        pass
```

**30. Real-Time Video Surveillance**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FASTAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Send live video stream as bytes
            await websocket.send_bytes(b"video data")
    except WebSocketDisconnect:
        pass
```

**31. Remote Control for Drones**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FASTAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "command" in data:
                # Execute command on drone (e.g., take off, land, fly forward)
                pass
    except WebSocketDisconnect:
        pass
```

**32. Real-Time Weather Updates**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FASTAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Send real-time weather updates (e.g., temperature, humidity, wind speed)
            await websocket.send_json({"weather": {"temperature": 25, "humidity": 50, "wind_speed": 10}})
    except WebSocketDisconnect:
        pass
```

**33. Remote Control for Smart Home Devices**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FASTAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "command" in data:
                # Execute command on smart home device (e.g., turn on lights, adjust thermostat)
                pass
    except WebSocketDisconnect:
        pass
```

**34. Real-Time Stock Market Trading**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FASTAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Send real-time stock market data and allow users to place trades
            await websocket.send_json({"stocks": [{"symbol": "AAPL", "price": 100}]})
    except WebSocketDisconnect:
        pass
```

**35. Remote Control for Self-Driving Cars**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FASTAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            if "command" in data:
                # Execute command on self-driving car (e.g., accelerate, brake, change lane)
                pass
    except WebSocketDisconnect:
        pass
```

**36. Real-Time Health Monitoring**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FASTAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Send real-time health data (e.g., heart rate, blood pressure, glucose levels)
            await websocket.send_json({"health": {"heart_rate": 70, "blood_pressure": 120/80, "glucose": 100}})
    except WebSocketDisconnect:
        pass
```
