enpi_api.l2.client.api.workflow_api
from enpi_api.l1 import openapi_client
from enpi_api.l2.types.api_error import ApiError
from enpi_api.l2.types.log import LogLevel
from enpi_api.l2.types.task import TaskState
from enpi_api.l2.types.workflow import WorkflowExecutionId, WorkflowExecutionTask


class WorkflowApi:
    """Client for querying workflow executions.

    Each method builds an `openapi_client.WorkflowApi` on top of the stored
    inner API client and converts `openapi_client.ApiException` into
    `enpi_api.l2.types.api_error.ApiError`.
    """

    # Low-level generated client that every request is sent through.
    _inner_api_client: openapi_client.ApiClient
    # Log level handed over at construction time; stored but not read by the methods below.
    _log_level: LogLevel

    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
        """@private"""
        self._inner_api_client = inner_api_client
        self._log_level = log_level

    def get_workflow_execution_task_states(self, workflow_execution_id: WorkflowExecutionId) -> list[WorkflowExecutionTask]:
        """Get workflow execution task states by ID.

        Args:
            workflow_execution_id (enpi_api.l2.types.workflow.WorkflowExecutionId): The ID of the workflow execution.

        Returns:
            list[enpi_api.l2.types.workflow.WorkflowExecutionTask]: The task states of the workflow execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the generated client raises `openapi_client.ApiException`.
        """
        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)

        try:
            response = workflow_api_instance.get_workflow_execution_task_states(workflow_execution_id)
        except openapi_client.ApiException as e:
            # Chain explicitly so the original API exception is kept as __cause__.
            raise ApiError(e) from e

        return [WorkflowExecutionTask.from_raw(i) for i in response.workflow_execution_task_states]

    def get_workflow_execution_state(self, workflow_execution_id: WorkflowExecutionId) -> TaskState:
        """Get workflow execution state by ID.

        Args:
            workflow_execution_id (enpi_api.l2.types.workflow.WorkflowExecutionId): The ID of the workflow execution.

        Returns:
            enpi_api.l2.types.task.TaskState

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the generated client raises `openapi_client.ApiException`.
        """
        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)

        try:
            response = workflow_api_instance.get_workflow_execution_state(workflow_execution_id)
        except openapi_client.ApiException as e:
            # Chain explicitly so the original API exception is kept as __cause__.
            raise ApiError(e) from e

        # The raw state string is lower-cased before conversion
        # (presumably TaskState values are lower-case; confirm against TaskState).
        return TaskState(response.workflow_execution_state.lower())
class
WorkflowApi:
class WorkflowApi:
    """Client for querying workflow executions.

    Each method builds an `openapi_client.WorkflowApi` on top of the stored
    inner API client and converts `openapi_client.ApiException` into
    `enpi_api.l2.types.api_error.ApiError`.
    """

    # Low-level generated client that every request is sent through.
    _inner_api_client: openapi_client.ApiClient
    # Log level handed over at construction time; stored but not read by the methods below.
    _log_level: LogLevel

    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
        """@private"""
        self._inner_api_client = inner_api_client
        self._log_level = log_level

    def get_workflow_execution_task_states(self, workflow_execution_id: WorkflowExecutionId) -> list[WorkflowExecutionTask]:
        """Get workflow execution task states by ID.

        Args:
            workflow_execution_id (enpi_api.l2.types.workflow.WorkflowExecutionId): The ID of the workflow execution.

        Returns:
            list[enpi_api.l2.types.workflow.WorkflowExecutionTask]: The task states of the workflow execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the generated client raises `openapi_client.ApiException`.
        """
        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)

        try:
            response = workflow_api_instance.get_workflow_execution_task_states(workflow_execution_id)
        except openapi_client.ApiException as e:
            # Chain explicitly so the original API exception is kept as __cause__.
            raise ApiError(e) from e

        return [WorkflowExecutionTask.from_raw(i) for i in response.workflow_execution_task_states]

    def get_workflow_execution_state(self, workflow_execution_id: WorkflowExecutionId) -> TaskState:
        """Get workflow execution state by ID.

        Args:
            workflow_execution_id (enpi_api.l2.types.workflow.WorkflowExecutionId): The ID of the workflow execution.

        Returns:
            enpi_api.l2.types.task.TaskState

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the generated client raises `openapi_client.ApiException`.
        """
        workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)

        try:
            response = workflow_api_instance.get_workflow_execution_state(workflow_execution_id)
        except openapi_client.ApiException as e:
            # Chain explicitly so the original API exception is kept as __cause__.
            raise ApiError(e) from e

        # The raw state string is lower-cased before conversion
        # (presumably TaskState values are lower-case; confirm against TaskState).
        return TaskState(response.workflow_execution_state.lower())
def
get_workflow_execution_task_states( self, workflow_execution_id: enpi_api.l2.types.workflow.WorkflowExecutionId) -> list[enpi_api.l2.types.workflow.WorkflowExecutionTask]:
def get_workflow_execution_task_states(self, workflow_execution_id: WorkflowExecutionId) -> list[WorkflowExecutionTask]:
    """Get workflow execution task states by ID.

    Args:
        workflow_execution_id (enpi_api.l2.types.workflow.WorkflowExecutionId): The ID of the workflow execution.

    Returns:
        list[enpi_api.l2.types.workflow.WorkflowExecutionTask]: The task states of the workflow execution.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If the generated client raises `openapi_client.ApiException`.
    """
    workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)

    try:
        response = workflow_api_instance.get_workflow_execution_task_states(workflow_execution_id)
    except openapi_client.ApiException as e:
        # Chain explicitly so the original API exception is kept as __cause__.
        raise ApiError(e) from e

    # Convert every raw task-state entry of the response into the l2 type.
    return [WorkflowExecutionTask.from_raw(i) for i in response.workflow_execution_task_states]
Get workflow execution task states by ID.
Returns:
list[enpi_api.l2.types.workflow.WorkflowExecutionTask]: The task states of the workflow execution.
def
get_workflow_execution_state( self, workflow_execution_id: enpi_api.l2.types.workflow.WorkflowExecutionId) -> enpi_api.l2.types.task.TaskState:
def get_workflow_execution_state(self, workflow_execution_id: WorkflowExecutionId) -> TaskState:
    """Get workflow execution state by ID.

    Args:
        workflow_execution_id (enpi_api.l2.types.workflow.WorkflowExecutionId): The ID of the workflow execution.

    Returns:
        enpi_api.l2.types.task.TaskState

    Raises:
        enpi_api.l2.types.api_error.ApiError: If the generated client raises `openapi_client.ApiException`.
    """
    workflow_api_instance = openapi_client.WorkflowApi(self._inner_api_client)

    try:
        response = workflow_api_instance.get_workflow_execution_state(workflow_execution_id)
    except openapi_client.ApiException as e:
        # Chain explicitly so the original API exception is kept as __cause__.
        raise ApiError(e) from e

    # The raw state string is lower-cased before conversion
    # (presumably TaskState values are lower-case; confirm against TaskState).
    return TaskState(response.workflow_execution_state.lower())