from operator import or_ import os import dotenv dotenv.load_dotenv() from interface.common_response import CommonResponse from schemas.ResponseSchemas import UserResponse import datetime import json import urllib.request from services.jwtService import create_jwt_token from services.userServices import UserServices from models import BlockedEmail from services.emailService import EmailService from exceptions.validation_exception import ValidationException from models import OTP from enums.enums import UserType from models import Users from exceptions.resource_not_found_exception import ResourceNotFoundException from models import ResetPasswordTokens from utils.constants import generateOTP from utils.password_utils import generate_reset_password_token, generate_secure_password, hash_password, verify_password from schemas.CreateSchemas import CreateSuperAdmin, UpdateSuperAdmin, UserCreate from schemas.BaseSchemas import AuthBase, AuthOTP from exceptions.unauthorized_exception import UnauthorizedException from database import get_db class AuthService: def __init__(self): self.user_service = UserServices() self.db = next(get_db()) self.email_service = EmailService() self.url = os.getenv("FRONTEND_URL") async def login(self, data: AuthBase) -> str: # get user user = await self.user_service.get_user_by_email(data.email) # verify password if not verify_password(data.password, user.password): raise UnauthorizedException("Invalid credentials") # remove password from user dict user_dict = user.__dict__.copy() user_dict.pop("password", None) # create token token = create_jwt_token(user_dict) return token async def register(self, user_data: UserCreate, background_tasks=None): response = await self.user_service.create_user(user_data, background_tasks) user = { "id": response.id, "username": response.username, "email": response.email, "clinicRole": response.clinicRole, "userType": response.userType, "mobile": response.mobile, "clinicId": response.created_clinics[0].id } token = create_jwt_token(user) return token def blockEmailSNS(self, body: str): # confirm subscription if body["Type"] == "SubscriptionConfirmation": urllib.request.urlopen(body["SubscribeURL"]) # disable automatic unsubscribe confirmation by activating subscription again elif body["Type"] == "UnsubscribeConfirmation": urllib.request.urlopen(body["SubscribeURL"]) # handle bounce notifications only elif body["Type"] == "Notification": msg = json.loads(body["Message"]) # check if msg contains notificationType if "notificationType" not in msg: return recepients = msg["bounce"]["bouncedRecipients"] for recipient in recepients: blockEmail = BlockedEmail(email=recipient["emailAddress"], reason=msg["notificationType"], severity=msg["bounce"]["bounceType"]) self.db.add(blockEmail) self.db.commit() return "OK" async def send_otp(self, email:str): otp = generateOTP() self.email_service.send_otp_email(email, otp) # Create OTP record with proper datetime handling expire_time = datetime.datetime.now() + datetime.timedelta(minutes=10) otp_record = OTP(email=email, otp=otp, expireAt=expire_time) self.db.add(otp_record) self.db.commit() return async def verify_otp(self, data: AuthOTP): db_otp = self.db.query(OTP).filter(OTP.email == data.email, OTP.otp == data.otp).first() if not db_otp: raise ValidationException("Invalid OTP") if db_otp.otp != data.otp: raise ValidationException("Invalid OTP") if db_otp.expireAt < datetime.datetime.now(): raise ValidationException("OTP expired") # OTP is valid, delete it to prevent reuse # self.db.delete(db_otp) # self.db.commit() return async def get_admins(self, user, limit:int, offset:int, search:str): try: if user["userType"] != UserType.SUPER_ADMIN: raise UnauthorizedException("User is not authorized to perform this action") admins = self.db.query(Users).filter(Users.userType == UserType.SUPER_ADMIN) total = self.db.query(Users).filter(Users.userType == UserType.SUPER_ADMIN).count() if search: admins = admins.filter( or_( Users.username.contains(search), Users.email.contains(search), ) ) total = admins.count() admins = admins.limit(limit).offset(offset).all() response = [UserResponse(**admin.__dict__.copy()) for admin in admins] common_response = CommonResponse(data=response, total=total) return common_response except Exception as e: raise e async def create_super_admin(self, user, data: CreateSuperAdmin): if user["userType"] != UserType.SUPER_ADMIN: raise UnauthorizedException("User is not authorized to perform this action") # password = "admin@123" password = generate_secure_password() hashed_password = hash_password(password) # check if username and email are unique existing_user = ( self.db.query(Users) .filter( Users.email == data.email.lower(), ) .first() ) if existing_user: raise ValidationException("User with same email already exists") user = Users( username=data.username.lower(), email=data.email.lower(), password=hashed_password, userType=UserType.SUPER_ADMIN, ) self.db.add(user) self.db.commit() LOGIN_URL = self.url # send email to user self.email_service.send_new_admin_email(data.username, data.email, password, LOGIN_URL) return async def update_super_admin(self, user, user_id: int, data: UpdateSuperAdmin): if user["userType"] != UserType.SUPER_ADMIN: raise UnauthorizedException("User is not authorized to perform this action") user = self.db.query(Users).filter(Users.id == user_id).first() if not user: raise ResourceNotFoundException("User not found") user.username = data.username.lower() self.db.add(user) self.db.commit() return async def delete_super_admin(self, user, user_id: int): if user["userType"] != UserType.SUPER_ADMIN: raise UnauthorizedException("User is not authorized to perform this action") user = self.db.query(Users).filter(Users.id == user_id).first() if not user: raise ResourceNotFoundException("User not found") user.soft_delete(self.db) return async def forget_password(self, email: str): user = self.db.query(Users).filter(Users.email == email.lower()).first() if not user: raise ResourceNotFoundException("User not found") # get reset password token reset_password_token = generate_reset_password_token() reset_password = ResetPasswordTokens(email=email, token=reset_password_token) self.db.add(reset_password) self.db.commit() reset_password_url = f"{self.url}/auth/reset-password?token={reset_password_token}" self.email_service.send_reset_password_email(email, reset_password_url) return async def reset_password(self, token: str, password: str): reset_password = self.db.query(ResetPasswordTokens).filter(ResetPasswordTokens.token == token).first() if not reset_password: raise ResourceNotFoundException("Reset password token not found") user = self.db.query(Users).filter(Users.email == reset_password.email).first() if not user: raise ResourceNotFoundException("User not found") user.password = hash_password(password) self.db.delete(reset_password) self.db.commit() return