MySQL user 테이블 생성하기
app.py 작성하기
import serverless_wsgi
from flask import Flask
from flask_jwt_extended import JWTManager
from flask_restful import Api
from config import Config
from resources.user import jwt_blacklist
from resources.user import UserRegisterResource
from resources.user import UserLoginResource
from resources.user import UserLogoutResource
app = Flask(__name__)

# Load the application settings (environment configuration) from Config.
app.config.from_object(Config)

# Initialise the JWT manager for this application.
jwt = JWTManager(app)
# Blocklist callback: decides how a request made with an already
# logged-out token is treated.
@jwt.token_in_blocklist_loader
def check_if_token_is_revoked(jwt_header, jwt_payload):
    """Report whether the presented token has been revoked.

    Args:
        jwt_header: Header of the token being checked (unused here).
        jwt_payload: Claims of the token being checked; its ``jti`` claim
            is looked up in ``jwt_blacklist``.

    Returns:
        True when the token's ``jti`` is in ``jwt_blacklist``, else False.
    """
    return jwt_payload['jti'] in jwt_blacklist
def handler(event, context):
    """Serverless entry point that forwards the invocation to the Flask app.

    Passes ``event`` and ``context`` straight through to
    ``serverless_wsgi.handle_request`` together with ``app`` and returns
    whatever that call produces.
    NOTE(review): presumably invoked by AWS Lambda - confirm against the
    deployment configuration.
    """
    response = serverless_wsgi.handle_request(app, event, context)
    return response
api = Api(app)

# Route table: each resource class paired with the URL it is served from.
for _resource_cls, _url_path in (
    (UserRegisterResource, '/user/register'),
    (UserLoginResource, '/user/login'),
    (UserLogoutResource, '/user/logout'),
):
    api.add_resource(_resource_cls, _url_path)

if __name__ == '__main__':
    # Start Flask's built-in server only when this file is run directly.
    app.run()
user.py 만들고 임포트하기
from email_validator import EmailNotValidError, validate_email
from flask import request
from flask_jwt_extended import create_access_token, get_jwt, jwt_required
from flask_restful import Resource
from mysql.connector import Error
from mysql_connection import get_connection
from utils import check_password, hash_password
회원가입 class 만들기
class UserRegisterResource(Resource):
    """Sign-up endpoint: validates the payload, stores the user, issues a JWT."""

    def post(self):
        """Register a new user from the JSON request body.

        The body must be a JSON object with non-blank string ``email`` and
        ``password`` fields; the password must be 4 to 12 characters long.

        Returns:
            ``{"result": "success", "accessToken": <token>}`` on success.
            ``({"result": "fail"}, 401)`` when the payload is missing,
            malformed, blank, has an invalid email, or has a password
            outside the allowed length.
            ``({"result": "fail", "error": <message>}, 500)`` when the
            database operation raises ``mysql.connector.Error``.
        """
        # Receive the data sent by the client. It is deliberately not
        # printed: it contains the plaintext password.
        data = request.get_json()

        # Check that all sign-up fields are present and usable. A body that
        # is not a JSON object, or fields that are not strings, are treated
        # as a validation failure rather than crashing.
        # NOTE(review): 400 would be the conventional status for these
        # validation failures; 401 is kept so existing clients still work.
        if not isinstance(data, dict):
            return {"result": "fail"}, 401
        email = data.get('email')
        raw_password = data.get('password')
        if not isinstance(email, str) or not isinstance(raw_password, str):
            return {"result": "fail"}, 401
        if email.strip() == '' or raw_password.strip() == '':
            return {"result": "fail"}, 401

        # Validate the email address format.
        try:
            validate_email(email)
        except EmailNotValidError:
            return {"result": "fail"}, 401

        # Validate the password length.
        if not 4 <= len(raw_password) <= 12:
            return {"result": "fail"}, 401

        # Hash the password before it is stored.
        hashed_password = hash_password(raw_password)

        # Save the member record in the DB.
        query = '''
            insert into user
                (email, password)
            values
                (%s, %s);'''
        record = (email, hashed_password)

        # Pre-initialise both handles so the cleanup below is safe even when
        # get_connection() or cursor() is the call that fails.
        connection = None
        cursor = None
        try:
            connection = get_connection()
            cursor = connection.cursor()
            cursor.execute(query, record)
            connection.commit()
            user_id = cursor.lastrowid
        except Error as e:
            # NOTE(review): the raw driver message is returned to the client.
            return {"result": "fail", "error": str(e)}, 500
        finally:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()

        # Create an access token whose identity is the new user's id.
        access_token = create_access_token(user_id)

        # Respond to the client.
        return {"result": "success", "accessToken": access_token}
