# FastAPI JWT Extended

***

**1. Simple JWT Token Generation:**

```python
from fastapi import FastAPI, Body
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token

app = FastAPI()

# Demo login: the hard-coded admin credentials are exchanged for an access
# token; any other username/password pair gets a 401 JSON error.
@app.post("/login")
async def login(username: str = Body(...), password: str = Body(...)):
    credentials_ok = (username, password) == ("admin", "secret")
    if not credentials_ok:
        return JSONResponse(content={"error": "Invalid credentials"}, status_code=401)
    token = create_access_token(identity=username)
    return JSONResponse(content={"token": token})
```

**2. JWT Token with Custom Claims:**

```python
from fastapi import FastAPI, Body
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, get_jwt_current_user

app = FastAPI()

# Demo login that passes an extra "role" claim when creating the access token.
@app.post("/login")
async def login(username: str = Body(...), password: str = Body(...)):
    if username != "admin" or password != "secret":
        return JSONResponse(content={"error": "Invalid credentials"}, status_code=401)
    extra_claims = {"role": "admin"}
    signed = create_access_token(identity=username, claims=extra_claims)
    return JSONResponse(content={"token": signed})

# Echoes back whatever get_jwt_current_user() returns for this request.
@app.get("/protected")
async def protected_route():
    body = {"user": get_jwt_current_user()}
    return JSONResponse(content=body)
```

**3. JWT Token Refresh:**

```python
from fastapi import FastAPI, Body
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, get_jwt_current_user
from fastapi_jwt_extended.exceptions import TokenExpiredError

app = FastAPI()

# Issues a new access token for the identity of the current user; a
# TokenExpiredError raised along the way becomes a 401 JSON error.
@app.get("/refresh")
async def refresh_token():
    try:
        user = get_jwt_current_user()
        body = {"token": create_access_token(identity=user.identity)}
        return JSONResponse(content=body)
    except TokenExpiredError:
        failure = {"error": "Token expired"}
        return JSONResponse(content=failure, status_code=401)
```

**4. JWT Token Blacklist:**

```python
from fastapi import FastAPI, Body
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, get_jwt_current_user, create_refresh_token
from fastapi_jwt_extended.exceptions import TokenExpiredError
from fastapi_jwt_extended.utils import get_jti_header

app = FastAPI()

jwt_blacklist = set()

# Demo login: on the hard-coded admin credentials, returns both an access
# token and a refresh token; otherwise a 401 JSON error.
@app.post("/login")
async def login(username: str = Body(...), password: str = Body(...)):
    if (username, password) != ("admin", "secret"):
        return JSONResponse(content={"error": "Invalid credentials"}, status_code=401)
    tokens = {
        "access_token": create_access_token(identity=username),
        "refresh_token": create_refresh_token(identity=username),
    }
    return JSONResponse(content=tokens)

# Returns the result of get_jwt_current_user() under the "user" key.
@app.get("/protected")
async def protected_route():
    user = get_jwt_current_user()
    body = {"user": user}
    return JSONResponse(content=body)

# Issues a new access token unless the value returned by get_jti_header() is
# in the in-memory jwt_blacklist set; expired tokens yield a 401 JSON error.
@app.get("/refresh")
async def refresh_token():
    try:
        user = get_jwt_current_user()
        # Guard clause: a blacklisted identifier short-circuits with "revoked".
        if get_jti_header() in jwt_blacklist:
            return JSONResponse(content={"error": "Token revoked"}, status_code=401)
        fresh = create_access_token(identity=user.identity)
        return JSONResponse(content={"access_token": fresh})
    except TokenExpiredError:
        return JSONResponse(content={"error": "Token expired"}, status_code=401)
```

**5. JWT Token with HTTPOnly Cookies:**

