feat: staff management apis

This commit is contained in:
deepvasoya 2025-05-21 15:59:33 +05:30
parent 7cae614e37
commit 9c39d369c1
13 changed files with 360 additions and 21 deletions

View File

@ -1,8 +1,11 @@
from fastapi import APIRouter, Request, status
from fastapi import APIRouter, Request
from services.clinicServices import ClinicServices
from schemas.UpdateSchemas import ClinicStatusUpdate
from schemas.ApiResponse import ApiResponse
from schemas.BaseSchemas import CreateSuperAdmin, ResetPasswordBase
from services.authService import AuthService
from utils.constants import DEFAULT_LIMIT, DEFAULT_PAGE
router = APIRouter()
@ -11,3 +14,18 @@ router = APIRouter()
def update_clinic_status(req:Request, data: ClinicStatusUpdate):
response = ClinicServices().update_clinic_status(req.state.user, data.clinic_id, data.status, data.documentStatus, data.rejection_reason)
return ApiResponse(data=response, message="Clinic status updated successfully")
@router.post("/user")
def create_user(req:Request, user_data: CreateSuperAdmin):
AuthService().create_super_admin(req.state.user, user_data)
return ApiResponse(data="OK", message="User created successfully")
@router.get("/")
def get_users(req:Request, limit:int = DEFAULT_LIMIT, page:int = DEFAULT_PAGE, search:str = ""):
if page < 1:
page = 1
offset = (page - 1) * limit
users = AuthService().get_admins(req.state.user, limit, offset, search)
return ApiResponse(data=users, message="Users retrieved successfully")

View File

@ -2,7 +2,7 @@ from fastapi import APIRouter, BackgroundTasks
from services.authService import AuthService
from schemas.CreateSchemas import UserCreate
from schemas.ApiResponse import ApiResponse
from schemas.BaseSchemas import AuthBase, AuthOTP
from schemas.BaseSchemas import AuthBase, AuthOTP, ResetPasswordBase
from services.clinicServices import ClinicServices
from http import HTTPStatus
@ -34,6 +34,17 @@ def get_latest_clinic_id():
)
@router.post('/admin/forget-password')
def forget_password(email: str):
AuthService().forget_password(email)
return ApiResponse(data="OK", message="Password reset email sent successfully")
@router.post('/admin/reset-password')
def reset_password(data: ResetPasswordBase):
AuthService().reset_password(data.token, data.password)
return ApiResponse(data="OK", message="Password reset successfully")
@router.post("/send-otp")
def send_otp(email: str):
AuthService().send_otp(email)

View File

@ -14,6 +14,7 @@ from apis import api_router
# middleware
from middleware.ErrorHandlerMiddleware import ErrorHandlerMiddleware, configure_exception_handlers
from middleware.CustomRequestTypeMiddleware import TextPlainMiddleware
from services.emailService import EmailService
dotenv.load_dotenv()
@ -69,6 +70,8 @@ configure_exception_handlers(app)
@app.get("/")
async def hello_world():
# email_service = EmailService()
# email_service.createTemplate()
return {"Hello": "World"}

View File

@ -0,0 +1,35 @@
"""updated_user_table2
Revision ID: 48785e34c37c
Revises: 7d1c821a7e05
Create Date: 2025-05-21 13:54:54.833038
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '48785e34c37c'
down_revision: Union[str, None] = '7d1c821a7e05'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
# op.drop_index('ix_users_username', table_name='users')
# op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=False)
pass
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_users_username'), table_name='users')
op.create_index('ix_users_username', 'users', ['username'], unique=True)
# ### end Alembic commands ###

View File

@ -0,0 +1,32 @@
"""updated_user_table
Revision ID: 7d1c821a7e05
Revises: ec157808ef2a
Create Date: 2025-05-21 13:51:39.680812
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '7d1c821a7e05'
down_revision: Union[str, None] = 'ec157808ef2a'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('users', 'mobile', nullable=True)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('users', 'mobile', nullable=False)
# ### end Alembic commands ###

View File

@ -0,0 +1,11 @@
from sqlalchemy import Column, Integer, String
from database import Base
from .CustomBase import CustomBase
class ResetPasswordTokens(Base, CustomBase):
__tablename__ = "reset_password_tokens"
id = Column(Integer, primary_key=True)
email = Column(String)
token = Column(String)

View File

@ -14,7 +14,7 @@ class Users(Base, CustomBase):
clinicRole = Column(Enum(ClinicUserRoles), nullable=True)
userType = Column(Enum(UserType), nullable=True)
profile_pic = Column(String, nullable=True)
mobile = Column(String)
mobile = Column(String, nullable=True)
# Notification relationships
sent_notifications = relationship("Notifications", foreign_keys="Notifications.sender_id", back_populates="sender")

View File

