health-apps-backend/services/userServices.py

94 lines
2.9 KiB
Python

from loguru import logger
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from sqlalchemy import or_
from database import get_db
from models.Users import Users
from exceptions.validation_exception import ValidationException
from schemas.ResponseSchemas import UserResponse
from utils.password_utils import hash_password
from schemas.CreateSchemas import UserCreate
from exceptions.resource_not_found_exception import ResourceNotFoundException
class UserServices:
    """Service layer for creating and looking up ``Users`` rows.

    Every instance takes one session from ``get_db()`` when it is constructed
    and uses that session for all of its queries.
    """

    def __init__(self):
        # NOTE(review): only the first item produced by get_db() is consumed and
        # the iterator itself is not kept. If get_db() is a generator with
        # cleanup after its ``yield``, that cleanup is not driven from here --
        # confirm the intended session lifecycle.
        self.db: "Session" = next(get_db())

    @staticmethod
    def _to_response(user) -> "UserResponse":
        """Build a ``UserResponse`` from the attributes loaded on *user*."""
        # The instance dict of an ORM-mapped object also holds SQLAlchemy's
        # private ``_sa_instance_state`` entry; keep it out of the schema.
        fields = {
            key: value
            for key, value in vars(user).items()
            if not key.startswith("_sa_")
        }
        return UserResponse(**fields)

    async def create_user(self, user_data: "UserCreate") -> dict:
        """Insert a new user and return it as a dumped ``UserResponse``.

        The email is lower-cased before both the duplicate check and the
        insert, and the password is stored as returned by ``hash_password``.

        NOTE(review): declared ``async`` but contains no ``await``; the session
        calls below run synchronously.

        Args:
            user_data: Payload providing ``username``, ``email``, ``password``,
                ``clinicRole`` and ``userType``.

        Returns:
            ``UserResponse(...).model_dump()`` for the stored row.

        Raises:
            ValidationException: A user with the same email already exists.
            Exception: Any other failure is logged, the session is rolled
                back, and the original exception is re-raised unchanged.
        """
        try:
            email = user_data.email.lower()

            # Only the email is checked for duplicates.
            duplicate = self.db.query(Users).filter(Users.email == email).first()
            if duplicate:
                raise ValidationException("User with same email already exists")

            new_user = Users(
                username=user_data.username,
                email=email,
                password=hash_password(user_data.password),
                clinicRole=user_data.clinicRole,
                userType=user_data.userType,
            )

            self.db.add(new_user)
            self.db.commit()
            # Reload the row so its attributes are populated again after commit.
            self.db.refresh(new_user)
            return self._to_response(new_user).model_dump()
        except Exception as e:
            # loguru formats with ``{}`` placeholders; without one the
            # exception argument would be silently dropped from the message.
            logger.error("Error creating user: {}", e)
            self.db.rollback()
            # A bare ``raise`` keeps the original exception type and traceback.
            raise

    def get_user(self, user_id) -> dict:
        """Return the user with primary key *user_id* as a dumped response.

        Returns:
            ``UserResponse(...).model_dump()`` for the matching row.

        Raises:
            ResourceNotFoundException: No user has this id.
        """
        user = self.db.query(Users).filter(Users.id == user_id).first()
        if not user:
            logger.error("User not found")
            raise ResourceNotFoundException("User not found")
        return self._to_response(user).model_dump()

    async def get_user_by_email(self, email: str) -> "UserResponse":
        """Return the user whose email matches *email* (lower-cased).

        Unlike ``get_user``, this returns the ``UserResponse`` model itself
        rather than its dumped form.

        Raises:
            ResourceNotFoundException: No user has this email.
        """
        user = self.db.query(Users).filter(Users.email == email.lower()).first()
        if not user:
            logger.error("User not found")
            raise ResourceNotFoundException("User not found")
        return self._to_response(user)

    def update_user(self, user_id, user):
        """Placeholder: returns *user* unchanged and does not touch the database."""
        return user

    def delete_user(self, user_id):
        """Placeholder for user deletion.

        Raises:
            NotImplementedError: Always; deletion has not been implemented.
        """
        # TODO: implement deletion once the delete policy for users is decided.
        raise NotImplementedError("delete_user is not implemented")