# health-apps-backend/services/authService.py
# (109 lines, 3.7 KiB, Python)

import datetime
import json
import urllib.request
from sqlalchemy.orm import Session
from services.jwtService import create_jwt_token
from services.userServices import UserServices
from models import BlockedEmail
from services.emailService import EmailService
from exceptions.validation_exception import ValidationException
from models import OTP
from utils.constants import generateOTP
from utils.password_utils import verify_password
from schemas.CreateSchemas import UserCreate
from schemas.BaseSchemas import AuthBase, AuthOTP
from exceptions.unauthorized_exception import UnauthorizedException
from database import get_db
class AuthService:
    """Login, registration, OTP handling and SNS bounce processing.

    The service owns one long-lived database handle (``self.db``) that is
    shared by every method, plus a ``UserServices`` and an ``EmailService``
    helper.
    """

    # Upper bound, in seconds, for the HTTP call that confirms an SNS
    # subscription, so a stalled endpoint cannot block the request forever.
    _SNS_CONFIRM_TIMEOUT_SECONDS = 10

    # Minutes an OTP stays valid after it has been issued.
    _OTP_TTL_MINUTES = 10

    def __init__(self):
        self.user_service = UserServices()
        # NOTE(review): only the first value yielded by get_db() is taken and
        # the generator itself is dropped. If get_db() performs cleanup after
        # its ``yield`` (e.g. closing the session), that cleanup will only run
        # when the generator is garbage collected - confirm this is intended.
        self.db = next(get_db())
        self.email_service = EmailService()

    def _commit(self) -> None:
        """Commit ``self.db``, rolling back first if the commit fails.

        ``self.db`` is reused across calls, so a failed commit must not leave
        it in a broken transaction state for the next caller.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def login(self, data: AuthBase) -> str:
        """Check the credentials in ``data`` and return a JWT for the user.

        Args:
            data: Carries the ``email`` and ``password`` to check.

        Returns:
            The token produced by ``create_jwt_token``.

        Raises:
            UnauthorizedException: If no user is returned for the email or
                the password does not verify.
        """
        user = self.user_service.get_user_by_email(data.email)
        # A missing user gets the same error as a wrong password, so the
        # response does not reveal which emails are registered.
        if user is None or not verify_password(data.password, user.password):
            raise UnauthorizedException("Invalid credentials")

        # Build the token payload from the user's attributes, leaving out the
        # password. Underscore-prefixed attributes are skipped as well; they
        # are presumably internal bookkeeping (e.g. ORM instance state) rather
        # than user data.
        payload = {
            key: value
            for key, value in vars(user).items()
            if key != "password" and not key.startswith("_")
        }
        return create_jwt_token(payload)

    def register(self, user_data: UserCreate, background_tasks=None):
        """Create a user and return a JWT describing the new account.

        Args:
            user_data: Forwarded to ``UserServices.create_user``.
            background_tasks: Forwarded unchanged to ``create_user``.

        Returns:
            The token produced by ``create_jwt_token``. Its ``clinicId`` is
            the id of the first entry in ``created_clinics``, or ``None``
            when that collection is empty.
        """
        response = self.user_service.create_user(user_data, background_tasks)
        clinics = response.created_clinics
        payload = {
            "id": response.id,
            "username": response.username,
            "email": response.email,
            "clinicRole": response.clinicRole,
            "userType": response.userType,
            "mobile": response.mobile,
            # Guard against an empty collection instead of raising IndexError.
            "clinicId": clinics[0].id if clinics else None,
        }
        return create_jwt_token(payload)

    def _confirm_sns_subscription(self, subscribe_url) -> None:
        """Visit ``subscribe_url`` to (re)activate an SNS subscription.

        Raises:
            ValidationException: If the URL is not an http(s) URL.
        """
        # The URL comes from the request body. urlopen() would also follow
        # other schemes such as file:// or ftp://, so only web URLs pass.
        # NOTE(review): the SNS message signature and the URL host are not
        # verified here; consider adding that before trusting the payload.
        if not str(subscribe_url).lower().startswith(("http://", "https://")):
            raise ValidationException("Invalid SubscribeURL")
        # The context manager closes the HTTP response once the call is done.
        with urllib.request.urlopen(
            subscribe_url, timeout=self._SNS_CONFIRM_TIMEOUT_SECONDS
        ):
            pass

    def blockEmailSNS(self, body: dict):
        """Handle an SNS message and record bounced recipients as blocked.

        Args:
            body: The decoded SNS message. ``Type`` is always read;
                ``SubscribeURL`` or ``Message`` is read depending on it.

        Returns:
            ``None`` when a notification is ignored (its ``Message`` is not a
            JSON object, or it has no ``notificationType`` or no ``bounce``
            section); ``"OK"`` otherwise.
        """
        message_type = body["Type"]

        # Confirm a new subscription. An unsubscribe confirmation is answered
        # the same way, which re-activates the subscription and so disables
        # the automatic unsubscribe.
        if message_type in ("SubscriptionConfirmation", "UnsubscribeConfirmation"):
            self._confirm_sns_subscription(body["SubscribeURL"])
        elif message_type == "Notification":
            try:
                msg = json.loads(body["Message"])
            except json.JSONDecodeError:
                # Plain-text notifications carry nothing to act on.
                return None
            if not isinstance(msg, dict) or "notificationType" not in msg:
                return None
            # Only bounce notifications are handled; other notification types
            # have no "bounce" section and are ignored instead of failing.
            bounce = msg.get("bounce")
            if not bounce:
                return None

            recipients = bounce["bouncedRecipients"]
            for recipient in recipients:
                self.db.add(
                    BlockedEmail(
                        email=recipient["emailAddress"],
                        reason=msg["notificationType"],
                        severity=bounce["bounceType"],
                    )
                )
            if recipients:
                self._commit()
        return "OK"

    def send_otp(self, email: str):
        """Generate an OTP for ``email``, store it and email it.

        The OTP is stored with an expiry ``_OTP_TTL_MINUTES`` from now.
        """
        otp = generateOTP()
        expire_time = datetime.datetime.now() + datetime.timedelta(
            minutes=self._OTP_TTL_MINUTES
        )
        # Store the OTP before sending it: if the write fails, the user must
        # not receive a code that can never be verified.
        self.db.add(OTP(email=email, otp=otp, expireAt=expire_time))
        self._commit()
        self.email_service.send_otp_email(email, otp)
        return

    def verify_otp(self, data: AuthOTP):
        """Check that ``data.otp`` is a stored, unexpired OTP for ``data.email``.

        Raises:
            ValidationException: ``"Invalid OTP"`` when no matching record
                exists, ``"OTP expired"`` when the match is past its expiry.
        """
        now = datetime.datetime.now()
        # When the same code was issued more than once for this email, look
        # at the most recently expiring record so that an older, expired
        # duplicate cannot shadow a still-valid one.
        db_otp = (
            self.db.query(OTP)
            .filter(OTP.email == data.email, OTP.otp == data.otp)
            .order_by(OTP.expireAt.desc())
            .first()
        )
        if not db_otp:
            raise ValidationException("Invalid OTP")
        # Defensive re-check in Python; the database comparison may be looser
        # than exact equality (e.g. a case-insensitive collation).
        if db_otp.otp != data.otp:
            raise ValidationException("Invalid OTP")
        if db_otp.expireAt < now:
            raise ValidationException("OTP expired")
        # NOTE(review): a verified OTP is not deleted, so it stays usable
        # until it expires. Confirm whether reuse is intended; if not, delete
        # db_otp and commit here.
        return