enpi_api.l2.client.api.collection_api
"""Client-side helpers for importing, annotating, exporting and deleting collections."""

import os
import tempfile
from pathlib import Path
from typing import Generator, Mapping
from uuid import UUID, uuid4
from zipfile import ZipFile

import pandas as pd
from loguru import logger

from enpi_api.l1 import openapi_client
from enpi_api.l2.client.api.file_api import FileApi
from enpi_api.l2.client.api.filter_api import FilterApi
from enpi_api.l2.events.workflow_execution_task_waitable import WorkflowExecutionTaskWaitable
from enpi_api.l2.tags import CloneTags, CollectionTags, SequenceTags
from enpi_api.l2.types import import_metadata, import_metadata_templated
from enpi_api.l2.types.api_error import ApiError, ApiErrorContext
from enpi_api.l2.types.collection import AdditionalImportMetadata, CollectionId, CollectionMetadata
from enpi_api.l2.types.execution import Execution
from enpi_api.l2.types.filter import Filter, MatchIds, MatchIdTarget, TemplatedFilter
from enpi_api.l2.types.log import LogLevel
from enpi_api.l2.types.reference_database import ReferenceDatabaseRevision
from enpi_api.l2.types.tag import TagId, TagKey
from enpi_api.l2.types.task import TaskState
from enpi_api.l2.types.workflow import WorkflowExecutionId, WorkflowExecutionTaskId, WorkflowTaskTemplateName
from enpi_api.l2.util.file import verify_headers_uniformity

DEFAULT_EXPORT_TAG_IDS = [
    # Collection tags
    CollectionTags.Name,
    CollectionTags.Organism,
    CollectionTags.Complexity,
    CollectionTags.Receptor,
    CollectionTags.NumberOfClones,
    CollectionTags.Reference,
    # Clone tags
    CloneTags.TenXBarcode,
    CloneTags.CloneCount,
    # Sequence tags
    SequenceTags.Chain,
    SequenceTags.SequenceCount,
    SequenceTags.Cdr3AminoAcids,
    SequenceTags.VGene,
    SequenceTags.JGene,
]
"""The default tags that are included when exporting a collection to a DataFrame or to a ZIP archive of TSV files.

These are:

- Collection level tags:
    - `enpi_api.l2.tags.CollectionTags.Name`
    - `enpi_api.l2.tags.CollectionTags.Organism`
    - `enpi_api.l2.tags.CollectionTags.Complexity`
    - `enpi_api.l2.tags.CollectionTags.Receptor`
    - `enpi_api.l2.tags.CollectionTags.NumberOfClones`
    - `enpi_api.l2.tags.CollectionTags.Reference`
- Clone level tags:
    - `enpi_api.l2.tags.CloneTags.TenXBarcode`
    - `enpi_api.l2.tags.CloneTags.CloneCount`
- Sequence level tags:
    - `enpi_api.l2.tags.SequenceTags.Chain`
    - `enpi_api.l2.tags.SequenceTags.SequenceCount`
    - `enpi_api.l2.tags.SequenceTags.Cdr3AminoAcids`
    - `enpi_api.l2.tags.SequenceTags.VGene`
    - `enpi_api.l2.tags.SequenceTags.JGene`
"""


def _find_organism(data_frame: pd.DataFrame) -> str | None:
    """Return the organism found in the first data row of `data_frame`, or `None` when it is absent.

    The column is looked up by tag key (`Organism`) first and by the ID of
    `enpi_api.l2.tags.CollectionTags.Organism` second. Headers read from a CSV file are strings,
    so the tag ID is also compared in its string form.
    """
    if data_frame.empty:
        return None

    organism_tag_id = str(int(CollectionTags.Organism))
    columns = list(data_frame.columns)
    positions_by_key = [position for position, column in enumerate(columns) if column == "Organism"]
    positions_by_id = [
        position for position, column in enumerate(columns) if column == CollectionTags.Organism or str(column) == organism_tag_id
    ]

    # Positional access avoids any ambiguity between integer labels and integer positions
    for position in positions_by_key + positions_by_id:
        value = data_frame.iloc[0, position]
        if not pd.isna(value):
            return str(value)

    return None


