OAuth2 密码模式(resource owner password credentials)
OAuth2PasswordBearer
OAuth2PasswordRequestForm
使用哈希密码和JWT Bearer令牌的OAuth2

OAuth2

OAuth2是一个关于授权(authorization)的开放网络标准。OAuth2运行流程:

  1. 用户打开客户端以后,客户端要求用户给予授权
  2. 用户同意给予客户端授权
  3. 客户端使用上一步获得的授权,向认证服务器申请令牌
  4. 认证服务器对客户端进行认证以后,确认无误,同意发放令牌
  5. 客户端使用令牌,向资源服务器申请获取资源
  6. 资源服务器确认令牌无误,同意向客户端开放资源
    客户端必须得到用户的授权(authorization grant),才能获得令牌(access token)。OAuth 2.0定义了四种授权方式:
  7. 授权码模式(authorization code)
  8. 简化模式(implicit)
  9. 密码模式(resource owner password credentials)
  10. 客户端模式(client credentials)

更多关于OAuth2可进一步参看:

OpenAPI

OpenAPI(以前称为 Swagger)是用于构建 API 的开放规范(现已成为 Linux Foundation 的一部分)

  • FastAPI 基于 OpenAPI,OpenAPI 有一种定义多个安全方案的方法,包括所有的 OAuth2 处理安全性的如上介绍的四种方式(也称为flow),其中密码模式(resource owner password credentials)可以完美地用于直接在同一应用程序中处理身份认证

密码模式

密码模式(Resource Owner Password Credentials Grant)中,用户向客户端提供自己的用户名和密码。客户端使用这些信息,向"服务商提供商"索要授权

  1. 用户向客户端提供用户名和密码
  2. 客户端将用户名和密码发给认证服务器,向后者请求令牌
  3. 认证服务器确认无误后,向客户端提供访问令牌

fastapi实现密码模式授权的OAuth2

如下场景:

  1. 用户/客户端将username和password字段作为表单数据发送给认证服务器(OAuth2规范还写明了 username 和 password 必须作为表单数据发送)
  2. 认证服务器获取用户名和密码
  • 使用用户名username从数据库中获取用户数据,无该用户,返回错误消息,提示“用户名或密码错误”
  • 数据库中有该用户,使用密码password与数据库中存储的哈希密码进行匹配,如果不匹配,返回错误消息,提示“用户名或密码错误”
  1. 如果密码校验通过则返回令牌
  2. 用户再次使用令牌向服务器获取资源,服务器校验令牌有效性和时效性
  3. 如果令牌无效或过期返回错误提示认证失败并要求用户重新登录获取令牌,否则返回当前用户

fastapi OAuth2PasswordBearer

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 使用token来获取其他资源
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}   # 一般返回当前用户
  1. oauth2_scheme是OAuth2PasswordBearer的实例,查看源码类OAuth2PasswordBearer实现了__call__魔术方法,因此oauth2_scheme(类的实例)是callable,可用于Depends
  2. 这里未实现发送 username 和 password 的路径操作tokenUrl,见下一节
  • OAuth2PasswordBearer()接收参数tokenUrl
    • 注意参数是tokenUrl,而非token_url,这是因为它使用了与 OpenAPI 规范相同的名称
    • 前端用户将会发送username和password到特定的url(即设置的tokenUrl),校验通过后该api将返回token

获取用户名和密码

接下来我们需要获取到用户名和密码并以表单的形式发送到tokenUrl指定的路径来获取token,这就需要使用到FastAPI 提供的实用工具OAuth2PasswordRequestForm

  • OAuth2PasswordRequestForm 是一个类依赖项,接收必填参数username: str = Form(...),password: str = Form(...)
  • form_data: OAuth2PasswordRequestForm = Depends():通过form_data.username, form_data.password获取username和password
  • 校验用户名密码通过后返回token,根据OAth2规范,返回一个JSON对象,如我们使用Bearer令牌,这个JSON包含两个字段,access_token(包含我们的访问令牌的字符串)和token_type(Bearer令牌,则令牌类型应为bearer)【这几乎是唯一的你需要自己记住并正确地执行以符合规范的事情,其余的,FastAPI 都会为你处理】

哈希密码和令牌处理

在上一篇文章介绍过JWT和PassLib,这里不再赘述

使用哈希密码和JWT Bearer令牌的OAuth2

from datetime import datetime, timedelta
from typing import Union

from fastapi import FastAPI, HTTPException, status, Depends
import uvicorn
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel

app = FastAPI()

# password: monkeyjerry123
fake_users_db = {
    "jerry": {
        "username": "jerry",
        "full_name": "Monkey Jerry",
        "email": "monkeyjerry@126.com",
        "hashed_password": "$2b$12$cIbmtp4I4VwPMyGRe7yiYuAvdTrGWYcPwsrsw8EWGQ1idQDGFZGsS"
    },
    "tom": {
        "username": "tom",
        "full_name": "Cat Tom",
        "email": "cattom@126.com",
        "hashed_password": "$2b$12$cIbmtp4I4VwPMyGRe7yiYuAvdTrGWYcPwsrsw8EWGQ1idQDGFZGsS"
    }
}


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

'''
【1】哈希并校验密码
get_user 获取数据库中的用户
get_password_hash hash用户输入的密码
verify_password 用户输入的密码和数据库中的hashed密码进行校验
'''


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


# 将发送过来的密码与数据库中的hashed密码比对进行校验
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


@app.post("/users/register/")
async def user_register(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        hashed_password = get_password_hash(form_data.password)
        user_in_db = {form_data.username: {"username": form_data.username, "hashed_password": hashed_password}}
        fake_users_db.update(user_in_db)
    print(fake_users_db)
    return {"message": "register success"}


SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


'''
【2】处理jwt令牌
create_access_token 创建jwt encode token
login_for_access_token 密码登录校验成功后(使用authenticate_user)返回token
'''
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


'''
【3】再次使用token请求资源时校验token
credentials_exception token校验错误的 HTTPException
get_current_user token校验通过返回当前用户
read_users_me、read_own_items携带token获取该用户其他数据
'''
credentials_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)


async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        # print(username)
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]


if __name__ == '__main__':
    uvicorn.run("main:app", reload=True)

在交互docs中检查效果

在浏览器中打开http://127.0.0.1:8000/docs
点击「Authorize」按钮,使用以下凭证:username:jerry,password:monkeyjerry123

Authorize 加锁后去调用其他接口,如/users/me/items/则可返回[{"item_id": "Foo","owner": "jerry"}]

参考及扩展阅读