# health-apps-cms/services/stripeServices.py
#
# 554 lines
# 19 KiB
# Python

import json
import os
import dotenv
dotenv.load_dotenv()
from models import ClinicOffers, StripeUsers
from services.dashboardService import DashboardService
from exceptions.validation_exception import ValidationException
from exceptions.resource_not_found_exception import ResourceNotFoundException
from exceptions.unauthorized_exception import UnauthorizedException
from models import Clinics, PaymentSessions, Subscriptions
import uuid
from fastapi import Request
from datetime import datetime
from models import PaymentLogs
from enums.enums import ClinicStatus, UserType
from schemas.ResponseSchemas import StripeUserReponse
from database import get_db
from sqlalchemy.orm import Session
import stripe
from loguru import logger
from decimal import Decimal
class StripeServices:
def __init__(self):
    """Set up the database session, logger, environment settings and dashboard helper.

    ``webhook_secret`` and ``redirect_url`` are read from the
    ``STRIPE_WEBHOOK_SECRET`` and ``FRONTEND_URL`` environment variables
    and are ``None`` when those variables are not set.
    """
    read_env = os.getenv
    self.logger = logger
    self.webhook_secret = read_env("STRIPE_WEBHOOK_SECRET")
    self.redirect_url = read_env("FRONTEND_URL")
    # Take one session from the get_db() generator for this instance.
    self.db: Session = next(get_db())
    self.dashboard_service = DashboardService()
async def create_customer(self, user_id: int, email: str, name: str):
    """Create a Stripe customer tagged with the local user id.

    Args:
        user_id: Stored under ``metadata["user_id"]`` on the customer.
        email: Customer e-mail passed to Stripe.
        name: Customer name passed to Stripe.

    Returns:
        The object returned by ``stripe.Customer.create_async``.

    Raises:
        stripe.error.StripeError: Logged, then re-raised unchanged.
    """
    customer_fields = {
        "email": email,
        "name": name,
        "metadata": {"user_id": user_id},
    }
    try:
        return await stripe.Customer.create_async(**customer_fields)
    except stripe.error.StripeError as stripe_err:
        self.logger.error(f"Error creating customer: {stripe_err}")
        raise
async def delete_customer(self, customer_id: str):
    """Delete the Stripe customer identified by ``customer_id``.

    Returns nothing.

    Raises:
        stripe.error.StripeError: Logged, then re-raised unchanged.
    """
    try:
        await stripe.Customer.delete_async(customer_id)
    except stripe.error.StripeError as stripe_err:
        self.logger.error(f"Error deleting customer: {stripe_err}")
        raise
    return None
async def create_account(self, user_id: int, email: str, name: str, phone: str):
    """Create an Express connected account (country ``AU``) for an individual.

    The ``card_payments`` and ``transfers`` capabilities are requested and
    ``user_id`` is stored in the account metadata.  ``phone`` is accepted
    but is not forwarded to Stripe by this method.

    Returns:
        The object returned by ``stripe.Account.create_async``.

    Raises:
        stripe.error.StripeError: Logged, then re-raised unchanged.
    """
    account_fields = {
        "type": "express",
        "country": "AU",
        "capabilities": {
            "card_payments": {"requested": True},
            "transfers": {"requested": True},
        },
        "business_type": "individual",
        "individual": {"first_name": name, "email": email},
        "metadata": {"user_id": user_id},
    }
    try:
        return await stripe.Account.create_async(**account_fields)
    except stripe.error.StripeError as stripe_err:
        self.logger.error(f"Error creating account: {stripe_err}")
        raise
async def delete_account(self, account_id: str):
    """Delete the Stripe connected account identified by ``account_id``.

    Returns nothing.

    Raises:
        stripe.error.StripeError: Logged, then re-raised unchanged.
    """
    try:
        await stripe.Account.delete_async(account_id)
    except stripe.error.StripeError as stripe_err:
        self.logger.error(f"Error deleting account: {stripe_err}")
        raise
    return None
