RestFul API

instargram ์‚ฌ์ง„ ํฌ์ŠคํŒ… API (2) ํšŒ์›๊ฐ€์ž…, ๋กœ๊ทธ์ธ, ๋กœ๊ทธ์•„์›ƒ API

567Rabbit 2024. 5. 30. 17:54

 

MySQL user ํ…Œ์ด๋ธ” ์ž‘์„ฑํ•˜๊ธฐ

 

 

 

 

app.py ์ž‘์„ฑํ•˜๊ธฐ

import serverless_wsgi

from flask import Flask
from flask_jwt_extended import JWTManager
from flask_restful import Api

from config import Config

from resources.user import jwt_blacklist

from resources.user import UserRegisterResource
from resources.user import UserLoginResource
from resources.user import UserLogoutResource


app = Flask(__name__)

# ํ™˜๊ฒฝ๋ณ€์ˆ˜ ์…‹ํŒ…
app.config.from_object(Config)

# JWT ๋งค๋‹ˆ์ € ์ดˆ๊ธฐํ™”
jwt = JWTManager(app)

# ๋กœ๊ทธ์•„์›ƒ๋œ ํ† ํฐ์œผ๋กœ ์š”์ฒญํ•˜๋Š” ๊ฒฝ์šฐ, ์ฒ˜๋ฆฌํ•˜๋Š” ํ•จ์ˆ˜ ์ž‘์„ฑ
@jwt.token_in_blocklist_loader
def check_if_token_is_revoked(jwt_header, jwt_payload):
    jti = jwt_payload['jti']
    return jti in jwt_blacklist

def handler(event, context) :
    return serverless_wsgi.handle_request(app, event, context)

api = Api(app)

api.add_resource( UserRegisterResource, '/user/register')
api.add_resource( UserLoginResource, '/user/login')
api.add_resource( UserLogoutResource, '/user/logout')


if __name__ == '__main__' :
    app.run()

 

 

 

 

user.py ๋งŒ๋“ค๊ณ  ์ž„ํฌํŠธํ•˜๊ธฐ

from email_validator import EmailNotValidError, validate_email
from flask import request
from flask_jwt_extended import create_access_token, get_jwt, jwt_required
from flask_restful import Resource
from mysql.connector import Error
from mysql_connection import get_connection
from utils import check_password, hash_password

 

 

 

 

ํšŒ์›๊ฐ€์ž… class ๋งŒ๋“ค๊ธฐ

class UserRegisterResource(Resource) :

    def post(self) :

        # ๋ณด๋‚ธ ๋ฐ์ดํ„ฐ ๋ฐ›๊ธฐ
        data = request.get_json()
        print(data)

        # ํšŒ์›๊ฐ€์ž… ๋ฐ์ดํ„ฐ๊ฐ€ ๋ชจ๋‘ ์žˆ๋Š”์ง€ ํ™•์ธ
        if 'email' not in data or 'password' not in data :
            return {"result":"fail"}, 401
        
        if data['email'].strip() == '' or data['password'].strip() == '' :
            return {"result":"fail"}, 401
        
        print('๋ฐ์ดํ„ฐ ํ™•์ธ ์™„๋ฃŒ')
        
        # ๋‹ค ํ†ต๊ณผ ์ด๋ฉ”์ผ ํ™•์ธํ•˜๊ธฐ
        try :
            validate_email(data['email'])
        except EmailNotValidError as e :
            return {"result":"fail"}, 401
        
        print('์ด๋ฉ”์ผ ํ™•์ธ')
        
        # ๋น„๋ฐ€๋ฒˆํ˜ธ ๊ธธ์ด ์œ ํšจ ํ™•์ธ
        if len(data['password']) < 4 or len(data['password']) > 12 :
            return {"result":"fail"}, 401
        
        print('๋น„๋ฐ€๋ฒˆํ˜ธ ํ™•์ธ')
        
        # ๋น„๋ฐ€๋ฒˆํ˜ธ ์•”ํ˜ธํ™”
        password = hash_password(data['password'])
        print(password)

        # DB์— ํšŒ์›์ •๋ณด ์ €์žฅ
        try :
            connection = get_connection()

            query = '''
                    insert into user
                    (email, password)
                    values
                    (%s, %s);'''
            recode = ( data['email'], password)

            cursor = connection.cursor()
            cursor.execute(query, recode)

            connection.commit()

            user_id = cursor.lastrowid

            cursor.close()
            connection.close()


        except Error as e :
            if cursor is not None :
                cursor.close()
            if connection is not None :
                connection.close()
            return {"result":"fail", "error":str(e)}, 500

        # ์œ ์ €์•„์ด๋””๋กœ ํ† ํฐ ์ƒ์„ฑ
        accessToken = create_access_token(user_id)

        # ์‘๋‹ตํ•˜๊ธฐ
        return {"result":"seccess", "accessToken":accessToken}

 

 

ํฌ์ŠคํŠธ๋งจ ์„ค์ •ํ•˜๊ณ  sendํ•˜์—ฌ ๊ฒฐ๊ณผ ํ™•์ธํ•˜๊ธฐ

 

 

 

 

 

๋กœ๊ทธ์ธ class ๋งŒ๋“ค๊ธฐ

class UserLoginResource(Resource) :
    
    def post(self) :
        
        data = request.get_json()

        print(data)

        if 'email' not in data or 'password' not in data :
            return {"result":"fail"}, 401
        
        if data['email'].strip() == '' or data['password'].strip() == '' :
            return {"result":"fail"}, 401
        
        # DB์—์„œ ์œ ์ € ์ •๋ณด ๊ฐ€์ ธ์˜ค๊ธฐ
        try :
            connection = get_connection()

            query = '''
                    select *
                    from user
                    where email = %s;'''
            recode = ( data['email'] , )
        
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query, recode)

            result_list = cursor.fetchall()
            print(result_list)

            cursor.close()
            connection.close()

        except Error as e :
            if cursor is not None :
                cursor.close()
            if connection is not None :
                connection.close()
            return {"result":"fail", "error":str(e)}, 500
        
        if result_list == [] :
            return {"result":"fail"}, 401
        
        isCorrect = check_password(data['password'], result_list[0]['password'])
        if isCorrect == False :
            return {"result":"fail"}, 401
        
        userId = result_list[0]['id']

        accessToken = create_access_token(userId)

        return {"result":"seccess", "accessToken":accessToken}

 

 

 

ํฌ์ŠคํŠธ๋งจ ์„ค์ •ํ•˜๊ณ  sendํ•˜์—ฌ ๊ฒฐ๊ณผ ํ™•์ธํ•˜๊ธฐ

 

 

 

 

 

๋กœ๊ทธ์•„์›ƒ class ๋งŒ๋“ค๊ธฐ

jwt_blacklist = set()

class UserLogoutResource(Resource) :

    @jwt_required()
    def delete(self) :
        
        jti = get_jwt()['jti']
        jwt_blacklist.add(jti)
        
        return {"result":"success"}, 200

 

 

 

ํฌ์ŠคํŠธ๋งจ ์„ค์ •ํ•˜๊ณ  sendํ•˜์—ฌ ๊ฒฐ๊ณผ ํ™•์ธํ•˜๊ธฐ

 

 

 

 

MySQL์— ๋ฐ˜์˜๋˜์—ˆ๋‹ค.

 

select * from user;