```python
from fastapi import Body, FastAPI, Cookie, HTTPException
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, get_jwt_current_user
from fastapi_jwt_extended.exceptions import TokenExpiredError

app = FastAPI()

# Credentials are read from the request body: a client has no cookies to send
# before it has logged in, so Cookie(...) parameters could never be satisfied
# on a first login.
@app.post("/login")
async def login(username: str = Body(...), password: str = Body(...)):
    if username == "admin" and password == "secret":
        access_token = create_access_token(identity=username)
        response = JSONResponse(content={"access_token": access_token})
        # set_cookie builds the Set-Cookie header (quoting, attributes) for us;
        # httponly=True keeps the cookie out of reach of page JavaScript.
        # NOTE(review): the token is still echoed in the JSON body, as in the
        # original example; drop it there if only cookie transport is wanted.
        response.set_cookie(key="access_token", value=access_token, httponly=True)
        return response
    return JSONResponse(content={"error": "Invalid credentials"}, status_code=401)

# Reads the token back from the "access_token" cookie set by /login and
# converts an expired token into an HTTP 401.
@app.get("/protected")
async def protected_route(access_token: str = Cookie(...)):
    try:
        current_user = get_jwt_current_user(access_token=access_token)
        return JSONResponse(content={"user": current_user})
    except TokenExpiredError:
        raise HTTPException(status_code=401, detail="Token expired")
```

**6. JWT Token with HTTPS Only:**

```python
from fastapi import FastAPI
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
# JSONResponse was used below without being imported, which raised NameError
# on the first request to /login.
from fastapi.responses import JSONResponse

app = FastAPI()

# Redirect plain-HTTP requests to HTTPS so tokens are never sent in cleartext.
app.add_middleware(HTTPSRedirectMiddleware)

@app.post("/login")
async def login():
    # Placeholder token: this example only demonstrates the HTTPS redirect.
    return JSONResponse(content={"token": "some_token"})
```

**7. JWT Token with Multiple Tokens:**

```python
from fastapi import FastAPI, Body, Cookie
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, create_refresh_token, get_jwt_current_user
from fastapi_jwt_extended.exceptions import TokenExpiredError

app = FastAPI()

# Demo login returning an access/refresh token pair for the hard-coded admin
# credentials, or a 401 JSON error for anything else.
@app.post("/login")
async def login(username: str = Body(...), password: str = Body(...)):
    is_admin = username == "admin" and password == "secret"
    if is_admin:
        pair = {}
        pair["access_token"] = create_access_token(identity=username)
        pair["refresh_token"] = create_refresh_token(identity=username)
        return JSONResponse(content=pair)
    return JSONResponse(content={"error": "Invalid credentials"}, status_code=401)

# Returns the current user as resolved by get_jwt_current_user().
@app.get("/protected_access")
async def protected_access():
    return JSONResponse(content={"user": get_jwt_current_user()})

# Creates a new access token for the current user's identity; an expired
# token is reported as a 401 JSON error.
@app.get("/protected_refresh")
async def protected_refresh():
    try:
        identity = get_jwt_current_user().identity
        renewed = create_access_token(identity=identity)
        return JSONResponse(content={"access_token": renewed})
    except TokenExpiredError:
        error_body = {"error": "Token expired"}
        return JSONResponse(content=error_body, status_code=401)
```

**8. JWT Token with Multiple Endpoints:**

```python
from fastapi import FastAPI, Body, Cookie
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, create_refresh_token, get_jwt_current_user

app = FastAPI()

# Demo login shared by the protected routes in this example: returns an
# access token and a refresh token for the hard-coded admin credentials.
@app.post("/login")
async def login(username: str = Body(...), password: str = Body(...)):
    if not (username == "admin" and password == "secret"):
        return JSONResponse(content={"error": "Invalid credentials"}, status_code=401)
    access = create_access_token(identity=username)
    refresh = create_refresh_token(identity=username)
    return JSONResponse(content={"access_token": access, "refresh_token": refresh})

# First of two routes that return the result of get_jwt_current_user().
@app.get("/route1")
async def route1():
    body = {"user": get_jwt_current_user()}
    return JSONResponse(content=body)

# Second route; like /route1 it returns the result of get_jwt_current_user().
@app.get("/route2")
async def route2():
    user = get_jwt_current_user()
    return JSONResponse(content=dict(user=user))
```

**9. JWT Token with Rate Limiting:**

