# FastAPI SocketIO

***

**1. Simple Chat Application**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Build the FastAPI application and wrap it with the Socket.IO object.
# NOTE(review): confirm the installed `socketio` package really exports a
# `SocketIO` class that accepts a FastAPI app -- TODO verify.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Log the session id of a client that has just connected."""
    print('Connected', sid)


@sio.on('message')
def message(sid, data):
    """Log an incoming message, then emit it to the room named by `sid`."""
    print('Message received:', data)
    sio.emit('message', data, room=sid)
```

**2. Real-Time Data Streaming**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Log the new connection and start a streaming task for that client."""
    print('Connected', sid)
    sio.start_background_task(send_data, sid)


async def send_data(sid):
    """Emit a `data` event to one client, pausing one second between sends.

    Args:
        sid: Session id of the client; used as the target room.

    The loop has no exit condition in this snippet, so it keeps running
    until the task is cancelled from outside.
    """
    # Imported here because the snippet's import header never brings asyncio
    # into scope; without it the sleep below raises NameError.
    import asyncio

    while True:
        data = get_data()  # Replace with actual data fetching logic
        # NOTE(review): if `emit` is a coroutine on this server class it
        # must be awaited -- confirm against the socketio wrapper in use.
        sio.emit('data', data, room=sid)
        await asyncio.sleep(1)
```

**3. Collaborative Drawing**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)

# Single room shared by every participant of the drawing session.
DRAWING_ROOM = 'drawing'


@sio.on('connect')
def connect(sid, environ):
    """Log the connection and add the client to the shared drawing room.

    Without the room membership the broadcast in `drawing` would have no
    recipients, because nothing else in this snippet joins the room.
    """
    print('Connected', sid)
    sio.enter_room(sid, DRAWING_ROOM)


@sio.on('drawing')
def drawing(sid, data):
    """Broadcast a drawing payload to the shared drawing room.

    Args:
        sid: Session id of the client that sent the event.
        data: Payload forwarded unchanged.
    """
    sio.emit('drawing', data, room=DRAWING_ROOM)
```

**4. Multi-Room Chat**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('join')
def join(sid, data):
    """Put the calling client into the room named by ``data['room']``."""
    target_room = data['room']
    sio.enter_room(sid, target_room)


@sio.on('message')
def message(sid, data):
    """Forward the whole payload to the room named by ``data['room']``."""
    target_room = data['room']
    sio.emit('message', data, room=target_room)
```

**5. File Upload and Progress Tracking**

```python
from fastapi import FastAPI, File, UploadFile
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Log the session id of a client that has just connected."""
    print('Connected', sid)


@sio.on('upload')
async def upload(sid, file: UploadFile = File(...)):
    """Read an upload in 1024-byte chunks and report progress to the sender.

    Emits `progress` (a percentage) after each chunk, `error` with the
    exception text if reading fails, and `success` once the file is drained.

    Args:
        sid: Session id of the uploading client; used as the target room.
        file: The upload to consume.
    """
    sio.emit('progress', 0, room=sid)
    # `size` can be absent or None depending on how the upload was built, so
    # read it defensively and only report percentages when it is usable.
    total_bytes = getattr(file, 'size', None)
    bytes_read = 0
    try:
        while True:
            chunk = await file.read(1024)
            if not chunk:
                break
            # Count bytes ourselves: UploadFile offers no `tell()` method.
            bytes_read += len(chunk)
            if total_bytes:
                sio.emit('progress', bytes_read / total_bytes * 100, room=sid)
    except Exception as e:
        # Handler boundary: report the failure to the client instead of
        # letting it propagate into the server loop.
        sio.emit('error', str(e), room=sid)
    else:
        sio.emit('success', 'File uploaded successfully', room=sid)
```

**6. Real-Time Geolocation Tracking**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('location')
def location(sid, data):
    """Relay the 'lat' and 'lon' values of `data` to the 'tracking' room.

    NOTE(review): nothing in this snippet adds clients to 'tracking';
    presumably membership is handled elsewhere -- confirm.
    """
    position = {'lat': data['lat'], 'lon': data['lon']}
    sio.emit('location', position, room='tracking')
