Skip to content

Commit

Permalink
are we done yet?
Browse files Browse the repository at this point in the history
  • Loading branch information
deeleeramone committed Dec 10, 2024
1 parent 53cd998 commit 078f5be
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ async def connect_and_stream():
logger.error("PROVIDER ERROR: WebSocket connection closed")

except Exception as e: # pylint: disable=broad-except
msg = f"PROVIDER ERROR: {e.__class__.__name__}: {e}"
logger.error(msg)
ERR = f"PROVIDER ERROR: {e.__class__.__name__}: {e}"
logger.error(ERR)

finally:
sys.exit(0)
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ async def subscribe(symbol, event):
elif event == "unsubscribe":
client.leave(ticker)
except Exception as e: # pylint: disable=broad-except
msg = f"PROVIDER ERROR: {e.__class__.__name__}: {e}"
logger.error(msg)
exc = f"PROVIDER ERROR: {e.__class__.__name__}: {e}"
logger.error(exc)


async def read_stdin_and_queue_commands():
Expand Down Expand Up @@ -133,8 +133,8 @@ async def connect_and_stream():
logger.error("PROVIDER INFO: WebSocket connection closed")

except Exception as e: # pylint: disable=broad-except
msg = f"PROVIDER ERROR: {e.__class__.__name__}: {e}"
logger.error(msg)
EXC = f"PROVIDER ERROR: {e.__class__.__name__}: {e}"
logger.error(EXC)

finally:
client.disconnect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def validate_date(cls, v):
v = v / 1e9 # Convert nanoseconds to seconds
dt = datetime.fromtimestamp(v, tz=timezone("UTC"))
dt = dt.astimezone(timezone("America/New_York"))
return dt

return dt


class PolygonWebSocketQueryParams(WebSocketQueryParams):
Expand Down Expand Up @@ -352,9 +353,9 @@ def _validate_conditions(cls, v):
"""Validate the conditions."""
if v is None or isinstance(v, list) and v[0] == 0:
return None
elif isinstance(v, list) and v[0] == 1:
if isinstance(v, list) and v[0] == 1:
return "sellside"
elif isinstance(v, list) and v[0] == 2:
if isinstance(v, list) and v[0] == 2:
return "buyside"
return str(v)

Expand Down Expand Up @@ -1199,7 +1200,7 @@ def __new__(cls, **data):
or MODEL_MAP.get(data.get("type", ""))
)
if not model:
return super().__new__(cls)
return super().__new__(cls) # type: ignore

return model.model_validate(data) # type: ignore

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
CONNECT_KWARGS = kwargs.pop("connect_kwargs", {})
FEED = kwargs.pop("feed", None)
ASSET_TYPE = kwargs.pop("asset_type", None)
kwargs["results_file"] = os.path.abspath(kwargs["results_file"])


async def handle_symbol(symbol):
Expand Down Expand Up @@ -100,16 +101,14 @@ async def login(websocket, api_key):
err = f"UnauthorizedError -> {msg.get('message')}"
logger.error(err)
sys.exit(1)
break
if msg.get("status") != "auth_success":
err = (
f"UnauthorizedError -> {msg.get('status')} -> {msg.get('message')}"
)
logger.error(err)
sys.exit(1)
break
logger.info("PROVIDER INFO: %s", msg.get("message"))
except Exception as e:
except Exception as e: # pylint: disable=broad-except
logger.error("PROVIDER ERROR: %s -> %s", e.__class__.__name__, e.args[0])
sys.exit(1)

Expand All @@ -124,12 +123,12 @@ async def subscribe(websocket, symbol, event):
subscribe_event = f'{{"action":"{event}","params":"{ticker}"}}'
try:
await websocket.send(subscribe_event)
except Exception as e:
except Exception as e: # pylint: disable=broad-except
msg = f"PROVIDER ERROR: {e.__class__.__name__} -> {e}"
logger.error(msg)


async def read_stdin(command_queue):
async def read_stdin():
"""Read from stdin and queue commands."""
while True:
line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
Expand All @@ -155,7 +154,7 @@ async def process_stdin_queue(websocket):
await subscribe(websocket, symbol, event)


