From b6e390f77cd45bc1331edb0574980f9dc2032ff0 Mon Sep 17 00:00:00 2001 From: deepvasoya Date: Wed, 14 May 2025 15:38:31 +0530 Subject: [PATCH] feat: call transcripts apis refactor: api response for list --- apis/__init__.py | 4 +- apis/endpoints/call_transcripts.py | 21 +++++ interface/common_response.py | 6 ++ models/CallTranscripts.py | 17 ++++ schemas/BaseSchemas.py | 10 +- schemas/CreateSchemas.py | 4 + schemas/ResponseSchemas.py | 9 ++ services/callTranscripts.py | 145 +++++++++++++++++++++++++++++ services/s3Service.py | 2 +- 9 files changed, 215 insertions(+), 3 deletions(-) create mode 100644 apis/endpoints/call_transcripts.py create mode 100644 interface/common_response.py create mode 100644 models/CallTranscripts.py create mode 100644 services/callTranscripts.py diff --git a/apis/__init__.py b/apis/__init__.py index d5540e1..4ebcc8c 100644 --- a/apis/__init__.py +++ b/apis/__init__.py @@ -5,7 +5,7 @@ from fastapi.security import HTTPBearer # Import the security scheme bearer_scheme = HTTPBearer(scheme_name="Bearer Authentication") -from .endpoints import clinics, doctors, calender, appointments, patients, admin, auth, s3, users, clinicDoctor, dashboard +from .endpoints import clinics, doctors, calender, appointments, patients, admin, auth, s3, users, clinicDoctor, dashboard, call_transcripts api_router = APIRouter() # api_router.include_router(twilio.router, prefix="/twilio") @@ -35,3 +35,5 @@ api_router.include_router(users.router, prefix="/users", tags=["users"], depende api_router.include_router(clinicDoctor.router, prefix="/clinic-doctors", tags=["clinic-doctors"], dependencies=[Depends(auth_required)]) api_router.include_router(dashboard.router, prefix="/dashboard", tags=["dashboard"], dependencies=[Depends(auth_required)]) + +api_router.include_router(call_transcripts.router, prefix="/call-transcripts", tags=["call-transcripts"], dependencies=[Depends(auth_required)]) diff --git a/apis/endpoints/call_transcripts.py 
# --- apis/endpoints/call_transcripts.py ---
from fastapi import APIRouter, BackgroundTasks

from services.callTranscripts import CallTranscriptServices
from utils.constants import DEFAULT_LIMIT, DEFAULT_PAGE
from schemas.ApiResponse import ApiResponse

router = APIRouter()


@router.get("/")
def get_call_transcripts(limit: int = DEFAULT_LIMIT, page: int = DEFAULT_PAGE):
    """Return one page of call transcripts wrapped in the standard ApiResponse.

    Args:
        limit: Maximum number of records per page.
        page: 1-based page number; values below 1 are clamped to 1.
    """
    # Clamp instead of only special-casing page == 0, so a negative page can
    # never turn into a negative OFFSET in the underlying query.
    if page < 1:
        page = 1
    offset = (page - 1) * limit
    response = CallTranscriptServices().get_call_transcripts(limit, offset)
    return ApiResponse(data=response, message="Call transcripts retrieved successfully")


@router.post("/bulk-download")
def bulk_download_call_transcripts(key_ids: list[int], background_tasks: BackgroundTasks):
    """Stream the transcripts for the given row ids as a single zip file.

    Args:
        key_ids: Primary keys of the call_transcripts rows to include.
        background_tasks: Injected so the service can schedule temp-file cleanup
            to run after the response has been sent.
    """
    service = CallTranscriptServices()
    return service.bulk_download_call_transcripts(key_ids, background_tasks)


# --- interface/common_response.py ---
from pydantic import BaseModel
from typing import Any


class CommonResponse(BaseModel):
    """Generic paginated payload: one page of items plus the total row count."""
    data: Any
    total: int


# --- models/CallTranscripts.py ---
from sqlalchemy import Column, Integer, String

from database import Base
from .CustomBase import CustomBase


class CallTranscripts(Base, CustomBase):
    """ORM model for the call_transcripts table."""
    __tablename__ = "call_transcripts"

    id = Column(Integer, primary_key=True, index=True)
    patient_name = Column(String)
    patient_number = Column(String)
    call_duration = Column(String)
    call_received_time = Column(String)
    # S3 object key of the stored transcript; swapped for a signed URL at read time.
    transcript_key_id = Column(String)
# --- schemas/BaseSchemas.py (addition) ---
# NOTE(review): BaseModel comes from the existing pydantic import at the top of
# schemas/BaseSchemas.py; datetime likewise already exists in ResponseSchemas.py.
class CallTranscriptsBase(BaseModel):
    """Fields shared by every call-transcript schema."""
    patient_name: str
    patient_number: str
    call_duration: str
    call_received_time: str
    transcript_key_id: str


# --- schemas/CreateSchemas.py (addition) ---
class CallTranscriptsCreate(CallTranscriptsBase):
    """Payload for creating a call transcript; no fields beyond the base."""
    pass


# --- schemas/ResponseSchemas.py (addition) ---
# NOTE(review): this patch also adds the same Config block to the pre-existing
# ClinicDoctorResponse; that class is not fully visible in this chunk.
class CallTranscriptsResponse(CallTranscriptsBase):
    """API representation of a stored call_transcripts row."""
    id: int
    create_time: datetime
    update_time: datetime

    class Config:
        # Allow construction from ORM rows (pydantic v1 style, matching the file).
        orm_mode = True


