359 lines
14 KiB
Python
359 lines
14 KiB
Python
from loguru import logger
|
|
from sqlalchemy.orm import Session
|
|
|
|
from database import get_db
|
|
from models.Users import Users
|
|
from exceptions.validation_exception import ValidationException
|
|
from schemas.ResponseSchemas import UserResponse
|
|
from models import Clinics
|
|
from enums.enums import ClinicStatus, UserType
|
|
from schemas.UpdateSchemas import UserUpdate
|
|
from exceptions.unauthorized_exception import UnauthorizedException
|
|
from interface.common_response import CommonResponse
|
|
from models import ClinicFileVerifications, StripeUsers
|
|
from services.stripeServices import StripeServices
|
|
from utils.password_utils import hash_password
|
|
from schemas.CreateSchemas import UserCreate
|
|
from exceptions.resource_not_found_exception import ResourceNotFoundException
|
|
from exceptions.db_exceptions import DBExceptionHandler
|
|
from sqlalchemy.orm import joinedload
|
|
from services.emailService import EmailService
|
|
from services.clinicServices import ClinicServices
|
|
from services.dashboardService import DashboardService
|
|
class UserServices:
|
|
def __init__(self):
|
|
self.db: Session = next(get_db())
|
|
self.email_service = EmailService()
|
|
self.stripe_service = StripeServices()
|
|
self.clinic_service = ClinicServices()
|
|
self.dashboard_service = DashboardService()
|
|
|
|
async def create_user(self, user_data: UserCreate, background_tasks=None):
|
|
|
|
stripe_customer = None
|
|
stripe_account = None
|
|
|
|
# Start a transaction
|
|
try:
|
|
user = user_data.user
|
|
# Check if user with same username or email exists
|
|
existing_user = (
|
|
self.db.query(Users)
|
|
.filter(Users.email == user.email.lower())
|
|
.first()
|
|
)
|
|
|
|
if existing_user:
|
|
raise ValidationException(
|
|
"User with same email already exists"
|
|
)
|
|
|
|
# Create a new user instance
|
|
new_user = Users(
|
|
username=user.username,
|
|
email=user.email.lower(),
|
|
password=hash_password(user.password),
|
|
clinicRole=user.clinicRole,
|
|
userType=user.userType,
|
|
mobile=user.mobile
|
|
)
|
|
|
|
# Add user to database but don't commit yet
|
|
self.db.add(new_user)
|
|
self.db.flush() # Flush to get the user ID without committing
|
|
|
|
# Create stripe customer
|
|
stripe_customer = await self.stripe_service.create_customer(new_user.id, user.email, user.username)
|
|
|
|
# Create stripe account
|
|
stripe_account = await self.stripe_service.create_account(new_user.id, user.email, user.username, user.mobile)
|
|
|
|
# Create stripe user
|
|
stripe_user = StripeUsers(
|
|
user_id=new_user.id,
|
|
customer_id=stripe_customer.id,
|
|
account_id=stripe_account.id
|
|
)
|
|
self.db.add(stripe_user)
|
|
|
|
# Get clinic data
|
|
clinic = user_data.clinic
|
|
|
|
# cross verify domain, in db
|
|
# Convert to lowercase and keep only alphanumeric characters, hyphens, and underscores
|
|
domain = ''.join(char for char in clinic.name.lower() if char.isalnum() or char == '-' or char == '_')
|
|
existing_clinic = self.db.query(Clinics).filter(Clinics.domain == domain).first()
|
|
|
|
if existing_clinic:
|
|
# This will trigger rollback in the exception handler
|
|
raise ValidationException("Clinic with same domain already exists")
|
|
|
|
# Create clinic instance
|
|
new_clinic = Clinics(
|
|
name=clinic.name,
|
|
address=clinic.address,
|
|
phone=clinic.phone,
|
|
email=clinic.email,
|
|
integration=clinic.integration,
|
|
pms_id=clinic.pms_id,
|
|
practice_name=clinic.practice_name,
|
|
logo=clinic.logo,
|
|
country=clinic.country,
|
|
postal_code=clinic.postal_code,
|
|
city=clinic.city,
|
|
state=clinic.state,
|
|
abn_doc=clinic.abn_doc,
|
|
abn_number=clinic.abn_number,
|
|
contract_doc=clinic.contract_doc,
|
|
clinic_phone=clinic.clinic_phone,
|
|
is_clinic_phone_enabled=clinic.is_clinic_phone_enabled,
|
|
other_info=clinic.other_info,
|
|
greeting_msg=clinic.greeting_msg,
|
|
voice_model=clinic.voice_model,
|
|
voice_model_provider=clinic.voice_model_provider,
|
|
voice_model_gender=clinic.voice_model_gender,
|
|
scenarios=clinic.scenarios,
|
|
general_info=clinic.general_info,
|
|
status=ClinicStatus.PAYMENT_DUE, #TODO: change this to PAYMENT_DUE
|
|
domain=domain,
|
|
creator_id=new_user.id, # Set the creator_id to link the clinic to the user who created it
|
|
)
|
|
|
|
# Add clinic to database
|
|
self.db.add(new_clinic)
|
|
self.db.flush()
|
|
|
|
# Create clinic files
|
|
clinic_files = ClinicFileVerifications(
|
|
clinic_id=new_clinic.id,
|
|
abn_doc_is_verified=None,
|
|
contract_doc_is_verified=None,
|
|
last_changed_by=new_user.id
|
|
)
|
|
|
|
# Add clinic files to database
|
|
self.db.add(clinic_files)
|
|
|
|
# Send mail to admin in a non-blocking way using background tasks
|
|
if background_tasks:
|
|
background_tasks.add_task(self._send_emails_to_admins, clinic.email)
|
|
|
|
offer = await self.clinic_service.get_clinic_offer_by_clinic_email(clinic.email)
|
|
|
|
signup_pricing = await self.dashboard_service.get_signup_pricing_master()
|
|
|
|
fees_to_be = {
|
|
"setup_fees": signup_pricing.setup_fees,
|
|
"subscription_fees": signup_pricing.subscription_fees,
|
|
"per_call_charges": signup_pricing.per_call_charges,
|
|
"total": signup_pricing.setup_fees + signup_pricing.subscription_fees + signup_pricing.per_call_charges
|
|
}
|
|
|
|
if offer:
|
|
fees_to_be["setup_fees"] = offer.setup_fees
|
|
fees_to_be["per_call_charges"] = offer.per_call_charges
|
|
fees_to_be["total"] = offer.setup_fees + fees_to_be["subscription_fees"] + offer.per_call_charges
|
|
|
|
payment_link = await self.stripe_service.create_subscription_checkout(fees_to_be, new_clinic.id, stripe_account.id,stripe_customer.id)
|
|
|
|
self.db.commit()
|
|
|
|
# Convert the user object to a dictionary before the session is closed
|
|
user_dict = {
|
|
"id": new_user.id,
|
|
"username": new_user.username,
|
|
"email": new_user.email,
|
|
"clinicRole": new_user.clinicRole,
|
|
"userType": new_user.userType,
|
|
"mobile": new_user.mobile,
|
|
"clinicId": new_clinic.id
|
|
}
|
|
|
|
return {
|
|
"url": payment_link.url,
|
|
"user": user_dict,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating user: {str(e)}")
|
|
# Rollback the transaction if any error occurs
|
|
self.db.rollback()
|
|
|
|
# Delete stripe customer and account
|
|
if stripe_customer:
|
|
await self.stripe_service.delete_customer(stripe_customer.id)
|
|
if stripe_account:
|
|
await self.stripe_service.delete_account(stripe_account.id)
|
|
|
|
# Use the centralized exception handler
|
|
DBExceptionHandler.handle_exception(e, context="creating user")
|
|
finally:
|
|
self.db.close()
|
|
|
|
|
|
async def get_user(self, user_id) -> UserResponse:
|
|
try:
|
|
# Query the user by ID and explicitly load the created clinics relationship
|
|
user = self.db.query(Users).options(joinedload(Users.created_clinics)).filter(Users.id == user_id).first()
|
|
|
|
if not user:
|
|
logger.error("User not found")
|
|
raise ResourceNotFoundException("User not found")
|
|
|
|
# First convert the user to a dictionary
|
|
user_dict = {}
|
|
for column in user.__table__.columns:
|
|
user_dict[column.name] = getattr(user, column.name)
|
|
|
|
# Convert created clinics to dictionaries
|
|
if user.created_clinics:
|
|
clinics_list = []
|
|
for clinic in user.created_clinics:
|
|
clinic_dict = {}
|
|
for column in clinic.__table__.columns:
|
|
clinic_dict[column.name] = getattr(clinic, column.name)
|
|
clinics_list.append(clinic_dict)
|
|
user_dict['created_clinics'] = clinics_list
|
|
|
|
# Create the user response
|
|
user_response = UserResponse.model_validate(user_dict)
|
|
|
|
# Return the response as a dictionary
|
|
return user_response.model_dump()
|
|
except Exception as e:
|
|
DBExceptionHandler.handle_exception(e, context="getting user")
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def get_users(self, limit:int, offset:int, search:str):
|
|
try:
|
|
query = self.db.query(Users)
|
|
if search:
|
|
query = query.filter(
|
|
or_(
|
|
Users.username.contains(search),
|
|
Users.email.contains(search),
|
|
Users.clinicRole.contains(search),
|
|
Users.userType.contains(search)
|
|
)
|
|
)
|
|
|
|
users = query.limit(limit).offset(offset).all()
|
|
|
|
total = self.db.query(Users).count()
|
|
|
|
response = CommonResponse(data=[UserResponse(**user.__dict__.copy()) for user in users], total=total)
|
|
|
|
return response
|
|
except Exception as e:
|
|
DBExceptionHandler.handle_exception(e, context="getting users")
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def get_user_by_email(self, email: str) -> UserResponse:
|
|
try:
|
|
user = self.db.query(Users).filter(Users.email == email.lower()).first()
|
|
|
|
if not user:
|
|
logger.error("User not found")
|
|
raise ResourceNotFoundException("User not found")
|
|
|
|
user_dict = user.__dict__.copy()
|
|
|
|
user_response = UserResponse(**user_dict)
|
|
|
|
return user_response
|
|
except Exception as e:
|
|
DBExceptionHandler.handle_exception(e, context="getting user by email")
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def update_user(self, admin_id:int|None, user_id: int, user_data: UserUpdate) -> UserResponse:
|
|
try:
|
|
# Check admin authorization if admin_id is provided
|
|
if admin_id:
|
|
admin = self.db.query(Users).filter(Users.id == admin_id).first()
|
|
if not admin:
|
|
logger.error("Admin not found")
|
|
raise ResourceNotFoundException("Admin not found")
|
|
|
|
# Only check admin type if admin_id was provided
|
|
if admin.userType != UserType.SUPER_ADMIN:
|
|
logger.error("User is not authorized to perform this action")
|
|
raise UnauthorizedException("User is not authorized to perform this action")
|
|
|
|
# Find the user to update
|
|
user = self.db.query(Users).filter(Users.id == user_id).first()
|
|
if not user:
|
|
logger.error("User not found")
|
|
raise ResourceNotFoundException("User not found")
|
|
|
|
# Update only the fields that were provided
|
|
update_data = user_data.model_dump(exclude_unset=True)
|
|
for key, value in update_data.items():
|
|
setattr(user, key, value)
|
|
|
|
self.db.add(user)
|
|
self.db.commit()
|
|
self.db.refresh(user)
|
|
|
|
# Return properly serialized response
|
|
return UserResponse.model_validate(user)
|
|
except Exception as e:
|
|
DBExceptionHandler.handle_exception(e, context="updating user")
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def delete_user(self, user_id: int):
|
|
try:
|
|
user = self.db.query(Users).filter(Users.id == user_id).first()
|
|
|
|
if not user:
|
|
logger.error("User not found")
|
|
raise ResourceNotFoundException("User not found")
|
|
|
|
# Use the soft_delete method from CustomBase
|
|
user.soft_delete(self.db)
|
|
|
|
return True
|
|
except Exception as e:
|
|
DBExceptionHandler.handle_exception(e, context="deleting user")
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def get_super_admins(self):
|
|
try:
|
|
return self.db.query(Users).filter(Users.userType == UserType.SUPER_ADMIN).all()
|
|
except Exception as e:
|
|
DBExceptionHandler.handle_exception(e, context="getting super admins")
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def _send_emails_to_admins(self, clinic_name):
|
|
"""Helper method to send emails to all super admins"""
|
|
try:
|
|
admins = await self.get_super_admins()
|
|
for admin in admins:
|
|
self.email_service.send_new_clinic_email(
|
|
to_address=admin.email,
|
|
clinic_name=clinic_name
|
|
)
|
|
except Exception as e:
|
|
# Log the error but don't interrupt the main flow
|
|
logger.error(f"Error sending admin emails: {str(e)}")
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def create_payment_link(self, user_id: int):
|
|
try:
|
|
user = self.db.query(Users).filter(Users.id == user_id).first()
|
|
|
|
if not user:
|
|
logger.error("User not found")
|
|
raise ResourceNotFoundException("User not found")
|
|
|
|
return self.stripe_service.create_payment_link(user_id)
|
|
except Exception as e:
|
|
DBExceptionHandler.handle_exception(e, context="creating payment link")
|
|
finally:
|
|
self.db.close() |