enpi_api.l2.events.workflow_execution_waitable
from threading import Lock
from time import sleep
from typing import Any, Callable, Generic, Optional, TypeVar

from loguru import logger
from pydantic import ValidationError

from enpi_api.l2.client.api.workflow_api import WorkflowApi
from enpi_api.l2.events.space_event_listener import SpaceEventListener
from enpi_api.l2.types.event import Event, WorkflowExecutionPayload
from enpi_api.l2.types.log import LogLevel
from enpi_api.l2.types.task import END_STATES, TaskState
from enpi_api.l2.types.workflow import WorkflowExecutionId
from enpi_api.l2.util.client import get_client

T = TypeVar("T")


class WorkflowExecutionWaitManager:
    """Singleton that blocks until every registered waitable has finished.

    Waitables are added with :meth:`register_waitable`. :meth:`start_waiting`
    starts the shared ``SpaceEventListener`` and then checks ``self.waitables``
    once per second until :meth:`_on_event` has removed every entry.
    """

    _instance = None
    _lock = Lock()
    _initialized: bool

    def __new__(cls) -> "WorkflowExecutionWaitManager":
        """Return the single shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(WorkflowExecutionWaitManager, cls).__new__(cls)
                cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        """Initialise instance state exactly once.

        ``__init__`` runs on every ``WorkflowExecutionWaitManager()`` call, so
        the check is done under the same lock ``__new__`` uses; otherwise two
        racing first constructions could both build a listener.
        """
        with self._lock:
            if self._initialized:
                return
            self.waitables: list["WorkflowExecutionWaitable[Any]"] = []  # type: ignore[misc]
            self.listener = SpaceEventListener(on_event=self._on_event)
            self._running = False
            self._initialized = True

    def start_waiting(self) -> None:
        """Start the listener and block until no waitables remain.

        Returns immediately when a wait loop is already running.
        """
        if self._running:
            return
        self._running = True
        try:
            self.listener.start_listening()
        except BaseException:
            # Do not leave the singleton flagged as running when the listener
            # never started; later calls would return without waiting.
            self._running = False
            raise
        self._wait_loop()

    def _wait_loop(self) -> None:
        """Sleep in one-second steps until ``self.waitables`` is empty."""
        try:
            i = 0
            while self.waitables:
                # Log every 10 seconds
                if i % 10 == 0:
                    logger.info(f"Waiting for {len(self.waitables)} workflow execution to finish...")
                sleep(1)
                i += 1
        finally:
            # Also runs on KeyboardInterrupt or any other exception, so the
            # manager can be used again and the listener is always stopped.
            self._running = False
            self.listener.stop_listening()

    def register_waitable(self, waitable: "WorkflowExecutionWaitable[T]") -> None:
        """Add ``waitable`` to the list the wait loop is blocking on."""
        self.waitables.append(waitable)

    def _on_event(self, topic: str, event: Event) -> None:
        """Listener callback: forward workflow-execution events to the waitables.

        Events whose payload does not validate as ``WorkflowExecutionPayload``
        are ignored. A waitable stays registered only while its ``on_event``
        returns a truthy value.
        """
        try:
            payload = WorkflowExecutionPayload.model_validate(event.payload)
        except ValidationError:
            return

        self.waitables = [w for w in self.waitables if w.on_event(payload)]
        logger.info(f"After event still waiting for {len(self.waitables)} workflow execution to finish...")


class WorkflowExecutionWaitable(Generic[T]):
    """Waits for one workflow execution and keeps the ``on_complete`` result.

    Args:
        workflow_execution_id: Execution to wait for.
        on_complete: Optional callback invoked with the execution id and its
            end state; its return value is stored in ``self.result``.
    """

    def __init__(
        self,
        workflow_execution_id: WorkflowExecutionId,
        on_complete: Optional[Callable[[WorkflowExecutionId, TaskState], T]] = None,
    ) -> None:
        self.workflow_execution_id = workflow_execution_id
        self.on_complete = on_complete

        # Must be set before the initial poll: needs_to_wait_after_init()
        # stores the callback result here when the execution already finished,
        # and assigning None afterwards would throw that result away.
        self.result: T | None = None
        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()

        if self.waiting_for_workflow_execution:
            self.manager = WorkflowExecutionWaitManager()

    def on_event(self, payload: WorkflowExecutionPayload) -> bool:
        """Handle one event; return ``True`` while this waitable must keep waiting."""
        if payload.id == self.workflow_execution_id and (payload.state.lower() in END_STATES):
            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {payload.state}")
            if self.on_complete:
                try:
                    self.result = self.on_complete(payload.id, TaskState(payload.state.lower()))
                except Exception as e:
                    # It should not re-raise to avoid waitables never stopping to wait
                    logger.error(f"Error in on_complete callback: {e}")

            self.waiting_for_workflow_execution = False

        return self.waiting_for_workflow_execution

    def needs_to_wait_after_init(self) -> bool:
        """Poll the API once; return ``False`` if the execution already ended.

        When an end state is found, ``on_complete`` (if any) is called and its
        return value stored in ``self.result``. Exceptions from the callback
        propagate to the caller.
        """
        # First, poll the api once and make sure the workflow execution isn't already finished
        state = self.check_execution_state()
        if state in END_STATES:
            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {state}")
            if self.on_complete:
                self.result = self.on_complete(self.workflow_execution_id, state)
            return False

        return True

    def wait(self) -> None:
        """Block until the execution reaches an end state."""
        if not self.waiting_for_workflow_execution:
            # An end state was already observed and on_complete already ran;
            # polling again would invoke the callback a second time.
            return
        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()
        if self.waiting_for_workflow_execution:
            self.manager.register_waitable(self)
            self.manager.start_waiting()

    def wait_and_return_result(self) -> T:
        """Wait for completion and return the value produced by ``on_complete``.

        Raises:
            AssertionError: If no result is available (no callback was given,
                the callback returned ``None``, or it raised inside ``on_event``).
        """
        self.wait()
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if self.result is None:
            raise AssertionError(f"No result available for workflow execution {self.workflow_execution_id}")
        return self.result

    def check_execution_state(self) -> TaskState:
        """Fetch the current state of the execution from the workflow API."""
        workflow_api = WorkflowApi(get_client(), log_level=LogLevel.Info)
        return workflow_api.get_workflow_execution_state(self.workflow_execution_id)
class
WorkflowExecutionWaitManager:
class WorkflowExecutionWaitManager:
    """Singleton that blocks until every registered waitable has finished.

    Waitables are added with :meth:`register_waitable`. :meth:`start_waiting`
    starts the shared ``SpaceEventListener`` and then checks ``self.waitables``
    once per second until :meth:`_on_event` has removed every entry.
    """

    _instance = None
    _lock = Lock()
    _initialized: bool

    def __new__(cls) -> "WorkflowExecutionWaitManager":
        """Return the single shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(WorkflowExecutionWaitManager, cls).__new__(cls)
                cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        """Initialise instance state exactly once.

        ``__init__`` runs on every ``WorkflowExecutionWaitManager()`` call, so
        the check is done under the same lock ``__new__`` uses; otherwise two
        racing first constructions could both build a listener.
        """
        with self._lock:
            if self._initialized:
                return
            self.waitables: list["WorkflowExecutionWaitable[Any]"] = []  # type: ignore[misc]
            self.listener = SpaceEventListener(on_event=self._on_event)
            self._running = False
            self._initialized = True

    def start_waiting(self) -> None:
        """Start the listener and block until no waitables remain.

        Returns immediately when a wait loop is already running.
        """
        if self._running:
            return
        self._running = True
        try:
            self.listener.start_listening()
        except BaseException:
            # Do not leave the singleton flagged as running when the listener
            # never started; later calls would return without waiting.
            self._running = False
            raise
        self._wait_loop()

    def _wait_loop(self) -> None:
        """Sleep in one-second steps until ``self.waitables`` is empty."""
        try:
            i = 0
            while self.waitables:
                # Log every 10 seconds
                if i % 10 == 0:
                    logger.info(f"Waiting for {len(self.waitables)} workflow execution to finish...")
                sleep(1)
                i += 1
        finally:
            # Also runs on KeyboardInterrupt or any other exception, so the
            # manager can be used again and the listener is always stopped.
            self._running = False
            self.listener.stop_listening()

    def register_waitable(self, waitable: "WorkflowExecutionWaitable[T]") -> None:
        """Add ``waitable`` to the list the wait loop is blocking on."""
        self.waitables.append(waitable)

    def _on_event(self, topic: str, event: Event) -> None:
        """Listener callback: forward workflow-execution events to the waitables.

        Events whose payload does not validate as ``WorkflowExecutionPayload``
        are ignored. A waitable stays registered only while its ``on_event``
        returns a truthy value.
        """
        try:
            payload = WorkflowExecutionPayload.model_validate(event.payload)
        except ValidationError:
            return

        self.waitables = [w for w in self.waitables if w.on_event(payload)]
        logger.info(f"After event still waiting for {len(self.waitables)} workflow execution to finish...")
class
WorkflowExecutionWaitable(typing.Generic[~T]):
class WorkflowExecutionWaitable(Generic[T]):
    """Waits for one workflow execution and keeps the ``on_complete`` result.

    Args:
        workflow_execution_id: Execution to wait for.
        on_complete: Optional callback invoked with the execution id and its
            end state; its return value is stored in ``self.result``.
    """

    def __init__(
        self,
        workflow_execution_id: WorkflowExecutionId,
        on_complete: Optional[Callable[[WorkflowExecutionId, TaskState], T]] = None,
    ) -> None:
        self.workflow_execution_id = workflow_execution_id
        self.on_complete = on_complete

        # Must be set before the initial poll: needs_to_wait_after_init()
        # stores the callback result here when the execution already finished,
        # and assigning None afterwards would throw that result away.
        self.result: T | None = None
        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()

        if self.waiting_for_workflow_execution:
            self.manager = WorkflowExecutionWaitManager()

    def on_event(self, payload: WorkflowExecutionPayload) -> bool:
        """Handle one event; return ``True`` while this waitable must keep waiting."""
        if payload.id == self.workflow_execution_id and (payload.state.lower() in END_STATES):
            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {payload.state}")
            if self.on_complete:
                try:
                    self.result = self.on_complete(payload.id, TaskState(payload.state.lower()))
                except Exception as e:
                    # It should not re-raise to avoid waitables never stopping to wait
                    logger.error(f"Error in on_complete callback: {e}")

            self.waiting_for_workflow_execution = False

        return self.waiting_for_workflow_execution

    def needs_to_wait_after_init(self) -> bool:
        """Poll the API once; return ``False`` if the execution already ended.

        When an end state is found, ``on_complete`` (if any) is called and its
        return value stored in ``self.result``. Exceptions from the callback
        propagate to the caller.
        """
        # First, poll the api once and make sure the workflow execution isn't already finished
        state = self.check_execution_state()
        if state in END_STATES:
            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {state}")
            if self.on_complete:
                self.result = self.on_complete(self.workflow_execution_id, state)
            return False

        return True

    def wait(self) -> None:
        """Block until the execution reaches an end state."""
        if not self.waiting_for_workflow_execution:
            # An end state was already observed and on_complete already ran;
            # polling again would invoke the callback a second time.
            return
        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()
        if self.waiting_for_workflow_execution:
            self.manager.register_waitable(self)
            self.manager.start_waiting()

    def wait_and_return_result(self) -> T:
        """Wait for completion and return the value produced by ``on_complete``.

        Raises:
            AssertionError: If no result is available (no callback was given,
                the callback returned ``None``, or it raised inside ``on_event``).
        """
        self.wait()
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if self.result is None:
            raise AssertionError(f"No result available for workflow execution {self.workflow_execution_id}")
        return self.result

    def check_execution_state(self) -> TaskState:
        """Fetch the current state of the execution from the workflow API."""
        workflow_api = WorkflowApi(get_client(), log_level=LogLevel.Info)
        return workflow_api.get_workflow_execution_state(self.workflow_execution_id)
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
    class Mapping[KT, VT]:
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
    def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