class CollectionApi:
    """Collection-related operations of the ENPICOM Platform: listing, import, annotation, export and deletion."""

    _inner_api_client: openapi_client.ApiClient
    _log_level: LogLevel

    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
        """@private"""
        self._inner_api_client = inner_api_client
        self._log_level = log_level

    def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]:
        """Get a generator through all available collections in the platform.

        Pages are fetched lazily: the first request is only made once the generator is iterated, and every
        following page is requested when the previous one has been exhausted.

        Args:
            name (str | None): Optional collection name for search by case-insensitive substring matching

        Returns:
            Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                for collection in enpi_client.collection_api.get_collections_metadata():
                    print(collection)
            ```
        """

        logger.info("Getting a generator through all collections")

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        # Fetch the first page, there is always a first page, it may be empty
        try:
            get_collections_response = collection_api_instance.get_collections(name=name)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        # `collections` and `cursor` get overwritten in the loop below when fetching a new page
        collections = get_collections_response.collections
        cursor = get_collections_response.cursor

        while True:
            for collection in collections:
                yield CollectionMetadata.from_raw(collection)

            # Check if we need to fetch a next page
            if cursor is None:
                logger.trace("No more pages of collections")
                return  # No more pages

            # We have a cursor, so we need to get a next page
            logger.trace("Fetching next page of collections")
            try:
                get_collections_response = collection_api_instance.get_collections(cursor=cursor, name=name)
            except openapi_client.ApiException as e:
                raise ApiError(e) from e
            collections = get_collections_response.collections
            cursor = get_collections_response.cursor

    def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata:
        """Get a single collection by its ID.

        Args:
            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.

        Returns:
            enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain
                the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer
                to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                collection: CollectionMetadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
            ```
        """

        logger.info(f"Getting collection with ID `{collection_id}`")

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        try:
            get_collection_response = collection_api_instance.get_collection(collection_id)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        return CollectionMetadata.from_raw(get_collection_response.collection)

    def delete_collection_by_id(self, collection_id: CollectionId) -> None:
        """Delete a single collection by its ID.

        This will remove the collection from the ENPICOM Platform.

        Args:
            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
            ```
        """

        logger.info(f"Deleting collection with ID `{collection_id}`")

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        try:
            collection_api_instance.delete_collection(id=collection_id, body={})
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        logger.info(f"Collection with ID `{collection_id}` successfully deleted")

    def create_collection_from_csv(
        self,
        file_path: str | Path,
        reference_database_revision: ReferenceDatabaseRevision | None = None,
        skiprows: int = 0,
        mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None,
        metadata: AdditionalImportMetadata | None = None,
        organism: str | None = None,
    ) -> Execution[CollectionMetadata]:
        """Import a collection from a CSV file (can be gzipped).

        The file should be a CSV file with a couple of required headers. These headers must
        either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
        The following tags are required:

        - enpi_api.l2.tags.CollectionTags.Name
        - enpi_api.l2.tags.CollectionTags.Organism
        - enpi_api.l2.tags.SequenceTags.SequenceCount
        - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
        - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
        - enpi_api.l2.tags.SequenceTags.VCall
        - enpi_api.l2.tags.SequenceTags.JCall

        The file is read and re-written locally with `mapping` and `metadata` applied, and that copy is
        uploaded. The organism is read from the first data row *after* `mapping` and `metadata` have been
        applied, so a mapped header (for example `"species"`) or an `Organism` entry in `metadata` is honoured.

        Args:
            file_path (str | Path): The path to the CSV file to import.
            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
                reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references
                available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter.
                There are no downsides to always specifying the reference manually, which is a safer and less error-prone option.
            skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
            mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the
                CSV headers to ENPICOM Platform tag keys. Headers that are not part of the mapping are dropped.
            metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection.
                <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take
                precedence when creating tags.**</u>
            organism: (str | None): If passed, it's compared with the organism value found in the first line of the imported file and
                throws an error if the values are different. Can serve as a quick utility check.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
                metadata of the collection that was imported when awaited.

        Raises:
            KeyError: If 'Organism' column is not found in the imported df/csv.
            ValueError: If the file has no data rows, or if optional `organism` param value differs from the 'Organism'
                value from the df/csv.
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                reference_name = ...
                reference_species = ...
                reference = enpi_client.reference_database_api.get_revision_by_name(
                    name=reference_name,
                    species=reference_species,
                )

                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
                    file_path=import_file_path,
                    reference_database_revision=reference,
                    skiprows=1,
                    mapping={
                        "title": CollectionTags.Name,
                        "species": CollectionTags.Organism,
                    },
                    metadata={
                        CollectionTags.ProjectId: "Project 001",
                    }
                ).wait()
            ```
        """

        logger.info(f"Importing collection from CSV file `{file_path}`")

        # Pandas supports gzipped CSV
        df = pd.read_csv(file_path, sep=",", skiprows=skiprows)

        # Map the headers in the CSV file to Tag Keys
        if mapping is not None:
            # We drop the columns for which no mapping is specified
            unmapped_headers = set(df.columns).difference(set(mapping.keys()))
            if unmapped_headers:
                logger.warning(f"The following headers are unmapped and are removed:\n{unmapped_headers}")
                df.drop(columns=list(unmapped_headers), inplace=True)
            df.rename(columns=mapping, inplace=True)
        if metadata is not None:
            for key, value in metadata.items():
                df[key] = value

        if df.empty:
            raise ValueError("The imported file/df does not contain any data rows")

        # Get the organism from the first line. All lines should hold the same value.
        # This runs after the mapping/metadata step so that it sees the columns that are actually imported.
        organism_from_file = _find_organism(df)
        if organism_from_file is None:
            # It's a mandatory column anyways
            raise KeyError("A required 'Organism' column was not found in the imported file/df")

        # If `organism` param was passed, compare the values
        if (organism is not None) and (organism != organism_from_file):
            raise ValueError(
                f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}",
            )

        # Validate the headers before anything is written or uploaded
        verify_headers_uniformity(list(df.columns))

        # Upload the file to the platform. The local copy is only needed until the upload has finished,
        # so it lives in a temporary directory that is removed whether or not the upload succeeds.
        file_api = FileApi(self._inner_api_client, self._log_level)
        with tempfile.TemporaryDirectory() as temporary_directory:
            temporary_csv_file_path = os.path.join(temporary_directory, f"import_collection_csv.{uuid4()}.csv")
            df.to_csv(temporary_csv_file_path, index=False)
            uploaded_file = file_api.upload_file(temporary_csv_file_path).wait()

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        # Start the collection import, this starts a task, so we'll wait for that to be completed
        import_collection_request = openapi_client.ImportCollectionRequest(
            file_id=UUID(uploaded_file.id),
            organism=organism_from_file,
            reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None,
            reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None,
        )

        with ApiErrorContext():
            import_collection_response = collection_api_instance.import_collection(import_collection_request)
            assert import_collection_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id)

            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata:
                assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

                get_collection_id_response = collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id)
                assert get_collection_id_response.collection_id is not None

                collection_id = CollectionId(get_collection_id_response.collection_id)

                logger.success(f"Collection with ID `{collection_id}` was successfully imported")
                # Remove the file from the platform
                file_api.delete_file_by_id(uploaded_file.id)

                return self.get_collection_metadata_by_id(collection_id)

            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete
            )

            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def create_collection_from_df(
        self,
        data_frame: pd.DataFrame,
        reference_database_revision: ReferenceDatabaseRevision | None = None,
    ) -> Execution[CollectionMetadata]:
        """Import a collection from a DataFrame.

        This is a convenience method to import a collection from a Pandas DataFrame. For more information about the
        collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.

        Args:
            data_frame (pd.DataFrame): The DataFrame containing the collection to import.
            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
                reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references
                available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter.
                There are no downsides to always specifying the reference manually, which is a safer and less error-prone option.

        Raises:
            KeyError: If 'Organism' column is not found in the DataFrame.
            ValueError: If the DataFrame has no rows.
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
                collection that was imported when awaited.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                reference_name = ...
                reference_species = ...
                reference = enpi_client.reference_database_api.get_revision_by_name(
                    name=reference_name,
                    species=reference_species,
                )

                df = pd.read_csv('/home/data.csv')
                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
                    data_frame=df,
                    reference_database_revision=reference,
                ).wait()
            ```
        """

        # We need to turn the DataFrame into a CSV file. `create_collection_from_csv` reads the file before it
        # returns, so the temporary directory can be removed as soon as the call comes back. A directory is used
        # instead of `NamedTemporaryFile` because an open temporary file cannot be re-opened by name on Windows.
        with tempfile.TemporaryDirectory() as temporary_directory:
            csv_path = Path(temporary_directory) / "collection.csv"
            data_frame.to_csv(csv_path, index=False)

            return self.create_collection_from_csv(
                file_path=csv_path,
                reference_database_revision=reference_database_revision,
            )

    def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]:
        """Import metadata to annotate collections, clones or sequences in batches using a filter.

        This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values
        that you provide will be applied to all matching items of the specified level.

        If you would like to add different values based on different matched tags, have a look at the methods that
        support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.

        Args:
            filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate.
                Use enpi_api.l2.client.api.filter_api.FilterApi.create_filter to create new filters.
            annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You
                specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
                enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
                are the preferred way of creating annotation configuration.

        Returns:
            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            Batch tag multiple collections with some tags:

            ```python
            with EnpiApiClient() as enpi_client:
                collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]

                # Create a filter
                filter = enpi_client.filter_api.create_filter(
                    name="My filter",
                    condition=dict(
                        type="match_ids",
                        target="collection",
                        ids=collection_ids,
                    ),
                )

                # Create an annotation
                annotation = collection_annotation(tags=[
                    Tag(id=CollectionTags.CampaignId, value="My campaign"),
                    Tag(id=CollectionTags.ProjectId, value="My project"),
                ])

                # Add the metadata
                enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
            ```
        """

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        import_metadata_request = openapi_client.ImportMetadataRequest(
            openapi_client.SearchAndTag(
                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
                annotation=annotation.to_api_payload(),
            )
        )

        with ApiErrorContext():
            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
            assert import_metadata_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)

            # The task produces no result, which matches the `Execution[None]` return type
            waitable = WorkflowExecutionTaskWaitable[None](
                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
            )

            return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

    def add_metadata_from_file(
        self,
        filter: TemplatedFilter,
        annotation: import_metadata_templated.Annotation,
        file_path: str | Path,
        ignore_empty_values: bool = True,
    ) -> Execution[None]:
        """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.

        The file is uploaded to the platform before this method returns; the uploaded copy is deleted from the
        platform once the import task has succeeded.

        Args:
            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
                Use enpi_api.l2.client.api.filter_api.FilterApi.create_templated_filter to create new templated filters.
            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
                specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
                enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
                are the preferred way of creating annotation configuration.
            file_path (str | Path): The path to the CSV or XLSX file to import.
            ignore_empty_values (bool): Forwarded unchanged to the platform's templated metadata import. Presumably empty cells
                in the file are then skipped instead of being written as empty tag values - confirm against the platform
                documentation. Defaults to True.

        Returns:
            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.

            Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
            We'll add the value to a custom imaginary tag that was created before this example.

            The CSV file would look like this:

            | match_chain | match_productive | value_to_add |
            |-------------|------------------|--------------|
            | Heavy       | true             | Heavy and productive |
            | Heavy       | false            | Heavy and unproductive |
            | Kappa       | true             | Kappa and productive |
            | Kappa       | false            | Kappa and unproductive |
            | Lambda      | true             | Lambda and productive |
            | Lambda      | false            | Lambda and unproductive |

            We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.

            ```python
            my_collection_id: CollectionId = CollectionId(1337)

            tag_id_chain: TagId = TagId(SequenceTags.Chain)
            tag_id_productive: TagId = TagId(SequenceTags.Cdr3Productive)
            tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag

            with EnpiApiClient() as enpi_client:
                filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
                enpi_client.collection_api.add_metadata_from_file(
                    filter=filter,
                    annotation=sequence_annotation([
                        template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
                    ]),
                    file_path="path/to/metadata.csv",
                ).wait()
            ```
        """

        # We need to upload the file to the platform, the import can only start once the upload has finished
        file_api = FileApi(self._inner_api_client, self._log_level)
        uploaded_file = file_api.upload_file(file_path).wait()

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        # Start the metadata import, this starts a task, so we'll wait for that to be completed
        import_metadata_request = openapi_client.ImportMetadataRequest(
            openapi_client.TemplatedSearchAndTag(
                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
                annotation=annotation.to_api_payload(),
                template_file_id=uploaded_file.id,
                ignore_empty_values=ignore_empty_values,
            )
        )

        with ApiErrorContext():
            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
            assert import_metadata_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)

            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
                assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

                # Remove the template file from the platform, it is no longer needed
                file_api.delete_file_by_id(uploaded_file.id)

            waitable = WorkflowExecutionTaskWaitable[None](
                on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED
            )

            return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

    def add_metadata_from_df(
        self,
        filter: TemplatedFilter,
        annotation: import_metadata_templated.Annotation,
        data_frame: pd.DataFrame,
        ignore_empty_values: bool = True,
    ) -> Execution[None]:
        """Import metadata from a DataFrame to annotate collections, clones or sequences.

        This is a convenience method to import metadata from a Pandas DataFrame. For more information about the
        metadata import, see enpi_api.l2.client.api.collection_api.CollectionApi.add_metadata_from_file.

        Args:
            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
                specify a specific annotation target and the values to apply.
            data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
            ignore_empty_values (bool): Passed through to `add_metadata_from_file`. Defaults to True.

        Returns:
            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            Part of the `add_calculated_metadata.py` example script.

            ```python
            # Specify the filter query to match the sequences we want to add metadata to
            metadata_filter = client.filter_api.create_templated_filter(
                name="Metadata import filter",
                shared=False,
                condition=TemplatedAndOperator(
                    conditions=[
                        TemplatedMatchTag(tag_id=CollectionTags.Name),
                        TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
                    ]
                ),
            )

            # Specify the sequence-level annotation to add to the collection
            metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])

            # Create metadata dataframe
            metadata_frame = pd.DataFrame(
                [
                    [
                        collection_name,  # Match
                        df_row[1]["Unique Sequence ID"],  # Match
                        grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
                    ]
                    for df_row in exported_df.iterrows()
                ],
                columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
            )

            # Apply metadata to the collection
            client.collection_api.add_metadata_from_df(
                filter=metadata_filter,
                annotation=metadata_annotation,
                data_frame=metadata_frame,
            ).wait()
            ```
        """

        # We need to turn the DataFrame into a CSV file. `add_metadata_from_file` waits for the upload before
        # it returns, so the local copy can be removed right after the call.
        with tempfile.TemporaryDirectory() as temporary_directory:
            temporary_csv_file_path = os.path.join(temporary_directory, f"import_metadata.{uuid4()}.csv")
            data_frame.to_csv(temporary_csv_file_path, index=False)

            return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path, ignore_empty_values)

    def get_as_zip(
        self,
        collection_ids: list[CollectionId],
        filter: Filter | None = None,
        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
        output_directory: str | Path | None = None,
    ) -> Execution[Path]:
        """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.

        Args:
            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export. Defaults to
                `DEFAULT_EXPORT_TAG_IDS`; the list is only read, never modified.
            output_directory (str | Path | None): The directory path under which file will get exported. If
                not provided, a temporary directory will be used.

        Returns:
            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
                awaited.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:

                collection_id = CollectionId(1234)

                # Example assumes you have a filter
                collection_filter: Filter = ...

                path: Path = enpi_client.collection_api.get_as_zip(
                    collection_ids=[collection_id],
                    filter=collection_filter,
                    tag_ids=[
                        CollectionTags.Name,
                        CollectionTags.Organism,
                        CollectionTags.Complexity,
                        CollectionTags.Receptor,
                        SequenceTags.Chain,
                        SequenceTags.Cdr3Productive,
                    ],
                    output_directory="example/export_result/"
                ).wait()
            ```
        """

        # Create the collection filter if it wasn't provided, it will match and
        # get all the clones from target collections
        if filter is None:
            filter_api = FilterApi(self._inner_api_client, self._log_level)
            filter = filter_api.create_filter(
                name=f"all-collection-clones-filter-{uuid4()}",  # Unique name to avoid collision
                condition=MatchIds(
                    target=MatchIdTarget.COLLECTION,
                    ids=collection_ids,  # Match all collection IDs passed to this function
                ),
            )

        # Start the collection export, this starts a task, so we'll wait for that to be completed
        export_collection_request = openapi_client.ExportRequest(
            payload=openapi_client.ExportPayload(
                collection_ids=[int(collection_id) for collection_id in collection_ids],
                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
                tag_ids=[int(tag_id) for tag_id in tag_ids],
            )
        )
        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        with ApiErrorContext():
            export_collection_response = collection_api_instance.export(export_collection_request)
            assert export_collection_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id)

            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
                file_api = FileApi(self._inner_api_client, self._log_level)
                file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)

                logger.success("Collection(s) export has succeeded.")
                return file_path

            waitable = WorkflowExecutionTaskWaitable[Path](
                workflow_execution_id=workflow_execution_id,
                on_complete=on_complete,
                task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
            )

            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def get_as_df(
        self,
        collection_ids: list[CollectionId],
        filter: Filter | None = None,
        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
    ) -> Execution[pd.DataFrame]:
        """Export collection(s) to a Pandas DataFrame.

        The export is downloaded as a ZIP archive into a temporary directory, every TSV file in it is read
        and the frames are concatenated. The temporary directory is removed once the DataFrame has been built.

        Args:
            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.

        Returns:
            Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection. The
                DataFrame is empty when the exported archive contains no TSV files.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                # Example assumes you have a filter
                filter: Filter = ...

                df: pd.DataFrame = enpi_client.collection_api.get_as_df(
                    collection_ids=[CollectionId(1)],
                    filter=filter,
                    tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.Cdr3AminoAcids],
                ).wait()
            ```
        """
        tmp_dir = tempfile.TemporaryDirectory()
        get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)

        def wait() -> pd.DataFrame:
            # The directory is removed at the end of every call, recreate it so `wait` can be called again
            os.makedirs(tmp_dir.name, exist_ok=True)
            try:
                zip_path = get_as_zip_execution.wait()

                # Extract all TSV files from the ZIP archive
                with ZipFile(zip_path, "r") as zip_ref:
                    zip_ref.extractall(tmp_dir.name)

                # Read all TSV files into DataFrames
                all_dfs = [
                    pd.read_csv(os.path.join(root, file_name), delimiter="\t")
                    for root, _, file_names in os.walk(tmp_dir.name)
                    for file_name in file_names
                    if file_name.endswith(".tsv")
                ]
            finally:
                # Everything needed is in memory by now (or the export failed), so drop the files
                tmp_dir.cleanup()

            # `pd.concat` raises on an empty list, an export without TSV files yields an empty DataFrame instead
            if not all_dfs:
                return pd.DataFrame()

            return pd.concat(all_dfs)

        return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)
