feat: clinic offer api

This commit is contained in:
2025-05-22 11:35:33 +05:30
parent 9c39d369c1
commit 8a7f034801
9 changed files with 300 additions and 55 deletions
+112 -50
View File
@@ -1,10 +1,9 @@
from database import get_db
from sqlalchemy.orm import Session, joinedload
from models import Clinics
from schemas.UpdateSchemas import ClinicStatusUpdate, ClinicUpdate
from schemas.ResponseSchemas import Clinic
from schemas.ResponseSchemas import Clinic, ClinicOfferResponse
from typing import List, Literal, Optional, Union
from exceptions import ResourceNotFoundException
from exceptions import ResourceNotFoundException, ValidationException
from enums.enums import ClinicStatus, UserType
from exceptions.unauthorized_exception import UnauthorizedException
from interface.common_response import CommonResponse
@@ -12,8 +11,8 @@ from sqlalchemy import or_,func, case
from sqlalchemy import text
from services.s3Service import get_file_key, get_signed_url
from models import ClinicFileVerifications
from schemas.BaseSchemas import ClinicFileVerificationBase
from models import ClinicFileVerifications, ClinicOffers, Clinics
from schemas.BaseSchemas import ClinicFileVerificationBase, ClinicOffersBase
from services.emailService import EmailService
class ClinicServices:
@@ -93,6 +92,7 @@ class ClinicServices:
clinic_response.abn_doc = get_signed_url(clinic_response.abn_doc) if clinic_response.abn_doc else None
clinic_response.contract_doc = get_signed_url(clinic_response.contract_doc) if clinic_response.contract_doc else None
clinicFiles = None
if(clinic.status != ClinicStatus.ACTIVE):
clinicFiles = self.db.query(ClinicFileVerifications).filter(ClinicFileVerifications.clinic_id == clinic_id).first()
@@ -200,69 +200,131 @@ class ClinicServices:
def update_clinic_status(self, user, clinic_id: int, status: ClinicStatus, documentStatus: Optional[dict] = None, rejection_reason: Optional[str] = None):
try:
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("You are not authorized to update clinic status")
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("You are not authorized to update clinic status")
clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
if clinic is None:
raise ResourceNotFoundException("Clinic not found")
if clinic is None:
raise ResourceNotFoundException("Clinic not found")
clinic.status = status
self.db.add(clinic)
self.db.commit()
self.db.refresh(clinic)
clinic.status = status
self.db.add(clinic)
self.db.commit()
self.db.refresh(clinic)
if clinic.status == ClinicStatus.ACTIVE:
clinic_file_verification = self.db.query(ClinicFileVerifications).filter(ClinicFileVerifications.clinic_id == clinic_id).first()
if clinic.status == ClinicStatus.ACTIVE:
clinic_file_verification = self.db.query(ClinicFileVerifications).filter(ClinicFileVerifications.clinic_id == clinic_id).first()
clinic_file_verification.logo_is_verified = True
clinic_file_verification.abn_doc_is_verified = True
clinic_file_verification.contract_doc_is_verified = True
clinic_file_verification.logo_is_verified = True
clinic_file_verification.abn_doc_is_verified = True
clinic_file_verification.contract_doc_is_verified = True
self.db.add(clinic_file_verification)
self.db.commit()
self.db.add(clinic_file_verification)
self.db.commit()
# send mail to user
self.email_service.send_apporve_clinic_email(clinic.email, clinic.name)
# send mail to user
self.email_service.send_apporve_clinic_email(clinic.email, clinic.name)
if clinic.status == ClinicStatus.REJECTED or clinic.status == ClinicStatus.UNDER_REVIEW:
clinic_file_verification = self.db.query(ClinicFileVerifications).filter(ClinicFileVerifications.clinic_id == clinic_id).first()
if clinic.status == ClinicStatus.REJECTED or clinic.status == ClinicStatus.UNDER_REVIEW:
clinic_file_verification = self.db.query(ClinicFileVerifications).filter(ClinicFileVerifications.clinic_id == clinic_id).first()
if documentStatus and "LOGO" in documentStatus:
clinic_file_verification.logo_is_verified = documentStatus.get("LOGO")
clinic_file_verification.rejection_reason = rejection_reason
if documentStatus and "LOGO" in documentStatus:
clinic_file_verification.logo_is_verified = documentStatus.get("LOGO")
clinic_file_verification.rejection_reason = rejection_reason
if documentStatus and "ABN" in documentStatus:
clinic_file_verification.abn_doc_is_verified = documentStatus.get("ABN")
clinic_file_verification.rejection_reason = rejection_reason
if documentStatus and "ABN" in documentStatus:
clinic_file_verification.abn_doc_is_verified = documentStatus.get("ABN")
clinic_file_verification.rejection_reason = rejection_reason
if documentStatus and "CONTRACT" in documentStatus:
clinic_file_verification.contract_doc_is_verified = documentStatus.get("CONTRACT")
clinic_file_verification.rejection_reason = rejection_reason
if documentStatus and "CONTRACT" in documentStatus:
clinic_file_verification.contract_doc_is_verified = documentStatus.get("CONTRACT")
clinic_file_verification.rejection_reason = rejection_reason
self.db.add(clinic_file_verification)
self.db.commit()
self.db.add(clinic_file_verification)
self.db.commit()
# send mail to user
self.email_service.send_reject_clinic_email(clinic.email, clinic.name)
# send mail to user
self.email_service.send_reject_clinic_email(clinic.email, clinic.name)
# if rejected or under review then email to clinic creator
if clinic.status == ClinicStatus.REJECTED or clinic.status == ClinicStatus.UNDER_REVIEW:
pass
# if rejected or under review then email to clinic creator
if clinic.status == ClinicStatus.REJECTED or clinic.status == ClinicStatus.UNDER_REVIEW:
pass
# handle inactive status
if clinic.status == ClinicStatus.INACTIVE:
pass
# handle inactive status
if clinic.status == ClinicStatus.INACTIVE:
pass
return
except Exception as e:
print(e)
raise Exception(e)
return
def get_clinic_offers(self, user, limit:int, offset:int, search:str = ""):
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("You are not authorized to get clinic offers")
clinic_offers_query = self.db.query(ClinicOffers)
total = clinic_offers_query.count()
if search:
clinic_offers_query = clinic_offers_query.filter(ClinicOffers.clinic_email.contains(search))
total = clinic_offers_query.count()
clinic_offers = clinic_offers_query.limit(limit).offset(offset).all()
clinic_offers_response = [ClinicOfferResponse(**clinic_offer.__dict__.copy()) for clinic_offer in clinic_offers]
response = CommonResponse(data=clinic_offers_response, total=total)
return response
def create_clinic_offer(self, user, clinic_offer_data: ClinicOffersBase):
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("You are not authorized to create clinic offer")
existing_offer = self.db.query(ClinicOffers).filter(ClinicOffers.clinic_email == clinic_offer_data.clinic_email).first()
if existing_offer:
raise ValidationException("Offer already exists for this clinic")
clinic_offer = ClinicOffers(**clinic_offer_data.model_dump())
self.db.add(clinic_offer)
self.db.commit()
return
def update_clinic_offer(self, user, clinic_offer_id: int, clinic_offer_data: ClinicOffersBase):
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("You are not authorized to update clinic offer")
clinic_offer = self.db.query(ClinicOffers).filter(ClinicOffers.id == clinic_offer_id).first()
if clinic_offer is None:
raise ResourceNotFoundException("Clinic offer not found")
clinic_offer.setup_fees_waived = clinic_offer_data.setup_fees_waived
clinic_offer.special_offer_for_month = clinic_offer_data.special_offer_for_month
self.db.add(clinic_offer)
self.db.commit()
return
def delete_clinic_offer(self, user, clinic_offer_id: int):
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("You are not authorized to delete clinic offer")
clinic_offer = self.db.query(ClinicOffers).filter(ClinicOffers.id == clinic_offer_id).first()
if clinic_offer is None:
raise ResourceNotFoundException("Clinic offer not found")
clinic_offer.soft_delete(self.db)
return
+73
View File
@@ -0,0 +1,73 @@
from sqlalchemy.orm import Session
from database import get_db
from models import MasterAppointmentTypes
from schemas.BaseSchemas import MasterAppointmentTypeBase
from exceptions import ResourceNotFoundException
from interface.common_response import CommonResponse
from schemas.ResponseSchemas import MasterAppointmentTypeResponse
class MasterAppointmentServices:
def __init__(self):
self.db: Session = next(get_db())
def get_master_appointment_types(self):
appointment_types = self.db.query(MasterAppointmentTypes).all()
total= self.db.query(MasterAppointmentTypes).count()
response = CommonResponse(data=[MasterAppointmentTypeResponse(**appointment_type.__dict__.copy()) for appointment_type in appointment_types], total=total)
return response
def is_appointment_type_exists(self, appointment_type: MasterAppointmentTypeBase):
existing_appointment_type = self.db.query(MasterAppointmentTypes).filter(MasterAppointmentTypes.type == appointment_type.type.lower()).first()
return existing_appointment_type
def create_master_appointment_type(self, appointment_type: MasterAppointmentTypeBase):
# get existing appointment type
existing_appointment_type = self.is_appointment_type_exists(appointment_type)
if existing_appointment_type:
return
appointment_type.type = appointment_type.type.lower()
appointment_type = MasterAppointmentTypes(**appointment_type.model_dump())
self.db.add(appointment_type)
self.db.commit()
self.db.refresh(appointment_type)
return
def delete_master_appointment_type(self, appointment_type_id: int):
appointment_type = self.db.query(MasterAppointmentTypes).filter(MasterAppointmentTypes.id == appointment_type_id).first()
if appointment_type is None:
raise ResourceNotFoundException("Appointment type not found")
appointment_type.soft_delete(self.db)
self.db.commit()
return
def update_master_appointment_type(self, appointment_type_id: int, appointment_type: MasterAppointmentTypeBase):
appointment_type = self.db.query(MasterAppointmentTypes).filter(MasterAppointmentTypes.id == appointment_type_id).first()
if appointment_type is None:
raise ResourceNotFoundException("Appointment type not found")
appointment_type.type = appointment_type.type.lower()
# get existing appointment type
existing_appointment_type = self.is_appointment_type_exists(appointment_type)
if existing_appointment_type and existing_appointment_type.id != appointment_type_id:
raise ResourceNotFoundException("Appointment type already exists")
update_data = appointment_type.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(appointment_type, key, value)
self.db.add(appointment_type)
self.db.commit()
self.db.refresh(appointment_type)
return