Kenneth Bruen
3 years ago
11 changed files with 243 additions and 9 deletions
@ -0,0 +1,23 @@
|
||||
{ |
||||
// Use IntelliSense to learn about possible attributes. |
||||
// Hover to view descriptions of existing attributes. |
||||
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 |
||||
"version": "0.2.0", |
||||
"configurations": [ |
||||
{ |
||||
"name": "Python: Flask", |
||||
"type": "python", |
||||
"request": "launch", |
||||
"module": "flask", |
||||
"env": { |
||||
"FLASK_APP": "server.py", |
||||
"FLASK_ENV": "development" |
||||
}, |
||||
"args": [ |
||||
"run", |
||||
"--no-debugger" |
||||
], |
||||
"jinja": true |
||||
} |
||||
] |
||||
} |
@ -0,0 +1,24 @@
|
||||
from functools import wraps |
||||
|
||||
import db |
||||
import models |
||||
|
||||
def get_db(fn): |
||||
@wraps(fn) |
||||
def wrapper(*args, **kargs): |
||||
return fn(db.get(), *args, **kargs) |
||||
return wrapper |
||||
|
||||
@get_db |
||||
def get_user(db: db.get_return, username: str|None = None, user_id: int|None = None) -> models.User | None: |
||||
cur = db.cursor() |
||||
if username is not None: |
||||
cur.execute('select * from users where username=?', (username,)) |
||||
elif user_id is not None: |
||||
cur.execute('select * from users where id=?', (user_id,)) |
||||
else: |
||||
raise Exception('Neither username or user_id passed') |
||||
result = cur.fetchone() |
||||
if result is None: |
||||
return None |
||||
return models.User.from_query(result) |
@ -0,0 +1,54 @@
|
||||
from functools import wraps |
||||
from flask import Blueprint, request |
||||
|
||||
from pyotp import TOTP |
||||
|
||||
import db_utils |
||||
import models |
||||
import ram_db |
||||
import returns |
||||
|
||||
login = Blueprint('login', __name__) |
||||
|
||||
@login.post('/') |
||||
def make_login(): |
||||
try: |
||||
username = request.json['username'] |
||||
code = request.json['code'] |
||||
except (TypeError, KeyError): |
||||
return returns.INVALID_REQUEST |
||||
|
||||
user: models.User | None = db_utils.get_user(username=username) |
||||
if user is None: |
||||
return returns.INVALID_DETAILS |
||||
|
||||
otp = TOTP(user.otp) |
||||
if not otp.verify(code, valid_window=1): |
||||
return returns.INVALID_DETAILS |
||||
|
||||
token = ram_db.login_user(user.id) |
||||
return returns.success(token=token) |
||||
|
||||
def ensure_logged_in(fn): |
||||
@wraps(fn) |
||||
def wrapper(*args, **kargs): |
||||
token = request.headers.get('Authorization', None) |
||||
if token is None: |
||||
return returns.NO_AUTHORIZATION |
||||
if not token.startswith('Bearer '): |
||||
return returns.INVALID_AUTHORIZATION |
||||
token = token[7:] |
||||
user_id = ram_db.get_user(token) |
||||
if user_id is None: |
||||
return returns.INVALID_AUTHORIZATION |
||||
return fn(user_id=user_id, *args, **kargs) |
||||
return wrapper |
||||
|
||||
@login.get('/whoami') |
||||
@ensure_logged_in |
||||
def whoami(user_id): |
||||
user: models.User | None = db_utils.get_user(user_id=user_id) |
||||
if user is not None: |
||||
user = user.to_json() |
||||
|
||||
return returns.success(user=user) |
@ -0,0 +1,25 @@
|
||||
from dataclasses import dataclass |
||||
|
||||
@dataclass |
||||
class User: |
||||
id: int |
||||
username: str |
||||
email: str |
||||
otp: str |
||||
fullname: str |
||||
|
||||
def to_json(self, include_otp=False, include_id=False): |
||||
result = { |
||||
'username': self.username, |
||||
'email': self.email, |
||||
'fullname': self.fullname, |
||||
} |
||||
if include_id: |
||||
result['id'] = self.id |
||||
if include_otp: |
||||
result['otp'] = self.otp |
||||
return result |
||||
|
||||
@classmethod |
||||
def from_query(cls, query_result): |
||||
return cls(*query_result) |
@ -0,0 +1,30 @@
|
||||
from datetime import datetime, timedelta |
||||
from types import TracebackType |
||||
from uuid import uuid4 |
||||
|
||||
USED_TOKENS = set() |
||||
LOGGED_IN_USERS: dict[str, (int, datetime)] = {} |
||||
|
||||
def login_user(user_id: int) -> str: |
||||
''' |
||||
Creates token for user |
||||
''' |
||||
token = str(uuid4()) |
||||
while token in USED_TOKENS: |
||||
token = str(uuid4()) |
||||
if len(USED_TOKENS) > 10_000_000: |
||||
USED_TOKENS.clear() |
||||
USED_TOKENS.add(token) |
||||
LOGGED_IN_USERS[token] = user_id, datetime.now() |
||||
return token |
||||
|
||||
def get_user(token: str) -> int | None: |
||||
if token not in LOGGED_IN_USERS: |
||||
return None |
||||
|
||||
user_id, login_date = LOGGED_IN_USERS[token] |
||||
time_since_login: timedelta = datetime.now() - login_date |
||||
if time_since_login.total_seconds() > (60 * 30): # 30 mins |
||||
del LOGGED_IN_USERS[token] |
||||
return None |
||||
return user_id |
@ -0,0 +1,46 @@
|
||||
from http import HTTPStatus as _HTTPStatus |
||||
|
||||
def _make_error(http_status, code: str): |
||||
try: |
||||
http_status = http_status[0] |
||||
except Exception: |
||||
pass |
||||
|
||||
return { |
||||
'status': 'error', |
||||
'code': code, |
||||
}, http_status |
||||
|
||||
# General |
||||
|
||||
INVALID_REQUEST = _make_error( |
||||
_HTTPStatus.BAD_REQUEST, |
||||
'general/invalid_request', |
||||
) |
||||
|
||||
# Login |
||||
|
||||
INVALID_DETAILS = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/invalid_details', |
||||
) |
||||
|
||||
NO_AUTHORIZATION = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/no_authorization', |
||||
) |
||||
|
||||
INVALID_AUTHORIZATION = _make_error( |
||||
_HTTPStatus.UNAUTHORIZED, |
||||
'login/invalid_authorization', |
||||
) |
||||
|
||||
# Success |
||||
|
||||
def success(http_status=_HTTPStatus.OK, /, **kargs): |
||||
try: |
||||
http_status = http_status[0] |
||||
except Exception: |
||||
pass |
||||
|
||||
return dict(kargs, status='success'), http_status |
@ -1,10 +1,14 @@
|
||||
from flask import Flask |
||||
from flask_cors import CORS |
||||
|
||||
import db |
||||
|
||||
app = Flask(__name__) |
||||
CORS(app) |
||||
db.init_app(app) |
||||
|
||||
@app.get('/') |
||||
def root(): |
||||
return 'Hello from FoxBank!' |
||||
from login import login |
||||
app.register_blueprint(login, url_prefix='/login') |
||||
|
||||
if __name__ == '__main__': |
||||
app.run() |
||||
|
Loading…
Reference in new issue