# FastAPI Script

***

**1. Create a FastAPI app with a single endpoint that returns a JSON response**

```python
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}
```

**2. Use FastAPI to create a REST API that provides CRUD operations on a database**

```python
from fastapi import FastAPI
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker

# Create the FastAPI app
app = FastAPI()

# Create the database engine
engine = create_engine("sqlite:///database.db")

# Create the metadata and table
metadata = MetaData()
users = Table("users", metadata, Column("id", Integer, primary_key=True), Column("name", String(255)))
metadata.create_all(engine)

# Create the session factory
Session = sessionmaker(bind=engine)

# Create the session
session = Session()

# Add a new user
new_user = users.insert().values(name="John Doe")
session.execute(new_user)
session.commit()

# Get all users
users = session.query(users).all()

# Close the session
session.close()

# Define the GET endpoint
@app.get("/users")
def get_users():
    return users

# Define the POST endpoint
@app.post("/users")
def create_user(user: User):
    new_user = users.insert().values(**user.dict())
    session.execute(new_user)
    session.commit()
    return user

# Define the PUT endpoint
@app.put("/users/{user_id}")
def update_user(user_id: int, user: User):
    session.query(users).filter(users.c.id == user_id).update(user.dict())
    session.commit()
    return user

# Define the DELETE endpoint
@app.delete("/users/{user_id}")
def delete_user(user_id: int):
    session.query(users).filter(users.c.id == user_id).delete()
    session.commit()
    return {"message": "User deleted"}
```

**3. Use FastAPI to create a REST API that uses authentication and authorization**

```python
from fastapi import FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext

# Create the FastAPI app
app = FastAPI()

# Create the OAuth2 password bearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

# Create the password context
pwd_context = CryptContext(schemes=["bcrypt"])

# Create the users database
users = {"john": {"password": pwd_context.hash("secret")}}

# Define the token generation function
def generate_token(username: str):
    return jwt.encode({"username": username}, "secret", algorithm="HS256")

# Define the token verification function
def verify_token(token: str):
    try:
        decoded_token = jwt.decode(token, "secret", algorithms=["HS256"])
        username = decoded_token["username"]
    except jwt.DecodeError:
        raise HTTPException(status_code=401, detail="Invalid token")
    return username

# Define the get_current_user function
def get_current_user(token: str = Depends(oauth2_scheme)):
    username = verify_token(token)
    user = users.get(username)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

# Define the token endpoint
@app.post("/token")
def token(credentials: Credentials):
    username = credentials.username
    password = credentials.password
    user = users.get(username)
    if not user or not pwd_context.verify(password, user["password"]):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    return {"access_token": generate_token(username), "token_type": "bearer"}

# Define the protected endpoint
@app.get("/protected")
def protected(current_user: User = Depends(get_current_user)):
    return {"message": "Hello, {}!".format(current_user.username)}
```

**4. Use FastAPI to create a REST API that uses OpenAPI documentation**

```python
from fastapi import FastAPI, OpenAPIBearer
from fastapi.responses import JSONResponse
from fastapi.openapi.utils import get_openapi

# Create the FastAPI app
app = FastAPI()

# Create the OpenAPI bearer
openapi_bearer = OpenAPIBearer(tokenUrl="/token")

# Define the token generation function
def generate_token(username: str):
    return jwt.encode({"username": username}, "secret", algorithm="HS256")

# Define the token verification function
def verify_token(token: str):
    try:
        decoded_token = jwt.decode(token, "secret", algorithms=["HS256"])
        username = decoded_token["username"]
    except jwt.DecodeError:
        raise HTTPException(status_code=401, detail="Invalid token")
    return username

# Define the get_current_user function
def get_current_user(token: str = Depends(openapi_bearer)):
    username = verify_token(token)
    user = users.get(username)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

# Define the get_openapi function
def get_openapi(app: FastAPI):
    if app.openapi_schema:
        return app.openapi_schema
    openapi_schema = super().get_openapi(app)
    openapi_schema["components"]["securitySchemes"]["bearerAuth"] = {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT"
    }
    return openapi_schema

# Set the get_openapi function
app.get_openapi = get_openapi

# Define the token endpoint
@app.post("/token")
def token(credentials: Credentials):
    username = credentials.username
    password = credentials.password
    user = users.get(username)
    if not user or not pwd_context.verify(password, user["password"]):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    return {"access_token": generate_token(username), "token_type": "bearer"}

# Define the protected endpoint
@app.get("/protected")
def protected(current_user: User = Depends(get_current_user)):
    return {"message": "Hello, {}!".format(current_user.username)}
```

