ENPICOM Logo API Docs Python SDK Docs Events

enpi_api.l2.util.events

 1from loguru import logger
 2
 3from enpi_api.l2.types.event import Event
 4from enpi_api.l2.types.task import TaskId, TaskState
 5
 6
 7def job_is_finished(event: Event, task_id: TaskId) -> bool:
 8    if event.payload:
 9        running_job_id = dict(event.payload).get("job_id")
10        state = str(dict(event.payload).get("state")).lower()
11        if running_job_id == task_id and (state == TaskState.SUCCEEDED or state == TaskState.FAILED):
12            logger.info(f"Job {task_id} finished with state: {state}")
13            return True
14    return False
def job_is_finished(event: enpi_api.l2.types.event.Event, task_id: enpi_api.l2.types.task.TaskId) -> bool:
 8def job_is_finished(event: Event, task_id: TaskId) -> bool:
 9    if event.payload:
10        running_job_id = dict(event.payload).get("job_id")
11        state = str(dict(event.payload).get("state")).lower()
12        if running_job_id == task_id and (state == TaskState.SUCCEEDED or state == TaskState.FAILED):
13            logger.info(f"Job {task_id} finished with state: {state}")
14            return True
15    return False