23 changed files with 772 additions and 198 deletions
@ -1,24 +0,0 @@
|
||||
from functools import wraps |
||||
|
||||
import db |
||||
import models |
||||
|
||||
def get_db(fn): |
||||
@wraps(fn) |
||||
def wrapper(*args, **kargs): |
||||
return fn(db.get(), *args, **kargs) |
||||
return wrapper |
||||
|
||||
@get_db |
||||
def get_user(db: db.get_return, username: str|None = None, user_id: int|None = None) -> models.User | None: |
||||
cur = db.cursor() |
||||
if username is not None: |
||||
cur.execute('select * from users where username=?', (username,)) |
||||
elif user_id is not None: |
||||
cur.execute('select * from users where id=?', (user_id,)) |
||||
else: |
||||
raise Exception('Neither username or user_id passed') |
||||
result = cur.fetchone() |
||||
if result is None: |
||||
return None |
||||
return models.User.from_query(result) |
@ -1,12 +0,0 @@
|
||||
from http import HTTPStatus |
||||
from functools import wraps |
||||
|
||||
def no_content(fn): |
||||
@wraps(fn) |
||||
def wrapper(*args, **kargs): |
||||
result = fn(*args, **kargs) |
||||
if result is None: |
||||
return None, HTTPStatus.NO_CONTENT |
||||
else: |
||||
return result |
||||
return wrapper |
@ -0,0 +1,33 @@
|
||||
from flask import Flask |
||||
from .apis import init_apis |
||||
|
||||
class Config: |
||||
OPENAPI_VERSION = "3.0.2" |
||||
OPENAPI_JSON_PATH = "api-spec.json" |
||||
OPENAPI_URL_PREFIX = "/" |
||||
OPENAPI_REDOC_PATH = "/redoc" |
||||
OPENAPI_REDOC_URL = ( |
||||
"https://cdn.jsdelivr.net/npm/redoc@next/bundles/redoc.standalone.js" |
||||
) |
||||
OPENAPI_SWAGGER_UI_PATH = "/swagger-ui" |
||||
OPENAPI_SWAGGER_UI_URL = "https://cdn.jsdelivr.net/npm/swagger-ui-dist/" |
||||
OPENAPI_RAPIDOC_PATH = "/rapidoc" |
||||
OPENAPI_RAPIDOC_URL = "https://unpkg.com/rapidoc/dist/rapidoc-min.js" |
||||
|
||||
def create_app(): |
||||
app = Flask(__name__) |
||||
app.config.from_object(Config()) |
||||
|
||||
init_db(app) |
||||
init_cors(app) |
||||
init_apis(app) |
||||
|
||||
return app |
||||
|
||||
def init_cors(app): |
||||
from flask_cors import CORS |
||||
cors = CORS(app) |
||||
|
||||
def init_db(app): |
||||
from .db import init_app |
||||
init_app(app) |
@ -0,0 +1,29 @@
|
||||
from flask import Flask |
||||
from flask_smorest import Api |
||||
|
||||
from .accounts import bp as acc_bp |
||||
from .login import bp as login_bp |
||||
|
||||
class ApiWithErr(Api): |
||||
def handle_http_exception(self, error): |
||||
if error.data and error.data['response']: |
||||
return error.data['response'] |
||||
return super().handle_http_exception(error) |
||||
|
||||
def init_apis(app: Flask): |
||||
api = ApiWithErr(app, spec_kwargs={ |
||||
'title': 'FoxBank', |
||||
'version': '1', |
||||
'openapi_version': '3.0.0', |
||||
'components': { |
||||
'securitySchemes': { |
||||
'Token': { |
||||
'type': 'http', |
||||
'scheme': 'bearer', |
||||
'bearerFormat': 'Token ', |
||||
} |
||||
} |
||||
}, |
||||
}) |
||||
api.register_blueprint(login_bp, url_prefix='/login') |
||||
api.register_blueprint(acc_bp, url_prefix='/accounts') |
@ -0,0 +1,111 @@
|
||||
from http import HTTPStatus |
||||
from flask.views import MethodView |
||||
from flask_smorest import Blueprint, abort |
||||
from marshmallow import Schema, fields |
||||
from ..decorators import ensure_logged_in |
||||
from ..models import Account |
||||
from .. import decorators |
||||
from .. import db_utils |
||||
from .. import returns |
||||
|
||||
bp = Blueprint('accounts', __name__, description='Bank Accounts operations') |
||||
|
||||
VALID_CURRENCIES = ['RON', 'EUR', 'USD'] |
||||
ACCOUNT_TYPES = ['Checking', 'Savings'] |
||||
|
||||
class MetaCurrenciesSchema(Schema): |
||||
status = fields.Constant('success') |
||||
currencies = fields.List(fields.Str()) |
||||
|
||||
class MetaAccountTypesSchema(Schema): |
||||
status = fields.Constant('success') |
||||
account_types = fields.List(fields.Str(), data_key='accountTypes') |
||||
|
||||
@bp.get('/meta/currencies') |
||||
@bp.response(200, MetaCurrenciesSchema) |
||||
def get_valid_currencies(): |
||||
"""Get valid account currencies""" |
||||
return returns.success(currencies=VALID_CURRENCIES) |
||||
|
||||
|
||||
@bp.get('/meta/account_types') |
||||
@bp.response(200, MetaAccountTypesSchema) |
||||
def get_valid_account_types(): |
||||
"""Get valid account types""" |
||||
return returns.success(account_types=ACCOUNT_TYPES) |
||||
|
||||
|
||||
class AccountResponseSchema(returns.SuccessSchema): |
||||
account = fields.Nested(Account.Schema) |
||||
|
||||
|
||||
@bp.get('/<int:account_id>') |
||||
@ensure_logged_in |
||||
@bp.response(401, returns.ErrorSchema, description='Login failure') |
||||
@bp.doc(security=[{'Token': []}]) |
||||
@bp.response(200, AccountResponseSchema) |
||||
def get_account_id(account_id: int): |
||||
"""Get account by id""" |
||||
account = db_utils.get_account(account_id=account_id) |
||||
if account is None: |
||||
return returns.abort(returns.NOT_FOUND) |
||||
if decorators.user_id != db_utils.whose_account(account): |
||||
return returns.abort(returns.UNAUTHORIZED) |
||||
account = account.to_json() |
||||
return returns.success(account=account) |
||||
|
||||
|
||||
@bp.get('/IBAN_<iban>') |
||||
@ensure_logged_in |
||||
@bp.response(401, returns.ErrorSchema, description='Login failure') |
||||
@bp.doc(security=[{'Token': []}]) |
||||
@bp.response(200, AccountResponseSchema) |
||||
def get_account_iban(iban: str): |
||||
"""Get account by IBAN""" |
||||
account = db_utils.get_account(iban=iban) |
||||
if account is None: |
||||
return returns.abort(returns.NOT_FOUND) |
||||
if decorators.user_id != db_utils.whose_account(account): |
||||
return returns.abort(returns.UNAUTHORIZED) |
||||
account = account.to_json() |
||||
return returns.success(account=account) |
||||
|
||||
|
||||
@bp.route('/') |
||||
class AccountsList(MethodView): |
||||
class CreateAccountParams(Schema): |
||||
currency = fields.String() |
||||
account_type = fields.String(data_key='accountType') |
||||
custom_name = fields.String(data_key='customName') |
||||
|
||||
class CreateAccountResponseSchema(returns.SuccessSchema): |
||||
account = fields.Nested(Account.Schema) |
||||
|
||||
@ensure_logged_in |
||||
@bp.response(401, returns.ErrorSchema, description='Login failure') |
||||
@bp.doc(security=[{'Token': []}]) |
||||
@bp.arguments(CreateAccountParams, as_kwargs=True) |
||||
@bp.response(200, CreateAccountResponseSchema) |
||||
@bp.response(HTTPStatus.UNPROCESSABLE_ENTITY, description='Invalid currency or account type') |
||||
def post(self, currency: str, account_type: str, custom_name: str): |
||||
"""Create account""" |
||||
if currency not in VALID_CURRENCIES: |
||||
return returns.abort(returns.invalid_argument('currency')) |
||||
if account_type not in ACCOUNT_TYPES: |
||||
return returns.abort(returns.invalid_argument('account_type')) |
||||
|
||||
account = Account(-1, '', currency, account_type, custom_name or '') |
||||
db_utils.insert_account(decorators.user_id, account) |
||||
return returns.success(account=account.to_json()) |
||||
|
||||
class AccountsResponseSchema(returns.SuccessSchema): |
||||
accounts = fields.List(fields.Nested(Account.Schema)) |
||||
|
||||
@ensure_logged_in |
||||
@bp.response(401, returns.ErrorSchema, description='Login failure') |
||||
@bp.doc(security=[{'Token': []}]) |
||||
@bp.response(200, AccountsResponseSchema) |
||||
def get(self): |
||||
"""Get all accounts of user""" |
||||
return returns.success(accounts=db_utils.get_accounts(decorators.user_id)) |
||||
|
@ -0,0 +1,73 @@
|
||||
from flask.views import MethodView |
||||
from flask_smorest import Blueprint |
||||
from marshmallow import Schema, fields |
||||
from .. import returns, ram_db, decorators |
||||
from ..db_utils import get_user |
||||
from ..models import User |
||||
from ..decorators import ensure_logged_in |
||||
|
||||
from pyotp import TOTP |
||||
|
||||
bp = Blueprint('login', __name__, description='Login operations') |
||||
|
||||
class LoginParams(Schema): |
||||
username = fields.String() |
||||
code = fields.String() |
||||
|
||||
class LoginResult(returns.SuccessSchema): |
||||
token = fields.String() |
||||
|
||||
class LoginSuccessSchema(returns.SuccessSchema): |
||||
token = fields.String() |
||||
|
||||
@bp.route('/') |
||||
class Login(MethodView): |
||||
@bp.arguments(LoginParams, as_kwargs=True) |
||||
@bp.response(401, returns.ErrorSchema, description='Login failure') |
||||
@bp.response(200, LoginSuccessSchema) |
||||
def post(self, username: str, code: str): |
||||
"""Login via username and TOTP code""" |
||||
user: User | None = get_user(username=username) |
||||
if user is None: |
||||
return returns.abort(returns.INVALID_DETAILS) |
||||
|
||||
otp = TOTP(user.otp) |
||||
if not otp.verify(code, valid_window=1): |
||||
return returns.abort(returns.INVALID_DETAILS) |
||||
|
||||
token = ram_db.login_user(user.id) |
||||
return returns.success(token=token) |
||||
|
||||
@ensure_logged_in |
||||
@bp.doc(security=[{'Token': []}]) |
||||
@bp.response(401, returns.ErrorSchema, description='Login failure') |
||||
@bp.response(204) |
||||
def delete(self): |
||||
"""Logout""" |
||||
ram_db.logout_user(decorators.token) |
||||
|
||||
@bp.post('/logout') |
||||
@ensure_logged_in |
||||
@bp.doc(security=[{'Token': []}]) |
||||
@bp.response(401, returns.ErrorSchema, description='Login failure') |
||||
@bp.response(204) |
||||
def logout_route(): |
||||
"""Logout""" |
||||
ram_db.logout_user(decorators.token) |
||||
|
||||
@bp.route('/whoami') |
||||
class WhoAmI(MethodView): |
||||
class WhoAmISchema(returns.SuccessSchema): |
||||
user = fields.Nested(User.UserSchema) |
||||
|
||||
@bp.response(401, returns.ErrorSchema, description='Login failure') |
||||
@bp.response(200, WhoAmISchema) |
||||
@bp.doc(security=[{'Token': []}]) |
||||
@ensure_logged_in |
||||
def get(self): |
||||
"""Get information about currently logged in user""" |
||||
user: User | None = get_user(user_id=decorators.user_id) |
||||
if user is not None: |
||||
user = user.to_json() |
||||
|
||||
return returns.success(user=user) |
@ -0,0 +1,161 @@
|
||||
from functools import wraps |
||||
import sys |
||||
from types import ModuleType |
||||
|
||||
from . import db as _db |
||||
from . import models |
||||
|
||||
_db_global: None | tuple[_db.get_return, int] = None |
||||
|
||||
|
||||
def get_db(fn): |
||||
@wraps(fn) |
||||
def wrapper(*args, **kargs): |
||||
global _db_global |
||||
if _db_global is None: |
||||
_db_global = _db.get(), 1 |
||||
else: |
||||
_db_global = _db_global[0], _db_global[1] + 1 |
||||
result = fn(*args, **kargs) |
||||
_db_global = _db_global[0], _db_global[1] - 1 |
||||
if _db_global[1] == 0: |
||||
_db_global = None |
||||
return result |
||||
|
||||
return wrapper |
||||
|
||||
|
||||
class Module(ModuleType): |
||||
@property |
||||
def db(self) -> _db.get_return: |
||||
if _db_global is None: |
||||
raise Exception('Function not wrapped with @get_db, db unavailable') |
||||
return _db_global[0] |
||||
|
||||
|
||||
@get_db |
||||
def get_user(self, username: str | None = None, user_id: int | None = None) -> models.User | None: |
||||
cur = self.db.cursor() |
||||
if username is not None: |
||||
cur.execute('select * from users where username=?', (username,)) |
||||
elif user_id is not None: |
||||
cur.execute('select * from users where id=?', (user_id,)) |
||||
else: |
||||
raise Exception('Neither username or user_id passed') |
||||
result = cur.fetchone() |
||||
if result is None: |
||||
return None |
||||
return models.User.from_query(result) |
||||
|
||||
|
||||
@get_db |
||||
def insert_user(self, user: models.User): |
||||
# Prepare user |
||||
if not user.otp: |
||||
from pyotp import random_base32 |
||||
user.otp = random_base32() |
||||
|
||||
cur = self.db.cursor() |
||||
cur.execute( |
||||
'insert into users(username, email, otp, fullname) values (?, ?, ?, ?)', |
||||
(user.username, user.email, user.otp, user.fullname), |
||||
) |
||||
|
||||
cur.execute( |
||||
'select id from users where username = ? and email = ? and otp = ? and fullname = ?', |
||||
(user.username, user.email, user.otp, user.fullname), |
||||
) |
||||
|
||||
user.id = cur.fetchone()['id'] |
||||
|
||||
|
||||
@get_db |
||||
def get_accounts(self, user_id: int | None = None) -> list[models.Account]: |
||||
""" |
||||
Get all accounts. |
||||
If `user_id` is provided, get only the accounts for the matching user. |
||||
""" |
||||
cur = self.db.cursor() |
||||
if user_id: |
||||
cur.execute(''' |
||||
select id, iban, currency, account_type, custom_name from accounts |
||||
inner join users_accounts |
||||
on accounts.id = users_accounts.account_id |
||||
where users_accounts.user_id = ? |
||||
''', (user_id,)) |
||||
else: |
||||
cur.execute('select id, iban, currency, account_type, custom_name from accounts') |
||||
return [models.Account.from_query(q) for q in cur.fetchall()] |
||||
|
||||
|
||||
@get_db |
||||
def get_account(self, account_id: int | None = None, iban: str | None = None) -> models.Account | None: |
||||
cur = self.db.cursor() |
||||
if account_id is not None: |
||||
cur.execute( |
||||
'select * from accounts where id=?', |
||||
(account_id,), |
||||
) |
||||
elif iban is not None: |
||||
cur.execute( |
||||
'select * from accounts where iban=?', |
||||
(iban,), |
||||
) |
||||
else: |
||||
raise Exception('Neither username or user_id passed') |
||||
result = cur.fetchone() |
||||
if result is None: |
||||
return None |
||||
return models.Account.from_query(result) |
||||
|
||||
|
||||
@get_db |
||||
def whose_account(self, account: int | models.Account) -> int | None: |
||||
try: |
||||
account_id = account.id |
||||
except AttributeError: |
||||
account_id = account |
||||
|
||||
cur = self.db.cursor() |
||||
cur.execute('select user_id from users_accounts where account_id = ?', (account_id,)) |
||||
result = cur.fetchone() |
||||
if not result: |
||||
return None |
||||
return result['user_id'] |
||||
|
||||
|
||||
@get_db |
||||
def insert_account(self, user_id: int, account: models.Account): |
||||
# Prepare account |
||||
ibans = [acc.iban for acc in self.get_accounts(user_id)] |
||||
if not account.iban: |
||||
from random import randint |
||||
while True: |
||||
iban = 'RO00FOXB0' + account.currency |
||||
iban += str(randint(10, 10 ** 12 - 1)).rjust(12, '0') |
||||
from .utils.iban import gen_check_digits |
||||
iban = gen_check_digits(iban) |
||||
if iban not in ibans: |
||||
break |
||||
account.iban = iban |
||||
|
||||
cur = self.db.cursor() |
||||
cur.execute( |
||||
'insert into accounts(iban, currency, account_type, custom_name) values (?, ?, ?, ?)', |
||||
(account.iban, account.currency, account.account_type, account.custom_name), |
||||
) |
||||
|
||||
cur.execute( |
||||
'select id from accounts where iban = ?', |
||||
(account.iban,), |
||||
) |
||||
account.id = cur.fetchone()['id'] |
||||
|
||||
cur.execute( |
||||
'insert into users_accounts(user_id, account_id) VALUES (?, ?)', |
||||
(user_id, account.id) |
||||
) |
||||
|
||||
self.db.commit() |
||||
|
||||
sys.modules[__name__] = Module(__name__) |
@ -0,0 +1,74 @@
|
||||
import sys |
||||
from types import ModuleType |
||||
from flask import request |
||||
from http import HTTPStatus |
||||
from functools import wraps |
||||
from . import ram_db |
||||
from . import returns |
||||
|
||||
_token: str | None = None |
||||
_user_id: int | None = None |
||||
|
||||
class Module(ModuleType): |
||||
def no_content(self, fn): |
||||
""" |
||||
Allows a Flask route to return None, which is converted into |
||||
HTTP 201 No Content. |
||||
""" |
||||
@wraps(fn) |
||||
def wrapper(*args, **kargs): |
||||
result = fn(*args, **kargs) |
||||
if result is None: |
||||
return None, HTTPStatus.NO_CONTENT |
||||
else: |
||||
return result |
||||
return wrapper |
||||
|
||||
@property |
||||
def token(self) -> str: |
||||
if _token is None: |
||||
raise Exception('No token available') |
||||
return _token |
||||
|
||||
@property |
||||
def user_id(self) -> int: |
||||
if _user_id is None: |
||||
raise Exception('No user_id available') |
||||
return _user_id |
||||
|
||||
def ensure_logged_in(self, fn): |
||||
""" |
||||
Ensure the user is logged in by providing an Authorization: Bearer token |
||||
header. |
||||
|
||||
@param token whether the token should be supplied after validation |
||||
@param user_id whether the user_id should be supplied after validation |
||||
@return decorator which supplies the requested parameters |
||||
""" |
||||
@wraps(fn) |
||||
def wrapper(*args, **kargs): |
||||
token = request.headers.get('Authorization', None) |
||||
if token is None: |
||||
return returns.abort(returns.NO_AUTHORIZATION) |
||||
if not token.startswith('Bearer '): |
||||
return returns.abort(returns.INVALID_AUTHORIZATION) |
||||
token = token[7:] |
||||
user_id = ram_db.get_user(token) |
||||
if user_id is None: |
||||
return returns.abort(returns.INVALID_AUTHORIZATION) |
||||
|
||||
global _token |
||||
_token = token |
||||
global _user_id |
||||
_user_id = user_id |
||||
|
||||
result = fn(*args, **kargs) |
||||
|
||||
_token = None |
||||
_user_id = None |
||||
|
||||
return result |
||||
|
||||
return wrapper |
||||
|
||||
sys.modules[__name__] = Module(__name__) |
@ -0,0 +1,84 @@
|
||||
from dataclasses import dataclass |
||||
from marshmallow import Schema, fields |
||||
|
||||
@dataclass |
||||
class User: |
||||
id: int |
||||
username: str |
||||
email: str |
||||
otp: str |
||||
fullname: str |
||||
|
||||
class UserSchema(Schema): |
||||
id = fields.Int(required=False) |
||||
username = fields.String() |
||||
email = fields.String() |
||||
otp = fields.String(load_only=True, required=False) |
||||
fullname = fields.String() |
||||
|
||||
@staticmethod |
||||
def new_user(username: str, email: str, fullname: str) -> 'User': |
||||
return User( |
||||
id=-1, |
||||
username=username, |
||||
email=email, |
||||
otp='', |
||||
fullname=fullname, |
||||
) |
||||
|
||||
def to_json(self, include_otp=False, include_id=False): |
||||
result = { |
||||
'username': self.username, |
||||
'email': self.email, |
||||
'fullname': self.fullname, |
||||
} |
||||
if include_id: |
||||
result['id'] = self.id |
||||
if include_otp: |
||||
result['otp'] = self.otp |
||||
return result |
||||
|
||||
@classmethod |
||||
def from_query(cls, query_result): |
||||
return cls(*query_result) |
||||
|
||||
|
||||
@dataclass |
||||
class Account: |
||||
id: int |
||||
iban: str |
||||
currency: str |
||||
account_type: str |
||||
custom_name: str |
||||
|
||||
class Schema(Schema): |
||||
id = fields.Int(required=False) |
||||
iban = fields.Str() |
||||
currency = fields.Str() |
||||
account_type = fields.Str(data_key='accountType') |
||||
custom_name = fields.Str(data_key='customName') |
||||
|
||||
@staticmethod |
||||
def new_account(currency: str, account_type: str, custom_name: str = '') -> 'Account': |
||||
return Account( |
||||
id=-1, |
||||
iban='', |
||||
currency=currency, |
||||
account_type=account_type, |
||||
custom_name=custom_name, |
||||
) |
||||
|
||||
def to_json(self, include_id=True): |
||||
result = { |
||||
'iban': self.iban, |
||||
'currency': self.currency, |
||||
'accountType': self.account_type, |
||||
'customName': self.custom_name, |
||||
} |
||||
if include_id: |
||||
result['id'] = self.id |
||||
return result |
||||
|
||||
@classmethod |
||||
def from_query(cls, query_result): |
||||
return cls(*query_result) |
@ -1,5 +1,4 @@
|
||||
from datetime import datetime, timedelta |
||||
from types import TracebackType |
||||
from uuid import uuid4 |
||||
|
||||
USED_TOKENS = set() |
@ -0,0 +1,94 @@
|
||||
from http import HTTPStatus as _HTTPStatus |
||||
from typing import Any |
||||
|
||||
|
||||
def _make_error(http_status, code: str, message: str | None = None): |
||||
try: |
||||
http_status = http_status[0] |
||||
except Exception: |
||||
pass |
||||
|
||||
payload = { |
||||
'status': 'error', |
||||
'code': code, |
||||
} |
||||
|
||||
if message is not None: |
||||
payload['message'] = message |
||||
|
||||
return payload, http_status |
||||
|
||||
|
||||
# General |
||||
|
||||
INVALID_REQUEST = _make_error( |
||||
_HTTPStatus.BAD_REQUEST, |
||||
'general/invalid_request', |
||||
) |
||||
|
||||
NOT_FOUND = _make_error( |
||||
_HTTPStatus.NOT_FOUND, |
||||
'general/not_found', |
||||
) |
||||
|
||||
def invalid_argument(argname: str) -> tuple[Any, int]: |
||||
return _make_error( |
||||
_HTTPStatus.UNPROCESSABLE_ENTITY, |
||||
'general/invalid_argument', |
||||
message=f'Invalid argument: {argname}', |
||||
) |
||||
|
||||
# Login |
||||
|
||||
INVALID_DETAILS = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/invalid_details', |
||||
) |
||||
|
||||
NO_AUTHORIZATION = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/no_authorization', |
||||
) |
||||
|
||||
INVALID_AUTHORIZATION = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/invalid_authorization', |
||||
) |
||||
|
||||
UNAUTHORIZED = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/unauthorized', |
||||
"You are logged in but the resource you're trying to access isn't available to you", |
||||
) |
||||
|
||||
|
||||
# Success |
||||
|
||||
def success(http_status: Any = _HTTPStatus.OK, /, **kargs): |
||||
try: |
||||
http_status = http_status[0] |
||||
except Exception: |
||||
pass |
||||
|
||||
return dict(kargs, status='success'), http_status |
||||
|
||||
# Schemas |
||||
|
||||
from marshmallow import Schema, fields |
||||
|
||||
class ErrorSchema(Schema): |
||||
status = fields.Constant('error') |
||||
code = fields.Str() |
||||
message = fields.Str(required=False) |
||||
|
||||
class SuccessSchema(Schema): |
||||
status = fields.Constant('success') |
||||
|
||||
# smorest |
||||
|
||||
def abort(result: tuple[Any, int]): |
||||
try: |
||||
from flask_smorest import abort as _abort |
||||
_abort(result[1], response=result) |
||||
except ImportError: |
||||
return result |
@ -0,0 +1,31 @@
|
||||
from .string import str_range_replace |
||||
|
||||
|
||||
def c_to_iban_i(c: str) -> int: |
||||
a = ord(c) |
||||
if a in range(48, 58): |
||||
return a - 48 |
||||
elif a in range(65, 91): |
||||
return a - 65 + 10 |
||||
elif a in range(97, 123): |
||||
return a - 97 + 10 |
||||
else: |
||||
raise ValueError(f'Invalid IBAN character: {c} (ord: {a})') |
||||
|
||||
|
||||
def iban_to_int(iban: str) -> int: |
||||
iban = iban[4:] + iban[0:4] |
||||
return int(''.join(map(str, map(c_to_iban_i, iban)))) |
||||
|
||||
|
||||
def check_iban(iban: str) -> bool: |
||||
num = iban_to_int(iban) |
||||
return num % 97 == 1 |
||||
|
||||
|
||||
def gen_check_digits(iban: str) -> str: |
||||
iban = str_range_replace(iban, '00', 2, 4) |
||||
num = iban_to_int(iban) |
||||
check = 98 - (num % 97) |
||||
iban = str_range_replace(iban, str(check).rjust(2, '0'), 2, 4) |
||||
return iban |
@ -0,0 +1,7 @@
|
||||
def str_range_replace( |
||||
input: str, |
||||
replace_with: str, |
||||
range_start: int | None = None, |
||||
range_end: int | None = None, |
||||
) -> str: |
||||
return input[:range_start] + replace_with + input[range_end:] |
@ -1,73 +0,0 @@
|
||||
from functools import wraps |
||||
from flask import Blueprint, request |
||||
|
||||
from pyotp import TOTP |
||||
|
||||
import db_utils |
||||
from decorators import no_content |
||||
import models |
||||
import ram_db |
||||
import returns |
||||
|
||||
login = Blueprint('login', __name__) |
||||
|
||||
@login.post('/') |
||||
def make_login(): |
||||
try: |
||||
username = request.json['username'] |
||||
code = request.json['code'] |
||||
except (TypeError, KeyError): |
||||
return returns.INVALID_REQUEST |
||||
|
||||
user: models.User | None = db_utils.get_user(username=username) |
||||
if user is None: |
||||
return returns.INVALID_DETAILS |
||||
|
||||
otp = TOTP(user.otp) |
||||
if not otp.verify(code, valid_window=1): |
||||
return returns.INVALID_DETAILS |
||||
|
||||
token = ram_db.login_user(user.id) |
||||
return returns.success(token=token) |
||||
|
||||
def ensure_logged_in(token=False, user_id=False): |
||||
def decorator(fn): |
||||
pass_token = token |
||||
pass_user_id = user_id |
||||
@wraps(fn) |
||||
def wrapper(*args, **kargs): |
||||
token = request.headers.get('Authorization', None) |
||||
if token is None: |
||||
return returns.NO_AUTHORIZATION |
||||
if not token.startswith('Bearer '): |
||||
return returns.INVALID_AUTHORIZATION |
||||
token = token[7:] |
||||
user_id = ram_db.get_user(token) |
||||
if user_id is None: |
||||
return returns.INVALID_AUTHORIZATION |
||||
|
||||
if pass_user_id and pass_token: |
||||
return fn(user_id=user_id, token=token, *args, **kargs) |
||||
elif pass_user_id: |
||||
return fn(user_id=user_id, *args, **kargs) |
||||
elif pass_token: |
||||
return fn(token=token, *args, **kargs) |
||||
else: |
||||
return fn(*args, **kargs) |
||||
return wrapper |
||||
return decorator |
||||
|
||||
@login.post('/logout') |
||||
@ensure_logged_in(token=True) |
||||
@no_content |
||||
def logout(token: str): |
||||
ram_db.logout_user(token) |
||||
|
||||
@login.get('/whoami') |
||||
@ensure_logged_in(user_id=True) |
||||
def whoami(user_id: int): |
||||
user: models.User | None = db_utils.get_user(user_id=user_id) |
||||
if user is not None: |
||||
user = user.to_json() |
||||
|
||||
return returns.success(user=user) |
@ -1,25 +0,0 @@
|
||||
from dataclasses import dataclass |
||||
|
||||
@dataclass |
||||
class User: |
||||
id: int |
||||
username: str |
||||
email: str |
||||
otp: str |
||||
fullname: str |
||||
|
||||
def to_json(self, include_otp=False, include_id=False): |
||||
result = { |
||||
'username': self.username, |
||||
'email': self.email, |
||||
'fullname': self.fullname, |
||||
} |
||||
if include_id: |
||||
result['id'] = self.id |
||||
if include_otp: |
||||
result['otp'] = self.otp |
||||
return result |
||||
|
||||
@classmethod |
||||
def from_query(cls, query_result): |
||||
return cls(*query_result) |
@ -1,46 +0,0 @@
|
||||
from http import HTTPStatus as _HTTPStatus |
||||
|
||||
def _make_error(http_status, code: str): |
||||
try: |
||||
http_status = http_status[0] |
||||
except Exception: |
||||
pass |
||||
|
||||
return { |
||||
'status': 'error', |
||||
'code': code, |
||||
}, http_status |
||||
|
||||
# General |
||||
|
||||
INVALID_REQUEST = _make_error( |
||||
_HTTPStatus.BAD_REQUEST, |
||||
'general/invalid_request', |
||||
) |
||||
|
||||
# Login |
||||
|
||||
INVALID_DETAILS = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/invalid_details', |
||||
) |
||||
|
||||
NO_AUTHORIZATION = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/no_authorization', |
||||
) |
||||
|
||||
INVALID_AUTHORIZATION = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/invalid_authorization', |
||||
) |
||||
|
||||
# Success |
||||
|
||||
def success(http_status=_HTTPStatus.OK, /, **kargs): |
||||
try: |
||||
http_status = http_status[0] |
||||
except Exception: |
||||
pass |
||||
|
||||
return dict(kargs, status='success'), http_status |
@ -1,14 +1,18 @@
|
||||
from flask import Flask |
||||
from flask_cors import CORS |
||||
#! /usr/bin/env python3 |
||||
from foxbank_server import create_app |
||||
|
||||
import db |
||||
app = create_app() |
||||
# api = Api(app) |
||||
# CORS(app) |
||||
# db.init_app(app) |
||||
|
||||
app = Flask(__name__) |
||||
CORS(app) |
||||
db.init_app(app) |
||||
# from login import login |
||||
# app.register_blueprint(login, url_prefix='/login') |
||||
|
||||
from login import login |
||||
app.register_blueprint(login, url_prefix='/login') |
||||
# from bank_accounts import blueprint as ba_bp, namespace as ba_ns |
||||
# app.register_blueprint(ba_bp, url_prefix='/accounts') |
||||
|
||||
# accounts_ns = api.add_namespace(ba_ns, '/accounts') |
||||
|
||||
if __name__ == '__main__': |
||||
app.run() |
||||
app.run(debug=True) |
||||
|
Loading…
Reference in new issue