91 lines
3.7 KiB
Python
91 lines
3.7 KiB
Python
from sqlalchemy.orm import Session
|
|
|
|
from database import get_db
|
|
from models import MasterAppointmentTypes
|
|
from schemas.BaseSchemas import MasterAppointmentTypeBase
|
|
from exceptions import ResourceNotFoundException
|
|
from interface.common_response import CommonResponse
|
|
from schemas.ResponseSchemas import MasterAppointmentTypeResponse
|
|
from loguru import logger
|
|
|
|
class MasterAppointmentServices:
    """Service layer for CRUD operations on ``MasterAppointmentTypes`` rows.

    Each instance takes one SQLAlchemy ``Session`` from ``get_db()`` when it
    is constructed. Every coroutine on this class closes that session in its
    ``finally`` clause, whether the operation succeeded or raised.
    """

    def __init__(self):
        # get_db() is advanced exactly once here and never resumed.
        # NOTE(review): presumably get_db is a generator yielding a Session;
        # any cleanup it performs after the yield will not run from this
        # class -- confirm that closing the session in each method suffices.
        self.db: Session = next(get_db())
        self.logger = logger

    @staticmethod
    def _row_to_response(row):
        """Build a ``MasterAppointmentTypeResponse`` from one ORM row."""
        # The instance dict of a mapped object also holds SQLAlchemy's private
        # ``_sa_instance_state`` entry; it is not a column and must not be
        # forwarded to the response schema.
        fields = {
            key: value
            for key, value in vars(row).items()
            if key != "_sa_instance_state"
        }
        return MasterAppointmentTypeResponse(**fields)

    async def get_master_appointment_types(self):
        """Return every appointment type wrapped in a ``CommonResponse``.

        Returns:
            ``CommonResponse`` whose ``data`` is a list of
            ``MasterAppointmentTypeResponse`` objects and whose ``total`` is
            the number of items in that list.

        Raises:
            Exception: any error raised while querying or building the
                response is logged and re-raised unchanged.
        """
        try:
            rows = self.db.query(MasterAppointmentTypes).all()
            # Derive the total from the rows already loaded instead of issuing
            # a second COUNT query: one round-trip fewer, and ``total`` can
            # never disagree with ``data``.
            return CommonResponse(
                data=[self._row_to_response(row) for row in rows],
                total=len(rows),
            )
        except Exception as e:
            self.logger.error(f"Error getting master appointment types: {e}")
            raise
        finally:
            self.db.close()

    def is_appointment_type_exists(self, appointment_type: MasterAppointmentTypeBase):
        """Look up a stored appointment type by its lower-cased ``type``.

        Args:
            appointment_type: schema object whose ``type`` string is compared,
                after ``str.lower()``, against ``MasterAppointmentTypes.type``.

        Returns:
            The result of ``Query.first()`` for that filter: the first
            matching row, or ``None`` when there is no match.

        Note:
            Unlike the coroutines on this class, this method does not close
            the session.
        """
        normalized_type = appointment_type.type.lower()
        return (
            self.db.query(MasterAppointmentTypes)
            .filter(MasterAppointmentTypes.type == normalized_type)
            .first()
        )

    async def create_master_appointment_type(self, appointment_type: MasterAppointmentTypeBase):
        """Insert a new appointment type unless the same type already exists.

        Args:
            appointment_type: schema object describing the row to create. Its
                ``type`` attribute is lower-cased in place before the insert.

        Returns:
            ``None``. When a row with the same lower-cased type is already
            stored, nothing is inserted and no error is raised.

        Raises:
            Exception: any error raised during lookup, insert, or commit is
                logged and re-raised unchanged.
        """
        try:
            # A duplicate is treated as a silent no-op.
            if self.is_appointment_type_exists(appointment_type):
                return

            # Types are stored lower-cased so the equality lookup in
            # is_appointment_type_exists matches regardless of input casing.
            # This assignment mutates the caller's schema object.
            appointment_type.type = appointment_type.type.lower()

            new_row = MasterAppointmentTypes(**appointment_type.model_dump())
            self.db.add(new_row)
            self.db.commit()
        except Exception as e:
            self.logger.error(f"Error creating master appointment type: {e}")
            raise
        finally:
            self.db.close()

    async def delete_master_appointment_type(self, appointment_type_id: int):
        """Soft-delete the appointment type with the given primary key.

        Args:
            appointment_type_id: value matched against
                ``MasterAppointmentTypes.id``.

        Raises:
            ResourceNotFoundException: no row has that id.
            Exception: any other error is logged and re-raised unchanged
                (the not-found error is logged as well).
        """
        try:
            row = (
                self.db.query(MasterAppointmentTypes)
                .filter(MasterAppointmentTypes.id == appointment_type_id)
                .first()
            )
            if row is None:
                raise ResourceNotFoundException("Appointment type not found")

            # Deletion is delegated to the model's own soft_delete(); what it
            # changes on the row is defined on the model, not here.
            row.soft_delete(self.db)
            self.db.commit()
        except Exception as e:
            self.logger.error(f"Error deleting master appointment type: {e}")
            raise
        finally:
            self.db.close()

    async def update_master_appointment_type(self, appointment_type_id: int, appointment_type: MasterAppointmentTypeBase):
        """Change the ``type`` of an existing appointment type.

        Args:
            appointment_type_id: primary key of the row to update.
            appointment_type: schema object carrying the new ``type``; the
                value is stored lower-cased.

        Raises:
            ResourceNotFoundException: no row has ``appointment_type_id``, or
                a different row already uses the requested type.
            Exception: any other error is logged and re-raised unchanged.
        """
        try:
            row = (
                self.db.query(MasterAppointmentTypes)
                .filter(MasterAppointmentTypes.id == appointment_type_id)
                .first()
            )
            if row is None:
                raise ResourceNotFoundException("Appointment type not found")

            # Another row holding the same lower-cased type is a conflict; a
            # match on the row being updated is allowed.
            duplicate = self.is_appointment_type_exists(appointment_type)
            if duplicate and duplicate.id != appointment_type_id:
                # NOTE(review): a "not found" exception is raised for what is
                # really a duplicate; kept as-is because callers may catch
                # this exact type.
                raise ResourceNotFoundException("Appointment type already exists")

            row.type = appointment_type.type.lower()

            self.db.add(row)
            self.db.commit()
        except Exception as e:
            self.logger.error(f"Error updating master appointment type: {e}")
            raise
        finally:
            self.db.close()