WorkflowExecutionWaitable( workflow_execution_id: enpi_api.l2.types.workflow.WorkflowExecutionId, on_complete: Optional[Callable[[enpi_api.l2.types.workflow.WorkflowExecutionId, enpi_api.l2.types.task.TaskState], ~T]] = None)
def __init__(
    self,
    workflow_execution_id: WorkflowExecutionId,
    on_complete: Optional[Callable[[WorkflowExecutionId, TaskState], T]] = None,
) -> None:
    """Record the execution to wait for and poll its state once.

    Args:
        workflow_execution_id: Execution to wait for.
        on_complete: Optional callback invoked with the execution id and its
            end state; its return value is stored in ``self.result``.
    """
    self.workflow_execution_id = workflow_execution_id
    self.on_complete = on_complete

    # Must be set before the initial poll: needs_to_wait_after_init() stores
    # the callback result in self.result when the execution already finished,
    # and assigning None afterwards would throw that result away.
    self.result: T | None = None
    self.waiting_for_workflow_execution = self.needs_to_wait_after_init()

    # The shared manager is only looked up when there is something to wait for.
    if self.waiting_for_workflow_execution:
        self.manager = WorkflowExecutionWaitManager()
def on_event(self, payload: WorkflowExecutionPayload) -> bool:
    """Process one workflow-execution event.

    Returns ``True`` while this waitable still has to wait, and ``False``
    once an end-state event for its own execution has been handled.
    """
    # Events for other executions leave the waiting flag untouched.
    if payload.id != self.workflow_execution_id:
        return self.waiting_for_workflow_execution

    final_state = payload.state.lower()
    if final_state not in END_STATES:
        return self.waiting_for_workflow_execution

    logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {payload.state}")
    callback = self.on_complete
    if callback:
        try:
            self.result = callback(payload.id, TaskState(final_state))
        except Exception as e:
            # Deliberately swallowed: re-raising would leave waitables waiting forever.
            logger.error(f"Error in on_complete callback: {e}")

    self.waiting_for_workflow_execution = False
    return self.waiting_for_workflow_execution
def
needs_to_wait_after_init(self) -> bool:
def needs_to_wait_after_init(self) -> bool:
    """Poll the workflow API once and report whether waiting is still needed.

    Returns ``True`` when the execution has not reached an end state. If it
    has, ``on_complete`` (when set) is called, its return value is stored in
    ``self.result``, and ``False`` is returned.
    """
    api = WorkflowApi(get_client(), log_level=LogLevel.Info)
    state = api.get_workflow_execution_state(self.workflow_execution_id)

    # Still running: the caller has to wait for an event.
    if state not in END_STATES:
        return True

    logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {state}")
    if self.on_complete:
        self.result = self.on_complete(self.workflow_execution_id, state)
    return False