from database import get_db from sqlalchemy.orm import Session, joinedload from models import Clinics from schemas.UpdateSchemas import ClinicStatusUpdate, ClinicUpdate from schemas.ResponseSchemas import Clinic from typing import List, Literal, Union from exceptions import ResourceNotFoundException from enums.enums import ClinicStatus, UserType from exceptions.unauthorized_exception import UnauthorizedException from interface.common_response import CommonResponse from sqlalchemy import or_,func, case from services.s3Service import get_signed_url from models import ClinicFileVerifications from schemas.BaseSchemas import ClinicFileVerificationBase class ClinicServices: def __init__(self): self.db: Session = next(get_db()) def get_clinics(self, user, limit:int, offset:int, filter_type: Union[Literal["UNREGISTERED"], Literal["REGISTERED"]] = "UNREGISTERED", search:str = ""): if user["userType"] != UserType.SUPER_ADMIN: raise UnauthorizedException("You are not authorized to get clinics") clinics_query = self.db.query(Clinics) if filter_type == "UNREGISTERED": clinics_query = clinics_query.filter(Clinics.status != ClinicStatus.ACTIVE) elif filter_type == "REGISTERED": clinics_query = clinics_query.filter(Clinics.status == ClinicStatus.ACTIVE) if search: clinics_query = clinics_query.filter( or_( Clinics.name.contains(search), Clinics.email.contains(search), Clinics.phone.contains(search), Clinics.address.contains(search) ) ) clinics = clinics_query.limit(limit).offset(offset).all() # Get all counts in a single optimized query from sqlalchemy import text count_query = text(""" SELECT COUNT(*) as total, COUNT(CASE WHEN status = 'ACTIVE' THEN 1 END) as active, COUNT(CASE WHEN status = 'REJECTED' THEN 1 END) as rejected FROM clinics """) result = self.db.execute(count_query).first() totalClinics = result.total or 0 totalRegisteredClinics = result.active or 0 totalRejectedClinics = result.rejected or 0 clinic_response = [Clinic(**clinic.__dict__.copy()) for clinic in clinics] for clinic in clinic_response: clinic.logo = get_signed_url(clinic.logo) if clinic.logo else None clinic_response_with_total = { "clinics": clinic_response, "totalRegisteredClinics": totalRegisteredClinics, "totalRejectedClinics": totalRejectedClinics, } response = CommonResponse(data=clinic_response_with_total, total=totalClinics) return response def get_latest_clinic_id(self) -> int: clinic = self.db.query(Clinics).order_by(Clinics.id.desc()).first() return clinic.id if clinic else 0 def get_clinic_by_id(self, clinic_id: int): try: clinic = self.db.query(Clinics).options(joinedload(Clinics.creator)).filter(Clinics.id == clinic_id).first() if clinic is None: raise ResourceNotFoundException("Clinic not found") clinic_response = Clinic(**clinic.__dict__.copy()) clinic_response.logo = get_signed_url(clinic_response.logo) if clinic_response.logo else None clinic_response.abn_doc = get_signed_url(clinic_response.abn_doc) if clinic_response.abn_doc else None clinic_response.contract_doc = get_signed_url(clinic_response.contract_doc) if clinic_response.contract_doc else None clinic_resp = { "clinic": clinic_response, "creator": { "name": clinic.creator.username, "email": clinic.creator.email, "phone": clinic.creator.mobile, "designation": clinic.creator.clinicRole }, "clinic_files": self.get_clinic_files(clinic_id) } return clinic_resp except Exception as e: raise Exception(e) def get_clinic_files(self, clinic_id: int): clinic_files = self.db.query(ClinicFileVerifications).filter(ClinicFileVerifications.clinic_id == clinic_id).first() if clinic_files is None: raise ResourceNotFoundException("Clinic not found") response = ClinicFileVerificationBase(**clinic_files.__dict__.copy()) return response def update_clinic(self, user, clinic_id: int, clinic_data: ClinicUpdate): clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first() if clinic is None: raise ResourceNotFoundException("Clinic not found") if clinic.creator_id != user["id"]: raise UnauthorizedException("You are not authorized to update this clinic") update_data = clinic_data.model_dump(exclude_unset=True) for key, value in update_data.items(): setattr(clinic, key, value) self.db.add(clinic) self.db.commit() self.db.refresh(clinic) clinic_response = Clinic(**clinic.__dict__.copy()) clinic_response.logo = get_signed_url(clinic_response.logo) if clinic_response.logo else None return clinic_response def delete_clinic(self, clinic_id: int): clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first() if clinic is None: raise ResourceNotFoundException("Clinic not found") clinic.soft_delete(self.db) def get_clinic_count(self): from sqlalchemy import func # Get total count totalClinics = self.db.query(Clinics).count() # Get counts for specific statuses in a single query status_counts = self.db.query( Clinics.status, func.count(Clinics.id).label('count') ).filter( Clinics.status.in_([ ClinicStatus.ACTIVE, ClinicStatus.UNDER_REVIEW, ClinicStatus.REJECTED ]) ).group_by(Clinics.status).all() # Initialize counts with 0 counts = { "totalClinics": totalClinics, "totalActiveClinics": 0, "totalUnderReviewClinics": 0, "totalRejectedClinics": 0 } # Map status values to their respective count keys status_to_key = { ClinicStatus.ACTIVE: "totalActiveClinics", ClinicStatus.UNDER_REVIEW: "totalUnderReviewClinics", ClinicStatus.REJECTED: "totalRejectedClinics" } # Update counts with actual values from the query for status, count in status_counts: key = status_to_key.get(status) if key: counts[key] = count return counts def update_clinic_status(self, user, clinic_id: int, status: ClinicStatus): if user["userType"] != UserType.SUPER_ADMIN: raise UnauthorizedException("You are not authorized to update clinic status") clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first() if clinic is None: raise ResourceNotFoundException("Clinic not found") clinic.status = status self.db.add(clinic) self.db.commit() self.db.refresh(clinic) clinic_response = Clinic(**clinic.__dict__.copy()) # if rejected then email to clinic creator if clinic.status == ClinicStatus.REJECTED: pass return clinic_response