import json import os import dotenv dotenv.load_dotenv() from models import ClinicOffers, StripeUsers from services.dashboardService import DashboardService from exceptions.validation_exception import ValidationException from exceptions.resource_not_found_exception import ResourceNotFoundException from exceptions.unauthorized_exception import UnauthorizedException from models import Clinics,PaymentSessions, Subscriptions import uuid from fastapi import Request from datetime import datetime from models import PaymentLogs from enums.enums import ClinicStatus, UserType from database import get_db from sqlalchemy.orm import Session import stripe from loguru import logger from decimal import Decimal class StripeServices: def __init__(self): self.db: Session = next(get_db()) self.logger = logger self.webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET") self.redirect_url = os.getenv("FRONTEND_URL") self.dashboard_service = DashboardService() async def create_customer(self, user_id: int, email: str, name: str): try: customer = stripe.Customer.create( email=email, name=name, metadata={"user_id": user_id} ) return customer except stripe.error.StripeError as e: self.logger.error(f"Error creating customer: {e}") raise async def delete_customer(self, customer_id: str): try: stripe.Customer.delete(customer_id) except stripe.error.StripeError as e: self.logger.error(f"Error deleting customer: {e}") raise async def create_account(self, user_id: int, email: str, name: str, phone: str): try: account = stripe.Account.create( type="express", country="AU", capabilities={ "card_payments": {"requested": True}, "transfers": {"requested": True}, }, business_type="individual", individual={"first_name": name, "email": email}, metadata={"user_id": user_id}, ) return account except stripe.error.StripeError as e: self.logger.error(f"Error creating account: {e}") raise async def delete_account(self, account_id: str): try: stripe.Account.delete(account_id) except stripe.error.StripeError as e: self.logger.error(f"Error deleting account: {e}") raise async def get_invoice(self, user): try: if user["userType"] != UserType.CLINIC_ADMIN: raise UnauthorizedException("User is not authorized to perform this action") clinic = self.db.query(Clinics).filter(Clinics.creator_id == user["id"]).first() if not clinic: raise ResourceNotFoundException("Clinic not found!") customer = self.db.query(StripeUsers).filter(StripeUsers.user_id == user["id"]).first() if not customer: raise ResourceNotFoundException("Customer not found!") subscription = self.db.query(Subscriptions).filter(Subscriptions.clinic_id == clinic.id, Subscriptions.customer_id == customer.customer_id, Subscriptions.status == "active").first() if not subscription: raise ResourceNotFoundException("Subscription not found!") stripe_subscription = stripe.Subscription.retrieve(subscription.subscription_id) invoice = stripe.Invoice.retrieve( stripe_subscription["latest_invoice"] ) return invoice.hosted_invoice_url except stripe.error.StripeError as e: self.logger.error(f"Error getting invoice: {e}") raise except Exception as e: self.logger.error(f"Error getting invoice: {e}") raise finally: self.db.close() async def create_payment_session(self, user): try: if user["userType"] != UserType.CLINIC_ADMIN: raise UnauthorizedException("User is not authorized to perform this action") clinic = user["created_clinics"][0] if clinic["status"] != ClinicStatus.PAYMENT_DUE: raise ValidationException("Clinic is not due for payment") customer = self.db.query(StripeUsers).filter(StripeUsers.user_id == user['id']).first() if not customer: raise ResourceNotFoundException("Customer not found") clinic_offers = self.db.query(ClinicOffers).filter(ClinicOffers.clinic_email == clinic["email"]).first() signup_pricing= await self.dashboard_service.get_signup_pricing_master() fees_to_be = { "setup_fees": signup_pricing.setup_fees, "subscription_fees": signup_pricing.subscription_fees, "per_call_charges": signup_pricing.per_call_charges, "total": signup_pricing.setup_fees + signup_pricing.subscription_fees + signup_pricing.per_call_charges } if clinic_offers: fees_to_be["setup_fees"] = clinic_offers.setup_fees fees_to_be["per_call_charges"] = clinic_offers.per_call_charges fees_to_be["total"] = clinic_offers.setup_fees + fees_to_be["subscription_fees"] + clinic_offers.per_call_charges payment_link = await self.create_subscription_checkout(fees_to_be, clinic["id"], customer.account_id, customer.customer_id) return payment_link.url except Exception as e: self.logger.error(f"Error creating payment session: {e}") raise finally: self.db.close() async def create_checkout_session(self, user_id: int): try: checkout_session = stripe.checkout.Session.create( payment_method_types=["card"], line_items=[ { "price_data": { "currency": "aud", "product_data": { "name": "Willio Voice Subscription", }, "unit_amount": 5000, }, "quantity": 1, } ], expand=["payment_intent"], mode="payment", payment_intent_data={"metadata": {"order_id": "1"}}, success_url=f"{self.redirect_url}auth/waiting", cancel_url=f"{self.redirect_url}auth/waiting", metadata={"user_id": user_id}, ) return checkout_session except stripe.error.StripeError as e: self.logger.error(f"Error creating checkout session: {e}") raise async def create_setup_fees(self, customer_id: str, amount: int): try: setup_intent = stripe.InvoiceItem.create( customer=customer_id, amount=amount, currency="aud", description="Setup Fees", ) return setup_intent except stripe.error.StripeError as e: self.logger.error(f"Error creating setup intent: {e}") raise async def create_subscription_checkout(self, fees_to_be: dict, clinic_id: int, account_id: str, customer_id: str): try: unique_id = str(uuid.uuid4()) unique_clinic_id = f"clinic_{clinic_id}_{unique_id}" line_items = [{ 'price_data': { 'currency': 'aud', 'product_data': { 'name': 'Monthly Subscription', }, 'unit_amount': int(fees_to_be["subscription_fees"] * 100), # Convert to cents 'recurring': { 'interval': 'year', }, }, 'quantity': 1, }] line_items.append({ 'price_data': { 'currency': 'aud', 'product_data': { 'name': 'Per Call', }, 'unit_amount': int(fees_to_be["per_call_charges"] * 100), # Convert to cents }, 'quantity': 1, }) line_items.append({ 'price_data': { 'currency': 'aud', 'product_data': { 'name': 'Setup Fee', }, 'unit_amount': int(fees_to_be["setup_fees"] * 100), # Convert to cents }, 'quantity': 1, }) metadata = { "clinic_id": clinic_id, "unique_clinic_id": unique_clinic_id, "account_id": account_id, "customer_id": customer_id, "fees_to_be": json.dumps(fees_to_be) } session_data = { 'customer': customer_id, 'payment_method_types': ['card'], 'mode': 'subscription', 'line_items': line_items, 'success_url': f"{self.redirect_url}auth/waiting", 'cancel_url': f"{self.redirect_url}auth/waiting", 'metadata': metadata, 'subscription_data': { 'metadata': metadata } } session = stripe.checkout.Session.create(**session_data) payment_log = PaymentLogs( customer_id=customer_id, account_id=account_id, amount=Decimal(str(fees_to_be["total"])), # Keep as Decimal for database storage clinic_id=clinic_id, unique_clinic_id=unique_clinic_id, payment_status="pending", metadata_logs=json.dumps(metadata) ) new_payment_session = PaymentSessions( session_id=session.id, customer_id=customer_id, clinic_id=clinic_id, status="pending" ) self.db.add(payment_log) self.db.add(new_payment_session) self.db.commit() return session except stripe.error.StripeError as e: self.logger.error(f"Error creating checkout session: {e}") raise finally: self.db.close() async def handle_webhook(self, request: Request): try: payload = await request.body() event = stripe.Webhook.construct_event( payload, request.headers.get("Stripe-Signature"), self.webhook_secret ) self.logger.info(f"Stripe webhook event type: {event['type']}") if event["type"] == "checkout.session.expired": pass if event["type"] == "checkout.session.completed": unique_clinic_id = event["data"]["object"]["metadata"]["unique_clinic_id"] clinic_id = event["data"]["object"]["metadata"]["clinic_id"] customer_id = event["data"]["object"]["metadata"]["customer_id"] account_id = event["data"]["object"]["metadata"]["account_id"] total = event["data"]["object"]["amount_total"] metadata = event["data"]["object"]["metadata"] session_id = event["data"]["object"]["id"] subscription_id = event["data"]["object"]["subscription"] self._update_payment_log(unique_clinic_id, clinic_id, customer_id, account_id, total, metadata, session_id) self._create_subscription_entry({ "clinic_id": clinic_id, "customer_id": customer_id, "account_id": account_id, "session_id": session_id, "subscription_id": subscription_id, }) # TODO: handle subscription period end return "OK" except ValueError as e: self.logger.error(f"Invalid payload: {e}") except stripe.error.SignatureVerificationError as e: self.logger.error(f"Invalid signature: {e}") finally: self.db.close() def _update_payment_log(self, unique_clinic_id:str, clinic_id:int, customer_id:str, account_id:str, total:float, metadata:any, session_id:str): try: self.db.query(PaymentSessions).filter(PaymentSessions.clinic_id == clinic_id).delete() payment_log = PaymentLogs( customer_id=customer_id, account_id=account_id, amount=total, clinic_id=clinic_id, unique_clinic_id=unique_clinic_id, payment_status="paid", metadata_logs=json.dumps(metadata.to_dict()) ) self.db.add(payment_log) clinic = self.db.query(Clinics).filter(Clinics.id == clinic_id).first() if clinic: clinic.status = ClinicStatus.UNDER_REVIEW self.db.add(clinic) return except Exception as e: self.logger.error(f"Error updating payment log: {e}") finally: self.db.commit() self.db.close() def _create_subscription_entry(self,data:dict): try: subscription = stripe.Subscription.retrieve(data["subscription_id"]) new_subscription = Subscriptions( clinic_id=data["clinic_id"], customer_id=data["customer_id"], account_id=data["account_id"], session_id=data["session_id"], subscription_id=data["subscription_id"], status=subscription.status, current_period_start=subscription["items"]["data"][0]["current_period_start"], current_period_end=subscription["items"]["data"][0]["current_period_end"], metadata_logs=json.dumps(subscription.metadata) ) self.db.add(new_subscription) payment_session = PaymentSessions( session_id=data["session_id"], customer_id=data["customer_id"], clinic_id=data["clinic_id"], status="paid" ) self.db.add(payment_session) return except Exception as e: self.logger.error(f"Error creating subscription entry: {e}") finally: self.db.commit() self.db.close()