health-apps-backend/services/clinicServices.py

253 lines
9.5 KiB
Python

from database import get_db
from sqlalchemy.orm import Session, joinedload
from models import Clinics
from schemas.UpdateSchemas import ClinicStatusUpdate, ClinicUpdate
from schemas.ResponseSchemas import Clinic
from typing import List, Literal, Optional, Union
from exceptions import ResourceNotFoundException
from enums.enums import ClinicStatus, UserType
from exceptions.unauthorized_exception import UnauthorizedException
from interface.common_response import CommonResponse
from sqlalchemy import or_,func, case
from services.s3Service import get_signed_url
from models import ClinicFileVerifications
from schemas.BaseSchemas import ClinicFileVerificationBase
class ClinicServices:
def __init__(self):
self.db: Session = next(get_db())
def get_clinics(self, user, limit:int, offset:int, filter_type: Union[Literal["UNREGISTERED"], Literal["REGISTERED"]] = "UNREGISTERED", search:str = ""):
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("You are not authorized to get clinics")
clinics_query = self.db.query(Clinics)
if filter_type == "UNREGISTERED":
clinics_query = clinics_query.filter(Clinics.status != ClinicStatus.ACTIVE)
elif filter_type == "REGISTERED":
clinics_query = clinics_query.filter(Clinics.status == ClinicStatus.ACTIVE)
if search:
clinics_query = clinics_query.filter(
or_(
Clinics.name.contains(search),
Clinics.email.contains(search),
Clinics.phone.contains(search),
Clinics.address.contains(search)
)
)
clinics = clinics_query.limit(limit).offset(offset).all()
# Get all counts in a single optimized query
from sqlalchemy import text
count_query = text("""
SELECT
COUNT(*) as total,
COUNT(CASE WHEN status = 'ACTIVE' THEN 1 END) as active,
COUNT(CASE WHEN status = 'REJECTED' THEN 1 END) as rejected
FROM clinics
""")
result = self.db.execute(count_query).first()
totalClinics = result.total or 0
totalRegisteredClinics = result.active or 0
totalRejectedClinics = result.rejected or 0
clinic_response = [Clinic(**clinic.__dict__.copy()) for clinic in clinics]
for clinic in clinic_response:
clinic.logo = get_signed_url(clinic.logo) if clinic.logo else None
clinic_response_with_total = {
"clinics": clinic_response,
"totalRegisteredClinics": totalRegisteredClinics,
"totalRejectedClinics": totalRejectedClinics,
}
response = CommonResponse(data=clinic_response_with_total, total=totalClinics)
return response
def get_latest_clinic_id(self) -> int:
clinic = self.db.query(Clinics).order_by(Clinics.id.desc()).first()
return clinic.id if clinic else 0
def get_clinic_by_id(self, clinic_id: int):
try:
clinic = self.db.query(Clinics).options(joinedload(Clinics.creator)).filter(Clinics.id == clinic_id).first()
if clinic is None:
raise ResourceNotFoundException("Clinic not found")
clinic_response = Clinic(**clinic.__dict__.copy())
clinic_response.logo = get_signed_url(clinic_response.logo) if clinic_response.logo else None
clinic_response.abn_doc = get_signed_url(clinic_response.abn_doc) if clinic_response.abn_doc else None
clinic_response.contract_doc = get_signed_url(clinic_response.contract_doc) if clinic_response.contract_doc else None
clinic_resp = {
"clinic": clinic_response,
"creator": {
"name": clinic.creator.username,
"email": clinic.creator.email,
"phone": clinic.creator.mobile,
"designation": clinic.creator.clinicRole
},
"clinic_files": self.get_clinic_files(clinic_id)
}
return clinic_resp
except Exception as e:
raise Exception(e)
def get_clinic_files(self, clinic_id: int):
clinic_files = self.db.query(ClinicFileVerifications).filter(ClinicFileVerifications.clinic_id == clinic_id).first()
if clinic_files is None:
raise ResourceNotFoundException("Clinic not found")
response = ClinicFileVerificationBase(**clinic_files.__dict__.copy())
return response
def update_clinic(self, user, clinic_id: int, clinic_data: ClinicUpdate):
clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
if clinic is None:
raise ResourceNotFoundException("Clinic not found")
if clinic.creator_id != user["id"]:
raise UnauthorizedException("You are not authorized to update this clinic")
update_data = clinic_data.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(clinic, key, value)
self.db.add(clinic)
self.db.commit()
self.db.refresh(clinic)
clinic_response = Clinic(**clinic.__dict__.copy())
clinic_response.logo = get_signed_url(clinic_response.logo) if clinic_response.logo else None
return clinic_response
def delete_clinic(self, clinic_id: int):
clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
if clinic is None:
raise ResourceNotFoundException("Clinic not found")
clinic.soft_delete(self.db)
def get_clinic_count(self):
from sqlalchemy import func
# Get total count
totalClinics = self.db.query(Clinics).count()
# Get counts for specific statuses in a single query
status_counts = self.db.query(
Clinics.status,
func.count(Clinics.id).label('count')
).filter(
Clinics.status.in_([
ClinicStatus.ACTIVE,
ClinicStatus.UNDER_REVIEW,
ClinicStatus.REJECTED
])
).group_by(Clinics.status).all()
# Initialize counts with 0
counts = {
"totalClinics": totalClinics,
"totalActiveClinics": 0,
"totalUnderReviewClinics": 0,
"totalRejectedClinics": 0
}
# Map status values to their respective count keys
status_to_key = {
ClinicStatus.ACTIVE: "totalActiveClinics",
ClinicStatus.UNDER_REVIEW: "totalUnderReviewClinics",
ClinicStatus.REJECTED: "totalRejectedClinics"
}
# Update counts with actual values from the query
for status, count in status_counts:
key = status_to_key.get(status)
if key:
counts[key] = count
return counts
def update_clinic_status(self, user, clinic_id: int, status: ClinicStatus, documentStatus: Optional[dict] = None, rejection_reason: Optional[str] = None):
try:
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("You are not authorized to update clinic status")
clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
if clinic is None:
raise ResourceNotFoundException("Clinic not found")
clinic.status = status
self.db.add(clinic)
self.db.commit()
self.db.refresh(clinic)
if clinic.status == ClinicStatus.ACTIVE:
clinic_file_verification = self.db.query(ClinicFileVerifications).filter(ClinicFileVerifications.clinic_id == clinic_id).first()
clinic_file_verification.logo_is_verified = True
clinic_file_verification.abn_doc_is_verified = True
clinic_file_verification.contract_doc_is_verified = True
self.db.add(clinic_file_verification)
self.db.commit()
if clinic.status == ClinicStatus.REJECTED or clinic.status == ClinicStatus.UNDER_REVIEW:
clinic_file_verification = self.db.query(ClinicFileVerifications).filter(ClinicFileVerifications.clinic_id == clinic_id).first()
if documentStatus and "LOGO" in documentStatus:
clinic_file_verification.logo_is_verified = documentStatus.get("LOGO")
clinic_file_verification.rejection_reason = rejection_reason
if documentStatus and "ABN" in documentStatus:
clinic_file_verification.abn_doc_is_verified = documentStatus.get("ABN")
clinic_file_verification.rejection_reason = rejection_reason
if documentStatus and "CONTRACT" in documentStatus:
clinic_file_verification.contract_doc_is_verified = documentStatus.get("CONTRACT")
clinic_file_verification.rejection_reason = rejection_reason
self.db.add(clinic_file_verification)
self.db.commit()
# if rejected or under review then email to clinic creator
if clinic.status == ClinicStatus.REJECTED or clinic.status == ClinicStatus.UNDER_REVIEW:
pass
# handle inactive status
if clinic.status == ClinicStatus.INACTIVE:
pass
return
except Exception as e:
print(e)
raise Exception(e)