async def get_stripe_data(self, clinic_id: int):
    """Return the stored Stripe record for a clinic as a plain dict.

    Looks up the first ``StripeUsers`` row whose ``clinic_id`` matches and
    serialises it through ``StripeUserReponse``.  The session is closed
    when the method exits, whether it succeeds or fails.

    Raises:
        ResourceNotFoundException: No row exists for ``clinic_id``.
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        matching_rows = self.db.query(StripeUsers).filter(
            StripeUsers.clinic_id == clinic_id
        )
        stripe_user = matching_rows.first()
        if stripe_user:
            return StripeUserReponse.model_validate(stripe_user).model_dump()
        self.logger.error("User not found!")
        raise ResourceNotFoundException("User not found!")
    except Exception as err:
        self.logger.error(f"Error retrieving account data: {err}")
        raise
    finally:
        self.db.close()
async def create_stripe_account_link(self, user):
    """Return an onboarding link URL for the user's first created clinic.

    The clinic id is read from ``user["created_clinics"][0]["id"]`` and the
    matching Stripe record is fetched with ``get_stripe_data``.  Both the
    refresh and return URLs of the link point at ``self.redirect_url``.

    Raises:
        ResourceNotFoundException: No Stripe record is available.
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        first_clinic_id = user["created_clinics"][0]["id"]
        account_record = await self.get_stripe_data(first_clinic_id)
        if not account_record:
            self.logger.error("Stripe account not found!")
            raise ResourceNotFoundException("Stripe account not found!")
        # Stripe expects the bare account id string here, not the whole record.
        link_fields = {
            "account": account_record["account_id"],
            "refresh_url": self.redirect_url,
            "return_url": self.redirect_url,
            "type": "account_onboarding",
        }
        onboarding_link = await stripe.AccountLink.create_async(**link_fields)
        return onboarding_link.url
    except Exception as err:
        self.logger.error(f"Error creating stripe account link: {err}")
        raise