The default tags that are included when exporting a collection to a DataFrame or to the TSV files of a zip export.
These are:
- Collection level tags:
- Clone level tags:
- Sequence level tags:
class CollectionApi:
    """High-level client for listing, importing, annotating, exporting and deleting collections."""

    _inner_api_client: openapi_client.ApiClient
    _log_level: LogLevel

    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
        """@private"""
        self._inner_api_client = inner_api_client
        self._log_level = log_level

    def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]:
        """Get a generator through all available collections in the platform.

        Args:
            name (str | None): Optional collection name for search by case-insensitive substring matching

        Returns:
            Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                for collection in enpi_client.collection_api.get_collections_metadata():
                    print(collection)
            ```
        """

        logger.info("Getting a generator through all collections")

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        # Fetch the first page, there is always a first page, it may be empty
        try:
            get_collections_response = collection_api_instance.get_collections(name=name)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        while True:
            for collection in get_collections_response.collections:
                yield CollectionMetadata.from_raw(collection)

            # A missing cursor means the page we just yielded was the last one
            cursor = get_collections_response.cursor
            if cursor is None:
                logger.trace("No more pages of collections")
                return

            # We have a cursor, so we need to get a next page
            logger.trace("Fetching next page of collections")
            try:
                get_collections_response = collection_api_instance.get_collections(cursor=cursor, name=name)
            except openapi_client.ApiException as e:
                raise ApiError(e) from e

    def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata:
        """Get a single collection by its ID.

        Args:
            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.

        Returns:
            enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain
              the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer
              to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                collection: CollectionMetadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
            ```
        """

        logger.info(f"Getting collection with ID `{collection_id}`")

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        try:
            get_collection_response = collection_api_instance.get_collection(collection_id)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        return CollectionMetadata.from_raw(get_collection_response.collection)

    def delete_collection_by_id(self, collection_id: CollectionId) -> None:
        """Delete a single collection by its ID.

        This will remove the collection from the ENPICOM Platform.

        Args:
            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
            ```
        """

        logger.info(f"Deleting collection with ID `{collection_id}`")

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        try:
            collection_api_instance.delete_collection(id=collection_id, body={})
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        logger.info(f"Collection with ID `{collection_id}` successfully deleted")

    @staticmethod
    def _organism_from_row(row: pd.Series) -> str | None:
        """Return the organism stored in `row`, found by tag key or tag ID, or `None` when it is absent or empty."""
        organism_labels = {"Organism", str(int(CollectionTags.Organism))}
        for label, value in row.items():
            if label != CollectionTags.Organism and str(label) not in organism_labels:
                continue
            # An empty cell is read as NaN; treat it the same as a missing column
            if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
                continue
            return str(value)
        return None

    def create_collection_from_csv(
        self,
        file_path: str | Path,
        reference_database_revision: ReferenceDatabaseRevision | None = None,
        skiprows: int = 0,
        mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None,
        metadata: AdditionalImportMetadata | None = None,
        organism: str | None = None,
    ) -> Execution[CollectionMetadata]:
        """Import a collection from a CSV file (can be gzipped).

        The file should be a CSV file with a couple of required headers. These headers must
        either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
        The following tags are required:

        - enpi_api.l2.tags.CollectionTags.Name
        - enpi_api.l2.tags.CollectionTags.Organism
        - enpi_api.l2.tags.SequenceTags.SequenceCount
        - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
        - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
        - enpi_api.l2.tags.SequenceTags.VCall
        - enpi_api.l2.tags.SequenceTags.JCall

        Args:
            file_path (str | Path): The path to the CSV file to import.
            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
                reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references
                available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter.
                There is no downsides to always specifying the reference manually, which is a safer and less error-prone option.
            skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
            mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the
                CSV headers to ENPICOM Platform tag keys. Headers that are not part of the mapping are dropped.
            metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection.
                <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take
                precedence when creating tags.**</u>
            organism: (str | None): If passed, it's compared with the organism value found in the first line of the imported file and
                throws an error if the values are different. Can serve as a quick utility check.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
                metadata of the collection that was imported when awaited.

        Raises:
            KeyError: If 'Organism' column is not found in the imported df/csv.
            ValueError: If optional `organism` param value differs from the 'Organism' value from the df/csv.
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                reference_name = ...
                reference_species = ...
                reference = enpi_client.reference_database_api.get_revision_by_name(
                    name=reference_name,
                    species=reference_species,
                )

                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
                    file_path=import_file_path,
                    reference_database_revision=reference,
                    skiprows=1,
                    mapping={
                        "title": CollectionTags.Name,
                        "species": CollectionTags.Organism,
                    },
                    metadata={
                        CollectionTags.ProjectId: "Project 001",
                    }
                ).wait()
            ```
        """

        logger.info(f"Importing collection from CSV file `{file_path}`")

        # Pandas supports gzipped CSV
        df = pd.read_csv(file_path, sep=",", skiprows=skiprows)

        # Get the organism from the first line as it is written in the file, before any
        # column gets dropped or renamed. All lines should hold the same value
        organism_from_file = self._organism_from_row(df.iloc[0])

        # Map the headers in the CSV file to Tag Keys
        if mapping is not None:
            # We drop the columns for which no mapping is specified
            unmapped_headers = set(df.columns).difference(mapping.keys())
            if unmapped_headers:
                logger.warning(f"The following headers are unmapped and are removed:\n{unmapped_headers}")
                df.drop(columns=list(unmapped_headers), inplace=True)
            df.rename(columns=mapping, inplace=True)
        if metadata is not None:
            for key, value in metadata.items():
                df[key] = value

        # The organism may only become visible after the mapping/metadata were applied,
        # e.g. when the file calls the column "species" and maps it to the Organism tag
        if organism_from_file is None:
            organism_from_file = self._organism_from_row(df.iloc[0])

        # If it's still none, raise an error - it's a mandatory column anyways
        if organism_from_file is None:
            raise KeyError("A required 'Organism' column was not found in the imported file/df")

        # If `organism` param was passed, compare the values
        if (organism is not None) and (organism != organism_from_file):
            raise ValueError(
                f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}",
            )

        verify_headers_uniformity(list(df.columns))

        file_api = FileApi(self._inner_api_client, self._log_level)

        # The CSV is only needed until the upload has finished, so it lives in a temporary
        # directory that is removed afterwards, also when writing or uploading fails
        with tempfile.TemporaryDirectory() as temporary_directory:
            temporary_csv_file_path = os.path.join(temporary_directory, f"import_collection_csv.{uuid4()}.csv")
            df.to_csv(temporary_csv_file_path, index=False)

            # Upload the file to the platform
            file = file_api.upload_file(temporary_csv_file_path).wait()

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        # Start the collection import, this starts a task, so we'll wait for that to be completed
        import_collection_request = openapi_client.ImportCollectionRequest(
            file_id=UUID(file.id),
            organism=organism_from_file,
            reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None,
            reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None,
        )

        with ApiErrorContext():
            import_collection_response = collection_api_instance.import_collection(import_collection_request)
            assert import_collection_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id)

            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata:
                assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

                get_collection_id_response = collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id)
                assert get_collection_id_response.collection_id is not None

                collection_id = CollectionId(get_collection_id_response.collection_id)

                logger.success(f"Collection with ID `{collection_id}` was successfully imported")
                # Remove the file from the platform
                file_api.delete_file_by_id(file.id)

                return self.get_collection_metadata_by_id(collection_id)

            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete
            )

            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def create_collection_from_df(
        self,
        data_frame: pd.DataFrame,
        reference_database_revision: ReferenceDatabaseRevision | None = None,
    ) -> Execution[CollectionMetadata]:
        """Import a collection from a DataFrame.

        This is a convenience method to import a collection from a Pandas DataFrame. For more information about the
        collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.

        Args:
            data_frame (pd.DataFrame): The DataFrame containing the collection to import.
            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
                reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references
                available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter.
                There is no downsides to always specifying the reference manually, which is a safer and less error-prone option.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
                collection that was imported when awaited.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                reference_name = ...
                reference_species = ...
                reference = enpi_client.reference_database_api.get_revision_by_name(
                    name=reference_name,
                    species=reference_species,
                )

                df = pd.read_csv('/home/data.csv')
                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
                    data_frame=df,
                    reference_database_revision=reference,
                ).wait()
            ```
        """

        # We need to turn the DataFrame into a CSV file. `create_collection_from_csv` reads the
        # file before it returns, so the temporary directory can be removed right after the call
        with tempfile.TemporaryDirectory() as temporary_directory:
            temporary_csv_file_path = os.path.join(temporary_directory, "collection.csv")
            data_frame.to_csv(temporary_csv_file_path, index=False)

            return self.create_collection_from_csv(
                file_path=temporary_csv_file_path,
                reference_database_revision=reference_database_revision,
            )

    def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]:
        """Import metadata to annotate collections, clones or sequences in batches using a filter.

        This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values
        that you provide will be applied to all matching items of the specified level.

        If you would like to add different values based on different matched tags, have a look at the methods that
        support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.

        Args:
            filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate.
                Use enpi_api.l2.client.api.filter_api.FilterApi.create_filter to create new filters.
            annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You
                specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
                enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
                are the preferred way of creating annotation configuration.

        Returns:
            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            Batch tag multiple collections with some tags:

            ```python
            with EnpiApiClient() as enpi_client:
                collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]

                # Create a filter
                filter = enpi_client.filter_api.create_filter(
                    name="My filter",
                    condition=dict(
                        type="match_ids",
                        target="collection",
                        ids=collection_ids,
                    ),
                )

                # Create an annotation
                annotation = collection_annotation(tags=[
                    Tag(id=CollectionTags.CampaignId, value="My campaign"),
                    Tag(id=CollectionTags.ProjectId, value="My project"),
                ])

                # Add the metadata
                enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
            ```
        """

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        import_metadata_request = openapi_client.ImportMetadataRequest(
            openapi_client.SearchAndTag(
                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
                annotation=annotation.to_api_payload(),
            )
        )

        with ApiErrorContext():
            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
            assert import_metadata_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)

            # The task yields no result, which matches the `Execution[None]` this method returns
            waitable = WorkflowExecutionTaskWaitable[None](
                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
            )

            return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

    def add_metadata_from_file(
        self,
        filter: TemplatedFilter,
        annotation: import_metadata_templated.Annotation,
        file_path: str | Path,
        ignore_empty_values: bool = True,
    ) -> Execution[None]:
        """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.

        Args:
            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
                Use enpi_api.l2.client.api.filter_api.FilterApi.create_templated_filter to create new templated filters.
            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
                specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
                enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
                are the preferred way of creating annotation configuration.
            file_path (str | Path): The path to the CSV or XLSX file to import.
            ignore_empty_values (bool): Forwarded as-is to the platform's templated metadata import request. Defaults to `True`.

        Returns:
            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.

            Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
            We'll add the value to a custom imaginary tag that was created before this example.

            The CSV file would look like this:

            | match_chain | match_productive | value_to_add |
            |-------------|------------------|--------------|
            | Heavy       | true             | Heavy and productive   |
            | Heavy       | false            | Heavy and unproductive |
            | Kappa       | true             | Kappa and productive   |
            | Kappa       | false            | Kappa and unproductive |
            | Lambda      | true             | Lambda and productive  |
            | Lambda      | false            | Lambda and unproductive |

            We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.

            ```python
            my_collection_id: CollectionId = CollectionId(1337)

            tag_id_chain: TagId = TagId(SequenceTags.Chain)
            tag_id_productive: TagId = TagId(SequenceTags.Cdr3Productive)
            tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag

            with EnpiApiClient() as enpi_client:
                filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
                enpi_client.collection_api.add_metadata_from_file(
                    filter=filter,
                    annotation=sequence_annotation([
                        template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
                    ]),
                    file_path="path/to/metadata.csv",
                ).wait()
            ```
        """

        # We need to upload the file to the platform, and the import request needs the
        # uploaded file's ID, so the upload is awaited here
        file_api = FileApi(self._inner_api_client, self._log_level)
        file = file_api.upload_file(file_path).wait()

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        # Start the metadata import, this starts a task, so we'll wait for that to be completed
        import_metadata_request = openapi_client.ImportMetadataRequest(
            openapi_client.TemplatedSearchAndTag(
                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
                annotation=annotation.to_api_payload(),
                template_file_id=file.id,
                ignore_empty_values=ignore_empty_values,
            )
        )

        with ApiErrorContext():
            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
            assert import_metadata_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)

            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
                assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

                # Remove the uploaded template file from the platform
                file_api.delete_file_by_id(file.id)

            waitable = WorkflowExecutionTaskWaitable[None](
                on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED
            )

            return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

    def add_metadata_from_df(
        self,
        filter: TemplatedFilter,
        annotation: import_metadata_templated.Annotation,
        data_frame: pd.DataFrame,
        ignore_empty_values: bool = True,
    ) -> Execution[None]:
        """Import metadata from a DataFrame to annotate collections, clones or sequences.

        This is a convenience method to import metadata from a Pandas DataFrame. For more information about the
        metadata import, see the documentation for `add_metadata_from_file`.

        Args:
            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
                specify a specific annotation target and the values to apply.
            data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
            ignore_empty_values (bool): Passed through to `add_metadata_from_file`. Defaults to `True`.

        Returns:
            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            Part of the `add_calculated_metadata.py` example script.

            ```python
            # Specify the filter query to match the sequences we want to add metadata to
            metadata_filter = client.filter_api.create_templated_filter(
                name="Metadata import filter",
                shared=False,
                condition=TemplatedAndOperator(
                    conditions=[
                        TemplatedMatchTag(tag_id=CollectionTags.Name),
                        TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
                    ]
                ),
            )

            # Specify the sequence-level annotation to add to the collection
            metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])

            # Create metadata dataframe
            metadata_frame = pd.DataFrame(
                [
                    [
                        collection_name,  # Match
                        df_row[1]["Unique Sequence ID"],  # Match
                        grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
                    ]
                    for df_row in exported_df.iterrows()
                ],
                columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
            )

            # Apply metadata to the collection
            client.collection_api.add_metadata_from_df(
                filter=metadata_filter,
                annotation=metadata_annotation,
                data_frame=metadata_frame,
            ).wait()
            ```
        """

        # We need to turn the DataFrame into a CSV file. `add_metadata_from_file` waits for the
        # upload before it returns, so the temporary directory can be removed right after the call
        with tempfile.TemporaryDirectory() as temporary_directory:
            temporary_csv_file_path = os.path.join(temporary_directory, f"import_metadata.{uuid4()}.csv")
            data_frame.to_csv(temporary_csv_file_path, index=False)

            return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path, ignore_empty_values)

    def get_as_zip(
        self,
        collection_ids: list[CollectionId],
        filter: Filter | None = None,
        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
        output_directory: str | Path | None = None,
    ) -> Execution[Path]:
        """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.

        Args:
            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
            output_directory (str | Path | None): The directory path under which file will get exported. If
                not provided, a temporary directory will be used.

        Returns:
            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
                awaited.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:

                collection_id = CollectionId(1234)

                # Example assumes you have a filter
                collection_filter: Filter = ...

                path: Path = enpi_client.collection_api.get_as_zip(
                    collection_ids=[collection_id],
                    filter=collection_filter,
                    tag_ids=[
                        CollectionTags.Name,
                        CollectionTags.Organism,
                        CollectionTags.Complexity,
                        CollectionTags.Receptor,
                        SequenceTags.Chain,
                        SequenceTags.Cdr3Productive,
                    ],
                    output_directory="example/export_result/"
                ).wait()
            ```
        """

        # Create the collection filter if it wasn't provided, it will match and
        # get all the clones from target collections
        if filter is None:
            filter_api = FilterApi(self._inner_api_client, self._log_level)
            filter = filter_api.create_filter(
                name=f"all-collection-clones-filter-{uuid4()}",  # Unique name to avoid collision
                condition=MatchIds(
                    target=MatchIdTarget.COLLECTION,
                    ids=collection_ids,  # Match all collection IDs passed to this function
                ),
            )

        # Start the collection export, this starts a task, so we'll wait for that to be completed
        export_collection_request = openapi_client.ExportRequest(
            payload=openapi_client.ExportPayload(
                collection_ids=[int(collection_id) for collection_id in collection_ids],
                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
                tag_ids=[int(tag_id) for tag_id in tag_ids],
            )
        )
        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        with ApiErrorContext():
            export_collection_response = collection_api_instance.export(export_collection_request)
            assert export_collection_response.workflow_execution_id is not None

            workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id)

            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
                file_api = FileApi(self._inner_api_client, self._log_level)
                file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)

                logger.success("Collection(s) export has succeeded.")
                return file_path

            waitable = WorkflowExecutionTaskWaitable[Path](
                workflow_execution_id=workflow_execution_id,
                on_complete=on_complete,
                task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
            )

            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def get_as_df(
        self,
        collection_ids: list[CollectionId],
        filter: Filter | None = None,
        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
    ) -> Execution[pd.DataFrame]:
        """Export collection(s) to a Pandas DataFrame.

        Args:
            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.

        Returns:
            Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection(s). The rows of all
                exported TSV files are concatenated; an empty DataFrame is returned when the export contains no TSV file.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                # Example assumes you have a filter
                filter: Filter = ...

                df: pd.DataFrame = enpi_client.collection_api.get_as_df(
                    collection_ids=[CollectionId(1)],
                    filter=filter,
                    tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.Cdr3AminoAcids],
                ).wait()
            ```
        """
        # The directory has to outlive this call because the download only happens in `wait()`.
        # It is referenced by the closure below and removed once the object is finalized
        tmp_dir = tempfile.TemporaryDirectory()
        get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)

        def wait() -> pd.DataFrame:
            zip_path = get_as_zip_execution.wait()

            # Extract all TSV files from the ZIP archive
            with ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall(tmp_dir.name)

            # Sorted so the row order does not depend on the file system's listing order
            tsv_paths = sorted(
                os.path.join(root, file_name)
                for root, _, file_names in os.walk(tmp_dir.name)
                for file_name in file_names
                if file_name.endswith(".tsv")
            )

            # `pd.concat` raises on an empty list, an export without TSV files yields an empty frame
            if not tsv_paths:
                return pd.DataFrame()

            # Read all TSV files into a single DataFrame
            return pd.concat([pd.read_csv(tsv_path, delimiter="\t") for tsv_path in tsv_paths])

        return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)
