# health-apps-backend/services/stripeServices.py
#
# 299 lines
# 11 KiB
# Python

import json
import os
import dotenv
dotenv.load_dotenv()
from models import ClinicOffers, StripeUsers
from services.dashboardService import DashboardService
from exceptions.validation_exception import ValidationException
from exceptions.resource_not_found_exception import ResourceNotFoundException
from exceptions.unauthorized_exception import UnauthorizedException
from models import Clinics,PaymentSessions
import uuid
from fastapi import Request
from datetime import datetime
from models import PaymentLogs
from enums.enums import ClinicStatus, UserType
from database import get_db
from sqlalchemy.orm import Session
import stripe
from loguru import logger
from decimal import Decimal
class StripeServices:
    """Stripe customer, connected-account, checkout and webhook operations.

    Besides calling the Stripe SDK, the service records payment bookkeeping
    (``PaymentLogs`` / ``PaymentSessions``) and updates ``Clinics.status`` in
    the application database.

    Notes:
        * One SQLAlchemy session is taken from ``get_db()`` in ``__init__``
          and shared by every method; several methods close it in ``finally``.
        * The Stripe SDK calls are made synchronously (they are not awaited)
          inside ``async def`` methods.
    """

    # Where Stripe Checkout sends the browser after success / cancellation.
    # NOTE(review): hard-coded plain-HTTP IP address - consider moving this
    # into configuration.
    CHECKOUT_SUCCESS_URL = "http://54.79.156.66/"
    CHECKOUT_CANCEL_URL = "http://54.79.156.66/"

    def __init__(self):
        """Acquire a DB session, the logger, the webhook secret and the dashboard service."""
        self.db: Session = next(get_db())
        self.logger = logger
        self.webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
        self.dashboard_service = DashboardService()

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_cents(amount) -> int:
        """Convert a currency amount to whole cents.

        The value goes through ``Decimal(str(amount))`` so that binary float
        artefacts are not truncated (``int(19.99 * 100)`` is 1998, not 1999).
        Sub-cent remainders are rounded rather than cut off.
        """
        return round(Decimal(str(amount)) * 100)

    @staticmethod
    def _json_default(value):
        """``json.dumps`` fallback: serialise ``Decimal`` as a string, reject the rest."""
        if isinstance(value, Decimal):
            return str(value)
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    @staticmethod
    def _resolve_fees(signup_pricing, clinic_offers=None) -> dict:
        """Build the fee breakdown for a clinic.

        Starts from the master signup pricing; when a clinic-specific offer
        exists, its setup fee and per-call charge replace the master values
        (the subscription fee always comes from the master pricing).

        Returns:
            Dict with ``setup_fees``, ``subscription_fees``,
            ``per_call_charges`` and their sum under ``total``.
        """
        setup_fees = signup_pricing.setup_fees
        per_call_charges = signup_pricing.per_call_charges
        if clinic_offers:
            setup_fees = clinic_offers.setup_fees
            per_call_charges = clinic_offers.per_call_charges
        subscription_fees = signup_pricing.subscription_fees
        return {
            "setup_fees": setup_fees,
            "subscription_fees": subscription_fees,
            "per_call_charges": per_call_charges,
            "total": setup_fees + subscription_fees + per_call_charges,
        }

    @staticmethod
    def _build_line_items(fees_to_be: dict) -> list:
        """Return the three Checkout line items (subscription, per call, setup fee)."""

        def line_item(name, amount, recurring=None):
            price_data = {
                "currency": "aud",
                "product_data": {"name": name},
                "unit_amount": StripeServices._to_cents(amount),
            }
            if recurring is not None:
                price_data["recurring"] = recurring
            return {"price_data": price_data, "quantity": 1}

        return [
            # NOTE(review): the product is named "Monthly Subscription" but is
            # billed with a yearly interval - confirm which one is intended.
            line_item(
                "Monthly Subscription",
                fees_to_be["subscription_fees"],
                recurring={"interval": "year"},
            ),
            line_item("Per Call", fees_to_be["per_call_charges"]),
            line_item("Setup Fee", fees_to_be["setup_fees"]),
        ]

    @staticmethod
    def _extract_clinic_reference(checkout_session):
        """Pull ``(unique_clinic_id, clinic_id)`` out of a checkout session's metadata.

        Returns ``None`` when the metadata is absent or lacks either key (for
        example sessions made by ``create_checkout_session``, which only store
        ``user_id``) or when ``clinic_id`` is not an integer. Stripe hands
        metadata values back as strings, hence the ``int`` conversion.
        """
        try:
            metadata = checkout_session["metadata"]
            return metadata["unique_clinic_id"], int(metadata["clinic_id"])
        except (KeyError, TypeError, ValueError):
            return None

    # ------------------------------------------------------------------
    # Customers and connected accounts
    # ------------------------------------------------------------------
    async def create_customer(self, user_id: int, email: str, name: str):
        """Create a Stripe customer tagged with ``user_id`` in its metadata.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            return stripe.Customer.create(
                email=email, name=name, metadata={"user_id": user_id}
            )
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating customer: {e}")
            raise

    async def delete_customer(self, customer_id: str):
        """Delete the Stripe customer ``customer_id``.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            stripe.Customer.delete(customer_id)
        except stripe.error.StripeError as e:
            self.logger.error(f"Error deleting customer: {e}")
            raise

    async def create_account(self, user_id: int, email: str, name: str, phone: str):
        """Create an Express connected account (AU, individual).

        Requests the ``card_payments`` and ``transfers`` capabilities and
        stores ``user_id`` in the account metadata. ``phone`` is accepted but
        not sent to Stripe, and ``name`` is passed as the individual's
        ``first_name``.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            return stripe.Account.create(
                type="express",
                country="AU",
                capabilities={
                    "card_payments": {"requested": True},
                    "transfers": {"requested": True},
                },
                business_type="individual",
                individual={"first_name": name, "email": email},
                metadata={"user_id": user_id},
            )
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating account: {e}")
            raise

    async def delete_account(self, account_id: str):
        """Delete the Stripe connected account ``account_id``.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            stripe.Account.delete(account_id)
        except stripe.error.StripeError as e:
            self.logger.error(f"Error deleting account: {e}")
            raise

    # ------------------------------------------------------------------
    # Checkout
    # ------------------------------------------------------------------
    async def create_payment_session(self, user):
        """Create a subscription checkout for the clinic admin's first clinic.

        Args:
            user: Mapping with ``userType``, ``id`` and ``created_clinics``
                (each clinic a mapping with ``id``, ``email`` and ``status``).

        Returns:
            The URL of the created Stripe Checkout session.

        Raises:
            UnauthorizedException: the user is not a clinic admin.
            ResourceNotFoundException: the user has no clinic, or no
                ``StripeUsers`` row exists for the user.
            ValidationException: the clinic is not in ``PAYMENT_DUE`` status.
        """
        try:
            if user["userType"] != UserType.CLINIC_ADMIN:
                raise UnauthorizedException("User is not authorized to perform this action")

            try:
                clinic = user["created_clinics"][0]
            except (KeyError, IndexError, TypeError) as exc:
                # An admin without any clinic used to surface as a bare
                # IndexError/KeyError; report it like other missing resources.
                raise ResourceNotFoundException("Clinic not found") from exc

            if clinic["status"] != ClinicStatus.PAYMENT_DUE:
                raise ValidationException("Clinic is not due for payment")

            customer = (
                self.db.query(StripeUsers)
                .filter(StripeUsers.user_id == user["id"])
                .first()
            )
            if not customer:
                raise ResourceNotFoundException("Customer not found")

            clinic_offers = (
                self.db.query(ClinicOffers)
                .filter(ClinicOffers.clinic_email == clinic["email"])
                .first()
            )
            signup_pricing = await self.dashboard_service.get_signup_pricing_master()
            fees_to_be = self._resolve_fees(signup_pricing, clinic_offers)

            # Remove the previous payment session. The delete is not committed
            # here; it is committed together with the new rows by
            # create_subscription_checkout.
            self.db.query(PaymentSessions).filter(
                PaymentSessions.clinic_id == clinic["id"]
            ).delete()

            payment_link = await self.create_subscription_checkout(
                fees_to_be, clinic["id"], customer.account_id, customer.customer_id
            )
            return payment_link.url
        except Exception as e:
            self.logger.error(f"Error creating payment session: {e}")
            raise
        finally:
            self.db.close()

    async def create_checkout_session(self, user_id: int):
        """Create a one-off (``mode="payment"``) checkout for a fixed 5000-cent AUD item.

        The session metadata only carries ``user_id``.
        NOTE(review): ``order_id`` in the payment-intent metadata is the
        constant ``"1"`` - looks like a placeholder; confirm.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            return stripe.checkout.Session.create(
                payment_method_types=["card"],
                line_items=[
                    {
                        "price_data": {
                            "currency": "aud",
                            "product_data": {
                                "name": "Willio Voice Subscription",
                            },
                            "unit_amount": 5000,
                        },
                        "quantity": 1,
                    }
                ],
                expand=["payment_intent"],
                mode="payment",
                payment_intent_data={"metadata": {"order_id": "1"}},
                success_url=self.CHECKOUT_SUCCESS_URL,
                cancel_url=self.CHECKOUT_CANCEL_URL,
                metadata={"user_id": user_id},
            )
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating checkout session: {e}")
            raise

    async def create_setup_fees(self, customer_id: str, amount: int):
        """Create an AUD invoice item described as "Setup Fees" for the customer.

        Returns:
            The created Stripe ``InvoiceItem``.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            invoice_item = stripe.InvoiceItem.create(
                customer=customer_id,
                amount=amount,
                currency="aud",
                description="Setup Fees",
            )
            return invoice_item
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating setup intent: {e}")
            raise

    async def create_subscription_checkout(self, fees_to_be: dict, clinic_id: int, account_id: str, customer_id: str):
        """Create a subscription-mode Checkout session and record it locally.

        Builds three line items from ``fees_to_be`` (recurring subscription,
        per-call charge, setup fee), creates the Stripe session, then stores a
        pending ``PaymentLogs`` row and a pending ``PaymentSessions`` row and
        commits.

        Args:
            fees_to_be: Dict with ``subscription_fees``, ``per_call_charges``,
                ``setup_fees`` and ``total``.
            clinic_id: Clinic the payment belongs to.
            account_id: Stripe connected-account id stored with the log.
            customer_id: Stripe customer the session is created for.

        Returns:
            The created Stripe Checkout session.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            unique_clinic_id = f"clinic_{clinic_id}_{uuid.uuid4()}"

            metadata = {
                "clinic_id": clinic_id,
                "unique_clinic_id": unique_clinic_id,
                "account_id": account_id,
                "customer_id": customer_id,
                # Fees may be Decimal (they are stored as Decimal below), which
                # plain json.dumps rejects; floats serialise exactly as before.
                "fees_to_be": json.dumps(fees_to_be, default=self._json_default),
            }
            session = stripe.checkout.Session.create(
                customer=customer_id,
                payment_method_types=["card"],
                mode="subscription",
                line_items=self._build_line_items(fees_to_be),
                success_url=self.CHECKOUT_SUCCESS_URL,
                cancel_url=self.CHECKOUT_CANCEL_URL,
                metadata=metadata,
                # Mirror the metadata onto the subscription itself.
                subscription_data={"metadata": metadata},
            )

            payment_log = PaymentLogs(
                customer_id=customer_id,
                account_id=account_id,
                amount=Decimal(str(fees_to_be["total"])),  # Keep as Decimal for database storage
                clinic_id=clinic_id,
                unique_clinic_id=unique_clinic_id,
                payment_status="pending",
                metadata_logs=json.dumps(metadata),
            )
            new_payment_session = PaymentSessions(
                session_id=session.id,
                customer_id=customer_id,
                clinic_id=clinic_id,
                status="pending",
            )
            self.db.add(payment_log)
            self.db.add(new_payment_session)
            self.db.commit()
            return session
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating checkout session: {e}")
            raise
        finally:
            self.db.close()

    # ------------------------------------------------------------------
    # Webhooks
    # ------------------------------------------------------------------
    async def handle_webhook(self, request: "Request"):
        """Verify and dispatch an incoming Stripe webhook.

        The raw body is verified against the ``Stripe-Signature`` header with
        the configured webhook secret. On ``checkout.session.completed`` the
        matching payment log is marked successful via ``update_payment_log``;
        sessions without clinic metadata are skipped with a warning.
        ``invoice.payment_succeeded`` is currently not acted upon.

        Returns:
            The verified event, or ``None`` when the payload or signature is
            invalid (the error is logged, not raised).
        """
        try:
            payload = await request.body()
            event = stripe.Webhook.construct_event(
                payload, request.headers.get("Stripe-Signature"), self.webhook_secret
            )
            event_type = event["type"]
            self.logger.info(f"Stripe webhook event type: {event_type}")

            if event_type == "checkout.session.async_payment_succeeded":
                self.logger.info("Async payment succeeded")
            elif event_type == "checkout.session.completed":
                reference = self._extract_clinic_reference(event["data"]["object"])
                if reference is None:
                    self.logger.warning(
                        "checkout.session.completed without clinic metadata; "
                        "skipping payment log update"
                    )
                else:
                    unique_clinic_id, clinic_id = reference
                    self.update_payment_log(unique_clinic_id, clinic_id)
            # TODO: handle subscription period end
            return event
        except ValueError as e:
            self.logger.error(f"Invalid payload: {e}")
            return None
        except stripe.error.SignatureVerificationError as e:
            self.logger.error(f"Invalid signature: {e}")
            return None
        finally:
            self.db.close()

    def update_payment_log(self, unique_clinic_id: str, clinic_id: int):
        """Mark a payment as successful and move the clinic to ``UNDER_REVIEW``.

        Deletes the clinic's ``PaymentSessions`` rows, sets the matching
        ``PaymentLogs`` row to ``"success"`` and updates the clinic status.
        Everything is committed in a single transaction so the log and the
        clinic status cannot diverge. Errors are logged and suppressed; the
        caller is not notified.
        """
        try:
            payment_log = (
                self.db.query(PaymentLogs)
                .filter(PaymentLogs.unique_clinic_id == unique_clinic_id)
                .first()
            )
            self.db.query(PaymentSessions).filter(
                PaymentSessions.clinic_id == clinic_id
            ).delete()
            if payment_log:
                payment_log.payment_status = "success"

            clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
            if clinic:
                clinic.status = ClinicStatus.UNDER_REVIEW

            # Nothing is committed when neither row exists, as before.
            if payment_log or clinic:
                self.db.commit()
        except Exception as e:
            self.logger.error(f"Error updating payment log: {e}")
        finally:
            self.db.close()