async def check_account_capabilities(self, user):
    """Summarise the Stripe account state for the user's first created clinic.

    Returns:
        A dict with the retrieved account's ``capabilities`` and
        ``charges_enabled`` values, plus ``requirements`` (the account's
        ``requirements.currently_due``) and ``error`` (the account's
        ``requirements.errors``).

    Raises:
        ResourceNotFoundException: No Stripe record is available.
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        first_clinic_id = user["created_clinics"][0]["id"]
        account_record = await self.get_stripe_data(first_clinic_id)
        if not account_record:
            self.logger.error("Stripe account not found!")
            raise ResourceNotFoundException("Stripe account not found!")
        account = await stripe.Account.retrieve_async(account_record["account_id"])
        summary = {}
        summary["capabilities"] = account.capabilities
        summary["charges_enabled"] = account.charges_enabled
        summary["requirements"] = account.requirements.currently_due
        summary["error"] = account.requirements.errors
        return summary
    except Exception as err:
        self.logger.error(f"Error checking stripe account capabilities: {err}")
        raise
    finally:
        self.db.close()
async def get_invoice(self, user):
    """Return the hosted URL of the latest invoice on the clinic's active subscription.

    Only clinic admins may call this.  The clinic is found by
    ``creator_id``, the Stripe customer by ``user_id``, and the
    subscription by clinic, customer and ``status == "active"``.  The
    subscription and its ``latest_invoice`` are then fetched from Stripe.

    Raises:
        UnauthorizedException: The user is not a clinic admin.
        ResourceNotFoundException: Clinic, customer or subscription is missing.
        stripe.error.StripeError: Logged, then re-raised unchanged.
    """
    try:
        if user["userType"] != UserType.CLINIC_ADMIN:
            raise UnauthorizedException(
                "User is not authorized to perform this action"
            )

        clinic_query = self.db.query(Clinics).filter(Clinics.creator_id == user["id"])
        clinic = clinic_query.first()
        if not clinic:
            raise ResourceNotFoundException("Clinic not found!")

        customer_query = self.db.query(StripeUsers).filter(
            StripeUsers.user_id == user["id"]
        )
        customer = customer_query.first()
        if not customer:
            raise ResourceNotFoundException("Customer not found!")

        active_subscription = (
            self.db.query(Subscriptions)
            .filter(
                Subscriptions.clinic_id == clinic.id,
                Subscriptions.customer_id == customer.customer_id,
                Subscriptions.status == "active",
            )
            .first()
        )
        if not active_subscription:
            raise ResourceNotFoundException("Subscription not found!")

        remote_subscription = await stripe.Subscription.retrieve_async(
            active_subscription.subscription_id
        )
        latest_invoice_id = remote_subscription["latest_invoice"]
        latest_invoice = await stripe.Invoice.retrieve_async(latest_invoice_id)
        return latest_invoice.hosted_invoice_url
    except stripe.error.StripeError as stripe_err:
        self.logger.error(f"Error getting invoice: {stripe_err}")
        raise
    except Exception as err:
        self.logger.error(f"Error getting invoice: {err}")
        raise
    finally:
        self.db.close()
# NOTE: for the case where the checkout session has expired or was not created
async def create_payment_session(self, user):
    """Build a new subscription checkout for a clinic whose payment is due.

    Fees start from the signup pricing master.  When a ``ClinicOffers``
    row exists for the clinic's e-mail, its setup fee and per-call charge
    replace the defaults and the total is recomputed; the subscription fee
    always comes from the pricing master.

    Returns:
        The ``url`` of the session created by ``create_subscription_checkout``.

    Raises:
        UnauthorizedException: The user is not a clinic admin.
        ValidationException: The clinic status is not ``PAYMENT_DUE``.
        ResourceNotFoundException: No Stripe customer exists for the user.
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        if user["userType"] != UserType.CLINIC_ADMIN:
            raise UnauthorizedException(
                "User is not authorized to perform this action"
            )

        clinic = user["created_clinics"][0]
        if clinic["status"] != ClinicStatus.PAYMENT_DUE:
            raise ValidationException("Clinic is not due for payment")

        customer_query = self.db.query(StripeUsers).filter(
            StripeUsers.user_id == user["id"]
        )
        customer = customer_query.first()
        if not customer:
            raise ResourceNotFoundException("Customer not found")

        offer_query = self.db.query(ClinicOffers).filter(
            ClinicOffers.clinic_email == clinic["email"]
        )
        clinic_offers = offer_query.first()

        pricing = await self.dashboard_service.get_signup_pricing_master()
        fees_to_be = dict(
            setup_fees=pricing.setup_fees,
            subscription_fees=pricing.subscription_fees,
            per_call_charges=pricing.per_call_charges,
            total=pricing.setup_fees
            + pricing.subscription_fees
            + pricing.per_call_charges,
        )
        if clinic_offers:
            # A clinic-specific offer overrides setup and per-call pricing only.
            fees_to_be.update(
                setup_fees=clinic_offers.setup_fees,
                per_call_charges=clinic_offers.per_call_charges,
                total=clinic_offers.setup_fees
                + fees_to_be["subscription_fees"]
                + clinic_offers.per_call_charges,
            )

        checkout = await self.create_subscription_checkout(
            fees_to_be, clinic["id"], customer.account_id, customer.customer_id
        )
        return checkout.url
    except Exception as err:
        self.logger.error(f"Error creating payment session: {err}")
        raise
    finally:
        self.db.close()