```

**7. Live Voting**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('vote')
def vote(sid, data):
    """Announce the chosen option, then the current results, to 'voting'.

    NOTE(review): no handler here adds clients to the 'voting' room --
    presumably done elsewhere; confirm.
    """
    chosen = data['option']
    sio.emit('vote', chosen, room='voting')
    tally = get_results()  # Replace with actual result fetching logic
    sio.emit('results', tally, room='voting')
```

**8. Real-Time Analytics Dashboard**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Log the new connection and start a metrics task for that client."""
    print('Connected', sid)
    sio.start_background_task(send_metrics, sid)


async def send_metrics(sid):
    """Emit a `metrics` event to one client, pausing one second between sends.

    Args:
        sid: Session id of the client; used as the target room.

    The loop has no exit condition in this snippet, so it keeps running
    until the task is cancelled from outside.
    """
    # Imported here because the snippet's import header never brings asyncio
    # into scope; without it the sleep below raises NameError.
    import asyncio

    while True:
        metrics = get_metrics()  # Replace with actual metrics fetching logic
        # NOTE(review): if `emit` is a coroutine on this server class it
        # must be awaited -- confirm against the socketio wrapper in use.
        sio.emit('metrics', metrics, room=sid)
        await asyncio.sleep(1)
```

**9. Real-Time Inventory Management**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('stock_update')
def stock_update(sid, data):
    """Pass the product id and quantity from `data` to `update_stock`."""
    product_id, quantity = data['product_id'], data['quantity']
    update_stock(product_id, quantity)  # Replace with actual stock update logic


@sio.on('get_stock')
def get_stock(sid):
    """Fetch the stock data and emit it to the requesting client only."""
    current_stock = get_stock_data()  # Replace with actual stock data fetching logic
    sio.emit('stock_data', current_stock, room=sid)
```

**10. Real-Time Task Management**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('task_create')
def task_create(sid, data):
    """Create a task from the 'title' and 'description' fields of `data`."""
    title, description = data['title'], data['description']
    create_task(title, description)  # Replace with actual task creation logic


@sio.on('task_update')
def task_update(sid, data):
    """Apply the 'status' field of `data` to the task named by 'task_id'."""
    task_id, new_status = data['task_id'], data['status']
    update_task(task_id, new_status)  # Replace with actual task update logic


@sio.on('task_delete')
def task_delete(sid, data):
    """Delete the task named by the 'task_id' field of `data`."""
    doomed_id = data['task_id']
    delete_task(doomed_id)  # Replace with actual task delete logic
```

**11. Real-Time CRM**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('customer_create')
def customer_create(sid, data):
    """Create a customer from the 'name' and 'email' fields of `data`."""
    name, email = data['name'], data['email']
    create_customer(name, email)  # Replace with actual customer creation logic


@sio.on('customer_update')
def customer_update(sid, data):
    """Hand the nested 'data' field to `update_customer` for 'customer_id'."""
    customer_id, changes = data['customer_id'], data['data']
    update_customer(customer_id, changes)  # Replace with actual customer update logic


@sio.on('customer_delete')
def customer_delete(sid, data):
    """Delete the customer named by the 'customer_id' field of `data`."""
    doomed_id = data['customer_id']
    delete_customer(doomed_id)  # Replace with actual customer delete logic
```

**12. Real-Time Order Management**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('order_create')
def order_create(sid, data):
    """Create an order from the 'customer_id' and 'products' fields."""
    customer_id, products = data['customer_id'], data['products']
    create_order(customer_id, products)  # Replace with actual order creation logic


@sio.on('order_update')
def order_update(sid, data):
    """Apply the 'status' field of `data` to the order named by 'order_id'."""
    order_id, new_status = data['order_id'], data['status']
    update_order(order_id, new_status)  # Replace with actual order update logic


@sio.on('order_delete')
def order_delete(sid, data):
    """Delete the order named by the 'order_id' field of `data`."""
    doomed_id = data['order_id']
    delete_order(doomed_id)  # Replace with actual order delete logic
```

**13. Real-Time E-commerce Cart Management**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('add_to_cart')
def add_to_cart(sid, data):
    """Add the requested product and quantity to the cart keyed by `sid`."""
    product_id, quantity = data['product_id'], data['quantity']
    add_item_to_cart(sid, product_id, quantity)  # Replace with actual cart logic


@sio.on('remove_from_cart')
def remove_from_cart(sid, data):
    """Remove the item named by 'item_id' from the cart keyed by `sid`."""
    unwanted_item = data['item_id']
    remove_item_from_cart(sid, unwanted_item)  # Replace with actual cart logic


@sio.on('get_cart')
def get_cart(sid):
    """Fetch the cart for `sid` and emit it to that client only."""
    contents = get_cart_data(sid)  # Replace with actual cart data fetching logic
    sio.emit('cart_data', contents, room=sid)
```

**14. Real-Time Quiz Application**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('answer')
def answer(sid, data):
    """Pass the submitted answer for a question to `check_answer`.

    The return value of `check_answer` is not used here.
    """
    question_id, submitted = data['question_id'], data['answer']
    check_answer(question_id, submitted)  # Replace with actual answer checking logic


@sio.on('get_question')
def get_question(sid):
    """Fetch question data and emit it to the requesting client only."""
    next_question = get_question_data()  # Replace with actual question data fetching logic
    sio.emit('question', next_question, room=sid)
```

**15. Real-Time Game**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('move')
def move(sid, data):
    """Forward the 'direction' field to `move_player` for this `sid`."""
    heading = data['direction']
    move_player(sid, heading)  # Replace with actual game move logic


@sio.on('get_state')
def get_state(sid):
    """Fetch the game state and emit it to the requesting client only."""
    snapshot = get_game_state()  # Replace with actual game state fetching logic
    sio.emit('state', snapshot, room=sid)
```

**16. Real-Time Collaboration Platform**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Log the session id of a client that has just connected."""
    print('Connected', sid)


