123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- # -*- coding: utf-8 -*-
- from fastapi import FastAPI, Form, HTTPException, Request,status,APIRouter
- import jwt
- from jwt.exceptions import ExpiredSignatureError,DecodeError,InvalidAlgorithmError
- from core.respone_format import *
- from tools.sql_format import UserCRUD
- import datetime
- import asyncio
- from tools.loglog import logger
- import traceback
- from common.common_data import SECRET_KEY
# Router on which the endpoints of this module are registered via decorators.
router = APIRouter()
# Shared UserCRUD instance the endpoints use to look up user records by account.
user_crud = UserCRUD()
def create_access_token(username: str):
    """Create a signed JWT access token for ``username``.

    The token is signed with ``SECRET_KEY`` using HS256 and carries three
    claims: ``exp`` (one hour after issue), ``iat`` (issue time) and
    ``username``.

    Args:
        username: Account name stored in the ``username`` claim.

    Returns:
        The value returned by ``jwt.encode`` on success, or ``None`` when
        encoding fails (the failure is logged rather than raised).
    """
    # Read the clock once so ``iat`` and ``exp`` are exactly one hour apart,
    # and use an aware UTC datetime instead of the deprecated ``utcnow()``.
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        "exp": issued_at + datetime.timedelta(hours=1),
        "iat": issued_at,
        "username": username,
    }
    try:
        return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    except Exception as e:
        # The signing key must never be written to the log: anyone able to
        # read the log could forge tokens. Only the payload is recorded.
        logger.info(f"出错日志:创建token中 {payload}")
        logger.error(f"{traceback.format_exc()}")
        logger.error(f"{type(e).__name__}, {e}")
        # Explicit best-effort result: callers must check for None.
        return None
def verify_token_sync(token: str):
    """Validate a JWT synchronously and report the outcome as a status code.

    Args:
        token: Encoded JWT string; may be empty or ``None``.

    Returns:
        int: ``0`` if the token decodes and has a non-empty ``username``
        claim, ``1`` if no token was supplied, ``2`` if the ``username``
        claim is missing or empty, ``3`` if the token has expired, and ``4``
        for any other token validation or decoding failure.
    """
    if not token:
        return 1
    try:
        decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except ExpiredSignatureError:
        # Must be handled before the base class below, which it derives from.
        return 3
    except jwt.InvalidTokenError:
        # Base class of PyJWT's validation errors. It covers DecodeError and
        # InvalidAlgorithmError (the cases handled before) and also failures
        # such as a not-yet-valid token, which previously escaped as an
        # unhandled exception instead of being reported as a status code.
        return 4
    if not decoded_payload.get("username"):
        return 2
    return 0
async def verify_token(token: str):
    """Validate ``token`` without blocking the event loop.

    Runs ``verify_token_sync`` in the loop's default executor and awaits the
    result.

    Args:
        token: Encoded JWT string; may be empty or ``None``.

    Returns:
        The integer status code produced by ``verify_token_sync``.
    """
    # Inside a coroutine the running loop is the one to use;
    # ``get_running_loop`` is the documented replacement for
    # ``get_event_loop`` in this context.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, verify_token_sync, token)
async def verify_token2(token):
    """Validate ``token`` and translate a failure into an error response.

    Args:
        token: Encoded JWT string passed through to ``verify_token``.

    Returns:
        ``0`` when ``verify_token`` reports success. Otherwise the object
        built by ``resp_401`` (expired token, code 3) or by ``resp_400``
        (every other non-zero code) carrying a matching message.
    """
    code = await verify_token(token)

    # Success: hand back the plain 0 the callers compare against.
    if code == 0:
        return 0

    # Expiry is the only failure reported through resp_401.
    if code == 3:
        return resp_401(message="The token has expired")

    # Every remaining code goes through resp_400; unknown codes fall back to
    # a generic message.
    messages = {
        1: "No token provided",
        2: "Token lacks username",
        4: "Token decoding error",
    }
    return resp_400(message=messages.get(code, "Invalid token"))
