ENPICOM Logo API Docs Python SDK Docs Events

enpi_api.l2.client.api.collection_api

  1import os
  2import tempfile
  3from pathlib import Path
  4from typing import Generator, Mapping
  5from uuid import UUID, uuid4
  6from zipfile import ZipFile
  7
  8import pandas as pd
  9from loguru import logger
 10
 11from enpi_api.l1 import openapi_client
 12from enpi_api.l2.client.api.file_api import FileApi
 13from enpi_api.l2.client.api.filter_api import FilterApi
 14from enpi_api.l2.events.workflow_execution_task_waitable import WorkflowExecutionTaskWaitable
 15from enpi_api.l2.tags import CloneTags, CollectionTags, SequenceTags
 16from enpi_api.l2.types import import_metadata, import_metadata_templated
 17from enpi_api.l2.types.api_error import ApiError, ApiErrorContext
 18from enpi_api.l2.types.collection import AdditionalImportMetadata, CollectionId, CollectionMetadata
 19from enpi_api.l2.types.execution import Execution
 20from enpi_api.l2.types.filter import Filter, MatchIds, MatchIdTarget, TemplatedFilter
 21from enpi_api.l2.types.log import LogLevel
 22from enpi_api.l2.types.reference_database import ReferenceDatabaseRevision
 23from enpi_api.l2.types.tag import TagId, TagKey
 24from enpi_api.l2.types.task import TaskState
 25from enpi_api.l2.types.workflow import WorkflowExecutionId, WorkflowExecutionTaskId, WorkflowTaskTemplateName
 26from enpi_api.l2.util.file import verify_headers_uniformity
 27
# Tag IDs exported when the caller does not pass an explicit `tag_ids` list;
# `CollectionApi.get_as_zip` uses this list as the default value of its `tag_ids` parameter.
DEFAULT_EXPORT_TAG_IDS = [
    # Collection tags
    CollectionTags.Name,
    CollectionTags.Organism,
    CollectionTags.Complexity,
    CollectionTags.Receptor,
    CollectionTags.NumberOfClones,
    CollectionTags.Reference,
    # Clone tags
    CloneTags.TenXBarcode,
    CloneTags.CloneCount,
    # Sequence tags
    SequenceTags.Chain,
    SequenceTags.SequenceCount,
    SequenceTags.Cdr3AminoAcids,
    SequenceTags.VGene,
    SequenceTags.JGene,
]
"""The default tags that are included when exporting a collection to a DataFrame or a CSV file.

These are:

- Collection level tags:
    - `enpi_api.l2.tags.CollectionTags.Name`
    - `enpi_api.l2.tags.CollectionTags.Organism`
    - `enpi_api.l2.tags.CollectionTags.Complexity`
    - `enpi_api.l2.tags.CollectionTags.Receptor`
    - `enpi_api.l2.tags.CollectionTags.NumberOfClones`
    - `enpi_api.l2.tags.CollectionTags.Reference`
- Clone level tags:
    - `enpi_api.l2.tags.CloneTags.TenXBarcode`
    - `enpi_api.l2.tags.CloneTags.CloneCount`
- Sequence level tags:
    - `enpi_api.l2.tags.SequenceTags.Chain`
    - `enpi_api.l2.tags.SequenceTags.SequenceCount`
    - `enpi_api.l2.tags.SequenceTags.Cdr3AminoAcids`
    - `enpi_api.l2.tags.SequenceTags.VGene`
    - `enpi_api.l2.tags.SequenceTags.JGene`
"""
 67
 68
 69class CollectionApi:
 70    _inner_api_client: openapi_client.ApiClient
 71    _log_level: LogLevel
 72
 73    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
 74        """@private"""
 75        self._inner_api_client = inner_api_client
 76        self._log_level = log_level
 77
 78    def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]:
 79        """Get a generator through all available collections in the platform.
 80
 81        Args:
 82            name (str | None): Optional collection name for search by case-insensitive substring matching
 83
 84        Returns:
 85            Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.
 86
 87        Raises:
 88            enpi_api.l2.types.api_error.ApiError: If API request fails.
 89
 90        Example:
 91
 92            ```python
 93            with EnpiApiClient() as enpi_client:
 94                for collection in enpi_client.collection_api.get_collections_metadata():
 95                    print(collection)
 96            ```
 97        """
 98
 99        logger.info("Getting a generator through all collections")
100
101        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
102
103        # Fetch the first page, there is always a first page, it may be empty
104        try:
105            get_collections_response = collection_api_instance.get_collections(name=name)
106        except openapi_client.ApiException as e:
107            raise ApiError(e)
108
109        # `collections` and `cursor` get overwritten in the loop below when fetching a new page
110        collections = get_collections_response.collections
111        cursor = get_collections_response.cursor
112
113        while True:
114            for collection in collections:
115                yield CollectionMetadata.from_raw(collection)
116
117            # Check if we need to fetch a next page
118            if cursor is None:
119                logger.trace("No more pages of collections")
120                return  # No more pages
121
122            # We have a cursor, so we need to get a next page
123            logger.trace("Fetching next page of collections")
124            try:
125                get_collections_response = collection_api_instance.get_collections(
126                    cursor=cursor,
127                    name=name if name is not None else None,
128                )
129            except openapi_client.ApiException as e:
130                raise ApiError(e)
131            collections = get_collections_response.collections
132            cursor = get_collections_response.cursor
133
134    def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata:
135        """Get a single collection by its ID.
136
137        Args:
138            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.
139
140        Returns:
141            enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain
142              the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer
143              to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.
144
145        Raises:
146            enpi_api.l2.types.api_error.ApiError: If API request fails.
147
148        Example:
149
150            ```python
151            with EnpiApiClient() as enpi_client:
152                collection: Collection = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
153            ```
154        """
155
156        logger.info(f"Getting collection with ID `{collection_id}`")
157
158        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
159
160        try:
161            get_collection_response = collection_api_instance.get_collection(collection_id)
162        except openapi_client.ApiException as e:
163            raise ApiError(e)
164
165        collection = CollectionMetadata.from_raw(get_collection_response.collection)
166
167        return collection
168
169    def delete_collection_by_id(self, collection_id: CollectionId) -> None:
170        """Delete a single collection by its ID.
171
172        This will remove the collection from the ENPICOM Platform.
173
174        Args:
175            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.
176
177        Raises:
178            enpi_api.l2.types.api_error.ApiError: If API request fails.
179
180        Example:
181
182            ```python
183            with EnpiApiClient() as enpi_client:
184                enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
185            ```
186        """
187
188        logger.info(f"Deleting collection with ID `{collection_id}`")
189
190        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
191
192        try:
193            collection_api_instance.delete_collection(id=collection_id, body={})
194        except openapi_client.ApiException as e:
195            raise ApiError(e)
196
197        logger.info(f"Collection with ID `{collection_id}` successfully deleted")
198
199    def create_collection_from_csv(
200        self,
201        file_path: str | Path,
202        reference_database_revision: ReferenceDatabaseRevision | None = None,
203        skiprows: int = 0,
204        mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None,
205        metadata: AdditionalImportMetadata | None = None,
206        organism: str | None = None,
207    ) -> Execution[CollectionMetadata]:
208        """Import a collection from a CSV file (can be gzipped).
209
210        The file should be a CSV file with a couple of required headers. These headers must
211        either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
212        The following tags are required:
213
214            - enpi_api.l2.tags.CollectionTags.Name
215            - enpi_api.l2.tags.CollectionTags.Organism
216            - enpi_api.l2.tags.SequenceTags.SequenceCount
217            - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
218            - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
219            - enpi_api.l2.tags.SequenceTags.VCall
220            - enpi_api.l2.tags.SequenceTags.JCall
221
222        Args:
223            file_path (str | Path): The path to the CSV file to import.
224            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
225                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
226                reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references
227                available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter.
228                There is no downsides to always specifying the reference manually, which is a safer and less error-prone option.
229            skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
230            mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the
231              CSV headers to ENPICOM Platform tag keys
232            metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection.
233                <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take
234                precedence when creating tags.**</u>
235            organism: (str | None): If passed, it's compared with the organism value found in the first line of the imported file and
236                throws an error if the values are different. Can serve as a quick utility check.
237
238        Returns:
239            enpi_api.l2.types.collection.CollectionMetadata: Metadata of the collection that was imported.
240
241        Raises:
242            KeyError: If 'Organism' column is not found in the imported df/csv.
243            ValueError: If optional `organism` param value differs from the 'Organism' value from the df/csv.
244            enpi_api.l2.types.api_error.ApiError: If API request fails.
245
246        Example:
247
248            ```python
249            with EnpiApiClient() as enpi_client:
250                reference_name = ...
251                species = ...
252                reference = enpi_client.reference_database_api.get_revision_by_name(
253                    name=reference_name,
254                    species=reference_species,
255                )
256
257                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
258                    file_path=import_file_path,
259                    reference_database_revision=reference,
260                    skiprows=1,
261                    mapping={
262                        "title": CollectionTags.Name,
263                        "species": CollectionTags.Organism,
264                    },
265                    metadata={
266                        CollectionTags.ProjectId: "Project 001",
267                    }
268                ).wait()
269                ```
270        """
271
272        logger.info(f"Importing collection from CSV file `{file_path}`")
273
274        # Pandas supports gzipped CSV
275        df = pd.read_csv(file_path, sep=",", skiprows=skiprows)
276
277        # Get the organism from the first line. All lines should hold the same value
278        organism_from_file = str(df.iloc[0].get("Organism", None))
279        if organism_from_file is None:
280            # If not found by tag key, try to access it via the tag ID
281            organism_from_file = str(df.iloc[0].get(CollectionTags.Organism, None))
282
283        # If it's still none, raise an error - it's a mandatory column anyways
284        if organism_from_file is None:
285            raise KeyError("A required 'Organism' column was not found in the imported file/df")
286
287        # If `organism` param was passed, compare the values
288        if (organism is not None) and (organism != organism_from_file):
289            raise ValueError(
290                f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}",
291            )
292
293        # Map the headers in the CSV file to Tag Keys
294        if mapping is not None:
295            # We drop the columns for which no mapping is specified
296            unmapped_headers = set(df.columns).difference(set(mapping.keys()))
297            logger.warning(f"The following headers are unmapped and are removed:\n{unmapped_headers}")
298            df.drop(columns=list(unmapped_headers), inplace=True)
299            df.rename(columns=mapping, inplace=True)
300        if metadata is not None:
301            for key, value in metadata.items():
302                df[key] = value
303
304        temporary_csv_file_path = f"/tmp/import_collection_csv.{uuid4()}.csv"
305        df.to_csv(temporary_csv_file_path, index=False)
306        verify_headers_uniformity(list(df.columns))
307
308        # Upload the file to the platform
309        file_api = FileApi(self._inner_api_client, self._log_level)
310        file = file_api.upload_file(temporary_csv_file_path).wait()
311
312        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
313
314        # Start the collection import, this starts a task, so we'll wait for that to be completed
315        import_collection_request = openapi_client.ImportCollectionRequest(
316            file_id=UUID(file.id),
317            organism=organism_from_file,
318            reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None,
319            reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None,
320        )
321
322        with ApiErrorContext():
323            import_collection_response = collection_api_instance.import_collection(import_collection_request)
324            assert import_collection_response.workflow_execution_id is not None
325
326            workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id)
327
328            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata:
329                assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
330
331                get_collection_id_response = collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id)
332                assert get_collection_id_response.collection_id is not None
333
334                collection_id = CollectionId(get_collection_id_response.collection_id)
335
336                logger.success(f"Collection with ID `{collection_id}` was successfully imported")
337                # Remove the file from tmp folder
338                os.remove(temporary_csv_file_path)
339                # Remove the file from the platform
340                file_api.delete_file_by_id(file.id)
341
342                return self.get_collection_metadata_by_id(collection_id)
343
344            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
345                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete
346            )
347
348            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
349
350    def create_collection_from_df(
351        self,
352        data_frame: pd.DataFrame,
353        reference_database_revision: ReferenceDatabaseRevision | None = None,
354    ) -> Execution[CollectionMetadata]:
355        """Import a collection from a DataFrame.
356
357        This is a convenience method to import a collection from a Pandas DataFrame. For more information about the
358        collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.
359
360        Args:
361            data_frame (pd.DataFrame): The DataFrame containing the collection to import.
362            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
363                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
364                reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references
365                available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter.
366                There is no downsides to always specifying the reference manually, which is a safer and less error-prone option.
367        Raises:
368            enpi_api.l2.types.api_error.ApiError: If API request fails.
369
370        Returns:
371            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
372              collection that was imported when awaited.
373
374        Example:
375
376            ```python
377            reference_name = ...
378            species = ...
379            reference = enpi_client.reference_database_api.get_revision_by_name(
380                name=reference_name,
381                species=reference_species,
382            )
383
384            with EnpiApiClient() as enpi_client:
385                df = pd.read_csv('/home/data.csv')
386                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
387                    data_frame=df,
388                    reference_database_revision=reference,
389                ).wait()
390            ```
391        """
392
393        # We need to turn the DataFrame into a CSV file
394        with tempfile.NamedTemporaryFile(suffix=".csv") as temp_file:
395            data_frame.to_csv(temp_file.name, index=False)
396
397            create_collection_execution = self.create_collection_from_csv(
398                file_path=temp_file.name,
399                reference_database_revision=reference_database_revision,
400            )
401
402        def wait() -> CollectionMetadata:
403            return create_collection_execution.wait()
404
405        return Execution(wait=wait, check_execution_state=create_collection_execution.check_execution_state)
406
407    def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]:
408        """Import metadata to annotate collections, clones or sequences in batches using a filter.
409
410        This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values
411        that you provide will be applied to all matching items of the specified level.
412
413        If you would like to add different values based on different matched tags, have a look at the methods that
414        support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.
415
416        Args:
417            filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate.
418              Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
419            annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You
420              specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
421              enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
422              are the preferred way of creating annotation configuration.
423
424        Returns:
425            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
426
427        Raises:
428            enpi_api.l2.types.api_error.ApiError: If API request fails.
429
430        Example:
431
432            Batch tag multiple collections with some tags:
433
434            ```python
435            with EnpiApiClient() as enpi_client:
436                collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]
437
438                # Create a filter
439                filter = enpi_client.filter_api.create_filter(
440                    name="My filter",
441                    condition=dict(
442                        type="match_ids",
443                        target="collection",
444                        ids=collection_ids,
445                    ),
446                )
447
448                # Create an annotation
449                annotation = collection_annotation(tags=[
450                    Tag(id=CollectionTags.CampaignId, value="My campaign"),
451                    Tag(id=CollectionTags.ProjectId, value="My project"),
452                ])
453
454                # Add the metadata
455                enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
456            ```
457        """
458
459        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
460
461        import_metadata_request = openapi_client.ImportMetadataRequest(
462            openapi_client.SearchAndTag(
463                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
464                annotation=annotation.to_api_payload(),
465            )
466        )
467
468        with ApiErrorContext():
469            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
470            assert import_metadata_response.workflow_execution_id is not None
471
472            workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
473
474            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
475                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
476            )
477
478            return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
479
480    def add_metadata_from_file(
481        self,
482        filter: TemplatedFilter,
483        annotation: import_metadata_templated.Annotation,
484        file_path: str | Path,
485        ignore_empty_values: bool = True,
486    ) -> Execution[None]:
487        """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.
488
489        Args:
490            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
491              Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
492            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
493              specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
494              enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
495              are the preferred way of creating annotation configuration.
496            file_path (str | Path): The path to the CSV or XLSX file to import.
497
498        Returns:
499            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
500
501        Raises:
502            enpi_api.l2.types.api_error.ApiError: If API request fails.
503
504        Example:
505
506            Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.
507
508            Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
509            We'll add the value to a custom imaginary tag that was created before this example.
510
511            The CSV file would look like this:
512
513            | match_chain | match_productive | value_to_add |
514            |-------------|------------------|--------------|
515            | Heavy       | true             | Heavy and productive |
516            | Heavy       | false            | Heavy and unproductive |
517            | Kappa       | true             | Kappa and productive |
518            | Kappa       | false            | Kappa and unproductive |
519            | Lambda      | true             | Lambda and productive |
520            | Lambda      | false            | Lambda and unproductive |
521
522            We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.
523
524            ```python
525            my_collection_id: CollectionId = CollectionId(1337)
526
527            tag_id_chain: TagId = TagId(SequenceTags.Chain)
528            tag_id_productive: TagId = TagId(SequenceTags.Cdr3Productive)
529            tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag
530
531            with EnpiApiClient() as enpi_client:
532                filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
533                enpi_client.collection_api.add_metadata_from_file(
534                    filter=filter,
535                    annotation=sequence_annotation([
536                        template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
537                    ]),
538                    file_path="path/to/metadata.csv",
539                ).wait()
540            ```
541        """
542
543        # We need to upload the file to the platform
544        file_api = FileApi(self._inner_api_client, self._log_level)
545        file_execution = file_api.upload_file(file_path)
546
547        file = file_execution.wait()
548
549        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
550
551        # Start the metadata import, this starts a task, so we'll wait for that to be completed
552        import_metadata_request = openapi_client.ImportMetadataRequest(
553            openapi_client.TemplatedSearchAndTag(
554                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
555                annotation=annotation.to_api_payload(),
556                template_file_id=file.id,
557                ignore_empty_values=ignore_empty_values,
558            )
559        )
560
561        with ApiErrorContext():
562            # The metadata import has not started yet because we first need to wait for the file upload
563            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
564            assert import_metadata_response.workflow_execution_id is not None
565
566        workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
567
568        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
569            assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
570
571            nonlocal file
572            file_api.delete_file_by_id(file.id)
573
574        waitable = WorkflowExecutionTaskWaitable[None](
575            on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED
576        )
577
578        return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
579
580    def add_metadata_from_df(
581        self,
582        filter: TemplatedFilter,
583        annotation: import_metadata_templated.Annotation,
584        data_frame: pd.DataFrame,
585    ) -> Execution[None]:
586        """Import metadata from a DataFrame to annotate collections, clones or sequences.
587
588        This is a convenience method to import metadata from a Pandas DataFrame. For more information about the
589        metadata import, see the documentation for `import_metadata_from_csv`.
590
591        Args:
592            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
593            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
594              specify a specific annotation target and the values to apply.
595            data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
596
597        Returns:
598            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
599
600        Raises:
601            enpi_api.l2.types.api_error.ApiError: If API request fails.
602
603        Example:
604
605            Part of the `add_calculated_metadata.py` example script.
606
607            ```python
608            # Specify the filter query to match the sequences we want to add metadata to
609            metadata_filter = client.filter_api.create_templated_filter(
610                name="Metadata import filter",
611                shared=False,
612                condition=TemplatedAndOperator(
613                    conditions=[
614                        TemplatedMatchTag(tag_id=CollectionTags.Name),
615                        TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
616                    ]
617                ),
618            )
619
620            # Specify the sequence-level annotation to add to the collection
621            metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])
622
623            # Create metadata dataframe
624            metadata_frame = pd.DataFrame(
625                [
626                    [
627                        collection_name,  # Match
628                        df_row[1]["Unique Sequence ID"],  # Match
629                        grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
630                    ]
631                    for df_row in exported_df.iterrows()
632                ],
633                columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
634            )
635
636            # Apply metadata to the collection
637            client.collection_api.add_metadata_from_df(
638                filter=metadata_filter,
639                annotation=metadata_annotation,
640                data_frame=metadata_frame,
641            ).wait()
642            ```
643        """
644
645        # We need to turn the DataFrame into a CSV file
646        temporary_csv_file_path = f"/tmp/import_metadata.{uuid4()}.csv"
647        data_frame.to_csv(temporary_csv_file_path, index=False)
648
649        return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path)
650
651    def get_as_zip(
652        self,
653        collection_ids: list[CollectionId],
654        filter: Filter | None = None,
655        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
656        output_directory: str | Path | None = None,
657    ) -> Execution[Path]:
658        """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.
659
660        Args:
661            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
662            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
663                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
664            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
665            output_directory (str | Path | None): The directory path under which file will get exported. If
666              not provided, a temporary directory will be used.
667
668        Returns:
669            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
670              awaited.
671
672        Raises:
673            enpi_api.l2.types.api_error.ApiError: If API request fails.
674
675        Example:
676
677            ```python
678            with EnpiApiClient() as enpi_client:
679
680                collection_id = CollectionId(1234)
681
682                # Example assumes you have a filter
683                collection_filter: Filter = ...
684
685                path: str = enpi_client.collection_api.get_as_tsv(
686                    collection_ids=[collection_id],
687                    filter=collection_filter,
688                    tag_ids=[
689                        CollectionTags.Name,
690                        CollectionTags.Organism,
691                        CollectionTags.Complexity,
692                        CollectionTags.Receptor,
693                        SequenceTags.Chain,
694                        SequenceTags.Cdr3Productive,
695                    ],
696                    output_directory="example/export_result/"
697                )
698            ```
699        """
700
701        # Create the collectiom filter if it wasn't provided, it will match and
702        # get all the clones from target collections
703        if filter is None:
704            filter_api = FilterApi(self._inner_api_client, self._log_level)
705            filter = filter_api.create_filter(
706                name=f"all-collection-clones-filter-{uuid4()}",  # Unique name to avoid collision
707                condition=MatchIds(
708                    target=MatchIdTarget.COLLECTION,
709                    ids=collection_ids,  # Match all collection IDs passed to this function
710                ),
711            )
712
713        # Start the collection export, this starts a task, so we'll wait for that to be completed
714        export_collection_request = openapi_client.ExportRequest(
715            payload=openapi_client.ExportPayload(
716                collection_ids=[int(collection_id) for collection_id in collection_ids],
717                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
718                tag_ids=[int(tag_id) for tag_id in tag_ids],
719            )
720        )
721        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
722
723        with ApiErrorContext():
724            export_collection_response = collection_api_instance.export(export_collection_request)
725            assert export_collection_response.workflow_execution_id is not None
726
727            workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id)
728
729            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
730                file_api = FileApi(self._inner_api_client, self._log_level)
731                file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)
732
733                logger.success("Collection(s) export has succeeded.")
734                return file_path
735
736            waitable = WorkflowExecutionTaskWaitable[Path](
737                workflow_execution_id=workflow_execution_id,
738                on_complete=on_complete,
739                task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
740            )
741
742            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
743
744    def get_as_df(
745        self,
746        collection_ids: list[CollectionId],
747        filter: Filter | None = None,
748        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
749    ) -> Execution[pd.DataFrame]:
750        """Export collection(s) to a Pandas DataFrame.
751
752        Args:
753            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
754            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
755                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
756            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
757
758        Returns:
759            Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection.
760
761        Raises:
762            enpi_api.l2.types.api_error.ApiError: If API request fails.
763
764        Example:
765
766            ```python
767            with EnpiApiClient() as enpi_client:
768                # Example assumes you have a filter
769                filter: Filter = ...
770
771                df: pd.DataFrame = enpi_client.collection_api.get_as_df(
772                    collection_ids=[CollectionId(1)],
773                    filter=filter,
774                    tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.CDR3AminoAcids],
775                )
776            ```
777        """
778        tmp_dir = tempfile.TemporaryDirectory()
779        get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)
780
781        def wait() -> pd.DataFrame:
782            zip_path = get_as_zip_execution.wait()
783
784            # Extract all TSV files from the ZIP archive
785            with ZipFile(zip_path, "r") as zip_ref:
786                zip_ref.extractall(tmp_dir.name)
787
788            # Read all TSV files into a single DataFrame
789            all_dfs = []
790            for root, _, files in os.walk(tmp_dir.name):
791                for file in files:
792                    if file.endswith(".tsv"):
793                        file_path = os.path.join(root, file)
794                        df = pd.read_csv(file_path, delimiter="\t")
795                        all_dfs.append(df)
796
797            return pd.concat(all_dfs)
798
799        return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)
DEFAULT_EXPORT_TAG_IDS = [2035, 2040, 2083, 2084, 2036, 2060, 1001, 1077, 38, 144, 24, 154, 110]
class CollectionApi:
 70class CollectionApi:
 71    _inner_api_client: openapi_client.ApiClient
 72    _log_level: LogLevel
 73
 74    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
 75        """@private"""
 76        self._inner_api_client = inner_api_client
 77        self._log_level = log_level
 78
 79    def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]:
 80        """Get a generator through all available collections in the platform.
 81
 82        Args:
 83            name (str | None): Optional collection name for search by case-insensitive substring matching
 84
 85        Returns:
 86            Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.
 87
 88        Raises:
 89            enpi_api.l2.types.api_error.ApiError: If API request fails.
 90
 91        Example:
 92
 93            ```python
 94            with EnpiApiClient() as enpi_client:
 95                for collection in enpi_client.collection_api.get_collections_metadata():
 96                    print(collection)
 97            ```
 98        """
 99
100        logger.info("Getting a generator through all collections")
101
102        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
103
104        # Fetch the first page, there is always a first page, it may be empty
105        try:
106            get_collections_response = collection_api_instance.get_collections(name=name)
107        except openapi_client.ApiException as e:
108            raise ApiError(e)
109
110        # `collections` and `cursor` get overwritten in the loop below when fetching a new page
111        collections = get_collections_response.collections
112        cursor = get_collections_response.cursor
113
114        while True:
115            for collection in collections:
116                yield CollectionMetadata.from_raw(collection)
117
118            # Check if we need to fetch a next page
119            if cursor is None:
120                logger.trace("No more pages of collections")
121                return  # No more pages
122
123            # We have a cursor, so we need to get a next page
124            logger.trace("Fetching next page of collections")
125            try:
126                get_collections_response = collection_api_instance.get_collections(
127                    cursor=cursor,
128                    name=name if name is not None else None,
129                )
130            except openapi_client.ApiException as e:
131                raise ApiError(e)
132            collections = get_collections_response.collections
133            cursor = get_collections_response.cursor
134
135    def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata:
136        """Get a single collection by its ID.
137
138        Args:
139            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.
140
141        Returns:
142            enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain
143              the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer
144              to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.
145
146        Raises:
147            enpi_api.l2.types.api_error.ApiError: If API request fails.
148
149        Example:
150
151            ```python
152            with EnpiApiClient() as enpi_client:
153                collection: Collection = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
154            ```
155        """
156
157        logger.info(f"Getting collection with ID `{collection_id}`")
158
159        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
160
161        try:
162            get_collection_response = collection_api_instance.get_collection(collection_id)
163        except openapi_client.ApiException as e:
164            raise ApiError(e)
165
166        collection = CollectionMetadata.from_raw(get_collection_response.collection)
167
168        return collection
169
170    def delete_collection_by_id(self, collection_id: CollectionId) -> None:
171        """Delete a single collection by its ID.
172
173        This will remove the collection from the ENPICOM Platform.
174
175        Args:
176            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.
177
178        Raises:
179            enpi_api.l2.types.api_error.ApiError: If API request fails.
180
181        Example:
182
183            ```python
184            with EnpiApiClient() as enpi_client:
185                enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
186            ```
187        """
188
189        logger.info(f"Deleting collection with ID `{collection_id}`")
190
191        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
192
193        try:
194            collection_api_instance.delete_collection(id=collection_id, body={})
195        except openapi_client.ApiException as e:
196            raise ApiError(e)
197
198        logger.info(f"Collection with ID `{collection_id}` successfully deleted")
199
200    def create_collection_from_csv(
201        self,
202        file_path: str | Path,
203        reference_database_revision: ReferenceDatabaseRevision | None = None,
204        skiprows: int = 0,
205        mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None,
206        metadata: AdditionalImportMetadata | None = None,
207        organism: str | None = None,
208    ) -> Execution[CollectionMetadata]:
209        """Import a collection from a CSV file (can be gzipped).
210
211        The file should be a CSV file with a couple of required headers. These headers must
212        either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
213        The following tags are required:
214
215            - enpi_api.l2.tags.CollectionTags.Name
216            - enpi_api.l2.tags.CollectionTags.Organism
217            - enpi_api.l2.tags.SequenceTags.SequenceCount
218            - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
219            - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
220            - enpi_api.l2.tags.SequenceTags.VCall
221            - enpi_api.l2.tags.SequenceTags.JCall
222
223        Args:
224            file_path (str | Path): The path to the CSV file to import.
225            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
226                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
227                reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references
228                available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter.
229                There is no downsides to always specifying the reference manually, which is a safer and less error-prone option.
230            skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
231            mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the
232              CSV headers to ENPICOM Platform tag keys
233            metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection.
234                <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take
235                precedence when creating tags.**</u>
236            organism: (str | None): If passed, it's compared with the organism value found in the first line of the imported file and
237                throws an error if the values are different. Can serve as a quick utility check.
238
239        Returns:
240            enpi_api.l2.types.collection.CollectionMetadata: Metadata of the collection that was imported.
241
242        Raises:
243            KeyError: If 'Organism' column is not found in the imported df/csv.
244            ValueError: If optional `organism` param value differs from the 'Organism' value from the df/csv.
245            enpi_api.l2.types.api_error.ApiError: If API request fails.
246
247        Example:
248
249            ```python
250            with EnpiApiClient() as enpi_client:
251                reference_name = ...
252                species = ...
253                reference = enpi_client.reference_database_api.get_revision_by_name(
254                    name=reference_name,
255                    species=reference_species,
256                )
257
258                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
259                    file_path=import_file_path,
260                    reference_database_revision=reference,
261                    skiprows=1,
262                    mapping={
263                        "title": CollectionTags.Name,
264                        "species": CollectionTags.Organism,
265                    },
266                    metadata={
267                        CollectionTags.ProjectId: "Project 001",
268                    }
269                ).wait()
270                ```
271        """
272
273        logger.info(f"Importing collection from CSV file `{file_path}`")
274
275        # Pandas supports gzipped CSV
276        df = pd.read_csv(file_path, sep=",", skiprows=skiprows)
277
278        # Get the organism from the first line. All lines should hold the same value
279        organism_from_file = str(df.iloc[0].get("Organism", None))
280        if organism_from_file is None:
281            # If not found by tag key, try to access it via the tag ID
282            organism_from_file = str(df.iloc[0].get(CollectionTags.Organism, None))
283
284        # If it's still none, raise an error - it's a mandatory column anyways
285        if organism_from_file is None:
286            raise KeyError("A required 'Organism' column was not found in the imported file/df")
287
288        # If `organism` param was passed, compare the values
289        if (organism is not None) and (organism != organism_from_file):
290            raise ValueError(
291                f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}",
292            )
293
294        # Map the headers in the CSV file to Tag Keys
295        if mapping is not None:
296            # We drop the columns for which no mapping is specified
297            unmapped_headers = set(df.columns).difference(set(mapping.keys()))
298            logger.warning(f"The following headers are unmapped and are removed:\n{unmapped_headers}")
299            df.drop(columns=list(unmapped_headers), inplace=True)
300            df.rename(columns=mapping, inplace=True)
301        if metadata is not None:
302            for key, value in metadata.items():
303                df[key] = value
304
305        temporary_csv_file_path = f"/tmp/import_collection_csv.{uuid4()}.csv"
306        df.to_csv(temporary_csv_file_path, index=False)
307        verify_headers_uniformity(list(df.columns))
308
309        # Upload the file to the platform
310        file_api = FileApi(self._inner_api_client, self._log_level)
311        file = file_api.upload_file(temporary_csv_file_path).wait()
312
313        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
314
315        # Start the collection import, this starts a task, so we'll wait for that to be completed
316        import_collection_request = openapi_client.ImportCollectionRequest(
317            file_id=UUID(file.id),
318            organism=organism_from_file,
319            reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None,
320            reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None,
321        )
322
323        with ApiErrorContext():
324            import_collection_response = collection_api_instance.import_collection(import_collection_request)
325            assert import_collection_response.workflow_execution_id is not None
326
327            workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id)
328
329            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata:
330                assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
331
332                get_collection_id_response = collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id)
333                assert get_collection_id_response.collection_id is not None
334
335                collection_id = CollectionId(get_collection_id_response.collection_id)
336
337                logger.success(f"Collection with ID `{collection_id}` was successfully imported")
338                # Remove the file from tmp folder
339                os.remove(temporary_csv_file_path)
340                # Remove the file from the platform
341                file_api.delete_file_by_id(file.id)
342
343                return self.get_collection_metadata_by_id(collection_id)
344
345            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
346                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete
347            )
348
349            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
350
351    def create_collection_from_df(
352        self,
353        data_frame: pd.DataFrame,
354        reference_database_revision: ReferenceDatabaseRevision | None = None,
355    ) -> Execution[CollectionMetadata]:
356        """Import a collection from a DataFrame.
357
358        This is a convenience method to import a collection from a Pandas DataFrame. For more information about the
359        collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.
360
361        Args:
362            data_frame (pd.DataFrame): The DataFrame containing the collection to import.
363            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
364                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
365                reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references
366                available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter.
367                There is no downsides to always specifying the reference manually, which is a safer and less error-prone option.
368        Raises:
369            enpi_api.l2.types.api_error.ApiError: If API request fails.
370
371        Returns:
372            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
373              collection that was imported when awaited.
374
375        Example:
376
377            ```python
378            reference_name = ...
379            species = ...
380            reference = enpi_client.reference_database_api.get_revision_by_name(
381                name=reference_name,
382                species=reference_species,
383            )
384
385            with EnpiApiClient() as enpi_client:
386                df = pd.read_csv('/home/data.csv')
387                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
388                    data_frame=df,
389                    reference_database_revision=reference,
390                ).wait()
391            ```
392        """
393
394        # We need to turn the DataFrame into a CSV file
395        with tempfile.NamedTemporaryFile(suffix=".csv") as temp_file:
396            data_frame.to_csv(temp_file.name, index=False)
397
398            create_collection_execution = self.create_collection_from_csv(
399                file_path=temp_file.name,
400                reference_database_revision=reference_database_revision,
401            )
402
403        def wait() -> CollectionMetadata:
404            return create_collection_execution.wait()
405
406        return Execution(wait=wait, check_execution_state=create_collection_execution.check_execution_state)
407
408    def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]:
409        """Import metadata to annotate collections, clones or sequences in batches using a filter.
410
411        This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values
412        that you provide will be applied to all matching items of the specified level.
413
414        If you would like to add different values based on different matched tags, have a look at the methods that
415        support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.
416
417        Args:
418            filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate.
419              Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
420            annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You
421              specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
422              enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
423              are the preferred way of creating annotation configuration.
424
425        Returns:
426            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
427
428        Raises:
429            enpi_api.l2.types.api_error.ApiError: If API request fails.
430
431        Example:
432
433            Batch tag multiple collections with some tags:
434
435            ```python
436            with EnpiApiClient() as enpi_client:
437                collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]
438
439                # Create a filter
440                filter = enpi_client.filter_api.create_filter(
441                    name="My filter",
442                    condition=dict(
443                        type="match_ids",
444                        target="collection",
445                        ids=collection_ids,
446                    ),
447                )
448
449                # Create an annotation
450                annotation = collection_annotation(tags=[
451                    Tag(id=CollectionTags.CampaignId, value="My campaign"),
452                    Tag(id=CollectionTags.ProjectId, value="My project"),
453                ])
454
455                # Add the metadata
456                enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
457            ```
458        """
459
460        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
461
462        import_metadata_request = openapi_client.ImportMetadataRequest(
463            openapi_client.SearchAndTag(
464                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
465                annotation=annotation.to_api_payload(),
466            )
467        )
468
469        with ApiErrorContext():
470            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
471            assert import_metadata_response.workflow_execution_id is not None
472
473            workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
474
475            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
476                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
477            )
478
479            return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
480
481    def add_metadata_from_file(
482        self,
483        filter: TemplatedFilter,
484        annotation: import_metadata_templated.Annotation,
485        file_path: str | Path,
486        ignore_empty_values: bool = True,
487    ) -> Execution[None]:
488        """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.
489
490        Args:
491            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
492              Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
493            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
494              specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
495              enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
496              are the preferred way of creating annotation configuration.
497            file_path (str | Path): The path to the CSV or XLSX file to import.
498
499        Returns:
500            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
501
502        Raises:
503            enpi_api.l2.types.api_error.ApiError: If API request fails.
504
505        Example:
506
507            Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.
508
509            Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
510            We'll add the value to a custom imaginary tag that was created before this example.
511
512            The CSV file would look like this:
513
514            | match_chain | match_productive | value_to_add |
515            |-------------|------------------|--------------|
516            | Heavy       | true             | Heavy and productive |
517            | Heavy       | false            | Heavy and unproductive |
518            | Kappa       | true             | Kappa and productive |
519            | Kappa       | false            | Kappa and unproductive |
520            | Lambda      | true             | Lambda and productive |
521            | Lambda      | false            | Lambda and unproductive |
522
523            We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.
524
525            ```python
526            my_collection_id: CollectionId = CollectionId(1337)
527
528            tag_id_chain: TagId = TagId(SequenceTags.Chain)
529            tag_id_productive: TagId = TagId(SequenceTags.Cdr3Productive)
530            tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag
531
532            with EnpiApiClient() as enpi_client:
533                filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
534                enpi_client.collection_api.add_metadata_from_file(
535                    filter=filter,
536                    annotation=sequence_annotation([
537                        template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
538                    ]),
539                    file_path="path/to/metadata.csv",
540                ).wait()
541            ```
542        """
543
544        # We need to upload the file to the platform
545        file_api = FileApi(self._inner_api_client, self._log_level)
546        file_execution = file_api.upload_file(file_path)
547
548        file = file_execution.wait()
549
550        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
551
552        # Start the metadata import, this starts a task, so we'll wait for that to be completed
553        import_metadata_request = openapi_client.ImportMetadataRequest(
554            openapi_client.TemplatedSearchAndTag(
555                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
556                annotation=annotation.to_api_payload(),
557                template_file_id=file.id,
558                ignore_empty_values=ignore_empty_values,
559            )
560        )
561
562        with ApiErrorContext():
563            # The metadata import has not started yet because we first need to wait for the file upload
564            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
565            assert import_metadata_response.workflow_execution_id is not None
566
567        workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
568
569        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
570            assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
571
572            nonlocal file
573            file_api.delete_file_by_id(file.id)
574
575        waitable = WorkflowExecutionTaskWaitable[None](
576            on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED
577        )
578
579        return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
580
581    def add_metadata_from_df(
582        self,
583        filter: TemplatedFilter,
584        annotation: import_metadata_templated.Annotation,
585        data_frame: pd.DataFrame,
586    ) -> Execution[None]:
587        """Import metadata from a DataFrame to annotate collections, clones or sequences.
588
589        This is a convenience method to import metadata from a Pandas DataFrame. For more information about the
590        metadata import, see the documentation for `import_metadata_from_csv`.
591
592        Args:
593            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
594            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
595              specify a specific annotation target and the values to apply.
596            data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
597
598        Returns:
599            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
600
601        Raises:
602            enpi_api.l2.types.api_error.ApiError: If API request fails.
603
604        Example:
605
606            Part of the `add_calculated_metadata.py` example script.
607
608            ```python
609            # Specify the filter query to match the sequences we want to add metadata to
610            metadata_filter = client.filter_api.create_templated_filter(
611                name="Metadata import filter",
612                shared=False,
613                condition=TemplatedAndOperator(
614                    conditions=[
615                        TemplatedMatchTag(tag_id=CollectionTags.Name),
616                        TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
617                    ]
618                ),
619            )
620
621            # Specify the sequence-level annotation to add to the collection
622            metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])
623
624            # Create metadata dataframe
625            metadata_frame = pd.DataFrame(
626                [
627                    [
628                        collection_name,  # Match
629                        df_row[1]["Unique Sequence ID"],  # Match
630                        grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
631                    ]
632                    for df_row in exported_df.iterrows()
633                ],
634                columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
635            )
636
637            # Apply metadata to the collection
638            client.collection_api.add_metadata_from_df(
639                filter=metadata_filter,
640                annotation=metadata_annotation,
641                data_frame=metadata_frame,
642            ).wait()
643            ```
644        """
645
646        # We need to turn the DataFrame into a CSV file
647        temporary_csv_file_path = f"/tmp/import_metadata.{uuid4()}.csv"
648        data_frame.to_csv(temporary_csv_file_path, index=False)
649
650        return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path)
651
652    def get_as_zip(
653        self,
654        collection_ids: list[CollectionId],
655        filter: Filter | None = None,
656        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
657        output_directory: str | Path | None = None,
658    ) -> Execution[Path]:
659        """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.
660
661        Args:
662            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
663            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
664                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
665            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
666            output_directory (str | Path | None): The directory path under which file will get exported. If
667              not provided, a temporary directory will be used.
668
669        Returns:
670            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
671              awaited.
672
673        Raises:
674            enpi_api.l2.types.api_error.ApiError: If API request fails.
675
676        Example:
677
678            ```python
679            with EnpiApiClient() as enpi_client:
680
681                collection_id = CollectionId(1234)
682
683                # Example assumes you have a filter
684                collection_filter: Filter = ...
685
686                path: str = enpi_client.collection_api.get_as_tsv(
687                    collection_ids=[collection_id],
688                    filter=collection_filter,
689                    tag_ids=[
690                        CollectionTags.Name,
691                        CollectionTags.Organism,
692                        CollectionTags.Complexity,
693                        CollectionTags.Receptor,
694                        SequenceTags.Chain,
695                        SequenceTags.Cdr3Productive,
696                    ],
697                    output_directory="example/export_result/"
698                )
699            ```
700        """
701
702        # Create the collectiom filter if it wasn't provided, it will match and
703        # get all the clones from target collections
704        if filter is None:
705            filter_api = FilterApi(self._inner_api_client, self._log_level)
706            filter = filter_api.create_filter(
707                name=f"all-collection-clones-filter-{uuid4()}",  # Unique name to avoid collision
708                condition=MatchIds(
709                    target=MatchIdTarget.COLLECTION,
710                    ids=collection_ids,  # Match all collection IDs passed to this function
711                ),
712            )
713
714        # Start the collection export, this starts a task, so we'll wait for that to be completed
715        export_collection_request = openapi_client.ExportRequest(
716            payload=openapi_client.ExportPayload(
717                collection_ids=[int(collection_id) for collection_id in collection_ids],
718                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
719                tag_ids=[int(tag_id) for tag_id in tag_ids],
720            )
721        )
722        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
723
724        with ApiErrorContext():
725            export_collection_response = collection_api_instance.export(export_collection_request)
726            assert export_collection_response.workflow_execution_id is not None
727
728            workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id)
729
730            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
731                file_api = FileApi(self._inner_api_client, self._log_level)
732                file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)
733
734                logger.success("Collection(s) export has succeeded.")
735                return file_path
736
737            waitable = WorkflowExecutionTaskWaitable[Path](
738                workflow_execution_id=workflow_execution_id,
739                on_complete=on_complete,
740                task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
741            )
742
743            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
744
745    def get_as_df(
746        self,
747        collection_ids: list[CollectionId],
748        filter: Filter | None = None,
749        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
750    ) -> Execution[pd.DataFrame]:
751        """Export collection(s) to a Pandas DataFrame.
752
753        Args:
754            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
755            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
756                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
757            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
758
759        Returns:
760            Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection.
761
762        Raises:
763            enpi_api.l2.types.api_error.ApiError: If API request fails.
764
765        Example:
766
767            ```python
768            with EnpiApiClient() as enpi_client:
769                # Example assumes you have a filter
770                filter: Filter = ...
771
772                df: pd.DataFrame = enpi_client.collection_api.get_as_df(
773                    collection_ids=[CollectionId(1)],
774                    filter=filter,
775                    tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.CDR3AminoAcids],
776                )
777            ```
778        """
779        tmp_dir = tempfile.TemporaryDirectory()
780        get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)
781
782        def wait() -> pd.DataFrame:
783            zip_path = get_as_zip_execution.wait()
784
785            # Extract all TSV files from the ZIP archive
786            with ZipFile(zip_path, "r") as zip_ref:
787                zip_ref.extractall(tmp_dir.name)
788
789            # Read all TSV files into a single DataFrame
790            all_dfs = []
791            for root, _, files in os.walk(tmp_dir.name):
792                for file in files:
793                    if file.endswith(".tsv"):
794                        file_path = os.path.join(root, file)
795                        df = pd.read_csv(file_path, delimiter="\t")
796                        all_dfs.append(df)
797
798            return pd.concat(all_dfs)
799
800        return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)
def get_collections_metadata( self, name: str | None = None) -> Generator[enpi_api.l2.types.collection.CollectionMetadata, NoneType, NoneType]:
 79    def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]:
 80        """Get a generator through all available collections in the platform.
 81
 82        Args:
 83            name (str | None): Optional collection name for search by case-insensitive substring matching
 84
 85        Returns:
 86            Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.
 87
 88        Raises:
 89            enpi_api.l2.types.api_error.ApiError: If API request fails.
 90
 91        Example:
 92
 93            ```python
 94            with EnpiApiClient() as enpi_client:
 95                for collection in enpi_client.collection_api.get_collections_metadata():
 96                    print(collection)
 97            ```
 98        """
 99
100        logger.info("Getting a generator through all collections")
101
102        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
103
104        # Fetch the first page, there is always a first page, it may be empty
105        try:
106            get_collections_response = collection_api_instance.get_collections(name=name)
107        except openapi_client.ApiException as e:
108            raise ApiError(e)
109
110        # `collections` and `cursor` get overwritten in the loop below when fetching a new page
111        collections = get_collections_response.collections
112        cursor = get_collections_response.cursor
113
114        while True:
115            for collection in collections:
116                yield CollectionMetadata.from_raw(collection)
117
118            # Check if we need to fetch a next page
119            if cursor is None:
120                logger.trace("No more pages of collections")
121                return  # No more pages
122
123            # We have a cursor, so we need to get a next page
124            logger.trace("Fetching next page of collections")
125            try:
126                get_collections_response = collection_api_instance.get_collections(
127                    cursor=cursor,
128                    name=name if name is not None else None,
129                )
130            except openapi_client.ApiException as e:
131                raise ApiError(e)
132            collections = get_collections_response.collections
133            cursor = get_collections_response.cursor

Get a generator through all available collections in the platform.

Arguments:
  • name (str | None): Optional collection name for search by case-insensitive substring matching
Returns:

Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    for collection in enpi_client.collection_api.get_collections_metadata():
        print(collection)
def get_collection_metadata_by_id( self, collection_id: enpi_api.l2.types.collection.CollectionId) -> enpi_api.l2.types.collection.CollectionMetadata:
135    def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata:
136        """Get a single collection by its ID.
137
138        Args:
139            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.
140
141        Returns:
142            enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain
143              the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer
144              to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.
145
146        Raises:
147            enpi_api.l2.types.api_error.ApiError: If API request fails.
148
149        Example:
150
151            ```python
152            with EnpiApiClient() as enpi_client:
153                collection: Collection = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
154            ```
155        """
156
157        logger.info(f"Getting collection with ID `{collection_id}`")
158
159        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
160
161        try:
162            get_collection_response = collection_api_instance.get_collection(collection_id)
163        except openapi_client.ApiException as e:
164            raise ApiError(e)
165
166        collection = CollectionMetadata.from_raw(get_collection_response.collection)
167
168        return collection

Get a single collection by its ID.

Arguments:
  • collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.
Returns:

enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    collection: Collection = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
def delete_collection_by_id(self, collection_id: enpi_api.l2.types.collection.CollectionId) -> None:
170    def delete_collection_by_id(self, collection_id: CollectionId) -> None:
171        """Delete a single collection by its ID.
172
173        This will remove the collection from the ENPICOM Platform.
174
175        Args:
176            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.
177
178        Raises:
179            enpi_api.l2.types.api_error.ApiError: If API request fails.
180
181        Example:
182
183            ```python
184            with EnpiApiClient() as enpi_client:
185                enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
186            ```
187        """
188
189        logger.info(f"Deleting collection with ID `{collection_id}`")
190
191        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
192
193        try:
194            collection_api_instance.delete_collection(id=collection_id, body={})
195        except openapi_client.ApiException as e:
196            raise ApiError(e)
197
198        logger.info(f"Collection with ID `{collection_id}` successfully deleted")

Delete a single collection by its ID.

This will remove the collection from the ENPICOM Platform.

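Arguments:
  • collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.
Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.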
Example:
with EnpiApiClient() as enpi_client:
    enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
def create_collection_from_csv( self, file_path: str | pathlib.Path, reference_database_revision: enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None = None, skiprows: int = 0, mapping: Union[Mapping[str, str], Mapping[str, enpi_api.l2.types.tag.TagId], NoneType] = None, metadata: Union[Mapping[str, bool | int | float | str], Mapping[enpi_api.l2.types.tag.TagId, bool | int | float | str], NoneType] = None, organism: str | None = None) -> enpi_api.l2.types.execution.Execution[CollectionMetadata]:
200    def create_collection_from_csv(
201        self,
202        file_path: str | Path,
203        reference_database_revision: ReferenceDatabaseRevision | None = None,
204        skiprows: int = 0,
205        mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None,
206        metadata: AdditionalImportMetadata | None = None,
207        organism: str | None = None,
208    ) -> Execution[CollectionMetadata]:
209        """Import a collection from a CSV file (can be gzipped).
210
211        The file should be a CSV file with a couple of required headers. These headers must
212        either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
213        The following tags are required:
214
215            - enpi_api.l2.tags.CollectionTags.Name
216            - enpi_api.l2.tags.CollectionTags.Organism
217            - enpi_api.l2.tags.SequenceTags.SequenceCount
218            - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
219            - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
220            - enpi_api.l2.tags.SequenceTags.VCall
221            - enpi_api.l2.tags.SequenceTags.JCall
222
223        Args:
224            file_path (str | Path): The path to the CSV file to import.
225            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
226                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one
227                reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references
228                available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter.
229                There is no downsides to always specifying the reference manually, which is a safer and less error-prone option.
230            skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
231            mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the
232              CSV headers to ENPICOM Platform tag keys
233            metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection.
234                <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take
235                precedence when creating tags.**</u>
236            organism: (str | None): If passed, it's compared with the organism value found in the first line of the imported file and
237                throws an error if the values are different. Can serve as a quick utility check.
238
239        Returns:
240            enpi_api.l2.types.collection.CollectionMetadata: Metadata of the collection that was imported.
241
242        Raises:
243            KeyError: If 'Organism' column is not found in the imported df/csv.
244            ValueError: If optional `organism` param value differs from the 'Organism' value from the df/csv.
245            enpi_api.l2.types.api_error.ApiError: If API request fails.
246
247        Example:
248
249            ```python
250            with EnpiApiClient() as enpi_client:
251                reference_name = ...
252                species = ...
253                reference = enpi_client.reference_database_api.get_revision_by_name(
254                    name=reference_name,
255                    species=reference_species,
256                )
257
258                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
259                    file_path=import_file_path,
260                    reference_database_revision=reference,
261                    skiprows=1,
262                    mapping={
263                        "title": CollectionTags.Name,
264                        "species": CollectionTags.Organism,
265                    },
266                    metadata={
267                        CollectionTags.ProjectId: "Project 001",
268                    }
269                ).wait()
270                ```
271        """
272
273        logger.info(f"Importing collection from CSV file `{file_path}`")
274
275        # Pandas supports gzipped CSV
276        df = pd.read_csv(file_path, sep=",", skiprows=skiprows)
277
278        # Get the organism from the first line. All lines should hold the same value
279        organism_from_file = str(df.iloc[0].get("Organism", None))
280        if organism_from_file is None:
281            # If not found by tag key, try to access it via the tag ID
282            organism_from_file = str(df.iloc[0].get(CollectionTags.Organism, None))
283
284        # If it's still none, raise an error - it's a mandatory column anyways
285        if organism_from_file is None:
286            raise KeyError("A required 'Organism' column was not found in the imported file/df")
287
288        # If `organism` param was passed, compare the values
289        if (organism is not None) and (organism != organism_from_file):
290            raise ValueError(
291                f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}",
292            )
293
294        # Map the headers in the CSV file to Tag Keys
295        if mapping is not None:
296            # We drop the columns for which no mapping is specified
297            unmapped_headers = set(df.columns).difference(set(mapping.keys()))
298            logger.warning(f"The following headers are unmapped and are removed:\n{unmapped_headers}")
299            df.drop(columns=list(unmapped_headers), inplace=True)
300            df.rename(columns=mapping, inplace=True)
301        if metadata is not None:
302            for key, value in metadata.items():
303                df[key] = value
304
305        temporary_csv_file_path = f"/tmp/import_collection_csv.{uuid4()}.csv"
306        df.to_csv(temporary_csv_file_path, index=False)
307        verify_headers_uniformity(list(df.columns))
308
309        # Upload the file to the platform
310        file_api = FileApi(self._inner_api_client, self._log_level)
311        file = file_api.upload_file(temporary_csv_file_path).wait()
312
313        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
314
315        # Start the collection import, this starts a task, so we'll wait for that to be completed
316        import_collection_request = openapi_client.ImportCollectionRequest(
317            file_id=UUID(file.id),
318            organism=organism_from_file,
319            reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None,
320            reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None,
321        )
322
323        with ApiErrorContext():
324            import_collection_response = collection_api_instance.import_collection(import_collection_request)
325            assert import_collection_response.workflow_execution_id is not None
326
327            workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id)
328
329            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata:
330                assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
331
332                get_collection_id_response = collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id)
333                assert get_collection_id_response.collection_id is not None
334
335                collection_id = CollectionId(get_collection_id_response.collection_id)
336
337                logger.success(f"Collection with ID `{collection_id}` was successfully imported")
338                # Remove the file from tmp folder
339                os.remove(temporary_csv_file_path)
340                # Remove the file from the platform
341                file_api.delete_file_by_id(file.id)
342
343                return self.get_collection_metadata_by_id(collection_id)
344
345            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
346                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete
347            )
348
349            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

Import a collection from a CSV file (can be gzipped).

The file should be a CSV file with a couple of required headers. These headers must either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).

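The following tags are required:
  • enpi_api.l2.tags.CollectionTags.Name
  • enpi_api.l2.tags.CollectionTags.Organism
  • enpi_api.l2.tags.SequenceTags.SequenceCount
  • enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
  • enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
  • enpi_api.l2.tags.SequenceTags.VCall
  • enpi_api.l2.tags.SequenceTags.JCall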
Arguments:
  • file_path (str | Path): The path to the CSV file to import.
  • reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there's only one reference available, it will be picked for the import and the task will continue. If there's none or there's multiple references available, an error will be returned - in such case reference has to be picked manually by passing it to this parameter. There is no downsides to always specifying the reference manually, which is a safer and less error-prone option.
  • skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
  • mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the CSV headers to ENPICOM Platform tag keys
  • metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection. If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take precedence when creating tags.
  • organism: (str | None): If passed, it's compared with the organism value found in the first line of the imported file and throws an error if the values are different. Can serve as a quick utility check.
Returns:

enpi_api.l2.types.collection.CollectionMetadata: Metadata of the collection that was imported.

Raises:
  • KeyError: If 'Organism' column is not found in the imported df/csv.
  • ValueError: If optional organism param value differs from the 'Organism' value from the df/csv.
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    reference_name = ...
    species = ...
    reference = enpi_client.reference_database_api.get_revision_by_name(
        name=reference_name,
        species=reference_species,
    )

    collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
        file_path=import_file_path,
        reference_database_revision=reference,
        skiprows=1,
        mapping={
            "title": CollectionTags.Name,
            "species": CollectionTags.Organism,
        },
        metadata={
            CollectionTags.ProjectId: "Project 001",
        }
    ).wait()
   
def create_collection_from_df( self, data_frame: pandas.core.frame.DataFrame, reference_database_revision: enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None = None) -> enpi_api.l2.types.execution.Execution[CollectionMetadata]:
def create_collection_from_df(
    self,
    data_frame: pd.DataFrame,
    reference_database_revision: ReferenceDatabaseRevision | None = None,
) -> Execution[CollectionMetadata]:
    """Import a collection from a DataFrame.

    This is a convenience method to import a collection from a Pandas DataFrame. The DataFrame is written to a
    temporary CSV file which is then handed to
    enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv; see that method for more
    information about the collection import.

    Args:
        data_frame (pd.DataFrame): The DataFrame containing the collection to import.
        reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
            If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is
            exactly one reference available, it will be picked for the import and the task will continue. If there are none or multiple
            references available, an error will be returned; in that case the reference has to be picked manually by passing it to this
            parameter. There are no downsides to always specifying the reference manually, which is the safer and less error-prone option.

    Returns:
        enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
          collection that was imported when awaited.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            reference_name = ...
            species = ...
            reference = enpi_client.reference_database_api.get_revision_by_name(
                name=reference_name,
                species=species,
            )

            df = pd.read_csv('/home/data.csv')
            collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
                data_frame=df,
                reference_database_revision=reference,
            ).wait()
        ```
    """

    # We need to turn the DataFrame into a CSV file. `mkstemp` is used instead of writing to an
    # open `NamedTemporaryFile` by name, because reopening such a file while it is still open
    # does not work on Windows.
    file_descriptor, temporary_csv_file_path = tempfile.mkstemp(suffix=".csv")
    os.close(file_descriptor)

    try:
        data_frame.to_csv(temporary_csv_file_path, index=False)

        create_collection_execution = self.create_collection_from_csv(
            file_path=temporary_csv_file_path,
            reference_database_revision=reference_database_revision,
        )
    finally:
        # Same lifetime as before: the file is removed as soon as `create_collection_from_csv` returns
        os.remove(temporary_csv_file_path)

    return Execution(
        wait=create_collection_execution.wait,
        check_execution_state=create_collection_execution.check_execution_state,
    )

Import a collection from a DataFrame.

This is a convenience method to import a collection from a Pandas DataFrame. For more information about the collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.

Arguments:
  • data_frame (pd.DataFrame): The DataFrame containing the collection to import.
  • reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is exactly one reference available, it will be picked for the import and the task will continue. If there are none or multiple references available, an error will be returned; in that case the reference has to be picked manually by passing it to this parameter. There are no downsides to always specifying the reference manually, which is the safer and less error-prone option.
Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Returns:

enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the collection that was imported when awaited.

Example:
with EnpiApiClient() as enpi_client:
    reference_name = ...
    species = ...
    reference = enpi_client.reference_database_api.get_revision_by_name(
        name=reference_name,
        species=species,
    )

    df = pd.read_csv('/home/data.csv')
    collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
        data_frame=df,
        reference_database_revision=reference,
    ).wait()
def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]:
    """Import metadata to annotate collections, clones or sequences in batches using a filter.

    This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values
    that you provide will be applied to all matching items of the specified level.

    If you would like to add different values based on different matched tags, have a look at the methods that
    support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.

    Args:
        filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate.
          Use enpi_api.l2.client.api.filter_api.FilterApi.create_filter to create new filters.
        annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You
          specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
          enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
          are the preferred way of creating annotation configuration.

    Returns:
        enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        Batch tag multiple collections with some tags:

        ```python
        with EnpiApiClient() as enpi_client:
            collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]

            # Create a filter
            filter = enpi_client.filter_api.create_filter(
                name="My filter",
                condition=dict(
                    type="match_ids",
                    target="collection",
                    ids=collection_ids,
                ),
            )

            # Create an annotation
            annotation = collection_annotation(tags=[
                Tag(id=CollectionTags.CampaignId, value="My campaign"),
                Tag(id=CollectionTags.ProjectId, value="My project"),
            ])

            # Add the metadata
            enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
        ```
    """

    collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

    # Non-templated import: the same annotation is applied to everything the filter matches
    import_metadata_request = openapi_client.ImportMetadataRequest(
        openapi_client.SearchAndTag(
            filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
            annotation=annotation.to_api_payload(),
        )
    )

    with ApiErrorContext():
        # Starting the import kicks off a workflow execution on the platform
        import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
        assert import_metadata_response.workflow_execution_id is not None

        workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)

        # The execution yields no result, so the waitable is parametrised with `None` to match the
        # declared `Execution[None]` return type (consistent with `add_metadata_from_file`)
        waitable = WorkflowExecutionTaskWaitable[None](
            workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
        )

        return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

Import metadata to annotate collections, clones or sequences in batches using a filter.

This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values that you provide will be applied to all matching items of the specified level.

If you would like to add different values based on different matched tags, have a look at the methods that support a templated filter, such as add_metadata_from_file or add_metadata_from_df.

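Arguments:
  • filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate. Use enpi_api.l2.client.api.filter_api.FilterApi.create_filter to create new filters.
  • annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation are the preferred way of creating annotation configuration.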
Returns:

enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:

Batch tag multiple collections with some tags:

with EnpiApiClient() as enpi_client:
    collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]

    # Create a filter
    filter = enpi_client.filter_api.create_filter(
        name="My filter",
        condition=dict(
            type="match_ids",
            target="collection",
            ids=collection_ids,
        ),
    )

    # Create an annotation
    annotation = collection_annotation(tags=[
        Tag(id=CollectionTags.CampaignId, value="My campaign"),
        Tag(id=CollectionTags.ProjectId, value="My project"),
    ])

    # Add the metadata
    enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
def add_metadata_from_file(
    self,
    filter: TemplatedFilter,
    annotation: import_metadata_templated.Annotation,
    file_path: str | Path,
    ignore_empty_values: bool = True,
) -> Execution[None]:
    """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.

    The file is first uploaded to the platform (this method blocks until the upload is done), after which the
    templated metadata import is started. The uploaded file is deleted from the platform again once the import
    task has completed successfully.

    Args:
        filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
          Use enpi_api.l2.client.api.filter_api.FilterApi.create_templated_filter to create new templated filters.
        annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
          specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
          enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
          are the preferred way of creating annotation configuration.
        file_path (str | Path): The path to the CSV or XLSX file to import.
        ignore_empty_values (bool): Forwarded unchanged to the platform's templated metadata import request.
          Presumably controls whether empty cells in the file are skipped instead of being applied - confirm
          against the platform API documentation. Defaults to True.

    Returns:
        enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.

        Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
        We'll add the value to a custom imaginary tag that was created before this example.

        The CSV file would look like this:

        | match_chain | match_productive | value_to_add |
        |-------------|------------------|--------------|
        | Heavy       | true             | Heavy and productive |
        | Heavy       | false            | Heavy and unproductive |
        | Kappa       | true             | Kappa and productive |
        | Kappa       | false            | Kappa and unproductive |
        | Lambda      | true             | Lambda and productive |
        | Lambda      | false            | Lambda and unproductive |

        We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.

        ```python
        my_collection_id: CollectionId = CollectionId(1337)

        tag_id_chain: TagId = TagId(SequenceTags.Chain)
        tag_id_productive: TagId = TagId(SequenceTags.Cdr3Productive)
        tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag

        with EnpiApiClient() as enpi_client:
            filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
            enpi_client.collection_api.add_metadata_from_file(
                filter=filter,
                annotation=sequence_annotation([
                    template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
                ]),
                file_path="path/to/metadata.csv",
            ).wait()
        ```
    """

    # We need to upload the file to the platform, and wait for the upload to finish because
    # the import request below needs the ID of the uploaded file
    file_api = FileApi(self._inner_api_client, self._log_level)
    file_execution = file_api.upload_file(file_path)

    file = file_execution.wait()

    collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

    # Start the metadata import, this starts a task, so we'll wait for that to be completed
    import_metadata_request = openapi_client.ImportMetadataRequest(
        openapi_client.TemplatedSearchAndTag(
            filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
            annotation=annotation.to_api_payload(),
            template_file_id=file.id,
            ignore_empty_values=ignore_empty_values,
        )
    )

    with ApiErrorContext():
        # The file upload was awaited above, so the import can be started right away
        import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
        assert import_metadata_response.workflow_execution_id is not None

    workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)

    def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
        assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"

        # The uploaded template file is only needed for the import itself, clean it up afterwards.
        # `file` is only read here, so no `nonlocal` declaration is needed.
        file_api.delete_file_by_id(file.id)

    waitable = WorkflowExecutionTaskWaitable[None](
        on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED
    )

    return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.

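Arguments:
  • filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
  • annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation are the preferred way of creating annotation configuration.
  • file_path (str | Path): The path to the CSV or XLSX file to import.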
Returns:

enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:

Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.

Let's call the match columns match_chain and match_productive, and the column to add value_to_add. We'll add the value to a custom imaginary tag that was created before this example.

The CSV file would look like this:

| match_chain | match_productive | value_to_add            |
|-------------|------------------|-------------------------|
| Heavy       | true             | Heavy and productive    |
| Heavy       | false            | Heavy and unproductive  |
| Kappa       | true             | Kappa and productive    |
| Kappa       | false            | Kappa and unproductive  |
| Lambda      | true             | Lambda and productive   |
| Lambda      | false            | Lambda and unproductive |

We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID 1337.

my_collection_id: CollectionId = CollectionId(1337)

tag_id_chain: TagId = TagId(SequenceTags.Chain)
tag_id_productive: TagId = TagId(SequenceTags.Cdr3Productive)
tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag

with EnpiApiClient() as enpi_client:
    filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
    enpi_client.collection_api.add_metadata_from_file(
        filter=filter,
        annotation=sequence_annotation([
            template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
        ]),
        file_path="path/to/metadata.csv",
    ).wait()
def add_metadata_from_df(
    self,
    filter: TemplatedFilter,
    annotation: import_metadata_templated.Annotation,
    data_frame: pd.DataFrame,
    ignore_empty_values: bool = True,
) -> Execution[None]:
    """Import metadata from a DataFrame to annotate collections, clones or sequences.

    This is a convenience method to import metadata from a Pandas DataFrame. The DataFrame is written to a
    temporary CSV file which is passed on to `add_metadata_from_file`; see the documentation of that method
    for more information about the metadata import.

    Args:
        filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
        annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
          specify a specific annotation target and the values to apply.
        data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
        ignore_empty_values (bool): Passed through unchanged to `add_metadata_from_file`. Defaults to True,
          which is also what `add_metadata_from_file` uses by default.

    Returns:
        enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        Part of the `add_calculated_metadata.py` example script.

        ```python
        # Specify the filter query to match the sequences we want to add metadata to
        metadata_filter = client.filter_api.create_templated_filter(
            name="Metadata import filter",
            shared=False,
            condition=TemplatedAndOperator(
                conditions=[
                    TemplatedMatchTag(tag_id=CollectionTags.Name),
                    TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
                ]
            ),
        )

        # Specify the sequence-level annotation to add to the collection
        metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])

        # Create metadata dataframe
        metadata_frame = pd.DataFrame(
            [
                [
                    collection_name,  # Match
                    df_row[1]["Unique Sequence ID"],  # Match
                    grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
                ]
                for df_row in exported_df.iterrows()
            ],
            columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
        )

        # Apply metadata to the collection
        client.collection_api.add_metadata_from_df(
            filter=metadata_filter,
            annotation=metadata_annotation,
            data_frame=metadata_frame,
        ).wait()
        ```
    """

    # We need to turn the DataFrame into a CSV file. Use the platform's temporary directory
    # instead of a hard-coded "/tmp" so this also works where "/tmp" does not exist.
    temporary_csv_file_path = Path(tempfile.gettempdir()) / f"import_metadata.{uuid4()}.csv"

    try:
        data_frame.to_csv(temporary_csv_file_path, index=False)

        # `add_metadata_from_file` waits for the file upload before it returns, so the
        # local copy is no longer needed once this call has finished
        return self.add_metadata_from_file(filter, annotation, str(temporary_csv_file_path), ignore_empty_values)
    finally:
        temporary_csv_file_path.unlink(missing_ok=True)

Import metadata from a DataFrame to annotate collections, clones or sequences.

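This is a convenience method to import metadata from a Pandas DataFrame. For more information about the metadata import, see the documentation for add_metadata_from_file.

Arguments:
  • filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
  • annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply.
  • data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.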
Returns:

enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:

Part of the add_calculated_metadata.py example script.

# Specify the filter query to match the sequences we want to add metadata to
metadata_filter = client.filter_api.create_templated_filter(
    name="Metadata import filter",
    shared=False,
    condition=TemplatedAndOperator(
        conditions=[
            TemplatedMatchTag(tag_id=CollectionTags.Name),
            TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
        ]
    ),
)

# Specify the sequence-level annotation to add to the collection
metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])

# Create metadata dataframe
metadata_frame = pd.DataFrame(
    [
        [
            collection_name,  # Match
            df_row[1]["Unique Sequence ID"],  # Match
            grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
        ]
        for df_row in exported_df.iterrows()
    ],
    columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
)

# Apply metadata to the collection
client.collection_api.add_metadata_from_df(
    filter=metadata_filter,
    annotation=metadata_annotation,
    data_frame=metadata_frame,
).wait()
def get_as_zip( self, collection_ids: list[enpi_api.l2.types.collection.CollectionId], filter: enpi_api.l2.types.filter.Filter | None = None, tag_ids: list[enpi_api.l2.types.tag.TagId] = [2035, 2040, 2083, 2084, 2036, 2060, 1001, 1077, 38, 144, 24, 154, 110], output_directory: str | pathlib.Path | None = None) -> enpi_api.l2.types.execution.Execution[Path]:
def get_as_zip(
    self,
    collection_ids: list[CollectionId],
    filter: Filter | None = None,
    tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
    output_directory: str | Path | None = None,
) -> Execution[Path]:
    """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.

    Args:
        collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
        filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
            If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
        tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export. Defaults to
            `DEFAULT_EXPORT_TAG_IDS`.
        output_directory (str | Path | None): The directory path under which file will get exported. If
          not provided, a temporary directory will be used.

    Returns:
        enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
          awaited.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:

            collection_id = CollectionId(1234)

            # Example assumes you have a filter
            collection_filter: Filter = ...

            path: Path = enpi_client.collection_api.get_as_zip(
                collection_ids=[collection_id],
                filter=collection_filter,
                tag_ids=[
                    CollectionTags.Name,
                    CollectionTags.Organism,
                    CollectionTags.Complexity,
                    CollectionTags.Receptor,
                    SequenceTags.Chain,
                    SequenceTags.Cdr3Productive,
                ],
                output_directory="example/export_result/",
            ).wait()
        ```
    """

    # Create the collection filter if it wasn't provided, it will match and
    # get all the clones from target collections
    if filter is None:
        filter_api = FilterApi(self._inner_api_client, self._log_level)
        filter = filter_api.create_filter(
            name=f"all-collection-clones-filter-{uuid4()}",  # Unique name to avoid collision
            condition=MatchIds(
                target=MatchIdTarget.COLLECTION,
                ids=collection_ids,  # Match all collection IDs passed to this function
            ),
        )

    # Start the collection export, this starts a task, so we'll wait for that to be completed
    export_collection_request = openapi_client.ExportRequest(
        payload=openapi_client.ExportPayload(
            collection_ids=[int(collection_id) for collection_id in collection_ids],
            filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
            tag_ids=[int(tag_id) for tag_id in tag_ids],
        )
    )
    collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)

    with ApiErrorContext():
        export_collection_response = collection_api_instance.export(export_collection_request)
        assert export_collection_response.workflow_execution_id is not None

        workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id)

        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
            # Once the export task is done, download the resulting archive to `output_directory`
            file_api = FileApi(self._inner_api_client, self._log_level)
            file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)

            logger.success("Collection(s) export has succeeded.")
            return file_path

        waitable = WorkflowExecutionTaskWaitable[Path](
            workflow_execution_id=workflow_execution_id,
            on_complete=on_complete,
            task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
        )

        # The path returned by `on_complete` is what the execution yields when awaited
        return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.

Arguments:
  • collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
  • filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export. If it's None, a new filter that matches all the collection_ids provided above will be created and used.
  • tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
  • output_directory (str | Path | None): The directory path under which file will get exported. If not provided, a temporary directory will be used.
Returns:

enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when awaited.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:

    collection_id = CollectionId(1234)

    # Example assumes you have a filter
    collection_filter: Filter = ...

    path: Path = enpi_client.collection_api.get_as_zip(
        collection_ids=[collection_id],
        filter=collection_filter,
        tag_ids=[
            CollectionTags.Name,
            CollectionTags.Organism,
            CollectionTags.Complexity,
            CollectionTags.Receptor,
            SequenceTags.Chain,
            SequenceTags.Cdr3Productive,
        ],
        output_directory="example/export_result/",
    ).wait()
def get_as_df( self, collection_ids: list[enpi_api.l2.types.collection.CollectionId], filter: enpi_api.l2.types.filter.Filter | None = None, tag_ids: list[enpi_api.l2.types.tag.TagId] = [2035, 2040, 2083, 2084, 2036, 2060, 1001, 1077, 38, 144, 24, 154, 110]) -> enpi_api.l2.types.execution.Execution[DataFrame]:
def get_as_df(
    self,
    collection_ids: list[CollectionId],
    filter: Filter | None = None,
    tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
) -> Execution[pd.DataFrame]:
    """Export collection(s) to a Pandas DataFrame.

    The collections are exported as a zip archive via `get_as_zip` into a temporary directory; when the
    execution is awaited, the archive is extracted and all TSV files found in it are read and concatenated
    into a single DataFrame.

    Args:
        collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
        filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
            If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
        tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export. Defaults to
            `DEFAULT_EXPORT_TAG_IDS`.

    Returns:
        Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection(s).
          The per-file row indices are kept as-is, and an empty DataFrame is returned when the exported
          archive contains no TSV files.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            # Example assumes you have a filter
            filter: Filter = ...

            df: pd.DataFrame = enpi_client.collection_api.get_as_df(
                collection_ids=[CollectionId(1)],
                filter=filter,
                tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.Cdr3AminoAcids],
            ).wait()
        ```
    """
    # The directory object is captured by `wait` below, which keeps it referenced
    # for as long as the returned execution is
    tmp_dir = tempfile.TemporaryDirectory()
    get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)

    def wait() -> pd.DataFrame:
        zip_path = get_as_zip_execution.wait()

        # Extract all TSV files from the ZIP archive
        with ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(tmp_dir.name)

        # Read all TSV files into a single DataFrame
        all_dfs = []
        for root, _, files in os.walk(tmp_dir.name):
            for file in files:
                if file.endswith(".tsv"):
                    file_path = os.path.join(root, file)
                    all_dfs.append(pd.read_csv(file_path, delimiter="\t"))

        # `pd.concat` raises "No objects to concatenate" on an empty list
        if not all_dfs:
            return pd.DataFrame()

        return pd.concat(all_dfs)

    return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)

Export collection(s) to a Pandas DataFrame.

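Arguments:
  • collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
  • filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export. If it's None, a new filter that matches all the collection_ids provided above will be created and used.
  • tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.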
Returns:

Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    # Example assumes you have a filter
    filter: Filter = ...

    df: pd.DataFrame = enpi_client.collection_api.get_as_df(
        collection_ids=[CollectionId(1)],
        filter=filter,
        tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.Cdr3AminoAcids],
    ).wait()