health-apps-backend/services/notificationServices.py

82 lines
3.0 KiB
Python

import logging
from logging import Logger

from firebase_admin import messaging
from sqlalchemy.orm import Session

from database import get_db
from exceptions.resource_not_found_exception import ResourceNotFoundException
from interface.common_response import CommonResponse
from models import Notifications, Users, Fcm
from schemas.ResponseSchemas import NotificationResponse
class NotificationServices:
    """Service layer for stored notifications, FCM tokens and push delivery.

    Each instance takes one SQLAlchemy session from ``get_db()`` when it is
    constructed and uses that session for every query it issues.
    """

    def __init__(self):
        # NOTE(review): only the first value produced by get_db() is consumed
        # and nothing in this class closes the session. Presumably get_db() is
        # a generator-style dependency whose cleanup is meant to run after the
        # request -- confirm, and consider injecting the session instead.
        self.db: Session = next(get_db())

    def _commit(self):
        """Commit the session, rolling back before re-raising on failure."""
        try:
            self.db.commit()
        except Exception:
            # The same session serves later calls on this instance; without a
            # rollback a failed commit would leave it in an unusable state.
            self.db.rollback()
            raise

    def _require_notification(self, notification_id):
        """Return the notification with this id or raise if there is none."""
        notification = (
            self.db.query(Notifications)
            .filter(Notifications.id == notification_id)
            .first()
        )
        if not notification:
            raise ResourceNotFoundException("Notification not found")
        return notification

    def createNotification(self, title, message, sender_id, receiver_id):
        """Persist a notification from one user to another.

        Args:
            title: Notification title.
            message: Notification body text.
            sender_id: Id of the ``Users`` row sending the notification.
            receiver_id: Id of the ``Users`` row receiving the notification.

        Returns:
            ``True`` once the notification has been committed.

        Raises:
            ResourceNotFoundException: If either id matches no ``Users`` row.
        """
        # Validate sender and receiver before writing anything.
        sender = self.db.query(Users).filter(Users.id == sender_id).first()
        receiver = self.db.query(Users).filter(Users.id == receiver_id).first()
        if not sender or not receiver:
            raise ResourceNotFoundException("Sender or receiver not found")

        self.db.add(
            Notifications(
                title=title,
                message=message,
                sender_id=sender_id,
                receiver_id=receiver_id,
            )
        )
        self._commit()
        return True

    def getNotifications(self, user_id, limit: int, offset: int):
        """Return one page of the notifications addressed to ``user_id``.

        Args:
            user_id: Receiver whose notifications are listed.
            limit: Maximum number of rows in the page.
            offset: Number of rows to skip before the page starts.

        Returns:
            A ``CommonResponse`` whose ``data`` is a list of
            ``NotificationResponse`` objects and whose ``total`` is the count
            of all notifications for the receiver (ignoring paging).
        """
        inbox = self.db.query(Notifications).filter(
            Notifications.receiver_id == user_id
        )
        # LIMIT/OFFSET without ORDER BY lets the database return rows in any
        # order, so pages could overlap or skip rows; order by id to pin it.
        rows = (
            inbox.order_by(Notifications.id).limit(limit).offset(offset).all()
        )
        total = inbox.count()

        data = [
            NotificationResponse(
                **{
                    key: value
                    for key, value in vars(row).items()
                    # Drop SQLAlchemy's internal bookkeeping entry so only
                    # mapped attributes reach the response schema.
                    if key != "_sa_instance_state"
                }
            )
            for row in rows
        ]
        return CommonResponse(data=data, total=total)

    def deleteNotification(self, notification_id):
        """Delete the notification with the given id.

        Returns:
            ``True`` once the deletion has been committed.

        Raises:
            ResourceNotFoundException: If no such notification exists.
        """
        self.db.delete(self._require_notification(notification_id))
        self._commit()
        return True

    def updateNotificationStatus(self, notification_id):
        """Toggle the ``is_read`` flag of the notification with the given id.

        Returns:
            ``True`` once the change has been committed.

        Raises:
            ResourceNotFoundException: If no such notification exists.
        """
        notification = self._require_notification(notification_id)
        notification.is_read = not notification.is_read
        self._commit()
        return True

    def createOrUpdateFCMToken(self, user_id, token):
        """Register an FCM token for a user, or re-bind an existing token.

        Args:
            user_id: User the token should belong to.
            token: FCM registration token.

        Returns:
            ``True`` once the change has been committed.
        """
        fcm = self.db.query(Fcm).filter(Fcm.token == token).first()
        if fcm is None:
            self.db.add(Fcm(user_id=user_id, token=token))
        else:
            # The row was matched on this very token, so re-assigning the
            # token would change nothing; the part that can be stale is the
            # owner, so point the token at the user registering it now.
            fcm.user_id = user_id
        self._commit()
        return True

    def sendPushNotification(self, token: str, payload):
        """Send a push notification to one device through Firebase.

        Delivery is best-effort: any failure is logged with its traceback and
        reported to the caller as ``None`` instead of an exception.

        Args:
            token: FCM registration token of the target device.
            payload: Mapping providing the ``"title"`` and ``"body"`` entries.

        Returns:
            The value returned by ``messaging.send`` on success, otherwise
            ``None``.
        """
        try:
            message = messaging.Message(
                notification=messaging.Notification(
                    title=payload["title"],
                    body=payload["body"],
                ),
                data={},
                token=token,
            )
            return messaging.send(message)
        except Exception:
            # Log through a logger instance: calling the method on the Logger
            # class itself binds the message to ``self`` and raises TypeError.
            logging.getLogger(__name__).exception(
                "Failed to send push notification"
            )
            return None