@sio.on('join')
def join(sid, data):
    """Add the calling client to the room named by ``data['room']``.

    Args:
        sid: Session id of the client that wants to join.
        data: Mapping that must contain a 'room' key.
    """
    room = data['room']
    # `enter_room` is the room-membership call used by the other snippets in
    # this document; the server object is not known to offer `join_room`.
    sio.enter_room(sid, room)


@sio.on('message')
def message(sid, data):
    """Emit the 'message' field of `data` to the room named by 'room'.

    Args:
        sid: Session id of the sending client.
        data: Mapping that must contain 'room' and 'message' keys.
    """
    room = data['room']
    text = data['message']
    sio.emit('message', text, room=room)
```

**17. Real-Time Social Network**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Log the session id of a client that has just connected."""
    print('Connected', sid)


@sio.on('join')
def join(sid, data):
    """Add the calling client to a room named after ``data['user_id']``.

    Args:
        sid: Session id of the client that wants to join.
        data: Mapping that must contain a 'user_id' key.
    """
    user_id = data['user_id']
    # `enter_room` is the room-membership call used by the other snippets in
    # this document; the server object is not known to offer `join_room`.
    sio.enter_room(sid, user_id)


@sio.on('message')
def message(sid, data):
    """Emit the 'message' field of `data` to the recipient's room.

    Args:
        sid: Session id of the sending client.
        data: Mapping that must contain 'recipient_id' and 'message' keys.
    """
    recipient_id = data['recipient_id']
    text = data['message']
    sio.emit('message', text, room=recipient_id)
```

**18. Real-Time Notification System**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('subscribe')
def subscribe(sid, data):
    """Put the calling client into the room named by ``data['topic']``."""
    sio.enter_room(sid, data['topic'])


@sio.on('publish')
def publish(sid, data):
    """Emit the 'message' field of `data` to the room named by 'topic'."""
    topic, body = data['topic'], data['message']
    sio.emit('message', body, room=topic)
```

**19. Real-Time Chatbot**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('message')
def message(sid, data):
    """Feed the 'message' field to `get_response` and emit the reply to `sid`."""
    user_text = data['message']
    reply = get_response(user_text)  # Replace with actual chatbot logic
    sio.emit('response', reply, room=sid)
```

**20. Real-Time Machine Learning Model**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('predict')
def predict(sid, data):
    """Run `get_prediction` on the 'features' field and emit the result to `sid`."""
    model_input = data['features']
    outcome = get_prediction(model_input)  # Replace with actual ML model inference logic
    sio.emit('prediction', outcome, room=sid)