# NOTE: This is not used
# async def create_checkout_session(self, user_id: int):
# try:
# checkout_session = await stripe.checkout.Session.create_async(
# payment_method_types=["card"],
# line_items=[
# {
# "price_data": {
# "currency": "aud",
# "product_data": {
# "name": "Willio Voice Subscription",
# },
# "unit_amount": 5000,
# },
# "quantity": 1,
# }
# ],
# expand=["payment_intent"],
# mode="payment",
# payment_intent_data={"metadata": {"order_id": "1"}},
# success_url=f"{self.redirect_url}auth/waiting",
# cancel_url=f"{self.redirect_url}auth/waiting",
# metadata={"user_id": user_id},
# )
# return checkout_session
# except stripe.error.StripeError as e:
# self.logger.error(f"Error creating checkout session: {e}")
# raise
async def create_subscription_checkout(
    self, fees_to_be: dict, clinic_id: int, account_id: str, customer_id: str
):
    """Create a Stripe Checkout session in subscription mode and record it locally.

    The session carries three AUD line items built from ``fees_to_be``: a
    recurring item for ``subscription_fees`` followed by one-off items for
    ``per_call_charges`` and ``setup_fees``.  Once Stripe has created the
    session, a ``PaymentLogs`` row and a ``PaymentSessions`` row (both with
    status ``pending``) are committed.  The session is closed on exit.

    Args:
        fees_to_be: Dollar amounts keyed by ``setup_fees``,
            ``subscription_fees``, ``per_call_charges`` and ``total``.
            It is JSON-encoded into the session metadata, so its values
            must be JSON-serialisable.
        clinic_id: Clinic the checkout is for.
        account_id: Stripe account id stored in the metadata and the log.
        customer_id: Stripe customer the session is created for.

    Returns:
        The session object returned by ``stripe.checkout.Session.create_async``.

    Raises:
        stripe.error.StripeError: Logged, then re-raised unchanged.  Other
            exceptions propagate without being logged here.
    """
    # Imported locally because the module-level import only brings in Decimal.
    from decimal import ROUND_HALF_UP

    def to_cents(amount) -> int:
        # Go through Decimal(str(...)) so binary float artefacts cannot
        # shave a cent off: int(19.99 * 100) is 1998, this yields 1999.
        cents = Decimal(str(amount)) * 100
        return int(cents.quantize(Decimal("1"), rounding=ROUND_HALF_UP))

    def price_item(label: str, amount, recurring=None) -> dict:
        # One Checkout line item priced in AUD; ``recurring`` is only set
        # for the subscription item.
        price_data = {
            "currency": "aud",
            "product_data": {"name": label},
            "unit_amount": to_cents(amount),
        }
        if recurring is not None:
            price_data["recurring"] = recurring
        return {"price_data": price_data, "quantity": 1}

    try:
        unique_id = str(uuid.uuid4())
        unique_clinic_id = f"clinic_{clinic_id}_{unique_id}"

        # NOTE(review): the product is labelled "Monthly Subscription" but
        # is billed every 3 years — confirm the label is intended.
        line_items = [
            price_item(
                "Monthly Subscription",
                fees_to_be["subscription_fees"],
                recurring={
                    "interval": "year",
                    "interval_count": 3,  # NOTE: max 3 years supported by stripe
                },
            ),
            price_item("Per Call", fees_to_be["per_call_charges"]),
            price_item("Setup Fee", fees_to_be["setup_fees"]),
        ]

        metadata = {
            "clinic_id": clinic_id,
            "unique_clinic_id": unique_clinic_id,
            "account_id": account_id,
            "customer_id": customer_id,
            "fees_to_be": json.dumps(fees_to_be),
        }
        session_data = {
            "customer": customer_id,
            "payment_method_types": ["card", "au_becs_debit"],
            "mode": "subscription",
            "line_items": line_items,
            "success_url": f"{self.redirect_url}auth/waiting",
            "cancel_url": f"{self.redirect_url}auth/waiting",
            "metadata": metadata,
            # Mirror the metadata onto the subscription so it can be read
            # back from the subscription object later.
            "subscription_data": {"metadata": metadata},
        }
        session = await stripe.checkout.Session.create_async(**session_data)

        payment_log = PaymentLogs(
            customer_id=customer_id,
            account_id=account_id,
            amount=Decimal(
                str(fees_to_be["total"])
            ),  # Keep as Decimal for database storage
            clinic_id=clinic_id,
            unique_clinic_id=unique_clinic_id,
            payment_status="pending",
            metadata_logs=json.dumps(metadata),
        )
        new_payment_session = PaymentSessions(
            session_id=session.id,
            customer_id=customer_id,
            clinic_id=clinic_id,
            status="pending",
        )
        self.db.add(payment_log)
        self.db.add(new_payment_session)
        self.db.commit()
        return session
    except stripe.error.StripeError as e:
        self.logger.error(f"Error creating checkout session: {e}")
        raise
    finally:
        self.db.close()
