RESTFul API in Flask With JSON Web Token

原视频https://www.youtube.com/watch?v=WxGBoY5iNXY&t=477s

RESTFul API in Flask With JSON Web Token

原视频https://www.youtube.com/watch?v=WxGBoY5iNXY&t=477s

未加token验证前【crud操作】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# -*- encoding: utf-8 -*-
"""
@File : app.py
@Time : 2020/2/13 2:07 下午
@Author : zhengjiani
@Email : 936089353@qq.com
@Software: PyCharm
"""
from flask import Flask, request, jsonify, make_response
from flask_sqlalchemy import SQLAlchemy
import uuid
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
import datetime
from functools import wraps

app = Flask(__name__)

USERNAME = 'root' # 用户名
PASSWORD = '*' # 密码
HOST = 'localhost' # 数据库地址
PORT = '3306' # 端口
DATABASE = 'flask_pos' # 数据库名
database_url = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(
USERNAME, PASSWORD, HOST, PORT, DATABASE
)
# 添加数据库配置文件到flask app中
app.config['SECRET_KEY'] = 'the quick brown fox jumps over the lazy dog'
app.config['SQLALCHEMY_DATABASE_URI'] = database_url
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

db = SQLAlchemy(app)


class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(32), index=True)
password = db.Column(db.String(128))
public_id = db.Column(db.String(50), unique=True)
admin = db.Column(db.Boolean)

@app.route('/user', methods=['GET'])
def get_all_users():
users = User.query.all()
output = []
for user in users:
user_data = {}
user_data['public_id'] = user.public_id
user_data['username'] = user.username
user_data['password'] = user.password
user_data['admin'] = user.admin

output.append(user_data)

return jsonify({'users': output})


@app.route('/user/<public_id>', methods=['GET'])
def get_one_user(current_user,public_id):
user = User.query.filter_by(public_id=public_id).first()

if not user:
return jsonify({'msg': 'No user found!'})

user_data = {}
user_data['public_id'] = user.public_id
user_data['username'] = user.username
user_data['password'] = user.password
user_data['admin'] = user.admin

return jsonify({'user': user_data})

@app.route('/user', methods=['POST'])
def create_user():
data = request.get_json()
hashed_password = generate_password_hash(data['password'], method='sha256')
new_user = User(public_id=str(uuid.uuid4()), username=data['username'], password=hashed_password,admin=False)
db.session.add(new_user)
db.session.commit()
return jsonify({"msg": "新用户创建成功"})

@app.route('/user/<public_id>', methods=['PUT'])
def promote_user(public_id):
"""
提升用户权限
:param current_user:
:param public_id:
:return:
"""
user = User.query.filter_by(public_id=public_id).first()
if not user:
return jsonify({'msg': 'No user found'})

user.admin = False
db.session.commit()

return jsonify({'msg':'用户权限提升'})

@app.route('/user/<public_id>', methods=['DELETE'])
def delete_user(current_user,public_id):
if not current_user.admin:
return jsonify({'msg': 'Cannot perform that function'})

user = User.query.filter_by(public_id=public_id).first()
if not user:
return jsonify({'msg': '没有找到该用户'})

db.session.delete(user)
db.session.commit()
return jsonify({'msg': '删除成功'})

@app.route('/login')
def login():
auth = request.authorization
if not auth or not auth.username or not auth.password:
return make_response('Could not verify',401,{'WWW-Authenticate':'Basic realm="Login required!"'})

user = User.query.filter_by(username=auth.username).first()

if not user:
return jsonify('Could not verify',401,{'WWW-Authenticate':'Basic realm="Login required!"'})

if check_password_hash(user.password,auth.password):
token = jwt.encode({'public_id':user.public_id,'exp':datetime.datetime.utcnow()+datetime.timedelta(minutes=30)},app.config['SECRET_KEY'])
return jsonify({'token':token.decode('UTF_8')})
return make_response('Could not verify', 401, {'WWW-Authenticate': 'Basic realm="Login required!"'})

if __name__ == '__main__':
app.run(debug=True)

加入token后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# -*- encoding: utf-8 -*-

def token_required(f):
@wraps(f)
def decorated(*args,**kwargs):
token = None

if 'x-access-token' in request.headers:
token = request.headers['x-access-token']

if not token:
return jsonify({'msg':'Token is missing'}),401

try:
data = jwt.decode(token,app.config['SECRET_KEY'])
current_user = User.query.filter_by(public_id=data['public_id']).first()
except:
return jsonify({'msg':'Token is invalid'}),401

return f(current_user,*args,**kwargs)
return decorated


@app.route('/user', methods=['GET'])
@token_required
def get_all_users(current_user):

if not current_user.admin:
return jsonify({'msg':'Cannot perform that function'})

users = User.query.all()
output = []
for user in users:
user_data = {}
user_data['public_id'] = user.public_id
user_data['username'] = user.username
user_data['password'] = user.password
user_data['admin'] = user.admin
output.append(user_data)

return jsonify({'users': output})


if __name__ == '__main__':
app.run(debug=True)

遇到问题:

发出了OPTIONS请求,初步判断处理跨域

1
2
3
4
5
from flask_cors import *

app = Flask(__name__)

CORS(app, supports_credentials=True)

jsonify报错TypeError

1
2
3
4
5
6
class MyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, bytes):
return str(obj, encoding='utf-8');
return json.JSONEncoder.default(self, obj)
app.json_encoder = MyEncoder