Browse Source

Implemented notifications support

pull/7/head
Kenneth Bruen 3 years ago
parent
commit
f1cbb3a125
Signed by: kbruen
GPG Key ID: C1980A470C3EE5B1
  1. 2
      server/foxbank_server/apis/__init__.py
  2. 2
      server/foxbank_server/apis/accounts.py
  3. 60
      server/foxbank_server/apis/notifications.py
  4. 72
      server/foxbank_server/db_utils.py
  5. 34
      server/foxbank_server/models.py

2
server/foxbank_server/apis/__init__.py

@ -4,6 +4,7 @@ from flask_smorest import Api
from .accounts import bp as acc_bp
from .login import bp as login_bp
from .transactions import bp as transactions_bp
from .notifications import bp as notifications_bp
class ApiWithErr(Api):
def handle_http_exception(self, error):
@ -29,3 +30,4 @@ def init_apis(app: Flask):
api.register_blueprint(login_bp, url_prefix='/login')
api.register_blueprint(acc_bp, url_prefix='/accounts')
api.register_blueprint(transactions_bp, url_prefix='/transactions')
api.register_blueprint(notifications_bp, url_prefix='/notifications')

2
server/foxbank_server/apis/accounts.py

@ -70,7 +70,7 @@ class AccountResponseSchema(returns.SuccessSchema):
@bp.get('/<int:account_id>')
@ensure_logged_in
@bp.response(401, returns.ErrorSchema, description='Login failure')
@bp.response(401, returns.ErrorSchema, description='Login failure or not allowed')
@bp.doc(security=[{'Token': []}])
@bp.response(200, AccountResponseSchema)
def get_account_id(account_id: int):

60
server/foxbank_server/apis/notifications.py

@ -0,0 +1,60 @@
from datetime import datetime
from flask.views import MethodView
from flask_smorest import Blueprint
from marshmallow import Schema, fields
from ..db_utils import get_notifications, insert_notification, mark_notification_as_read, whose_notification
from ..decorators import ensure_logged_in
from ..models import Notification
from .. import decorators, returns
bp = Blueprint('notifications', __name__, description='Notifications operations')
@bp.post('/<int:notification_id>/mark_read')
@ensure_logged_in
@bp.response(401, returns.ErrorSchema, description='Login failure or not allowed')
@bp.doc(security=[{'Token': []}])
@bp.response(201, description='Successfully marked as read')
def mark_as_read(notification_id: int):
"""Mark notification as read"""
if decorators.user_id != whose_notification(notification_id):
return returns.abort(returns.UNAUTHORIZED)
mark_notification_as_read(notification_id)
@bp.route('/')
class NotificationsList(MethodView):
class NotificationsListPostParams(Schema):
body = fields.Str(description='Text of the notification')
read = fields.Bool(default=False, description='Whether the notification was read or not')
class NotificationsListPostSchema(returns.SuccessSchema):
notification = fields.Nested(Notification.NotificationSchema)
@ensure_logged_in
@bp.response(401, returns.ErrorSchema, description='Login failure')
@bp.doc(security=[{'Token': []}])
@bp.arguments(NotificationsListPostParams, as_kwargs=True)
@bp.response(200, NotificationsListPostSchema)
def post(self, body: str, read: bool = False):
"""Post a notification to the currently logged in user
The usefulness of this endpoint is questionable besides debugging since it's a notification to self
"""
now = datetime.now()
notification = Notification.new_notification(body, now, read)
insert_notification(decorators.user_id, notification)
return returns.success(notification=notification)
class NotificationsListGetSchema(returns.SuccessSchema):
notifications = fields.List(fields.Nested(Notification.NotificationSchema))
@ensure_logged_in
@bp.response(401, returns.ErrorSchema, description='Login failure')
@bp.doc(security=[{'Token': []}])
@bp.response(200, NotificationsListGetSchema)
def get(self):
"""Get all notifications for current user"""
notifications = get_notifications(decorators.user_id)
return returns.success(notifications=notifications)

72
server/foxbank_server/db_utils.py

@ -231,10 +231,80 @@ class Module(ModuleType):
transaction.id = cur.fetchone()['id']
cur.execute(
'insert into accounts_transactions(account_id, transaction_id) VALUES (?, ?)',
'insert into accounts_transactions(account_id, transaction_id) values (?, ?)',
(account_id, transaction.id),
)
self.db.commit()
@get_db
def get_notifications(self, user_id: int) -> list[models.Notification]:
cur = self.db.cursor()
cur.execute(
'''
select n.id, n.body, n.datetime, n.read
from notifications as n
inner join users_notifications on n.id = users_notifications.notification_id
where users_notifications.user_id = ?
''',
(user_id,),
)
return [models.Notification.from_query(q) for q in cur.fetchall()]
@get_db
def insert_notification(self, user_id: int, notification: models.Notification):
cur = self.db.cursor()
cur.execute(
'insert into notifications(body, datetime, read) values (?, ?, ?)',
(
notification.body,
notification.date_time.isoformat(),
1 if notification.read else 0,
),
)
cur.execute(
'select id from notifications where body = ? and datetime = ? and read = ?',
(
notification.body,
notification.date_time.isoformat(),
1 if notification.read else 0,
),
)
notification.id = cur.fetchone()['id']
cur.execute(
'insert into users_notifications values (?, ?)',
(user_id, notification.id,),
)
self.db.commit()
@get_db
def whose_notification(self, notification: int | models.Notification) -> int | None:
try:
notification_id = notification.id
except AttributeError:
notification_id = notification
cur = self.db.cursor()
cur.execute('select user_id from users_notifications where notification_id = ?', (notification_id,))
result = cur.fetchone()
if not result:
return None
return result[0]
@get_db
def mark_notification_as_read(self, notification_id: int):
cur = self.db.cursor()
cur.execute(
'update notifications set read = 1 where id = ?',
(notification_id,),
)
self.db.commit()
sys.modules[__name__] = Module(__name__)

34
server/foxbank_server/models.py

@ -106,7 +106,7 @@ class Transaction:
extra = fields.Dict(keys=fields.Str(), values=fields.Raw())
@staticmethod
def new_transaction(date_time: datetime, other_party: str, status: str, transaction_type: str, extra: str = '') -> 'Account':
def new_transaction(date_time: datetime, other_party: str, status: str, transaction_type: str, extra: str = '') -> 'Transaction':
return Transaction(
id=-1,
date_time=date_time,
@ -141,3 +141,35 @@ class Transaction:
query_result[5] = json.loads(query_result[5])
return cls(*query_result)
@dataclass
class Notification:
id: int
body: str
date_time: datetime
read: bool
class NotificationSchema(Schema):
id = fields.Int(required=False)
body = fields.Str()
date_time = fields.DateTime(data_key='datetime')
read = fields.Bool()
@staticmethod
def new_notification(body: str, date_time: datetime, read: bool = False) -> 'Notification':
return Notification(
id=-1,
body=body,
date_time=date_time,
read=read,
)
@classmethod
def from_query(cls, query_result):
query_result = list(query_result)
if type(query_result[2]) is str:
query_result[2] = datetime.fromisoformat(query_result[2])
if type(query_result[3]) is not bool:
query_result[3] = bool(query_result[3])
return cls(*query_result)

Loading…
Cancel
Save