**5. Use FastAPI to create a REST API that uses asynchronous operations**

```python
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from fastapi.concurrency import AsyncIOServer
import asyncio

# Create the FastAPI app
app = FastAPI()

# Define the asynchronous function
async def async_function(request):
    await asyncio.sleep(5)
    return JSONResponse({"message": "Hello, world!"})

# Define the endpoint
@app.get("/async")
async def async_endpoint(request: Request):
    return await async_function(request)

# Run the app with the async server
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, server_class=AsyncIOServer)
```

**6. Use FastAPI to create a REST API that uses dependency injection**

```python
from fastapi import FastAPI, Depends
from fastapi.responses import JSONResponse

# Create the FastAPI app
app = FastAPI()

# Create the dependency
async def get_current_user():
    return {"username": "john"}

# Define the endpoint
@app.get("/user")
async def get_user(current_user: User = Depends(get_current_user)):
    return JSONResponse({"user": current_user})
```

**7. Use FastAPI to create a REST API that uses background tasks**

```python
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import JSONResponse

# Create the FastAPI app
app = FastAPI()

# Define the background task function
async def send_email(email: str):
    # Send the email
    pass

# Define the endpoint
@app.post("/send-email")
async def send_email_endpoint(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email, email)
    return JSONResponse({"message": "Email sent"})
```

**8. Use FastAPI to create a REST API that uses websockets**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

# Create the FastAPI app
app = FastAPI()

# Define the websocket endpoint
@app.websocket("/websocket")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            await websocket.send_json({"message": data["message"]})
    except WebSocketDisconnect:
        pass

# Define the HTML endpoint
@app.get("/")
async def get_html():
    return HTMLResponse(
        """
        <h1>WebSocket Chat</h1>
        <script>
            var websocket = new WebSocket("ws://localhost:8000/websocket");

            websocket.onopen = function() {
                console.log("Connected to the websocket");
            };

            websocket.onmessage = function(e) {
                console.log("Received a message from the websocket: ", e.data);
            };

            websocket.onerror = function(e) {
                console.log("An error occurred with the websocket: ", e);
            };

            websocket.onclose = function() {
                console.log("The websocket connection was closed");
            };

            function sendData(data) {
                websocket.send(JSON.stringify(data));
            }
        </script>
        """
    )
```

**9. Use FastAPI to create a REST API that uses a database**

```python
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker

# Create the FastAPI app
app = FastAPI()

# Create the database engine
engine = create_engine("sqlite:///database.db")

# Create the metadata and table
metadata = MetaData()
users = Table("users", metadata, Column("id", Integer, primary_key=True), Column("name", String(255)))
metadata.create_all(engine)

# Create the session factory
Session = sessionmaker(bind=engine)

# Create the session
session = Session()

# Define the get_user function
def get_user(id: int):
    user = session.query(users).filter(users.c.id == id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# Define the get_users function
def get_users():
    users = session.query(users).all()
    return users

# Define the create_user function
def create_user(user: User):
    new_user = users.insert().values(**user.dict())
    session.execute(new_user)
    session.commit()
    return user

# Define the update_user function
def update_user(id: int, user: User):
    session.query(users).filter(users.c.id == id).update(user.dict())
    session.commit()
    return user

# Define the delete_user function
def delete_user(id: int):
    session.query(users).filter(users.c.id == id).delete()
    session.commit()
    return {"message": "User deleted"}

# Define the get_user endpoint
@app.get("/users/{id}", response_model=User)
def get_user_endpoint(id: int):
    user = get_user(id)
    return user

# Define the get_users endpoint
@app.get("/users", response_model=List[User])
def get_users_endpoint():
    users = get_users()
    return users

# Define the create_user endpoint
@app.post("/users", response_model=User)
def create_user_endpoint(user: User):
    user = create_user(user)
    return user

# Define the update_user endpoint
@app.put("/users/{id}", response_model=User)
def update_

```
