-
Notifications
You must be signed in to change notification settings - Fork 0
/
main_old.py
78 lines (57 loc) · 1.99 KB
/
main_old.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# Fast API Imports
from fastapi import APIRouter, File, UploadFile, HTTPException, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from fastapi import FastAPI
from typing import List
from fastapi.middleware.cors import CORSMiddleware
from concurrent.futures import ThreadPoolExecutor
import asyncio
# Internal modules imports
from utils.case_details import get_case_details
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins
allow_credentials=True,
allow_methods=["*"], # Allow all methods
allow_headers=["*"], # Allow all headers
)
class DataPayloadBulkCase(BaseModel):
cinos: List[str]
# @app.post('/case_details_bulk')
# async def case_details(data: DataPayloadBulkCase):
# try:
# results = []
# for cino in data.cinos:
# data_payload = {
# 'cino': cino,
# 'source': 'undefined'
# }
# table_data = await get_case_details(data_payload['cino'])
# results.append({
# 'cino': cino,
# 'data': table_data
# })
# return {"results": results}
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
@app.post('/case_details_bulk')
async def case_details(data: DataPayloadBulkCase):
loop = asyncio.get_event_loop()
results = []
with ThreadPoolExecutor() as pool:
futures = []
for cino in data.cinos:
data_payload = {
'cino': cino,
'source': 'undefined'
}
futures.append(loop.run_in_executor(pool, get_case_details, data_payload))
for future in asyncio.as_completed(futures):
table_data = await future
results.append({
'cino': future.cino, # You might want to handle this differently
'data': table_data
})
return {"results": results}