```python
from fastapi import FastAPI, Body, Request
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, create_refresh_token, get_jwt_current_user
from fastapi_jwt_extended.exceptions import TokenExpiredError
from fastapi_rate_limits import RateLimiter

app = FastAPI()

limiter = RateLimiter(10, 1)

# Rate-limited demo login.
#
# Decorators are applied bottom-up, so the limiter must sit BELOW the route
# decorator: limiter.limit(...) wraps the handler first and @app.post then
# registers the wrapped callable. In the reverse order @app.post registers
# the bare function and the limiter wrapper is never invoked by the app.
@app.post("/login")
@limiter.limit("login")
async def login(request: Request, username: str = Body(...), password: str = Body(...)):
    if username == "admin" and password == "secret":
        access_token = create_access_token(identity=username)
        refresh_token = create_refresh_token(identity=username)
        return JSONResponse(content={"access_token": access_token, "refresh_token": refresh_token})
    return JSONResponse(content={"error": "Invalid credentials"}, status_code=401)

# Route without a limiter decorator; returns the current user.
@app.get("/protected")
async def protected_route():
    resolved_user = get_jwt_current_user()
    return JSONResponse(content={"user": resolved_user})

# Issues a new access token for the current user's identity, answering 401
# when TokenExpiredError is raised.
@app.get("/refresh")
async def refresh_token():
    try:
        user = get_jwt_current_user()
        body = {"access_token": create_access_token(identity=user.identity)}
        return JSONResponse(content=body)
    except TokenExpiredError:
        return JSONResponse(status_code=401, content={"error": "Token expired"})
```

**10. JWT Token Decoding with JWTManager:**

```python
# Header is required by protected_route below; it was missing from the import.
from fastapi import FastAPI, Header, HTTPException
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import JWTManager

app = FastAPI()

jwt_manager = JWTManager()

# Issues an access token for a fixed "admin" identity (no credential check in
# this example).
@app.post("/login")
async def login():
    access_token = jwt_manager.create_access_token(identity="admin")
    return JSONResponse(content={"access_token": access_token})

# Decodes the bearer token from the Authorization header and returns the
# identity stored in its payload.
@app.get("/protected")
async def protected_route(authorization: str = Header(...)):
    # Expect "<scheme> <token>". partition() never raises, so a header without
    # a space yields an empty token instead of an IndexError (HTTP 500).
    _, _, token = authorization.partition(" ")
    token = token.strip()
    if not token:
        return JSONResponse(content={"error": "Invalid Authorization header"}, status_code=401)
    try:
        payload = jwt_manager.decode_access_token(token)
        return JSONResponse(content={"user": payload.identity})
    except HTTPException as e:
        # e.detail carries the human-readable message of an HTTPException.
        return JSONResponse(content={"error": str(e.detail)}, status_code=401)
```

**11. JWT Token with Payload Validation:**

```python
# Header is required by protected_route below; it was missing from the import.
from fastapi import FastAPI, Header, HTTPException
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import JWTManager

app = FastAPI()

jwt_manager = JWTManager()

# Issues an access token for a fixed "admin" identity (no credential check in
# this example).
@app.post("/login")
async def login():
    access_token = jwt_manager.create_access_token(identity="admin")
    return JSONResponse(content={"access_token": access_token})

# Decodes the bearer token, runs an extra validity check on the payload, and
# returns the identity stored in it.
@app.get("/protected")
async def protected_route(authorization: str = Header(...)):
    # Expect "<scheme> <token>". partition() never raises, so a header without
    # a space yields an empty token instead of an IndexError (HTTP 500).
    _, _, token = authorization.partition(" ")
    token = token.strip()
    if not token:
        return JSONResponse(content={"error": "Invalid Authorization header"}, status_code=401)
    try:
        payload = jwt_manager.decode_access_token(token)

        # Check if the token is still valid
        if not jwt_manager.is_valid_payload(payload):
            raise HTTPException(status_code=401, detail="Invalid token")

        return JSONResponse(content={"user": payload.identity})
    except HTTPException as e:
        # Report the exception's detail message (e.g. "Invalid token") rather
        # than the string form of the exception object.
        return JSONResponse(content={"error": str(e.detail)}, status_code=401)
```

**12. JWT Token with Refresh Tokens:**

