Skip to content

Commit

Permalink
update query refresh to include terminal states (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
bh2smith authored Oct 7, 2022
1 parent 93afe3b commit 569f4a3
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
28 changes: 17 additions & 11 deletions dune_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,22 @@ def refresh(self, query: Query, ping_frequency: int = 5) -> list[DuneRecord]:
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = self.execute(query).execution_id
state = self.get_status(job_id).state
while state != ExecutionState.COMPLETED:
log.info(
f"waiting for query execution {job_id} to complete: current state {state}"
)
status = self.get_status(job_id)
while status.state not in ExecutionState.terminal_states():
log.info(f"waiting for query execution {job_id} to complete: {status}")
time.sleep(ping_frequency)
state = self.get_status(job_id).state
status = self.get_status(job_id)

full_response = self.get_result(job_id)
assert (
full_response.result is not None
), f"Expected Results on completed execution status {full_response}"
return full_response.result.rows
if status.state == ExecutionState.COMPLETED:
full_response = self.get_result(job_id)
assert (
full_response.result is not None
), f"Expected Results on completed execution status {full_response}"
return full_response.result.rows

if status.state == ExecutionState.CANCELLED:
log.info("Execution Cancelled, returning empty record set")
return []

log.error(status)
raise Exception(f"{status}. Perhaps your query took too long to run!")
19 changes: 19 additions & 0 deletions dune_client/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ class ExecutionState(Enum):
EXECUTING = "QUERY_STATE_EXECUTING"
PENDING = "QUERY_STATE_PENDING"
CANCELLED = "QUERY_STATE_CANCELLED"
FAILED = "QUERY_STATE_FAILED"

@classmethod
def terminal_states(cls) -> set[ExecutionState]:
"""
Returns the terminal states (i.e. when a query execution is no longer executing
"""
return {cls.COMPLETED, cls.CANCELLED, cls.FAILED}


@dataclass
Expand Down Expand Up @@ -116,6 +124,17 @@ def from_dict(cls, data: dict[str, Any]) -> ExecutionStatusResponse:
times=TimeData.from_dict(data), # Sending the entire data dict
)

def __str__(self) -> str:
if self.state == ExecutionState.PENDING:
return f"{self.state} (queue position: {self.queue_position})"
if self.state == ExecutionState.FAILED:
return (
f"{self.state}: execution_id={self.execution_id}, "
f"query_id={self.query_id}, times={self.times}"
)

return f"{self.state}"


@dataclass
class ResultMetadata:
Expand Down

0 comments on commit 569f4a3

Please sign in to comment.