@ -14,6 +14,7 @@ from .BlockedEmail import BlockedEmail
from .SignupPricingMaster import SignupPricingMaster
from .ClinicFileVerifications import ClinicFileVerifications
from .OTP import OTP
from .ResetPasswordTokens import ResetPasswordTokens
__all__ = [
"Users",
@ -31,5 +32,6 @@ __all__ = [
"BlockedEmail",
"SignupPricingMaster",
"ClinicFileVerifications",
"OTP"
"OTP",
"ResetPasswordTokens"
]

View File

@ -12,7 +12,6 @@ class SNSBase(BaseModel):
SubscribeURL: str
Message: str
class AuthOTP(BaseModel):
email: EmailStr
otp: str
@ -33,6 +32,15 @@ class AuthBase(BaseModel):
email: EmailStr
password: str
class ResetPasswordBase(BaseModel):
token: str
password: str
class CreateSuperAdmin(BaseModel):
username:str
email:EmailStr
# Base schemas (shared attributes for create/read operations)
class ClinicBase(BaseModel):
name: str
@ -101,7 +109,7 @@ class UserBase(BaseModel):
password: str
clinicRole: Optional[ClinicUserRoles] = None
userType: Optional[UserType] = None
mobile: str
mobile: Optional[str] = None
class ClinicDoctorBase(BaseModel):

View File

@ -1,17 +1,28 @@
from operator import or_
import os
import dotenv
dotenv.load_dotenv()
from interface.common_response import CommonResponse
from schemas.ResponseSchemas import UserResponse
import datetime
import json
import urllib.request
from sqlalchemy.orm import Session
from services.jwtService import create_jwt_token
from services.userServices import UserServices
from models import BlockedEmail
from services.emailService import EmailService
from exceptions.validation_exception import ValidationException
from models import OTP
from enums.enums import UserType
from models import Users
from exceptions.resource_not_found_exception import ResourceNotFoundException
from models import ResetPasswordTokens
from utils.constants import generateOTP
from utils.password_utils import verify_password
from utils.password_utils import generate_reset_password_token, generate_secure_password, hash_password, verify_password
from schemas.CreateSchemas import UserCreate
from schemas.BaseSchemas import AuthBase, AuthOTP
from schemas.BaseSchemas import AuthBase, AuthOTP, CreateSuperAdmin
from exceptions.unauthorized_exception import UnauthorizedException
from database import get_db
@ -23,6 +34,7 @@ class AuthService:
self.user_service = UserServices()
self.db = next(get_db())
self.email_service = EmailService()
self.url = os.getenv("FRONTEND_URL")
def login(self, data: AuthBase) -> str:
@ -107,3 +119,110 @@ class AuthService:
# self.db.commit()
return
def get_admins(self, user, limit:int, offset:int, search:str):
try:
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("User is not authorized to perform this action")
admins = self.db.query(Users).filter(Users.userType == UserType.SUPER_ADMIN)
total = self.db.query(Users).filter(Users.userType == UserType.SUPER_ADMIN).count()
if search:
admins = admins.filter(
or_(
Users.username.contains(search),
Users.email.contains(search),
)
)
total = admins.count()
admins = admins.limit(limit).offset(offset).all()
response = [UserResponse(**admin.__dict__.copy()) for admin in admins]
common_response = CommonResponse(data=response, total=total)
return common_response
except Exception as e:
raise e
def create_super_admin(self, user, data: CreateSuperAdmin):
if user["userType"] != UserType.SUPER_ADMIN:
raise UnauthorizedException("User is not authorized to perform this action")
password = generate_secure_password()
hashed_password = hash_password(password)
# check if username and email are unique
existing_user = (
self.db.query(Users)
.filter(
Users.email == data.email.lower(),
)
.first()
)
if existing_user:
raise ValidationException("User with same email already exists")
user = Users(
username=data.username.lower(),
email=data.email.lower(),
password=hashed_password,
userType=UserType.SUPER_ADMIN,
)
self.db.add(user)
self.db.commit()
LOGIN_URL = self.url
# send email to user
self.email_service.send_new_admin_email(data.email, password, LOGIN_URL)
return
def forget_password(self, email: str):
user = self.db.query(Users).filter(Users.email == email.lower()).first()
if not user:
raise ResourceNotFoundException("User not found")
# get reset password token
reset_password_token = generate_reset_password_token()
reset_password = ResetPasswordTokens(email=email, token=reset_password_token)
self.db.add(reset_password)
self.db.commit()
reset_password_url = f"{self.url}/auth/reset-password?token={reset_password_token}"
self.email_service.send_reset_password_email(email, reset_password_url)
return
def reset_password(self, token: str, password: str):
reset_password = self.db.query(ResetPasswordTokens).filter(ResetPasswordTokens.token == token).first()
if not reset_password:
raise ResourceNotFoundException("Reset password token not found")
user = self.db.query(Users).filter(Users.email == reset_password.email).first()
if not user:
raise ResourceNotFoundException("User not found")
user.password = hash_password(password)
self.db.delete(reset_password)
self.db.commit()
return

View File

@ -67,10 +67,45 @@ class EmailService:
"TextPart": "Dear User, Congratulations! Your clinic {{name}} has been approved. Thank you, Team 24x7 AIHR"
}
self.client.create_template(Template=otp_template)
self.client.create_template(Template=new_clinic_template)
self.client.create_template(Template=reject_clinic_template)
self.client.create_template(Template=apporve_clinic_template)
new_admin_template = {
"TemplateName": "newAdmin",
"SubjectPart": "Login Credentials",
"HtmlPart": """
<p>Dear User,</p>
<p>Your login credentials are:</p>
<div>
<p>Email: {{email}}</p>
<p>Password: {{password}}</p>
</div>
<br />
<p>Use the following link to login:</p>
<p>Login URL: {{login_url}}</p>
<br />
<p>Thank you,<br/>Team 24x7 AIHR</p>
""",
"TextPart": "Dear User, Your login credentials are: Email: {{email}} Password: {{password}} Login URL: {{login_url}} Thank you, Team 24x7 AIHR"
}
reset_password_template = {
"TemplateName": "resetPassword",
"SubjectPart": "Reset Password",
"HtmlPart": """
<p>Dear User,</p>
<p>You have requested to reset your password. Please use the following link to reset your password:</p>
<p>Reset Password URL: {{reset_password_url}}</p>
<br />
<p>Thank you,<br/>Team 24x7 AIHR</p>
""",
"TextPart": "Dear User, You have requested to reset your password. Please use the following link to reset your password: Reset Password URL: {{reset_password_url}} Thank you, Team 24x7 AIHR"
}
# self.client.create_template(Template=otp_template)
# self.client.create_template(Template=new_clinic_template)
# self.client.create_template(Template=reject_clinic_template)
# self.client.create_template(Template=apporve_clinic_template)
# self.client.create_template(Template=new_admin_template)
# self.client.create_template(Template=new_admin_template)
self.client.create_template(Template=reset_password_template)
except Exception as e:
logger.error(f"Failed to create template: {e}")
@ -133,3 +168,21 @@ class EmailService:
template_data={"clinic_name": clinic_name, "email": email}
)
return
def send_new_admin_email(self, email: str, password:str, login_url:str):
"""Send new admin email"""
self.send_email(
template_name="newAdmin",
to_address=email,
template_data={"email": email, "password": password, "login_url": login_url}
)
return
def send_reset_password_email(self, email: str, reset_password_url: str):
"""Send reset password email"""
self.send_email(
template_name="resetPassword",
to_address=email,
template_data={"reset_password_url": reset_password_url}
)
return

