enpi_api.l2.client.api.liabilities_api
1from loguru import logger 2 3from enpi_api.l1 import openapi_client 4from enpi_api.l2.events.workflow_execution_task_waitable import WorkflowExecutionTaskWaitable 5from enpi_api.l2.types.api_error import ApiError 6from enpi_api.l2.types.clone import CloneId 7from enpi_api.l2.types.execution import Execution 8from enpi_api.l2.types.log import LogLevel 9from enpi_api.l2.types.sequence import SequenceId 10from enpi_api.l2.types.task import TaskState 11from enpi_api.l2.types.workflow import WorkflowExecutionId, WorkflowExecutionTaskId, WorkflowTaskTemplateName 12 13 14class LiabilitiesApi: 15 _inner_api_client: openapi_client.ApiClient 16 _log_level: LogLevel 17 18 def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel): 19 """@private""" 20 self._inner_api_client = inner_api_client 21 self._log_level = log_level 22 23 def start( 24 self, 25 clone_ids: list[CloneId] | None = None, 26 sequence_ids: list[SequenceId] | None = None, 27 ) -> Execution[WorkflowExecutionTaskId]: 28 """Run liabilities computation on a set of target clones. 29 30 This will compute liabilities for clones matched with clone and sequence IDs passed 31 to the function and will add new tags to those clones if computations are successful. 32 33 > This functionality uses clone resolving.\n 34 > Clone resolving uses passed clone and sequence IDs in order to resolve clones. 35 > For each clone, a maximum of one *big* chain and one *small* chain sequence will be picked, resulting in a 36 maximum of two sequences per clone. 37 > Sequences matched with passed sequence IDs have priority over internally resolved sequences, meaning that if 38 possible, they will be picked as sequences for the resolved clones. 39 40 Args: 41 clone_ids (list[enpi_api.l2.types.clone.CloneId]): IDs of clones based on which clones will be 42 resolved and passed for liabilities computation. 43 sequence_ids (list[enpi_api.l2.types.sequence.SequenceId]): IDs of sequences based on which clones will be 44 resolved and passed for liabilities computation. If clone resolving based on clone IDs and sequence IDs results in the same, 45 "overlapping" clones (with the same clone IDs) but potentially different sequences within, clones resolved with use of sequence IDs 46 will be picked over the ones resolved with clone IDs. 47 48 Returns: 49 enpi_api.l2.types.execution.Execution[None]: An awaitable. 50 51 Raises: 52 ValueError: If clone and/or sequence IDs passed to this function are empty or invalid. 53 enpi_api.l2.types.api_error.ApiError: If API request fails. 54 55 Example: 56 57 ``` 58 collection_id = CollectionId(1234) 59 60 # Get all clones belonging to a collection 61 collection_filter = client.filter_api.create_filter(name="test_collection", condition=MatchId(target=MatchIdTarget.COLLECTION, id=collection_id)) 62 clones_df = client.collection_api.get_as_df( 63 collection_ids=[collection_id], 64 filter=collection_filter, 65 tag_ids=[], 66 ).wait() 67 68 # Extract clone ids from the dataframe 69 clone_ids = [CloneId(id) for id in clones_df.index.tolist()] 70 71 # Run liabilities computation 72 client.liabilities_api.start(clone_ids=clone_ids).wait() 73 74 # Get clones updated with new tags, result of liabilities computation 75 updated_clones_df = client.collection_api.get_as_df( 76 collection_ids=[collection_id], 77 filter=collection_filter, 78 tag_ids=[ 79 CloneTags.TapScore # Tag added during liabilities run 80 ], 81 ).wait() 82 ``` 83 """ 84 liabilities_api_instance = openapi_client.LiabilitiesApi(self._inner_api_client) 85 86 # Check if we got any ids to work with 87 if (clone_ids is None or len(clone_ids) == 0) and (sequence_ids is None or len(sequence_ids) == 0): 88 raise ValueError("Both clone and sequence IDs arrays are null, at least one of them needs to contain proper values.") 89 90 # Validate if ID types are right 91 if clone_ids is not None and not all([isinstance(id, str) for id in clone_ids]): 92 raise ValueError("Some of the passed clone IDs are not strings.") 93 elif sequence_ids is not None and not all([isinstance(id, int) for id in sequence_ids]): 94 raise ValueError("Some of the passed sequence IDs are not integers.") 95 96 try: 97 liabilities_work = openapi_client.LiabilitiesWork( 98 clone_ids=None if clone_ids is None else [str(id) for id in clone_ids], 99 sequence_ids=None if sequence_ids is None else [int(id) for id in sequence_ids], 100 ) 101 logger.info("Making a request for liabilities computation run start...") 102 data = liabilities_api_instance.start_liabilities(liabilities_work=liabilities_work) 103 assert data.workflow_execution_id is not None 104 105 workflow_execution_id = WorkflowExecutionId(int(data.workflow_execution_id)) 106 107 def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> WorkflowExecutionTaskId: 108 logger.success(f"Liabilities task ID: {task_id} in workflow execution with ID: {workflow_execution_id} has successfully finished.") 109 110 return task_id 111 112 waitable = WorkflowExecutionTaskWaitable[WorkflowExecutionTaskId]( 113 workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_LIABILITIES, on_complete=on_complete 114 ) 115 return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state) 116 except openapi_client.ApiException as e: 117 raise ApiError(e)
class
LiabilitiesApi:
15class LiabilitiesApi: 16 _inner_api_client: openapi_client.ApiClient 17 _log_level: LogLevel 18 19 def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel): 20 """@private""" 21 self._inner_api_client = inner_api_client 22 self._log_level = log_level 23 24 def start( 25 self, 26 clone_ids: list[CloneId] | None = None, 27 sequence_ids: list[SequenceId] | None = None, 28 ) -> Execution[WorkflowExecutionTaskId]: 29 """Run liabilities computation on a set of target clones. 30 31 This will compute liabilities for clones matched with clone and sequence IDs passed 32 to the function and will add new tags to those clones if computations are successful. 33 34 > This functionality uses clone resolving.\n 35 > Clone resolving uses passed clone and sequence IDs in order to resolve clones. 36 > For each clone, a maximum of one *big* chain and one *small* chain sequence will be picked, resulting in a 37 maximum of two sequences per clone. 38 > Sequences matched with passed sequence IDs have priority over internally resolved sequences, meaning that if 39 possible, they will be picked as sequences for the resolved clones. 40 41 Args: 42 clone_ids (list[enpi_api.l2.types.clone.CloneId]): IDs of clones based on which clones will be 43 resolved and passed for liabilities computation. 44 sequence_ids (list[enpi_api.l2.types.sequence.SequenceId]): IDs of sequences based on which clones will be 45 resolved and passed for liabilities computation. If clone resolving based on clone IDs and sequence IDs results in the same, 46 "overlapping" clones (with the same clone IDs) but potentially different sequences within, clones resolved with use of sequence IDs 47 will be picked over the ones resolved with clone IDs. 48 49 Returns: 50 enpi_api.l2.types.execution.Execution[None]: An awaitable. 51 52 Raises: 53 ValueError: If clone and/or sequence IDs passed to this function are empty or invalid. 54 enpi_api.l2.types.api_error.ApiError: If API request fails. 55 56 Example: 57 58 ``` 59 collection_id = CollectionId(1234) 60 61 # Get all clones belonging to a collection 62 collection_filter = client.filter_api.create_filter(name="test_collection", condition=MatchId(target=MatchIdTarget.COLLECTION, id=collection_id)) 63 clones_df = client.collection_api.get_as_df( 64 collection_ids=[collection_id], 65 filter=collection_filter, 66 tag_ids=[], 67 ).wait() 68 69 # Extract clone ids from the dataframe 70 clone_ids = [CloneId(id) for id in clones_df.index.tolist()] 71 72 # Run liabilities computation 73 client.liabilities_api.start(clone_ids=clone_ids).wait() 74 75 # Get clones updated with new tags, result of liabilities computation 76 updated_clones_df = client.collection_api.get_as_df( 77 collection_ids=[collection_id], 78 filter=collection_filter, 79 tag_ids=[ 80 CloneTags.TapScore # Tag added during liabilities run 81 ], 82 ).wait() 83 ``` 84 """ 85 liabilities_api_instance = openapi_client.LiabilitiesApi(self._inner_api_client) 86 87 # Check if we got any ids to work with 88 if (clone_ids is None or len(clone_ids) == 0) and (sequence_ids is None or len(sequence_ids) == 0): 89 raise ValueError("Both clone and sequence IDs arrays are null, at least one of them needs to contain proper values.") 90 91 # Validate if ID types are right 92 if clone_ids is not None and not all([isinstance(id, str) for id in clone_ids]): 93 raise ValueError("Some of the passed clone IDs are not strings.") 94 elif sequence_ids is not None and not all([isinstance(id, int) for id in sequence_ids]): 95 raise ValueError("Some of the passed sequence IDs are not integers.") 96 97 try: 98 liabilities_work = openapi_client.LiabilitiesWork( 99 clone_ids=None if clone_ids is None else [str(id) for id in clone_ids], 100 sequence_ids=None if sequence_ids is None else [int(id) for id in sequence_ids], 101 ) 102 logger.info("Making a request for liabilities computation run start...") 103 data = liabilities_api_instance.start_liabilities(liabilities_work=liabilities_work) 104 assert data.workflow_execution_id is not None 105 106 workflow_execution_id = WorkflowExecutionId(int(data.workflow_execution_id)) 107 108 def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> WorkflowExecutionTaskId: 109 logger.success(f"Liabilities task ID: {task_id} in workflow execution with ID: {workflow_execution_id} has successfully finished.") 110 111 return task_id 112 113 waitable = WorkflowExecutionTaskWaitable[WorkflowExecutionTaskId]( 114 workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_LIABILITIES, on_complete=on_complete 115 ) 116 return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state) 117 except openapi_client.ApiException as e: 118 raise ApiError(e)
def
start( self, clone_ids: list[enpi_api.l2.types.clone.CloneId] | None = None, sequence_ids: list[enpi_api.l2.types.sequence.SequenceId] | None = None) -> enpi_api.l2.types.execution.Execution[NewType]:
24 def start( 25 self, 26 clone_ids: list[CloneId] | None = None, 27 sequence_ids: list[SequenceId] | None = None, 28 ) -> Execution[WorkflowExecutionTaskId]: 29 """Run liabilities computation on a set of target clones. 30 31 This will compute liabilities for clones matched with clone and sequence IDs passed 32 to the function and will add new tags to those clones if computations are successful. 33 34 > This functionality uses clone resolving.\n 35 > Clone resolving uses passed clone and sequence IDs in order to resolve clones. 36 > For each clone, a maximum of one *big* chain and one *small* chain sequence will be picked, resulting in a 37 maximum of two sequences per clone. 38 > Sequences matched with passed sequence IDs have priority over internally resolved sequences, meaning that if 39 possible, they will be picked as sequences for the resolved clones. 40 41 Args: 42 clone_ids (list[enpi_api.l2.types.clone.CloneId]): IDs of clones based on which clones will be 43 resolved and passed for liabilities computation. 44 sequence_ids (list[enpi_api.l2.types.sequence.SequenceId]): IDs of sequences based on which clones will be 45 resolved and passed for liabilities computation. If clone resolving based on clone IDs and sequence IDs results in the same, 46 "overlapping" clones (with the same clone IDs) but potentially different sequences within, clones resolved with use of sequence IDs 47 will be picked over the ones resolved with clone IDs. 48 49 Returns: 50 enpi_api.l2.types.execution.Execution[None]: An awaitable. 51 52 Raises: 53 ValueError: If clone and/or sequence IDs passed to this function are empty or invalid. 54 enpi_api.l2.types.api_error.ApiError: If API request fails. 55 56 Example: 57 58 ``` 59 collection_id = CollectionId(1234) 60 61 # Get all clones belonging to a collection 62 collection_filter = client.filter_api.create_filter(name="test_collection", condition=MatchId(target=MatchIdTarget.COLLECTION, id=collection_id)) 63 clones_df = client.collection_api.get_as_df( 64 collection_ids=[collection_id], 65 filter=collection_filter, 66 tag_ids=[], 67 ).wait() 68 69 # Extract clone ids from the dataframe 70 clone_ids = [CloneId(id) for id in clones_df.index.tolist()] 71 72 # Run liabilities computation 73 client.liabilities_api.start(clone_ids=clone_ids).wait() 74 75 # Get clones updated with new tags, result of liabilities computation 76 updated_clones_df = client.collection_api.get_as_df( 77 collection_ids=[collection_id], 78 filter=collection_filter, 79 tag_ids=[ 80 CloneTags.TapScore # Tag added during liabilities run 81 ], 82 ).wait() 83 ``` 84 """ 85 liabilities_api_instance = openapi_client.LiabilitiesApi(self._inner_api_client) 86 87 # Check if we got any ids to work with 88 if (clone_ids is None or len(clone_ids) == 0) and (sequence_ids is None or len(sequence_ids) == 0): 89 raise ValueError("Both clone and sequence IDs arrays are null, at least one of them needs to contain proper values.") 90 91 # Validate if ID types are right 92 if clone_ids is not None and not all([isinstance(id, str) for id in clone_ids]): 93 raise ValueError("Some of the passed clone IDs are not strings.") 94 elif sequence_ids is not None and not all([isinstance(id, int) for id in sequence_ids]): 95 raise ValueError("Some of the passed sequence IDs are not integers.") 96 97 try: 98 liabilities_work = openapi_client.LiabilitiesWork( 99 clone_ids=None if clone_ids is None else [str(id) for id in clone_ids], 100 sequence_ids=None if sequence_ids is None else [int(id) for id in sequence_ids], 101 ) 102 logger.info("Making a request for liabilities computation run start...") 103 data = liabilities_api_instance.start_liabilities(liabilities_work=liabilities_work) 104 assert data.workflow_execution_id is not None 105 106 workflow_execution_id = WorkflowExecutionId(int(data.workflow_execution_id)) 107 108 def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> WorkflowExecutionTaskId: 109 logger.success(f"Liabilities task ID: {task_id} in workflow execution with ID: {workflow_execution_id} has successfully finished.") 110 111 return task_id 112 113 waitable = WorkflowExecutionTaskWaitable[WorkflowExecutionTaskId]( 114 workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_LIABILITIES, on_complete=on_complete 115 ) 116 return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state) 117 except openapi_client.ApiException as e: 118 raise ApiError(e)
Run liabilities computation on a set of target clones.
This will compute liabilities for clones matched with clone and sequence IDs passed to the function and will add new tags to those clones if computations are successful.
This functionality uses clone resolving.
Clone resolving uses passed clone and sequence IDs in order to resolve clones. For each clone, a maximum of one big chain and one small chain sequence will be picked, resulting in a maximum of two sequences per clone. Sequences matched with passed sequence IDs have priority over internally resolved sequences, meaning that if possible, they will be picked as sequences for the resolved clones.
Arguments:
- clone_ids (list[enpi_api.l2.types.clone.CloneId]): IDs of clones based on which clones will be resolved and passed for liabilities computation.
- sequence_ids (list[enpi_api.l2.types.sequence.SequenceId]): IDs of sequences based on which clones will be resolved and passed for liabilities computation. If clone resolving based on clone IDs and sequence IDs results in the same, "overlapping" clones (with the same clone IDs) but potentially different sequences within, clones resolved with use of sequence IDs will be picked over the ones resolved with clone IDs.
Returns:
enpi_api.l2.types.execution.Execution[None]: An awaitable.
Raises:
- ValueError: If clone and/or sequence IDs passed to this function are empty or invalid.
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
collection_id = CollectionId(1234) # Get all clones belonging to a collection collection_filter = client.filter_api.create_filter(name="test_collection", condition=MatchId(target=MatchIdTarget.COLLECTION, id=collection_id)) clones_df = client.collection_api.get_as_df( collection_ids=[collection_id], filter=collection_filter, tag_ids=[], ).wait() # Extract clone ids from the dataframe clone_ids = [CloneId(id) for id in clones_df.index.tolist()] # Run liabilities computation client.liabilities_api.start(clone_ids=clone_ids).wait() # Get clones updated with new tags, result of liabilities computation updated_clones_df = client.collection_api.get_as_df( collection_ids=[collection_id], filter=collection_filter, tag_ids=[ CloneTags.TapScore # Tag added during liabilities run ], ).wait()