79 def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]: 80 """Get a generator through all available collections in the platform. 81 82 Args: 83 name (str | None): Optional collection name for search by case-insensitive substring matching 84 85 Returns: 86 Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform. 87 88 Raises: 89 enpi_api.l2.types.api_error.ApiError: If API request fails. 90 91 Example: 92 93 ```python 94 with EnpiApiClient() as enpi_client: 95 for collection in enpi_client.collection_api.get_collections_metadata(): 96 print(collection) 97 ``` 98 """ 99 100 logger.info("Getting a generator through all collections") 101 102 collection_api_instance = openapi_client.CollectionApi(self._inner_api_client) 103 104 # Fetch the first page, there is always a first page, it may be empty 105 try: 106 get_collections_response = collection_api_instance.get_collections(name=name) 107 except openapi_client.ApiException as e: 108 raise ApiError(e) 109 110 # `collections` and `cursor` get overwritten in the loop below when fetching a new page 111 collections = get_collections_response.collections 112 cursor = get_collections_response.cursor 113 114 while True: 115 for collection in collections: 116 yield CollectionMetadata.from_raw(collection) 117 118 # Check if we need to fetch a next page 119 if cursor is None: 120 logger.trace("No more pages of collections") 121 return # No more pages 122 123 # We have a cursor, so we need to get a next page 124 logger.trace("Fetching next page of collections") 125 try: 126 get_collections_response = collection_api_instance.get_collections( 127 cursor=cursor, 128 name=name if name is not None else None, 129 ) 130 except openapi_client.ApiException as e: 131 raise ApiError(e) 132 collections = get_collections_response.collections 133 cursor = get_collections_response.cursor
Get a generator through all available collections in the platform.
Arguments:
- name (str | None): Optional collection name for search by case-insensitive substring matching
Returns:
Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    for collection in enpi_client.collection_api.get_collections_metadata():
        print(collection)
