enpi_api.l2.client.api.collection_api
import os
import tempfile
from pathlib import Path
from typing import Generator, Mapping
from uuid import uuid4
from zipfile import ZipFile

import pandas as pd
from loguru import logger

from enpi_api.l1 import openapi_client
from enpi_api.l2.client.api.file_api import FileApi
from enpi_api.l2.client.api.filter_api import FilterApi
from enpi_api.l2.events.workflow_execution_task_waitable import WorkflowExecutionTaskWaitable
from enpi_api.l2.tags import CloneTags, CollectionTags, SequenceTags
from enpi_api.l2.types import import_metadata, import_metadata_templated
from enpi_api.l2.types.api_error import ApiError, ApiErrorContext
from enpi_api.l2.types.collection import AdditionalImportMetadata, CollectionId, CollectionMetadata
from enpi_api.l2.types.execution import Execution
from enpi_api.l2.types.filter import Filter, MatchIds, MatchIdTarget, TemplatedFilter
from enpi_api.l2.types.log import LogLevel
from enpi_api.l2.types.reference_database import ReferenceDatabaseRevision
from enpi_api.l2.types.tag import TagId, TagKey
from enpi_api.l2.types.task import TaskState
from enpi_api.l2.types.workflow import WorkflowExecutionId, WorkflowExecutionTaskId, WorkflowTaskTemplateName
from enpi_api.l2.util.file import verify_headers_uniformity

DEFAULT_EXPORT_TAG_IDS = [
    # Collection tags
    CollectionTags.Name,
    CollectionTags.Organism,
    CollectionTags.Complexity,
    CollectionTags.Receptor,
    CollectionTags.NumberOfClones,
    CollectionTags.Reference,
    # Clone tags
    CloneTags.TenXBarcode,
    CloneTags.CloneCount,
    # Sequence tags
    SequenceTags.Chain,
    SequenceTags.SequenceCount,
    SequenceTags.Cdr3AminoAcids,
    SequenceTags.VGene,
    SequenceTags.JGene,
]
"""The default tags that are included when exporting a collection to a DataFrame or a CSV file.

These are:

- Collection level tags:
    - `enpi_api.l2.tags.CollectionTags.Name`
    - `enpi_api.l2.tags.CollectionTags.Organism`
    - `enpi_api.l2.tags.CollectionTags.Complexity`
    - `enpi_api.l2.tags.CollectionTags.Receptor`
    - `enpi_api.l2.tags.CollectionTags.NumberOfClones`
    - `enpi_api.l2.tags.CollectionTags.Reference`
- Clone level tags:
    - `enpi_api.l2.tags.CloneTags.TenXBarcode`
    - `enpi_api.l2.tags.CloneTags.CloneCount`
- Sequence level tags:
    - `enpi_api.l2.tags.SequenceTags.Chain`
    - `enpi_api.l2.tags.SequenceTags.SequenceCount`
    - `enpi_api.l2.tags.SequenceTags.Cdr3AminoAcids`
    - `enpi_api.l2.tags.SequenceTags.VGene`
    - `enpi_api.l2.tags.SequenceTags.JGene`
"""


class CollectionApi:
    _inner_api_client: openapi_client.ApiClient
    _log_level: LogLevel

    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
        """@private"""
        self._inner_api_client = inner_api_client
        self._log_level = log_level

    def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]:
        """Get a generator over all available collections in the platform.

        Args:
            name (str | None): Optional collection name to search for, using case-insensitive substring matching.

        Returns:
            Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator over all collections in the platform.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                for collection in enpi_client.collection_api.get_collections_metadata():
                    print(collection)
            ```
        """

        logger.info("Getting a generator through all collections")

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        # Fetch the first page. There is always a first page, but it may be empty
        try:
            get_collections_response = collection_api_instance.get_collections(name=name)
        except openapi_client.ApiException as e:
            raise ApiError(e)

        # `collections` and `cursor` get overwritten in the loop below when fetching a new page
        collections = get_collections_response.collections
        cursor = get_collections_response.cursor

        while True:
            for collection in collections:
                yield CollectionMetadata.from_raw(collection)

            # Check if we need to fetch a next page
            if cursor is None:
                logger.trace("No more pages of collections")
                return  # No more pages

            # We have a cursor, so we need to fetch the next page
            logger.trace("Fetching next page of collections")
            try:
                get_collections_response = collection_api_instance.get_collections(
                    cursor=cursor,
                    name=name,
                )
            except openapi_client.ApiException as e:
                raise ApiError(e)
            collections = get_collections_response.collections
            cursor = get_collections_response.cursor

    def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata:
        """Get a single collection by its ID.

        Args:
            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.

        Returns:
            enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain
                the collection's clones or sequences, only the metadata. For a collection's clone and sequence data, refer
                to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                collection: CollectionMetadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
            ```
        """

        logger.info(f"Getting collection with ID `{collection_id}`")

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        try:
            get_collection_response = collection_api_instance.get_collection(collection_id)
        except openapi_client.ApiException as e:
            raise ApiError(e)

        collection = CollectionMetadata.from_raw(get_collection_response.collection)

        return collection

    def delete_collection_by_id(self, collection_id: CollectionId) -> None:
        """Delete a single collection by its ID.

        This will remove the collection from the ENPICOM Platform.

        Args:
            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
            ```
        """

        logger.info(f"Deleting collection with ID `{collection_id}`")

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        try:
            collection_api_instance.delete_collection(id=collection_id)
        except openapi_client.ApiException as e:
            raise ApiError(e)

        logger.info(f"Collection with ID `{collection_id}` successfully deleted")

    def create_collection_from_csv(
        self,
        file_path: str | Path,
        reference_database_revision: ReferenceDatabaseRevision | None = None,
        skiprows: int = 0,
        mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None,
        metadata: AdditionalImportMetadata | None = None,
        organism: str | None = None,
    ) -> Execution[CollectionMetadata]:
        """Import a collection from a CSV file (can be gzipped).

        The file should be a CSV file with a number of required headers. These headers must
        either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
        The following tags are required:

        - enpi_api.l2.tags.CollectionTags.Name
        - enpi_api.l2.tags.CollectionTags.Organism
        - enpi_api.l2.tags.SequenceTags.SequenceCount
        - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
        - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
        - enpi_api.l2.tags.SequenceTags.VCall
        - enpi_api.l2.tags.SequenceTags.JCall

        Args:
            file_path (str | Path): The path to the CSV file to import.
            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is exactly
                one reference available, it will be picked for the import and the task will continue. If there are no references or multiple references
                available, an error will be returned - in that case a reference has to be picked manually by passing it to this parameter.
                There is no downside to always specifying the reference manually, which is safer and less error-prone.
            skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
            mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the
                CSV headers to ENPICOM Platform tag keys.
            metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection.
                <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take
                precedence when creating tags.**</u>
            organism (str | None): If passed, it is compared with the organism value found in the first line of the imported file, and
                an error is raised if the values differ. Can serve as a quick sanity check.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
                metadata of the imported collection when awaited.

        Raises:
            KeyError: If the 'Organism' column is not found in the imported df/csv.
            ValueError: If the optional `organism` param value differs from the 'Organism' value in the df/csv.
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                reference_name = ...
                reference_species = ...
                reference = enpi_client.reference_database_api.get_revision_by_name(
                    name=reference_name,
                    species=reference_species,
                )

                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
                    file_path=import_file_path,
                    reference_database_revision=reference,
                    skiprows=1,
                    mapping={
                        "title": CollectionTags.Name,
                        "species": CollectionTags.Organism,
                    },
                    metadata={
                        CollectionTags.ProjectId: "Project 001",
                    }
                ).wait()
            ```
        """

        logger.info(f"Importing collection from CSV file `{file_path}`")

        # Pandas supports gzipped CSV
        df = pd.read_csv(file_path, sep=",", skiprows=skiprows)

        # Get the organism from the first line. All lines should hold the same value
        organism_from_file = df.iloc[0].get("Organism", None)
        if organism_from_file is None:
            # If not found by tag key, try to access it via the tag ID
            organism_from_file = df.iloc[0].get(CollectionTags.Organism, None)

        # If it's still None, raise an error - it's a mandatory column anyway
        if organism_from_file is None:
            raise KeyError("A required 'Organism' column was not found in the imported file/df")

        organism_from_file = str(organism_from_file)

        # If the `organism` param was passed, compare the values
        if (organism is not None) and (organism != organism_from_file):
            raise ValueError(
                f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}",
            )

        # Map the headers in the CSV file to Tag Keys
        if mapping is not None:
            # We drop the columns for which no mapping is specified
            unmapped_headers = set(df.columns).difference(set(mapping.keys()))
            if unmapped_headers:
                logger.warning(f"The following headers are unmapped and are removed:\n{unmapped_headers}")
                df.drop(columns=list(unmapped_headers), inplace=True)
            df.rename(columns=mapping, inplace=True)
        if metadata is not None:
            for key, value in metadata.items():
                df[key] = value

        temporary_csv_file_path = f"/tmp/import_collection_csv.{uuid4()}.csv"
        df.to_csv(temporary_csv_file_path, index=False)
        verify_headers_uniformity(list(df.columns))

        # Upload the file to the platform
        file_api = FileApi(self._inner_api_client, self._log_level)
        file = file_api.upload_file(temporary_csv_file_path).wait()

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        # Start the collection import. This starts a task, so we'll wait for that to be completed
        import_collection_request = openapi_client.ImportCollectionRequest(
            file_id=file.id,
            organism=organism_from_file,
            reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None,
            reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None,
        )

        with ApiErrorContext():
            import_collection_response = collection_api_instance.import_collection(import_collection_request)
            assert import_collection_response.workflow_execution_id is not None

        workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id)

        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata:
            assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

            get_collection_id_response = collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id)
            assert get_collection_id_response.collection_id is not None

            collection_id = CollectionId(get_collection_id_response.collection_id)

            logger.success(f"Collection with ID `{collection_id}` was successfully imported")
            # Remove the file from the tmp folder
            os.remove(temporary_csv_file_path)
            # Remove the file from the platform
            file_api.delete_file_by_id(file.id)

            return self.get_collection_metadata_by_id(collection_id)

        waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
            workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete
        )

        return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def create_collection_from_df(
        self,
        data_frame: pd.DataFrame,
        reference_database_revision: ReferenceDatabaseRevision | None = None,
    ) -> Execution[CollectionMetadata]:
        """Import a collection from a DataFrame.

        This is a convenience method to import a collection from a Pandas DataFrame. For more information about the
        collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.

        Args:
            data_frame (pd.DataFrame): The DataFrame containing the collection to import.
            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is exactly
                one reference available, it will be picked for the import and the task will continue. If there are no references or multiple references
                available, an error will be returned - in that case a reference has to be picked manually by passing it to this parameter.
                There is no downside to always specifying the reference manually, which is safer and less error-prone.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
                collection that was imported when awaited.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                reference_name = ...
                reference_species = ...
                reference = enpi_client.reference_database_api.get_revision_by_name(
                    name=reference_name,
                    species=reference_species,
                )

                df = pd.read_csv('/home/data.csv')
                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
                    data_frame=df,
                    reference_database_revision=reference,
                ).wait()
            ```
        """

        # We need to turn the DataFrame into a CSV file
        with tempfile.NamedTemporaryFile(suffix=".csv") as temp_file:
            data_frame.to_csv(temp_file.name, index=False)

            create_collection_execution = self.create_collection_from_csv(
                file_path=temp_file.name,
                reference_database_revision=reference_database_revision,
            )

        def wait() -> CollectionMetadata:
            return create_collection_execution.wait()

        return Execution(wait=wait, check_execution_state=create_collection_execution.check_execution_state)

    def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]:
        """Import metadata to annotate collections, clones or sequences in batches using a filter.

        This method allows you to annotate collections, clones or sequences using a filter. The annotation values
        that you provide will be applied to all matching items of the specified level.

        If you would like to add different values based on different matched tags, have a look at the methods that
        support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.

        Args:
            filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate.
                Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
            annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You
                specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
                enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
                are the preferred way of creating the annotation configuration.

        Returns:
            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            Batch tag multiple collections with some tags:

            ```python
            with EnpiApiClient() as enpi_client:
                collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]

                # Create a filter
                filter = enpi_client.filter_api.create_filter(
                    name="My filter",
                    condition=dict(
                        type="match_ids",
                        target="collection",
                        ids=collection_ids,
                    ),
                )

                # Create an annotation
                annotation = collection_annotation(tags=[
                    Tag(id=CollectionTags.CampaignId, value="My campaign"),
                    Tag(id=CollectionTags.ProjectId, value="My project"),
                ])

                # Add the metadata
                enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
            ```
        """

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        import_metadata_request = openapi_client.ImportMetadataRequest(
            openapi_client.SearchAndTag(
                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
                annotation=annotation.to_api_payload(),
            )
        )

        with ApiErrorContext():
            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
            assert import_metadata_response.workflow_execution_id is not None

        workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)

        waitable = WorkflowExecutionTaskWaitable[None](
            workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
        )

        return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

    def add_metadata_from_file(
        self,
        filter: TemplatedFilter,
        annotation: import_metadata_templated.Annotation,
        file_path: str | Path,
    ) -> Execution[None]:
        """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.

        Args:
            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
                Use enpi_api.l2.api.filter_api.FilterApi.create_templated_filter to create new templated filters.
            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
                specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
                enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
                are the preferred way of creating the annotation configuration.
            file_path (str | Path): The path to the CSV or XLSX file to import.

        Returns:
            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.

            Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
            We'll add the value to a custom imaginary tag that was created before this example.

            The CSV file would look like this:

            | match_chain | match_productive | value_to_add |
            |-------------|------------------|--------------|
            | Heavy | true | Heavy and productive |
            | Heavy | false | Heavy and unproductive |
            | Kappa | true | Kappa and productive |
            | Kappa | false | Kappa and unproductive |
            | Lambda | true | Lambda and productive |
            | Lambda | false | Lambda and unproductive |

            We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.

            ```python
            my_collection_id: CollectionId = CollectionId(1337)

            tag_id_chain: TagId = TagId(SequenceTags.Chain)
            tag_id_productive: TagId = TagId(SequenceTags.Productive)
            tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag

            with EnpiApiClient() as enpi_client:
                filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
                enpi_client.collection_api.add_metadata_from_file(
                    filter=filter,
                    annotation=sequence_annotation([
                        template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
                    ]),
                    file_path="path/to/metadata.csv",
                ).wait()
            ```
        """

        # We need to upload the file to the platform
        file_api = FileApi(self._inner_api_client, self._log_level)
        file_execution = file_api.upload_file(file_path)

        file = file_execution.wait()

        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        # Start the metadata import. This starts a task, so we'll wait for that to be completed
        import_metadata_request = openapi_client.ImportMetadataRequest(
            openapi_client.TemplatedSearchAndTag(
                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
                annotation=annotation.to_api_payload(),
                template_file_id=file.id,
            )
        )

        with ApiErrorContext():
            # The metadata import has not started yet because we first need to wait for the file upload
            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
            assert import_metadata_response.workflow_execution_id is not None

        workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)

        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
            assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

            nonlocal file
            file_api.delete_file_by_id(file.id)

        waitable = WorkflowExecutionTaskWaitable[None](
            on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED
        )

        return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

    def add_metadata_from_df(
        self,
        filter: TemplatedFilter,
        annotation: import_metadata_templated.Annotation,
        data_frame: pd.DataFrame,
    ) -> Execution[None]:
        """Import metadata from a DataFrame to annotate collections, clones or sequences.

        This is a convenience method to import metadata from a Pandas DataFrame. For more information about the
        metadata import, see the documentation for `add_metadata_from_file`.

        Args:
            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
                specify a specific annotation target and the values to apply.
            data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.

        Returns:
            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            Part of the `add_calculated_metadata.py` example script.

            ```python
            # Specify the filter query to match the sequences we want to add metadata to
            metadata_filter = client.filter_api.create_templated_filter(
                name="Metadata import filter",
                shared=False,
                condition=TemplatedAndOperator(
                    conditions=[
                        TemplatedMatchTag(tag_id=CollectionTags.Name),
                        TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
                    ]
                ),
            )

            # Specify the sequence-level annotation to add to the collection
            metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])

            # Create the metadata dataframe
            metadata_frame = pd.DataFrame(
                [
                    [
                        collection_name,  # Match
                        df_row[1]["Unique Sequence ID"],  # Match
                        grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
                    ]
                    for df_row in exported_df.iterrows()
                ],
                columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
            )

            # Apply the metadata to the collection
            client.collection_api.add_metadata_from_df(
                filter=metadata_filter,
                annotation=metadata_annotation,
                data_frame=metadata_frame,
            ).wait()
            ```
        """

        # We need to turn the DataFrame into a CSV file
        temporary_csv_file_path = f"/tmp/import_metadata.{uuid4()}.csv"
        data_frame.to_csv(temporary_csv_file_path, index=False)

        return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path)

    def get_as_zip(
        self,
        collection_ids: list[CollectionId],
        filter: Filter | None = None,
        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
        output_directory: str | Path | None = None,
    ) -> Execution[Path]:
        """Export collection(s) into a zip file. Inside the archive, each collection is exported to a separate TSV file.

        Args:
            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
            output_directory (str | Path | None): The directory path under which the file will be exported. If
                not provided, a temporary directory will be used.

        Returns:
            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
                awaited.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:

                collection_id = CollectionId(1234)

                # Example assumes you have a filter
                collection_filter: Filter = ...

                path: Path = enpi_client.collection_api.get_as_zip(
                    collection_ids=[collection_id],
                    filter=collection_filter,
                    tag_ids=[
                        CollectionTags.Name,
                        CollectionTags.Organism,
                        CollectionTags.Complexity,
                        CollectionTags.Receptor,
                        SequenceTags.Chain,
                        SequenceTags.Productive,
                    ],
                    output_directory="example/export_result/"
                ).wait()
            ```
        """

        # Create the collection filter if it wasn't provided; it will match and
        # get all the clones from the target collections
        if filter is None:
            filter_api = FilterApi(self._inner_api_client, self._log_level)
            filter = filter_api.create_filter(
                name=f"all-collection-clones-filter-{uuid4()}",  # Unique name to avoid collision
                condition=MatchIds(
                    target=MatchIdTarget.COLLECTION,
                    ids=collection_ids,  # Match all collection IDs passed to this function
                ),
            )

        # Start the collection export. This starts a task, so we'll wait for that to be completed
        export_collection_request = openapi_client.ExportRequest(
            payload=openapi_client.ExportPayload(
                collection_ids=[int(collection_id) for collection_id in collection_ids],
                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
                tag_ids=[int(tag_id) for tag_id in tag_ids],
            )
        )
        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

        with ApiErrorContext():
            export_collection_response = collection_api_instance.export(export_collection_request)
            assert export_collection_response.workflow_execution_id is not None

        workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id)

        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
            file_api = FileApi(self._inner_api_client, self._log_level)
            file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)

            logger.success("Collection(s) export has succeeded.")
            return file_path

        waitable = WorkflowExecutionTaskWaitable[Path](
            workflow_execution_id=workflow_execution_id,
            on_complete=on_complete,
            task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
        )

        return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

    def get_as_df(
        self,
        collection_ids: list[CollectionId],
        filter: Filter | None = None,
        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
    ) -> Execution[pd.DataFrame]:
        """Export collection(s) to a Pandas DataFrame.

        Args:
            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.

        Returns:
            Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection(s).

        Raises:
            enpi_api.l2.types.api_error.ApiError: If the API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                # Example assumes you have a filter
                filter: Filter = ...

                df: pd.DataFrame = enpi_client.collection_api.get_as_df(
                    collection_ids=[CollectionId(1)],
                    filter=filter,
                    tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.CDR3AminoAcids],
                ).wait()
            ```
        """
        tmp_dir = tempfile.TemporaryDirectory()
        get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)

        def wait() -> pd.DataFrame:
            zip_path = get_as_zip_execution.wait()

            # Extract all TSV files from the ZIP archive
            with ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall(tmp_dir.name)

            # Read all TSV files into a single DataFrame
            all_dfs = []
            for root, _, files in os.walk(tmp_dir.name):
                for file in files:
                    if file.endswith(".tsv"):
                        file_path = os.path.join(root, file)
                        df = pd.read_csv(file_path, delimiter="\t")
                        all_dfs.append(df)

            return pd.concat(all_dfs)

        return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)
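The sketch below chains only the methods defined in this module into a minimal end-to-end flow (import a CSV, list collections, export to a DataFrame). It is a hedged example, not part of the module: the `EnpiApiClient` import path, the file path, and the collection ID are illustrative assumptions.

```python
# Minimal end-to-end sketch, assuming EnpiApiClient is importable as below.
import pandas as pd

from enpi_api.l2.client import EnpiApiClient  # assumed import path for the top-level client
from enpi_api.l2.types.collection import CollectionId

with EnpiApiClient() as enpi_client:
    # Import a collection from a CSV file; `.wait()` blocks until the import task has finished
    collection = enpi_client.collection_api.create_collection_from_csv(
        file_path="data/my_collection.csv",  # illustrative path
    ).wait()

    # Iterate over all collections now available on the platform
    for metadata in enpi_client.collection_api.get_collections_metadata():
        print(metadata)

    # Export a collection back out as a single DataFrame, using the default export tags
    df: pd.DataFrame = enpi_client.collection_api.get_as_df(
        collection_ids=[CollectionId(1234)],  # illustrative ID; use the ID of the imported collection
    ).wait()
    print(df.head())
```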
79 def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]: 80 """Get a generator through all available collections in the platform. 81 82 Args: 83 name (str | None): Optional collection name for search by case-insensitive substring matching 84 85 Returns: 86 Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform. 87 88 Raises: 89 enpi_api.l2.types.api_error.ApiError: If API request fails. 90 91 Example: 92 93 ```python 94 with EnpiApiClient() as enpi_client: 95 for collection in enpi_client.collection_api.get_collections_metadata(): 96 print(collection) 97 ``` 98 """ 99 100 logger.info("Getting a generator through all collections") 101 102 collection_api_instance = openapi_client.CollectionApi(self._inner_api_client) 103 104 # Fetch the first page, there is always a first page, it may be empty 105 try: 106 get_collections_response = collection_api_instance.get_collections(name=name) 107 except openapi_client.ApiException as e: 108 raise ApiError(e) 109 110 # `collections` and `cursor` get overwritten in the loop below when fetching a new page 111 collections = get_collections_response.collections 112 cursor = get_collections_response.cursor 113 114 while True: 115 for collection in collections: 116 yield CollectionMetadata.from_raw(collection) 117 118 # Check if we need to fetch a next page 119 if cursor is None: 120 logger.trace("No more pages of collections") 121 return # No more pages 122 123 # We have a cursor, so we need to get a next page 124 logger.trace("Fetching next page of collections") 125 try: 126 get_collections_response = collection_api_instance.get_collections( 127 cursor=cursor, 128 name=name if name is not None else None, 129 ) 130 except openapi_client.ApiException as e: 131 raise ApiError(e) 132 collections = get_collections_response.collections 133 cursor = get_collections_response.cursor
Get a generator through all available collections in the platform.
Arguments:
- name (str | None): Optional collection name for search by case-insensitive substring matching
Returns:
Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    for collection in enpi_client.collection_api.get_collections_metadata():
        print(collection)
```
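The `name` argument narrows the listing by case-insensitive substring matching. A minimal sketch, using an illustrative name fragment:

```python
with EnpiApiClient() as enpi_client:
    # Only collections whose name contains "pbmc" (case-insensitive) are yielded;
    # the fragment is illustrative.
    for collection in enpi_client.collection_api.get_collections_metadata(name="pbmc"):
        print(collection)
```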
135 def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata: 136 """Get a single collection by its ID. 137 138 Args: 139 collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get. 140 141 Returns: 142 enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain 143 the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer 144 to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df. 145 146 Raises: 147 enpi_api.l2.types.api_error.ApiError: If API request fails. 148 149 Example: 150 151 ```python 152 with EnpiApiClient() as enpi_client: 153 collection: Collection = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234)) 154 ``` 155 """ 156 157 logger.info(f"Getting collection with ID `{collection_id}`") 158 159 collection_api_instance = openapi_client.CollectionApi(self._inner_api_client) 160 161 try: 162 get_collection_response = collection_api_instance.get_collection(collection_id) 163 except openapi_client.ApiException as e: 164 raise ApiError(e) 165 166 collection = CollectionMetadata.from_raw(get_collection_response.collection) 167 168 return collection
Get a single collection by its ID.
Arguments:
- collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.
Returns:
enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain the collection's clones or sequences, only the metadata. For the collection's clone and sequence data, refer to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    collection: CollectionMetadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
```
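A sketch that combines the metadata lookup with a data export, since this call returns metadata only:

```python
with EnpiApiClient() as enpi_client:
    collection_id = CollectionId(1234)

    # Metadata only (collection-level tags such as name and organism)
    metadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=collection_id)
    print(metadata)

    # For the actual clone and sequence data, export the collection instead
    df = enpi_client.collection_api.get_as_df(collection_ids=[collection_id]).wait()
```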
170 def delete_collection_by_id(self, collection_id: CollectionId) -> None: 171 """Delete a single collection by its ID. 172 173 This will remove the collection from the ENPICOM Platform. 174 175 Args: 176 collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete. 177 178 Raises: 179 enpi_api.l2.types.api_error.ApiError: If API request fails. 180 181 Example: 182 183 ```python 184 with EnpiApiClient() as enpi_client: 185 enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234)) 186 ``` 187 """ 188 189 logger.info(f"Deleting collection with ID `{collection_id}`") 190 191 collection_api_instance = openapi_client.CollectionApi(self._inner_api_client) 192 193 try: 194 collection_api_instance.delete_collection(id=collection_id) 195 except openapi_client.ApiException as e: 196 raise ApiError(e) 197 198 logger.info(f"Collection with ID `{collection_id}` successfully deleted")
Delete a single collection by its ID.
This will remove the collection from the ENPICOM Platform.
Arguments:
- collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
```
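A sketch that combines the name search with deletion; it assumes `CollectionMetadata` exposes the collection ID as an `id` attribute, which should be verified against your version:

```python
with EnpiApiClient() as enpi_client:
    # Delete every collection whose name contains the illustrative fragment "scratch"
    for collection in enpi_client.collection_api.get_collections_metadata(name="scratch"):
        # `collection.id` is an assumption about the CollectionMetadata shape
        enpi_client.collection_api.delete_collection_by_id(collection_id=collection.id)
```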
200 def create_collection_from_csv( 201 self, 202 file_path: str | Path, 203 reference_database_revision: ReferenceDatabaseRevision | None = None, 204 skiprows: int = 0, 205 mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None, 206 metadata: AdditionalImportMetadata | None = None, 207 organism: str | None = None, 208 ) -> Execution[CollectionMetadata]: 209 """Import a collection from a CSV file (can be gzipped). 210 211 The file should be a CSV file with a couple of required headers. These headers must 212 either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism). 213 The following tags are required: 214 215 - enpi_api.l2.tags.CollectionTags.Name 216 - enpi_api.l2.tags.CollectionTags.Organism 217 - enpi_api.l2.tags.SequenceTags.SequenceCount 218 - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids 219 - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides 220 - enpi_api.l2.tags.SequenceTags.VCall 221 - enpi_api.l2.tags.SequenceTags.JCall 222 223 Args: 224 file_path (str | Path): The path to the CSV file to import. 225 reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. 226 If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one 227 reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references 228 available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter. 229 There is no downsides to always specifying the reference manually, which is a safer and less error-prone option. 230 skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0. 231 mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the 232 CSV headers to ENPICOM Platform tag keys 233 metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection. 234 <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take 235 precedence when creating tags.**</u> 236 organism: (str | None): If passed, it's compared with the organism value found in the first line of the imported file and 237 throws an error if the values are different. Can serve as a quick utility check. 238 239 Returns: 240 enpi_api.l2.types.collection.CollectionMetadata: Metadata of the collection that was imported. 241 242 Raises: 243 KeyError: If 'Organism' column is not found in the imported df/csv. 244 ValueError: If optional `organism` param value differs from the 'Organism' value from the df/csv. 245 enpi_api.l2.types.api_error.ApiError: If API request fails. 246 247 Example: 248 249 ```python 250 with EnpiApiClient() as enpi_client: 251 reference_name = ... 252 species = ... 
253 reference = enpi_client.reference_database_api.get_revision_by_name( 254 name=reference_name, 255 species=reference_species, 256 ) 257 258 collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv( 259 file_path=import_file_path, 260 reference_database_revision=reference, 261 skiprows=1, 262 mapping={ 263 "title": CollectionTags.Name, 264 "species": CollectionTags.Organism, 265 }, 266 metadata={ 267 CollectionTags.ProjectId: "Project 001", 268 } 269 ).wait() 270 ``` 271 """ 272 273 logger.info(f"Importing collection from CSV file `{file_path}`") 274 275 # Pandas supports gzipped CSV 276 df = pd.read_csv(file_path, sep=",", skiprows=skiprows) 277 278 # Get the organism from the first line. All lines should hold the same value 279 organism_from_file = str(df.iloc[0].get("Organism", None)) 280 if organism_from_file is None: 281 # If not found by tag key, try to access it via the tag ID 282 organism_from_file = str(df.iloc[0].get(CollectionTags.Organism, None)) 283 284 # If it's still none, raise an error - it's a mandatory column anyways 285 if organism_from_file is None: 286 raise KeyError("A required 'Organism' column was not found in the imported file/df") 287 288 # If `organism` param was passed, compare the values 289 if (organism is not None) and (organism != organism_from_file): 290 raise ValueError( 291 f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}", 292 ) 293 294 # Map the headers in the CSV file to Tag Keys 295 if mapping is not None: 296 # We drop the columns for which no mapping is specified 297 unmapped_headers = set(df.columns).difference(set(mapping.keys())) 298 logger.warning(f"The following headers are unmapped and are removed:\n{unmapped_headers}") 299 df.drop(columns=list(unmapped_headers), inplace=True) 300 df.rename(columns=mapping, inplace=True) 301 if metadata is not None: 302 for key, value in metadata.items(): 303 df[key] = value 304 305 temporary_csv_file_path = f"/tmp/import_collection_csv.{uuid4()}.csv" 306 df.to_csv(temporary_csv_file_path, index=False) 307 verify_headers_uniformity(list(df.columns)) 308 309 # Upload the file to the platform 310 file_api = FileApi(self._inner_api_client, self._log_level) 311 file = file_api.upload_file(temporary_csv_file_path).wait() 312 313 collection_api_instance = openapi_client.CollectionApi(self._inner_api_client) 314 315 # Start the collection import, this starts a task, so we'll wait for that to be completed 316 import_collection_request = openapi_client.ImportCollectionRequest( 317 file_id=file.id, 318 organism=organism_from_file, 319 reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None, 320 reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None, 321 ) 322 323 with ApiErrorContext(): 324 import_collection_response = collection_api_instance.import_collection(import_collection_request) 325 assert import_collection_response.workflow_execution_id is not None 326 327 workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id) 328 329 def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata: 330 assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead" 331 332 get_collection_id_response = 
collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id) 333 assert get_collection_id_response.collection_id is not None 334 335 collection_id = CollectionId(get_collection_id_response.collection_id) 336 337 logger.success(f"Collection with ID `{collection_id}` was successfully imported") 338 # Remove the file from tmp folder 339 os.remove(temporary_csv_file_path) 340 # Remove the file from the platform 341 file_api.delete_file_by_id(file.id) 342 343 return self.get_collection_metadata_by_id(collection_id) 344 345 waitable = WorkflowExecutionTaskWaitable[CollectionMetadata]( 346 workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete 347 ) 348 349 return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
Import a collection from a CSV file (can be gzipped).
The file should be a CSV file with a couple of required headers. These headers must either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
The following tags are required (a minimal sketch follows the list):
- enpi_api.l2.tags.CollectionTags.Name
- enpi_api.l2.tags.CollectionTags.Organism
- enpi_api.l2.tags.SequenceTags.SequenceCount
- enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
- enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
- enpi_api.l2.tags.SequenceTags.VCall
- enpi_api.l2.tags.SequenceTags.JCall
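For illustration, a minimal sketch of preparing such a file with pandas; all header strings and values are hypothetical, and the headers are mapped onto the required tags via the `mapping` argument described under Arguments below:

```python
import pandas as pd

# Hypothetical headers and values, purely for illustration.
# The "Organism" header is kept as-is because the organism value is read
# from that column before the mapping is applied.
df = pd.DataFrame(
    {
        "Organism": ["Homo sapiens"],
        "collection_name": ["My collection"],
        "sequence_count": [1],
        "cdr3_aa": ["CARDYW"],
        "receptor_nt": ["ATGC"],
        "v_call": ["IGHV1-69"],
        "j_call": ["IGHJ4"],
    }
)
df.to_csv("minimal_import.csv", index=False)

# Map the hypothetical headers onto the required platform tags
mapping = {
    "Organism": CollectionTags.Organism,
    "collection_name": CollectionTags.Name,
    "sequence_count": SequenceTags.SequenceCount,
    "cdr3_aa": SequenceTags.CDR3AminoAcids,
    "receptor_nt": SequenceTags.ReceptorNucleotides,
    "v_call": SequenceTags.VCall,
    "j_call": SequenceTags.JCall,
}

with EnpiApiClient() as enpi_client:
    collection = enpi_client.collection_api.create_collection_from_csv(
        file_path="minimal_import.csv",
        mapping=mapping,
    ).wait()
```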
Arguments:
- file_path (str | Path): The path to the CSV file to import.
- reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is only one reference available, it will be picked for the import and the task will continue. If there are none, or multiple references are available, an error will be returned - in that case a reference has to be picked manually by passing it to this parameter. There is no downside to always specifying the reference manually; it is the safer and less error-prone option.
- skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
- mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the CSV headers to ENPICOM Platform tag keys or tag IDs.
- metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection. If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take precedence when creating tags.
- organism (str | None): If passed, it is compared with the organism value found in the first line of the imported file, and an error is raised if the values differ. Can serve as a quick sanity check.
Returns:
enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the metadata of the imported collection when awaited.
Raises:
- KeyError: If 'Organism' column is not found in the imported df/csv.
- ValueError: If the optional `organism` param value differs from the 'Organism' value from the df/csv.
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    reference_name = ...
    species = ...
    import_file_path = ...

    reference = enpi_client.reference_database_api.get_revision_by_name(
        name=reference_name,
        species=species,
    )

    collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
        file_path=import_file_path,
        reference_database_revision=reference,
        skiprows=1,
        mapping={
            "title": CollectionTags.Name,
            "species": CollectionTags.Organism,
        },
        metadata={
            CollectionTags.ProjectId: "Project 001",
        },
    ).wait()
```
351 def create_collection_from_df( 352 self, 353 data_frame: pd.DataFrame, 354 reference_database_revision: ReferenceDatabaseRevision | None = None, 355 ) -> Execution[CollectionMetadata]: 356 """Import a collection from a DataFrame. 357 358 This is a convenience method to import a collection from a Pandas DataFrame. For more information about the 359 collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv. 360 361 Args: 362 data_frame (pd.DataFrame): The DataFrame containing the collection to import. 363 reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. 364 If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one 365 reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references 366 available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter. 367 There is no downsides to always specifying the reference manually, which is a safer and less error-prone option. 368 Raises: 369 enpi_api.l2.types.api_error.ApiError: If API request fails. 370 371 Returns: 372 enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the 373 collection that was imported when awaited. 374 375 Example: 376 377 ```python 378 reference_name = ... 379 species = ... 380 reference = enpi_client.reference_database_api.get_revision_by_name( 381 name=reference_name, 382 species=reference_species, 383 ) 384 385 with EnpiApiClient() as enpi_client: 386 df = pd.read_csv('/home/data.csv') 387 collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df( 388 data_frame=df, 389 reference_database_revision=reference, 390 ).wait() 391 ``` 392 """ 393 394 # We need to turn the DataFrame into a CSV file 395 with tempfile.NamedTemporaryFile(suffix=".csv") as temp_file: 396 data_frame.to_csv(temp_file.name, index=False) 397 398 create_collection_execution = self.create_collection_from_csv( 399 file_path=temp_file.name, 400 reference_database_revision=reference_database_revision, 401 ) 402 403 def wait() -> CollectionMetadata: 404 return create_collection_execution.wait() 405 406 return Execution(wait=wait, check_execution_state=create_collection_execution.check_execution_state)
Import a collection from a DataFrame.
This is a convenience method to import a collection from a Pandas DataFrame. For more information about the collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.
Arguments:
- data_frame (pd.DataFrame): The DataFrame containing the collection to import.
- reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is only one reference available, it will be picked for the import and the task will continue. If there are none, or multiple references are available, an error will be returned - in that case a reference has to be picked manually by passing it to this parameter. There is no downside to always specifying the reference manually; it is the safer and less error-prone option.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Returns:
enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the collection that was imported when awaited.
Example:
```python
with EnpiApiClient() as enpi_client:
    reference_name = ...
    species = ...

    reference = enpi_client.reference_database_api.get_revision_by_name(
        name=reference_name,
        species=species,
    )

    df = pd.read_csv('/home/data.csv')
    collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
        data_frame=df,
        reference_database_revision=reference,
    ).wait()
```
408 def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]: 409 """Import metadata to annotate collections, clones or sequences in batches using a filter. 410 411 This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values 412 that you provide will be applied to all matching items of the specified level. 413 414 If you would like to add different values based on different matched tags, have a look at the methods that 415 support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`. 416 417 Args: 418 filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate. 419 Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters. 420 annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You 421 specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, 422 enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation 423 are the preferred way of creating annotation configuration. 424 425 Returns: 426 enpi_api.l2.types.execution.Execution[None]: An awaitable execution. 427 428 Raises: 429 enpi_api.l2.types.api_error.ApiError: If API request fails. 430 431 Example: 432 433 Batch tag multiple collections with some tags: 434 435 ```python 436 with EnpiApiClient() as enpi_client: 437 collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)] 438 439 # Create a filter 440 filter = enpi_client.filter_api.create_filter( 441 name="My filter", 442 condition=dict( 443 type="match_ids", 444 target="collection", 445 ids=collection_ids, 446 ), 447 ) 448 449 # Create an annotation 450 annotation = collection_annotation(tags=[ 451 Tag(id=CollectionTags.CampaignId, value="My campaign"), 452 Tag(id=CollectionTags.ProjectId, value="My project"), 453 ]) 454 455 # Add the metadata 456 enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait() 457 ``` 458 """ 459 460 collection_api_instance = openapi_client.CollectionApi(self._inner_api_client) 461 462 import_metadata_request = openapi_client.ImportMetadataRequest( 463 openapi_client.SearchAndTag( 464 filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version), 465 annotation=annotation.to_api_payload(), 466 ) 467 ) 468 469 with ApiErrorContext(): 470 import_metadata_response = collection_api_instance.import_metadata(import_metadata_request) 471 assert import_metadata_response.workflow_execution_id is not None 472 473 workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id) 474 475 waitable = WorkflowExecutionTaskWaitable[CollectionMetadata]( 476 workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT 477 ) 478 479 return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
Import metadata to annotate collections, clones or sequences in batches using a filter.
This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values that you provide will be applied to all matching items of the specified level.
If you would like to add different values based on different matched tags, have a look at the methods that support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.
Arguments:
- filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate. Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
- annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation are the preferred way of creating annotation configuration.
Returns:
enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Batch tag multiple collections with some tags:
```python
with EnpiApiClient() as enpi_client:
    collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]

    # Create a filter
    filter = enpi_client.filter_api.create_filter(
        name="My filter",
        condition=dict(
            type="match_ids",
            target="collection",
            ids=collection_ids,
        ),
    )

    # Create an annotation
    annotation = collection_annotation(tags=[
        Tag(id=CollectionTags.CampaignId, value="My campaign"),
        Tag(id=CollectionTags.ProjectId, value="My project"),
    ])

    # Add the metadata
    enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
```
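The returned `Execution` does not have to be awaited immediately. A minimal sketch of deferring the wait, reusing the `filter` and `annotation` from the example above:

```python
with EnpiApiClient() as enpi_client:
    # Start the metadata import without blocking
    execution = enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation)

    # ... other work can happen here ...

    # Block until the metadata import task has finished
    execution.wait()
```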
481 def add_metadata_from_file( 482 self, 483 filter: TemplatedFilter, 484 annotation: import_metadata_templated.Annotation, 485 file_path: str | Path, 486 ) -> Execution[None]: 487 """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences. 488 489 Args: 490 filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate. 491 Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters. 492 annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You 493 specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, 494 enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation 495 are the preferred way of creating annotation configuration. 496 file_path (str | Path): The path to the CSV or XLSX file to import. 497 498 Returns: 499 enpi_api.l2.types.execution.Execution[None]: An awaitable execution. 500 501 Raises: 502 enpi_api.l2.types.api_error.ApiError: If API request fails. 503 504 Example: 505 506 Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences. 507 508 Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*. 509 We'll add the value to a custom imaginary tag that was created before this example. 510 511 The CSV file would look like this: 512 513 | match_chain | match_productive | value_to_add | 514 |-------------|------------------|--------------| 515 | Heavy | true | Heavy and productive | 516 | Heavy | false | Heavy and unproductive | 517 | Kappa | true | Kappa and productive | 518 | Kappa | false | Kappa and unproductive | 519 | Lambda | true | Lambda and productive | 520 | Lambda | false | Lambda and unproductive | 521 522 We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*. 
523 524 ```python 525 my_collection_id: CollectionId = CollectionId(1337) 526 527 tag_id_chain: TagId = TagId(SequenceTags.Chain) 528 tag_id_productive: TagId = TagId(SequenceTags.Productive) 529 tag_id_value_to_add: TagId = TagId(52001) # This is a custom tag 530 531 with EnpiApiClient() as enpi_client: 532 filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8')) 533 enpi_client.collection_api.add_metadata_from_file( 534 filter=filter, 535 annotation=sequence_annotation([ 536 template_tag(tag_id=tag_id_value_to_add, key="value_to_add"), 537 ]), 538 file_path="path/to/metadata.csv", 539 ).wait() 540 ``` 541 """ 542 543 # We need to upload the file to the platform 544 file_api = FileApi(self._inner_api_client, self._log_level) 545 file_execution = file_api.upload_file(file_path) 546 547 file = file_execution.wait() 548 549 collection_api_instance = openapi_client.CollectionApi(self._inner_api_client) 550 551 # Start the metadata import, this starts a task, so we'll wait for that to be completed 552 import_metadata_request = openapi_client.ImportMetadataRequest( 553 openapi_client.TemplatedSearchAndTag( 554 filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version), 555 annotation=annotation.to_api_payload(), 556 template_file_id=file.id, 557 ) 558 ) 559 560 with ApiErrorContext(): 561 # The metadata import has not started yet because we first need to wait for the file upload 562 import_metadata_response = collection_api_instance.import_metadata(import_metadata_request) 563 assert import_metadata_response.workflow_execution_id is not None 564 565 workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id) 566 567 def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None: 568 assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead" 569 570 nonlocal file 571 file_api.delete_file_by_id(file.id) 572 573 waitable = WorkflowExecutionTaskWaitable[None]( 574 on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED 575 ) 576 577 return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.
Arguments:
- filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate. Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
- annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation are the preferred way of creating annotation configuration.
- file_path (str | Path): The path to the CSV or XLSX file to import.
Returns:
enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.
Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*. We'll add the value to a custom imaginary tag that was created before this example.
The CSV file would look like this:
| match_chain | match_productive | value_to_add |
|-------------|------------------|--------------|
| Heavy | true | Heavy and productive |
| Heavy | false | Heavy and unproductive |
| Kappa | true | Kappa and productive |
| Kappa | false | Kappa and unproductive |
| Lambda | true | Lambda and productive |
| Lambda | false | Lambda and unproductive |

We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.
```python
my_collection_id: CollectionId = CollectionId(1337)

tag_id_chain: TagId = TagId(SequenceTags.Chain)
tag_id_productive: TagId = TagId(SequenceTags.Productive)
tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag

with EnpiApiClient() as enpi_client:
    filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
    enpi_client.collection_api.add_metadata_from_file(
        filter=filter,
        annotation=sequence_annotation([
            template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
        ]),
        file_path="path/to/metadata.csv",
    ).wait()
```
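The same import can also be driven from an in-memory table via `add_metadata_from_df` (documented below). A sketch reusing the hypothetical filter and custom tag ID from the example above:

```python
import pandas as pd

# The same templated metadata as the CSV above, built in memory
metadata_frame = pd.DataFrame(
    [
        ["Heavy", "true", "Heavy and productive"],
        ["Heavy", "false", "Heavy and unproductive"],
        ["Kappa", "true", "Kappa and productive"],
    ],
    columns=["match_chain", "match_productive", "value_to_add"],
)

with EnpiApiClient() as enpi_client:
    filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
    enpi_client.collection_api.add_metadata_from_df(
        filter=filter,
        annotation=sequence_annotation([
            template_tag(tag_id=TagId(52001), key="value_to_add"),  # hypothetical custom tag
        ]),
        data_frame=metadata_frame,
    ).wait()
```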
579 def add_metadata_from_df( 580 self, 581 filter: TemplatedFilter, 582 annotation: import_metadata_templated.Annotation, 583 data_frame: pd.DataFrame, 584 ) -> Execution[None]: 585 """Import metadata from a DataFrame to annotate collections, clones or sequences. 586 587 This is a convenience method to import metadata from a Pandas DataFrame. For more information about the 588 metadata import, see the documentation for `import_metadata_from_csv`. 589 590 Args: 591 filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate. 592 annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You 593 specify a specific annotation target and the values to apply. 594 data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import. 595 596 Returns: 597 enpi_api.l2.types.execution.Execution[None]: An awaitable execution. 598 599 Raises: 600 enpi_api.l2.types.api_error.ApiError: If API request fails. 601 602 Example: 603 604 Part of the `add_calculated_metadata.py` example script. 605 606 ```python 607 # Specify the filter query to match the sequences we want to add metadata to 608 metadata_filter = client.filter_api.create_templated_filter( 609 name="Metadata import filter", 610 shared=False, 611 condition=TemplatedAndOperator( 612 conditions=[ 613 TemplatedMatchTag(tag_id=CollectionTags.Name), 614 TemplatedMatchId(target=MatchIdTarget.SEQUENCE), 615 ] 616 ), 617 ) 618 619 # Specify the sequence-level annotation to add to the collection 620 metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)]) 621 622 # Create metadata dataframe 623 metadata_frame = pd.DataFrame( 624 [ 625 [ 626 collection_name, # Match 627 df_row[1]["Unique Sequence ID"], # Match 628 grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"], # Add 629 ] 630 for df_row in exported_df.iterrows() 631 ], 632 columns=["Name", "Unique Sequence ID", new_tag_archetype.key], 633 ) 634 635 # Apply metadata to the collection 636 client.collection_api.add_metadata_from_df( 637 filter=metadata_filter, 638 annotation=metadata_annotation, 639 data_frame=metadata_frame, 640 ).wait() 641 ``` 642 """ 643 644 # We need to turn the DataFrame into a CSV file 645 temporary_csv_file_path = f"/tmp/import_metadata.{uuid4()}.csv" 646 data_frame.to_csv(temporary_csv_file_path, index=False) 647 648 return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path)
Import metadata from a DataFrame to annotate collections, clones or sequences.
This is a convenience method to import metadata from a Pandas DataFrame. For more information about the metadata import, see the documentation for `add_metadata_from_file`.
Arguments:
- filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
- annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply.
- data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
Returns:
enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
Part of the `add_calculated_metadata.py` example script.

```python
# Specify the filter query to match the sequences we want to add metadata to
metadata_filter = client.filter_api.create_templated_filter(
    name="Metadata import filter",
    shared=False,
    condition=TemplatedAndOperator(
        conditions=[
            TemplatedMatchTag(tag_id=CollectionTags.Name),
            TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
        ]
    ),
)

# Specify the sequence-level annotation to add to the collection
metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])

# Create metadata dataframe
metadata_frame = pd.DataFrame(
    [
        [
            collection_name,  # Match
            df_row[1]["Unique Sequence ID"],  # Match
            grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
        ]
        for df_row in exported_df.iterrows()
    ],
    columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
)

# Apply metadata to the collection
client.collection_api.add_metadata_from_df(
    filter=metadata_filter,
    annotation=metadata_annotation,
    data_frame=metadata_frame,
).wait()
```
650 def get_as_zip( 651 self, 652 collection_ids: list[CollectionId], 653 filter: Filter | None = None, 654 tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS, 655 output_directory: str | Path | None = None, 656 ) -> Execution[Path]: 657 """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file. 658 659 Args: 660 collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export. 661 filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export. 662 If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used. 663 tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export. 664 output_directory (str | Path | None): The directory path under which file will get exported. If 665 not provided, a temporary directory will be used. 666 667 Returns: 668 enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when 669 awaited. 670 671 Raises: 672 enpi_api.l2.types.api_error.ApiError: If API request fails. 673 674 Example: 675 676 ```python 677 with EnpiApiClient() as enpi_client: 678 679 collection_id = CollectionId(1234) 680 681 # Example assumes you have a filter 682 collection_filter: Filter = ... 683 684 path: str = enpi_client.collection_api.get_as_tsv( 685 collection_ids=[collection_id], 686 filter=collection_filter, 687 tag_ids=[ 688 CollectionTags.Name, 689 CollectionTags.Organism, 690 CollectionTags.Complexity, 691 CollectionTags.Receptor, 692 SequenceTags.Chain, 693 SequenceTags.Productive, 694 ], 695 output_directory="example/export_result/" 696 ) 697 ``` 698 """ 699 700 # Create the collectiom filter if it wasn't provided, it will match and 701 # get all the clones from target collections 702 if filter is None: 703 filter_api = FilterApi(self._inner_api_client, self._log_level) 704 filter = filter_api.create_filter( 705 name=f"all-collection-clones-filter-{uuid4()}", # Unique name to avoid collision 706 condition=MatchIds( 707 target=MatchIdTarget.COLLECTION, 708 ids=collection_ids, # Match all collection IDs passed to this function 709 ), 710 ) 711 712 # Start the collection export, this starts a task, so we'll wait for that to be completed 713 export_collection_request = openapi_client.ExportRequest( 714 payload=openapi_client.ExportPayload( 715 collection_ids=[int(collection_id) for collection_id in collection_ids], 716 filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version), 717 tag_ids=[int(tag_id) for tag_id in tag_ids], 718 ) 719 ) 720 collection_api_instance = openapi_client.CollectionApi(self._inner_api_client) 721 722 with ApiErrorContext(): 723 export_collection_response = collection_api_instance.export(export_collection_request) 724 assert export_collection_response.workflow_execution_id is not None 725 726 workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id) 727 728 def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path: 729 file_api = FileApi(self._inner_api_client, self._log_level) 730 file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory) 731 732 logger.success("Collection(s) export has succeeded.") 733 return file_path 734 735 waitable = WorkflowExecutionTaskWaitable[Path]( 736 workflow_execution_id=workflow_execution_id, 737 on_complete=on_complete, 
738 task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT, 739 ) 740 741 return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.
Arguments:
- collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
- filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export. If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
- tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
- output_directory (str | Path | None): The directory path under which file will get exported. If not provided, a temporary directory will be used.
Returns:
enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when awaited.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    collection_id = CollectionId(1234)

    # Example assumes you have a filter
    collection_filter: Filter = ...

    path: Path = enpi_client.collection_api.get_as_zip(
        collection_ids=[collection_id],
        filter=collection_filter,
        tag_ids=[
            CollectionTags.Name,
            CollectionTags.Organism,
            CollectionTags.Complexity,
            CollectionTags.Receptor,
            SequenceTags.Chain,
            SequenceTags.Productive,
        ],
        output_directory="example/export_result/",
    ).wait()
```
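A sketch of inspecting the resulting archive, relying on the default export tags; the collection IDs and output directory are illustrative:

```python
from zipfile import ZipFile

with EnpiApiClient() as enpi_client:
    zip_path = enpi_client.collection_api.get_as_zip(
        collection_ids=[CollectionId(1234), CollectionId(5678)],
        output_directory="example/export_result/",
    ).wait()

    # Each exported collection becomes a separate TSV file inside the archive
    with ZipFile(zip_path) as archive:
        print(archive.namelist())
```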
743 def get_as_df( 744 self, 745 collection_ids: list[CollectionId], 746 filter: Filter | None = None, 747 tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS, 748 ) -> Execution[pd.DataFrame]: 749 """Export collection(s) to a Pandas DataFrame. 750 751 Args: 752 collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export. 753 filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export. 754 If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used. 755 tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export. 756 757 Returns: 758 Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection. 759 760 Raises: 761 enpi_api.l2.types.api_error.ApiError: If API request fails. 762 763 Example: 764 765 ```python 766 with EnpiApiClient() as enpi_client: 767 # Example assumes you have a filter 768 filter: Filter = ... 769 770 df: pd.DataFrame = enpi_client.collection_api.get_as_df( 771 collection_ids=[CollectionId(1)], 772 filter=filter, 773 tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.CDR3AminoAcids], 774 ) 775 ``` 776 """ 777 tmp_dir = tempfile.TemporaryDirectory() 778 get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name) 779 780 def wait() -> pd.DataFrame: 781 zip_path = get_as_zip_execution.wait() 782 783 # Extract all TSV files from the ZIP archive 784 with ZipFile(zip_path, "r") as zip_ref: 785 zip_ref.extractall(tmp_dir.name) 786 787 # Read all TSV files into a single DataFrame 788 all_dfs = [] 789 for root, _, files in os.walk(tmp_dir.name): 790 for file in files: 791 if file.endswith(".tsv"): 792 file_path = os.path.join(root, file) 793 df = pd.read_csv(file_path, delimiter="\t") 794 all_dfs.append(df) 795 796 return pd.concat(all_dfs) 797 798 return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)
Export collection(s) to a Pandas DataFrame.
Arguments:
- collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
- filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export. If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
- tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
Returns:
Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
```python
with EnpiApiClient() as enpi_client:
    # Example assumes you have a filter
    filter: Filter = ...

    df: pd.DataFrame = enpi_client.collection_api.get_as_df(
        collection_ids=[CollectionId(1)],
        filter=filter,
        tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.CDR3AminoAcids],
    ).wait()
```
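A sketch of simple downstream analysis on the exported DataFrame; the column name `Chain` is an assumption and should be checked against the exported tag's key in your environment:

```python
with EnpiApiClient() as enpi_client:
    df = enpi_client.collection_api.get_as_df(collection_ids=[CollectionId(1)]).wait()

    # Count exported sequences per chain; "Chain" is an assumed column name
    print(df["Chain"].value_counts())
```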