1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
import json from flask import Flask, request, jsonify,send_file,render_template_string import jwt import requests from functools import wraps from datetime import datetime import os
app = Flask(__name__) app.config['TEMPLATES_RELOAD']=True
app.config['SECRET_KEY'] = 'fake_flag' current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') response0 = { 'code': 0, 'message': 'failed', 'result': None } response1={ 'code': 1, 'message': 'success', 'result': current_time }
response2 = { 'code': 2, 'message': 'Invalid request parameters', 'result': None } class MemUser: def setUser(self, username, password): self.username = username self.password = password
def setToken(self, token): self.token = token
def __init__(self): self.username="admin" self.password="password" self.token=jwt.encode({'username': self.username, 'password': self.password}, app.config['SECRET_KEY'], algorithm='HS256') User = MemUser()
def auth(func): @wraps(func) def decorated(*args, **kwargs): token = request.cookies.get('token') if not token: return 'Invalid token', 401 try: payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256']) if payload['username'] == User.username and payload['password'] == User.password: return func(*args, **kwargs) else: return 'Invalid token', 401 except: return 'Something error?', 500
return decorated
@app.route('/',methods=['GET']) def index(): return send_file('api-docs.json', mimetype='application/json;charset=utf-8')
@app.route('/api-base/v0/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.json['username'] password = request.json['password'] User.setUser(username,password) token = jwt.encode({'username': username, 'password': password}, app.config['SECRET_KEY'], algorithm='HS256') User.setToken(token) return jsonify(response1)
return jsonify(response2),400
@app.route('/api-base/v0/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.json['username'] password = request.json['password'] try: token = User.token payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256']) if payload['username'] == username and payload['password'] == password: response = jsonify(response1) response.set_cookie('token', token) return response else: return jsonify(response0), 401 except jwt.ExpiredSignatureError: return 'Invalid token', 401 except jwt.InvalidTokenError: return 'Invalid token', 401
return jsonify(response2), 400
@app.route('/api-base/v0/update', methods=['POST', 'GET']) @auth def update_password(): try: if request.method == 'POST': try: new_password = request.get_json() if new_password:
update(new_password, User)
updated_token = jwt.encode({'username': User.username, 'password': User.password}, app.config['SECRET_KEY'], algorithm='HS256') User.token = updated_token response = jsonify(response1) response.set_cookie('token',updated_token) return response else: return jsonify(response0), 401 except: return "Something error?",505 else: return jsonify(response2), 400
except jwt.ExpiredSignatureError: return 'Invalid token', 401 except jwt.InvalidTokenError: return 'Invalid token', 401
def update(src, dst): if hasattr(dst, '__getitem__'): for key in src: if isinstance(src[key], dict): if key in dst and isinstance(src[key], dict): update(src[key], dst[key]) else: dst[key] = src[key] else: dst[key] = src[key] else: for key, value in src.items() : if hasattr(dst,key) and isinstance(value, dict): update(value,getattr(dst, key)) else: setattr(dst, key, value)
@app.route('/api-base/v0/logout') def logout(): response = jsonify({'message': 'Logout successful!'}) response.delete_cookie('token') return response
@app.route('/api-base/v0/search', methods=['POST','GET']) @auth def api(): if request.args.get('file'): try: if request.args.get('id'): id = request.args.get('id') else: id = '' data = requests.get("http://127.0.0.1:8899/v2/users?file=" + request.args.get('file') + '&id=' + id) if data.status_code != 200: return data.status_code
if request.args.get('type') == "text":
return render_template_string(data.text) else: return jsonify(json.loads(data.text)) except jwt.ExpiredSignatureError: return 'Invalid token', 401 except jwt.InvalidTokenError: return 'Invalid token', 401 except Exception: return 'something error?' else: return jsonify(response2)
if __name__ == '__main__': app.run(host='0.0.0.0')
|