from typing import List, Literal, Optional, Union

from sqlalchemy import func, not_, or_, text
from sqlalchemy.orm import Session, joinedload

from database import get_db
from enums.enums import ClinicStatus, UserType
from exceptions import ResourceNotFoundException, ValidationException
from exceptions.unauthorized_exception import UnauthorizedException
from interface.common_response import CommonResponse
from models import ClinicFileVerifications, ClinicOffers, Clinics, Users
from schemas.BaseSchemas import ClinicFileVerificationBase, ClinicOffersBase
from schemas.ResponseSchemas import Clinic, ClinicOfferResponse
from schemas.UpdateSchemas import ClinicStatusUpdate, ClinicUpdate
from services.emailService import EmailService
from services.s3Service import get_file_key, get_signed_url


class ClinicServices:
    """Service layer for clinics, their verification files and clinic offers."""

    # Statuses that `get_clinics` treats as "REGISTERED"; every other status
    # is treated as "UNREGISTERED".
    _REGISTERED_STATUSES = (ClinicStatus.ACTIVE, ClinicStatus.INACTIVE)

    # Clinic fields holding stored file references that are turned into
    # signed URLs (on read) or file keys (on write).
    _FILE_FIELDS = ("abn_doc", "contract_doc", "logo")

    # Maps a `documentStatus` key to the verification flag it controls.
    _DOCUMENT_FLAGS = (
        ("LOGO", "logo_is_verified"),
        ("ABN", "abn_doc_is_verified"),
        ("CONTRACT", "contract_doc_is_verified"),
    )

    def __init__(self):
        # NOTE(review): the generator returned by get_db() is not retained
        # here, so whatever cleanup it performs after its `yield` is not
        # driven by this class -- confirm the intended session lifecycle.
        self.db: Session = next(get_db())
        self.email_service = EmailService()

    def _get_file_verification(self, clinic_id: int):
        """Return the first ClinicFileVerifications row for the clinic, or None."""
        return (
            self.db.query(ClinicFileVerifications)
            .filter(ClinicFileVerifications.clinic_id == clinic_id)
            .first()
        )

    async def get_clinics(
        self,
        user,
        limit: int,
        offset: int,
        filter_type: Union[Literal["UNREGISTERED"], Literal["REGISTERED"]] = "UNREGISTERED",
        search: str = "",
    ):
        """Return one page of clinics plus aggregate counts (super admin only).

        Args:
            user: Mapping describing the caller; ``user["userType"]`` is read.
            limit: Maximum number of clinics to return.
            offset: Number of clinics to skip.
            filter_type: ``"REGISTERED"`` selects ACTIVE/INACTIVE clinics,
                ``"UNREGISTERED"`` selects every other status.
            search: Optional substring matched against name, email, phone
                and address.

        Returns:
            CommonResponse whose data holds ``clinics``,
            ``totalRegisteredClinics`` and ``totalRejectedClinics``.

        Raises:
            UnauthorizedException: If the caller is not a super admin.
        """
        if user["userType"] != UserType.SUPER_ADMIN:
            raise UnauthorizedException("You are not authorized to get clinics")

        clinics_query = self.db.query(Clinics).order_by(Clinics.update_time.desc())

        is_registered = Clinics.status.in_(self._REGISTERED_STATUSES)
        if filter_type == "UNREGISTERED":
            clinics_query = clinics_query.filter(not_(is_registered))
        elif filter_type == "REGISTERED":
            clinics_query = clinics_query.filter(is_registered)

        if search:
            clinics_query = clinics_query.filter(
                or_(
                    Clinics.name.contains(search),
                    Clinics.email.contains(search),
                    Clinics.phone.contains(search),
                    Clinics.address.contains(search),
                )
            )

        clinics = clinics_query.offset(offset).limit(limit).all()

        # NOTE(review): these aggregates run over the whole table; they do
        # not take `filter_type` or `search` into account, so `total` is not
        # the size of the filtered result set -- confirm this is intended.
        count_query = text(
            """
            SELECT
                COUNT(*) as total,
                COUNT(CASE WHEN status = 'ACTIVE' OR status = 'INACTIVE' THEN 1 END) as active,
                COUNT(CASE WHEN status != 'ACTIVE' AND status != 'INACTIVE' THEN 1 END) as rejected
            FROM clinics
            """
        )
        result = self.db.execute(count_query).first()
        total_clinics = result.total or 0
        total_registered = result.active or 0
        total_rejected = result.rejected or 0

        clinic_response = [Clinic(**clinic.__dict__.copy()) for clinic in clinics]
        for clinic in clinic_response:
            clinic.logo = await get_signed_url(clinic.logo) if clinic.logo else None

        payload = {
            "clinics": clinic_response,
            "totalRegisteredClinics": total_registered,
            "totalRejectedClinics": total_rejected,
        }
        return CommonResponse(data=payload, total=total_clinics)

    async def get_latest_clinic_id(self) -> int:
        """Return the highest clinic id, or 0 when no clinic exists."""
        clinic = self.db.query(Clinics).order_by(Clinics.id.desc()).first()
        return clinic.id if clinic else 0

    async def get_clinic_by_id(self, clinic_id: int):
        """Return a clinic with its creator, verification files and file status.

        Args:
            clinic_id: Primary key of the clinic.

        Returns:
            Dict with the keys ``clinic``, ``creator``, ``clinic_files`` and
            ``fileStatus``. ``fileStatus["reason"]`` is only populated when
            the clinic is not ACTIVE.

        Raises:
            ResourceNotFoundException: If the clinic, or its file
                verification record, does not exist.
        """
        clinic = (
            self.db.query(Clinics)
            .options(joinedload(Clinics.creator))
            .filter(Clinics.id == clinic_id)
            .first()
        )
        if clinic is None:
            raise ResourceNotFoundException("Clinic not found")

        clinic_response = Clinic(**clinic.__dict__.copy())
        for field in ("logo", "abn_doc", "contract_doc"):
            stored = getattr(clinic_response, field)
            setattr(clinic_response, field, await get_signed_url(stored) if stored else None)

        # Fetch the verification row once; it feeds both `clinic_files` and
        # the rejection reason. A missing row is reported exactly as
        # `get_clinic_files` reports it.
        file_record = self._get_file_verification(clinic_id)
        if file_record is None:
            raise ResourceNotFoundException("Clinic not found")

        rejection_reason = None
        if clinic.status != ClinicStatus.ACTIVE:
            rejection_reason = file_record.rejection_reason

        creator = clinic.creator
        return {
            "clinic": clinic_response,
            "creator": {
                "name": creator.username,
                "email": creator.email,
                "phone": creator.mobile,
                "designation": creator.clinicRole,
            },
            "clinic_files": ClinicFileVerificationBase(**file_record.__dict__.copy()),
            "fileStatus": {"reason": rejection_reason},
        }

    async def get_clinic_files(self, clinic_id: int):
        """Return the file verification record of a clinic.

        Raises:
            ResourceNotFoundException: If no verification record exists for
                the clinic.
        """
        clinic_files = self._get_file_verification(clinic_id)
        if clinic_files is None:
            # NOTE(review): the message says "Clinic" although it is the
            # verification record that is missing; kept unchanged because
            # clients may depend on the text.
            raise ResourceNotFoundException("Clinic not found")
        return ClinicFileVerificationBase(**clinic_files.__dict__.copy())

    async def update_clinic(self, user, clinic_id: int, clinic_data: ClinicUpdate):
        """Apply a partial update to a clinic owned by the caller.

        Only fields explicitly provided in ``clinic_data`` are written.
        Provided document fields are converted to file keys with
        ``get_file_key``; an empty or null value clears the field.

        Args:
            user: Mapping describing the caller; ``user["id"]`` is read.
            clinic_id: Primary key of the clinic.
            clinic_data: Fields to update.

        Returns:
            The updated clinic as a ``Clinic`` schema with a signed logo URL.

        Raises:
            ResourceNotFoundException: If the clinic does not exist.
            UnauthorizedException: If the caller did not create the clinic.
        """
        clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
        if clinic is None:
            raise ResourceNotFoundException("Clinic not found")
        if clinic.creator_id != user["id"]:
            raise UnauthorizedException("You are not authorized to update this clinic")

        update_data = clinic_data.model_dump(exclude_unset=True)
        # Normalise only the file fields the caller actually sent. Assigning
        # them on the model first would mark them as "set" and overwrite the
        # stored keys with None on every partial update.
        for field in self._FILE_FIELDS:
            if field in update_data:
                raw = update_data[field]
                update_data[field] = get_file_key(raw) if raw else None

        for key, value in update_data.items():
            setattr(clinic, key, value)
        self.db.add(clinic)
        self.db.commit()
        self.db.refresh(clinic)

        clinic_response = Clinic(**clinic.__dict__.copy())
        # get_signed_url is awaited everywhere else in this class; without
        # the await the response would carry a coroutine object.
        clinic_response.logo = (
            await get_signed_url(clinic_response.logo) if clinic_response.logo else None
        )
        return clinic_response

    async def delete_clinic(self, clinic_id: int):
        """Delete a clinic by delegating to the model's ``soft_delete``.

        Raises:
            ResourceNotFoundException: If the clinic does not exist.
        """
        # NOTE(review): unlike the other mutating methods there is no
        # authorization check here -- confirm callers enforce it.
        clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
        if clinic is None:
            raise ResourceNotFoundException("Clinic not found")
        clinic.soft_delete(self.db)

    async def get_clinic_count(self):
        """Return the total clinic count and per-status counts.

        Returns:
            Dict with ``totalClinics``, ``totalActiveClinics``,
            ``totalUnderReviewClinics`` and ``totalRejectedClinics``.
        """
        status_to_key = {
            ClinicStatus.ACTIVE: "totalActiveClinics",
            ClinicStatus.UNDER_REVIEW: "totalUnderReviewClinics",
            ClinicStatus.REJECTED: "totalRejectedClinics",
        }

        # Statuses without any row are absent from the grouped result, so
        # every counter starts at 0.
        counts = {"totalClinics": self.db.query(Clinics).count()}
        counts.update(dict.fromkeys(status_to_key.values(), 0))

        status_counts = (
            self.db.query(Clinics.status, func.count(Clinics.id).label("count"))
            .filter(Clinics.status.in_(list(status_to_key)))
            .group_by(Clinics.status)
            .all()
        )
        for status, count in status_counts:
            key = status_to_key.get(status)
            if key:
                counts[key] = count
        return counts

    async def update_clinic_status(
        self,
        user,
        clinic_id: int,
        status: ClinicStatus,
        documentStatus: Optional[dict] = None,
        rejection_reason: Optional[str] = None,
    ):
        """Change a clinic's status and update its file verification flags.

        ACTIVE marks every document as verified and sends the approval
        email. REJECTED and UNDER_REVIEW copy the flags given in
        ``documentStatus`` (keys ``LOGO``, ``ABN``, ``CONTRACT``), store
        ``rejection_reason`` when at least one of those keys is present, and
        send the rejection email.

        Args:
            user: Mapping describing the caller; ``user["userType"]`` is read.
            clinic_id: Primary key of the clinic.
            status: New clinic status.
            documentStatus: Optional per-document verification flags.
            rejection_reason: Optional reason stored with the flags.

        Raises:
            UnauthorizedException: If the caller is not a super admin.
            ResourceNotFoundException: If the clinic does not exist.
        """
        if user["userType"] != UserType.SUPER_ADMIN:
            raise UnauthorizedException("You are not authorized to update clinic status")

        clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
        if clinic is None:
            raise ResourceNotFoundException("Clinic not found")

        clinic.status = status
        self.db.add(clinic)
        self.db.commit()
        self.db.refresh(clinic)

        if clinic.status == ClinicStatus.ACTIVE:
            verification = self._get_file_verification(clinic_id)
            # The status is already committed at this point; a clinic
            # without a verification row must not fail with AttributeError.
            if verification is not None:
                for _, flag in self._DOCUMENT_FLAGS:
                    setattr(verification, flag, True)
                self.db.add(verification)
                self.db.commit()
            # send mail to user
            self.email_service.send_apporve_clinic_email(clinic.email, clinic.name)

        if clinic.status in (ClinicStatus.REJECTED, ClinicStatus.UNDER_REVIEW):
            verification = self._get_file_verification(clinic_id)
            if verification is not None:
                for doc_key, flag in self._DOCUMENT_FLAGS:
                    if documentStatus and doc_key in documentStatus:
                        setattr(verification, flag, documentStatus.get(doc_key))
                        verification.rejection_reason = rejection_reason
                self.db.add(verification)
                self.db.commit()
            # send mail to user
            # NOTE(review): UNDER_REVIEW also triggers the rejection email --
            # confirm that this is intended.
            self.email_service.send_reject_clinic_email(clinic.email, clinic.name)

        # NOTE(review): INACTIVE currently has no side effects; an earlier
        # draft blocked the clinic creator (Users.isBlocked = True) but that
        # logic was disabled.

    async def get_clinic_offers(self, user, limit: int, offset: int, search: str = ""):
        """Return one page of clinic offers (super admin only).

        Args:
            user: Mapping describing the caller; ``user["userType"]`` is read.
            limit: Maximum number of offers to return.
            offset: Number of offers to skip.
            search: Optional substring matched against the clinic email.

        Returns:
            CommonResponse with the offers and the number of matching rows.

        Raises:
            UnauthorizedException: If the caller is not a super admin.
        """
        if user["userType"] != UserType.SUPER_ADMIN:
            raise UnauthorizedException("You are not authorized to get clinic offers")

        offers_query = self.db.query(ClinicOffers)
        if search:
            offers_query = offers_query.filter(ClinicOffers.clinic_email.contains(search))
        # Count once, after the optional filter has been applied.
        total = offers_query.count()

        # A stable ordering keeps limit/offset pages deterministic.
        offers = (
            offers_query.order_by(ClinicOffers.id).limit(limit).offset(offset).all()
        )
        offers_response = [ClinicOfferResponse(**offer.__dict__.copy()) for offer in offers]
        return CommonResponse(data=offers_response, total=total)

    async def create_clinic_offer(self, user, clinic_offer_data: ClinicOffersBase):
        """Create an offer for a clinic email (super admin only).

        Raises:
            UnauthorizedException: If the caller is not a super admin.
            ValidationException: If an offer already exists for the email.
        """
        if user["userType"] != UserType.SUPER_ADMIN:
            raise UnauthorizedException("You are not authorized to create clinic offer")

        existing_offer = (
            self.db.query(ClinicOffers)
            .filter(ClinicOffers.clinic_email == clinic_offer_data.clinic_email)
            .first()
        )
        if existing_offer:
            raise ValidationException("Offer already exists for this clinic")

        self.db.add(ClinicOffers(**clinic_offer_data.model_dump()))
        self.db.commit()

    async def update_clinic_offer(self, user, clinic_offer_id: int, clinic_offer_data: ClinicOffersBase):
        """Update the fee waiver and special offer month of an offer.

        Raises:
            UnauthorizedException: If the caller is not a super admin.
            ResourceNotFoundException: If the offer does not exist.
        """
        if user["userType"] != UserType.SUPER_ADMIN:
            raise UnauthorizedException("You are not authorized to update clinic offer")

        clinic_offer = self.db.query(ClinicOffers).filter(ClinicOffers.id == clinic_offer_id).first()
        if clinic_offer is None:
            raise ResourceNotFoundException("Clinic offer not found")

        clinic_offer.setup_fees_waived = clinic_offer_data.setup_fees_waived
        clinic_offer.special_offer_for_month = clinic_offer_data.special_offer_for_month
        self.db.add(clinic_offer)
        self.db.commit()

    async def delete_clinic_offer(self, user, clinic_offer_id: int):
        """Delete an offer by delegating to the model's ``soft_delete``.

        Raises:
            UnauthorizedException: If the caller is not a super admin.
            ResourceNotFoundException: If the offer does not exist.
        """
        if user["userType"] != UserType.SUPER_ADMIN:
            raise UnauthorizedException("You are not authorized to delete clinic offer")

        clinic_offer = self.db.query(ClinicOffers).filter(ClinicOffers.id == clinic_offer_id).first()
        if clinic_offer is None:
            raise ResourceNotFoundException("Clinic offer not found")
        clinic_offer.soft_delete(self.db)