async def handle_webhook(self, request: Request):
    """Verify an incoming Stripe webhook and dispatch the events this service handles.

    The raw request body is checked against the ``Stripe-Signature``
    header using ``self.webhook_secret``.  Two event types are acted on:

    * ``customer.subscription.deleted`` — the local subscription row is
      updated through ``_subscription_expired``.
    * ``checkout.session.completed`` — the payment is logged through
      ``_update_payment_log`` and a subscription row is created through
      ``_create_subscription_entry``.

    Any other event type is acknowledged without action.  The session is
    closed on exit.

    Returns:
        The string ``"OK"`` once a verified event has been handled.

    Raises:
        ValidationException: The payload could not be parsed.
        UnauthorizedException: The signature did not verify.
    """
    try:
        payload = await request.body()
        signature = request.headers.get("Stripe-Signature")
        # Only the verification call is guarded, so a rejected request is
        # reported to the caller instead of silently returning None.
        try:
            event = stripe.Webhook.construct_event(
                payload, signature, self.webhook_secret
            )
        except ValueError as e:
            self.logger.error(f"Invalid payload: {e}")
            raise ValidationException("Invalid payload") from e
        except stripe.error.SignatureVerificationError as e:
            self.logger.error(f"Invalid signature: {e}")
            raise UnauthorizedException("Invalid signature") from e

        event_type = event["type"]
        event_object = event["data"]["object"]
        self.logger.info(f"Stripe webhook event type: {event_type}")

        if event_type == "customer.subscription.deleted":
            self.logger.info("customer subscription ended")
            # The event object is the subscription itself, so its own id is
            # used rather than digging it out of the first subscription item.
            await self._subscription_expired(event_object["id"])
        elif event_type == "checkout.session.completed":
            metadata = event_object["metadata"]
            clinic_id = metadata["clinic_id"]
            customer_id = metadata["customer_id"]
            account_id = metadata["account_id"]
            await self._update_payment_log(
                metadata["unique_clinic_id"],
                clinic_id,
                customer_id,
                account_id,
                event_object["amount_total"],
                metadata,
            )
            await self._create_subscription_entry(
                {
                    "clinic_id": clinic_id,
                    "customer_id": customer_id,
                    "account_id": account_id,
                    "session_id": event_object["id"],
                    "subscription_id": event_object["subscription"],
                }
            )
        # TODO: handle subscription period end
        return "OK"
    finally:
        self.db.close()
async def _update_payment_log(
    self,
    unique_clinic_id: str,
    clinic_id: int,
    customer_id: str,
    account_id: str,
    total: float,
    metadata: object,
):
    """Record a completed checkout and move the clinic to ``UNDER_REVIEW``.

    In a single transaction this deletes every ``PaymentSessions`` row for
    the clinic, adds a ``PaymentLogs`` row with status ``paid`` and, when
    the clinic exists, sets its status to ``ClinicStatus.UNDER_REVIEW``.
    Failures are logged and rolled back, never raised, so the webhook
    handler keeps going.  The session is closed on exit.

    Args:
        unique_clinic_id: Identifier generated when the checkout was created.
        clinic_id: Clinic the payment belongs to.
        customer_id: Stripe customer id.
        account_id: Stripe account id.
        total: Checkout ``amount_total`` as sent by Stripe, i.e. in cents.
        metadata: Checkout metadata, either a Stripe object exposing
            ``to_dict()`` or a plain mapping.
    """
    try:
        # Stripe reports cents; the pending log written at checkout creation
        # stores dollars, so convert to keep the column in one unit.
        amount = None if total is None else Decimal(str(total)) / 100

        to_dict = getattr(metadata, "to_dict", None)
        metadata_payload = to_dict() if callable(to_dict) else dict(metadata)

        self.db.query(PaymentSessions).filter(
            PaymentSessions.clinic_id == clinic_id
        ).delete()
        self.db.add(
            PaymentLogs(
                customer_id=customer_id,
                account_id=account_id,
                amount=amount,
                clinic_id=clinic_id,
                unique_clinic_id=unique_clinic_id,
                payment_status="paid",
                metadata_logs=json.dumps(metadata_payload),
            )
        )
        clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
        if clinic:
            clinic.status = ClinicStatus.UNDER_REVIEW
        # Commit only once every step has succeeded.
        self.db.commit()
    except Exception as e:
        # Discard partial work instead of committing a half-applied update.
        self.db.rollback()
        self.logger.error(f"Error updating payment log: {e}")
    finally:
        self.db.close()
