Skip to content
Snippets Groups Projects
Commit d1d23514 authored by Dimitrios Spatharakis's avatar Dimitrios Spatharakis
Browse files

delete files

parent 363d7e79
No related branches found
No related tags found
2 merge requests!2Main,!1Main
Showing
with 0 additions and 629 deletions
DB_USER = 'root'
DB_PASSWORD = 'password'
DB_HOST = 'localhost'
DB_NAME = 'nephele-platform'
JWT_SECRET = 'R4ND0MK3Y'
REFRESH_SECRET = 'AN0TH3RK3Y'
\ No newline at end of file
'''
__init__.py
- Returns a configured Flask app
'''
import os
from flask import Flask
from flask_cors import CORS
from flask_bcrypt import Bcrypt
from config import configs
from routes.auth import auth
from routes.images import images
from models.models import db
from middleware.error_handlers import *
env = os.environ.get('FLASK_ENV', 'development')
def create_app(app_name = 'nephele-platform'):
app = Flask(app_name)
app.config.from_object(configs[env])
CORS(
app,
origins = os.getenv('FRONTEND_URL', 'http://localhost:9000'),
supports_credentials = True
)
app.bcrypt = Bcrypt(app)
app.register_blueprint(auth)
app.register_blueprint(images)
app.register_error_handler(APIAuthError, handle_api_auth_error)
db.init_app(app)
with app.app_context():
db.create_all()
return app
\ No newline at end of file
'''
app.py
- Calls the create_app function and then runs the app
'''
from __init__ import create_app
if __name__ == '__main__':
app = create_app()
app.run()
\ No newline at end of file
'''
config.py
- Settings for the flask application
'''
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# Database connection credentials
SQLALCHEMY_DATABASE_URI = 'mysql://{}:{}@{}/{}'.format(
os.getenv('DB_USER', 'root'),
os.getenv('DB_PASSWORD', 'password'),
os.getenv('DB_HOST', 'localhost'),
os.getenv('DB_NAME', 'nephele-platform')
)
# Token encryption keys
JWT_SECRET = os.getenv('JWT_SECRET', 'R4ND0MK3Y')
REFRESH_SECRET = os.getenv('REFRESH_SECRET', 'AN0TH3RK3Y')
# Production settings
class ProdConfig(Config):
FLASK_ENV = 'production'
DEBUG = False
# Development settings
class DevConfig(Config):
FLASK_ENV = 'development'
DEBUG = True
configs = {
'default': ProdConfig,
'production': ProdConfig,
'development': DevConfig,
}
\ No newline at end of file
import os, jwt, uuid
from flask import request
from functools import wraps
from models.models import User
from utils.exceptions import APIAuthError
def auth_required(f):
@wraps(f)
def _token_check(*args, **kwargs):
'''
Decorator function that checks the validity of the access token.
- Parameters:
*args: Variable-length argument list.
**kwargs: Arbitrary keyword arguments.
- Returns:
The result of the decorated function.
- Raises:
APIAuthError: If authentication fails.
jwt.ExpiredSignatureError: If the token has expired.
jwt.InvalidTokenError: If the token is invalid.
'''
try:
access_token = request.headers.get('Authorization')
if not access_token:
raise APIAuthError('Missing access token', 401)
# Extract the access token
access_token = access_token.split(' ')[1]
# Decode the token
data = jwt.decode(
access_token,
os.getenv('JWT_SECRET'),
algorithms = ['HS256']
)
user_id = data['id'] # Convert the string to UUID
# Check if the token owner is valid
user = User.query.filter_by(id = user_id).first()
if not user:
raise APIAuthError('User not found', 401)
return f(user, *args, **kwargs)
except jwt.ExpiredSignatureError:
raise APIAuthError('Expired token', 401)
except jwt.InvalidTokenError:
raise APIAuthError('Invalid token', 401)
return _token_check
\ No newline at end of file
from flask import jsonify
from utils.exceptions import *
def handle_api_auth_error(e):
'''
Exception handler for API authentication errors.
- Args:
e (APIAuthError): The API authentication error that occurred.
- Returns:
dict: A dict containing the JSON response and the HTTP status code.
'''
# Create the error response
response = {
'error': e.description,
'message': e.message
}
print(response, e.code)
return jsonify(response), e.code
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
from models.user import User
from models.token import Token
\ No newline at end of file
'''
token.py
- Defines the Token model
'''
from models.models import db
from datetime import datetime
class Token(db.Model):
__tablename__ = 'tokens'
id = db.Column(
db.String(255),
primary_key = True
)
expires_at = db.Column(db.DateTime, nullable = False)
created_at = db.Column(
db.DateTime,
nullable = False,
default = datetime.now()
)
revoked_at = db.Column(db.DateTime)
replaced_by = db.Column(db.String(255))
user_id = db.Column(
db.String(255),
db.ForeignKey('users.id')
)
user = db.relationship('User', back_populates = 'tokens')
'''
user.py
- Defines the User model
- Check password method validates the given password
'''
from models.models import db
from flask import current_app
import uuid
class User(db.Model):
__tablename__ = 'users'
id = db.Column(
db.String(36),
primary_key = True,
default = uuid.uuid4
)
email = db.Column(db.String(255), unique = True, nullable = False)
password = db.Column(db.String(255), nullable = False)
tokens = db.relationship('Token', back_populates = 'user')
def __init__(self, email, password):
self.email = email
self.password = current_app.bcrypt.generate_password_hash(password)
def check_password(self, password):
# Compare the password with the hashed password
if current_app.bcrypt.check_password_hash(
self.password,
password
):
return True
# If the password doesn't match, return false
else:
return False
\ No newline at end of file
'''
auth.py
- Defines the blueprint for the auth routes
'''
from flask import Blueprint
from flask import request, jsonify, make_response
from services.auth_service import *
from services.token_service import *
from middleware.auth_middleware import auth_required
auth = Blueprint('auth', __name__)
@auth.route('/auth/login', methods = ['POST'])
def login():
'''
Handles the login request and returns the appropriate response.
- Returns:
The response object containing a message and a JWT token on successful login.
'''
data = request.get_json()
access_token, refresh_token = authenticate(**data).values()
# Create the response
res = make_response(
jsonify({
'message': 'Login successful',
'token': access_token
}), 200
)
# Attach the http-only refresh token cookie
res.set_cookie(
'refresh_token',
refresh_token,
httponly = True
)
return res
@auth.route('/auth/logout', methods = ['POST'])
def logout():
'''
Handles the logout request and returns the appropriate response.
- Returns:
The response object containing a success message on successful logout.
'''
# Get current refresh token and revoke it
current_token = request.cookies.get('refresh_token')
revoke_token(current_token)
# Delete the refresh token cookie
res = make_response(
jsonify({
'message': 'User logged out successfully'
}), 200
)
res.delete_cookie('refresh_token')
return res
@auth.route('/auth/register', methods = ['POST'])
def register():
'''
This function handles the registration of a new user.
- Returns:
A JSON response with a success message and a status code of 200 if the user is created successfully.
'''
data = request.get_json()
create_user(data)
return jsonify({
'message': 'User created successfully'
}), 200
@auth.route('/auth/renewal', methods = ['POST'])
def renew_token():
'''
Renew the access token and refresh token.
- Returns:
The response object with the renewed access token and refresh token.
'''
# Get current refresh token and renew it
current_token = request.cookies.get('refresh_token')
access_token, refresh_token = replace_token(current_token).values()
# Send the new access token
res = make_response(
jsonify({
'message': 'Token renewed successfully',
'token': access_token
}), 200
)
# Attach the http-only refresh token cookie
res.set_cookie(
'refresh_token',
refresh_token,
httponly = True
)
return res
@auth.route('/protected', methods = ['GET'])
@auth_required
def protected(current_user):
print(current_user)
return jsonify({
'message': 'This is a protected route'
})
'''
images.py
- Defines the blueprint for the images routes
'''
from flask import Blueprint
from flask import request, jsonify
from services.images_service import *
images = Blueprint('images', __name__)
@images.route('/images', methods = ['GET'])
def get_images():
return jsonify({}), 200
@images.route('/images/edit', methods = ['POST'])
def edit_descriptor():
data = request.get_json()
validate_json(data)
return jsonify({}), 200
from models.models import db, User
from services.token_service import *
from utils.exceptions import APIAuthError
def authenticate(email, password):
'''
Authenticates a user by checking the provided email and password.
- Args:
email (str): The email address of the user.
password (str): The password for the user.
- Raises:
APIAuthError: If the email or password is missing or invalid.
- Returns:
dict: A dictionary containing the JWT token and the ID of the refresh token.
'''
# Check if the data is present
if not email or not password:
raise APIAuthError('Missing data')
# Query the database for the user
user = User.query.filter_by(email = email).first()
# Check if the user exists and the password is correct
if not user or not user.check_password(password):
raise APIAuthError('Invalid credentials')
# Generate the tokens
access_token = generate_access_token(user.id)
refresh_token = generate_refresh_token(user.id)
# Save the refresh token in the database
db.session.add(refresh_token)
db.session.commit()
return {
'access_token': access_token,
'refresh_token': refresh_token.id
}
def create_user(data):
'''
Creates a new user with the provided data.
- Parameters:
data (dict): A dictionary containing the user data.
- Raises:
APIAuthError: If the data is missing or if the user already exists.
- Returns:
User: The newly created user object.
'''
# Check if the data is present
if not data:
raise APIAuthError('Missing data')
# Unpack the data and create the user object
user = User(**data)
# Check if the user already exists
is_user = User.query.filter_by(email = user.email).first()
if is_user:
raise APIAuthError('User already exists')
db.session.add(user)
db.session.commit()
return user
\ No newline at end of file
import json
from jsonschema import validate
# This is a sample schema, for testing purposes
# Normally, the schema should be selected from a list of schemas
# The selection is based on the image type
schema = {
"type" : "object",
"properties" : {
"price" : { "type" : "number" },
"name" : { "type" : "string" },
},
"required" : [ "name", "price" ],
}
def validate_json(descriptor):
if not descriptor:
return False
# Validate the descriptor
# If the validation isn't successful an exception will be raised
validate(instance = descriptor, schema = schema)
return True
\ No newline at end of file
import os, jwt
from models.models import db, Token
from datetime import datetime, timedelta
from utils.exceptions import APIAuthError
# TODO: These should be in the .env file
JWT_EXPIRATION_MINUTES = 30
REFRESH_EXPIRATION_DAYS = 30
def generate_access_token(user_id):
'''
Generate a JWT token with a short expiration time for authentication.
- Args:
user_id: The id of the user.
- Returns:
A string representing the JWT token.
'''
# Generate a jwt token with a short duration
access_token = jwt.encode({
'id': user_id,
'exp' : datetime.utcnow() + timedelta(
minutes = JWT_EXPIRATION_MINUTES)
}, os.getenv('JWT_SECRET'))
return access_token
def generate_refresh_token(user_id):
'''
Generate a JWT token with a long expiration time for refreshing.
- Args:
user_id: The id of the user.
- Returns:
A refresh token instance.
'''
# Generate a jwt token
refresh_token = jwt.encode({
'id': user_id,
'exp' : datetime.utcnow() + timedelta(
days = REFRESH_EXPIRATION_DAYS)
}, os.getenv('REFRESH_SECRET'))
new_token = Token(
id = refresh_token,
expires_at = datetime.utcnow() + timedelta(
days = REFRESH_EXPIRATION_DAYS),
user_id = user_id
)
return new_token
def replace_token(token_id):
'''
Replace a token with a new refresh token and generate a new auth token.
- Args:
token_id: The ID of the token to be replaced.
- Returns:
A dictionary containing the new auth token and the ID of the new refresh token.
- Raises:
APIAuthError: If the given token is active but has been replaced.
- Notes:
- This function first checks if the given token is active.
- If the token is active but has been replaced, all tokens associated with the user are revoked and an APIAuthError is raised.
- If the token is active, a new refresh token is generated and the old one is replaced by it.
- The new refresh token is added to the database and the changes are committed.
- Finally, a new auth token and the ID of the new refresh token are returned in a dictionary.
'''
# Check if the given token is active
current_token = get_token(token_id)
if current_token:
# If the token is active but has been replaced
# Someone is using a stolen token
# Revoke all tokens and raise exception
if current_token.replaced_by:
revoke_all_tokens(current_token.user_id)
raise APIAuthError('Illegal token')
# Generate new refresh token
new_token = generate_refresh_token(current_token.user_id)
current_token.replaced_by = new_token.id
db.session.add(new_token)
db.session.commit()
access_token = generate_access_token(current_token.user_id)
return {
'access_token': access_token,
'refresh_token': new_token.id
}
def revoke_all_tokens(user_id):
'''
Revoke all tokens for a user.
- Args:
user_id: The ID of the user.
'''
Token.query.filter_by(user_id = user_id).update({
'revoked_at': datetime.utcnow()
})
db.session.commit()
def revoke_token(token_id):
'''
Revoke a token by updating the 'revoked_at' field to the current datetime.
- Args:
token_id: The ID of the token to be revoked.
'''
# TODO: Maybe exception handling is needed for the db operation
Token.query.filter_by(id = token_id).update({
'revoked_at': datetime.utcnow()
})
db.session.commit()
def get_token(token_id):
'''
Retrieve a token from the database.
- Args:
token: The ID of the token to retrieve.
- Returns:
The corresponding token object.
- Raises APIAuthError: If the token is not found or is invalid.
'''
refresh_token = Token.query.filter_by(id = token_id).first()
if not refresh_token:
raise APIAuthError('Token not found')
if (
refresh_token.revoked_at or
refresh_token.expires_at < datetime.utcnow()
):
raise APIAuthError('Invalid token')
return refresh_token
class APIAuthError(Exception):
description = 'Authentication Error'
def __init__(self, message, code = 403):
self.message = message
self.code = code
super().__init__(self.message)
\ No newline at end of file
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment