# Source: health-apps-backend/services/userServices.py (193 lines, 6.7 KiB, Python)

from loguru import logger
from sqlalchemy import or_
from sqlalchemy.orm import Session

from database import get_db
from enums.enums import ClinicStatus
# NOTE(review): UserType is assumed to be defined alongside ClinicStatus — confirm.
from enums.enums import UserType
from exceptions.resource_not_found_exception import ResourceNotFoundException
from exceptions.unauthorized_exception import UnauthorizedException
from exceptions.validation_exception import ValidationException
from models import Clinics
from models.Users import Users
from schemas.CreateSchemas import UserCreate
from schemas.ResponseSchemas import UserResponse
from schemas.UpdateSchemas import UserUpdate
from utils.password_utils import hash_password
class UserServices:
    """Service layer for creating, reading, updating and deleting users.

    Every method works through the single SQLAlchemy session stored on
    ``self.db``.
    """

    def __init__(self, db: Session | None = None):
        """Bind the service to a database session.

        Args:
            db: Session to use. When omitted, one is pulled from ``get_db()``
                as before, so existing ``UserServices()`` callers are
                unaffected. Passing a session explicitly lets a caller share
                an existing session or substitute one in tests.
        """
        # NOTE(review): next(get_db()) discards the iterator immediately, so
        # any cleanup get_db() performs after yielding is not tied to this
        # service's lifetime — confirm who is responsible for closing it.
        self.db: Session = db if db is not None else next(get_db())

    @staticmethod
    def _normalize_domain(clinic_name: str) -> str:
        """Derive a clinic's domain slug from its display name.

        Lower-cases the name and keeps only alphabetic characters, hyphens
        and underscores; everything else (spaces, digits, punctuation) is
        dropped.
        """
        # NOTE(review): a name without any letters, '-' or '_' yields an empty
        # domain — confirm whether that should be rejected upstream.
        return "".join(
            char for char in clinic_name.lower() if char.isalpha() or char in "-_"
        )

    @staticmethod
    def _to_response(user) -> UserResponse:
        """Build a ``UserResponse`` from the attributes loaded on ``user``."""
        # NOTE(review): a mapped instance's __dict__ also carries SQLAlchemy's
        # internal state entry; presumably UserResponse ignores unknown keys —
        # confirm against the schema config.
        return UserResponse(**user.__dict__)

    def _require_user(self, user_id):
        """Return the user with ``user_id`` or raise if there is none.

        Raises:
            ResourceNotFoundException: no user has this id.
        """
        user = self.db.query(Users).filter(Users.id == user_id).first()
        if not user:
            logger.error("User not found")
            raise ResourceNotFoundException("User not found")
        return user

    async def create_user(self, user_data: UserCreate) -> None:
        """Create a user and their clinic in one transaction.

        The clinic is stored with status ``ClinicStatus.UNDER_REVIEW`` and a
        domain derived from its name (see ``_normalize_domain``). The email is
        stored lower-cased and the password is stored hashed.

        Args:
            user_data: Payload exposing ``user`` and ``clinic`` sub-objects.

        Raises:
            ValidationException: a user with the same email, or a clinic with
                the same derived domain, already exists.
            Exception: any other failure is logged, the session is rolled
                back, and the original exception is re-raised.
        """
        try:
            user = user_data.user
            clinic = user_data.clinic
            email = user.email.lower()
            domain = self._normalize_domain(clinic.name)

            # Run both uniqueness checks before anything is added to the
            # session, so a rejected clinic never causes the pending user to
            # be flushed first (queries autoflush pending objects under
            # SQLAlchemy's default session settings).
            if self.db.query(Users).filter(Users.email == email).first():
                raise ValidationException("User with same email already exists")
            if self.db.query(Clinics).filter(Clinics.domain == domain).first():
                raise ValidationException("Clinic with same domain already exists")

            new_user = Users(
                username=user.username,
                email=email,
                password=hash_password(user.password),
                clinicRole=user.clinicRole,
                userType=user.userType,
            )
            new_clinic = Clinics(
                name=clinic.name,
                address=clinic.address,
                phone=clinic.phone,
                email=clinic.email,
                integration=clinic.integration,
                pms_id=clinic.pms_id,
                practice_name=clinic.practice_name,
                logo=clinic.logo,
                country=clinic.country,
                postal_code=clinic.postal_code,
                city=clinic.city,
                state=clinic.state,
                abn_doc=clinic.abn_doc,
                abn_number=clinic.abn_number,
                contract_doc=clinic.contract_doc,
                clinic_phone=clinic.clinic_phone,
                is_clinic_phone_enabled=clinic.is_clinic_phone_enabled,
                other_info=clinic.other_info,
                greeting_msg=clinic.greeting_msg,
                voice_model=clinic.voice_model,
                voice_model_provider=clinic.voice_model_provider,
                voice_model_gender=clinic.voice_model_gender,
                scenarios=clinic.scenarios,
                general_info=clinic.general_info,
                status=ClinicStatus.UNDER_REVIEW,
                domain=domain,
            )

            # Persist both rows with a single commit so they succeed or fail
            # together.
            self.db.add(new_user)
            self.db.add(new_clinic)
            self.db.commit()
            return
        except Exception as e:
            logger.error(f"Error creating user: {str(e)}")
            self.db.rollback()
            # Bare raise keeps the original exception type and traceback.
            raise

    def get_user(self, user_id) -> dict:
        """Return the user with ``user_id`` as a plain dict.

        Returns:
            The ``UserResponse`` for the user, dumped with ``model_dump()``.

        Raises:
            ResourceNotFoundException: no user has this id.
        """
        user = self._require_user(user_id)
        return self._to_response(user).model_dump()

    def get_users(self, limit: int, offset: int, search: str) -> list[UserResponse]:
        """Return one page of users, optionally filtered by a search term.

        Args:
            limit: Maximum number of users to return.
            offset: Number of users to skip.
            search: When truthy, only users whose username, email, clinic
                role or user type contains this text are returned.
        """
        query = self.db.query(Users)
        if search:
            query = query.filter(
                or_(
                    Users.username.contains(search),
                    Users.email.contains(search),
                    Users.clinicRole.contains(search),
                    Users.userType.contains(search),
                )
            )
        # LIMIT/OFFSET without ORDER BY gives no guaranteed row order, so pages
        # could overlap or skip rows; ordering by id keeps paging stable.
        users = query.order_by(Users.id).limit(limit).offset(offset).all()
        return [self._to_response(user) for user in users]

    async def get_user_by_email(self, email: str) -> UserResponse:
        """Return the user whose email equals ``email`` (compared lower-cased).

        Raises:
            ResourceNotFoundException: no user has this email.
        """
        user = self.db.query(Users).filter(Users.email == email.lower()).first()
        if not user:
            logger.error("User not found")
            raise ResourceNotFoundException("User not found")
        return self._to_response(user)

    def update_user(self, admin_id: int | None, user_id: int, user_data: UserUpdate):
        """Overwrite a user's username, clinic role, user type and picture.

        Args:
            admin_id: Id of the acting user. When not ``None`` that user must
                exist and have ``UserType.ADMIN``. ``None`` skips the check.
            user_id: Id of the user to update.
            user_data: New values; all four fields are copied as-is.

        Returns:
            The updated ``Users`` instance.

        Raises:
            ResourceNotFoundException: the admin or the target user is missing.
            UnauthorizedException: the acting user is not an admin.
        """
        # Compare against None explicitly: a falsy id such as 0 must still go
        # through the authorization check instead of silently bypassing it.
        if admin_id is not None:
            admin = self.db.query(Users).filter(Users.id == admin_id).first()
            if not admin:
                logger.error("Admin not found")
                raise ResourceNotFoundException("Admin not found")
            if admin.userType != UserType.ADMIN:
                logger.error("User is not authorized to perform this action")
                raise UnauthorizedException("User is not authorized to perform this action")

        user = self._require_user(user_id)
        # NOTE(review): fields are assigned unconditionally, so a value of None
        # in user_data overwrites the stored value — confirm that is intended.
        user.username = user_data.username
        user.clinicRole = user_data.clinicRole
        user.userType = user_data.userType
        user.profile_pic = user_data.profile_pic

        try:
            self.db.add(user)
            self.db.commit()
        except Exception as e:
            # Match create_user: never leave the shared session holding a
            # failed transaction.
            logger.error(f"Error updating user: {str(e)}")
            self.db.rollback()
            raise
        return user

    def delete_user(self, user_id: int) -> bool:
        """Soft-delete the user with ``user_id``.

        Returns:
            ``True`` once ``soft_delete`` has been called on the user.

        Raises:
            ResourceNotFoundException: no user has this id.
        """
        user = self._require_user(user_id)
        # soft_delete is provided by the model's base class (CustomBase).
        user.soft_delete(self.db)
        return True