import inspect

from loguru import logger
from sqlalchemy import or_
from sqlalchemy.orm import Session, joinedload

from database import get_db
from models.Users import Users
from exceptions.validation_exception import ValidationException
from schemas.ResponseSchemas import UserResponse
from models import Clinics
from enums.enums import ClinicStatus, UserType
from schemas.UpdateSchemas import UserUpdate
from exceptions.unauthorized_exception import UnauthorizedException
from interface.common_response import CommonResponse
from models import ClinicFileVerifications, StripeUsers
from services.stripeServices import StripeServices
from utils.password_utils import hash_password
from schemas.CreateSchemas import UserCreate
from exceptions.resource_not_found_exception import ResourceNotFoundException
from exceptions.db_exceptions import DBExceptionHandler
from services.emailService import EmailService
from services.clinicServices import ClinicServices
from services.dashboardService import DashboardService


class UserServices:
    """User-related operations: signup, lookup, update, delete, payments.

    A single SQLAlchemy session is obtained from ``get_db()`` at construction
    time and shared by every method. Each public method closes that session
    in its ``finally`` clause, and errors are forwarded to
    ``DBExceptionHandler.handle_exception`` together with a context string.
    """

    def __init__(self):
        self.db: Session = next(get_db())
        self.email_service = EmailService()
        self.stripe_service = StripeServices()
        self.clinic_service = ClinicServices()
        self.dashboard_service = DashboardService()

    @staticmethod
    def _row_to_dict(row) -> dict:
        """Return a ``{column name: value}`` dict for a mapped ORM row."""
        return {column.name: getattr(row, column.name) for column in row.__table__.columns}

    async def _delete_stripe_resources(self, stripe_customer, stripe_account) -> None:
        """Best-effort removal of Stripe objects created by a failed signup.

        Each deletion is isolated so that a failure of one does not prevent
        the other from being attempted, and so that a cleanup failure never
        replaces the error that caused the signup to fail.
        """
        if stripe_customer:
            try:
                await self.stripe_service.delete_customer(stripe_customer.id)
            except Exception as cleanup_error:
                logger.error(f"Error deleting stripe customer during cleanup: {str(cleanup_error)}")
        if stripe_account:
            try:
                await self.stripe_service.delete_account(stripe_account.id)
            except Exception as cleanup_error:
                logger.error(f"Error deleting stripe account during cleanup: {str(cleanup_error)}")

    async def create_user(self, user_data: UserCreate, background_tasks=None):
        """Sign up a user together with their clinic and Stripe records.

        Within one database transaction this creates the ``Users`` row, the
        Stripe customer and account (plus the ``StripeUsers`` link row), the
        ``Clinics`` row, and its ``ClinicFileVerifications`` row. It then
        builds a subscription checkout link and commits.

        Args:
            user_data: Payload exposing ``.user`` and ``.clinic`` sub-objects.
            background_tasks: Optional object with ``add_task``; when given,
                the admin notification email is scheduled on it after the
                transaction has been committed.

        Returns:
            A dict with ``"url"`` (checkout link URL) and ``"user"`` (plain
            dict of the new user's fields plus ``clinicId``).

        On any error the transaction is rolled back, Stripe objects created
        so far are deleted on a best-effort basis, and the error is passed to
        ``DBExceptionHandler.handle_exception``.
        """
        stripe_customer = None
        stripe_account = None

        try:
            user = user_data.user

            # Only the (lower-cased) email is checked for uniqueness.
            existing_user = (
                self.db.query(Users)
                .filter(Users.email == user.email.lower())
                .first()
            )
            if existing_user:
                raise ValidationException(
                    "User with same email already exists"
                )

            new_user = Users(
                username=user.username,
                email=user.email.lower(),
                password=hash_password(user.password),
                clinicRole=user.clinicRole,
                userType=user.userType,
                mobile=user.mobile
            )

            # Flush (not commit) so new_user.id is available for the Stripe calls.
            self.db.add(new_user)
            self.db.flush()

            stripe_customer = await self.stripe_service.create_customer(
                new_user.id, user.email, user.username
            )
            stripe_account = await self.stripe_service.create_account(
                new_user.id, user.email, user.username, user.mobile
            )

            stripe_user = StripeUsers(
                user_id=new_user.id,
                customer_id=stripe_customer.id,
                account_id=stripe_account.id
            )
            self.db.add(stripe_user)

            clinic = user_data.clinic

            # The domain is the lower-cased clinic name reduced to
            # alphanumerics, hyphens and underscores; it must be unique.
            domain = ''.join(
                char for char in clinic.name.lower()
                if char.isalnum() or char == '-' or char == '_'
            )
            existing_clinic = self.db.query(Clinics).filter(Clinics.domain == domain).first()
            if existing_clinic:
                # Raising here triggers the rollback in the except clause below.
                raise ValidationException("Clinic with same domain already exists")

            new_clinic = Clinics(
                name=clinic.name,
                address=clinic.address,
                phone=clinic.phone,
                email=clinic.email,
                integration=clinic.integration,
                pms_id=clinic.pms_id,
                practice_name=clinic.practice_name,
                logo=clinic.logo,
                country=clinic.country,
                postal_code=clinic.postal_code,
                city=clinic.city,
                state=clinic.state,
                abn_doc=clinic.abn_doc,
                abn_number=clinic.abn_number,
                contract_doc=clinic.contract_doc,
                clinic_phone=clinic.clinic_phone,
                is_clinic_phone_enabled=clinic.is_clinic_phone_enabled,
                other_info=clinic.other_info,
                greeting_msg=clinic.greeting_msg,
                voice_model=clinic.voice_model,
                voice_model_provider=clinic.voice_model_provider,
                voice_model_gender=clinic.voice_model_gender,
                scenarios=clinic.scenarios,
                general_info=clinic.general_info,
                status=ClinicStatus.PAYMENT_DUE,
                domain=domain,
                creator_id=new_user.id,  # links the clinic to the user who created it
            )

            # Flush so new_clinic.id is available for the verification row.
            self.db.add(new_clinic)
            self.db.flush()

            clinic_files = ClinicFileVerifications(
                clinic_id=new_clinic.id,
                abn_doc_is_verified=False,
                contract_doc_is_verified=False,
                last_changed_by=new_user.id
            )
            self.db.add(clinic_files)

            offer = await self.clinic_service.get_clinic_offer_by_clinic_email(clinic.email)
            signup_pricing = await self.dashboard_service.get_signup_pricing_master()

            # Start from the master pricing; a clinic-specific offer overrides
            # the setup fee and per-call charge but not the subscription fee.
            fees_to_be = {
                "setup_fees": signup_pricing.setup_fees,
                "subscription_fees": signup_pricing.subscription_fees,
                "per_call_charges": signup_pricing.per_call_charges,
                "total": signup_pricing.setup_fees
                + signup_pricing.subscription_fees
                + signup_pricing.per_call_charges
            }
            if offer:
                fees_to_be["setup_fees"] = offer.setup_fees
                fees_to_be["per_call_charges"] = offer.per_call_charges
                fees_to_be["total"] = (
                    offer.setup_fees + fees_to_be["subscription_fees"] + offer.per_call_charges
                )

            payment_link = await self.stripe_service.create_subscription_checkout(
                fees_to_be, new_clinic.id, stripe_account.id, stripe_customer.id
            )

            self.db.commit()

            # Notify admins only once the signup is durable, so a failure in
            # any step above can never result in a "new clinic" email.
            if background_tasks:
                background_tasks.add_task(self._send_emails_to_admins, clinic.email)

            # Copy into a plain dict while the session is still open.
            user_dict = {
                "id": new_user.id,
                "username": new_user.username,
                "email": new_user.email,
                "clinicRole": new_user.clinicRole,
                "userType": new_user.userType,
                "mobile": new_user.mobile,
                "clinicId": new_clinic.id
            }

            return {
                "url": payment_link.url,
                "user": user_dict,
            }

        except Exception as e:
            logger.error(f"Error creating user: {str(e)}")
            self.db.rollback()
            # Stripe objects live outside the DB transaction and must be
            # removed explicitly.
            await self._delete_stripe_resources(stripe_customer, stripe_account)
            DBExceptionHandler.handle_exception(e, context="creating user")
        finally:
            self.db.close()

    async def get_user(self, user_id) -> UserResponse:
        """Fetch one user by id, including the clinics they created.

        Returns the result of ``UserResponse.model_validate(...).model_dump()``.
        NOTE(review): that is a dict, not a ``UserResponse`` instance as the
        annotation says; the annotation is kept for interface stability.

        Raises:
            ResourceNotFoundException: if no user has the given id (routed
                through ``DBExceptionHandler.handle_exception``).
        """
        try:
            user = (
                self.db.query(Users)
                .options(joinedload(Users.created_clinics))
                .filter(Users.id == user_id)
                .first()
            )
            if not user:
                logger.error("User not found")
                raise ResourceNotFoundException("User not found")

            user_dict = self._row_to_dict(user)

            # 'created_clinics' is only set when the user has at least one clinic.
            if user.created_clinics:
                user_dict['created_clinics'] = [
                    self._row_to_dict(clinic) for clinic in user.created_clinics
                ]

            user_response = UserResponse.model_validate(user_dict)
            return user_response.model_dump()

        except Exception as e:
            DBExceptionHandler.handle_exception(e, context="getting user")
        finally:
            self.db.close()

    async def get_users(self, limit: int, offset: int, search: str):
        """Return one page of users, optionally filtered by a search term.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.
            search: When truthy, keeps users whose username, email,
                clinicRole or userType contains this text.

        Returns:
            ``CommonResponse`` whose ``data`` is the page of ``UserResponse``
            objects and whose ``total`` is the number of users matching the
            same filter (not just the size of the current page).
        """
        try:
            query = self.db.query(Users)

            if search:
                query = query.filter(
                    or_(
                        Users.username.contains(search),
                        Users.email.contains(search),
                        Users.clinicRole.contains(search),
                        Users.userType.contains(search)
                    )
                )

            # Count on the filtered query so the total agrees with the rows
            # that paging through this same search can actually return.
            total = query.count()
            users = query.limit(limit).offset(offset).all()

            response = CommonResponse(
                data=[UserResponse(**user.__dict__.copy()) for user in users],
                total=total
            )
            return response
        except Exception as e:
            DBExceptionHandler.handle_exception(e, context="getting users")
        finally:
            self.db.close()

    async def get_user_by_email(self, email: str) -> UserResponse:
        """Fetch one user by email; the email is lower-cased before lookup.

        Raises:
            ResourceNotFoundException: if no user has that email (routed
                through ``DBExceptionHandler.handle_exception``).
        """
        try:
            user = self.db.query(Users).filter(Users.email == email.lower()).first()
            if not user:
                logger.error("User not found")
                raise ResourceNotFoundException("User not found")

            user_dict = user.__dict__.copy()
            user_response = UserResponse(**user_dict)
            return user_response
        except Exception as e:
            DBExceptionHandler.handle_exception(e, context="getting user by email")
        finally:
            self.db.close()

    async def update_user(self, admin_id: int | None, user_id: int, user_data: UserUpdate) -> UserResponse:
        """Apply the explicitly-set fields of ``user_data`` to a user.

        Args:
            admin_id: Id of the acting admin, or ``None`` to skip the
                authorization check. When given, that user must exist and
                have ``UserType.SUPER_ADMIN``.
            user_id: Id of the user to modify.
            user_data: Update payload; only fields that were set are applied.

        Returns:
            ``UserResponse`` validated from the refreshed user row.

        Raises:
            ResourceNotFoundException: admin or target user does not exist.
            UnauthorizedException: the admin is not a super admin.
            (Both are routed through ``DBExceptionHandler.handle_exception``.)
        """
        try:
            # Compare against None explicitly so that a falsy id such as 0
            # cannot bypass the authorization check.
            if admin_id is not None:
                admin = self.db.query(Users).filter(Users.id == admin_id).first()
                if not admin:
                    logger.error("Admin not found")
                    raise ResourceNotFoundException("Admin not found")

                if admin.userType != UserType.SUPER_ADMIN:
                    logger.error("User is not authorized to perform this action")
                    raise UnauthorizedException("User is not authorized to perform this action")

            user = self.db.query(Users).filter(Users.id == user_id).first()
            if not user:
                logger.error("User not found")
                raise ResourceNotFoundException("User not found")

            # NOTE(review): values are written as-is. If UserUpdate exposes a
            # password field it would be stored unhashed here, unlike in
            # create_user - confirm against the schema and callers.
            update_data = user_data.model_dump(exclude_unset=True)
            for key, value in update_data.items():
                setattr(user, key, value)

            self.db.add(user)
            self.db.commit()
            self.db.refresh(user)

            return UserResponse.model_validate(user)
        except Exception as e:
            DBExceptionHandler.handle_exception(e, context="updating user")
        finally:
            self.db.close()

    async def delete_user(self, user_id: int):
        """Delete a user by calling ``soft_delete`` on the row.

        Returns:
            ``True`` when the user was found and ``soft_delete`` completed.

        Raises:
            ResourceNotFoundException: if no user has the given id (routed
                through ``DBExceptionHandler.handle_exception``).
        """
        try:
            user = self.db.query(Users).filter(Users.id == user_id).first()
            if not user:
                logger.error("User not found")
                raise ResourceNotFoundException("User not found")

            user.soft_delete(self.db)
            return True
        except Exception as e:
            DBExceptionHandler.handle_exception(e, context="deleting user")
        finally:
            self.db.close()

    async def get_super_admins(self):
        """Return all users whose ``userType`` is ``UserType.SUPER_ADMIN``."""
        try:
            return self.db.query(Users).filter(Users.userType == UserType.SUPER_ADMIN).all()
        except Exception as e:
            DBExceptionHandler.handle_exception(e, context="getting super admins")
        finally:
            self.db.close()

    async def _send_emails_to_admins(self, clinic_name):
        """Send the new-clinic email to every super admin.

        Failures are logged and swallowed so that this background job never
        interrupts the flow that scheduled it.

        NOTE(review): ``create_user`` passes the clinic's *email* as
        ``clinic_name`` - confirm which value the email template expects.
        """
        try:
            admins = await self.get_super_admins()
            for admin in admins:
                self.email_service.send_new_clinic_email(
                    to_address=admin.email,
                    clinic_name=clinic_name
                )
        except Exception as e:
            # Log the error but don't interrupt the main flow
            logger.error(f"Error sending admin emails: {str(e)}")
        finally:
            self.db.close()

    async def create_payment_link(self, user_id: int):
        """Create a Stripe payment link for an existing user.

        Returns:
            Whatever ``StripeServices.create_payment_link`` produces.

        Raises:
            ResourceNotFoundException: if no user has the given id (routed
                through ``DBExceptionHandler.handle_exception``).
        """
        try:
            user = self.db.query(Users).filter(Users.id == user_id).first()
            if not user:
                logger.error("User not found")
                raise ResourceNotFoundException("User not found")

            payment_link = self.stripe_service.create_payment_link(user_id)
            # Every other StripeServices call in this class is awaited. Await
            # here when the result is awaitable so that callers get the link
            # itself and Stripe errors surface inside this try block.
            if inspect.isawaitable(payment_link):
                payment_link = await payment_link
            return payment_link
        except Exception as e:
            DBExceptionHandler.handle_exception(e, context="creating payment link")
        finally:
            self.db.close()