ENPICOM Logo API Docs Python SDK Docs Events

enpi_api.l2.client.api.workflow_api

 1from enpi_api.l1 import openapi_client
 2from enpi_api.l2.types.api_error import ApiError
 3from enpi_api.l2.types.log import LogLevel
 4from enpi_api.l2.types.task import TaskState
 5from enpi_api.l2.types.workflow import WorkflowExecutionId, WorkflowExecutionTask
 6
 7
 8class WorkflowApi:
 9    _inner_api_client: openapi_client.ApiClient
10    _log_level: LogLevel
11
12    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
13        """@private"""
14        self._inner_api_client = inner_api_client
15        self._log_level = log_level
16
17    def get_workflow_execution_task_states(self, workflow_execution_id: WorkflowExecutionId) -> list[WorkflowExecutionTask]:
18        """Get workflow execution task states by ID.
19
20        Returns:
21            enpi_api.l2.types.workflow.WorkflowExecution: The workflow execution.
22        """
23        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)
24
25        try:
26            response = workflow_api_instance.get_workflow_execution_task_states(workflow_execution_id)
27        except openapi_client.ApiException as e:
28            raise ApiError(e)
29
30        return [WorkflowExecutionTask.from_raw(i) for i in response.workflow_execution_task_states]
31
32    def get_workflow_execution_state(self, workflow_execution_id: WorkflowExecutionId) -> TaskState:
33        """Get workflow execution state by ID.
34
35        Returns:
36            enpi_api.l2.types.task.TaskState
37        """
38
39        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)
40
41        try:
42            response = workflow_api_instance.get_workflow_execution_state(workflow_execution_id)
43        except openapi_client.ApiException as e:
44            raise ApiError(e)
45
46        return TaskState(response.workflow_execution_state.lower())
class WorkflowApi:
 9class WorkflowApi:
10    _inner_api_client: openapi_client.ApiClient
11    _log_level: LogLevel
12
13    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
14        """@private"""
15        self._inner_api_client = inner_api_client
16        self._log_level = log_level
17
18    def get_workflow_execution_task_states(self, workflow_execution_id: WorkflowExecutionId) -> list[WorkflowExecutionTask]:
19        """Get workflow execution task states by ID.
20
21        Returns:
22            enpi_api.l2.types.workflow.WorkflowExecution: The workflow execution.
23        """
24        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)
25
26        try:
27            response = workflow_api_instance.get_workflow_execution_task_states(workflow_execution_id)
28        except openapi_client.ApiException as e:
29            raise ApiError(e)
30
31        return [WorkflowExecutionTask.from_raw(i) for i in response.workflow_execution_task_states]
32
33    def get_workflow_execution_state(self, workflow_execution_id: WorkflowExecutionId) -> TaskState:
34        """Get workflow execution state by ID.
35
36        Returns:
37            enpi_api.l2.types.task.TaskState
38        """
39
40        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)
41
42        try:
43            response = workflow_api_instance.get_workflow_execution_state(workflow_execution_id)
44        except openapi_client.ApiException as e:
45            raise ApiError(e)
46
47        return TaskState(response.workflow_execution_state.lower())
def get_workflow_execution_task_states( self, workflow_execution_id: enpi_api.l2.types.workflow.WorkflowExecutionId) -> list[enpi_api.l2.types.workflow.WorkflowExecutionTask]:
18    def get_workflow_execution_task_states(self, workflow_execution_id: WorkflowExecutionId) -> list[WorkflowExecutionTask]:
19        """Get workflow execution task states by ID.
20
21        Returns:
22            enpi_api.l2.types.workflow.WorkflowExecution: The workflow execution.
23        """
24        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)
25
26        try:
27            response = workflow_api_instance.get_workflow_execution_task_states(workflow_execution_id)
28        except openapi_client.ApiException as e:
29            raise ApiError(e)
30
31        return [WorkflowExecutionTask.from_raw(i) for i in response.workflow_execution_task_states]

Get workflow execution task states by ID.

Returns:

enpi_api.l2.types.workflow.WorkflowExecution: The workflow execution.

def get_workflow_execution_state( self, workflow_execution_id: enpi_api.l2.types.workflow.WorkflowExecutionId) -> enpi_api.l2.types.task.TaskState:
33    def get_workflow_execution_state(self, workflow_execution_id: WorkflowExecutionId) -> TaskState:
34        """Get workflow execution state by ID.
35
36        Returns:
37            enpi_api.l2.types.task.TaskState
38        """
39
40        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)
41
42        try:
43            response = workflow_api_instance.get_workflow_execution_state(workflow_execution_id)
44        except openapi_client.ApiException as e:
45            raise ApiError(e)
46
47        return TaskState(response.workflow_execution_state.lower())

Get workflow execution state by ID.

Returns:

enpi_api.l2.types.task.TaskState