View File

@ -186,17 +186,20 @@ class UserServices:
return response
def get_user_by_email(self, email: str) -> UserResponse:
user = self.db.query(Users).filter(Users.email == email.lower()).first()
try:
user = self.db.query(Users).filter(Users.email == email.lower()).first()
if not user:
logger.error("User not found")
raise ResourceNotFoundException("User not found")
if not user:
logger.error("User not found")
raise ResourceNotFoundException("User not found")
user_dict = user.__dict__.copy()
user_dict = user.__dict__.copy()
user_response = UserResponse(**user_dict)
user_response = UserResponse(**user_dict)
return user_response
return user_response
except Exception as e:
raise e
def update_user(self, admin_id:int|None, user_id: int, user_data: UserUpdate) -> UserResponse:
# Check admin authorization if admin_id is provided

View File

@ -1,8 +1,13 @@
from passlib.context import CryptContext
import string
import secrets
# Create a password context for hashing and verifying
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def generate_reset_password_token():
return secrets.token_urlsafe(32)
def hash_password(password: str) -> str:
"""
Hash a password using bcrypt
@ -14,3 +19,42 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
Verify a password against a hash
"""
return pwd_context.verify(plain_password, hashed_password)
def generate_secure_password(length: int = 16, include_special: bool = True) -> str:
"""
Generate a cryptographically secure random password
Args:
length: Length of the password (default 16)
include_special: Include special characters (default True)
Returns:
A secure random password string
"""
# Define character sets
lowercase = string.ascii_lowercase
uppercase = string.ascii_uppercase
digits = string.digits
special_chars = string.punctuation if include_special else ""
# Combined character set
all_chars = lowercase + uppercase + digits + special_chars
# Ensure at least one character from each required group
password = [
secrets.choice(lowercase),
secrets.choice(uppercase),
secrets.choice(digits)
]
if include_special and special_chars:
password.append(secrets.choice(special_chars))
# Fill remaining length with random characters
password.extend(secrets.choice(all_chars) for _ in range(length - len(password)))
# Shuffle the password characters
secrets.SystemRandom().shuffle(password)
return ''.join(password)