# --- services/callTranscripts.py ---
from fastapi import BackgroundTasks
from sqlalchemy.orm import Session
import tempfile
import zipfile
import time
import shutil
import os
from fastapi.responses import FileResponse
from concurrent.futures import ThreadPoolExecutor, as_completed

from schemas.ResponseSchemas import CallTranscriptsResponse
from database import get_db
from models.CallTranscripts import CallTranscripts
from exceptions.business_exception import BusinessValidationException
from services.s3Service import get_signed_url
from interface.common_response import CommonResponse


class CallTranscriptServices:
    """Read call transcripts from the DB and serve their S3 files (single or bulk zip)."""

    def __init__(self):
        # NOTE(review): next(get_db()) detaches the session from its generator,
        # so the generator's teardown (which presumably closes the session) never
        # runs — confirm get_db()'s cleanup contract and close explicitly if so.
        self.db: Session = next(get_db())

    def get_call_transcripts(self, limit: int, offset: int) -> CommonResponse:
        """Return one page of transcripts (with signed URLs) plus the total count.

        Args:
            limit: Maximum rows to return.
            offset: Number of rows to skip.
        """
        rows = self.db.query(CallTranscripts).limit(limit).offset(offset).all()
        total = self.db.query(CallTranscripts).count()

        items = [CallTranscriptsResponse(**row.__dict__.copy()) for row in rows]
        for item in items:
            # Replace the raw S3 key with a time-limited signed URL for the client.
            item.transcript_key_id = get_signed_url(item.transcript_key_id)

        return CommonResponse(data=items, total=total)

    def download_call_transcript(self, key_id: str) -> str:
        """Return a signed URL for one transcript.

        Raises:
            BusinessValidationException: If no row matches the given key.
        """
        transcript = (
            self.db.query(CallTranscripts)
            .filter(CallTranscripts.transcript_key_id == key_id)
            .first()
        )
        if not transcript:
            raise BusinessValidationException("Call transcript not found!")
        return get_signed_url(transcript.transcript_key_id)

    def download_file(self, url: str, file_path: str) -> None:
        """Download a file from a signed URL to a local path.

        Args:
            url: The pre-signed URL to download from.
            file_path: The local path to save the file to.
        """
        try:
            # Local import keeps requests out of the read-only code paths.
            import requests

            # A timeout is mandatory here: without one a stalled S3 connection
            # would hang a ThreadPoolExecutor worker forever.
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                with open(file_path, 'wb') as f:
                    f.write(response.content)
            else:
                print(f"Failed to download file: {response.status_code}")
        except Exception as e:
            print(f"Error downloading file: {e}")

    def cleanup_temp_files(self, temp_dir: str, zip_path: str) -> None:
        """Remove the scratch directory (and the zip inside it) after sending.

        Args:
            temp_dir: Directory containing the downloaded files and the zip.
            zip_path: Path to the zip file (it lives inside temp_dir).
        """
        try:
            # Crude grace period so the zip is not deleted mid-transfer.
            # NOTE(review): Starlette runs background tasks only after the
            # response body is fully sent, so this sleep is likely redundant —
            # kept for safety; confirm before removing.
            time.sleep(5)
            # zip_path sits inside temp_dir, so removing the tree covers both.
            shutil.rmtree(temp_dir, ignore_errors=True)
        except Exception as e:
            print(f"Error during cleanup: {e}")

    def bulk_download_call_transcripts(self, key_ids: list[int], background_tasks: BackgroundTasks):
        """Download the transcripts for the given row ids and return them zipped.

        Args:
            key_ids: Primary keys of call_transcripts rows to include.
            background_tasks: Used to schedule temp-file cleanup after the
                response is sent.

        Raises:
            BusinessValidationException: If none of the ids match a row.
        """
        rows = self.db.query(CallTranscripts).filter(CallTranscripts.id.in_(key_ids)).all()
        keys = [row.transcript_key_id for row in rows]
        if not keys:
            raise BusinessValidationException("No call transcripts found!")

        temp_dir = tempfile.mkdtemp(prefix="call_transcripts_")
        zip_path = os.path.join(temp_dir, "call_transcripts.zip")

        # Build (signed URL, local path, archive name) for every transcript.
        # NOTE(review): two keys with the same basename would overwrite each
        # other in temp_dir — confirm keys are unique per basename.
        download_info = []
        for key in keys:
            url = get_signed_url(key)
            filename = os.path.basename(key)
            download_info.append((url, os.path.join(temp_dir, filename), filename))

        # Concurrent downloads; cap at 32 threads (tune for S3 rate limits).
        max_workers = min(32, len(download_info))
        file_paths = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_file = {
                executor.submit(self.download_file, url, file_path): (file_path, filename)
                for url, file_path, filename in download_info
            }
            for future in as_completed(future_to_file):
                file_path, filename = future_to_file[future]
                try:
                    future.result()  # Surface any exception from the worker.
                    file_paths.append((file_path, filename))
                except Exception as e:
                    # Name the file that failed instead of the old "(unknown)".
                    print(f"Error downloading {filename}: {e}")

        # Zip whatever downloaded successfully; failed files are skipped.
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for file_path, arcname in file_paths:
                if os.path.exists(file_path):
                    zip_file.write(file_path, arcname=arcname)

        background_tasks.add_task(self.cleanup_temp_files, temp_dir, zip_path)

        return FileResponse(
            path=zip_path,
            media_type="application/zip",
            filename="call_transcripts.zip",
            # Returning a Response directly bypasses FastAPI's automatic
            # background handling, so the injected tasks are attached here.
            background=background_tasks,
        )


# --- services/s3Service.py (change, partially visible in this chunk) ---
# get_signed_url was converted from `async def` to a plain `def` so the
# synchronous service methods above can call it directly; its body is not
# visible here and is left untouched.