@router.post("/user/login")
async def get_token(username: str = Form(...), password: str = Form(...)):
    """Log a user in and issue an access token.

    Looks the account up through ``user_crud``, compares the submitted
    password with the stored one and, on a match, returns a token created by
    ``create_access_token``.

    Args:
        username: Account name submitted as a form field.
        password: Password submitted as a form field.

    Returns:
        ``resp_200`` with ``{"access_token": ...}`` on success, or
        ``resp_400`` when the account is unknown or the password is wrong.

    Raises:
        HTTPException: 500 when the token could not be created.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    user_info = user_crud.get_userinfo_by_account(username)
    if not user_info:
        return resp_400(message="user does not exist")

    # Row is unpacked as (id, account, password, name, create_time); only the
    # stored password is needed here.
    _, _, true_pwd, _, _ = user_info

    # Constant-time comparison so response timing does not reveal how much of
    # the password matched. Both sides are encoded to bytes because
    # compare_digest rejects non-ASCII str. A non-str stored value can never
    # equal the submitted str, so it is treated as a mismatch.
    # NOTE(review): the stored value is compared directly with the submitted
    # password, which suggests passwords are kept unhashed - confirm and
    # consider storing salted hashes instead.
    password_ok = isinstance(true_pwd, str) and hmac.compare_digest(
        password.encode("utf-8"), true_pwd.encode("utf-8")
    )
    if not password_ok:
        return resp_400(message="Incorrect username or password")

    access_token = create_access_token(username)
    if not access_token:
        # create_access_token logs the failure and yields None; do not answer
        # 200 with an empty token.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create access token",
        )
    return resp_200(data={"access_token": access_token})
@router.get("/user")
async def get_user(request:Request):
    """Return the profile of the user identified by the request's token.

    The token is read from the ``Authorization`` header and decoded with
    ``SECRET_KEY`` (HS256); its ``username`` claim is used to look the
    account up through ``user_crud``.

    Args:
        request: Incoming request whose ``Authorization`` header holds the
            token.

    Returns:
        ``resp_200`` with ``id``, ``name``, ``account`` and ``create_time``
        on success; ``resp_401`` when the token has expired; ``resp_400``
        when the token cannot be decoded, the account no longer exists, or
        any other error occurs.
    """
    token = request.headers.get("Authorization")
    try:
        decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        account = decoded_payload.get("username")
        user_info = user_crud.get_userinfo_by_account(account=account)
        if not user_info:
            # Without this guard the unpacking below fails and the client
            # receives a confusing "cannot unpack" message.
            return resp_400(message="user does not exist")
        # Row is unpacked as (id, account, password, name, create_time); the
        # stored password is deliberately not returned.
        userid, account, _, uname, create_time = user_info
        data = {"id":userid,"name":uname,"account":account,"create_time":create_time}
        return resp_200(data=data)
    except ExpiredSignatureError:
        return resp_401(message="The token has expired")
    except (InvalidAlgorithmError,DecodeError):
        return resp_400(message="Token decoding error")
    except Exception as e:
        # Keep a server-side trace; previously the error was only echoed to
        # the client and left no record.
        logger.error(f"{traceback.format_exc()}")
        return resp_400(message=f"Error in get user information.{e}")
@router.post("/user/logout")
async def get_token(request:Request):
    """Handle a logout request for the account named in the token.

    The token from the ``Authorization`` header is decoded with
    ``SECRET_KEY`` (HS256); on success the logout is logged and a success
    payload is returned. Nothing else is done with the token here.

    NOTE(review): the login handler in this module is also named
    ``get_token``, so this definition rebinds that module-level name.
    Presumably both routes remain registered through their decorators -
    confirm before renaming either.

    Args:
        request: Incoming request whose ``Authorization`` header holds the
            token.

    Returns:
        ``resp_200`` with a result message on success; ``resp_401`` when the
        token has expired; ``resp_400`` for a decoding failure or any other
        error.
    """
    raw_token = request.headers.get("Authorization")
    try:
        claims = jwt.decode(raw_token, SECRET_KEY, algorithms=["HS256"])
        who = claims.get("username")
        logger.info(f"账号:{who}注销成功")
        return resp_200(data={"result": "注销成功"})
    except ExpiredSignatureError:
        return resp_401(message="The token has expired")
    except (InvalidAlgorithmError, DecodeError):
        return resp_400(message="Token decoding error")
    except Exception as err:
        return resp_400(message=f"User logout error.{err}")
|