# File viewer metadata: 135 lines, 5.4 KiB, Python
import json
import os
from logging import getLogger

import boto3
import dotenv

# Load settings from a local .env file (if any) into the process environment
# before any EmailService instance reads them via os.getenv().
dotenv.load_dotenv()

logger = getLogger(__name__)


class EmailService:
    """Send templated transactional email through AWS SES.

    Connection settings are read from the environment variables
    ``AWS_REGION``, ``AWS_ACCESS_KEY``, ``AWS_SECRET_KEY`` and
    ``AWS_SENDER_EMAIL``.
    """

    def __init__(self):
        """Create the SES client and remember the configured sender address."""
        self.client = boto3.client(
            "ses",
            region_name=os.getenv("AWS_REGION"),
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
            aws_secret_access_key=os.getenv("AWS_SECRET_KEY"),
        )
        self.senderEmail = os.getenv("AWS_SENDER_EMAIL")

    @staticmethod
    def _build_templates() -> list:
        """Return the SES template definitions managed by this service."""
        otp_template = {
            "TemplateName": "sendOTP",
            "SubjectPart": "Your OTP for login is {{otp}}",
            "HtmlPart": """
                <p>Dear User,</p>
                <p>Thank you for using our service. To complete your authentication, please use the following one-time password (OTP):</p>
                <p>OTP: {{otp}}</p>
                <p>This OTP is valid for 15 minutes. Do not share this OTP with anyone for security reasons. If you did not request this OTP, please ignore this email.</p>
                <p>Thank you,<br/>Team 24x7 AIHR</p>
            """,
            "TextPart": "Dear User, Thank you for using our service. To complete your authentication, please use the following one-time password (OTP): OTP: {{otp}} This OTP is valid for 15 minutes. Do not share this OTP with anyone for security reasons. If you did not request this OTP, please ignore this email. Thank you, Team 24x7 AIHR",
        }

        new_clinic_template = {
            "TemplateName": "newClinic",
            "SubjectPart": "New Clinic Added",
            "HtmlPart": """
                <p>Dear Admin,</p>
                <p>A new clinic has been added to the system.</p>
                <p>Thank you,<br/>Team 24x7 AIHR</p>
            """,
            "TextPart": "Dear Admin, A new clinic has been added to the system. Thank you, Team 24x7 AIHR",
        }

        reject_clinic_template = {
            "TemplateName": "rejectClinic",
            "SubjectPart": "Clinic Rejected",
            "HtmlPart": """
                <p>Dear User,</p>
                <p>Your clinic {{name}} has been rejected.</p>
                <p>Thank you,<br/>Team 24x7 AIHR</p>
            """,
            "TextPart": "Dear User, Your clinic {{name}} has been rejected. Thank you, Team 24x7 AIHR",
        }

        # NOTE: "apporve" is a misspelling of "approve". The template name is
        # kept as-is because it identifies the template stored in SES.
        apporve_clinic_template = {
            "TemplateName": "apporveClinic",
            "SubjectPart": "Clinic Approved",
            "HtmlPart": """
                <p>Dear User,</p>
                <p>Congratulations! Your clinic {{name}} has been approved.</p>
                <p>Thank you,<br/>Team 24x7 AIHR</p>
            """,
            "TextPart": "Dear User, Congratulations! Your clinic {{name}} has been approved. Thank you, Team 24x7 AIHR",
        }

        return [
            otp_template,
            new_clinic_template,
            reject_clinic_template,
            apporve_clinic_template,
        ]

    @staticmethod
    def _clinic_template_data(email: str, clinic_name: str) -> dict:
        """Build the template variables shared by the clinic notification emails.

        The clinic templates reference the clinic as ``{{name}}``, so the value
        is supplied under that key. ``clinic_name`` is kept as well so that
        templates written against the previous payload keep rendering.
        """
        return {"name": clinic_name, "clinic_name": clinic_name, "email": email}

    def createTemplate(self):
        """Create or update email templates.

        Each template is created in SES; when a template with the same name
        already exists it is updated in place instead, so the method can be
        run repeatedly.

        Raises:
            Exception: if any template can be neither created nor updated.
                The underlying error is attached as ``__cause__``.
        """
        try:
            for template in self._build_templates():
                try:
                    self.client.create_template(Template=template)
                except self.client.exceptions.AlreadyExistsException:
                    # SES refuses to create a template whose name is taken;
                    # push the current definition as an update instead.
                    self.client.update_template(Template=template)
        except Exception as e:
            logger.error(f"Failed to create template: {e}")
            raise Exception("Failed to create template") from e

    def send_email(self, template_name: str, to_address: str, template_data: dict) -> None:
        """Send an email using a stored SES template.

        Args:
            template_name: Name of the SES template to render.
            to_address: Recipient email address.
            template_data: Values for the template placeholders; serialized
                to JSON before being passed to SES.

        Raises:
            Exception: whatever the SES client raises is logged and re-raised.
        """
        try:
            response = self.client.send_templated_email(
                Source=self.senderEmail,
                Destination={"ToAddresses": [to_address]},
                Template=template_name,
                TemplateData=json.dumps(template_data),
                ReplyToAddresses=[self.senderEmail],
            )
            logger.info(f"Email sent to {to_address} successfully. MessageId: {response['MessageId']}")
        except Exception as e:
            logger.error(f"Error sending email to {to_address}: {str(e)}")
            raise

    def send_otp_email(self, email: str, otp: str) -> None:
        """Send OTP email.

        Failures are logged by ``send_email`` and propagate to the caller.
        """
        self.send_email(
            template_name="sendOTP",
            to_address=email,
            template_data={"otp": otp, "email": email},
        )

    def send_new_clinic_email(self, email: str, clinic_name: str) -> None:
        """Send new clinic email."""
        self.send_email(
            template_name="newClinic",
            to_address=email,
            template_data=self._clinic_template_data(email, clinic_name),
        )

    def send_reject_clinic_email(self, email: str, clinic_name: str) -> None:
        """Send reject clinic email."""
        self.send_email(
            template_name="rejectClinic",
            to_address=email,
            template_data=self._clinic_template_data(email, clinic_name),
        )

    def send_apporve_clinic_email(self, email: str, clinic_name: str) -> None:
        """Send approve clinic email.

        The method name keeps its original spelling for existing callers.
        """
        self.send_email(
            template_name="apporveClinic",
            to_address=email,
            template_data=self._clinic_template_data(email, clinic_name),
        )