564 lines
20 KiB
Python
564 lines
20 KiB
Python
import json
|
|
import os
|
|
import dotenv
|
|
|
|
|
|
dotenv.load_dotenv()
|
|
|
|
from models import ClinicOffers, StripeUsers
|
|
from services.dashboardService import DashboardService
|
|
from exceptions.validation_exception import ValidationException
|
|
from exceptions.resource_not_found_exception import ResourceNotFoundException
|
|
from exceptions.unauthorized_exception import UnauthorizedException
|
|
from models import Clinics, PaymentSessions, Subscriptions
|
|
import uuid
|
|
from fastapi import Request
|
|
from datetime import datetime
|
|
from models import PaymentLogs
|
|
from enums.enums import ClinicStatus, UserType
|
|
from schemas.ResponseSchemas import StripeUserReponse
|
|
|
|
from database import get_db
|
|
from sqlalchemy.orm import Session
|
|
import stripe
|
|
from loguru import logger
|
|
from decimal import Decimal
|
|
|
|
|
|
class StripeServices:
|
|
def __init__(self):
|
|
self.db: Session = next(get_db())
|
|
self.logger = logger
|
|
self.webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
|
|
self.redirect_url = os.getenv("FRONTEND_URL")
|
|
self.dashboard_service = DashboardService()
|
|
|
|
async def create_customer(self, user_id: int, email: str, name: str):
|
|
try:
|
|
customer = await stripe.Customer.create_async(
|
|
email=email, name=name, metadata={"user_id": user_id}
|
|
)
|
|
return customer
|
|
except stripe.error.StripeError as e:
|
|
self.logger.error(f"Error creating customer: {e}")
|
|
raise
|
|
|
|
async def delete_customer(self, customer_id: str):
|
|
try:
|
|
await stripe.Customer.delete_async(customer_id)
|
|
except stripe.error.StripeError as e:
|
|
self.logger.error(f"Error deleting customer: {e}")
|
|
raise
|
|
|
|
async def create_account(self, user_id: int, email: str, name: str, phone: str):
|
|
try:
|
|
account = await stripe.Account.create_async(
|
|
type="express",
|
|
country="AU",
|
|
capabilities={
|
|
"card_payments": {"requested": True},
|
|
"transfers": {"requested": True},
|
|
},
|
|
business_type="individual",
|
|
individual={"first_name": name, "email": email},
|
|
metadata={"user_id": user_id},
|
|
)
|
|
return account
|
|
except stripe.error.StripeError as e:
|
|
self.logger.error(f"Error creating account: {e}")
|
|
raise
|
|
|
|
async def delete_account(self, account_id: str):
|
|
try:
|
|
await stripe.Account.delete_async(account_id)
|
|
except stripe.error.StripeError as e:
|
|
self.logger.error(f"Error deleting account: {e}")
|
|
raise
|
|
|
|
async def get_stripe_data(self, clinic_id: int):
|
|
try:
|
|
user = (
|
|
self.db.query(StripeUsers)
|
|
.filter(StripeUsers.clinic_id == clinic_id)
|
|
.first()
|
|
)
|
|
|
|
if not user:
|
|
self.logger.error(f"User not found!")
|
|
raise ResourceNotFoundException("User not found!")
|
|
|
|
return StripeUserReponse.model_validate(user).model_dump()
|
|
except Exception as e:
|
|
self.logger.error(f"Error retrieving account data: {e}")
|
|
raise
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def create_stripe_account_link(self, user):
|
|
try:
|
|
stripe_account = await self.get_stripe_data(user["created_clinics"][0]["id"])
|
|
|
|
if not stripe_account:
|
|
self.logger.error("Stripe account not found!")
|
|
raise ResourceNotFoundException("Stripe account not found!")
|
|
|
|
# Pass the account_id as a string, not as a dictionary
|
|
data = await stripe.AccountLink.create_async(
|
|
account=stripe_account["account_id"],
|
|
refresh_url=self.redirect_url,
|
|
return_url=self.redirect_url,
|
|
type="account_onboarding",
|
|
)
|
|
|
|
return data.url
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error creating stripe account link: {e}")
|
|
raise
|
|
|
|
async def check_account_capabilities(self, user):
|
|
try:
|
|
stripe_account = await self.get_stripe_data(user["created_clinics"][0]["id"])
|
|
|
|
if not stripe_account:
|
|
self.logger.error("Stripe account not found!")
|
|
raise ResourceNotFoundException("Stripe account not found!")
|
|
|
|
data = await stripe.Account.retrieve_async(stripe_account["account_id"])
|
|
|
|
return {
|
|
"capabilities": data.capabilities,
|
|
"charges_enabled": data.charges_enabled,
|
|
"requirements": data.requirements.currently_due,
|
|
"error": data.requirements.errors,
|
|
}
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error checking stripe account capabilities: {e}")
|
|
raise
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def get_invoice(self, user):
|
|
try:
|
|
|
|
if user["userType"] != UserType.CLINIC_ADMIN:
|
|
raise UnauthorizedException(
|
|
"User is not authorized to perform this action"
|
|
)
|
|
|
|
clinic = (
|
|
self.db.query(Clinics).filter(Clinics.creator_id == user["id"]).first()
|
|
)
|
|
|
|
if not clinic:
|
|
raise ResourceNotFoundException("Clinic not found!")
|
|
|
|
customer = (
|
|
self.db.query(StripeUsers)
|
|
.filter(StripeUsers.user_id == user["id"])
|
|
.first()
|
|
)
|
|
|
|
if not customer:
|
|
raise ResourceNotFoundException("Customer not found!")
|
|
|
|
subscription = (
|
|
self.db.query(Subscriptions)
|
|
.filter(
|
|
Subscriptions.clinic_id == clinic.id,
|
|
Subscriptions.customer_id == customer.customer_id,
|
|
Subscriptions.status == "active",
|
|
)
|
|
.first()
|
|
)
|
|
|
|
if not subscription:
|
|
raise ResourceNotFoundException("Subscription not found!")
|
|
|
|
stripe_subscription = await stripe.Subscription.retrieve_async(
|
|
subscription.subscription_id
|
|
)
|
|
|
|
invoice = await stripe.Invoice.retrieve_async(
|
|
stripe_subscription["latest_invoice"]
|
|
)
|
|
return invoice.hosted_invoice_url
|
|
except stripe.error.StripeError as e:
|
|
self.logger.error(f"Error getting invoice: {e}")
|
|
raise
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting invoice: {e}")
|
|
raise
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def create_payment_session(self, user):
|
|
try:
|
|
if user["userType"] != UserType.CLINIC_ADMIN:
|
|
raise UnauthorizedException(
|
|
"User is not authorized to perform this action"
|
|
)
|
|
|
|
clinic = user["created_clinics"][0]
|
|
|
|
if clinic["status"] != ClinicStatus.PAYMENT_DUE:
|
|
raise ValidationException("Clinic is not due for payment")
|
|
|
|
customer = (
|
|
self.db.query(StripeUsers)
|
|
.filter(StripeUsers.user_id == user["id"])
|
|
.first()
|
|
)
|
|
|
|
if not customer:
|
|
raise ResourceNotFoundException("Customer not found")
|
|
|
|
clinic_offers = (
|
|
self.db.query(ClinicOffers)
|
|
.filter(ClinicOffers.clinic_email == clinic["email"])
|
|
.first()
|
|
)
|
|
|
|
signup_pricing = await self.dashboard_service.get_signup_pricing_master()
|
|
|
|
fees_to_be = {
|
|
"setup_fees": signup_pricing.setup_fees,
|
|
"subscription_fees": signup_pricing.subscription_fees,
|
|
"per_call_charges": signup_pricing.per_call_charges,
|
|
"total": signup_pricing.setup_fees
|
|
+ signup_pricing.subscription_fees
|
|
+ signup_pricing.per_call_charges,
|
|
}
|
|
|
|
if clinic_offers:
|
|
fees_to_be["setup_fees"] = clinic_offers.setup_fees
|
|
fees_to_be["per_call_charges"] = clinic_offers.per_call_charges
|
|
fees_to_be["total"] = (
|
|
clinic_offers.setup_fees
|
|
+ fees_to_be["subscription_fees"]
|
|
+ clinic_offers.per_call_charges
|
|
)
|
|
|
|
payment_link = await self.create_subscription_checkout(
|
|
fees_to_be, clinic["id"], customer.account_id, customer.customer_id
|
|
)
|
|
|
|
return payment_link.url
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error creating payment session: {e}")
|
|
raise
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def create_checkout_session(self, user_id: int):
|
|
try:
|
|
checkout_session = await stripe.checkout.Session.create_async(
|
|
payment_method_types=["card"],
|
|
line_items=[
|
|
{
|
|
"price_data": {
|
|
"currency": "aud",
|
|
"product_data": {
|
|
"name": "Willio Voice Subscription",
|
|
},
|
|
"unit_amount": 5000,
|
|
},
|
|
"quantity": 1,
|
|
}
|
|
],
|
|
expand=["payment_intent"],
|
|
mode="payment",
|
|
payment_intent_data={"metadata": {"order_id": "1"}},
|
|
success_url=f"{self.redirect_url}auth/waiting",
|
|
cancel_url=f"{self.redirect_url}auth/waiting",
|
|
metadata={"user_id": user_id},
|
|
)
|
|
return checkout_session
|
|
except stripe.error.StripeError as e:
|
|
self.logger.error(f"Error creating checkout session: {e}")
|
|
raise
|
|
|
|
async def create_setup_fees(self, customer_id: str, amount: int):
|
|
try:
|
|
setup_intent = await stripe.InvoiceItem.create_async(
|
|
customer=customer_id,
|
|
amount=amount,
|
|
currency="aud",
|
|
description="Setup Fees",
|
|
)
|
|
return setup_intent
|
|
except stripe.error.StripeError as e:
|
|
self.logger.error(f"Error creating setup intent: {e}")
|
|
raise
|
|
|
|
async def create_subscription_checkout(
|
|
self, fees_to_be: dict, clinic_id: int, account_id: str, customer_id: str
|
|
):
|
|
try:
|
|
|
|
unique_id = str(uuid.uuid4())
|
|
unique_clinic_id = f"clinic_{clinic_id}_{unique_id}"
|
|
|
|
line_items = [
|
|
{
|
|
"price_data": {
|
|
"currency": "aud",
|
|
"product_data": {
|
|
"name": "Monthly Subscription",
|
|
},
|
|
"unit_amount": int(
|
|
fees_to_be["subscription_fees"] * 100
|
|
), # Convert to cents
|
|
"recurring": {
|
|
"interval": "year",
|
|
},
|
|
},
|
|
"quantity": 1,
|
|
}
|
|
]
|
|
|
|
line_items.append(
|
|
{
|
|
"price_data": {
|
|
"currency": "aud",
|
|
"product_data": {
|
|
"name": "Per Call",
|
|
},
|
|
"unit_amount": int(
|
|
fees_to_be["per_call_charges"] * 100
|
|
), # Convert to cents
|
|
},
|
|
"quantity": 1,
|
|
}
|
|
)
|
|
|
|
line_items.append(
|
|
{
|
|
"price_data": {
|
|
"currency": "aud",
|
|
"product_data": {
|
|
"name": "Setup Fee",
|
|
},
|
|
"unit_amount": int(
|
|
fees_to_be["setup_fees"] * 100
|
|
), # Convert to cents
|
|
},
|
|
"quantity": 1,
|
|
}
|
|
)
|
|
|
|
metadata = {
|
|
"clinic_id": clinic_id,
|
|
"unique_clinic_id": unique_clinic_id,
|
|
"account_id": account_id,
|
|
"customer_id": customer_id,
|
|
"fees_to_be": json.dumps(fees_to_be),
|
|
}
|
|
|
|
session_data = {
|
|
"customer": customer_id,
|
|
"payment_method_types": ["card", "au_becs_debit"],
|
|
"mode": "subscription",
|
|
"line_items": line_items,
|
|
"success_url": f"{self.redirect_url}auth/waiting",
|
|
"cancel_url": f"{self.redirect_url}auth/waiting",
|
|
"metadata": metadata,
|
|
"subscription_data": {"metadata": metadata},
|
|
}
|
|
|
|
session = await stripe.checkout.Session.create_async(**session_data)
|
|
|
|
payment_log = PaymentLogs(
|
|
customer_id=customer_id,
|
|
account_id=account_id,
|
|
amount=Decimal(
|
|
str(fees_to_be["total"])
|
|
), # Keep as Decimal for database storage
|
|
clinic_id=clinic_id,
|
|
unique_clinic_id=unique_clinic_id,
|
|
payment_status="pending",
|
|
metadata_logs=json.dumps(metadata),
|
|
)
|
|
|
|
new_payment_session = PaymentSessions(
|
|
session_id=session.id,
|
|
customer_id=customer_id,
|
|
clinic_id=clinic_id,
|
|
status="pending",
|
|
)
|
|
|
|
self.db.add(payment_log)
|
|
self.db.add(new_payment_session)
|
|
self.db.commit()
|
|
|
|
return session
|
|
except stripe.error.StripeError as e:
|
|
self.logger.error(f"Error creating checkout session: {e}")
|
|
raise
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def handle_webhook(self, request: Request):
|
|
try:
|
|
payload = await request.body()
|
|
event = stripe.Webhook.construct_event(
|
|
payload, request.headers.get("Stripe-Signature"), self.webhook_secret
|
|
)
|
|
self.logger.info(f"Stripe webhook event type: {event['type']}")
|
|
|
|
if event["type"] == "customer.subscription.deleted":
|
|
self.logger.info("customer subscription ended")
|
|
subscription_id = event["data"]["object"]["items"]["data"][0][
|
|
"subscription"
|
|
]
|
|
|
|
await self._subscription_expired(subscription_id)
|
|
|
|
if event["type"] == "checkout.session.completed":
|
|
unique_clinic_id = event["data"]["object"]["metadata"][
|
|
"unique_clinic_id"
|
|
]
|
|
clinic_id = event["data"]["object"]["metadata"]["clinic_id"]
|
|
customer_id = event["data"]["object"]["metadata"]["customer_id"]
|
|
account_id = event["data"]["object"]["metadata"]["account_id"]
|
|
total = event["data"]["object"]["amount_total"]
|
|
metadata = event["data"]["object"]["metadata"]
|
|
session_id = event["data"]["object"]["id"]
|
|
subscription_id = event["data"]["object"]["subscription"]
|
|
|
|
await self._update_payment_log(
|
|
unique_clinic_id,
|
|
clinic_id,
|
|
customer_id,
|
|
account_id,
|
|
total,
|
|
metadata,
|
|
)
|
|
|
|
await self._create_subscription_entry(
|
|
{
|
|
"clinic_id": clinic_id,
|
|
"customer_id": customer_id,
|
|
"account_id": account_id,
|
|
"session_id": session_id,
|
|
"subscription_id": subscription_id,
|
|
}
|
|
)
|
|
# TODO: handle subscription period end
|
|
|
|
return "OK"
|
|
except ValueError as e:
|
|
self.logger.error(f"Invalid payload: {e}")
|
|
except stripe.error.SignatureVerificationError as e:
|
|
self.logger.error(f"Invalid signature: {e}")
|
|
finally:
|
|
self.db.close()
|
|
|
|
async def _update_payment_log(
|
|
self,
|
|
unique_clinic_id: str,
|
|
clinic_id: int,
|
|
customer_id: str,
|
|
account_id: str,
|
|
total: float,
|
|
metadata: any,
|
|
):
|
|
try:
|
|
self.db.query(PaymentSessions).filter(
|
|
PaymentSessions.clinic_id == clinic_id
|
|
).delete()
|
|
|
|
payment_log = PaymentLogs(
|
|
customer_id=customer_id,
|
|
account_id=account_id,
|
|
amount=total,
|
|
clinic_id=clinic_id,
|
|
unique_clinic_id=unique_clinic_id,
|
|
payment_status="paid",
|
|
metadata_logs=json.dumps(metadata.to_dict()),
|
|
)
|
|
self.db.add(payment_log)
|
|
|
|
clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
|
|
|
|
if clinic:
|
|
clinic.status = ClinicStatus.UNDER_REVIEW
|
|
self.db.add(clinic)
|
|
return
|
|
except Exception as e:
|
|
self.logger.error(f"Error updating payment log: {e}")
|
|
finally:
|
|
self.db.commit()
|
|
self.db.close()
|
|
|
|
async def _create_subscription_entry(self, data: dict):
|
|
try:
|
|
|
|
subscription = stripe.Subscription.retrieve(data["subscription_id"])
|
|
|
|
metadata_dict = subscription.metadata
|
|
fees_to_be = json.loads(metadata_dict["fees_to_be"])
|
|
|
|
new_subscription = Subscriptions(
|
|
clinic_id=data["clinic_id"],
|
|
customer_id=data["customer_id"],
|
|
account_id=data["account_id"],
|
|
total=fees_to_be["total"],
|
|
setup_fee=fees_to_be["setup_fees"],
|
|
subscription_fee=fees_to_be["subscription_fees"],
|
|
per_call_charge=fees_to_be["per_call_charges"],
|
|
subscription_id=data["subscription_id"],
|
|
status=subscription.status,
|
|
current_period_start=subscription["items"]["data"][0][
|
|
"current_period_start"
|
|
],
|
|
current_period_end=subscription["items"]["data"][0][
|
|
"current_period_end"
|
|
],
|
|
metadata_logs=json.dumps(subscription.metadata),
|
|
)
|
|
self.db.add(new_subscription)
|
|
|
|
payment_session = PaymentSessions(
|
|
session_id=data["session_id"],
|
|
customer_id=data["customer_id"],
|
|
clinic_id=data["clinic_id"],
|
|
status="paid",
|
|
)
|
|
self.db.add(payment_session)
|
|
return
|
|
except Exception as e:
|
|
self.logger.error(f"Error creating subscription entry: {e}")
|
|
finally:
|
|
self.db.commit()
|
|
self.db.close()
|
|
|
|
async def _subscription_expired(self, subscription_id):
|
|
try:
|
|
subscription = stripe.Subscription.retrieve(subscription_id)
|
|
|
|
db_subscription = (
|
|
self.db.query(Subscriptions)
|
|
.filter(Subscriptions.subscription_id == subscription_id)
|
|
.first()
|
|
)
|
|
|
|
if not db_subscription:
|
|
self.logger.error("Subscription not found!")
|
|
raise Exception("Subscription not found!")
|
|
|
|
db_subscription.status = subscription.status
|
|
self.db.add(db_subscription)
|
|
|
|
# TODO: update clinic status
|
|
# TODO: send email to user
|
|
|
|
return
|
|
except Exception as e:
|
|
self.logger.error(f"Error ending subscription: {e}")
|
|
finally:
|
|
self.db.commit()
|
|
self.db.close()
|