feat: clinic setup update
fix: migration issue
This commit is contained in:
@@ -20,7 +20,7 @@ class AuthService:
|
||||
|
||||
# remove password from user dict
|
||||
user_dict = user.__dict__.copy()
|
||||
user_dict.pop("password", None)
|
||||
user_dict.pop("password", None)
|
||||
|
||||
# create token
|
||||
token = create_jwt_token(user_dict)
|
||||
|
||||
@@ -1,21 +1,49 @@
|
||||
from schemas.CreateSchemas import ClinicDoctorCreate
|
||||
from schemas.UpdateSchemas import ClinicDoctorUpdate
|
||||
from schemas.ResponseSchemas import ClinicDoctorResponse
|
||||
from database import get_db
|
||||
from models import ClinicDoctors
|
||||
from sqlalchemy.orm import Session
|
||||
from services.clinicServices import ClinicServices
|
||||
from exceptions import ResourceNotFoundException
|
||||
|
||||
class ClinicDoctorsServices:
|
||||
def __init__(self):
|
||||
self.db = next(get_db())
|
||||
self.db: Session = next(get_db())
|
||||
self.clinic_services = ClinicServices()
|
||||
|
||||
def create_clinic_doctor(self, clinic_doctor: ClinicDoctorCreate):
|
||||
clinic_doctor = ClinicDoctors(**clinic_doctor.dict())
|
||||
def create_clinic_doctor(self, clinic_doctor: ClinicDoctorCreate) -> ClinicDoctorResponse:
|
||||
|
||||
# check if clinic exists
|
||||
self.clinic_services.get_clinic_by_id(clinic_doctor.clinic_id)
|
||||
|
||||
clinic_doctor = ClinicDoctors(**clinic_doctor.model_dump())
|
||||
self.db.add(clinic_doctor)
|
||||
self.db.commit()
|
||||
self.db.refresh(clinic_doctor)
|
||||
return clinic_doctor
|
||||
return ClinicDoctorResponse(**clinic_doctor.__dict__.copy())
|
||||
|
||||
def update_clinic_doctor(self, clinic_doctor_id: int, clinic_doctor: ClinicDoctorUpdate):
|
||||
pass
|
||||
def update_clinic_doctor(self, clinic_doctor_id: int, clinic_doctor_data: ClinicDoctorUpdate) -> ClinicDoctorResponse:
|
||||
# check if clinic doctor exists
|
||||
clinic_doctor = self.db.query(ClinicDoctors).filter(ClinicDoctors.id == clinic_doctor_id).first()
|
||||
if clinic_doctor is None:
|
||||
raise ResourceNotFoundException("Clinic doctor not found")
|
||||
|
||||
if clinic_doctor_data.clinic_id != clinic_doctor.clinic_id:
|
||||
# check if clinic exists
|
||||
self.clinic_services.get_clinic_by_id(clinic_doctor_data.clinic_id)
|
||||
|
||||
# Update the existing object with new values
|
||||
update_data = clinic_doctor_data.model_dump(exclude_unset=True)
|
||||
for key, value in update_data.items():
|
||||
setattr(clinic_doctor, key, value)
|
||||
|
||||
self.db.add(clinic_doctor)
|
||||
self.db.commit()
|
||||
self.db.refresh(clinic_doctor)
|
||||
return ClinicDoctorResponse(**clinic_doctor.__dict__.copy())
|
||||
|
||||
def delete_clinic_doctor(self, clinic_doctor_id: int):
|
||||
pass
|
||||
clinic_doctor = self.db.query(ClinicDoctors).filter(ClinicDoctors.id == clinic_doctor_id).first()
|
||||
self.db.delete(clinic_doctor)
|
||||
self.db.commit()
|
||||
@@ -0,0 +1,52 @@
|
||||
from database import get_db
|
||||
from sqlalchemy.orm import Session
|
||||
from models import Clinics
|
||||
from schemas.UpdateSchemas import ClinicUpdate
|
||||
from schemas.ResponseSchemas import Clinic
|
||||
from typing import List
|
||||
from exceptions import ResourceNotFoundException
|
||||
|
||||
class ClinicServices:
|
||||
def __init__(self):
|
||||
self.db: Session = next(get_db())
|
||||
|
||||
def get_clinics(self, limit:int, offset:int) -> List[Clinic]:
|
||||
clinics = self.db.query(Clinics).limit(limit).offset(offset).all()
|
||||
|
||||
clinic_response = [Clinic(**clinic.__dict__.copy()) for clinic in clinics]
|
||||
|
||||
return clinic_response
|
||||
|
||||
|
||||
def get_clinic_by_id(self, clinic_id: int) -> Clinic:
|
||||
clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
|
||||
|
||||
if clinic is None:
|
||||
raise ResourceNotFoundException("Clinic not found")
|
||||
|
||||
clinic_response = Clinic(**clinic.__dict__.copy())
|
||||
return clinic_response
|
||||
|
||||
def update_clinic(self, clinic_id: int, clinic_data: ClinicUpdate):
|
||||
clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
|
||||
|
||||
if clinic is None:
|
||||
raise ResourceNotFoundException("Clinic not found")
|
||||
|
||||
update_data = clinic_data.model_dump(exclude_unset=True)
|
||||
for key, value in update_data.items():
|
||||
setattr(clinic, key, value)
|
||||
|
||||
self.db.add(clinic)
|
||||
self.db.commit()
|
||||
self.db.refresh(clinic)
|
||||
return Clinic(**clinic.__dict__.copy())
|
||||
|
||||
def delete_clinic(self, clinic_id: int):
|
||||
clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
|
||||
|
||||
if clinic is None:
|
||||
raise ResourceNotFoundException("Clinic not found")
|
||||
|
||||
self.db.delete(clinic)
|
||||
self.db.commit()
|
||||
+17
-15
@@ -6,7 +6,7 @@ from models.Users import Users
|
||||
from exceptions.validation_exception import ValidationException
|
||||
from schemas.ResponseSchemas import UserResponse
|
||||
from models import Clinics
|
||||
from enums.enums import ClinicStatus
|
||||
from enums.enums import ClinicStatus, UserType
|
||||
from schemas.UpdateSchemas import UserUpdate
|
||||
from exceptions.unauthorized_exception import UnauthorizedException
|
||||
from utils.password_utils import hash_password
|
||||
@@ -150,34 +150,36 @@ class UserServices:
|
||||
|
||||
return user_response
|
||||
|
||||
def update_user(self, admin_id:int|None, user_id: int, user_data: UserUpdate):
|
||||
|
||||
def update_user(self, admin_id:int|None, user_id: int, user_data: UserUpdate) -> UserResponse:
|
||||
# Check admin authorization if admin_id is provided
|
||||
if admin_id:
|
||||
admin = self.db.query(Users).filter(Users.id == admin_id).first()
|
||||
|
||||
if not admin:
|
||||
logger.error("Admin not found")
|
||||
raise ResourceNotFoundException("Admin not found")
|
||||
|
||||
# Only check admin type if admin_id was provided
|
||||
if admin.userType != UserType.SUPER_ADMIN:
|
||||
logger.error("User is not authorized to perform this action")
|
||||
raise UnauthorizedException("User is not authorized to perform this action")
|
||||
|
||||
if admin.userType != UserType.ADMIN:
|
||||
logger.error("User is not authorized to perform this action")
|
||||
raise UnauthorizedException("User is not authorized to perform this action")
|
||||
|
||||
# Find the user to update
|
||||
user = self.db.query(Users).filter(Users.id == user_id).first()
|
||||
|
||||
if not user:
|
||||
logger.error("User not found")
|
||||
raise ResourceNotFoundException("User not found")
|
||||
|
||||
user.username = user_data.username
|
||||
user.clinicRole = user_data.clinicRole
|
||||
user.userType = user_data.userType
|
||||
user.profile_pic = user_data.profile_pic
|
||||
# Update only the fields that were provided
|
||||
update_data = user_data.model_dump(exclude_unset=True)
|
||||
for key, value in update_data.items():
|
||||
setattr(user, key, value)
|
||||
|
||||
self.db.add(user)
|
||||
self.db.commit()
|
||||
|
||||
return user
|
||||
self.db.refresh(user)
|
||||
|
||||
# Return properly serialized response
|
||||
return UserResponse.model_validate(user)
|
||||
|
||||
def delete_user(self, user_id: int):
|
||||
user = self.db.query(Users).filter(Users.id == user_id).first()
|
||||
|
||||
Reference in New Issue
Block a user