async def _create_subscription_entry(self, data: dict):
    """Store a ``Subscriptions`` row and a paid ``PaymentSessions`` row for a checkout.

    The subscription is fetched from Stripe and its ``fees_to_be`` metadata
    entry (a JSON string) supplies the fee breakdown.  The billing period
    bounds are taken from the first subscription item.  Failures are
    logged and rolled back, never raised.  The session is closed on exit.

    Args:
        data: Must contain ``clinic_id``, ``customer_id``, ``account_id``,
            ``session_id`` and ``subscription_id``.
    """
    try:
        # Use the async client, as elsewhere in this class, so the event
        # loop is not blocked while waiting on Stripe.
        subscription = await stripe.Subscription.retrieve_async(
            data["subscription_id"]
        )
        subscription_metadata = subscription.metadata
        fees_to_be = json.loads(subscription_metadata["fees_to_be"])
        first_item = subscription["items"]["data"][0]

        self.db.add(
            Subscriptions(
                clinic_id=data["clinic_id"],
                customer_id=data["customer_id"],
                account_id=data["account_id"],
                total=fees_to_be["total"],
                setup_fee=fees_to_be["setup_fees"],
                subscription_fee=fees_to_be["subscription_fees"],
                per_call_charge=fees_to_be["per_call_charges"],
                subscription_id=data["subscription_id"],
                status=subscription.status,
                # NOTE(review): values are stored exactly as Stripe returns
                # them — confirm the column type matches.
                current_period_start=first_item["current_period_start"],
                current_period_end=first_item["current_period_end"],
                metadata_logs=json.dumps(subscription_metadata),
            )
        )
        self.db.add(
            PaymentSessions(
                session_id=data["session_id"],
                customer_id=data["customer_id"],
                clinic_id=data["clinic_id"],
                status="paid",
            )
        )
        # Commit only once both rows have been staged successfully.
        self.db.commit()
    except Exception as e:
        # Discard partial work instead of committing it.
        self.db.rollback()
        self.logger.error(f"Error creating subscription entry: {e}")
    finally:
        self.db.close()
async def _subscription_expired(self, subscription_id):
    """Copy the current Stripe status of a subscription onto its local row.

    If no local ``Subscriptions`` row matches ``subscription_id`` the
    problem is logged and nothing else happens.  Failures are logged and
    rolled back, never raised.  The session is closed on exit.

    Args:
        subscription_id: Stripe subscription id to synchronise.
    """
    try:
        db_subscription = (
            self.db.query(Subscriptions)
            .filter(Subscriptions.subscription_id == subscription_id)
            .first()
        )
        if not db_subscription:
            # Nothing to update locally, so skip the Stripe round-trip.
            self.logger.error("Subscription not found!")
            return

        # Use the async client, as elsewhere in this class, so the event
        # loop is not blocked while waiting on Stripe.
        subscription = await stripe.Subscription.retrieve_async(subscription_id)
        db_subscription.status = subscription.status
        # TODO: update clinic status
        # TODO: send email to user
        self.db.commit()
    except Exception as e:
        # Discard partial work instead of committing it.
        self.db.rollback()
        self.logger.error(f"Error ending subscription: {e}")
    finally:
        self.db.close()