80 lines
3.2 KiB
Python
80 lines
3.2 KiB
Python
from schemas.CreateSchemas import ClinicDoctorCreate
|
|
from schemas.UpdateSchemas import ClinicDoctorUpdate
|
|
from schemas.ResponseSchemas import ClinicDoctorResponse
|
|
from database import get_db
|
|
from models import ClinicDoctors
|
|
from sqlalchemy.orm import Session
|
|
from services.clinicServices import ClinicServices
|
|
from exceptions import ResourceNotFoundException
|
|
from interface.common_response import CommonResponse
|
|
|
|
class ClinicDoctorsServices:
|
|
def __init__(self):
|
|
self.db: Session = next(get_db())
|
|
self.clinic_services = ClinicServices()
|
|
|
|
def create_clinic_doctor(self, clinic_doctor: ClinicDoctorCreate) -> ClinicDoctorResponse:
|
|
|
|
# check if clinic exists
|
|
self.clinic_services.get_clinic_by_id(clinic_doctor.clinic_id)
|
|
|
|
clinic_doctor = ClinicDoctors(**clinic_doctor.model_dump())
|
|
self.db.add(clinic_doctor)
|
|
self.db.commit()
|
|
self.db.refresh(clinic_doctor)
|
|
return ClinicDoctorResponse(**clinic_doctor.__dict__.copy())
|
|
|
|
def update_clinic_doctor(self, clinic_doctor_id: int, clinic_doctor_data: ClinicDoctorUpdate) -> ClinicDoctorResponse:
|
|
# check if clinic doctor exists
|
|
clinic_doctor = self.db.query(ClinicDoctors).filter(ClinicDoctors.id == clinic_doctor_id).first()
|
|
if clinic_doctor is None:
|
|
raise ResourceNotFoundException("Clinic doctor not found")
|
|
|
|
if clinic_doctor_data.clinic_id != clinic_doctor.clinic_id:
|
|
# check if clinic exists
|
|
self.clinic_services.get_clinic_by_id(clinic_doctor_data.clinic_id)
|
|
|
|
# Update the existing object with new values
|
|
update_data = clinic_doctor_data.model_dump(exclude_unset=True)
|
|
for key, value in update_data.items():
|
|
setattr(clinic_doctor, key, value)
|
|
|
|
self.db.add(clinic_doctor)
|
|
self.db.commit()
|
|
self.db.refresh(clinic_doctor)
|
|
return ClinicDoctorResponse(**clinic_doctor.__dict__.copy())
|
|
|
|
def delete_clinic_doctor(self, clinic_doctor_id: int):
|
|
clinic_doctor = self.db.query(ClinicDoctors).filter(ClinicDoctors.id == clinic_doctor_id).first()
|
|
self.db.delete(clinic_doctor)
|
|
self.db.commit()
|
|
|
|
def get_doctor_status_count(self):
|
|
# Use SQLAlchemy's func.count to get the count for each status
|
|
from sqlalchemy import func
|
|
from enums.enums import ClinicDoctorStatus
|
|
|
|
# Query to count doctors by status
|
|
status_counts = self.db.query(
|
|
ClinicDoctors.status,
|
|
func.count(ClinicDoctors.id).label('count')
|
|
).group_by(ClinicDoctors.status).all()
|
|
|
|
# Initialize result dictionary with all possible statuses set to 0
|
|
result = {status.value: 0 for status in ClinicDoctorStatus}
|
|
|
|
# Update with actual counts from the query
|
|
for status, count in status_counts:
|
|
result[status.value] = count
|
|
|
|
return result
|
|
|
|
def get_clinic_doctors(self):
|
|
clinic_doctors = self.db.query(ClinicDoctors).all()
|
|
|
|
total = self.db.query(ClinicDoctors).count()
|
|
|
|
response = CommonResponse(data=[ClinicDoctorResponse(**clinic_doctor.__dict__.copy()) for clinic_doctor in clinic_doctors], total=total)
|
|
|
|
return response
|
|
|