health-apps-backend/services/clinicDoctorsServices.py

258 lines
9.9 KiB
Python

from loguru import logger
from schemas.CreateSchemas import ClinicDoctorCreate
from schemas.UpdateSchemas import ClinicDoctorUpdate
from schemas.ResponseSchemas import ClinicDoctorResponse, MasterAppointmentTypeResponse
from database import get_db
from models import ClinicDoctors
from sqlalchemy.orm import Session, joinedload, selectinload
from services.clinicServices import ClinicServices
from exceptions import ResourceNotFoundException
from interface.common_response import CommonResponse
from sqlalchemy import func, or_, cast, String
from enums.enums import ClinicDoctorStatus, UserType
from models import MasterAppointmentTypes, AppointmentRelations
from utils.constants import DEFAULT_ORDER, DEFAULT_ORDER_BY
class ClinicDoctorsServices:
    """Service layer for creating, updating, deleting and listing clinic doctors.

    Every instance owns a SQLAlchemy session obtained from ``get_db`` and a
    ``ClinicServices`` helper that is used to look up the admin's clinic.
    """

    def __init__(self):
        # NOTE(review): only the first value produced by get_db() is kept. If
        # get_db is a generator with cleanup after its ``yield``, that cleanup
        # is not tied to this service's lifetime -- confirm who is expected to
        # close this session.
        self.db: Session = next(get_db())
        self.clinic_services = ClinicServices()
        self.logger = logger

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #
    def _get_admin_clinic_id(self, user):
        """Return the id of the first clinic created by a clinic-admin user.

        Raises:
            ResourceNotFoundException: if the user is not a clinic admin, or
                has no usable clinic id (missing key, empty list, falsy id).
        """
        if user["userType"] != UserType.CLINIC_ADMIN:
            self.logger.error("user is not clinic admin")
            raise ResourceNotFoundException(
                "You are not authorized to perform this action"
            )
        # An empty or missing ``created_clinics`` must produce the same
        # authorization error instead of leaking an IndexError/KeyError.
        try:
            clinic_id = user["created_clinics"][0]["id"]
        except (KeyError, IndexError, TypeError):
            clinic_id = None
        if not clinic_id:
            self.logger.error("user has no clinics")
            raise ResourceNotFoundException(
                "You are not authorized to perform this action"
            )
        return clinic_id

    def _ensure_appointment_types_exist(self, appointment_type_ids):
        """Verify, in a single query, that every given appointment type exists.

        A ``None`` or empty collection is accepted and triggers no query.

        Raises:
            ResourceNotFoundException: listing the ids that were not found.
        """
        if not appointment_type_ids:
            return
        found_ids = {
            row.id
            for row in self.db.query(MasterAppointmentTypes.id)
            .filter(MasterAppointmentTypes.id.in_(appointment_type_ids))
            .all()
        }
        missing_ids = set(appointment_type_ids) - found_ids
        if missing_ids:
            # Sorted so the error message is deterministic.
            missing_text = ", ".join(sorted(map(str, missing_ids)))
            raise ResourceNotFoundException(
                f"Appointment types not found: {missing_text}"
            )

    def _add_appointment_relations(self, clinic_doctor_id, appointment_type_ids):
        """Stage one ``AppointmentRelations`` row per given appointment type id."""
        self.db.add_all(
            [
                AppointmentRelations(
                    clinic_doctor_id=clinic_doctor_id,
                    appointment_type_id=appointment_type_id,
                )
                for appointment_type_id in (appointment_type_ids or [])
            ]
        )

    @staticmethod
    def _to_response(clinic_doctor) -> ClinicDoctorResponse:
        """Build the response object for one doctor, including appointment types."""
        appointment_types = [
            MasterAppointmentTypeResponse(
                id=relation.masterAppointmentTypes.id,
                type=relation.masterAppointmentTypes.type,
                create_time=relation.masterAppointmentTypes.create_time,
                update_time=relation.masterAppointmentTypes.update_time,
            )
            for relation in clinic_doctor.appointmentRelations
            # Skip relations whose appointment type is not loaded/present.
            if relation.masterAppointmentTypes
        ]
        return ClinicDoctorResponse(
            id=clinic_doctor.id,
            name=clinic_doctor.name,
            role=clinic_doctor.role,
            status=clinic_doctor.status,
            create_time=clinic_doctor.create_time,
            update_time=clinic_doctor.update_time,
            appointmentTypes=appointment_types,
        )

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #
    async def create_clinic_doctor(
        self, user, clinic_doctor: ClinicDoctorCreate
    ) -> None:
        """Create an ACTIVE doctor in the calling admin's first clinic.

        Args:
            user: Mapping describing the caller; ``userType`` and
                ``created_clinics[0]["id"]`` are read from it.
            clinic_doctor: Payload providing ``name``, ``role`` and an
                optional collection ``appointmentTypes`` of type ids.

        Returns:
            None. The doctor and its appointment relations are committed.

        Raises:
            ResourceNotFoundException: if the caller is not an authorized
                clinic admin or an appointment type id does not exist.
            Exception: anything else is logged, the session is rolled back,
                and the error is re-raised.
        """
        try:
            clinic_id = self._get_admin_clinic_id(user)
            self._ensure_appointment_types_exist(clinic_doctor.appointmentTypes)
            # Look up the clinic; the call's result is not used here.
            await self.clinic_services.get_clinic_by_id(clinic_id)

            clinic_doctor_db = ClinicDoctors(
                name=clinic_doctor.name,
                clinic_id=clinic_id,
                role=clinic_doctor.role,
                status=ClinicDoctorStatus.ACTIVE,
            )
            self.db.add(clinic_doctor_db)
            # Flush so the new doctor's primary key is available below.
            self.db.flush()

            self._add_appointment_relations(
                clinic_doctor_db.id, clinic_doctor.appointmentTypes
            )
            self.db.commit()
            return
        except Exception as e:
            self.logger.error(e)
            self.db.rollback()
            raise

    async def update_clinic_doctor(
        self, user, clinic_doctor_id: int, clinic_doctor_data: ClinicDoctorUpdate
    ) -> None:
        """Update a doctor that belongs to the calling admin's first clinic.

        Fields explicitly set on the payload (other than ``appointmentTypes``)
        are copied onto the doctor row. When ``appointmentTypes`` is not
        ``None`` the doctor's appointment relations are replaced by the given
        ids (an empty list removes them all); when it is ``None`` the existing
        relations are left untouched.

        Args:
            user: Mapping describing the caller (see ``create_clinic_doctor``).
            clinic_doctor_id: Primary key of the doctor to update.
            clinic_doctor_data: Partial update payload.

        Returns:
            None. Changes are committed.

        Raises:
            ResourceNotFoundException: if the caller is not an authorized
                clinic admin, an appointment type id does not exist, or no
                doctor with that id exists in the caller's clinic.
            Exception: anything else is logged, the session is rolled back,
                and the error is re-raised.
        """
        try:
            clinic_id = self._get_admin_clinic_id(user)
            new_type_ids = clinic_doctor_data.appointmentTypes
            self._ensure_appointment_types_exist(new_type_ids)

            # Scope the lookup to the admin's own clinic so that one clinic's
            # admin cannot modify another clinic's doctors.
            clinic_doctor = (
                self.db.query(ClinicDoctors)
                .filter(
                    ClinicDoctors.id == clinic_doctor_id,
                    ClinicDoctors.clinic_id == clinic_id,
                )
                .first()
            )
            if clinic_doctor is None:
                raise ResourceNotFoundException("Clinic doctor not found")

            # ``appointmentTypes`` is a list of ids handled through the
            # relation table below, so it is not copied onto the ORM object.
            update_data = clinic_doctor_data.model_dump(
                exclude_unset=True, exclude={"appointmentTypes"}
            )
            for key, value in update_data.items():
                setattr(clinic_doctor, key, value)

            if new_type_ids is not None:
                self.db.query(AppointmentRelations).filter(
                    AppointmentRelations.clinic_doctor_id == clinic_doctor_id
                ).delete()
                self._add_appointment_relations(clinic_doctor_id, new_type_ids)

            self.db.commit()
            return
        except Exception as e:
            self.logger.error(e)
            self.db.rollback()
            raise

    async def delete_clinic_doctor(self, clinic_doctor_id: int):
        """Delete the doctor with the given id and commit.

        Raises:
            ResourceNotFoundException: if no doctor has that id.
            Exception: anything else is logged, the session is rolled back,
                and the error is re-raised.
        """
        try:
            clinic_doctor = (
                self.db.query(ClinicDoctors)
                .filter(ClinicDoctors.id == clinic_doctor_id)
                .first()
            )
            # Session.delete(None) would fail with an opaque mapping error.
            if clinic_doctor is None:
                raise ResourceNotFoundException("Clinic doctor not found")
            self.db.delete(clinic_doctor)
            self.db.commit()
        except Exception as e:
            self.logger.error(e)
            self.db.rollback()
            raise

    async def get_doctor_status_count(self):
        """Return a dict mapping every ``ClinicDoctorStatus`` value to its count.

        Statuses with no doctors are reported as 0.
        """
        status_counts = (
            self.db.query(
                ClinicDoctors.status, func.count(ClinicDoctors.id).label("count")
            )
            .group_by(ClinicDoctors.status)
            .all()
        )
        # Start from zero for every known status so absent ones still appear.
        result = {status.value: 0 for status in ClinicDoctorStatus}
        for status, count in status_counts:
            if status is None:
                # Rows without a status have no enum value to report under.
                continue
            result[status.value] = count
        return result

    async def get_clinic_doctors(
        self,
        limit: int,
        offset: int,
        search: str = "",
        sort_by: str = DEFAULT_ORDER,
        sort_order: str = DEFAULT_ORDER_BY,
    ):
        """Return one page of doctors together with the total match count.

        Args:
            limit: Maximum number of doctors to return.
            offset: Number of matching doctors to skip.
            search: Optional case-insensitive substring matched against the
                doctor's name, role, or any linked appointment type.
            sort_by: Name of a mapped ``ClinicDoctors`` column attribute.
            sort_order: ``"desc"`` for descending; anything else is ascending.

        Returns:
            CommonResponse whose ``data`` is a list of ``ClinicDoctorResponse``
            and whose ``total`` is the number of doctors matching ``search``.

        Raises:
            ValueError: if ``sort_by`` is not a mapped column attribute.
            Exception: any error is logged and re-raised.
        """
        # NOTE(review): the defaults pair sort_by with DEFAULT_ORDER and
        # sort_order with DEFAULT_ORDER_BY -- confirm the constants are not
        # swapped.
        try:
            # sort_by may come from the caller; only allow real column
            # attributes rather than arbitrary attribute lookup on the model.
            if sort_by not in ClinicDoctors.__mapper__.column_attrs.keys():
                raise ValueError(f"Invalid sort field: {sort_by}")
            sort_column = getattr(ClinicDoctors, sort_by)
            ordering = (
                sort_column.desc() if sort_order == "desc" else sort_column.asc()
            )

            filtered_query = self.db.query(ClinicDoctors)
            if search:
                pattern = f"%{search}%"
                filtered_query = filtered_query.filter(
                    or_(
                        ClinicDoctors.name.ilike(pattern),
                        cast(ClinicDoctors.role, String).ilike(pattern),
                        ClinicDoctors.appointmentRelations.any(
                            AppointmentRelations.masterAppointmentTypes.has(
                                MasterAppointmentTypes.type.ilike(pattern)
                            )
                        ),
                    )
                )

            # Count once, before eager-loading and ordering are attached.
            total = filtered_query.count()

            clinic_doctors = (
                filtered_query.options(
                    selectinload(ClinicDoctors.appointmentRelations).selectinload(
                        AppointmentRelations.masterAppointmentTypes
                    )
                )
                # The id tie-breaker keeps LIMIT/OFFSET pages stable when
                # several rows share the same sort value.
                .order_by(ordering, ClinicDoctors.id.asc())
                .limit(limit)
                .offset(offset)
                .all()
            )

            response_data = [self._to_response(doctor) for doctor in clinic_doctors]
            return CommonResponse(
                data=response_data,
                total=total,
            )
        except Exception as e:
            self.logger.error(e)
            raise