```
def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata:
    """Fetch the metadata of one collection, identified by its ID.

    Args:
        collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.

    Returns:
        enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain
            the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer
            to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            collection: CollectionMetadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
        ```
    """

    logger.info(f"Getting collection with ID `{collection_id}`")

    api = openapi_client.CollectionApi(self._inner_api_client)

    try:
        response = api.get_collection(collection_id)
    except openapi_client.ApiException as e:
        raise ApiError(e)
    else:
        # Convert the raw API payload into the public metadata type
        return CollectionMetadata.from_raw(response.collection)
Get a single collection by its ID.
Arguments:
- collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.
Returns:
enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    collection: CollectionMetadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
```
def delete_collection_by_id(self, collection_id: CollectionId) -> None:
    """Remove one collection, identified by its ID, from the ENPICOM Platform.

    Args:
        collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
        ```
    """

    logger.info(f"Deleting collection with ID `{collection_id}`")

    api = openapi_client.CollectionApi(self._inner_api_client)

    try:
        # The endpoint is called with an empty request body
        api.delete_collection(id=collection_id, body={})
    except openapi_client.ApiException as e:
        raise ApiError(e)
    else:
        logger.info(f"Collection with ID `{collection_id}` successfully deleted")
Delete a single collection by its ID.
This will remove the collection from the ENPICOM Platform.
Arguments:
- collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
```
def create_collection_from_csv(
    self,
    file_path: str | Path,
    reference_database_revision: ReferenceDatabaseRevision | None = None,
    skiprows: int = 0,
    mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None,
    metadata: AdditionalImportMetadata | None = None,
    organism: str | None = None,
) -> Execution[CollectionMetadata]:
    """Import a collection from a CSV file (can be gzipped).

    The file should be a CSV file with a couple of required headers. These headers must
    either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
    The following tags are required:

    - enpi_api.l2.tags.CollectionTags.Name
    - enpi_api.l2.tags.CollectionTags.Organism
    - enpi_api.l2.tags.SequenceTags.SequenceCount
    - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
    - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
    - enpi_api.l2.tags.SequenceTags.VCall
    - enpi_api.l2.tags.SequenceTags.JCall

    The file is read locally, the optional `mapping` and `metadata` are applied, and the resulting
    table is uploaded to the platform through a temporary CSV file, after which the import is started.

    Args:
        file_path (str | Path): The path to the CSV file to import.
        reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
            If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
            reference available, it will be picked for the import and the task will continue. If there are none or multiple references
            available, an error will be returned - in that case the reference has to be picked manually by passing it to this parameter.
            There are no downsides to always specifying the reference manually, which is the safer and less error-prone option.
        skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
        mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the
            CSV headers to ENPICOM Platform tag keys. Headers that are not part of the mapping are removed.
        metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection.
            <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take
            precedence when creating tags.**</u>
        organism: (str | None): If passed, it's compared with the organism value found in the first row of the imported data and
            an error is raised if the values are different. Can serve as a quick utility check.

    Returns:
        enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
            metadata of the imported collection when awaited.

    Raises:
        KeyError: If no 'Organism' value can be found in the first row, neither after applying `mapping`/`metadata`
            nor in the file as it was read.
        ValueError: If the file contains no data rows, or if the optional `organism` param value differs from the
            'Organism' value found in the data.
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            reference_name = ...
            reference_species = ...
            reference = enpi_client.reference_database_api.get_revision_by_name(
                name=reference_name,
                species=reference_species,
            )

            collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
                file_path=import_file_path,
                reference_database_revision=reference,
                skiprows=1,
                mapping={
                    "title": CollectionTags.Name,
                    "species": CollectionTags.Organism,
                },
                metadata={
                    CollectionTags.ProjectId: "Project 001",
                }
            ).wait()
        ```
    """

    # The organism column may be labelled with the tag key or with the tag ID
    organism_labels = ("Organism", CollectionTags.Organism)

    def find_organism(row: pd.Series) -> str | None:
        """Return the organism stored in `row` as a string, or None when no non-empty organism cell exists."""
        for position, label in enumerate(row.index):
            if any(label == wanted or str(label) == str(wanted) for wanted in organism_labels):
                # Positional access, so duplicated labels cannot yield a Series instead of a scalar
                value = row.iloc[position]
                if not pd.isna(value):
                    return str(value)
        return None

    logger.info(f"Importing collection from CSV file `{file_path}`")

    # Pandas supports gzipped CSV
    df = pd.read_csv(file_path, sep=",", skiprows=skiprows)
    if df.empty:
        raise ValueError(f"The imported file `{file_path}` does not contain any data rows")

    # Keep the untouched first row: it is the fallback source for the organism once columns get dropped/renamed
    raw_first_row = df.iloc[0].copy()

    # Map the headers in the CSV file to Tag Keys
    if mapping is not None:
        # We drop the columns for which no mapping is specified
        unmapped_headers = set(df.columns).difference(set(mapping.keys()))
        if unmapped_headers:
            logger.warning(f"The following headers are unmapped and are removed:\n{unmapped_headers}")
        df.drop(columns=list(unmapped_headers), inplace=True)
        df.rename(columns=mapping, inplace=True)
    if metadata is not None:
        for key, value in metadata.items():
            df[key] = value

    # Get the organism from the first row, all rows should hold the same value. The prepared data is checked
    # first because that is what gets uploaded (a mapped or metadata-provided organism is only visible there).
    organism_from_file = find_organism(df.iloc[0])
    if organism_from_file is None:
        organism_from_file = find_organism(raw_first_row)

    # If it's still none, raise an error - it's a mandatory column anyways
    if organism_from_file is None:
        raise KeyError("A required 'Organism' column was not found in the imported file/df")

    # If `organism` param was passed, compare the values
    if (organism is not None) and (organism != organism_from_file):
        raise ValueError(
            f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}",
        )

    # Validate the headers before anything is written to disk or uploaded
    verify_headers_uniformity(list(df.columns))

    # Upload the file to the platform. The local copy is only needed until the upload has finished, so it lives
    # in a temporary directory that is removed again even when the upload fails.
    file_api = FileApi(self._inner_api_client, self._log_level)
    with tempfile.TemporaryDirectory() as temporary_directory:
        temporary_csv_file_path = os.path.join(temporary_directory, f"import_collection_csv.{uuid4()}.csv")
        df.to_csv(temporary_csv_file_path, index=False)
        file = file_api.upload_file(temporary_csv_file_path).wait()

    collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

    # Start the collection import, this starts a task, so we'll wait for that to be completed
    import_collection_request = openapi_client.ImportCollectionRequest(
        file_id=UUID(file.id),
        organism=organism_from_file,
        reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None,
        reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None,
    )

    with ApiErrorContext():
        import_collection_response = collection_api_instance.import_collection(import_collection_request)
        assert import_collection_response.workflow_execution_id is not None

    workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id)

    def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata:
        assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

        get_collection_id_response = collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id)
        assert get_collection_id_response.collection_id is not None

        collection_id = CollectionId(get_collection_id_response.collection_id)

        logger.success(f"Collection with ID `{collection_id}` was successfully imported")
        # Remove the file from the platform
        file_api.delete_file_by_id(file.id)

        return self.get_collection_metadata_by_id(collection_id)

    waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
        workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete
    )

    return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
