feat: user routes

fix: global delete filter
refactor: user update schema
This commit is contained in:
2025-05-12 18:46:11 +05:30
parent 8d8f205eb1
commit c235196990
6 changed files with 145 additions and 26 deletions
+58 -3
View File
@@ -7,6 +7,8 @@ from exceptions.validation_exception import ValidationException
from schemas.ResponseSchemas import UserResponse
from models import Clinics
from enums.enums import ClinicStatus
from schemas.UpdateSchemas import UserUpdate
from exceptions.unauthorized_exception import UnauthorizedException
from utils.password_utils import hash_password
from schemas.CreateSchemas import UserCreate
from exceptions.resource_not_found_exception import ResourceNotFoundException
@@ -118,6 +120,23 @@ class UserServices:
return user_response
def get_users(self, limit:int, offset:int, search:str):
query = self.db.query(Users)
if search:
query = query.filter(
or_(
Users.username.contains(search),
Users.email.contains(search),
Users.clinicRole.contains(search),
Users.userType.contains(search)
)
)
users = query.limit(limit).offset(offset).all()
user_response = [UserResponse(**user.__dict__.copy()) for user in users]
return user_response
async def get_user_by_email(self, email: str) -> UserResponse:
user = self.db.query(Users).filter(Users.email == email.lower()).first()
@@ -131,8 +150,44 @@ class UserServices:
return user_response
def update_user(self, user_id, user):
def update_user(self, admin_id:int|None, user_id: int, user_data: UserUpdate):
if admin_id:
admin = self.db.query(Users).filter(Users.id == admin_id).first()
if not admin:
logger.error("Admin not found")
raise ResourceNotFoundException("Admin not found")
if admin.userType != UserType.ADMIN:
logger.error("User is not authorized to perform this action")
raise UnauthorizedException("User is not authorized to perform this action")
user = self.db.query(Users).filter(Users.id == user_id).first()
if not user:
logger.error("User not found")
raise ResourceNotFoundException("User not found")
user.username = user_data.username
user.clinicRole = user_data.clinicRole
user.userType = user_data.userType
user.profile_pic = user_data.profile_pic
self.db.add(user)
self.db.commit()
return user
def delete_user(self, user_id):
return user
def delete_user(self, user_id: int):
user = self.db.query(Users).filter(Users.id == user_id).first()
if not user:
logger.error("User not found")
raise ResourceNotFoundException("User not found")
# Use the soft_delete method from CustomBase
user.soft_delete(self.db)
return True