import json import urllib.request from sqlalchemy.orm import Session from services.jwtService import create_jwt_token from services.userServices import UserServices from models import BlockedEmail from utils.password_utils import verify_password from schemas.CreateSchemas import UserCreate from schemas.BaseSchemas import AuthBase from exceptions.unauthorized_exception import UnauthorizedException from database import get_db class AuthService: def __init__(self): self.user_service = UserServices() self.db = next(get_db()) def login(self, data: AuthBase) -> str: # get user user = self.user_service.get_user_by_email(data.email) # verify password if not verify_password(data.password, user.password): raise UnauthorizedException("Invalid credentials") # remove password from user dict user_dict = user.__dict__.copy() user_dict.pop("password", None) # create token token = create_jwt_token(user_dict) return token def register(self, user_data: UserCreate): response = self.user_service.create_user(user_data) user = { "id": response.id, "username": response.username, "email": response.email, "clinicRole": response.clinicRole, "userType": response.userType, "mobile": response.mobile, "clinicId": response.created_clinics[0].id } token = create_jwt_token(user) return token def blockEmailSNS(self, body: str): # confirm subscription if body["Type"] == "SubscriptionConfirmation": urllib.request.urlopen(body["SubscribeURL"]) # disable automatic unsubscribe confirmation by activating subscription again elif body["Type"] == "UnsubscribeConfirmation": urllib.request.urlopen(body["SubscribeURL"]) # handle bounce notifications only elif body["Type"] == "Notification": msg = json.loads(body["Message"]) # check if msg contains notificationType if "notificationType" not in msg: return recepients = msg["bounce"]["bouncedRecipients"] for recipient in recepients: blockEmail = BlockedEmail(email=recipient["emailAddress"], reason=msg["notificationType"], severity=msg["bounce"]["bounceType"]) self.db.add(blockEmail) self.db.commit() return "OK"