Import a collection from a CSV file (can be gzipped).
The file should be a CSV file with a couple of required headers. These headers must either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
The following tags are required:
- enpi_api.l2.tags.CollectionTags.Name
- enpi_api.l2.tags.CollectionTags.Organism
- enpi_api.l2.tags.SequenceTags.SequenceCount
- enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
- enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
- enpi_api.l2.tags.SequenceTags.VCall
- enpi_api.l2.tags.SequenceTags.JCall
Arguments:
- file_path (str | Path): The path to the CSV file to import.
- reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one reference available, it will be picked for the import and the task will continue. If there are none or multiple references available, an error will be returned - in that case the reference has to be picked manually by passing it to this parameter. There are no downsides to always specifying the reference manually, which is the safer and less error-prone option.
- skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
- mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the CSV headers to ENPICOM Platform tag keys
- metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection. If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take precedence when creating tags.
- organism: (str | None): If passed, it's compared with the organism value found in the first line of the imported file and throws an error if the values are different. Can serve as a quick utility check.
Returns:
enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the metadata of the imported collection when awaited.
Raises:
- KeyError: If 'Organism' column is not found in the imported df/csv.
- ValueError: If the optional `organism` param value differs from the 'Organism' value from the df/csv.
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    reference_name = ...
    reference_species = ...
    reference = enpi_client.reference_database_api.get_revision_by_name(
        name=reference_name,
        species=reference_species,
    )

    collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
        file_path=import_file_path,
        reference_database_revision=reference,
        skiprows=1,
        mapping={
            "title": CollectionTags.Name,
            "species": CollectionTags.Organism,
        },
        metadata={
            CollectionTags.ProjectId: "Project 001",
        }
    ).wait()