ํฌ์คํธ๋งจ ์ค์ ํ๊ณ sendํ์ฌ ๊ฒฐ๊ณผ ํ์ธํ๊ธฐ
๋ก๊ทธ์ธ class ๋ง๋ค๊ธฐ
class UserLoginResource(Resource):
    """Login endpoint: checks the credentials and issues a JWT."""

    def post(self):
        """Authenticate a user from the JSON request body.

        The body must be a JSON object with non-blank string ``email`` and
        ``password`` fields.

        Returns:
            ``{"result": "success", "accessToken": <token>}`` when the email
            exists and the password matches.
            ``({"result": "fail"}, 401)`` when the payload is missing,
            malformed or blank, the email is unknown, or the password does
            not match.
            ``({"result": "fail", "error": <message>}, 500)`` when the
            database operation raises ``mysql.connector.Error``.
        """
        # The payload is deliberately not printed: it contains the
        # plaintext password.
        data = request.get_json()

        # A body that is not a JSON object, or fields that are not strings,
        # are treated as a failed login rather than crashing.
        if not isinstance(data, dict):
            return {"result": "fail"}, 401
        email = data.get('email')
        raw_password = data.get('password')
        if not isinstance(email, str) or not isinstance(raw_password, str):
            return {"result": "fail"}, 401
        if email.strip() == '' or raw_password.strip() == '':
            return {"result": "fail"}, 401

        # Fetch the user's record from the DB.
        query = '''
            select *
            from user
            where email = %s;'''
        record = (email,)

        # Pre-initialise both handles so the cleanup below is safe even when
        # get_connection() or cursor() is the call that fails.
        connection = None
        cursor = None
        try:
            connection = get_connection()
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query, record)
            result_list = cursor.fetchall()
        except Error as e:
            # NOTE(review): the raw driver message is returned to the client.
            return {"result": "fail", "error": str(e)}, 500
        finally:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()

        # No row means no such email.
        if not result_list:
            return {"result": "fail"}, 401

        user_row = result_list[0]

        # Any falsy result (including None) counts as a mismatch, so an
        # unexpected return value from check_password cannot log a user in.
        if not check_password(raw_password, user_row['password']):
            return {"result": "fail"}, 401

        # Create an access token whose identity is the user's id.
        access_token = create_access_token(user_row['id'])
        return {"result": "success", "accessToken": access_token}
ํฌ์คํธ๋งจ ์ค์ ํ๊ณ sendํ์ฌ ๊ฒฐ๊ณผ ํ์ธํ๊ธฐ
๋ก๊ทธ์์ class ๋ง๋ค๊ธฐ
# Revoked token identifiers (``jti`` claims).
# NOTE(review): this set lives in process memory only, so entries are lost
# when the process restarts - confirm that is acceptable for the deployment.
jwt_blacklist = set()


class UserLogoutResource(Resource):
    """Logout endpoint: revokes the token used to make the request."""

    @jwt_required()
    def delete(self):
        """Add the current token's ``jti`` to ``jwt_blacklist``.

        Returns:
            ``({"result": "success"}, 200)``.
        """
        token_claims = get_jwt()
        jwt_blacklist.add(token_claims['jti'])
        return {"result": "success"}, 200
ํฌ์คํธ๋งจ ์ค์ ํ๊ณ sendํ์ฌ ๊ฒฐ๊ณผ ํ์ธํ๊ธฐ
MySQL์ ๋ฐ์๋์๋ค.
select * from user;