JSON Web Tokens(JWT):python-jose
PassLib
JSON Web Tokens
JWT(JSON Web Tokens),在之前的文章介绍过一种Python对JWT的实现PyJWT,这里再介绍一种fastapi官方示例使用的python-jose
python-jose
- python-jose官方地址
- 安装
pip install python-jose[cryptography]
from jose import jwt
key = "thisisthekey"
payload = {'name': 'monkeyjerry'}
token = jwt.encode(payload, key, algorithm="HS256") # 默认HS256
print(token, type(token))
token_decrypt = jwt.decode(token, key, algorithms=["HS256"]) # algorithms为str或list
print(token_decrypt)
key可使用
openssl rand -hex 32
来生成
- 需要加密的 payload 字典可添加的k值
- exp:即Expiration,int类型,表示在该时间后,token无效。常用于token有一定时效,如设置参数值为一个小时后的时间,则一个小时后该token就失效
- nbf:即Not Before, int类型,表示在该时间之前,token无效
from jose import jwt, JWTError
# 当前时间时间戳1652443272
key = "thisisthekey"
payload1 = {'name': 'monkeyjerry', "exp": 1652443272} # 在该时间之后是无效的
payload2 = {'name': 'monkeyjerry', "nbf": 1683979272} # 在该时间以前是无效的
token1 = jwt.encode(payload1, key, algorithm="HS256")
token2 = jwt.encode(payload2, key, algorithm="HS256")
try:
token_decrypt1 = jwt.decode(token1, key, algorithms=["HS256"])
print("token_decrypt1=", token_decrypt1)
except JWTError:
print("Signature has expired")
try:
token_decrypt2 = jwt.decode(token2, key, algorithms=["HS256"])
print("token_decrypt2=", token_decrypt2)
except JWTError:
print("The token is not yet valid (nbf)")
- 所有的数据都是明文传输的,只是做了base64,因此敏感信息不要使用jwt
import base64
import jwt
key = "thisisthekey"
payload = {'name': 'monkeyjerry'}
token = jwt.encode(payload, key, algorithm="HS256")
header, payload, signature = token.split('.')
def add_eq(encrypt_str):
'''为base64编码补齐等号'''
rest = 4 - len(encrypt_str) % 4
return encrypt_str + '=' * rest
print('header=', base64.urlsafe_b64decode(add_eq(header)))
print('payload=', base64.urlsafe_b64decode(add_eq(payload)))
print('signature=', base64.urlsafe_b64decode(add_eq(signature)))
哈希
- 哈希即将某些内容如密码转换为看起来像乱码的字节序列(字符串),且每次你传入完全相同的内容(如完全相同的密码)时,你都会得到完全相同的乱码
PassLib
- PassLib 是一个用于处理哈希密码的很棒的 Python 包,它支持许多安全哈希算法以及配合算法使用的实用程序,推荐的算法是Bcrypt
- 安装附带 Bcrypt 的 PassLib:
pip install passlib[bcrypt]
- PassLib官方文档
from passlib.context import CryptContext
password = "monkeyjerry123456"
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
hashed_password = pwd_context.hash(password)
print(hashed_password)
print(pwd_context.verify(password, hashed_password))
哈希密码校验及jwt令牌处理
哈希密码校验
- 创建一个工具函数用来哈希来自用户的密码
- 然后创建另一个工具函数,用于校验接收的密码是否与存储在数据库的哈希值匹配
- 再创建另一个工具函数用于认证并返回用户
from typing import Optional
from passlib.context import CryptContext
from pydantic import BaseModel
# 模拟数据库中存储的用户信息,hash密码
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$OptbGvSqkLcgkGwfZjUXr.Z2bEa4dIbT2xiSd7UGNtIAoN9H1G6eK"
}
}
class UserInDB(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 工具函数以哈希来自用户的密码
def get_password_hash(password):
return pwd_context.hash(password)
# 校验接收的密码是否与存储的哈希值匹配
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
# 工具函数用于认证并返回用户
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
if __name__ == '__main__':
username = "johndoe"
password = "monkeyjerry123456"
print(get_password_hash(password))
print(authenticate_user(fake_users_db, username, password))
jwt令牌处理
用户通过如上authenticate_user认证通过后,则可返回jwt token给用户,用户再次发送该token后,则可校验有效性或其是否在有效期内(如果失效需重新登录获取新的token)
- 使用
openssl rand -hex 32
创建一个随机密钥,该密钥将用于对 JWT 令牌进行签名 - 创建工具函数用于jwt encode生成新的访问令牌(设置过期时间)
- 用户再次发送该令牌时,创建工具函数对其jwt decode校验有效性和时效性,校验通过则返回该用户
import time
from datetime import timedelta, datetime
from typing import Optional
from jose import jwt, JWTError
# openssl rand -hex 32
SECRET_KEY = "9a56471444eb5e91210b1adaeb5f4d429eb460c46ecc01b88b554098a4b42835"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 0.1
# jwt.encode 生成 token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15) # 默认15分钟失效
to_encode.update({"exp": expire}) # 需要时效校验,添加上exp
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 登录成功后返回token(带时效)
def login_for_access_token(username: str):
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 这里模拟为0.1min
access_token = create_access_token(data={"sub": username}, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
# 根据jwt.decode token来校验,通过则返回username
def get_current_user(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
except JWTError as e:
return e
return username
if __name__ == '__main__':
username = "monkeyjerry"
# 前提:hash密码校验通过
token = login_for_access_token(username)
time.sleep(1)
access_token = token.get("access_token")
# 校验通过
print(get_current_user(access_token))
print("**" * 20)
# 校验token时效
time.sleep(6)
print(get_current_user(access_token))
# 假设修改了token,校验token有效性
token = login_for_access_token(username)
access_token = token.get("access_token") + "modified"
print(get_current_user(access_token))
#########################输出结果########################
monkeyjerry
****************************************
Signature has expired.
Signature verification failed.