```
def create_collection_from_df(
    self,
    data_frame: pd.DataFrame,
    reference_database_revision: ReferenceDatabaseRevision | None = None,
) -> Execution[CollectionMetadata]:
    """Import a collection from a DataFrame.

    This is a convenience method to import a collection from a Pandas DataFrame. The DataFrame is written
    to a temporary CSV file which is then handed to
    enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv; see that method for
    more information about the collection import.

    Args:
        data_frame (pd.DataFrame): The DataFrame containing the collection to import.
        reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
            If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
            reference available, it will be picked for the import and the task will continue. If there are none or multiple references
            available, an error will be returned - in that case the reference has to be picked manually by passing it to this parameter.
            There are no downsides to always specifying the reference manually, which is the safer and less error-prone option.

    Returns:
        enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
            collection that was imported when awaited.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            reference_name = ...
            reference_species = ...
            reference = enpi_client.reference_database_api.get_revision_by_name(
                name=reference_name,
                species=reference_species,
            )

            df = pd.read_csv('/home/data.csv')
            collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
                data_frame=df,
                reference_database_revision=reference,
            ).wait()
        ```
    """

    # We need to turn the DataFrame into a CSV file. A temporary directory is used instead of an open
    # NamedTemporaryFile, because a file that is still open cannot be reopened by name on every platform.
    with tempfile.TemporaryDirectory() as temporary_directory:
        temporary_csv_file_path = os.path.join(temporary_directory, "collection.csv")
        data_frame.to_csv(temporary_csv_file_path, index=False)

        # `create_collection_from_csv` reads the file before it returns, so the directory
        # can safely be removed as soon as we leave this block
        return self.create_collection_from_csv(
            file_path=temporary_csv_file_path,
            reference_database_revision=reference_database_revision,
        )
Import a collection from a DataFrame.
This is a convenience method to import a collection from a Pandas DataFrame. For more information about the collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.
Arguments:
- data_frame (pd.DataFrame): The DataFrame containing the collection to import.
- reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one reference available, it will be picked for the import and the task will continue. If there are none or multiple references available, an error will be returned - in that case the reference has to be picked manually by passing it to this parameter. There are no downsides to always specifying the reference manually, which is the safer and less error-prone option.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Returns:
enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the collection that was imported when awaited.
Example:
```python
with EnpiApiClient() as enpi_client:
    reference_name = ...
    reference_species = ...
    reference = enpi_client.reference_database_api.get_revision_by_name(
        name=reference_name,
        species=reference_species,
    )

    df = pd.read_csv('/home/data.csv')
    collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
        data_frame=df,
        reference_database_revision=reference,
    ).wait()
```
def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]:
    """Import metadata to annotate collections, clones or sequences in batches using a filter.

    This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values
    that you provide will be applied to all matching items of the specified level.

    If you would like to add different values based on different matched tags, have a look at the methods that
    support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.

    Args:
        filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate.
            Use enpi_api.l2.client.api.filter_api.FilterApi.create_filter to create new filters.
        annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You
            specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
            enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
            are the preferred way of creating annotation configuration.

    Returns:
        enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        Batch tag multiple collections with some tags:

        ```python
        with EnpiApiClient() as enpi_client:
            collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]

            # Create a filter
            filter = enpi_client.filter_api.create_filter(
                name="My filter",
                condition=dict(
                    type="match_ids",
                    target="collection",
                    ids=collection_ids,
                ),
            )

            # Create an annotation
            annotation = collection_annotation(tags=[
                Tag(id=CollectionTags.CampaignId, value="My campaign"),
                Tag(id=CollectionTags.ProjectId, value="My project"),
            ])

            # Add the metadata
            enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
        ```
    """

    collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

    import_metadata_request = openapi_client.ImportMetadataRequest(
        openapi_client.SearchAndTag(
            filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
            annotation=annotation.to_api_payload(),
        )
    )

    # Starting the import creates a workflow execution on the platform; its ID is used to track the task
    with ApiErrorContext():
        import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
        assert import_metadata_response.workflow_execution_id is not None

    workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)

    # No `on_complete` callback is registered, so the waitable is typed with `None`,
    # in line with the `Execution[None]` this method returns
    waitable = WorkflowExecutionTaskWaitable[None](
        workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
    )

    return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
Import metadata to annotate collections, clones or sequences in batches using a filter.
This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values that you provide will be applied to all matching items of the specified level.
If you would like to add different values based on different matched tags, have a look at the methods that
support a templated filter, such as add_metadata_from_file or add_metadata_from_df.
Arguments:
- filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate. Use enpi_api.l2.client.api.filter_api.FilterApi.create_filter to create new filters.
- annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation are the preferred way of creating annotation configuration.
Returns:
enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Batch tag multiple collections with some tags:
```python
with EnpiApiClient() as enpi_client:
    collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]

    # Create a filter
    filter = enpi_client.filter_api.create_filter(
        name="My filter",
        condition=dict(
            type="match_ids",
            target="collection",
            ids=collection_ids,
        ),
    )

    # Create an annotation
    annotation = collection_annotation(tags=[
        Tag(id=CollectionTags.CampaignId, value="My campaign"),
        Tag(id=CollectionTags.ProjectId, value="My project"),
    ])

    # Add the metadata
    enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
```
def add_metadata_from_file(
    self,
    filter: TemplatedFilter,
    annotation: import_metadata_templated.Annotation,
    file_path: str | Path,
    ignore_empty_values: bool = True,
) -> Execution[None]:
    """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.

    The file is uploaded to the platform first (this method waits for the upload), after which the templated
    metadata import is started. The uploaded file is deleted from the platform once the import task succeeded.

    Args:
        filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
        annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
            specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
            enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
            are the preferred way of creating annotation configuration.
        file_path (str | Path): The path to the CSV or XLSX file to import.
        ignore_empty_values (bool): Forwarded as-is to the platform's templated metadata import request. Defaults to True.
            NOTE(review): presumably makes the import skip empty cells of the file - confirm against the platform documentation.

    Returns:
        enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.

        Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
        We'll add the value to a custom imaginary tag that was created before this example.

        The CSV file would look like this:

        | match_chain | match_productive | value_to_add |
        |-------------|------------------|--------------|
        | Heavy | true | Heavy and productive |
        | Heavy | false | Heavy and unproductive |
        | Kappa | true | Kappa and productive |
        | Kappa | false | Kappa and unproductive |
        | Lambda | true | Lambda and productive |
        | Lambda | false | Lambda and unproductive |

        We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.

        ```python
        my_collection_id: CollectionId = CollectionId(1337)

        tag_id_chain: TagId = TagId(SequenceTags.Chain)
        tag_id_productive: TagId = TagId(SequenceTags.Cdr3Productive)
        tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag

        with EnpiApiClient() as enpi_client:
            filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
            enpi_client.collection_api.add_metadata_from_file(
                filter=filter,
                annotation=sequence_annotation([
                    template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
                ]),
                file_path="path/to/metadata.csv",
            ).wait()
        ```
    """

    # The template file has to be present on the platform before the import can be requested,
    # so the upload is awaited right away
    file_api = FileApi(self._inner_api_client, self._log_level)
    uploaded_file = file_api.upload_file(file_path).wait()

    api = openapi_client.CollectionApi(self._inner_api_client)

    # Describe the templated import: which filter, which annotation and which uploaded template file
    templated_search_and_tag = openapi_client.TemplatedSearchAndTag(
        filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
        annotation=annotation.to_api_payload(),
        template_file_id=uploaded_file.id,
        ignore_empty_values=ignore_empty_values,
    )
    request = openapi_client.ImportMetadataRequest(templated_search_and_tag)

    # Starting the import creates a task on the platform
    with ApiErrorContext():
        response = api.import_metadata(request)
        assert response.workflow_execution_id is not None

    execution_id = WorkflowExecutionId(response.workflow_execution_id)

    def remove_uploaded_file(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
        assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

        # The template file is no longer needed on the platform once the import succeeded
        file_api.delete_file_by_id(uploaded_file.id)

    waitable = WorkflowExecutionTaskWaitable[None](
        workflow_execution_id=execution_id,
        task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED,
        on_complete=remove_uploaded_file,
    )

    return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.
Arguments:
- filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate. Use enpi_api.l2.client.api.filter_api.FilterApi.create_templated_filter to create new templated filters.
- annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation are the preferred way of creating annotation configuration.
- file_path (str | Path): The path to the CSV or XLSX file to import.
Returns:
enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.
Let's call the match columns match_chain and match_productive, and the column to add value_to_add. We'll add the value to a custom imaginary tag that was created before this example.
The CSV file would look like this:
| match_chain | match_productive | value_to_add |
|-------------|------------------|--------------|
| Heavy | true | Heavy and productive |
| Heavy | false | Heavy and unproductive |
| Kappa | true | Kappa and productive |
| Kappa | false | Kappa and unproductive |
| Lambda | true | Lambda and productive |
| Lambda | false | Lambda and unproductive |

We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID 1337.
```python
my_collection_id: CollectionId = CollectionId(1337)

tag_id_chain: TagId = TagId(SequenceTags.Chain)
tag_id_productive: TagId = TagId(SequenceTags.Cdr3Productive)
tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag

with EnpiApiClient() as enpi_client:
    filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
    enpi_client.collection_api.add_metadata_from_file(
        filter=filter,
        annotation=sequence_annotation([
            template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
        ]),
        file_path="path/to/metadata.csv",
    ).wait()
```
def add_metadata_from_df(
    self,
    filter: TemplatedFilter,
    annotation: import_metadata_templated.Annotation,
    data_frame: pd.DataFrame,
    ignore_empty_values: bool = True,
) -> Execution[None]:
    """Import metadata from a DataFrame to annotate collections, clones or sequences.

    This is a convenience method to import metadata from a Pandas DataFrame. The DataFrame is written to a
    temporary CSV file which is handed to `add_metadata_from_file`; see that method for more information
    about the metadata import.

    Args:
        filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
        annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
            specify a specific annotation target and the values to apply.
        data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
        ignore_empty_values (bool): Passed through unchanged to `add_metadata_from_file`. Defaults to True,
            which is the value that method uses when the argument is omitted.

    Returns:
        enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        Part of the `add_calculated_metadata.py` example script.

        ```python
        # Specify the filter query to match the sequences we want to add metadata to
        metadata_filter = client.filter_api.create_templated_filter(
            name="Metadata import filter",
            shared=False,
            condition=TemplatedAndOperator(
                conditions=[
                    TemplatedMatchTag(tag_id=CollectionTags.Name),
                    TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
                ]
            ),
        )

        # Specify the sequence-level annotation to add to the collection
        metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])

        # Create metadata dataframe
        metadata_frame = pd.DataFrame(
            [
                [
                    collection_name,  # Match
                    df_row[1]["Unique Sequence ID"],  # Match
                    grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
                ]
                for df_row in exported_df.iterrows()
            ],
            columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
        )

        # Apply metadata to the collection
        client.collection_api.add_metadata_from_df(
            filter=metadata_filter,
            annotation=metadata_annotation,
            data_frame=metadata_frame,
        ).wait()
        ```
    """

    # We need to turn the DataFrame into a CSV file. It is placed in a temporary directory so the location is
    # platform independent and the file does not stay behind on disk.
    with tempfile.TemporaryDirectory() as temporary_directory:
        temporary_csv_file_path = os.path.join(temporary_directory, f"import_metadata.{uuid4()}.csv")
        data_frame.to_csv(temporary_csv_file_path, index=False)

        # `add_metadata_from_file` waits for the upload before it returns, so the local
        # file is no longer needed once we leave this block
        return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path, ignore_empty_values)
Import metadata from a DataFrame to annotate collections, clones or sequences.
This is a convenience method to import metadata from a Pandas DataFrame. For more information about the
metadata import, see the documentation for import_metadata_from_csv.
Arguments:
- filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
- annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply.
- data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
Returns:
enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Part of the `add_calculated_metadata.py` example script.

```python
# Specify the filter query to match the sequences we want to add metadata to
metadata_filter = client.filter_api.create_templated_filter(
    name="Metadata import filter",
    shared=False,
    condition=TemplatedAndOperator(
        conditions=[
            TemplatedMatchTag(tag_id=CollectionTags.Name),
            TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
        ]
    ),
)

# Specify the sequence-level annotation to add to the collection
metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])

# Create metadata dataframe
metadata_frame = pd.DataFrame(
    [
        [
            collection_name,  # Match
            df_row[1]["Unique Sequence ID"],  # Match
            grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
        ]
        for df_row in exported_df.iterrows()
    ],
    columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
)

# Apply metadata to the collection
client.collection_api.add_metadata_from_df(
    filter=metadata_filter,
    annotation=metadata_annotation,
    data_frame=metadata_frame,
).wait()
```
def get_as_zip(
    self,
    collection_ids: list[CollectionId],
    filter: Filter | None = None,
    tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
    output_directory: str | Path | None = None,
) -> Execution[Path]:
    """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.

    Args:
        collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
        filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
            If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
        tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
        output_directory (str | Path | None): The directory path under which file will get exported. If
            not provided, a temporary directory will be used.

    Returns:
        enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
            awaited.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:

            collection_id = CollectionId(1234)

            # Example assumes you have a filter
            collection_filter: Filter = ...

            path: Path = enpi_client.collection_api.get_as_zip(
                collection_ids=[collection_id],
                filter=collection_filter,
                tag_ids=[
                    CollectionTags.Name,
                    CollectionTags.Organism,
                    CollectionTags.Complexity,
                    CollectionTags.Receptor,
                    SequenceTags.Chain,
                    SequenceTags.Cdr3Productive,
                ],
                output_directory="example/export_result/",
            ).wait()
        ```
    """

    # Without an explicit filter, fall back to one that matches every requested
    # collection, so that all of their clones end up in the export.
    if filter is None:
        filter = FilterApi(self._inner_api_client, self._log_level).create_filter(
            # The random suffix keeps the generated filter name unique
            name=f"all-collection-clones-filter-{uuid4()}",
            condition=MatchIds(target=MatchIdTarget.COLLECTION, ids=collection_ids),
        )

    # Assemble the export request; the generated client is handed plain integers
    payload = openapi_client.ExportPayload(
        collection_ids=list(map(int, collection_ids)),
        filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
        tag_ids=list(map(int, tag_ids)),
    )
    request = openapi_client.ExportRequest(payload=payload)
    inner_collection_api = openapi_client.CollectionApi(self._inner_api_client)

    # Starting the export only kicks off a workflow; the archive is fetched once its task completes
    with ApiErrorContext():
        response = inner_collection_api.export(request)
        assert response.workflow_execution_id is not None

    execution_id = WorkflowExecutionId(response.workflow_execution_id)

    def download_exported_archive(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
        # Completion callback: download the export produced by the finished task
        downloaded_path = FileApi(self._inner_api_client, self._log_level).download_export_by_workflow_execution_task_id(
            task_id=task_id,
            output_directory=output_directory,
        )

        logger.success("Collection(s) export has succeeded.")
        return downloaded_path

    export_waitable = WorkflowExecutionTaskWaitable[Path](
        workflow_execution_id=execution_id,
        on_complete=download_exported_archive,
        task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
    )

    return Execution(
        wait=export_waitable.wait_and_return_result,
        check_execution_state=export_waitable.check_execution_state,
    )
Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.
Arguments:
- collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
- filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
  If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
- tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
- output_directory (str | Path | None): The directory path under which file will get exported. If not provided, a temporary directory will be used.
Returns:
enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when awaited.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:

    collection_id = CollectionId(1234)

    # Example assumes you have a filter
    collection_filter: Filter = ...

    path: Path = enpi_client.collection_api.get_as_zip(
        collection_ids=[collection_id],
        filter=collection_filter,
        tag_ids=[
            CollectionTags.Name,
            CollectionTags.Organism,
            CollectionTags.Complexity,
            CollectionTags.Receptor,
            SequenceTags.Chain,
            SequenceTags.Cdr3Productive,
        ],
        output_directory="example/export_result/",
    ).wait()
```
def get_as_df(
    self,
    collection_ids: list[CollectionId],
    filter: Filter | None = None,
    tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
) -> Execution[pd.DataFrame]:
    """Export collection(s) to a Pandas DataFrame.

    The collections are exported as a zip archive into a temporary directory. When the execution is
    awaited, the archive is extracted and every TSV file found is read and concatenated (in sorted path
    order) into a single DataFrame. If the archive contains no TSV files, an empty DataFrame is returned.

    Args:
        collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
        filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
            If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
        tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.

    Returns:
        Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            # Example assumes you have a filter
            filter: Filter = ...

            df: pd.DataFrame = enpi_client.collection_api.get_as_df(
                collection_ids=[CollectionId(1)],
                filter=filter,
                tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.Cdr3AminoAcids],
            ).wait()
        ```
    """
    # The TemporaryDirectory object is referenced by the `wait` closure below, which keeps
    # the directory on disk until the returned execution is no longer referenced.
    tmp_dir = tempfile.TemporaryDirectory()
    get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)

    def wait() -> pd.DataFrame:
        zip_path = get_as_zip_execution.wait()

        # Extract all TSV files from the ZIP archive
        with ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(tmp_dir.name)

        # Collect the TSV files in a deterministic order; `os.walk` on its own yields
        # entries in whatever order the filesystem returns them.
        tsv_paths: list[str] = []
        for root, dirs, files in os.walk(tmp_dir.name):
            dirs.sort()  # In-place sort steers the order in which os.walk descends
            tsv_paths.extend(os.path.join(root, file) for file in sorted(files) if file.endswith(".tsv"))

        # `pd.concat` raises "No objects to concatenate" on an empty list, so an archive
        # without any TSV file yields an empty DataFrame instead.
        if not tsv_paths:
            return pd.DataFrame()

        # Read all TSV files into a single DataFrame
        return pd.concat([pd.read_csv(tsv_path, delimiter="\t") for tsv_path in tsv_paths])

    return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)
Export collection(s) to a Pandas DataFrame.
Arguments:
- collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
- filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
  If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
- tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
Returns:
Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    # Example assumes you have a filter
    filter: Filter = ...

    df: pd.DataFrame = enpi_client.collection_api.get_as_df(
        collection_ids=[CollectionId(1)],
        filter=filter,
        tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.Cdr3AminoAcids],
    ).wait()
```