import json
import os
import dotenv

# Kept ahead of the remaining imports, exactly as in the original ordering
# (presumably so modules imported below can read env vars at import time).
dotenv.load_dotenv()

from models import ClinicOffers, StripeUsers
from services.dashboardService import DashboardService
from exceptions.validation_exception import ValidationException
from exceptions.resource_not_found_exception import ResourceNotFoundException
from exceptions.unauthorized_exception import UnauthorizedException
from models import Clinics, PaymentSessions, Subscriptions
import uuid
from fastapi import Request
from datetime import datetime
from models import PaymentLogs
from enums.enums import ClinicStatus, UserType
from schemas.ResponseSchemas import StripeUserReponse
from database import get_db
from sqlalchemy.orm import Session
import stripe
from loguru import logger
from decimal import ROUND_HALF_UP, Decimal
from typing import Any


class StripeServices:
    """Stripe customer/account/checkout/webhook operations plus their DB bookkeeping.

    Each instance pulls one SQLAlchemy session from ``get_db()`` and reads the
    webhook secret and the frontend redirect URL from the environment.
    """

    def __init__(self):
        self.db: Session = next(get_db())
        self.logger = logger
        self.webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
        self.redirect_url = os.getenv("FRONTEND_URL")
        self.dashboard_service = DashboardService()

    @staticmethod
    def _to_cents(amount) -> int:
        """Convert a major-unit amount (e.g. ``19.99``) to integer cents.

        The value goes through ``Decimal(str(amount))`` and is rounded half-up,
        so binary-float artefacts such as ``19.99 * 100 == 1998.9999999999998``
        are not truncated to 1998 the way ``int(amount * 100)`` would do.
        """
        cents = (Decimal(str(amount)) * 100).to_integral_value(
            rounding=ROUND_HALF_UP
        )
        return int(cents)

    async def create_customer(self, user_id: int, email: str, name: str):
        """Create a Stripe customer tagged with ``user_id`` and return it.

        Raises:
            stripe.error.StripeError: logged, then re-raised.
        """
        try:
            return await stripe.Customer.create_async(
                email=email, name=name, metadata={"user_id": user_id}
            )
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating customer: {e}")
            raise

    async def delete_customer(self, customer_id: str):
        """Delete the Stripe customer ``customer_id``.

        Raises:
            stripe.error.StripeError: logged, then re-raised.
        """
        try:
            await stripe.Customer.delete_async(customer_id)
        except stripe.error.StripeError as e:
            self.logger.error(f"Error deleting customer: {e}")
            raise

    async def create_account(self, user_id: int, email: str, name: str, phone: str):
        """Create an Express connected account (country ``AU``) and return it.

        ``phone`` is accepted for interface compatibility but is not sent to
        Stripe by this method.

        Raises:
            stripe.error.StripeError: logged, then re-raised.
        """
        try:
            return await stripe.Account.create_async(
                type="express",
                country="AU",
                capabilities={
                    "card_payments": {"requested": True},
                    "transfers": {"requested": True},
                },
                business_type="individual",
                individual={"first_name": name, "email": email},
                metadata={"user_id": user_id},
            )
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating account: {e}")
            raise

    async def delete_account(self, account_id: str):
        """Delete the Stripe connected account ``account_id``.

        Raises:
            stripe.error.StripeError: logged, then re-raised.
        """
        try:
            await stripe.Account.delete_async(account_id)
        except stripe.error.StripeError as e:
            self.logger.error(f"Error deleting account: {e}")
            raise

    async def get_stripe_data(self, clinic_id: int):
        """Return the ``StripeUsers`` row for ``clinic_id`` as a plain dict.

        The row is validated through ``StripeUserReponse`` and dumped with
        ``model_dump()``. The session is closed before returning.

        Raises:
            ResourceNotFoundException: no row exists for ``clinic_id``.
        """
        try:
            user = (
                self.db.query(StripeUsers)
                .filter(StripeUsers.clinic_id == clinic_id)
                .first()
            )
            if not user:
                self.logger.error("User not found!")
                raise ResourceNotFoundException("User not found!")
            return StripeUserReponse.model_validate(user).model_dump()
        except Exception as e:
            self.logger.error(f"Error retrieving account data: {e}")
            raise
        finally:
            self.db.close()

    async def create_stripe_account_link(self, user):
        """Return an onboarding URL for the account of the user's first clinic.

        ``user`` is indexed as ``user["created_clinics"][0]["id"]``.

        Raises:
            ResourceNotFoundException: no Stripe data is stored for the clinic.
        """
        try:
            stripe_account = await self.get_stripe_data(
                user["created_clinics"][0]["id"]
            )
            if not stripe_account:
                self.logger.error("Stripe account not found!")
                raise ResourceNotFoundException("Stripe account not found!")
            # Pass the account_id as a string, not as a dictionary
            data = await stripe.AccountLink.create_async(
                account=stripe_account["account_id"],
                refresh_url=self.redirect_url,
                return_url=self.redirect_url,
                type="account_onboarding",
            )
            return data.url
        except Exception as e:
            self.logger.error(f"Error creating stripe account link: {e}")
            raise

    async def check_account_capabilities(self, user):
        """Return capability/requirement details of the first clinic's account.

        Returns:
            dict with keys ``capabilities``, ``charges_enabled``,
            ``requirements`` (``requirements.currently_due``) and ``error``
            (``requirements.errors``).

        Raises:
            ResourceNotFoundException: no Stripe data is stored for the clinic.
        """
        try:
            stripe_account = await self.get_stripe_data(
                user["created_clinics"][0]["id"]
            )
            if not stripe_account:
                self.logger.error("Stripe account not found!")
                raise ResourceNotFoundException("Stripe account not found!")
            data = await stripe.Account.retrieve_async(stripe_account["account_id"])
            return {
                "capabilities": data.capabilities,
                "charges_enabled": data.charges_enabled,
                "requirements": data.requirements.currently_due,
                "error": data.requirements.errors,
            }
        except Exception as e:
            self.logger.error(f"Error checking stripe account capabilities: {e}")
            raise
        finally:
            self.db.close()

    async def get_invoice(self, user):
        """Return the hosted URL of the latest invoice of the active subscription.

        Raises:
            UnauthorizedException: ``user`` is not a clinic admin.
            ResourceNotFoundException: clinic, customer or active subscription
                is missing.
        """
        try:
            if user["userType"] != UserType.CLINIC_ADMIN:
                raise UnauthorizedException(
                    "User is not authorized to perform this action"
                )
            clinic = (
                self.db.query(Clinics).filter(Clinics.creator_id == user["id"]).first()
            )
            if not clinic:
                raise ResourceNotFoundException("Clinic not found!")
            customer = (
                self.db.query(StripeUsers)
                .filter(StripeUsers.user_id == user["id"])
                .first()
            )
            if not customer:
                raise ResourceNotFoundException("Customer not found!")
            subscription = (
                self.db.query(Subscriptions)
                .filter(
                    Subscriptions.clinic_id == clinic.id,
                    Subscriptions.customer_id == customer.customer_id,
                    Subscriptions.status == "active",
                )
                .first()
            )
            if not subscription:
                raise ResourceNotFoundException("Subscription not found!")
            stripe_subscription = await stripe.Subscription.retrieve_async(
                subscription.subscription_id
            )
            invoice = await stripe.Invoice.retrieve_async(
                stripe_subscription["latest_invoice"]
            )
            return invoice.hosted_invoice_url
        except Exception as e:
            # Stripe errors are Exception subclasses and were logged with the
            # same message, so a single handler covers both original arms.
            self.logger.error(f"Error getting invoice: {e}")
            raise
        finally:
            self.db.close()

    # NOTE: in case when checkout session expired or not created
    async def create_payment_session(self, user):
        """Create a checkout session for a payment-due clinic and return its URL.

        Fees start from the signup pricing master; when a ``ClinicOffers`` row
        matches the clinic email, its setup fee and per-call charge replace the
        defaults and the total is recomputed.

        Raises:
            UnauthorizedException: ``user`` is not a clinic admin.
            ValidationException: the clinic status is not ``PAYMENT_DUE``.
            ResourceNotFoundException: no Stripe customer row for the user.
        """
        try:
            if user["userType"] != UserType.CLINIC_ADMIN:
                raise UnauthorizedException(
                    "User is not authorized to perform this action"
                )
            clinic = user["created_clinics"][0]
            if clinic["status"] != ClinicStatus.PAYMENT_DUE:
                raise ValidationException("Clinic is not due for payment")
            customer = (
                self.db.query(StripeUsers)
                .filter(StripeUsers.user_id == user["id"])
                .first()
            )
            if not customer:
                raise ResourceNotFoundException("Customer not found")
            clinic_offers = (
                self.db.query(ClinicOffers)
                .filter(ClinicOffers.clinic_email == clinic["email"])
                .first()
            )
            signup_pricing = await self.dashboard_service.get_signup_pricing_master()
            fees_to_be = {
                "setup_fees": signup_pricing.setup_fees,
                "subscription_fees": signup_pricing.subscription_fees,
                "per_call_charges": signup_pricing.per_call_charges,
                "total": signup_pricing.setup_fees
                + signup_pricing.subscription_fees
                + signup_pricing.per_call_charges,
            }
            if clinic_offers:
                # Clinic-specific offer overrides setup and per-call pricing;
                # the subscription fee stays at the master value.
                fees_to_be["setup_fees"] = clinic_offers.setup_fees
                fees_to_be["per_call_charges"] = clinic_offers.per_call_charges
                fees_to_be["total"] = (
                    clinic_offers.setup_fees
                    + fees_to_be["subscription_fees"]
                    + clinic_offers.per_call_charges
                )
            payment_link = await self.create_subscription_checkout(
                fees_to_be, clinic["id"], customer.account_id, customer.customer_id
            )
            return payment_link.url
        except Exception as e:
            self.logger.error(f"Error creating payment session: {e}")
            raise
        finally:
            self.db.close()

    # NOTE: This is not used
    # async def create_checkout_session(self, user_id: int):
    #     try:
    #         checkout_session = await stripe.checkout.Session.create_async(
    #             payment_method_types=["card"],
    #             line_items=[
    #                 {
    #                     "price_data": {
    #                         "currency": "aud",
    #                         "product_data": {
    #                             "name": "Willio Voice Subscription",
    #                         },
    #                         "unit_amount": 5000,
    #                     },
    #                     "quantity": 1,
    #                 }
    #             ],
    #             expand=["payment_intent"],
    #             mode="payment",
    #             payment_intent_data={"metadata": {"order_id": "1"}},
    #             success_url=f"{self.redirect_url}auth/waiting",
    #             cancel_url=f"{self.redirect_url}auth/waiting",
    #             metadata={"user_id": user_id},
    #         )
    #         return checkout_session
    #     except stripe.error.StripeError as e:
    #         self.logger.error(f"Error creating checkout session: {e}")
    #         raise

    async def create_subscription_checkout(
        self, fees_to_be: dict, clinic_id: int, account_id: str, customer_id: str
    ):
        """Create a subscription-mode Checkout Session and record it as pending.

        Three line items are sent: the recurring subscription fee plus the
        per-call charge and the setup fee. A ``PaymentLogs`` row and a
        ``PaymentSessions`` row, both pending, are committed afterwards.

        Args:
            fees_to_be: dict with ``subscription_fees``, ``per_call_charges``,
                ``setup_fees`` and ``total``; it is also JSON-serialised into
                the session metadata.
            clinic_id: clinic the checkout belongs to.
            account_id: Stripe account id stored in metadata and the log.
            customer_id: Stripe customer the session is created for.

        Returns:
            The created Stripe Checkout Session.

        Raises:
            stripe.error.StripeError: logged, then re-raised.
        """
        try:
            unique_id = str(uuid.uuid4())
            unique_clinic_id = f"clinic_{clinic_id}_{unique_id}"

            line_items = [
                {
                    "price_data": {
                        "currency": "aud",
                        "product_data": {
                            "name": "Monthly Subscription",
                        },
                        # Convert to cents (rounded, not truncated)
                        "unit_amount": self._to_cents(
                            fees_to_be["subscription_fees"]
                        ),
                        "recurring": {
                            "interval": "year",
                            "interval_count": 3,  # NOTE: max 3 years supported by stripe
                        },
                    },
                    "quantity": 1,
                },
                {
                    "price_data": {
                        "currency": "aud",
                        "product_data": {
                            "name": "Per Call",
                        },
                        # Convert to cents (rounded, not truncated)
                        "unit_amount": self._to_cents(
                            fees_to_be["per_call_charges"]
                        ),
                    },
                    "quantity": 1,
                },
                {
                    "price_data": {
                        "currency": "aud",
                        "product_data": {
                            "name": "Setup Fee",
                        },
                        # Convert to cents (rounded, not truncated)
                        "unit_amount": self._to_cents(fees_to_be["setup_fees"]),
                    },
                    "quantity": 1,
                },
            ]

            # The same metadata is attached to the session and the subscription;
            # the webhook helpers read it back from both.
            metadata = {
                "clinic_id": clinic_id,
                "unique_clinic_id": unique_clinic_id,
                "account_id": account_id,
                "customer_id": customer_id,
                "fees_to_be": json.dumps(fees_to_be),
            }

            session = await stripe.checkout.Session.create_async(
                customer=customer_id,
                payment_method_types=["card", "au_becs_debit"],
                mode="subscription",
                line_items=line_items,
                success_url=f"{self.redirect_url}auth/waiting",
                cancel_url=f"{self.redirect_url}auth/waiting",
                metadata=metadata,
                subscription_data={"metadata": metadata},
            )

            payment_log = PaymentLogs(
                customer_id=customer_id,
                account_id=account_id,
                amount=Decimal(
                    str(fees_to_be["total"])
                ),  # Keep as Decimal for database storage
                clinic_id=clinic_id,
                unique_clinic_id=unique_clinic_id,
                payment_status="pending",
                metadata_logs=json.dumps(metadata),
            )
            new_payment_session = PaymentSessions(
                session_id=session.id,
                customer_id=customer_id,
                clinic_id=clinic_id,
                status="pending",
            )
            self.db.add(payment_log)
            self.db.add(new_payment_session)
            self.db.commit()
            return session
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating checkout session: {e}")
            raise
        finally:
            self.db.close()

    async def handle_webhook(self, request: Request):
        """Verify a Stripe webhook request and dispatch on its event type.

        ``customer.subscription.deleted`` updates the stored subscription
        status; ``checkout.session.completed`` records the payment and creates
        the subscription entry. Other event types are only logged.

        Returns:
            ``"OK"`` once the event has been handled, or ``None`` when a
            ``ValueError`` or a signature verification error was raised.
        """
        # NOTE(review): invalid payloads/signatures are only logged and the
        # method then returns None; confirm the route turns that into a
        # non-2xx response if that is what is wanted.
        try:
            payload = await request.body()
            event = stripe.Webhook.construct_event(
                payload, request.headers.get("Stripe-Signature"), self.webhook_secret
            )
            event_type = event["type"]
            self.logger.info(f"Stripe webhook event type: {event['type']}")

            if event_type == "customer.subscription.deleted":
                self.logger.info("customer subscription ended")
                subscription_id = event["data"]["object"]["items"]["data"][0][
                    "subscription"
                ]
                await self._subscription_expired(subscription_id)

            if event_type == "checkout.session.completed":
                checkout = event["data"]["object"]
                metadata = checkout["metadata"]
                unique_clinic_id = metadata["unique_clinic_id"]
                clinic_id = metadata["clinic_id"]
                customer_id = metadata["customer_id"]
                account_id = metadata["account_id"]
                total = checkout["amount_total"]
                session_id = checkout["id"]
                subscription_id = checkout["subscription"]

                await self._update_payment_log(
                    unique_clinic_id,
                    clinic_id,
                    customer_id,
                    account_id,
                    total,
                    metadata,
                )
                await self._create_subscription_entry(
                    {
                        "clinic_id": clinic_id,
                        "customer_id": customer_id,
                        "account_id": account_id,
                        "session_id": session_id,
                        "subscription_id": subscription_id,
                    }
                )
            # TODO: handle subscription period end
            return "OK"
        except ValueError as e:
            self.logger.error(f"Invalid payload: {e}")
        except stripe.error.SignatureVerificationError as e:
            self.logger.error(f"Invalid signature: {e}")
        finally:
            self.db.close()

    async def _update_payment_log(
        self,
        unique_clinic_id: str,
        clinic_id: int,
        customer_id: str,
        account_id: str,
        total: float,
        metadata: Any,
    ):
        """Record a paid checkout and move the clinic to ``UNDER_REVIEW``.

        Deletes the clinic's ``PaymentSessions`` rows, adds a ``paid``
        ``PaymentLogs`` row and updates the clinic status. All changes are
        committed together on success and rolled back if any step fails;
        failures are logged and not re-raised.
        """
        try:
            self.db.query(PaymentSessions).filter(
                PaymentSessions.clinic_id == clinic_id
            ).delete()
            # NOTE(review): ``total`` is the session's ``amount_total`` as
            # received, while the pending log stores the fee total in major
            # units — confirm both rows are meant to share one unit.
            payment_log = PaymentLogs(
                customer_id=customer_id,
                account_id=account_id,
                amount=total,
                clinic_id=clinic_id,
                unique_clinic_id=unique_clinic_id,
                payment_status="paid",
                metadata_logs=json.dumps(metadata.to_dict()),
            )
            self.db.add(payment_log)
            clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
            if clinic:
                clinic.status = ClinicStatus.UNDER_REVIEW
                self.db.add(clinic)
        except Exception as e:
            self.logger.error(f"Error updating payment log: {e}")
            # Do not persist a half-applied update.
            self.db.rollback()
        else:
            self.db.commit()
        finally:
            self.db.close()

    async def _create_subscription_entry(self, data: dict):
        """Persist a ``Subscriptions`` row and a paid ``PaymentSessions`` row.

        ``data`` must provide ``clinic_id``, ``customer_id``, ``account_id``,
        ``session_id`` and ``subscription_id``. Fee values are read from the
        ``fees_to_be`` JSON stored in the Stripe subscription metadata.
        Changes are committed on success and rolled back on failure; failures
        are logged and not re-raised.
        """
        try:
            # Async retrieval keeps the event loop free, matching the other
            # Stripe calls in this class.
            subscription = await stripe.Subscription.retrieve_async(
                data["subscription_id"]
            )
            metadata_dict = subscription.metadata
            fees_to_be = json.loads(metadata_dict["fees_to_be"])
            first_item = subscription["items"]["data"][0]

            new_subscription = Subscriptions(
                clinic_id=data["clinic_id"],
                customer_id=data["customer_id"],
                account_id=data["account_id"],
                total=fees_to_be["total"],
                setup_fee=fees_to_be["setup_fees"],
                subscription_fee=fees_to_be["subscription_fees"],
                per_call_charge=fees_to_be["per_call_charges"],
                subscription_id=data["subscription_id"],
                status=subscription.status,
                current_period_start=first_item["current_period_start"],
                current_period_end=first_item["current_period_end"],
                metadata_logs=json.dumps(subscription.metadata),
            )
            self.db.add(new_subscription)

            payment_session = PaymentSessions(
                session_id=data["session_id"],
                customer_id=data["customer_id"],
                clinic_id=data["clinic_id"],
                status="paid",
            )
            self.db.add(payment_session)
        except Exception as e:
            self.logger.error(f"Error creating subscription entry: {e}")
            # Do not persist a half-built subscription record.
            self.db.rollback()
        else:
            self.db.commit()
        finally:
            self.db.close()

    async def _subscription_expired(self, subscription_id):
        """Copy the current Stripe status onto the stored subscription row.

        A missing row is logged. Changes are committed on success and rolled
        back on failure; failures are logged and not re-raised.
        """
        try:
            # Async retrieval keeps the event loop free, matching the other
            # Stripe calls in this class.
            subscription = await stripe.Subscription.retrieve_async(subscription_id)
            db_subscription = (
                self.db.query(Subscriptions)
                .filter(Subscriptions.subscription_id == subscription_id)
                .first()
            )
            if not db_subscription:
                self.logger.error("Subscription not found!")
                raise Exception("Subscription not found!")
            db_subscription.status = subscription.status
            self.db.add(db_subscription)
            # TODO: update clinic status
            # TODO: send email to user
        except Exception as e:
            self.logger.error(f"Error ending subscription: {e}")
            self.db.rollback()
        else:
            self.db.commit()
        finally:
            self.db.close()