import json
import os
import uuid
from datetime import datetime
from decimal import ROUND_HALF_UP, Decimal
from typing import Any

import dotenv

# Environment variables are loaded before the remaining imports, exactly as in
# the original ordering (presumably so imported modules can read them at
# import time -- verify before reordering).
dotenv.load_dotenv()

from models import ClinicOffers, StripeUsers
from services.dashboardService import DashboardService
from exceptions.validation_exception import ValidationException
from exceptions.resource_not_found_exception import ResourceNotFoundException
from exceptions.unauthorized_exception import UnauthorizedException
from models import Clinics, PaymentSessions, Subscriptions
from fastapi import Request
from models import PaymentLogs
from enums.enums import ClinicStatus, UserType
from database import get_db
from sqlalchemy.orm import Session
import stripe
from loguru import logger


class StripeServices:
    """Stripe integration: customers, accounts, checkout sessions and webhooks.

    One SQLAlchemy session is taken from ``get_db()`` when the service is
    constructed; the methods that touch the database close it in a
    ``finally`` clause when they finish.
    """

    def __init__(self):
        self.db: Session = next(get_db())
        self.logger = logger
        self.webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
        # Used as an f-string prefix for "auth/waiting"; presumably expected
        # to end with a trailing slash -- confirm against deployment config.
        self.redirect_url = os.getenv("FRONTEND_URL")
        self.dashboard_service = DashboardService()

    @staticmethod
    def _to_cents(amount) -> int:
        """Convert a major-unit amount (e.g. dollars) to integer cents.

        Goes through ``Decimal(str(amount))`` so binary-float artefacts do not
        truncate the result (``int(19.99 * 100)`` is 1998, this returns 1999).
        Fractions of a cent are rounded half up.
        """
        cents = Decimal(str(amount)) * 100
        return int(cents.quantize(Decimal("1"), rounding=ROUND_HALF_UP))

    async def create_customer(self, user_id: int, email: str, name: str):
        """Create a Stripe customer tagged with ``user_id`` and return it.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            return await stripe.Customer.create_async(
                email=email,
                name=name,
                metadata={"user_id": user_id},
            )
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating customer: {e}")
            raise

    async def delete_customer(self, customer_id: str):
        """Delete the Stripe customer ``customer_id``.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            await stripe.Customer.delete_async(customer_id)
        except stripe.error.StripeError as e:
            self.logger.error(f"Error deleting customer: {e}")
            raise

    async def create_account(self, user_id: int, email: str, name: str, phone: str):
        """Create an Australian "express" connected account and return it.

        ``phone`` is accepted for interface compatibility but is not sent to
        Stripe.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            return await stripe.Account.create_async(
                type="express",
                country="AU",
                capabilities={
                    "card_payments": {"requested": True},
                    "transfers": {"requested": True},
                },
                business_type="individual",
                individual={"first_name": name, "email": email},
                metadata={"user_id": user_id},
            )
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating account: {e}")
            raise

    async def delete_account(self, account_id: str):
        """Delete the Stripe connected account ``account_id``.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            await stripe.Account.delete_async(account_id)
        except stripe.error.StripeError as e:
            self.logger.error(f"Error deleting account: {e}")
            raise

    async def get_invoice(self, user):
        """Return the hosted URL of the latest invoice for the user's clinic.

        Looks up the clinic created by the user, the user's Stripe customer
        record and the clinic's active subscription, then fetches that
        subscription's latest invoice from Stripe.

        Raises:
            UnauthorizedException: the user is not a clinic admin.
            ResourceNotFoundException: clinic, customer or active
                subscription is missing.
            stripe.error.StripeError: a Stripe call failed.
        """
        try:
            if user["userType"] != UserType.CLINIC_ADMIN:
                raise UnauthorizedException("User is not authorized to perform this action")

            clinic = self.db.query(Clinics).filter(Clinics.creator_id == user["id"]).first()
            if not clinic:
                raise ResourceNotFoundException("Clinic not found!")

            customer = self.db.query(StripeUsers).filter(StripeUsers.user_id == user["id"]).first()
            if not customer:
                raise ResourceNotFoundException("Customer not found!")

            subscription = (
                self.db.query(Subscriptions)
                .filter(
                    Subscriptions.clinic_id == clinic.id,
                    Subscriptions.customer_id == customer.customer_id,
                    Subscriptions.status == "active",
                )
                .first()
            )
            if not subscription:
                raise ResourceNotFoundException("Subscription not found!")

            stripe_subscription = await stripe.Subscription.retrieve_async(
                subscription.subscription_id
            )
            invoice = await stripe.Invoice.retrieve_async(stripe_subscription["latest_invoice"])
            return invoice.hosted_invoice_url
        except Exception as e:
            # Stripe errors and domain errors are logged identically, then
            # propagated unchanged to the caller.
            self.logger.error(f"Error getting invoice: {e}")
            raise
        finally:
            self.db.close()

    async def create_payment_session(self, user):
        """Create a subscription checkout for the user's clinic; return its URL.

        Fees start from the signup pricing master; when a ``ClinicOffers`` row
        exists for the clinic's email, its setup fee and per-call charge
        replace the defaults and the total is recomputed.

        Raises:
            UnauthorizedException: the user is not a clinic admin.
            ResourceNotFoundException: the user has no created clinic or no
                Stripe customer record.
            ValidationException: the clinic is not in ``PAYMENT_DUE`` status.
        """
        try:
            if user["userType"] != UserType.CLINIC_ADMIN:
                raise UnauthorizedException("User is not authorized to perform this action")

            created_clinics = user["created_clinics"]
            if not created_clinics:
                # Previously surfaced as a bare IndexError.
                raise ResourceNotFoundException("Clinic not found!")
            clinic = created_clinics[0]

            if clinic["status"] != ClinicStatus.PAYMENT_DUE:
                raise ValidationException("Clinic is not due for payment")

            customer = self.db.query(StripeUsers).filter(StripeUsers.user_id == user["id"]).first()
            if not customer:
                raise ResourceNotFoundException("Customer not found")

            clinic_offers = (
                self.db.query(ClinicOffers)
                .filter(ClinicOffers.clinic_email == clinic["email"])
                .first()
            )
            signup_pricing = await self.dashboard_service.get_signup_pricing_master()

            fees_to_be = {
                "setup_fees": signup_pricing.setup_fees,
                "subscription_fees": signup_pricing.subscription_fees,
                "per_call_charges": signup_pricing.per_call_charges,
            }
            if clinic_offers:
                # The offer overrides setup and per-call fees only; the
                # subscription fee always comes from the pricing master.
                fees_to_be["setup_fees"] = clinic_offers.setup_fees
                fees_to_be["per_call_charges"] = clinic_offers.per_call_charges
            fees_to_be["total"] = (
                fees_to_be["setup_fees"]
                + fees_to_be["subscription_fees"]
                + fees_to_be["per_call_charges"]
            )

            payment_link = await self.create_subscription_checkout(
                fees_to_be, clinic["id"], customer.account_id, customer.customer_id
            )
            return payment_link.url
        except Exception as e:
            self.logger.error(f"Error creating payment session: {e}")
            raise
        finally:
            self.db.close()

    async def create_checkout_session(self, user_id: int):
        """Create a one-off card checkout session (fixed 5000 minor units, AUD).

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            return await stripe.checkout.Session.create_async(
                payment_method_types=["card"],
                line_items=[
                    {
                        "price_data": {
                            "currency": "aud",
                            "product_data": {
                                "name": "Willio Voice Subscription",
                            },
                            "unit_amount": 5000,
                        },
                        "quantity": 1,
                    }
                ],
                expand=["payment_intent"],
                mode="payment",
                payment_intent_data={"metadata": {"order_id": "1"}},
                success_url=f"{self.redirect_url}auth/waiting",
                cancel_url=f"{self.redirect_url}auth/waiting",
                metadata={"user_id": user_id},
            )
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating checkout session: {e}")
            raise

    async def create_setup_fees(self, customer_id: str, amount: int):
        """Create a "Setup Fees" invoice item (AUD) for the customer; return it.

        ``amount`` is passed to Stripe unchanged.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            invoice_item = await stripe.InvoiceItem.create_async(
                customer=customer_id,
                amount=amount,
                currency="aud",
                description="Setup Fees",
            )
            return invoice_item
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating setup intent: {e}")
            raise

    async def create_subscription_checkout(self, fees_to_be: dict, clinic_id: int, account_id: str, customer_id: str):
        """Create a subscription-mode checkout session and record it as pending.

        Builds three line items (recurring subscription, per-call charge,
        setup fee), creates the Stripe session, then stores a pending
        ``PaymentLogs`` row and a pending ``PaymentSessions`` row.

        Args:
            fees_to_be: mapping with ``setup_fees``, ``subscription_fees``,
                ``per_call_charges`` and ``total`` in major currency units.
                It is JSON-encoded into the session metadata.
            clinic_id: clinic the checkout belongs to.
            account_id: Stripe account id stored with the payment log.
            customer_id: Stripe customer the session is created for.

        Returns:
            The created Stripe checkout session.

        Raises:
            stripe.error.StripeError: logged and re-raised.
        """
        try:
            unique_clinic_id = f"clinic_{clinic_id}_{uuid.uuid4()}"

            def one_time_item(name: str, fee) -> dict:
                return {
                    "price_data": {
                        "currency": "aud",
                        "product_data": {"name": name},
                        "unit_amount": self._to_cents(fee),
                    },
                    "quantity": 1,
                }

            line_items = [
                {
                    "price_data": {
                        "currency": "aud",
                        "product_data": {"name": "Monthly Subscription"},
                        "unit_amount": self._to_cents(fees_to_be["subscription_fees"]),
                        # NOTE(review): the product is named "Monthly
                        # Subscription" but is billed yearly -- confirm which
                        # one is intended. Left unchanged.
                        "recurring": {"interval": "year"},
                    },
                    "quantity": 1,
                },
                one_time_item("Per Call", fees_to_be["per_call_charges"]),
                one_time_item("Setup Fee", fees_to_be["setup_fees"]),
            ]

            # The same metadata goes on the session and on the subscription so
            # the webhook handlers can recover it from either object.
            metadata = {
                "clinic_id": clinic_id,
                "unique_clinic_id": unique_clinic_id,
                "account_id": account_id,
                "customer_id": customer_id,
                "fees_to_be": json.dumps(fees_to_be),
            }

            session = await stripe.checkout.Session.create_async(
                customer=customer_id,
                payment_method_types=["card", "au_becs_debit"],
                mode="subscription",
                line_items=line_items,
                success_url=f"{self.redirect_url}auth/waiting",
                cancel_url=f"{self.redirect_url}auth/waiting",
                metadata=metadata,
                subscription_data={"metadata": metadata},
            )

            self.db.add(
                PaymentLogs(
                    customer_id=customer_id,
                    account_id=account_id,
                    # Stored as Decimal, in major units.
                    amount=Decimal(str(fees_to_be["total"])),
                    clinic_id=clinic_id,
                    unique_clinic_id=unique_clinic_id,
                    payment_status="pending",
                    metadata_logs=json.dumps(metadata),
                )
            )
            self.db.add(
                PaymentSessions(
                    session_id=session.id,
                    customer_id=customer_id,
                    clinic_id=clinic_id,
                    status="pending",
                )
            )
            self.db.commit()
            return session
        except stripe.error.StripeError as e:
            self.logger.error(f"Error creating checkout session: {e}")
            raise
        finally:
            self.db.close()

    async def handle_webhook(self, request: Request):
        """Verify and dispatch a Stripe webhook request.

        Handles ``customer.subscription.deleted`` and
        ``checkout.session.completed``; every other event type is only logged.

        Returns:
            ``"OK"`` once the event has been verified and dispatched.
            NOTE(review): an invalid payload or signature is logged and the
            method returns ``None`` instead of raising -- confirm the route
            turns that into a non-2xx response.
        """
        try:
            payload = await request.body()
            event = stripe.Webhook.construct_event(
                payload, request.headers.get("Stripe-Signature"), self.webhook_secret
            )
            event_type = event["type"]
            self.logger.info(f"Stripe webhook event type: {event_type}")
            event_object = event["data"]["object"]

            if event_type == "customer.subscription.deleted":
                self.logger.info("customer subscription ended")
                # The event object is the subscription itself, so its own id
                # is used rather than digging it out of the first item.
                await self._subscription_expired(event_object["id"])
            elif event_type == "checkout.session.completed":
                metadata = event_object["metadata"]
                clinic_id = metadata["clinic_id"]
                customer_id = metadata["customer_id"]
                account_id = metadata["account_id"]

                await self._update_payment_log(
                    metadata["unique_clinic_id"],
                    clinic_id,
                    customer_id,
                    account_id,
                    event_object["amount_total"],
                    metadata,
                )
                await self._create_subscription_entry({
                    "clinic_id": clinic_id,
                    "customer_id": customer_id,
                    "account_id": account_id,
                    "session_id": event_object["id"],
                    "subscription_id": event_object["subscription"],
                })
            # TODO: handle subscription period end
            return "OK"
        except ValueError as e:
            self.logger.error(f"Invalid payload: {e}")
        except stripe.error.SignatureVerificationError as e:
            self.logger.error(f"Invalid signature: {e}")
        finally:
            self.db.close()

    async def _update_payment_log(self, unique_clinic_id: str, clinic_id: int, customer_id: str, account_id: str, total: float, metadata: Any):
        """Record a paid checkout and move the clinic to ``UNDER_REVIEW``.

        Deletes the clinic's ``PaymentSessions`` rows, inserts a "paid"
        ``PaymentLogs`` row and updates the clinic status. The changes are
        committed together; on any error they are rolled back and the error
        is logged, not raised.
        """
        try:
            self.db.query(PaymentSessions).filter(PaymentSessions.clinic_id == clinic_id).delete()
            self.db.add(
                PaymentLogs(
                    customer_id=customer_id,
                    account_id=account_id,
                    # NOTE(review): ``total`` is the checkout session's
                    # ``amount_total`` (Stripe reports minor units), while the
                    # pending log row stores major units -- confirm the
                    # intended unit. Left unchanged.
                    amount=total,
                    clinic_id=clinic_id,
                    unique_clinic_id=unique_clinic_id,
                    payment_status="paid",
                    metadata_logs=json.dumps(metadata.to_dict()),
                )
            )
            clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first()
            if clinic:
                clinic.status = ClinicStatus.UNDER_REVIEW
            self.db.commit()
        except Exception as e:
            # Roll back so a half-applied update (e.g. sessions deleted but no
            # log row written) is never committed.
            self.db.rollback()
            self.logger.error(f"Error updating payment log: {e}")
        finally:
            self.db.close()

    async def _create_subscription_entry(self, data: dict):
        """Persist a ``Subscriptions`` row and a paid ``PaymentSessions`` row.

        Args:
            data: mapping with ``clinic_id``, ``customer_id``, ``account_id``,
                ``session_id`` and ``subscription_id``.

        Fee figures are read from the ``fees_to_be`` JSON string stored in the
        Stripe subscription's metadata. Errors are rolled back and logged,
        not raised.
        """
        try:
            subscription = await stripe.Subscription.retrieve_async(data["subscription_id"])
            # Metadata is already a mapping; only its "fees_to_be" value is a
            # JSON string that needs decoding.
            metadata = subscription["metadata"]
            fees_to_be = json.loads(metadata["fees_to_be"])
            first_item = subscription["items"]["data"][0]

            self.db.add(
                Subscriptions(
                    clinic_id=data["clinic_id"],
                    customer_id=data["customer_id"],
                    account_id=data["account_id"],
                    total=fees_to_be["total"],
                    setup_fee=fees_to_be["setup_fees"],
                    subscription_fee=fees_to_be["subscription_fees"],
                    per_call_charge=fees_to_be["per_call_charges"],
                    subscription_id=data["subscription_id"],
                    status=subscription.status,
                    current_period_start=first_item["current_period_start"],
                    current_period_end=first_item["current_period_end"],
                    metadata_logs=json.dumps(dict(metadata)),
                )
            )
            self.db.add(
                PaymentSessions(
                    session_id=data["session_id"],
                    customer_id=data["customer_id"],
                    clinic_id=data["clinic_id"],
                    status="paid",
                )
            )
            self.db.commit()
        except Exception as e:
            self.db.rollback()
            self.logger.error(f"Error creating subscription entry: {e}")
        finally:
            self.db.close()

    async def _subscription_expired(self, subscription_id):
        """Copy the Stripe subscription's current status onto the local row.

        Logs an error and does nothing else when no local row matches
        ``subscription_id``. Errors are rolled back and logged, not raised.
        """
        try:
            subscription = await stripe.Subscription.retrieve_async(subscription_id)
            db_subscription = (
                self.db.query(Subscriptions)
                .filter(Subscriptions.subscription_id == subscription_id)
                .first()
            )
            if not db_subscription:
                self.logger.error("Subscription not found!")
                return

            db_subscription.status = subscription.status
            # TODO: update clinic status
            # TODO: send email to user
            self.db.commit()
        except Exception as e:
            self.db.rollback()
            self.logger.error(f"Error ending subscription: {e}")
        finally:
            self.db.close()