```

**21. Real-Time Data Visualizer**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('subscribe')
def subscribe(sid, data):
    """Put the calling client into the room named by ``data['datasource']``."""
    sio.enter_room(sid, data['datasource'])


@sio.on('data')
def data(sid, data):
    """Emit the nested 'data' field to the room named by 'datasource'."""
    datasource, payload = data['datasource'], data['data']
    sio.emit('data', payload, room=datasource)
```

**22. Real-Time Weather Forecast**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('get_forecast')
def get_forecast(sid, data):
    """Look up the forecast for ``data['location']`` and emit it to `sid`."""
    place = data['location']
    report = get_forecast_data(place)  # Replace with actual weather forecast fetching logic
    sio.emit('forecast', report, room=sid)
```

**23. Real-Time Stock Market Data**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('subscribe')
def subscribe(sid, data):
    """Put the calling client into the room named by ``data['stock']``."""
    sio.enter_room(sid, data['stock'])


@sio.on('data')
def data(sid, data):
    """Emit the 'stock' and 'price' fields to the room named by 'stock'."""
    stock, price = data['stock'], data['price']
    quote = {'stock': stock, 'price': price}
    sio.emit('data', quote, room=stock)
```

**24. Real-Time Currency Exchange Rates**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('subscribe')
def subscribe(sid, data):
    """Put the calling client into the room named by ``data['currency_pair']``."""
    sio.enter_room(sid, data['currency_pair'])


@sio.on('data')
def data(sid, data):
    """Emit the 'currency_pair' and 'rate' fields to the pair's room."""
    currency_pair, rate = data['currency_pair'], data['rate']
    update = {'currency_pair': currency_pair, 'rate': rate}
    sio.emit('data', update, room=currency_pair)
```

**25. Real-Time Flight Tracking**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('subscribe')
def subscribe(sid, data):
    """Put the calling client into the room named by ``data['flight_id']``."""
    sio.enter_room(sid, data['flight_id'])


@sio.on('data')
def data(sid, data):
    """Emit the 'flight_id' and 'location' fields to the flight's room."""
    flight_id, position = data['flight_id'], data['location']
    update = {'flight_id': flight_id, 'location': position}
    sio.emit('data', update, room=flight_id)
```

**26. Real-Time IoT Device Monitoring**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('subscribe')
def subscribe(sid, data):
    """Put the calling client into the room named by ``data['device_id']``."""
    sio.enter_room(sid, data['device_id'])


@sio.on('data')
def data(sid, data):
    """Emit the 'device_id' and nested 'data' fields to the device's room."""
    device_id, reading = data['device_id'], data['data']
    update = {'device_id': device_id, 'data': reading}
    sio.emit('data', update, room=device_id)
```

**27. Real-Time Smart Home Control**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('control')
def control(sid, data):
    """Pass the 'device_id' and 'command' fields of `data` to `control_device`."""
    device_id, command = data['device_id'], data['command']
    control_device(device_id, command)  # Replace with actual device control logic
```

**28. Real-Time Streaming Audio**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('audio_stream')
def audio_stream(sid, data):
    """Emit the 'audio_data' field of `data` to the room named by `sid`."""
    chunk = data['audio_data']
    sio.emit('audio_data', chunk, room=sid)
```

**29. Real-Time Streaming Video**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('video_stream')
def video_stream(sid, data):
    """Emit the 'video_data' field of `data` to the room named by `sid`."""
    frame = data['video_data']
    sio.emit('video_data', frame, room=sid)
```

**30. Real-Time Text Editor**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)

# Single room shared by everyone editing the document.
EDITOR_ROOM = 'editor'


@sio.on('connect')
def connect(sid, environ):
    """Log the connection and add the client to the shared editor room.

    Without the room membership the broadcast in `edit` would have no
    recipients, because nothing else in this snippet joins the room.
    """
    print('Connected', sid)
    sio.enter_room(sid, EDITOR_ROOM)