async def process_message(message, results_path, table_name, limit):
async def process_message(message):
"""Process the WebSocket message."""
messages = message if isinstance(message, list) else [message]
for msg in messages:
Expand All @@ -168,7 +167,6 @@ async def process_message(message, results_path, table_name, limit):
err = f"UnauthorizedError -> {msg.get('message')}"
logger.error(err)
sys.exit(1)
break

logger.info("PROVIDER INFO: %s", msg.get("message"))
elif msg and "ev" in msg and "status" not in msg:
Expand All @@ -183,20 +181,23 @@ async def process_message(message, results_path, table_name, limit):
raise e from e

if result:
await write_to_db(result, results_path, table_name, limit)
await write_to_db(
result,
kwargs["results_path"],
kwargs["table_name"],
kwargs.get("limit"),
)
else:
logger.info("PROVIDER INFO: %s", msg)


async def connect_and_stream(url, symbol, api_key, results_path, table_name, limit):
async def connect_and_stream(): # pylint: disable=too-many-branches, too-many-statements
"""Connect to the WebSocket and stream data to file."""

handler_task = asyncio.create_task(
queue.process_queue(
lambda message: process_message(message, results_path, table_name, limit)
)
queue.process_queue(lambda message: process_message(message))
)
stdin_task = asyncio.create_task(read_stdin(command_queue))
stdin_task = asyncio.create_task(read_stdin())
try:
connect_kwargs = CONNECT_KWARGS.copy()
if "ping_timeout" not in connect_kwargs:
Expand All @@ -205,16 +206,15 @@ async def connect_and_stream(url, symbol, api_key, results_path, table_name, lim
connect_kwargs["close_timeout"] = None

try:
async with websockets.connect(url, **connect_kwargs) as websocket:

await login(websocket, api_key)
async with websockets.connect(kwargs["url"], **connect_kwargs) as websocket:
await login(websocket, kwargs["api_key"])
response = await websocket.recv()
messages = json.loads(response)
await process_message(messages, results_path, table_name, limit)
await subscribe(websocket, symbol, "subscribe")
await process_message(messages)
await subscribe(websocket, kwargs["symbol"], "subscribe")
response = await websocket.recv()
messages = json.loads(response)
await process_message(messages, results_path, table_name, limit)
await process_message(messages)
while True:
cmd_task = asyncio.create_task(process_stdin_queue(websocket))
msg_task = asyncio.create_task(websocket.recv())
Expand Down Expand Up @@ -255,14 +255,14 @@ async def connect_and_stream(url, symbol, api_key, results_path, table_name, lim
# Attempt to reopen the connection
logger.info("PROVIDER INFO: Attempting to reconnect after five seconds.")
await asyncio.sleep(5)
await connect_and_stream(url, symbol, api_key, results_path, table_name, limit)
await connect_and_stream()

except websockets.WebSocketException as e:
msg = f"PROVIDER ERROR: WebSocketException -> {e}"
logger.error(msg)
sys.exit(1)

except Exception as e:
except Exception as e: # pylint: disable=broad-except
msg = f"PROVIDER ERROR: Unexpected error -> {e.__class__.__name__}: {e}"
logger.error(msg)
sys.exit(1)
Expand All @@ -284,14 +284,7 @@ async def connect_and_stream(url, symbol, api_key, results_path, table_name, lim
loop.add_signal_handler(sig, handle_termination_signal, logger)

asyncio.run_coroutine_threadsafe(
connect_and_stream(
kwargs["url"],
kwargs["symbol"],
kwargs["api_key"],
os.path.abspath(kwargs["results_file"]),
kwargs["table_name"],
kwargs.get("limit", None),
),
connect_and_stream(),
loop,
)
loop.run_forever()
Expand All @@ -300,8 +293,8 @@ async def connect_and_stream(url, symbol, api_key, results_path, table_name, lim
logger.error("PROVIDER ERROR: WebSocket connection closed")

except Exception as e: # pylint: disable=broad-except
msg = f"PROVIDER ERROR: {e.__class__.__name__} -> {e}"
logger.error(msg)
ERR = f"PROVIDER ERROR: {e.__class__.__name__} -> {e}"
logger.error(ERR)

finally:
sys.exit(0)

0 comments on commit 078f5be

Please sign in to comment.