enpi_api.l2.client.api.cluster_api
from pathlib import Path

import pandas as pd
from loguru import logger

from enpi_api.l1 import openapi_client
from enpi_api.l2.client.api.file_api import FileApi
from enpi_api.l2.events.workflow_execution_task_waitable import WorkflowExecutionTaskWaitable
from enpi_api.l2.types.api_error import ApiErrorContext
from enpi_api.l2.types.cluster import AdditionalOptions, ClusterRun, ClusterRunId, ExportClustersMode, ExternalClusterRun, SequenceFeatureIdentities
from enpi_api.l2.types.collection import CollectionId
from enpi_api.l2.types.execution import Execution
from enpi_api.l2.types.log import LogLevel
from enpi_api.l2.types.tag import TagId
from enpi_api.l2.types.task import TaskState
from enpi_api.l2.types.workflow import WorkflowExecutionId, WorkflowExecutionTaskId, WorkflowTaskTemplateName


class ClusterApi:
    """Client for Cluster Run endpoints: listing, fetching, starting, and exporting Cluster Runs."""

    # Low-level generated client; every request made by this class goes through it.
    _inner_api_client: openapi_client.ApiClient
    # Forwarded to FileApi when downloading cluster exports.
    _log_level: LogLevel

    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
        """@private"""
        self._inner_api_client = inner_api_client
        self._log_level = log_level

    def get_cluster_runs(self, collection_ids: list[CollectionId] | None = None) -> list[ClusterRun | ExternalClusterRun]:
        """Get all successful Cluster Runs or a selection of them linked to the passed Collection IDs.

        Args:
            collection_ids (list[enpi_api.l2.types.collection.CollectionId] | None): IDs of clone collections.
                If passed, they will be used to filter Cluster Runs down to the ones originating from
                the collections matched with passed IDs.

        Returns:
            list[enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun]: List of Cluster Runs.
                Entries returned without a concrete run payload are skipped.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Get all Cluster Runs that were run using CollectionId=123
            ```python
            with EnpiApiClient() as enpi_client:
                cluster_runs = enpi_client.cluster_api.get_cluster_runs(collection_ids=[CollectionId(123)])
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

        payload = openapi_client.GetClusterRunsRequestPayload(
            collection_ids=[int(collection_id) for collection_id in collection_ids] if collection_ids is not None else None,
        )

        with ApiErrorContext():
            data = cluster_api_instance.get_cluster_runs(payload)

        # Each entry wraps the concrete run in `actual_instance`: native SequenceFeatureClusteringRun
        # payloads become ClusterRun, anything else is treated as an ExternalClusterRun.
        return [
            ClusterRun.from_raw(cr.actual_instance)
            if isinstance(cr.actual_instance, openapi_client.SequenceFeatureClusteringRun)
            else ExternalClusterRun.from_raw(cr.actual_instance)
            for cr in data.clusters
            if cr.actual_instance is not None
        ]

    def get_cluster_run(self, cluster_run_id: ClusterRunId) -> ClusterRun | ExternalClusterRun:
        """Get a successful Cluster Run by ID.

        Args:
            cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId): ID of a Cluster Run to get.

        Returns:
            enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun: Cluster Run matching
                the provided ID.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Get a Cluster Run with ClusterRunId="b236c01c-e0e2-464e-8bd7-b1718476a78b"
            ```python
            with EnpiApiClient() as enpi_client:
                cluster_run = enpi_client.cluster_api.get_cluster_run(ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"))
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

        with ApiErrorContext():
            data = cluster_api_instance.get_cluster_run(cluster_run_id)
            assert data.cluster_run.actual_instance is not None

        # Native SequenceFeatureClusteringRun payloads become ClusterRun; anything else is external.
        return (
            ClusterRun.from_raw(data.cluster_run.actual_instance)
            if isinstance(data.cluster_run.actual_instance, openapi_client.SequenceFeatureClusteringRun)
            else ExternalClusterRun.from_raw(data.cluster_run.actual_instance)
        )

    def get_cluster_run_by_workflow_execution_task_id(self, workflow_execution_task_id: WorkflowExecutionTaskId) -> ClusterRun | ExternalClusterRun:
        """Get a successful Cluster Run by its workflow execution task ID.

        Args:
            workflow_execution_task_id (enpi_api.l2.types.workflow.WorkflowExecutionTaskId): ID of a workflow execution task linked to a successful Cluster Run.

        Returns:
            enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun: Successful Cluster Run
                linked to the provided workflow execution task ID.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Get a Cluster Run by Workflow execution task ID:

            ```python
            with EnpiApiClient() as enpi_client:
                cluster_run = enpi_client.cluster_api.get_cluster_run_by_workflow_execution_task_id(WorkflowExecutionTaskId(1234))
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

        with ApiErrorContext():
            data = cluster_api_instance.get_cluster_run_by_workflow_execution_task_id(workflow_execution_task_id)
            assert data.cluster_run.actual_instance is not None

        # Native SequenceFeatureClusteringRun payloads become ClusterRun; anything else is external.
        return (
            ClusterRun.from_raw(data.cluster_run.actual_instance)
            if isinstance(data.cluster_run.actual_instance, openapi_client.SequenceFeatureClusteringRun)
            else ExternalClusterRun.from_raw(data.cluster_run.actual_instance)
        )

    def start(
        self,
        name: str,
        collection_ids: list[CollectionId],
        sequence_features: list[TagId],
        match_tags: list[TagId],
        identities: SequenceFeatureIdentities,
        additional_options: AdditionalOptions | None = None,
    ) -> Execution[ClusterRun | ExternalClusterRun]:
        """Start a new Cluster Run.

        Args:
            name (str): Cluster Run name.
            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): Collections to use in the clustering. All collections must have the
                same receptor.
            sequence_features (list[enpi_api.l2.types.tag.TagId]): Sequence Features to be clustered on.
            match_tags (list[enpi_api.l2.types.tag.TagId]): Tag IDs passed to the clustering job as `match_tags`.
            identities (enpi_api.l2.types.cluster.SequenceFeatureIdentities): Chain identities used for clustering.
            additional_options (enpi_api.l2.types.cluster.AdditionalOptions | None): Additional Options for clustering configuration.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun]:
                An awaitable that returns the completed Cluster Run when awaited with `.wait()`.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.
            AssertionError: From `.wait()`, if the completion callback receives a task state other than SUCCEEDED.

        Example:
            Start a new Cluster Run for 3 collections, using 80% identity on IG CDR3 Amino Acids
            ```python
            with EnpiApiClient() as enpi_client:
                cluster_run = enpi_client.cluster_api.start(
                    name="Clustering for collections 1, 2 and 3",
                    collection_ids=[CollectionId(x) for x in [1, 2, 3]],
                    sequence_features=[SequenceTags.Cdr3AminoAcids],
                    match_tags=[],
                    identities=SequenceFeatureIdentities(Heavy=80, Kappa=80, Lambda=80),
                    additional_options=None,
                ).wait()
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)
        payload = openapi_client.ClusterWork(
            name=name,
            collection_ids=[int(c_id) for c_id in collection_ids],
            sequence_features=[int(tag_id) for tag_id in sequence_features],
            match_tags=[int(tag_id) for tag_id in match_tags],
            identities=identities.to_inner(),
            additional_options=additional_options.to_inner() if additional_options is not None else None,
        )

        with ApiErrorContext():
            start_cluster_run_response = cluster_api_instance.start_cluster_run(payload)
            assert start_cluster_run_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(start_cluster_run_response.workflow_execution_id)

        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> ClusterRun | ExternalClusterRun:
            # Explicit check instead of `assert` so it is not stripped under `python -O`; AssertionError is
            # kept so callers that already handle it keep seeing the same exception type.
            if task_state != TaskState.SUCCEEDED:
                raise AssertionError(f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead")

            cluster_run = self.get_cluster_run_by_workflow_execution_task_id(task_id)
            logger.success(f"Cluster run with task ID: {task_id} in workflow execution with ID: {workflow_execution_id} has successfully finished.")
            return cluster_run

        waitable = WorkflowExecutionTaskWaitable[ClusterRun | ExternalClusterRun](
            workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_CLUSTER, on_complete=on_complete
        )
        return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def export_clusters_as_csv(
        self,
        cluster_run_id: ClusterRunId,
        mode: ExportClustersMode,
        limit: int | None = None,
        output_directory: str | Path | None = None,
    ) -> Execution[Path]:
        """Start a Cluster export and download the results CSV file.

        Args:
            cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId):
                ID of the Cluster Run to export.
            mode (enpi_api.l2.types.cluster.ExportClustersMode):
                Mode in which export is run.
            limit (int | None):
                If specified, the export will contain only the first N clusters, N being the value passed as this param.
            output_directory (str | Path | None):
                Directory into which the downloaded archive will be extracted. If left empty, a temporary directory will be created.

        Returns:
            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the path to the downloaded CSV file
                when awaited with `.wait()`.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Download successful Cluster Export result as CSV.
            ```python
            with EnpiApiClient() as enpi_client:
                file_path = enpi_client.cluster_api.export_clusters_as_csv(
                    cluster_run_id=ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"),
                    mode=ExportClustersMode.CLONES,
                    limit=None,
                    output_directory="/home/exported_data/",
                ).wait()
                print(file_path)
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

        payload = openapi_client.GetClusteredClonesRequestPayload(
            cluster_run_id=cluster_run_id,
            mode=mode,
            limit=limit,
        )

        with ApiErrorContext():
            data = cluster_api_instance.export_clusters(payload)

        assert data.workflow_execution_id is not None

        workflow_execution_id = WorkflowExecutionId(data.workflow_execution_id)

        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
            # NOTE(review): unlike `start`, task_state is not checked here — confirm whether a non-SUCCEEDED
            # export should raise before attempting the download.
            file_api = FileApi(self._inner_api_client, self._log_level)
            file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)

            logger.success(f"Exported clusters to {file_path}")

            return file_path

        waitable = WorkflowExecutionTaskWaitable[Path](
            workflow_execution_id=workflow_execution_id,
            on_complete=on_complete,
            task_template_name=WorkflowTaskTemplateName.ENPI_APP_CLUSTER_EXPORT,
        )

        return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def export_clusters_as_df(
        self,
        cluster_run_id: ClusterRunId,
        mode: ExportClustersMode,
        limit: int | None = None,
    ) -> Execution[pd.DataFrame]:
        """Start a Cluster Export, download the results, and return them as Pandas DataFrame.

        Args:
            cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId):
                ID of the Cluster Run to export.
            mode (enpi_api.l2.types.cluster.ExportClustersMode):
                Mode in which export is run.
            limit (int | None):
                If specified, the export will contain only the first N clusters, N being the value passed as this param.

        Returns:
            enpi_api.l2.types.execution.Execution[pd.DataFrame]: An awaitable that returns the export results as Pandas
                DataFrame when awaited with `.wait()`.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Load a successful Cluster Export result into a DataFrame.
            ```python
            with EnpiApiClient() as enpi_client:
                df = enpi_client.cluster_api.export_clusters_as_df(
                    cluster_run_id=ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"),
                    mode=ExportClustersMode.CLONES,
                ).wait()
                print(df)
            ```
        """
        # Exports to a temporary directory (no output_directory is passed) and reuses that execution's state check.
        execution = self.export_clusters_as_csv(cluster_run_id=cluster_run_id, mode=mode, limit=limit)

        def wait() -> pd.DataFrame:
            file_path = execution.wait()
            # The downloaded export is parsed as tab-separated, despite the `_csv` method naming.
            return pd.read_csv(file_path, delimiter="\t")

        return Execution(wait=wait, check_execution_state=execution.check_execution_state)
class ClusterApi:
    """Client for Cluster Run endpoints: listing, fetching, starting, and exporting Cluster Runs."""

    # Low-level generated client; every request made by this class goes through it.
    _inner_api_client: openapi_client.ApiClient
    # Forwarded to FileApi when downloading cluster exports.
    _log_level: LogLevel

    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
        """@private"""
        self._inner_api_client = inner_api_client
        self._log_level = log_level

    def get_cluster_runs(self, collection_ids: list[CollectionId] | None = None) -> list[ClusterRun | ExternalClusterRun]:
        """Get all successful Cluster Runs or a selection of them linked to the passed Collection IDs.

        Args:
            collection_ids (list[enpi_api.l2.types.collection.CollectionId] | None): IDs of clone collections.
                If passed, they will be used to filter Cluster Runs down to the ones originating from
                the collections matched with passed IDs.

        Returns:
            list[enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun]: List of Cluster Runs.
                Entries returned without a concrete run payload are skipped.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Get all Cluster Runs that were run using CollectionId=123
            ```python
            with EnpiApiClient() as enpi_client:
                cluster_runs = enpi_client.cluster_api.get_cluster_runs(collection_ids=[CollectionId(123)])
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

        payload = openapi_client.GetClusterRunsRequestPayload(
            collection_ids=[int(collection_id) for collection_id in collection_ids] if collection_ids is not None else None,
        )

        with ApiErrorContext():
            data = cluster_api_instance.get_cluster_runs(payload)

        # Each entry wraps the concrete run in `actual_instance`: native SequenceFeatureClusteringRun
        # payloads become ClusterRun, anything else is treated as an ExternalClusterRun.
        return [
            ClusterRun.from_raw(cr.actual_instance)
            if isinstance(cr.actual_instance, openapi_client.SequenceFeatureClusteringRun)
            else ExternalClusterRun.from_raw(cr.actual_instance)
            for cr in data.clusters
            if cr.actual_instance is not None
        ]

    def get_cluster_run(self, cluster_run_id: ClusterRunId) -> ClusterRun | ExternalClusterRun:
        """Get a successful Cluster Run by ID.

        Args:
            cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId): ID of a Cluster Run to get.

        Returns:
            enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun: Cluster Run matching
                the provided ID.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Get a Cluster Run with ClusterRunId="b236c01c-e0e2-464e-8bd7-b1718476a78b"
            ```python
            with EnpiApiClient() as enpi_client:
                cluster_run = enpi_client.cluster_api.get_cluster_run(ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"))
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

        with ApiErrorContext():
            data = cluster_api_instance.get_cluster_run(cluster_run_id)
            assert data.cluster_run.actual_instance is not None

        # Native SequenceFeatureClusteringRun payloads become ClusterRun; anything else is external.
        return (
            ClusterRun.from_raw(data.cluster_run.actual_instance)
            if isinstance(data.cluster_run.actual_instance, openapi_client.SequenceFeatureClusteringRun)
            else ExternalClusterRun.from_raw(data.cluster_run.actual_instance)
        )

    def get_cluster_run_by_workflow_execution_task_id(self, workflow_execution_task_id: WorkflowExecutionTaskId) -> ClusterRun | ExternalClusterRun:
        """Get a successful Cluster Run by its workflow execution task ID.

        Args:
            workflow_execution_task_id (enpi_api.l2.types.workflow.WorkflowExecutionTaskId): ID of a workflow execution task linked to a successful Cluster Run.

        Returns:
            enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun: Successful Cluster Run
                linked to the provided workflow execution task ID.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Get a Cluster Run by Workflow execution task ID:

            ```python
            with EnpiApiClient() as enpi_client:
                cluster_run = enpi_client.cluster_api.get_cluster_run_by_workflow_execution_task_id(WorkflowExecutionTaskId(1234))
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

        with ApiErrorContext():
            data = cluster_api_instance.get_cluster_run_by_workflow_execution_task_id(workflow_execution_task_id)
            assert data.cluster_run.actual_instance is not None

        # Native SequenceFeatureClusteringRun payloads become ClusterRun; anything else is external.
        return (
            ClusterRun.from_raw(data.cluster_run.actual_instance)
            if isinstance(data.cluster_run.actual_instance, openapi_client.SequenceFeatureClusteringRun)
            else ExternalClusterRun.from_raw(data.cluster_run.actual_instance)
        )

    def start(
        self,
        name: str,
        collection_ids: list[CollectionId],
        sequence_features: list[TagId],
        match_tags: list[TagId],
        identities: SequenceFeatureIdentities,
        additional_options: AdditionalOptions | None = None,
    ) -> Execution[ClusterRun | ExternalClusterRun]:
        """Start a new Cluster Run.

        Args:
            name (str): Cluster Run name.
            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): Collections to use in the clustering. All collections must have the
                same receptor.
            sequence_features (list[enpi_api.l2.types.tag.TagId]): Sequence Features to be clustered on.
            match_tags (list[enpi_api.l2.types.tag.TagId]): Tag IDs passed to the clustering job as `match_tags`.
            identities (enpi_api.l2.types.cluster.SequenceFeatureIdentities): Chain identities used for clustering.
            additional_options (enpi_api.l2.types.cluster.AdditionalOptions | None): Additional Options for clustering configuration.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun]:
                An awaitable that returns the completed Cluster Run when awaited with `.wait()`.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.
            AssertionError: From `.wait()`, if the completion callback receives a task state other than SUCCEEDED.

        Example:
            Start a new Cluster Run for 3 collections, using 80% identity on IG CDR3 Amino Acids
            ```python
            with EnpiApiClient() as enpi_client:
                cluster_run = enpi_client.cluster_api.start(
                    name="Clustering for collections 1, 2 and 3",
                    collection_ids=[CollectionId(x) for x in [1, 2, 3]],
                    sequence_features=[SequenceTags.Cdr3AminoAcids],
                    match_tags=[],
                    identities=SequenceFeatureIdentities(Heavy=80, Kappa=80, Lambda=80),
                    additional_options=None,
                ).wait()
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)
        payload = openapi_client.ClusterWork(
            name=name,
            collection_ids=[int(c_id) for c_id in collection_ids],
            sequence_features=[int(tag_id) for tag_id in sequence_features],
            match_tags=[int(tag_id) for tag_id in match_tags],
            identities=identities.to_inner(),
            additional_options=additional_options.to_inner() if additional_options is not None else None,
        )

        with ApiErrorContext():
            start_cluster_run_response = cluster_api_instance.start_cluster_run(payload)
            assert start_cluster_run_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(start_cluster_run_response.workflow_execution_id)

        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> ClusterRun | ExternalClusterRun:
            # Explicit check instead of `assert` so it is not stripped under `python -O`; AssertionError is
            # kept so callers that already handle it keep seeing the same exception type.
            if task_state != TaskState.SUCCEEDED:
                raise AssertionError(f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead")

            cluster_run = self.get_cluster_run_by_workflow_execution_task_id(task_id)
            logger.success(f"Cluster run with task ID: {task_id} in workflow execution with ID: {workflow_execution_id} has successfully finished.")
            return cluster_run

        waitable = WorkflowExecutionTaskWaitable[ClusterRun | ExternalClusterRun](
            workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_CLUSTER, on_complete=on_complete
        )
        return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def export_clusters_as_csv(
        self,
        cluster_run_id: ClusterRunId,
        mode: ExportClustersMode,
        limit: int | None = None,
        output_directory: str | Path | None = None,
    ) -> Execution[Path]:
        """Start a Cluster export and download the results CSV file.

        Args:
            cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId):
                ID of the Cluster Run to export.
            mode (enpi_api.l2.types.cluster.ExportClustersMode):
                Mode in which export is run.
            limit (int | None):
                If specified, the export will contain only the first N clusters, N being the value passed as this param.
            output_directory (str | Path | None):
                Directory into which the downloaded archive will be extracted. If left empty, a temporary directory will be created.

        Returns:
            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the path to the downloaded CSV file
                when awaited with `.wait()`.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Download successful Cluster Export result as CSV.
            ```python
            with EnpiApiClient() as enpi_client:
                file_path = enpi_client.cluster_api.export_clusters_as_csv(
                    cluster_run_id=ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"),
                    mode=ExportClustersMode.CLONES,
                    limit=None,
                    output_directory="/home/exported_data/",
                ).wait()
                print(file_path)
            ```
        """
        cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

        payload = openapi_client.GetClusteredClonesRequestPayload(
            cluster_run_id=cluster_run_id,
            mode=mode,
            limit=limit,
        )

        with ApiErrorContext():
            data = cluster_api_instance.export_clusters(payload)

        assert data.workflow_execution_id is not None

        workflow_execution_id = WorkflowExecutionId(data.workflow_execution_id)

        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
            # NOTE(review): unlike `start`, task_state is not checked here — confirm whether a non-SUCCEEDED
            # export should raise before attempting the download.
            file_api = FileApi(self._inner_api_client, self._log_level)
            file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)

            logger.success(f"Exported clusters to {file_path}")

            return file_path

        waitable = WorkflowExecutionTaskWaitable[Path](
            workflow_execution_id=workflow_execution_id,
            on_complete=on_complete,
            task_template_name=WorkflowTaskTemplateName.ENPI_APP_CLUSTER_EXPORT,
        )

        return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def export_clusters_as_df(
        self,
        cluster_run_id: ClusterRunId,
        mode: ExportClustersMode,
        limit: int | None = None,
    ) -> Execution[pd.DataFrame]:
        """Start a Cluster Export, download the results, and return them as Pandas DataFrame.

        Args:
            cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId):
                ID of the Cluster Run to export.
            mode (enpi_api.l2.types.cluster.ExportClustersMode):
                Mode in which export is run.
            limit (int | None):
                If specified, the export will contain only the first N clusters, N being the value passed as this param.

        Returns:
            enpi_api.l2.types.execution.Execution[pd.DataFrame]: An awaitable that returns the export results as Pandas
                DataFrame when awaited with `.wait()`.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:
            Load a successful Cluster Export result into a DataFrame.
            ```python
            with EnpiApiClient() as enpi_client:
                df = enpi_client.cluster_api.export_clusters_as_df(
                    cluster_run_id=ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"),
                    mode=ExportClustersMode.CLONES,
                ).wait()
                print(df)
            ```
        """
        # Exports to a temporary directory (no output_directory is passed) and reuses that execution's state check.
        execution = self.export_clusters_as_csv(cluster_run_id=cluster_run_id, mode=mode, limit=limit)

        def wait() -> pd.DataFrame:
            file_path = execution.wait()
            # The downloaded export is parsed as tab-separated, despite the `_csv` method naming.
            return pd.read_csv(file_path, delimiter="\t")

        return Execution(wait=wait, check_execution_state=execution.check_execution_state)
def get_cluster_runs(self, collection_ids: list[CollectionId] | None = None) -> list[ClusterRun | ExternalClusterRun]:
    """Get all successful Cluster Runs or a selection of them linked to the passed Collection IDs.

    Args:
        collection_ids (list[enpi_api.l2.types.collection.CollectionId] | None): IDs of clone collections.
            If passed, they will be used to filter Cluster Runs down to the ones originating from
            the collections matched with passed IDs.

    Returns:
        list[enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun]: List of Cluster Runs.
            Entries returned without a concrete run payload are skipped.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:
        Get all Cluster Runs that were run using CollectionId=123
        ```python
        with EnpiApiClient() as enpi_client:
            cluster_runs = enpi_client.cluster_api.get_cluster_runs(collection_ids=[CollectionId(123)])
        ```
    """
    cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

    payload = openapi_client.GetClusterRunsRequestPayload(
        collection_ids=[int(collection_id) for collection_id in collection_ids] if collection_ids is not None else None,
    )

    with ApiErrorContext():
        data = cluster_api_instance.get_cluster_runs(payload)

    # Each entry wraps the concrete run in `actual_instance`: native SequenceFeatureClusteringRun
    # payloads become ClusterRun, anything else is treated as an ExternalClusterRun.
    return [
        ClusterRun.from_raw(cr.actual_instance)
        if isinstance(cr.actual_instance, openapi_client.SequenceFeatureClusteringRun)
        else ExternalClusterRun.from_raw(cr.actual_instance)
        for cr in data.clusters
        if cr.actual_instance is not None
    ]
Get all successful Cluster Runs or a selection of them linked to the passed Collection IDs.
Arguments:
- collection_ids (list[enpi_api.l2.types.collection.CollectionId] | None): IDs of clone collections. If passed, they will be used to filter Cluster Runs down to the ones originating from the collections matched with passed IDs.
Returns:
list[enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun]: List of Cluster Runs.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Get all Cluster Runs that were run using CollectionId=123
with EnpiApiClient() as enpi_client: cluster_runs = enpi_client.cluster_api.get_cluster_runs(collection_ids=[CollectionId(123)])
def get_cluster_run(self, cluster_run_id: ClusterRunId) -> ClusterRun | ExternalClusterRun:
    """Fetch a single successful Cluster Run by its ID.

    Args:
        cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId): ID of the Cluster Run to fetch.

    Returns:
        enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun: The Cluster Run with
            the given ID.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:
        Get a Cluster Run with ClusterRunId="b236c01c-e0e2-464e-8bd7-b1718476a78b"
        ```python
        with EnpiApiClient() as enpi_client:
            cluster_run = enpi_client.cluster_api.get_cluster_run(ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"))
        ```
    """
    api = openapi_client.ClusterApi(self._inner_api_client)

    with ApiErrorContext():
        response = api.get_cluster_run(cluster_run_id)
        assert response.cluster_run.actual_instance is not None

    raw_run = response.cluster_run.actual_instance
    # Native clustering runs map to ClusterRun; every other payload is an external run.
    if isinstance(raw_run, openapi_client.SequenceFeatureClusteringRun):
        return ClusterRun.from_raw(raw_run)
    return ExternalClusterRun.from_raw(raw_run)
Get a successful Cluster Run by ID.
Arguments:
- cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId): ID of a Cluster Run to get.
Returns:
enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun: Cluster Run matching the provided ID.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Get a Cluster Run with ClusterRunId="b236c01c-e0e2-464e-8bd7-b1718476a78b"
with EnpiApiClient() as enpi_client: cluster_run = enpi_client.cluster_api.get_cluster_run(ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"))
def get_cluster_run_by_workflow_execution_task_id(self, workflow_execution_task_id: WorkflowExecutionTaskId) -> ClusterRun | ExternalClusterRun:
    """Fetch the successful Cluster Run produced by a given workflow execution task.

    Args:
        workflow_execution_task_id (enpi_api.l2.types.workflow.WorkflowExecutionTaskId): ID of a workflow execution task linked to a successful Cluster Run.

    Returns:
        enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun: The Cluster Run linked
            to the provided workflow execution task ID.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:
        Get a Cluster Run by Workflow execution task ID:

        ```python
        with EnpiApiClient() as enpi_client:
            cluster_run = enpi_client.cluster_api.get_cluster_run_by_workflow_execution_task_id(WorkflowExecutionTaskId(1234))
        ```
    """
    api = openapi_client.ClusterApi(self._inner_api_client)

    with ApiErrorContext():
        response = api.get_cluster_run_by_workflow_execution_task_id(workflow_execution_task_id)
        assert response.cluster_run.actual_instance is not None

    raw_run = response.cluster_run.actual_instance
    # Native clustering runs map to ClusterRun; every other payload is an external run.
    if isinstance(raw_run, openapi_client.SequenceFeatureClusteringRun):
        return ClusterRun.from_raw(raw_run)
    return ExternalClusterRun.from_raw(raw_run)
Get a successful Cluster Run by its workflow execution task ID.
Arguments:
- workflow_execution_task_id (enpi_api.l2.types.workflow.WorkflowExecutionTaskId): ID of a workflow execution task linked to a successful Cluster Run.
Returns:
enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun: Successful Cluster Run linked to the provided workflow execution task ID.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Get a Cluster Run by Workflow execution task ID:
with EnpiApiClient() as enpi_client: cluster_run = enpi_client.cluster_api.get_cluster_run_by_workflow_execution_task_id(WorkflowExecutionTaskId(1234))
def start(
    self,
    name: str,
    collection_ids: list[CollectionId],
    sequence_features: list[TagId],
    match_tags: list[TagId],
    identities: SequenceFeatureIdentities,
    additional_options: AdditionalOptions | None = None,
) -> Execution[ClusterRun | ExternalClusterRun]:
    """Start a new Cluster Run.

    Args:
        name (str): Cluster Run name.
        collection_ids (list[enpi_api.l2.types.collection.CollectionId]): Collections to use in the clustering. All collections must have the
            same receptor.
        sequence_features (list[enpi_api.l2.types.tag.TagId]): Sequence Features to be clustered on.
        match_tags (list[enpi_api.l2.types.tag.TagId]): Tag IDs passed to the clustering job as `match_tags`.
        identities (enpi_api.l2.types.cluster.SequenceFeatureIdentities): Chain identities used for clustering.
        additional_options (enpi_api.l2.types.cluster.AdditionalOptions | None): Additional Options for clustering configuration.

    Returns:
        enpi_api.l2.types.execution.Execution[enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun]:
            An awaitable that returns the completed Cluster Run when awaited with `.wait()`.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.
        AssertionError: From `.wait()`, if the completion callback receives a task state other than SUCCEEDED.

    Example:
        Start a new Cluster Run for 3 collections, using 80% identity on IG CDR3 Amino Acids
        ```python
        with EnpiApiClient() as enpi_client:
            cluster_run = enpi_client.cluster_api.start(
                name="Clustering for collections 1, 2 and 3",
                collection_ids=[CollectionId(x) for x in [1, 2, 3]],
                sequence_features=[SequenceTags.Cdr3AminoAcids],
                match_tags=[],
                identities=SequenceFeatureIdentities(Heavy=80, Kappa=80, Lambda=80),
                additional_options=None,
            ).wait()
        ```
    """
    cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)
    payload = openapi_client.ClusterWork(
        name=name,
        collection_ids=[int(c_id) for c_id in collection_ids],
        sequence_features=[int(tag_id) for tag_id in sequence_features],
        match_tags=[int(tag_id) for tag_id in match_tags],
        identities=identities.to_inner(),
        additional_options=additional_options.to_inner() if additional_options is not None else None,
    )

    with ApiErrorContext():
        start_cluster_run_response = cluster_api_instance.start_cluster_run(payload)
        assert start_cluster_run_response.workflow_execution_id is not None

        workflow_execution_id = WorkflowExecutionId(start_cluster_run_response.workflow_execution_id)

    def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> ClusterRun | ExternalClusterRun:
        # Explicit check instead of `assert` so it is not stripped under `python -O`; AssertionError is
        # kept so callers that already handle it keep seeing the same exception type.
        if task_state != TaskState.SUCCEEDED:
            raise AssertionError(f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead")

        cluster_run = self.get_cluster_run_by_workflow_execution_task_id(task_id)
        logger.success(f"Cluster run with task ID: {task_id} in workflow execution with ID: {workflow_execution_id} has successfully finished.")
        return cluster_run

    waitable = WorkflowExecutionTaskWaitable[ClusterRun | ExternalClusterRun](
        workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_CLUSTER, on_complete=on_complete
    )
    return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
Start a new Cluster Run.
Arguments:
- name (str): Cluster Run name.
- collection_ids (list[enpi_api.l2.types.collection.CollectionId]): Collections to use in the clustering. All collections must have the same receptor.
- sequence_features (list[enpi_api.l2.types.tag.TagId]): Sequence Features to be clustered on.
- identities (enpi_api.l2.types.cluster.SequenceFeatureIdentities): Chain identities used for clustering.
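- match_tags (list[enpi_api.l2.types.tag.TagId]): Tags sent to the clustering job as `match_tags`; presumably only sequences sharing the values of these tags are clustered together (e.g. a CDR3 length tag) - confirm against the API documentation.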
- additional_options (enpi_api.l2.types.cluster.AdditionalOptions | None): Additional Options for clustering configuration.
Returns:
enpi_api.l2.types.execution.Execution[enpi_api.l2.types.cluster.ClusterRun | enpi_api.l2.types.cluster.ExternalClusterRun]: An awaitable that returns the completed Cluster Run when awaited.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Start a new Cluster Run for 3 collections, using 80% identity on IG CDR3 Amino Acids
with EnpiApiClient() as enpi_client: cluster_run = enpi_client.cluster_api.start( name="Clustering for collections 1, 2 and 3", collection_ids=[CollectionId(x) for x in [1,2,3]], sequence_features=[SequenceTags.Cdr3AminoAcids], match_tags=[], identities=SequenceFeatureIdentities(Heavy=80, Kappa=80, Lambda=80), additional_options=None, ).wait()
def export_clusters_as_csv(
    self,
    cluster_run_id: ClusterRunId,
    mode: ExportClustersMode,
    limit: int | None = None,
    output_directory: str | Path | None = None,
) -> Execution[Path]:
    """Start a Cluster export and download the results CSV file.

    Args:
        cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId):
            ID of the Cluster Run to export.
        mode (enpi_api.l2.types.cluster.ExportClustersMode):
            Mode in which export is run.
        limit (int | None):
            If specified, the export will contain only the first N clusters, N being the value passed as this param.
        output_directory (str | Path | None):
            Directory into which the downloaded archive will be extracted. If left empty, a temporary directory will be created.

    Returns:
        enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the path to the downloaded CSV file
            when awaited.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.
        AssertionError: From the completion callback (i.e. when the execution is awaited) if the export task did not
            reach the `TaskState.SUCCEEDED` state.

    Example:
        Download successful Cluster Export result as CSV.
        ```python
        with EnpiApiClient() as enpi_client:
            file_path = enpi_client.cluster_api.export_clusters_as_csv(
                cluster_run_id=ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"),
                mode=ExportClustersMode.CLONES,
                limit=None,
                output_directory="/home/exported_data/",
            ).wait()
            print(file_path)
        ```
    """
    cluster_api_instance = openapi_client.ClusterApi(self._inner_api_client)

    payload = openapi_client.GetClusteredClonesRequestPayload(
        cluster_run_id=cluster_run_id,
        mode=mode,
        limit=limit,
    )

    with ApiErrorContext():
        data = cluster_api_instance.export_clusters(payload)

    assert data.workflow_execution_id is not None

    workflow_execution_id = WorkflowExecutionId(data.workflow_execution_id)

    def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
        # Same guard as in `start`: a task in any state other than SUCCEEDED has no export to download,
        # so fail with a clear message instead of attempting the download.
        assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

        file_api = FileApi(self._inner_api_client, self._log_level)
        file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)

        logger.success(f"Exported clusters to {file_path}")

        return file_path

    waitable = WorkflowExecutionTaskWaitable[Path](
        workflow_execution_id=workflow_execution_id,
        on_complete=on_complete,
        task_template_name=WorkflowTaskTemplateName.ENPI_APP_CLUSTER_EXPORT,
    )

    return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
Start a Cluster export and download the results CSV file.
Arguments:
- cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId): ID of the Cluster Run to export.
- mode (enpi_api.l2.types.cluster.ExportClustersMode): Mode in which export is run.
- limit (int | None): If specified, the export will contain only the first N clusters, N being the value passed as this param.
- output_directory (str | Path | None): Directory into which the downloaded archive will be extracted. If left empty, a temporary directory will be created.
Returns:
enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the path to the downloaded CSV file when awaited.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Download successful Cluster Export result as CSV.
with EnpiApiClient() as enpi_client: file_path = enpi_client.cluster_api.export_clusters_as_csv( cluster_run_id=ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"), mode=ExportClustersMode.CLONES, limit=None, output_directory="/home/exported_data/", ).wait() print(file_path)
def export_clusters_as_df(
    self,
    cluster_run_id: ClusterRunId,
    mode: ExportClustersMode,
    limit: int | None = None,
) -> Execution[pd.DataFrame]:
    """Start a Cluster Export, download the results, and return them as Pandas DataFrame.

    Args:
        cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId):
            ID of the Cluster Run to export.
        mode (enpi_api.l2.types.cluster.ExportClustersMode):
            Mode in which export is run.
        limit (int | None):
            If specified, the export will contain only the first N clusters, N being the value passed as this param.

    Returns:
        enpi_api.l2.types.execution.Execution[pd.DataFrame]: An awaitable that returns the export results as Pandas
            DataFrame when awaited.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:
        Download successful Cluster Export result as a Pandas DataFrame.
        ```python
        with EnpiApiClient() as enpi_client:
            df = enpi_client.cluster_api.export_clusters_as_df(
                cluster_run_id=ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"),
                mode=ExportClustersMode.CLONES,
            ).wait()
            print(df)
        ```
    """
    # No output_directory is passed, so the downloaded file goes wherever `export_clusters_as_csv` puts it by default.
    execution = self.export_clusters_as_csv(cluster_run_id=cluster_run_id, mode=mode, limit=limit)

    def wait() -> pd.DataFrame:
        file_path = execution.wait()
        # NOTE(review): the export file is parsed as tab-separated despite the "csv" naming - confirm against the export format.
        return pd.read_csv(file_path, delimiter="\t")

    # Reuse the underlying export's state check; only the awaited result is post-processed.
    return Execution(wait=wait, check_execution_state=execution.check_execution_state)
Start a Cluster Export, download the results, and return them as Pandas DataFrame.
Arguments:
- cluster_run_id (enpi_api.l2.types.cluster.ClusterRunId): ID of the Cluster Run to export.
- mode (enpi_api.l2.types.cluster.ExportClustersMode): Mode in which export is run.
- limit (int | None): If specified, the export will contain only the first N clusters, N being the value passed as this param.
Returns:
enpi_api.l2.types.execution.Execution[pd.DataFrame]: An awaitable that returns the export results as Pandas DataFrame when awaited.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Download successful Cluster Export result as a Pandas DataFrame.
with EnpiApiClient() as enpi_client: df = enpi_client.cluster_api.export_clusters_as_df( cluster_run_id=ClusterRunId("b236c01c-e0e2-464e-8bd7-b1718476a78b"), mode=ExportClustersMode.CLONES, ).wait() print(df)