@sio.on('edit')
def edit(sid, data):
    """Broadcast the 'content' field of `data` to the shared editor room.

    Args:
        sid: Session id of the client that made the edit.
        data: Mapping that must contain a 'content' key.
    """
    content = data['content']
    sio.emit('content', content, room=EDITOR_ROOM)
```

**31. Real-Time Collaborative Whiteboard**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)

# Single room shared by everyone drawing on the whiteboard.
WHITEBOARD_ROOM = 'whiteboard'


@sio.on('connect')
def connect(sid, environ):
    """Log the connection and add the client to the shared whiteboard room.

    Without the room membership the broadcast in `draw` would have no
    recipients, because nothing else in this snippet joins the room.
    """
    print('Connected', sid)
    sio.enter_room(sid, WHITEBOARD_ROOM)


@sio.on('draw')
def draw(sid, data):
    """Broadcast the 'drawing_data' field of `data` to the whiteboard room.

    Args:
        sid: Session id of the client that drew.
        data: Mapping that must contain a 'drawing_data' key.
    """
    drawing_data = data['drawing_data']
    sio.emit('drawing_data', drawing_data, room=WHITEBOARD_ROOM)
```

**32. Real-Time Collaborative Code Editor**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)

# Single room shared by everyone editing the code.
EDITOR_ROOM = 'editor'


@sio.on('connect')
def connect(sid, environ):
    """Log the connection and add the client to the shared editor room.

    Without the room membership the broadcast in `edit` would have no
    recipients, because nothing else in this snippet joins the room.
    """
    print('Connected', sid)
    sio.enter_room(sid, EDITOR_ROOM)


@sio.on('edit')
def edit(sid, data):
    """Broadcast the 'code' field of `data` to the shared editor room.

    Args:
        sid: Session id of the client that made the edit.
        data: Mapping that must contain a 'code' key.
    """
    code = data['code']
    sio.emit('code', code, room=EDITOR_ROOM)
```

**33. Real-Time Multiplayer Game**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)

# Single room shared by every player in the game.
GAME_ROOM = 'game'


@sio.on('connect')
def connect(sid, environ):
    """Log the connection and add the client to the shared game room.

    Without the room membership the broadcast in `move` would have no
    recipients, because nothing else in this snippet joins the room.
    """
    print('Connected', sid)
    sio.enter_room(sid, GAME_ROOM)


@sio.on('move')
def move(sid, data):
    """Broadcast the 'direction' field of `data` to the shared game room.

    Args:
        sid: Session id of the player that moved.
        data: Mapping that must contain a 'direction' key.
    """
    direction = data['direction']
    sio.emit('move', direction, room=GAME_ROOM)
```

**34. Real-Time Auction System**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('bid')
def bid(sid, data):
    """Relay the 'item_id' and 'bid_amount' fields to the 'auction' room.

    NOTE(review): no handler here adds clients to the 'auction' room --
    presumably done elsewhere; confirm.
    """
    offer = {'item_id': data['item_id'], 'bid_amount': data['bid_amount']}
    sio.emit('bid', offer, room='auction')
```

**35. Real-Time Ride-Hailing Platform**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('request_ride')
def request_ride(sid, data):
    """Relay the 'pickup_location' and 'destination' fields to the 'rides' room.

    NOTE(review): no handler here adds clients to the 'rides' room --
    presumably done elsewhere; confirm.
    """
    trip = {
        'pickup_location': data['pickup_location'],
        'destination': data['destination'],
    }
    sio.emit('request_ride', trip, room='rides')
```

**36. Real-Time Event Ticketing System**

```python
from fastapi import FastAPI, WebSocket
from socketio import SocketIO

# Application object and the Socket.IO wrapper built around it.
app = FastAPI()
sio = SocketIO(app)


@sio.on('connect')
def connect(sid, environ):
    """Print the session id whenever a client connects."""
    print('Connected', sid)


@sio.on('book_ticket')
def book_ticket(sid, data):
    """Relay the 'event_id' and 'quantity' fields to the 'events' room.

    NOTE(review): no handler here adds clients to the 'events' room --
    presumably done elsewhere; confirm.
    """
    booking = {'event_id': data['event_id'], 'quantity': data['quantity']}
    sio.emit('book_ticket', booking, room='events')
```