```python
from fastapi import FastAPI, Body, Cookie, HTTPException
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, create_refresh_token, get_jwt_current_user, get_jwt
from fastapi_jwt_extended.exceptions import TokenExpiredError, InvalidTokenError

app = FastAPI()

# Demo login: returns an access token and a refresh token in the JSON body
# for the hard-coded admin credentials, or a 401 JSON error otherwise.
@app.post("/login")
async def login(username: str = Body(...), password: str = Body(...)):
    if username != "admin" or password != "secret":
        denied = {"error": "Invalid credentials"}
        return JSONResponse(content=denied, status_code=401)
    issued = {
        "access_token": create_access_token(identity=username),
        "refresh_token": create_refresh_token(identity=username),
    }
    return JSONResponse(content=issued)

# Returns the current user as resolved by get_jwt_current_user().
@app.get("/protected")
async def protected_route():
    body = {"user": get_jwt_current_user()}
    return JSONResponse(content=body)

# Exchanges the refresh token read from the "refresh_token" cookie for a new
# access token. Expired or invalid refresh tokens are reported as HTTP 401.
@app.get("/refresh")
async def refresh_token(refresh_token: str = Cookie(...)):
    try:
        user = get_jwt_current_user(refresh_token=refresh_token)
        renewed = create_access_token(identity=user.identity)
        reply = JSONResponse(content={"access_token": renewed})
        # Send the same refresh token back as an HttpOnly cookie.
        reply.set_cookie(key="refresh_token", value=refresh_token, httponly=True)
        return reply
    except (TokenExpiredError, InvalidTokenError) as exc:
        # The expiry check comes first, matching the order of the separate
        # except clauses this replaces.
        if isinstance(exc, TokenExpiredError):
            detail = "Refresh token expired"
        else:
            detail = "Invalid refresh token"
        raise HTTPException(status_code=401, detail=detail)
```

**13. JWT Token with User-Specific Keys:**

```python
from fastapi import FastAPI, Body, Cookie
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, create_refresh_token, get_jwt_current_user
from fastapi_jwt_extended.exceptions import TokenExpiredError

app = FastAPI()

secret_keys = {
    "user1": "secret1",
    "user2": "secret2",
}

# Demo login for users listed in secret_keys: both tokens are created with
# that user's own entry from the secret_keys mapping.
@app.post("/login")
async def login(username: str = Body(...), password: str = Body(...)):
    if username not in secret_keys or password != "secret":
        return JSONResponse(content={"error": "Invalid credentials"}, status_code=401)
    user_key = secret_keys[username]
    access = create_access_token(identity=username, secret_key=user_key)
    refresh = create_refresh_token(identity=username, secret_key=user_key)
    return JSONResponse(content={"access_token": access, "refresh_token": refresh})

# Returns the result of get_jwt_current_user() under the "user" key.
@app.get("/protected")
async def protected_route():
    user = get_jwt_current_user()
    return JSONResponse(content=dict(user=user))

# Exchanges the refresh token from the "refresh_token" cookie for a new access
# token created with the user's own entry from secret_keys.
#
# Returns 401 when the refresh token has expired or when the token's identity
# has no entry in secret_keys.
@app.get("/refresh")
async def refresh_token(refresh_token: str = Cookie(...)):
    try:
        current_user = get_jwt_current_user(refresh_token=refresh_token)
        # .get() instead of indexing: an identity without a configured key
        # used to raise KeyError and surface as an HTTP 500.
        signing_key = secret_keys.get(current_user.identity)
        if signing_key is None:
            return JSONResponse(content={"error": "Invalid credentials"}, status_code=401)
        new_access_token = create_access_token(identity=current_user.identity, secret_key=signing_key)
        return JSONResponse(content={"access_token": new_access_token})
    except TokenExpiredError:
        return JSONResponse(content={"error": "Refresh token expired"}, status_code=401)
```

**14. JWT Token with Multiple Claims Sets:**

```python
from fastapi import FastAPI, Body, Cookie
from fastapi.responses import JSONResponse
from fastapi_jwt_extended import create_access_token, create_refresh_token, get_jwt_current_user
from fastapi_jwt_extended.exceptions import TokenExpiredError

app = FastAPI()

@app.post("/login")
async def login(username: str = Body(...), password: str = Body(...)):
    if username == "admin" and password == "secret":
        access_token = create_access_token(identity=username, claims={"role": "admin"})
        refresh_token = create_refresh_token(identity=username, claims={"role

```
