# -*- coding: utf-8 -*- from fastapi import FastAPI, Form, HTTPException, Request,status,APIRouter import jwt from jwt.exceptions import ExpiredSignatureError,DecodeError,InvalidAlgorithmError from core.respone_format import * from tools.sql_format import UserCRUD import datetime import asyncio from tools.loglog import logger import traceback from common.common_data import SECRET_KEY router = APIRouter() user_crud = UserCRUD() def create_access_token(username: str): payload = { "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1), "iat": datetime.datetime.utcnow(), "username": username } try: encoded_jwt = jwt.encode(payload, SECRET_KEY, algorithm="HS256") return encoded_jwt except Exception as e: logger.info(f"出错日志:创建token中 {payload} 秘钥{SECRET_KEY}") logger.error(f"{traceback.format_exc()}") logger.error(f"{type(e).__name__}, {e}") def verify_token_sync(token: str): if not token: return 1 try: decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) if not decoded_payload.get("username"): return 2 else: return 0 except ExpiredSignatureError: return 3 except (InvalidAlgorithmError,DecodeError): return 4 async def verify_token(token: str): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, verify_token_sync, token) async def verify_token2(token): msg_verify_code = await verify_token(token) if msg_verify_code != 0: if msg_verify_code == 3: return resp_401(message="The token has expired") error_msg = { 1: "No token provided", 2: "Token lacks username", 4: "Token decoding error" }.get(msg_verify_code, "Invalid token") return resp_400(message=error_msg) return 0 @router.post("/user/login") async def get_token(username: str = Form(...), password: str = Form(...)): user_info = user_crud.get_userinfo_by_account(username) if user_info: userid, account, true_pwd, uname, create_time = user_info else: return resp_400(message="user does not exist") if password==true_pwd: access_token = create_access_token(username) return_data = {"access_token": access_token} return resp_200(data=return_data) else: return resp_400(message="Incorrect username or password") @router.get("/user") async def get_user(request:Request): token = request.headers.get("Authorization") try: decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) account = decoded_payload.get("username") user_info = user_crud.get_userinfo_by_account(account=account) userid, account, true_pwd, uname, create_time = user_info data = {"id":userid,"name":uname,"account":account,"create_time":create_time} return resp_200(data=data) except ExpiredSignatureError: return resp_401(message="The token has expired") except (InvalidAlgorithmError,DecodeError): return resp_400(message="Token decoding error") except Exception as e: return resp_400(message=f"Error in get user information.{e}") @router.post("/user/logout") async def get_token(request:Request): token = request.headers.get("Authorization") try: decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) account = decoded_payload.get("username") logger.info(f"账号:{account}注销成功") data = {"result": "注销成功"} return resp_200(data=data) except ExpiredSignatureError: return resp_401(message="The token has expired") except (InvalidAlgorithmError, DecodeError): return resp_400(message="Token decoding error") except Exception as e: return resp_400(message=f"User logout error.{e}")