ENPICOM Logo API Docs Python SDK Docs Events

enpi_api.l2.events.workflow_execution_waitable

  1from threading import Lock
  2from time import sleep
  3from typing import Any, Callable, Generic, Optional, TypeVar
  4
  5from loguru import logger
  6from pydantic import ValidationError
  7
  8from enpi_api.l2.client.api.workflow_api import WorkflowApi
  9from enpi_api.l2.events.space_event_listener import SpaceEventListener
 10from enpi_api.l2.types.event import Event, WorkflowExecutionPayload
 11from enpi_api.l2.types.log import LogLevel
 12from enpi_api.l2.types.task import END_STATES, TaskState
 13from enpi_api.l2.types.workflow import WorkflowExecutionId
 14from enpi_api.l2.util.client import get_client
 15
 16T = TypeVar("T")
 17
 18
 19class WorkflowExecutionWaitManager:
 20    _instance = None
 21    _lock = Lock()
 22    _initialized: bool
 23
 24    def __new__(cls) -> "WorkflowExecutionWaitManager":
 25        with cls._lock:
 26            if cls._instance is None:
 27                cls._instance = super(WorkflowExecutionWaitManager, cls).__new__(cls)
 28                cls._instance._initialized = False
 29        return cls._instance
 30
 31    def __init__(self) -> None:
 32        if not self._initialized:
 33            self.waitables: list["WorkflowExecutionWaitable[Any]"] = []  # type: ignore[misc]
 34            self.listener = SpaceEventListener(on_event=self._on_event)
 35            self._running = False
 36            self._initialized = True
 37
 38    def start_waiting(self) -> None:
 39        if not self._running:
 40            self._running = True
 41            self.listener.start_listening()
 42            self._wait_loop()
 43
 44    def _wait_loop(self) -> None:
 45        i = 0
 46        while self.waitables:
 47            # Log every 10 seconds
 48            if i % 10 == 0:
 49                logger.info(f"Waiting for {len(self.waitables)} workflow execution to finish...")
 50            sleep(1)
 51            i += 1
 52        self._running = False
 53        self.listener.stop_listening()
 54
 55    def register_waitable(self, waitable: "WorkflowExecutionWaitable[T]") -> None:
 56        self.waitables.append(waitable)
 57
 58    def _on_event(self, topic: str, event: Event) -> None:
 59        try:
 60            payload = WorkflowExecutionPayload.model_validate(event.payload)
 61        except ValidationError:
 62            return
 63
 64        self.waitables = [w for w in self.waitables if w.on_event(payload)]
 65        logger.info(f"After event still waiting for {len(self.waitables)} workflow execution to finish...")
 66
 67
 68class WorkflowExecutionWaitable(Generic[T]):
 69    def __init__(
 70        self,
 71        workflow_execution_id: WorkflowExecutionId,
 72        on_complete: Optional[Callable[[WorkflowExecutionId, TaskState], T]] | None = None,
 73    ) -> None:
 74        self.workflow_execution_id = workflow_execution_id
 75        self.on_complete = on_complete
 76
 77        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()
 78        self.result: T | None = None
 79
 80        if self.waiting_for_workflow_execution:
 81            self.manager = WorkflowExecutionWaitManager()
 82
 83    def on_event(self, payload: WorkflowExecutionPayload) -> bool:
 84        if payload.id == self.workflow_execution_id and (payload.state.lower() in END_STATES):
 85            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {payload.state}")
 86            if self.on_complete:
 87                try:
 88                    self.result = self.on_complete(payload.id, TaskState(payload.state.lower()))
 89                except Exception as e:
 90                    # It should not re-raise to avoid waitables never stopping to wait
 91                    logger.error(f"Error in on_complete callback: {e}")
 92
 93            self.waiting_for_workflow_execution = False
 94
 95        return self.waiting_for_workflow_execution
 96
 97    def needs_to_wait_after_init(self) -> bool:
 98        # First, poll the api once and make sure the workflow execution isn't already finished
 99        workflow_api = WorkflowApi(get_client(), log_level=LogLevel.Info)
