JSON Web Tokens(JWT):python-jose
PassLib

JSON Web Tokens

JWT(JSON Web Tokens),在之前的文章介绍过一种Python对JWT的实现PyJWT,这里再介绍一种fastapi官方示例使用的python-jose

python-jose

from jose import jwt

key = "thisisthekey"
payload = {'name': 'monkeyjerry'}
token = jwt.encode(payload, key, algorithm="HS256")  # 默认HS256
print(token, type(token))

token_decrypt = jwt.decode(token, key, algorithms=["HS256"])  # algorithms为str或list
print(token_decrypt)

key可使用openssl rand -hex 32 来生成

  • 需要加密的 payload 字典可添加的k值
    • exp:即Expiration,int类型,表示在该时间后,token无效。常用于token有一定时效,如设置参数值为一个小时后的时间,则一个小时后该token就失效
    • nbf:即Not Before, int类型,表示在该时间之前,token无效
from jose import jwt, JWTError

# 当前时间时间戳1652443272
key = "thisisthekey"
payload1 = {'name': 'monkeyjerry', "exp": 1652443272}   # 在该时间之后是无效的
payload2 = {'name': 'monkeyjerry', "nbf": 1683979272}   # 在该时间以前是无效的
token1 = jwt.encode(payload1, key, algorithm="HS256")
token2 = jwt.encode(payload2, key, algorithm="HS256")

try:
    token_decrypt1 = jwt.decode(token1, key, algorithms=["HS256"])
    print("token_decrypt1=", token_decrypt1)
except JWTError:
    print("Signature has expired")

try:
    token_decrypt2 = jwt.decode(token2, key, algorithms=["HS256"])
    print("token_decrypt2=", token_decrypt2)
except JWTError:
    print("The token is not yet valid (nbf)")
  • 所有的数据都是明文传输的,只是做了base64,因此敏感信息不要使用jwt
import base64
import jwt

key = "thisisthekey"
payload = {'name': 'monkeyjerry'}
token = jwt.encode(payload, key, algorithm="HS256")
header, payload, signature = token.split('.')

def add_eq(encrypt_str):
    '''为base64编码补齐等号'''
    rest = 4 - len(encrypt_str) % 4
    return encrypt_str + '=' * rest

print('header=', base64.urlsafe_b64decode(add_eq(header)))
print('payload=', base64.urlsafe_b64decode(add_eq(payload)))
print('signature=', base64.urlsafe_b64decode(add_eq(signature)))

哈希

  • 哈希即将某些内容如密码转换为看起来像乱码的字节序列(字符串),且每次你传入完全相同的内容(如完全相同的密码)时,你都会得到完全相同的乱码

PassLib

  • PassLib 是一个用于处理哈希密码的很棒的 Python 包,它支持许多安全哈希算法以及配合算法使用的实用程序,推荐的算法是Bcrypt
  • 安装附带 Bcrypt 的 PassLib:pip install passlib[bcrypt]
  • PassLib官方文档
from passlib.context import CryptContext

password = "monkeyjerry123456"
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

hashed_password = pwd_context.hash(password)
print(hashed_password)

print(pwd_context.verify(password, hashed_password))

哈希密码校验及jwt令牌处理

哈希密码校验

  1. 创建一个工具函数用来哈希来自用户的密码
  2. 然后创建另一个工具函数,用于校验接收的密码是否与存储在数据库的哈希值匹配
  3. 再创建另一个工具函数用于认证并返回用户
from typing import Optional

from passlib.context import CryptContext
from pydantic import BaseModel

# 模拟数据库中存储的用户信息,hash密码
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$OptbGvSqkLcgkGwfZjUXr.Z2bEa4dIbT2xiSd7UGNtIAoN9H1G6eK"
    }
}


class UserInDB(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


# 工具函数以哈希来自用户的密码
def get_password_hash(password):
    return pwd_context.hash(password)


# 校验接收的密码是否与存储的哈希值匹配
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


# 工具函数用于认证并返回用户
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


if __name__ == '__main__':
    username = "johndoe"
    password = "monkeyjerry123456"
    print(get_password_hash(password))
    print(authenticate_user(fake_users_db, username, password))

jwt令牌处理

用户通过如上authenticate_user认证通过后,则可返回jwt token给用户,用户再次发送该token后,则可校验有效性或其是否在有效期内(如果失效需重新登录获取新的token)

  1. 使用openssl rand -hex 32创建一个随机密钥,该密钥将用于对 JWT 令牌进行签名
  2. 创建工具函数用于jwt encode生成新的访问令牌(设置过期时间)
  3. 用户再次发送该令牌时,创建工具函数对其jwt decode校验有效性和时效性,校验通过则返回该用户
import time
from datetime import timedelta, datetime
from typing import Optional

from jose import jwt, JWTError

# openssl rand -hex 32
SECRET_KEY = "9a56471444eb5e91210b1adaeb5f4d429eb460c46ecc01b88b554098a4b42835"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 0.1


# jwt.encode 生成 token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)  # 默认15分钟失效
    to_encode.update({"exp": expire})  # 需要时效校验,添加上exp
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# 登录成功后返回token(带时效)
def login_for_access_token(username: str):
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # 这里模拟为0.1min
    access_token = create_access_token(data={"sub": username}, expires_delta=access_token_expires)
    return {"access_token": access_token, "token_type": "bearer"}


# 根据jwt.decode token来校验,通过则返回username
def get_current_user(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
    except JWTError as e:
        return e
    return username


if __name__ == '__main__':
    username = "monkeyjerry"
    # 前提:hash密码校验通过
    token = login_for_access_token(username)
    time.sleep(1)
    access_token = token.get("access_token")
    # 校验通过
    print(get_current_user(access_token))

    print("**" * 20)

    # 校验token时效
    time.sleep(6)
    print(get_current_user(access_token))

    # 假设修改了token,校验token有效性
    token = login_for_access_token(username)
    access_token = token.get("access_token") + "modified"
    print(get_current_user(access_token))
#########################输出结果########################
monkeyjerry
****************************************
Signature has expired.
Signature verification failed.