import datetime
import json
import urllib.parse
import urllib.request

from sqlalchemy.orm import Session

from services.jwtService import create_jwt_token
from services.userServices import UserServices
from models import BlockedEmail
from services.emailService import EmailService
from exceptions.validation_exception import ValidationException
from models import OTP
from utils.constants import generateOTP
from utils.password_utils import verify_password
from schemas.CreateSchemas import UserCreate
from schemas.BaseSchemas import AuthBase, AuthOTP
from exceptions.unauthorized_exception import UnauthorizedException
from database import get_db


class AuthService:
    """Authentication workflows: login, registration, OTP and SNS bounce handling."""

    # Hosts a subscription-confirmation URL is allowed to point at.
    _SNS_HOST_SUFFIXES = (".amazonaws.com", ".amazonaws.com.cn")
    # Seconds to wait for the confirmation request before giving up.
    _SNS_URL_TIMEOUT = 10

    def __init__(self):
        """Create the collaborating services and obtain a database session."""
        self.user_service = UserServices()
        # NOTE(review): presumably get_db() is a generator yielding a Session;
        # only its first value is taken here and the session is kept for the
        # lifetime of this service instance -- confirm this is intended.
        self.db = next(get_db())
        self.email_service = EmailService()

    def login(self, data: AuthBase) -> str:
        """Check the supplied credentials and return a JWT for the user.

        Args:
            data: Login payload; ``email`` and ``password`` are read from it.

        Returns:
            The token produced by ``create_jwt_token`` from the user's
            attributes, with the password removed.

        Raises:
            UnauthorizedException: If no user is returned for the email or
                the password does not verify.
        """
        user = self.user_service.get_user_by_email(data.email)

        # A missing user gets the same error as a wrong password, so the
        # response does not reveal which of the two was wrong.
        if user is None or not verify_password(data.password, user.password):
            raise UnauthorizedException("Invalid credentials")

        payload = user.__dict__.copy()
        payload.pop("password", None)
        # SQLAlchemy keeps its bookkeeping object in the instance __dict__;
        # it is not user data and must not end up in the token.
        payload.pop("_sa_instance_state", None)

        return create_jwt_token(payload)

    def register(self, user_data: UserCreate, background_tasks=None):
        """Create a user and return a JWT describing it.

        Args:
            user_data: Registration payload forwarded to the user service.
            background_tasks: Forwarded unchanged to ``create_user``.

        Returns:
            The token produced by ``create_jwt_token`` for the new user.
        """
        response = self.user_service.create_user(user_data, background_tasks)
        user = {
            "id": response.id,
            "username": response.username,
            "email": response.email,
            "clinicRole": response.clinicRole,
            "userType": response.userType,
            "mobile": response.mobile,
            # NOTE(review): assumes the created user always has at least one
            # clinic; an empty list raises IndexError here -- confirm.
            "clinicId": response.created_clinics[0].id,
        }
        return create_jwt_token(user)

    def blockEmailSNS(self, body: dict):
        """Handle an SNS message: confirm subscriptions and record bounces.

        Args:
            body: The parsed SNS message. ``Type`` selects the handling;
                ``SubscribeURL`` or ``Message`` is read depending on it.

        Returns:
            ``"OK"`` once the message has been handled, or ``None`` when a
            notification is not a bounce and is therefore ignored.

        Raises:
            ValidationException: If ``SubscribeURL`` is not an HTTPS URL on
                an allowed host.
        """
        message_type = body["Type"]

        # "UnsubscribeConfirmation" is answered by visiting the URL as well,
        # which re-activates the subscription instead of letting it lapse.
        if message_type in ("SubscriptionConfirmation", "UnsubscribeConfirmation"):
            self._open_sns_url(body["SubscribeURL"])
        elif message_type == "Notification":
            msg = json.loads(body["Message"])

            # Only bounces carry the "bounce" section read below; any other
            # notification type (or a payload without one) is ignored.
            if msg.get("notificationType") != "Bounce":
                return None

            bounce = msg["bounce"]
            self._persist(
                [
                    BlockedEmail(
                        email=recipient["emailAddress"],
                        reason=msg["notificationType"],
                        severity=bounce["bounceType"],
                    )
                    for recipient in bounce["bouncedRecipients"]
                ]
            )

        return "OK"

    def send_otp(self, email: str):
        """Email a freshly generated OTP and store it with a 10-minute expiry.

        Args:
            email: Address the OTP is sent to and stored under.
        """
        otp = generateOTP()
        self.email_service.send_otp_email(email, otp)

        # Naive local time, matching the comparison done in verify_otp.
        expire_time = datetime.datetime.now() + datetime.timedelta(minutes=10)
        self._persist([OTP(email=email, otp=otp, expireAt=expire_time)])
        return

    def verify_otp(self, data: AuthOTP):
        """Validate an OTP submitted for an email address.

        Args:
            data: Payload; ``email`` and ``otp`` are read from it.

        Raises:
            ValidationException: With "Invalid OTP" when no matching record
                exists, or "OTP expired" when the record's expiry has passed.
        """
        db_otp = (
            self.db.query(OTP)
            .filter(OTP.email == data.email, OTP.otp == data.otp)
            .first()
        )

        if not db_otp:
            raise ValidationException("Invalid OTP")

        # Re-checked in Python as well as in the query filter.
        if db_otp.otp != data.otp:
            raise ValidationException("Invalid OTP")

        if db_otp.expireAt < datetime.datetime.now():
            raise ValidationException("OTP expired")

        # NOTE(review): the record is not deleted after a successful check,
        # so the same OTP stays valid until it expires. Deleting it here
        # would prevent reuse -- confirm whether callers verify twice.
        return

    def _open_sns_url(self, url: str) -> None:
        """Visit an SNS confirmation URL after checking where it points."""
        parts = urllib.parse.urlsplit(url)
        host = (parts.hostname or "").lower()

        # The URL comes from the request body, and urlopen would otherwise
        # follow file:// or ftp:// URLs and arbitrary internal hosts.
        if parts.scheme != "https" or not host.endswith(self._SNS_HOST_SUFFIXES):
            raise ValidationException("Invalid SubscribeURL")

        # The response body is not needed; the context manager closes it.
        with urllib.request.urlopen(url, timeout=self._SNS_URL_TIMEOUT):
            pass

    def _persist(self, records) -> None:
        """Add the records and commit, rolling back if anything fails."""
        try:
            self.db.add_all(records)
            self.db.commit()
        except Exception:
            # The session outlives this call; without a rollback a failed
            # commit would leave it unusable for later operations.
            self.db.rollback()
            raise