100        state = workflow_api.get_workflow_execution_state(self.workflow_execution_id)
101        if state in END_STATES:
102            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {state}")
103            if self.on_complete:
104                self.result = self.on_complete(self.workflow_execution_id, state)
105            return False
106
107        return True
108
109    def wait(self) -> None:
110        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()
111        if self.waiting_for_workflow_execution:
112            self.manager.register_waitable(self)
113            self.manager.start_waiting()
114
115    def wait_and_return_result(self) -> T:
116        self.wait()
117        assert self.result is not None
118        return self.result
119
120    def check_execution_state(self) -> TaskState:
121        workflow_api = WorkflowApi(get_client(), log_level=LogLevel.Info)
122        return workflow_api.get_workflow_execution_state(self.workflow_execution_id)
class WorkflowExecutionWaitManager:
class WorkflowExecutionWaitManager:
    """Singleton that tracks pending waitables and blocks until all of them are done.

    Every ``WorkflowExecutionWaitManager()`` call returns the same instance, so all
    registered waitables are served by a single ``SpaceEventListener``.
    """

    # The one shared instance; created on first construction.
    _instance: Optional["WorkflowExecutionWaitManager"] = None
    # Guards creation and one-time initialisation of the shared instance.
    _lock = Lock()
    # Set to False in __new__, flipped to True once __init__ has populated the instance.
    _initialized: bool

    def __new__(cls) -> "WorkflowExecutionWaitManager":
        """Return the shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(WorkflowExecutionWaitManager, cls).__new__(cls)
                cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        """Populate the shared instance once; later constructions leave it untouched."""
        # __init__ runs on every construction of the singleton. Hold the same lock as
        # __new__ so two threads cannot both observe ``_initialized == False``.
        with self._lock:
            if self._initialized:
                return
            self.waitables: list["WorkflowExecutionWaitable[Any]"] = []  # type: ignore[misc]
            self.listener = SpaceEventListener(on_event=self._on_event)
            self._running = False
            self._initialized = True

    def start_waiting(self) -> None:
        """Start the listener and block until no waitables remain.

        Returns immediately when a wait is already in progress.
        """
        if self._running:
            return

        self._running = True
        try:
            self.listener.start_listening()
            self._wait_loop()
        finally:
            # Reset even when starting the listener or the wait itself fails, so that a
            # later call is able to start waiting again.
            self._running = False

    def _wait_loop(self) -> None:
        """Sleep in one-second steps until ``self.waitables`` is empty, then stop the listener."""
        i = 0
        try:
            # This loop never removes waitables itself; they are dropped in `_on_event`.
            while self.waitables:
                # Log every 10 seconds
                if i % 10 == 0:
                    logger.info(f"Waiting for {len(self.waitables)} workflow execution to finish...")
                sleep(1)
                i += 1
        finally:
            # Also runs when the wait is interrupted (e.g. KeyboardInterrupt), so the
            # listener is not left running and the manager can be started again.
            self._running = False
            self.listener.stop_listening()

    def register_waitable(self, waitable: "WorkflowExecutionWaitable[T]") -> None:
        """Add ``waitable`` to the list that `start_waiting` blocks on."""
        self.waitables.append(waitable)

    def _on_event(self, topic: str, event: Event) -> None:
        """Listener callback: forward workflow execution events to every waitable.

        Events whose payload does not validate as a ``WorkflowExecutionPayload`` are
        ignored. ``topic`` is accepted but not used.
        """
        try:
            payload = WorkflowExecutionPayload.model_validate(event.payload)
        except ValidationError:
            return

        # `on_event` returns True while a waitable still has to wait; keep only those.
        self.waitables = [w for w in self.waitables if w.on_event(payload)]
        logger.info(f"After event still waiting for {len(self.waitables)} workflow execution to finish...")
def start_waiting(self) -> None:
39    def start_waiting(self) -> None:
40        if not self._running:
41            self._running = True
42            self.listener.start_listening()
43            self._wait_loop()
def register_waitable( self, waitable: WorkflowExecutionWaitable[~T]) -> None:
56    def register_waitable(self, waitable: "WorkflowExecutionWaitable[T]") -> None:
57        self.waitables.append(waitable)
class WorkflowExecutionWaitable(typing.Generic[~T]):
 69class WorkflowExecutionWaitable(Generic[T]):
 70    def __init__(
 71        self,
 72        workflow_execution_id: WorkflowExecutionId,
 73        on_complete: Optional[Callable[[WorkflowExecutionId, TaskState], T]] | None = None,
 74    ) -> None:
 75        self.workflow_execution_id = workflow_execution_id
 76        self.on_complete = on_complete
 77
 78        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()
 79        self.result: T | None = None
 80
 81        if self.waiting_for_workflow_execution:
 82            self.manager = WorkflowExecutionWaitManager()
 83
 84    def on_event(self, payload: WorkflowExecutionPayload) -> bool:
 85        if payload.id == self.workflow_execution_id and (payload.state.lower() in END_STATES):
 86            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {payload.state}")
 87            if self.on_complete:
 88                try:
 89                    self.result = self.on_complete(payload.id, TaskState(payload.state.lower()))
 90                except Exception as e:
 91                    # It should not re-raise to avoid waitables never stopping to wait
 92                    logger.error(f"Error in on_complete callback: {e}")
 93
 94            self.waiting_for_workflow_execution = False
 95
 96        return self.waiting_for_workflow_execution
 97
 98    def needs_to_wait_after_init(self) -> bool:
 99        # First, poll the api once and make sure the workflow execution isn't already finished
100        workflow_api = WorkflowApi(get_client(), log_level=LogLevel.Info)
101        state = workflow_api.get_workflow_execution_state(self.workflow_execution_id)
102        if state in END_STATES:
103            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {state}")
104            if self.on_complete:
105                self.result = self.on_complete(self.workflow_execution_id, state)
106            return False
107
108        return True
109
110    def wait(self) -> None:
111        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()
112        if self.waiting_for_workflow_execution:
113            self.manager.register_waitable(self)
114            self.manager.start_waiting()
115
116    def wait_and_return_result(self) -> T:
117        self.wait()
118        assert self.result is not None
119        return self.result
120
121    def check_execution_state(self) -> TaskState:
122        workflow_api = WorkflowApi(get_client(), log_level=LogLevel.Info)
123        return workflow_api.get_workflow_execution_state(self.workflow_execution_id)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
WorkflowExecutionWaitable( workflow_execution_id: enpi_api.l2.types.workflow.WorkflowExecutionId, on_complete: Optional[Callable[[enpi_api.l2.types.workflow.WorkflowExecutionId, enpi_api.l2.types.task.TaskState], ~T]] = None)
70    def __init__(
71        self,
72        workflow_execution_id: WorkflowExecutionId,
73        on_complete: Optional[Callable[[WorkflowExecutionId, TaskState], T]] | None = None,
74    ) -> None:
75        self.workflow_execution_id = workflow_execution_id
76        self.on_complete = on_complete
77
78        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()
79        self.result: T | None = None
80
81        if self.waiting_for_workflow_execution:
82            self.manager = WorkflowExecutionWaitManager()
workflow_execution_id
on_complete
waiting_for_workflow_execution
result: Optional[~T]
def on_event(self, payload: enpi_api.l2.types.event.WorkflowExecutionPayload) -> bool:
84    def on_event(self, payload: WorkflowExecutionPayload) -> bool:
85        if payload.id == self.workflow_execution_id and (payload.state.lower() in END_STATES):
86            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {payload.state}")
87            if self.on_complete:
88                try:
89                    self.result = self.on_complete(payload.id, TaskState(payload.state.lower()))
90                except Exception as e:
91                    # It should not re-raise to avoid waitables never stopping to wait
92                    logger.error(f"Error in on_complete callback: {e}")
93
94            self.waiting_for_workflow_execution = False
95
96        return self.waiting_for_workflow_execution
def needs_to_wait_after_init(self) -> bool:
 98    def needs_to_wait_after_init(self) -> bool:
 99        # First, poll the api once and make sure the workflow execution isn't already finished
100        workflow_api = WorkflowApi(get_client(), log_level=LogLevel.Info)
101        state = workflow_api.get_workflow_execution_state(self.workflow_execution_id)
102        if state in END_STATES:
103            logger.info(f"Workflow execution {self.workflow_execution_id} finished with state: {state}")
104            if self.on_complete:
105                self.result = self.on_complete(self.workflow_execution_id, state)
106            return False
107
108        return True
def wait(self) -> None:
110    def wait(self) -> None:
111        self.waiting_for_workflow_execution = self.needs_to_wait_after_init()
112        if self.waiting_for_workflow_execution:
113            self.manager.register_waitable(self)
114            self.manager.start_waiting()
def wait_and_return_result(self) -> ~T:
116    def wait_and_return_result(self) -> T:
117        self.wait()
118        assert self.result is not None
119        return self.result
def check_execution_state(self) -> enpi_api.l2.types.task.TaskState:
121    def check_execution_state(self) -> TaskState:
122        workflow_api = WorkflowApi(get_client(), log_level=LogLevel.Info)
123        return workflow_api.get_workflow_execution_state(self.workflow_execution_id)