enpi_api.l2.util.events
from loguru import logger

from enpi_api.l2.types.event import Event
from enpi_api.l2.types.task import TaskId, TaskState


def job_is_finished(event: Event, task_id: TaskId) -> bool:
    """Report whether `event` announces that the job `task_id` has finished.

    The event payload is converted to a dict and its "job_id" and "state"
    entries are read. The job counts as finished when the job id equals
    `task_id` and the lowercased state string equals `TaskState.SUCCEEDED`
    or `TaskState.FAILED`; an info-level message is logged in that case.

    Args:
        event: Event whose payload is inspected.
        task_id: Identifier of the job being waited on.

    Returns:
        True when the event reports a succeeded or failed state for this
        job; False otherwise, including when the payload is falsy.
    """
    # Nothing to inspect when the event carries no payload.
    if not event.payload:
        return False

    event_job_id = dict(event.payload).get("job_id")
    # A missing "state" becomes the string "none", which matches no state below.
    state = str(dict(event.payload).get("state")).lower()

    # Events belonging to other jobs are ignored.
    if not (event_job_id == task_id):
        return False

    reached_terminal_state = state == TaskState.SUCCEEDED or state == TaskState.FAILED
    if not reached_terminal_state:
        return False

    logger.info(f"Job {task_id} finished with state: {state}")
    return True
def job_is_finished(event: enpi_api.l2.types.event.Event, task_id: enpi_api.l2.types.task.TaskId) -> bool:
def job_is_finished(event: Event, task_id: TaskId) -> bool:
    """Report whether `event` announces that the job `task_id` has finished.

    The event payload is converted to a dict and its "job_id" and "state"
    entries are read. The job counts as finished when the job id equals
    `task_id` and the lowercased state string equals `TaskState.SUCCEEDED`
    or `TaskState.FAILED`; an info-level message is logged in that case.

    Args:
        event: Event whose payload is inspected.
        task_id: Identifier of the job being waited on.

    Returns:
        True when the event reports a succeeded or failed state for this
        job; False otherwise, including when the payload is falsy.
    """
    # Nothing to inspect when the event carries no payload.
    if not event.payload:
        return False

    event_job_id = dict(event.payload).get("job_id")
    # A missing "state" becomes the string "none", which matches no state below.
    state = str(dict(event.payload).get("state")).lower()

    # Events belonging to other jobs are ignored.
    if not (event_job_id == task_id):
        return False

    reached_terminal_state = state == TaskState.SUCCEEDED or state == TaskState.FAILED
    if not reached_terminal_state:
        return False

    logger.info(f"Job {task_id} finished with state: {state}")
    return True