
enpi_api.l2.client.api.collection_api

  1import os
  2import tempfile
  3from pathlib import Path
  4from typing import Generator, Mapping
  5from uuid import uuid4
  6from zipfile import ZipFile
  7
  8import pandas as pd
  9from loguru import logger
 10
 11from enpi_api.l1 import openapi_client
 12from enpi_api.l2.client.api.file_api import FileApi
 13from enpi_api.l2.client.api.filter_api import FilterApi
 14from enpi_api.l2.events.workflow_execution_task_waitable import WorkflowExecutionTaskWaitable
 15from enpi_api.l2.tags import CloneTags, CollectionTags, SequenceTags
 16from enpi_api.l2.types import import_metadata, import_metadata_templated
 17from enpi_api.l2.types.api_error import ApiError, ApiErrorContext
 18from enpi_api.l2.types.collection import AdditionalImportMetadata, CollectionId, CollectionMetadata
 19from enpi_api.l2.types.execution import Execution
 20from enpi_api.l2.types.filter import Filter, MatchIds, MatchIdTarget, TemplatedFilter
 21from enpi_api.l2.types.log import LogLevel
 22from enpi_api.l2.types.reference_database import ReferenceDatabaseRevision
 23from enpi_api.l2.types.tag import TagId, TagKey
 24from enpi_api.l2.types.task import TaskState
 25from enpi_api.l2.types.workflow import WorkflowExecutionId, WorkflowExecutionTaskId, WorkflowTaskTemplateName
 26from enpi_api.l2.util.file import verify_headers_uniformity
 27
 28DEFAULT_EXPORT_TAG_IDS = [
 29    # Collection tags
 30    CollectionTags.Name,
 31    CollectionTags.Organism,
 32    CollectionTags.Complexity,
 33    CollectionTags.Receptor,
 34    CollectionTags.NumberOfClones,
 35    CollectionTags.Reference,
 36    # Clone tags
 37    CloneTags.TenXBarcode,
 38    CloneTags.CloneCount,
 39    # Sequence tags
 40    SequenceTags.Chain,
 41    SequenceTags.SequenceCount,
 42    SequenceTags.Cdr3AminoAcids,
 43    SequenceTags.VGene,
 44    SequenceTags.JGene,
 45]
 46"""The default tags that are included when exporting a collection to a DataFrame or a CSV file.
 47
 48These are:
 49
 50- Collection level tags:
 51    - `enpi_api.l2.tags.CollectionTags.Name`
 52    - `enpi_api.l2.tags.CollectionTags.Organism`
 53    - `enpi_api.l2.tags.CollectionTags.Complexity`
 54    - `enpi_api.l2.tags.CollectionTags.Receptor`
 55    - `enpi_api.l2.tags.CollectionTags.NumberOfClones`
 56    - `enpi_api.l2.tags.CollectionTags.Reference`
 57- Clone level tags:
 58    - `enpi_api.l2.tags.CloneTags.TenXBarcode`
 59    - `enpi_api.l2.tags.CloneTags.CloneCount`
 60- Sequence level tags:
 61    - `enpi_api.l2.tags.SequenceTags.Chain`
 62    - `enpi_api.l2.tags.SequenceTags.SequenceCount`
 63    - `enpi_api.l2.tags.SequenceTags.Cdr3AminoAcids`
 64    - `enpi_api.l2.tags.SequenceTags.VGene`
 65    - `enpi_api.l2.tags.SequenceTags.JGene`
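
A minimal sketch of overriding this default with a custom tag list when exporting; the
collection ID below is made up for illustration:

```python
with EnpiApiClient() as enpi_client:
    df = enpi_client.collection_api.get_as_df(
        collection_ids=[CollectionId(1234)],
        tag_ids=[CollectionTags.Name, SequenceTags.Chain, SequenceTags.Cdr3AminoAcids],
    ).wait()
```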
 66"""
 67
 68
 69class CollectionApi:
 70    _inner_api_client: openapi_client.ApiClient
 71    _log_level: LogLevel
 72
 73    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
 74        """@private"""
 75        self._inner_api_client = inner_api_client
 76        self._log_level = log_level
 77
 78    def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]:
 79        """Get a generator through all available collections in the platform.
 80
 81        Args:
 82            name (str | None): Optional collection name to search for, using case-insensitive substring matching.
 83
 84        Returns:
 85            Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.
 86
 87        Raises:
 88            enpi_api.l2.types.api_error.ApiError: If API request fails.
 89
 90        Example:
 91
 92            ```python
 93            with EnpiApiClient() as enpi_client:
 94                for collection in enpi_client.collection_api.get_collections_metadata():
 95                    print(collection)
 96            ```
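
            Searching by name uses case-insensitive substring matching; the substring below is
            only an illustration:

            ```python
            with EnpiApiClient() as enpi_client:
                for collection in enpi_client.collection_api.get_collections_metadata(name="study"):
                    print(collection)
            ```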
 97        """
 98
 99        logger.info("Getting a generator through all collections")
100
101        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
102
103        # Fetch the first page; there is always a first page, though it may be empty
104        try:
105            get_collections_response = collection_api_instance.get_collections(name=name)
106        except openapi_client.ApiException as e:
107            raise ApiError(e)
108
109        # `collections` and `cursor` get overwritten in the loop below when fetching a new page
110        collections = get_collections_response.collections
111        cursor = get_collections_response.cursor
112
113        while True:
114            for collection in collections:
115                yield CollectionMetadata.from_raw(collection)
116
117            # Check if we need to fetch a next page
118            if cursor is None:
119                logger.trace("No more pages of collections")
120                return  # No more pages
121
122            # We have a cursor, so we need to get a next page
123            logger.trace("Fetching next page of collections")
124            try:
125                get_collections_response = collection_api_instance.get_collections(
126                    cursor=cursor,
127                    name=name,
128                )
129            except openapi_client.ApiException as e:
130                raise ApiError(e)
131            collections = get_collections_response.collections
132            cursor = get_collections_response.cursor
133
134    def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata:
135        """Get a single collection by its ID.
136
137        Args:
138            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.
139
140        Returns:
141            enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain
142              the collection's clones or sequences, only the metadata. For the collection's clone and sequence data, refer
143              to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.
144
145        Raises:
146            enpi_api.l2.types.api_error.ApiError: If API request fails.
147
148        Example:
149
150            ```python
151            with EnpiApiClient() as enpi_client:
152                collection: Collection = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
153            ```
154        """
155
156        logger.info(f"Getting collection with ID `{collection_id}`")
157
158        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
159
160        try:
161            get_collection_response = collection_api_instance.get_collection(collection_id)
162        except openapi_client.ApiException as e:
163            raise ApiError(e)
164
165        collection = CollectionMetadata.from_raw(get_collection_response.collection)
166
167        return collection
168
169    def delete_collection_by_id(self, collection_id: CollectionId) -> None:
170        """Delete a single collection by its ID.
171
172        This will remove the collection from the ENPICOM Platform.
173
174        Args:
175            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.
176
177        Raises:
178            enpi_api.l2.types.api_error.ApiError: If API request fails.
179
180        Example:
181
182            ```python
183            with EnpiApiClient() as enpi_client:
184                enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
185            ```
186        """
187
188        logger.info(f"Deleting collection with ID `{collection_id}`")
189
190        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
191
192        try:
193            collection_api_instance.delete_collection(id=collection_id)
194        except openapi_client.ApiException as e:
195            raise ApiError(e)
196
197        logger.info(f"Collection with ID `{collection_id}` successfully deleted")
198
199    def create_collection_from_csv(
200        self,
201        file_path: str | Path,
202        reference_database_revision: ReferenceDatabaseRevision | None = None,
203        skiprows: int = 0,
204        mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None,
205        metadata: AdditionalImportMetadata | None = None,
206        organism: str | None = None,
207    ) -> Execution[CollectionMetadata]:
208        """Import a collection from a CSV file (can be gzipped).
209
210        The file should be a CSV file with several required headers. These headers must
211        either be the tag IDs (for example: 2035, 2040) or the tag keys (for example: Name, Organism).
212        The following tags are required:
213
214            - enpi_api.l2.tags.CollectionTags.Name
215            - enpi_api.l2.tags.CollectionTags.Organism
216            - enpi_api.l2.tags.SequenceTags.SequenceCount
217            - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
218            - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
219            - enpi_api.l2.tags.SequenceTags.VCall
220            - enpi_api.l2.tags.SequenceTags.JCall
221
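        For illustration, a minimal input could be assembled with pandas before the import; the
        header spellings below are assumptions and should match the tag keys (or IDs) configured
        in your platform:

        ```python
        import pandas as pd

        df = pd.DataFrame(
            [
                {
                    "Name": "My collection",              # CollectionTags.Name
                    "Organism": "Homo sapiens",           # CollectionTags.Organism
                    "Sequence Count": 1,                  # SequenceTags.SequenceCount
                    "CDR3 Amino Acids": "CARDRSTGYFDYW",  # SequenceTags.CDR3AminoAcids
                    "Receptor Nucleotides": "GAGGTG...",  # illustrative, truncated on purpose
                    "V Call": "IGHV3-23",                 # SequenceTags.VCall
                    "J Call": "IGHJ4",                    # SequenceTags.JCall
                }
            ]
        )
        df.to_csv("my_collection.csv", index=False)
        ```
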
222        Args:
223            file_path (str | Path): The path to the CSV file to import.
224            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
225                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is only one
226                reference available, it will be picked for the import and the task will continue. If there are no references, or more than one,
227                an error will be returned - in that case the reference has to be picked manually by passing it to this parameter.
228                There is no downside to always specifying the reference manually; it is the safer and less error-prone option.
229            skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
230            mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the
231              CSV headers to ENPICOM Platform tag keys or tag IDs.
232            metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection.
233                <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take
234                precedence when creating tags.**</u>
235            organism (str | None): If passed, it is compared with the organism value found in the first line of the imported file, and
236                an error is raised if the values differ. Can serve as a quick sanity check.
237
238        Returns:
239            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the metadata of the imported collection when awaited.
240
241        Raises:
242            KeyError: If 'Organism' column is not found in the imported df/csv.
243            ValueError: If optional `organism` param value differs from the 'Organism' value from the df/csv.
244            enpi_api.l2.types.api_error.ApiError: If API request fails.
245
246        Example:
247
248            ```python
249            with EnpiApiClient() as enpi_client:
250                reference_name = ...
251                reference_species = ...
252                reference = enpi_client.reference_database_api.get_revision_by_name(
253                    name=reference_name,
254                    species=reference_species,
255                )
256
257                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
258                    file_path=import_file_path,
259                    reference_database_revision=reference,
260                    skiprows=1,
261                    mapping={
262                        "title": CollectionTags.Name,
263                        "species": CollectionTags.Organism,
264                    },
265                    metadata={
266                        CollectionTags.ProjectId: "Project 001",
267                    }
268                ).wait()
269            ```
270        """
271
272        logger.info(f"Importing collection from CSV file `{file_path}`")
273
274        # Pandas supports gzipped CSV
275        df = pd.read_csv(file_path, sep=",", skiprows=skiprows)
276
277        # Get the organism from the first line. All lines should hold the same value
278        organism_from_file = df.iloc[0].get("Organism", None)
279        if organism_from_file is None:
280            # Not found by the tag key, so try to access it via the tag ID
281            organism_from_file = df.iloc[0].get(CollectionTags.Organism, None)
282
283        # If it's still None, raise an error - it's a mandatory column anyway
284        if organism_from_file is None:
285            raise KeyError("A required 'Organism' column was not found in the imported file/df")
286        organism_from_file = str(organism_from_file)
287        # If `organism` param was passed, compare the values
288        if (organism is not None) and (organism != organism_from_file):
289            raise ValueError(
290                f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}",
291            )
292
293        # Map the headers in the CSV file to Tag Keys
294        if mapping is not None:
295            # We drop the columns for which no mapping is specified
296            unmapped_headers = set(df.columns).difference(set(mapping.keys()))
297            logger.warning(f"The following headers are unmapped and will be dropped:\n{unmapped_headers}")
298            df.drop(columns=list(unmapped_headers), inplace=True)
299            df.rename(columns=mapping, inplace=True)
300        if metadata is not None:
301            for key, value in metadata.items():
302                df[key] = value
303
304        temporary_csv_file_path = os.path.join(tempfile.gettempdir(), f"import_collection_csv.{uuid4()}.csv")
305        df.to_csv(temporary_csv_file_path, index=False)
306        verify_headers_uniformity(list(df.columns))
307
308        # Upload the file to the platform
309        file_api = FileApi(self._inner_api_client, self._log_level)
310        file = file_api.upload_file(temporary_csv_file_path).wait()
311
312        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
313
314        # Start the collection import; this starts a task, so we'll wait for it to complete
315        import_collection_request = openapi_client.ImportCollectionRequest(
316            file_id=file.id,
317            organism=organism_from_file,
318            reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None,
319            reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None,
320        )
321
322        with ApiErrorContext():
323            import_collection_response = collection_api_instance.import_collection(import_collection_request)
324            assert import_collection_response.workflow_execution_id is not None
325
326            workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id)
327
328            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata:
329                assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
330
331                get_collection_id_response = collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id)
332                assert get_collection_id_response.collection_id is not None
333
334                collection_id = CollectionId(get_collection_id_response.collection_id)
335
336                logger.success(f"Collection with ID `{collection_id}` was successfully imported")
337                # Remove the file from tmp folder
338                os.remove(temporary_csv_file_path)
339                # Remove the file from the platform
340                file_api.delete_file_by_id(file.id)
341
342                return self.get_collection_metadata_by_id(collection_id)
343
344            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
345                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete
346            )
347
348            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
349
350    def create_collection_from_df(
351        self,
352        data_frame: pd.DataFrame,
353        reference_database_revision: ReferenceDatabaseRevision | None = None,
354    ) -> Execution[CollectionMetadata]:
355        """Import a collection from a DataFrame.
356
357        This is a convenience method to import a collection from a Pandas DataFrame. For more information about the
358        collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.
359
360        Args:
361            data_frame (pd.DataFrame): The DataFrame containing the collection to import.
362            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
363                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is only one
364                reference available, it will be picked for the import and the task will continue. If there are no references, or more than one,
365                an error will be returned - in that case the reference has to be picked manually by passing it to this parameter.
366                There is no downside to always specifying the reference manually; it is the safer and less error-prone option.
367        Raises:
368            enpi_api.l2.types.api_error.ApiError: If API request fails.
369
370        Returns:
371            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
372              collection that was imported when awaited.
373
374        Example:
375
376            ```python
377            reference_name = ...
378            reference_species = ...
379
380            with EnpiApiClient() as enpi_client:
381                reference = enpi_client.reference_database_api.get_revision_by_name(
382                    name=reference_name,
383                    species=reference_species,
384                )
385                df = pd.read_csv('/home/data.csv')
386                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
387                    data_frame=df,
388                    reference_database_revision=reference,
389                ).wait()
390            ```
391        """
392
393        # We need to turn the DataFrame into a CSV file
394        with tempfile.NamedTemporaryFile(suffix=".csv") as temp_file:
395            data_frame.to_csv(temp_file.name, index=False)
396
397            create_collection_execution = self.create_collection_from_csv(
398                file_path=temp_file.name,
399                reference_database_revision=reference_database_revision,
400            )
401
402        def wait() -> CollectionMetadata:
403            return create_collection_execution.wait()
404
405        return Execution(wait=wait, check_execution_state=create_collection_execution.check_execution_state)
406
407    def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]:
408        """Import metadata to annotate collections, clones or sequences in batches using a filter.
409
410        This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values
411        that you provide will be applied to all matching items of the specified level.
412
413        If you would like to add different values based on different matched tags, have a look at the methods that
414        support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.
415
416        Args:
417            filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate.
418              Use enpi_api.l2.client.api.filter_api.FilterApi.create_filter to create new filters.
419            annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You
420              specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
421              enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
422              are the preferred way of creating annotation configuration.
423
424        Returns:
425            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
426
427        Raises:
428            enpi_api.l2.types.api_error.ApiError: If API request fails.
429
430        Example:
431
432            Batch tag multiple collections with some tags:
433
434            ```python
435            with EnpiApiClient() as enpi_client:
436                collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]
437
438                # Create a filter
439                filter = enpi_client.filter_api.create_filter(
440                    name="My filter",
441                    condition=dict(
442                        type="match_ids",
443                        target="collection",
444                        ids=collection_ids,
445                    ),
446                )
447
448                # Create an annotation
449                annotation = collection_annotation(tags=[
450                    Tag(id=CollectionTags.CampaignId, value="My campaign"),
451                    Tag(id=CollectionTags.ProjectId, value="My project"),
452                ])
453
454                # Add the metadata
455                enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
456            ```
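
            The same filter could also be built with the typed condition helpers used elsewhere in
            this module (a sketch equivalent to the `dict(...)` form above):

            ```python
            filter = enpi_client.filter_api.create_filter(
                name="My filter",
                condition=MatchIds(
                    target=MatchIdTarget.COLLECTION,
                    ids=collection_ids,
                ),
            )
            ```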
457        """
458
459        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
460
461        import_metadata_request = openapi_client.ImportMetadataRequest(
462            openapi_client.SearchAndTag(
463                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
464                annotation=annotation.to_api_payload(),
465            )
466        )
467
468        with ApiErrorContext():
469            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
470            assert import_metadata_response.workflow_execution_id is not None
471
472            workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
473
474            waitable = WorkflowExecutionTaskWaitable[None](
475                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
476            )
477
478            return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
479
480    def add_metadata_from_file(
481        self,
482        filter: TemplatedFilter,
483        annotation: import_metadata_templated.Annotation,
484        file_path: str | Path,
485    ) -> Execution[None]:
486        """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.
487
488        Args:
489            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
490              Use enpi_api.l2.client.api.filter_api.FilterApi.create_templated_filter to create new templated filters.
491            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
492              specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
493              enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
494              are the preferred way of creating annotation configuration.
495            file_path (str | Path): The path to the CSV or XLSX file to import.
496
497        Returns:
498            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
499
500        Raises:
501            enpi_api.l2.types.api_error.ApiError: If API request fails.
502
503        Example:
504
505            Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.
506
507            Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
508            We'll add the value to a custom imaginary tag that was created before this example.
509
510            The CSV file would look like this:
511
512            | match_chain | match_productive | value_to_add |
513            |-------------|------------------|--------------|
514            | Heavy       | true             | Heavy and productive |
515            | Heavy       | false            | Heavy and unproductive |
516            | Kappa       | true             | Kappa and productive |
517            | Kappa       | false            | Kappa and unproductive |
518            | Lambda      | true             | Lambda and productive |
519            | Lambda      | false            | Lambda and unproductive |
520
521            We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.
522
523            ```python
524            my_collection_id: CollectionId = CollectionId(1337)
525
526            tag_id_chain: TagId = TagId(SequenceTags.Chain)
527            tag_id_productive: TagId = TagId(SequenceTags.Productive)
528            tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag
529
530            with EnpiApiClient() as enpi_client:
531                filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
532                enpi_client.collection_api.add_metadata_from_file(
533                    filter=filter,
534                    annotation=sequence_annotation([
535                        template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
536                    ]),
537                    file_path="path/to/metadata.csv",
538                ).wait()
539            ```
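
            The template file itself could be written out with pandas beforehand; this sketch simply
            reproduces the table above (the output path mirrors the one used in the example):

            ```python
            import pandas as pd

            rows = [
                ("Heavy", "true", "Heavy and productive"),
                ("Heavy", "false", "Heavy and unproductive"),
                ("Kappa", "true", "Kappa and productive"),
                ("Kappa", "false", "Kappa and unproductive"),
                ("Lambda", "true", "Lambda and productive"),
                ("Lambda", "false", "Lambda and unproductive"),
            ]
            pd.DataFrame(rows, columns=["match_chain", "match_productive", "value_to_add"]).to_csv(
                "path/to/metadata.csv", index=False
            )
            ```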
540        """
541
542        # We need to upload the file to the platform
543        file_api = FileApi(self._inner_api_client, self._log_level)
544        file_execution = file_api.upload_file(file_path)
545
546        file = file_execution.wait()
547
548        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
549
550        # Start the metadata import; this starts a task, so we'll wait for it to complete
551        import_metadata_request = openapi_client.ImportMetadataRequest(
552            openapi_client.TemplatedSearchAndTag(
553                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
554                annotation=annotation.to_api_payload(),
555                template_file_id=file.id,
556            )
557        )
558
559        with ApiErrorContext():
560            # The file upload has already completed above, so we can now start the metadata import
561            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
562            assert import_metadata_response.workflow_execution_id is not None
563
564        workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
565
566        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
567            assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
568
569            nonlocal file
570            file_api.delete_file_by_id(file.id)
571
572        waitable = WorkflowExecutionTaskWaitable[None](
573            on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED
574        )
575
576        return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
577
578    def add_metadata_from_df(
579        self,
580        filter: TemplatedFilter,
581        annotation: import_metadata_templated.Annotation,
582        data_frame: pd.DataFrame,
583    ) -> Execution[None]:
584        """Import metadata from a DataFrame to annotate collections, clones or sequences.
585
586        This is a convenience method to import metadata from a Pandas DataFrame. For more information about the
587        metadata import, see the documentation for `add_metadata_from_file`.
588
589        Args:
590            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
591            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
592              specify a specific annotation target and the values to apply.
593            data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
594
595        Returns:
596            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
597
598        Raises:
599            enpi_api.l2.types.api_error.ApiError: If API request fails.
600
601        Example:
602
603            Part of the `add_calculated_metadata.py` example script.
604
605            ```python
606            # Specify the filter query to match the sequences we want to add metadata to
607            metadata_filter = client.filter_api.create_templated_filter(
608                name="Metadata import filter",
609                shared=False,
610                condition=TemplatedAndOperator(
611                    conditions=[
612                        TemplatedMatchTag(tag_id=CollectionTags.Name),
613                        TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
614                    ]
615                ),
616            )
617
618            # Specify the sequence-level annotation to add to the collection
619            metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])
620
621            # Create metadata dataframe
622            metadata_frame = pd.DataFrame(
623                [
624                    [
625                        collection_name,  # Match
626                        df_row[1]["Unique Sequence ID"],  # Match
627                        grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
628                    ]
629                    for df_row in exported_df.iterrows()
630                ],
631                columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
632            )
633
634            # Apply metadata to the collection
635            client.collection_api.add_metadata_from_df(
636                filter=metadata_filter,
637                annotation=metadata_annotation,
638                data_frame=metadata_frame,
639            ).wait()
640            ```
641        """
642
643        # We need to turn the DataFrame into a CSV file
644        temporary_csv_file_path = os.path.join(tempfile.gettempdir(), f"import_metadata.{uuid4()}.csv")
645        data_frame.to_csv(temporary_csv_file_path, index=False)
646
647        return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path)
648
649    def get_as_zip(
650        self,
651        collection_ids: list[CollectionId],
652        filter: Filter | None = None,
653        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
654        output_directory: str | Path | None = None,
655    ) -> Execution[Path]:
656        """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.
657
658        Args:
659            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
660            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
661                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
662            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
663            output_directory (str | Path | None): The directory path under which the file will be saved. If
664              not provided, a temporary directory will be used.
665
666        Returns:
667            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
668              awaited.
669
670        Raises:
671            enpi_api.l2.types.api_error.ApiError: If API request fails.
672
673        Example:
674
675            ```python
676            with EnpiApiClient() as enpi_client:
677
678                collection_id = CollectionId(1234)
679
680                # Example assumes you have a filter
681                collection_filter: Filter = ...
682
683                path: Path = enpi_client.collection_api.get_as_zip(
684                    collection_ids=[collection_id],
685                    filter=collection_filter,
686                    tag_ids=[
687                        CollectionTags.Name,
688                        CollectionTags.Organism,
689                        CollectionTags.Complexity,
690                        CollectionTags.Receptor,
691                        SequenceTags.Chain,
692                        SequenceTags.Productive,
693                    ],
694                    output_directory="example/export_result/"
695                ).wait()
696            ```
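
            The returned archive can then be unpacked and read much like `get_as_df` does internally
            (a sketch; `path` is the awaited result from the example above):

            ```python
            from zipfile import ZipFile

            import pandas as pd

            with ZipFile(path, "r") as zip_ref:
                zip_ref.extractall("example/export_result/")
                tsv_names = [name for name in zip_ref.namelist() if name.endswith(".tsv")]

            # One TSV file per exported collection
            frames = [pd.read_csv(f"example/export_result/{name}", delimiter="\t") for name in tsv_names]
            ```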
697        """
698
699        # Create the collection filter if it wasn't provided; it will match and
700        # get all the clones from the target collections
701        if filter is None:
702            filter_api = FilterApi(self._inner_api_client, self._log_level)
703            filter = filter_api.create_filter(
704                name=f"all-collection-clones-filter-{uuid4()}",  # Unique name to avoid collision
705                condition=MatchIds(
706                    target=MatchIdTarget.COLLECTION,
707                    ids=collection_ids,  # Match all collection IDs passed to this function
708                ),
709            )
710
711        # Start the collection export; this starts a task, so we'll wait for it to complete
712        export_collection_request = openapi_client.ExportRequest(
713            payload=openapi_client.ExportPayload(
714                collection_ids=[int(collection_id) for collection_id in collection_ids],
715                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
716                tag_ids=[int(tag_id) for tag_id in tag_ids],
717            )
718        )
719        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
720
721        with ApiErrorContext():
722            export_collection_response = collection_api_instance.export(export_collection_request)
723            assert export_collection_response.workflow_execution_id is not None
724
725            workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id)
726
727            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
728                file_api = FileApi(self._inner_api_client, self._log_level)
729                file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)
730
731                logger.success("Collection(s) export has succeeded.")
732                return file_path
733
734            waitable = WorkflowExecutionTaskWaitable[Path](
735                workflow_execution_id=workflow_execution_id,
736                on_complete=on_complete,
737                task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
738            )
739
740            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
741
742    def get_as_df(
743        self,
744        collection_ids: list[CollectionId],
745        filter: Filter | None = None,
746        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
747    ) -> Execution[pd.DataFrame]:
748        """Export collection(s) to a Pandas DataFrame.
749
750        Args:
751            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
752            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
753                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
754            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
755
756        Returns:
757            enpi_api.l2.types.execution.Execution[pd.DataFrame]: An awaitable that returns a DataFrame with the exported collection(s).
758
759        Raises:
760            enpi_api.l2.types.api_error.ApiError: If API request fails.
761
762        Example:
763
764            ```python
765            with EnpiApiClient() as enpi_client:
766                # Example assumes you have a filter
767                filter: Filter = ...
768
769                df: pd.DataFrame = enpi_client.collection_api.get_as_df(
770                    collection_ids=[CollectionId(1)],
771                    filter=filter,
772                    tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.CDR3AminoAcids],
773                ).wait()
774            ```
775        """
776        tmp_dir = tempfile.TemporaryDirectory()
777        get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)
778
779        def wait() -> pd.DataFrame:
780            zip_path = get_as_zip_execution.wait()
781
782            # Extract all TSV files from the ZIP archive
783            with ZipFile(zip_path, "r") as zip_ref:
784                zip_ref.extractall(tmp_dir.name)
785
786            # Read all TSV files into a single DataFrame
787            all_dfs = []
788            for root, _, files in os.walk(tmp_dir.name):
789                for file in files:
790                    if file.endswith(".tsv"):
791                        file_path = os.path.join(root, file)
792                        df = pd.read_csv(file_path, delimiter="\t")
793                        all_dfs.append(df)
794
795            return pd.concat(all_dfs)
796
797        return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)
465                annotation=annotation.to_api_payload(),
466            )
467        )
468
469        with ApiErrorContext():
470            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
471            assert import_metadata_response.workflow_execution_id is not None
472
473            workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
474
475            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
476                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
477            )
478
479            return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
480
481    def add_metadata_from_file(
482        self,
483        filter: TemplatedFilter,
484        annotation: import_metadata_templated.Annotation,
485        file_path: str | Path,
486    ) -> Execution[None]:
487        """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.
488
489        Args:
490            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
491              Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
492            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
493              specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
494              enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
495              are the preferred way of creating annotation configuration.
496            file_path (str | Path): The path to the CSV or XLSX file to import.
497
498        Returns:
499            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
500
501        Raises:
502            enpi_api.l2.types.api_error.ApiError: If API request fails.
503
504        Example:
505
506            Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.
507
508            Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
509            We'll add the value to a custom imaginary tag that was created before this example.
510
511            The CSV file would look like this:
512
513            | match_chain | match_productive | value_to_add |
514            |-------------|------------------|--------------|
515            | Heavy       | true             | Heavy and productive |
516            | Heavy       | false            | Heavy and unproductive |
517            | Kappa       | true             | Kappa and productive |
518            | Kappa       | false            | Kappa and unproductive |
519            | Lambda      | true             | Lambda and productive |
520            | Lambda      | false            | Lambda and unproductive |
521
522            We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.
523
524            ```python
525            my_collection_id: CollectionId = CollectionId(1337)
526
527            tag_id_chain: TagId = TagId(SequenceTags.Chain)
528            tag_id_productive: TagId = TagId(SequenceTags.Productive)
529            tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag
530
531            with EnpiApiClient() as enpi_client:
532                filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
533                enpi_client.collection_api.add_metadata_from_file(
534                    filter=filter,
535                    annotation=sequence_annotation([
536                        template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
537                    ]),
538                    file_path="path/to/metadata.csv",
539                ).wait()
540            ```
541        """
542
543        # We need to upload the file to the platform
544        file_api = FileApi(self._inner_api_client, self._log_level)
545        file_execution = file_api.upload_file(file_path)
546
547        file = file_execution.wait()
548
549        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
550
551        # Start the metadata import, this starts a task, so we'll wait for that to be completed
552        import_metadata_request = openapi_client.ImportMetadataRequest(
553            openapi_client.TemplatedSearchAndTag(
554                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
555                annotation=annotation.to_api_payload(),
556                template_file_id=file.id,
557            )
558        )
559
560        with ApiErrorContext():
561            # The metadata import has not started yet because we first need to wait for the file upload
562            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
563            assert import_metadata_response.workflow_execution_id is not None
564
565        workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
566
567        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
568            assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
569
570            nonlocal file
571            file_api.delete_file_by_id(file.id)
572
573        waitable = WorkflowExecutionTaskWaitable[None](
574            on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED
575        )
576
577        return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)
578
579    def add_metadata_from_df(
580        self,
581        filter: TemplatedFilter,
582        annotation: import_metadata_templated.Annotation,
583        data_frame: pd.DataFrame,
584    ) -> Execution[None]:
585        """Import metadata from a DataFrame to annotate collections, clones or sequences.
586
587        This is a convenience method to import metadata from a Pandas DataFrame. For more information about the
588        metadata import, see the documentation for `add_metadata_from_file`.
589
590        Args:
591            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
592            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
593              specify a specific annotation target and the values to apply.
594            data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
595
596        Returns:
597            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
598
599        Raises:
600            enpi_api.l2.types.api_error.ApiError: If API request fails.
601
602        Example:
603
604            Part of the `add_calculated_metadata.py` example script.
605
606            ```python
607            # Specify the filter query to match the sequences we want to add metadata to
608            metadata_filter = client.filter_api.create_templated_filter(
609                name="Metadata import filter",
610                shared=False,
611                condition=TemplatedAndOperator(
612                    conditions=[
613                        TemplatedMatchTag(tag_id=CollectionTags.Name),
614                        TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
615                    ]
616                ),
617            )
618
619            # Specify the sequence-level annotation to add to the collection
620            metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])
621
622            # Create metadata dataframe
623            metadata_frame = pd.DataFrame(
624                [
625                    [
626                        collection_name,  # Match
627                        df_row[1]["Unique Sequence ID"],  # Match
628                        grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
629                    ]
630                    for df_row in exported_df.iterrows()
631                ],
632                columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
633            )
634
635            # Apply metadata to the collection
636            client.collection_api.add_metadata_from_df(
637                filter=metadata_filter,
638                annotation=metadata_annotation,
639                data_frame=metadata_frame,
640            ).wait()
641            ```
642        """
643
644        # We need to turn the DataFrame into a CSV file
645        temporary_csv_file_path = f"/tmp/import_metadata.{uuid4()}.csv"
646        data_frame.to_csv(temporary_csv_file_path, index=False)
647
648        return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path)
649
650    def get_as_zip(
651        self,
652        collection_ids: list[CollectionId],
653        filter: Filter | None = None,
654        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
655        output_directory: str | Path | None = None,
656    ) -> Execution[Path]:
657        """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.
658
659        Args:
660            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
661            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
662                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
663            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
664            output_directory (str | Path | None): The directory path under which the file will be exported. If
665              not provided, a temporary directory will be used.
666
667        Returns:
668            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
669              awaited.
670
671        Raises:
672            enpi_api.l2.types.api_error.ApiError: If API request fails.
673
674        Example:
675
676            ```python
677            with EnpiApiClient() as enpi_client:
678
679                collection_id = CollectionId(1234)
680
681                # Example assumes you have a filter
682                collection_filter: Filter = ...
683
684                path: Path = enpi_client.collection_api.get_as_zip(
685                    collection_ids=[collection_id],
686                    filter=collection_filter,
687                    tag_ids=[
688                        CollectionTags.Name,
689                        CollectionTags.Organism,
690                        CollectionTags.Complexity,
691                        CollectionTags.Receptor,
692                        SequenceTags.Chain,
693                        SequenceTags.Productive,
694                    ],
695                    output_directory="example/export_result/"
696                ).wait()
697            ```
698        """
699
700        # Create the collection filter if it wasn't provided; it will match and
701        # get all the clones from the target collections
702        if filter is None:
703            filter_api = FilterApi(self._inner_api_client, self._log_level)
704            filter = filter_api.create_filter(
705                name=f"all-collection-clones-filter-{uuid4()}",  # Unique name to avoid collision
706                condition=MatchIds(
707                    target=MatchIdTarget.COLLECTION,
708                    ids=collection_ids,  # Match all collection IDs passed to this function
709                ),
710            )
711
712        # Start the collection export, this starts a task, so we'll wait for that to be completed
713        export_collection_request = openapi_client.ExportRequest(
714            payload=openapi_client.ExportPayload(
715                collection_ids=[int(collection_id) for collection_id in collection_ids],
716                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
717                tag_ids=[int(tag_id) for tag_id in tag_ids],
718            )
719        )
720        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
721
722        with ApiErrorContext():
723            export_collection_response = collection_api_instance.export(export_collection_request)
724            assert export_collection_response.workflow_execution_id is not None
725
726            workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id)
727
728            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
729                file_api = FileApi(self._inner_api_client, self._log_level)
730                file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)
731
732                logger.success("Collection(s) export has succeeded.")
733                return file_path
734
735            waitable = WorkflowExecutionTaskWaitable[Path](
736                workflow_execution_id=workflow_execution_id,
737                on_complete=on_complete,
738                task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
739            )
740
741            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)
742
743    def get_as_df(
744        self,
745        collection_ids: list[CollectionId],
746        filter: Filter | None = None,
747        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
748    ) -> Execution[pd.DataFrame]:
749        """Export collection(s) to a Pandas DataFrame.
750
751        Args:
752            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
753            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
754                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
755            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
756
757        Returns:
758            Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection.
759
760        Raises:
761            enpi_api.l2.types.api_error.ApiError: If API request fails.
762
763        Example:
764
765            ```python
766            with EnpiApiClient() as enpi_client:
767                # Example assumes you have a filter
768                filter: Filter = ...
769
770                df: pd.DataFrame = enpi_client.collection_api.get_as_df(
771                    collection_ids=[CollectionId(1)],
772                    filter=filter,
773                    tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.Cdr3AminoAcids],
774                ).wait()
775            ```
776        """
777        tmp_dir = tempfile.TemporaryDirectory()
778        get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)
779
780        def wait() -> pd.DataFrame:
781            zip_path = get_as_zip_execution.wait()
782
783            # Extract all TSV files from the ZIP archive
784            with ZipFile(zip_path, "r") as zip_ref:
785                zip_ref.extractall(tmp_dir.name)
786
787            # Read all TSV files into a single DataFrame
788            all_dfs = []
789            for root, _, files in os.walk(tmp_dir.name):
790                for file in files:
791                    if file.endswith(".tsv"):
792                        file_path = os.path.join(root, file)
793                        df = pd.read_csv(file_path, delimiter="\t")
794                        all_dfs.append(df)
795
796            return pd.concat(all_dfs)
797
798        return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)
def get_collections_metadata( self, name: str | None = None) -> Generator[enpi_api.l2.types.collection.CollectionMetadata, NoneType, NoneType]:
 79    def get_collections_metadata(self, name: str | None = None) -> Generator[CollectionMetadata, None, None]:
 80        """Get a generator through all available collections in the platform.
 81
 82        Args:
 83            name (str | None): Optional collection name for search by case-insensitive substring matching
 84
 85        Returns:
 86            Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.
 87
 88        Raises:
 89            enpi_api.l2.types.api_error.ApiError: If API request fails.
 90
 91        Example:
 92
 93            ```python
 94            with EnpiApiClient() as enpi_client:
 95                for collection in enpi_client.collection_api.get_collections_metadata():
 96                    print(collection)
 97            ```
 98        """
 99
100        logger.info("Getting a generator through all collections")
101
102        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
103
104        # Fetch the first page, there is always a first page, it may be empty
105        try:
106            get_collections_response = collection_api_instance.get_collections(name=name)
107        except openapi_client.ApiException as e:
108            raise ApiError(e)
109
110        # `collections` and `cursor` get overwritten in the loop below when fetching a new page
111        collections = get_collections_response.collections
112        cursor = get_collections_response.cursor
113
114        while True:
115            for collection in collections:
116                yield CollectionMetadata.from_raw(collection)
117
118            # Check if we need to fetch a next page
119            if cursor is None:
120                logger.trace("No more pages of collections")
121                return  # No more pages
122
123            # We have a cursor, so we need to get a next page
124            logger.trace("Fetching next page of collections")
125            try:
126                get_collections_response = collection_api_instance.get_collections(
127                    cursor=cursor,
128                    name=name if name is not None else None,
129                )
130            except openapi_client.ApiException as e:
131                raise ApiError(e)
132            collections = get_collections_response.collections
133            cursor = get_collections_response.cursor

Get a generator through all available collections in the platform.

Arguments:
  • name (str | None): Optional collection name for search by case-insensitive substring matching
Returns:

Generator[enpi_api.l2.types.collection.CollectionMetadata, None, None]: A generator through all collections in the platform.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    for collection in enpi_client.collection_api.get_collections_metadata():
        print(collection)
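The `name` argument narrows the listing server-side; a minimal sketch, where "covid" is just an illustrative substring to match against collection names (case-insensitive):

```python
# List only the collections whose name contains the given substring.
# "covid" is a hypothetical value; any substring works.
with EnpiApiClient() as enpi_client:
    for collection in enpi_client.collection_api.get_collections_metadata(name="covid"):
        print(collection)
```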
def get_collection_metadata_by_id( self, collection_id: enpi_api.l2.types.collection.CollectionId) -> enpi_api.l2.types.collection.CollectionMetadata:
135    def get_collection_metadata_by_id(self, collection_id: CollectionId) -> CollectionMetadata:
136        """Get a single collection by its ID.
137
138        Args:
139            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.
140
141        Returns:
142            enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain
143              the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer
144              to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.
145
146        Raises:
147            enpi_api.l2.types.api_error.ApiError: If API request fails.
148
149        Example:
150
151            ```python
152            with EnpiApiClient() as enpi_client:
153                collection: CollectionMetadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
154            ```
155        """
156
157        logger.info(f"Getting collection with ID `{collection_id}`")
158
159        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
160
161        try:
162            get_collection_response = collection_api_instance.get_collection(collection_id)
163        except openapi_client.ApiException as e:
164            raise ApiError(e)
165
166        collection = CollectionMetadata.from_raw(get_collection_response.collection)
167
168        return collection

Get a single collection by its ID.

Arguments:
  • collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to get.
Returns:

enpi_api.l2.types.collection.CollectionMetadata: The collection, with all its metadata. This object does not contain the collection's clones or sequences, only the metadata. For collection's clone and sequence data refer to enpi_api.l2.client.api.collection_api.CollectionApi.get_as_zip and enpi_api.l2.client.api.collection_api.CollectionApi.get_as_df.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    collection: CollectionMetadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=CollectionId(1234))
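Since this call returns metadata only, the clone and sequence data for the same collection is typically fetched with one of the export methods; a minimal sketch (the collection ID is illustrative):

```python
# Fetch the metadata first, then export the collection's clone/sequence
# data as a DataFrame via get_as_df (uses the default export tags).
with EnpiApiClient() as enpi_client:
    collection_id = CollectionId(1234)
    metadata = enpi_client.collection_api.get_collection_metadata_by_id(collection_id=collection_id)
    df = enpi_client.collection_api.get_as_df(collection_ids=[collection_id]).wait()
```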
def delete_collection_by_id(self, collection_id: enpi_api.l2.types.collection.CollectionId) -> None:
170    def delete_collection_by_id(self, collection_id: CollectionId) -> None:
171        """Delete a single collection by its ID.
172
173        This will remove the collection from the ENPICOM Platform.
174
175        Args:
176            collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.
177
178        Raises:
179            enpi_api.l2.types.api_error.ApiError: If API request fails.
180
181        Example:
182
183            ```python
184            with EnpiApiClient() as enpi_client:
185                enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
186            ```
187        """
188
189        logger.info(f"Deleting collection with ID `{collection_id}`")
190
191        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
192
193        try:
194            collection_api_instance.delete_collection(id=collection_id)
195        except openapi_client.ApiException as e:
196            raise ApiError(e)
197
198        logger.info(f"Collection with ID `{collection_id}` successfully deleted")

Delete a single collection by its ID.

This will remove the collection from the ENPICOM Platform.

Arguments:
  • collection_id (enpi_api.l2.types.collection.CollectionId): The ID of the collection to delete.
Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    enpi_client.collection_api.delete_collection_by_id(collection_id=CollectionId(1234))
def create_collection_from_csv( self, file_path: str | pathlib.Path, reference_database_revision: enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None = None, skiprows: int = 0, mapping: Union[Mapping[str, str], Mapping[str, enpi_api.l2.types.tag.TagId], NoneType] = None, metadata: Union[Mapping[str, bool | int | float | str], Mapping[enpi_api.l2.types.tag.TagId, bool | int | float | str], NoneType] = None, organism: str | None = None) -> enpi_api.l2.types.execution.Execution[CollectionMetadata]:
200    def create_collection_from_csv(
201        self,
202        file_path: str | Path,
203        reference_database_revision: ReferenceDatabaseRevision | None = None,
204        skiprows: int = 0,
205        mapping: Mapping[str, TagKey] | Mapping[str, TagId] | None = None,
206        metadata: AdditionalImportMetadata | None = None,
207        organism: str | None = None,
208    ) -> Execution[CollectionMetadata]:
209        """Import a collection from a CSV file (can be gzipped).
210
211        The file should be a CSV file with a couple of required headers. These headers must
212        either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).
213        The following tags are required:
214
215            - enpi_api.l2.tags.CollectionTags.Name
216            - enpi_api.l2.tags.CollectionTags.Organism
217            - enpi_api.l2.tags.SequenceTags.SequenceCount
218            - enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
219            - enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
220            - enpi_api.l2.tags.SequenceTags.VCall
221            - enpi_api.l2.tags.SequenceTags.JCall
222
223        Args:
224            file_path (str | Path): The path to the CSV file to import.
225            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
226                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is exactly one
227                reference available, it will be picked for the import and the task will continue. If there are none, or multiple references are
228                available, an error will be returned; in that case the reference has to be picked manually by passing it to this parameter.
229                There are no downsides to always specifying the reference manually; it is the safer and less error-prone option.
230            skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
231            mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the
232              CSV headers to ENPICOM Platform tag keys
233            metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection.
234                <u>**If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take
235                precedence when creating tags.**</u>
236            organism (str | None): If passed, it's compared with the organism value found in the first line of the imported file and
237                an error is raised if the values differ. Can serve as a quick sanity check.
238
239        Returns:
240            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the metadata of the imported collection when awaited.
241
242        Raises:
243            KeyError: If 'Organism' column is not found in the imported df/csv.
244            ValueError: If optional `organism` param value differs from the 'Organism' value from the df/csv.
245            enpi_api.l2.types.api_error.ApiError: If API request fails.
246
247        Example:
248
249            ```python
250            with EnpiApiClient() as enpi_client:
251                reference_name = ...
252                species = ...
253                reference = enpi_client.reference_database_api.get_revision_by_name(
254                    name=reference_name,
255                    species=species,
256                )
257
258                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
259                    file_path=import_file_path,
260                    reference_database_revision=reference,
261                    skiprows=1,
262                    mapping={
263                        "title": CollectionTags.Name,
264                        "species": CollectionTags.Organism,
265                    },
266                    metadata={
267                        CollectionTags.ProjectId: "Project 001",
268                    }
269                ).wait()
270            ```
271        """
272
273        logger.info(f"Importing collection from CSV file `{file_path}`")
274
275        # Pandas supports gzipped CSV
276        df = pd.read_csv(file_path, sep=",", skiprows=skiprows)
277
278        # Get the organism from the first line. All lines should hold the same value
279        organism_from_file = str(df.iloc[0].get("Organism", None))
280        if organism_from_file is None:
281            # If not found by tag key, try to access it via the tag ID
282            organism_from_file = str(df.iloc[0].get(CollectionTags.Organism, None))
283
284        # If it's still none, raise an error - it's a mandatory column anyways
285        if organism_from_file is None:
286            raise KeyError("A required 'Organism' column was not found in the imported file/df")
287
288        # If `organism` param was passed, compare the values
289        if (organism is not None) and (organism != organism_from_file):
290            raise ValueError(
291                f"Value of 'organism' param: {organism} differs from the organism found in file: {organism_from_file}",
292            )
293
294        # Map the headers in the CSV file to Tag Keys
295        if mapping is not None:
296            # We drop the columns for which no mapping is specified
297            unmapped_headers = set(df.columns).difference(set(mapping.keys()))
298            logger.warning(f"The following headers are unmapped and are removed:\n{unmapped_headers}")
299            df.drop(columns=list(unmapped_headers), inplace=True)
300            df.rename(columns=mapping, inplace=True)
301        if metadata is not None:
302            for key, value in metadata.items():
303                df[key] = value
304
305        temporary_csv_file_path = f"/tmp/import_collection_csv.{uuid4()}.csv"
306        df.to_csv(temporary_csv_file_path, index=False)
307        verify_headers_uniformity(list(df.columns))
308
309        # Upload the file to the platform
310        file_api = FileApi(self._inner_api_client, self._log_level)
311        file = file_api.upload_file(temporary_csv_file_path).wait()
312
313        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
314
315        # Start the collection import, this starts a task, so we'll wait for that to be completed
316        import_collection_request = openapi_client.ImportCollectionRequest(
317            file_id=file.id,
318            organism=organism_from_file,
319            reference_database_id=str(reference_database_revision.reference_database_id) if reference_database_revision is not None else None,
320            reference_database_version=int(reference_database_revision.reference_database_version) if reference_database_revision is not None else None,
321        )
322
323        with ApiErrorContext():
324            import_collection_response = collection_api_instance.import_collection(import_collection_request)
325            assert import_collection_response.workflow_execution_id is not None
326
327            workflow_execution_id = WorkflowExecutionId(import_collection_response.workflow_execution_id)
328
329            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> CollectionMetadata:
330                assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
331
332                get_collection_id_response = collection_api_instance.get_collection_id_by_workflow_execution_task_id(task_id)
333                assert get_collection_id_response.collection_id is not None
334
335                collection_id = CollectionId(get_collection_id_response.collection_id)
336
337                logger.success(f"Collection with ID `{collection_id}` was successfully imported")
338                # Remove the file from tmp folder
339                os.remove(temporary_csv_file_path)
340                # Remove the file from the platform
341                file_api.delete_file_by_id(file.id)
342
343                return self.get_collection_metadata_by_id(collection_id)
344
345            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
346                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_IMPORT, on_complete=on_complete
347            )
348
349            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

Import a collection from a CSV file (can be gzipped).

The file should be a CSV file with a couple of required headers. These headers must either be the tag IDs (for example: 2035, 2040) or tag keys (for example: Name, Organism).

The following tags are required:
  • enpi_api.l2.tags.CollectionTags.Name
  • enpi_api.l2.tags.CollectionTags.Organism
  • enpi_api.l2.tags.SequenceTags.SequenceCount
  • enpi_api.l2.tags.SequenceTags.CDR3Nucleotides or enpi_api.l2.tags.SequenceTags.CDR3AminoAcids
  • enpi_api.l2.tags.SequenceTags.ReceptorNucleotides or enpi_api.l2.tags.SequenceTags.FullSequenceNucleotides
  • enpi_api.l2.tags.SequenceTags.VCall
  • enpi_api.l2.tags.SequenceTags.JCall
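If the CSV's own headers do not already match these tag keys or IDs, they can be translated with the `mapping` parameter; a minimal sketch, assuming hypothetical CSV column names on the left and the tag attribute names from the list above (adjust them to the names available in your SDK version):

```python
# Map arbitrary CSV headers onto the required platform tags.
# The left-hand column names and the file path are illustrative only.
header_mapping = {
    "collection": CollectionTags.Name,
    "organism": CollectionTags.Organism,
    "reads": SequenceTags.SequenceCount,
    "cdr3_aa": SequenceTags.CDR3AminoAcids,
    "full_nt": SequenceTags.FullSequenceNucleotides,
    "v_call": SequenceTags.VCall,
    "j_call": SequenceTags.JCall,
}

with EnpiApiClient() as enpi_client:
    collection = enpi_client.collection_api.create_collection_from_csv(
        file_path="my_repertoire.csv",
        mapping=header_mapping,
    ).wait()
```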
Arguments:
  • file_path (str | Path): The path to the CSV file to import.
  • reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is exactly one reference available, it will be picked for the import and the task will continue. If there are none, or multiple references are available, an error will be returned; in that case the reference has to be picked manually by passing it to this parameter. There are no downsides to always specifying the reference manually; it is the safer and less error-prone option.
  • skiprows (int): Number of rows to skip at the beginning of the file, before reading the headers. Defaults to 0.
  • mapping (Mapping[str, enpi_api.l2.types.tag.TagKey] | Mapping[str, enpi_api.l2.types.tag.TagId] | None): Mapping of the CSV headers to ENPICOM Platform tag keys
  • metadata (enpi_api.l2.types.collection.AdditionalImportMetadata | None): Additional metadata to add to the collection. If the metadata keys overlap with the keys in the CSV (or with the values of the mapping), the metadata will take precedence when creating tags.
  • organism (str | None): If passed, it's compared with the organism value found in the first line of the imported file and an error is raised if the values differ. Can serve as a quick sanity check.
Returns:

enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the metadata of the imported collection when awaited.

Raises:
  • KeyError: If 'Organism' column is not found in the imported df/csv.
  • ValueError: If optional organism param value differs from the 'Organism' value from the df/csv.
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    reference_name = ...
    species = ...
    reference = enpi_client.reference_database_api.get_revision_by_name(
        name=reference_name,
        species=species,
    )

    collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_csv(
        file_path=import_file_path,
        reference_database_revision=reference,
        skiprows=1,
        mapping={
            "title": CollectionTags.Name,
            "species": CollectionTags.Organism,
        },
        metadata={
            CollectionTags.ProjectId: "Project 001",
        }
    ).wait()
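The importer also accepts a gzip-compressed CSV, since the file is read with pandas; a minimal sketch with an illustrative path, leaving the reference to be resolved automatically from the Organism column:

```python
# Importing a gzipped CSV works the same way as a plain CSV;
# pandas reads ".csv.gz" files transparently.
with EnpiApiClient() as enpi_client:
    collection = enpi_client.collection_api.create_collection_from_csv(
        file_path="exports/repertoire.csv.gz",
    ).wait()
```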
   
def create_collection_from_df( self, data_frame: pandas.core.frame.DataFrame, reference_database_revision: enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None = None) -> enpi_api.l2.types.execution.Execution[CollectionMetadata]:
351    def create_collection_from_df(
352        self,
353        data_frame: pd.DataFrame,
354        reference_database_revision: ReferenceDatabaseRevision | None = None,
355    ) -> Execution[CollectionMetadata]:
356        """Import a collection from a DataFrame.
357
358        This is a convenience method to import a collection from a Pandas DataFrame. For more information about the
359        collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.
360
361        Args:
362            data_frame (pd.DataFrame): The DataFrame containing the collection to import.
363            reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use.
364                If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is exactly one
365                reference available, it will be picked for the import and the task will continue. If there are none, or multiple references are
366                available, an error will be returned; in that case the reference has to be picked manually by passing it to this parameter.
367                There are no downsides to always specifying the reference manually; it is the safer and less error-prone option.
368        Raises:
369            enpi_api.l2.types.api_error.ApiError: If API request fails.
370
371        Returns:
372            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the
373              collection that was imported when awaited.
374
375        Example:
376
377            ```python
378            with EnpiApiClient() as enpi_client:
379                reference_name = ...
380                species = ...
381                reference = enpi_client.reference_database_api.get_revision_by_name(
382                    name=reference_name,
383                    species=species,
384                )
385
386                df = pd.read_csv('/home/data.csv')
387                collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
388                    data_frame=df,
389                    reference_database_revision=reference,
390                ).wait()
391            ```
392        """
393
394        # We need to turn the DataFrame into a CSV file
395        with tempfile.NamedTemporaryFile(suffix=".csv") as temp_file:
396            data_frame.to_csv(temp_file.name, index=False)
397
398            create_collection_execution = self.create_collection_from_csv(
399                file_path=temp_file.name,
400                reference_database_revision=reference_database_revision,
401            )
402
403        def wait() -> CollectionMetadata:
404            return create_collection_execution.wait()
405
406        return Execution(wait=wait, check_execution_state=create_collection_execution.check_execution_state)

Import a collection from a DataFrame.

This is a convenience method to import a collection from a Pandas DataFrame. For more information about the collection import, see enpi_api.l2.client.api.collection_api.CollectionApi.create_collection_from_csv.

Arguments:
  • data_frame (pd.DataFrame): The DataFrame containing the collection to import.
  • reference_database_revision (enpi_api.l2.types.reference_database.ReferenceDatabaseRevision | None): The reference database revision to use. If this is not provided, ENPICOM will check the references available for the organism specified in the imported file. If there is exactly one reference available, it will be picked for the import and the task will continue. If there are none, or multiple references are available, an error will be returned; in that case the reference has to be picked manually by passing it to this parameter. There are no downsides to always specifying the reference manually; it is the safer and less error-prone option.
Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Returns:

enpi_api.l2.types.execution.Execution[enpi_api.l2.types.collection.CollectionMetadata]: An awaitable that returns the collection that was imported when awaited.

Example:
with EnpiApiClient() as enpi_client:
    reference_name = ...
    species = ...
    reference = enpi_client.reference_database_api.get_revision_by_name(
        name=reference_name,
        species=species,
    )

    df = pd.read_csv('/home/data.csv')
    collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(
        data_frame=df,
        reference_database_revision=reference,
    ).wait()
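If `reference_database_revision` is omitted, the platform tries to resolve the reference from the Organism value in the DataFrame, which only succeeds when exactly one reference exists for that organism; a minimal sketch of that shorter call:

```python
# Let the platform pick the reference automatically; this fails if zero
# or more than one reference is available for the organism in the data.
with EnpiApiClient() as enpi_client:
    df = pd.read_csv('/home/data.csv')
    collection: CollectionMetadata = enpi_client.collection_api.create_collection_from_df(data_frame=df).wait()
```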
408    def add_metadata(self, filter: Filter, annotation: import_metadata.Annotation) -> Execution[None]:
409        """Import metadata to annotate collections, clones or sequences in batches using a filter.
410
411        This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values
412        that you provide will be applied to all matching items of the specified level.
413
414        If you would like to add different values based on different matched tags, have a look at the methods that
415        support a templated filter, such as `add_metadata_from_file` or `add_metadata_from_df`.
416
417        Args:
418            filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate.
419              Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
420            annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You
421              specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
422              enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
423              are the preferred way of creating annotation configuration.
424
425        Returns:
426            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
427
428        Raises:
429            enpi_api.l2.types.api_error.ApiError: If API request fails.
430
431        Example:
432
433            Batch tag multiple collections with some tags:
434
435            ```python
436            with EnpiApiClient() as enpi_client:
437                collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]
438
439                # Create a filter
440                filter = enpi_client.filter_api.create_filter(
441                    name="My filter",
442                    condition=dict(
443                        type="match_ids",
444                        target="collection",
445                        ids=collection_ids,
446                    ),
447                )
448
449                # Create an annotation
450                annotation = collection_annotation(tags=[
451                    Tag(id=CollectionTags.CampaignId, value="My campaign"),
452                    Tag(id=CollectionTags.ProjectId, value="My project"),
453                ])
454
455                # Add the metadata
456                enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
457            ```
458        """
459
460        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
461
462        import_metadata_request = openapi_client.ImportMetadataRequest(
463            openapi_client.SearchAndTag(
464                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
465                annotation=annotation.to_api_payload(),
466            )
467        )
468
469        with ApiErrorContext():
470            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
471            assert import_metadata_response.workflow_execution_id is not None
472
473            workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
474
475            waitable = WorkflowExecutionTaskWaitable[CollectionMetadata](
476                workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT
477            )
478
479            return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

Import metadata to annotate collections, clones or sequences in batches using a filter.

This method allows you to simply annotate collections, clones or sequences using a filter. The annotation values that you provide will be applied to all matching items of the specified level.

If you would like to add different values based on different matched tags, have a look at the methods that support a templated filter, such as add_metadata_from_file or add_metadata_from_df.

Arguments:
  • filter (enpi_api.l2.types.filter.Filter): The filter to narrow down which collections, clones or sequences you wish to annotate. Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
  • annotation (enpi_api.l2.types.import_metadata.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation are the preferred way of creating annotation configuration.
Returns:

enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:

Batch tag multiple collections with some tags:

with EnpiApiClient() as enpi_client:
    collection_ids = [CollectionId(1), CollectionId(2), CollectionId(3)]

    # Create a filter
    filter = enpi_client.filter_api.create_filter(
        name="My filter",
        condition=dict(
            type="match_ids",
            target="collection",
            ids=collection_ids,
        ),
    )

    # Create an annotation
    annotation = collection_annotation(tags=[
        Tag(id=CollectionTags.CampaignId, value="My campaign"),
        Tag(id=CollectionTags.ProjectId, value="My project"),
    ])

    # Add the metadata
    enpi_client.collection_api.add_metadata(filter=filter, annotation=annotation).wait()
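The same pattern applies at the clone and sequence levels via `clone_annotation` and `sequence_annotation`; a minimal sketch, assuming `clone_annotation` mirrors the `collection_annotation` call shape shown above and using an illustrative custom tag ID:

```python
# Apply a clone-level tag to every clone in the matched collection.
# TagId(52001) is a hypothetical custom tag; the filter targets collection 1.
# clone_annotation's signature is assumed to match collection_annotation above.
with EnpiApiClient() as enpi_client:
    clone_filter = enpi_client.filter_api.create_filter(
        name="My clone filter",
        condition=dict(
            type="match_ids",
            target="collection",
            ids=[CollectionId(1)],
        ),
    )
    annotation = clone_annotation(tags=[
        Tag(id=TagId(52001), value="Lead candidate"),
    ])
    enpi_client.collection_api.add_metadata(filter=clone_filter, annotation=annotation).wait()
```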
481    def add_metadata_from_file(
482        self,
483        filter: TemplatedFilter,
484        annotation: import_metadata_templated.Annotation,
485        file_path: str | Path,
486    ) -> Execution[None]:
487        """Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.
488
489        Args:
490            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
491              Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
492            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
493              specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation,
494              enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation
495              are the preferred way of creating annotation configuration.
496            file_path (str | Path): The path to the CSV or XLSX file to import.
497
498        Returns:
499            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
500
501        Raises:
502            enpi_api.l2.types.api_error.ApiError: If API request fails.
503
504        Example:
505
506            Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.
507
508            Let's call the match columns *match_chain* and *match_productive*, and the column to add *value_to_add*.
509            We'll add the value to a custom imaginary tag that was created before this example.
510
511            The CSV file would look like this:
512
513            | match_chain | match_productive | value_to_add |
514            |-------------|------------------|--------------|
515            | Heavy       | true             | Heavy and productive |
516            | Heavy       | false            | Heavy and unproductive |
517            | Kappa       | true             | Kappa and productive |
518            | Kappa       | false            | Kappa and unproductive |
519            | Lambda      | true             | Lambda and productive |
520            | Lambda      | false            | Lambda and unproductive |
521
522            We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID *1337*.
523
524            ```python
525            my_collection_id: CollectionId = CollectionId(1337)
526
527            tag_id_chain: TagId = TagId(SequenceTags.Chain)
528            tag_id_productive: TagId = TagId(SequenceTags.Productive)
529            tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag
530
531            with EnpiApiClient() as enpi_client:
532                filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
533                enpi_client.collection_api.add_metadata_from_file(
534                    filter=filter,
535                    annotation=sequence_annotation([
536                        template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
537                    ]),
538                    file_path="path/to/metadata.csv",
539                ).wait()
540            ```
541        """
542
543        # We need to upload the file to the platform
544        file_api = FileApi(self._inner_api_client, self._log_level)
545        file_execution = file_api.upload_file(file_path)
546
547        file = file_execution.wait()
548
549        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
550
551        # Start the metadata import, this starts a task, so we'll wait for that to be completed
552        import_metadata_request = openapi_client.ImportMetadataRequest(
553            openapi_client.TemplatedSearchAndTag(
554                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
555                annotation=annotation.to_api_payload(),
556                template_file_id=file.id,
557            )
558        )
559
560        with ApiErrorContext():
561            # The metadata import has not started yet because we first need to wait for the file upload
562            import_metadata_response = collection_api_instance.import_metadata(import_metadata_request)
563            assert import_metadata_response.workflow_execution_id is not None
564
565        workflow_execution_id = WorkflowExecutionId(import_metadata_response.workflow_execution_id)
566
567        def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> None:
568            assert task_state == TaskState.SUCCEEDED, f"Task {task_id} did not reach {TaskState.SUCCEEDED} state, got {task_state} state instead"
569
570            nonlocal file
571            file_api.delete_file_by_id(file.id)
572
573        waitable = WorkflowExecutionTaskWaitable[None](
574            on_complete=on_complete, workflow_execution_id=workflow_execution_id, task_template_name=WorkflowTaskTemplateName.ENPI_APP_METADATA_IMPORT_TEMPLATED
575        )
576
577        return Execution(wait=waitable.wait, check_execution_state=waitable.check_execution_state)

Import metadata from a CSV or XLSX file to annotate collections, clones or sequences.

Arguments:
  • filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate. Use enpi_api.l2.api.filter_api.FilterApi.create_filter to create new filters.
  • annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply. Utility functions enpi_api.l2.types.import_metadata_templated.collection_annotation, enpi_api.l2.types.import_metadata_templated.clone_annotation and enpi_api.l2.types.import_metadata_templated.sequence_annotation are the preferred way of creating annotation configuration.
  • file_path (str | Path): The path to the CSV or XLSX file to import.
Returns:

enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

Raises:
  • enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:

Assume you have a CSV file which describes 2 sequence tags to match on, and a tag to add to the matched sequences.

Let's call the match columns match_chain and match_productive, and the column to add value_to_add. We'll add the value to a custom imaginary tag that was created before this example.

The CSV file would look like this:

| match_chain | match_productive | value_to_add |
|-------------|------------------|--------------|
| Heavy       | true             | Heavy and productive |
| Heavy       | false            | Heavy and unproductive |
| Kappa       | true             | Kappa and productive |
| Kappa       | false            | Kappa and unproductive |
| Lambda      | true             | Lambda and productive |
| Lambda      | false            | Lambda and unproductive |

We also want to narrow down the collections we want to annotate to a single collection with the imaginary ID 1337.

my_collection_id: CollectionId = CollectionId(1337)

tag_id_chain: TagId = TagId(SequenceTags.Chain)
tag_id_productive: TagId = TagId(SequenceTags.Productive)
tag_id_value_to_add: TagId = TagId(52001)  # This is a custom tag

with EnpiApiClient() as enpi_client:
    filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
    enpi_client.collection_api.add_metadata_from_file(
        filter=filter,
        annotation=sequence_annotation([
            template_tag(tag_id=tag_id_value_to_add, key="value_to_add"),
        ]),
        file_path="path/to/metadata.csv",
    ).wait()
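The template does not have to live in a file; it can be built in memory and passed to `add_metadata_from_df`. A minimal sketch reusing the columns from the table above (the filter ID and tag ID are illustrative):

```python
import pandas as pd

# Build the match/add template in memory instead of writing a CSV to disk.
template_df = pd.DataFrame(
    [
        ["Heavy", True, "Heavy and productive"],
        ["Heavy", False, "Heavy and unproductive"],
    ],
    columns=["match_chain", "match_productive", "value_to_add"],
)

with EnpiApiClient() as enpi_client:
    filter = enpi_client.filter_api.get_templated_filter_by_id(FilterId('92be003d-6f5c-447a-baac-c9d420783fc8'))
    enpi_client.collection_api.add_metadata_from_df(
        filter=filter,
        annotation=sequence_annotation([
            template_tag(tag_id=TagId(52001), key="value_to_add"),
        ]),
        data_frame=template_df,
    ).wait()
```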
579    def add_metadata_from_df(
580        self,
581        filter: TemplatedFilter,
582        annotation: import_metadata_templated.Annotation,
583        data_frame: pd.DataFrame,
584    ) -> Execution[None]:
585        """Import metadata from a DataFrame to annotate collections, clones or sequences.
586
587        This is a convenience method to import metadata from a Pandas DataFrame. For more information about the
588        metadata import, see the documentation for `add_metadata_from_file`.
589
590        Args:
591            filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
592            annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You
593              specify a specific annotation target and the values to apply.
594            data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
595
596        Returns:
597            enpi_api.l2.types.execution.Execution[None]: An awaitable execution.
598
599        Raises:
600            enpi_api.l2.types.api_error.ApiError: If API request fails.
601
602        Example:
603
604            Part of the `add_calculated_metadata.py` example script.
605
606            ```python
607            # Specify the filter query to match the sequences we want to add metadata to
608            metadata_filter = client.filter_api.create_templated_filter(
609                name="Metadata import filter",
610                shared=False,
611                condition=TemplatedAndOperator(
612                    conditions=[
613                        TemplatedMatchTag(tag_id=CollectionTags.Name),
614                        TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
615                    ]
616                ),
617            )
618
619            # Specify the sequence-level annotation to add to the collection
620            metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])
621
622            # Create metadata dataframe
623            metadata_frame = pd.DataFrame(
624                [
625                    [
626                        collection_name,  # Match
627                        df_row[1]["Unique Sequence ID"],  # Match
628                        grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
629                    ]
630                    for df_row in exported_df.iterrows()
631                ],
632                columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
633            )
634
635            # Apply metadata to the collection
636            client.collection_api.add_metadata_from_df(
637                filter=metadata_filter,
638                annotation=metadata_annotation,
639                data_frame=metadata_frame,
640            ).wait()
641            ```
642        """
643
644        # We need to turn the DataFrame into a CSV file
645        temporary_csv_file_path = f"/tmp/import_metadata.{uuid4()}.csv"
646        data_frame.to_csv(temporary_csv_file_path, index=False)
647
648        return self.add_metadata_from_file(filter, annotation, temporary_csv_file_path)

Import metadata from a DataFrame to annotate collections, clones or sequences.

This is a convenience method to import metadata from a Pandas DataFrame. For more information about the metadata import, see the documentation for add_metadata_from_file.

Arguments:
  • filter (enpi_api.l2.types.filter.TemplatedFilter): The filter to narrow down which collections, clones or sequences you wish to annotate.
  • annotation (enpi_api.l2.types.import_metadata_templated.Annotation): The annotation to apply to the matched collections, clones or sequences. You specify a specific annotation target and the values to apply.
  • data_frame (pd.DataFrame): The DataFrame containing the templated metadata to import.
Returns:

enpi_api.l2.types.execution.Execution[None]: An awaitable execution.

Raises:

enpi_api.l2.types.api_error.ApiError: If API request fails.

Example:

Part of the add_calculated_metadata.py example script.

# Specify the filter query to match the sequences we want to add metadata to
metadata_filter = client.filter_api.create_templated_filter(
    name="Metadata import filter",
    shared=False,
    condition=TemplatedAndOperator(
        conditions=[
            TemplatedMatchTag(tag_id=CollectionTags.Name),
            TemplatedMatchId(target=MatchIdTarget.SEQUENCE),
        ]
    ),
)

# Specify the sequence-level annotation to add to the collection
metadata_annotation: Annotation = sequence_annotation([template_tag(new_tag_archetype.id)])

# Create metadata dataframe
metadata_frame = pd.DataFrame(
    [
        [
            collection_name,  # Match
            df_row[1]["Unique Sequence ID"],  # Match
            grouped_df.loc[df_row[1]["Unique Clone ID"]]["Sequence Count"],  # Add
        ]
        for df_row in exported_df.iterrows()
    ],
    columns=["Name", "Unique Sequence ID", new_tag_archetype.key],
)

# Apply metadata to the collection
client.collection_api.add_metadata_from_df(
    filter=metadata_filter,
    annotation=metadata_annotation,
    data_frame=metadata_frame,
).wait()
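
The snippet above comes from a larger script and relies on variables such as exported_df, grouped_df and new_tag_archetype. As a self-contained sketch, assuming a previously created templated filter and a custom tag (the filter ID and the tag ID 52001 below are placeholders), a call could look like this, using sequence_annotation and template_tag as in the examples above:

```python
import pandas as pd

with EnpiApiClient() as enpi_client:
    # Hypothetical templated filter that maps the match columns onto sequence tags.
    metadata_filter = enpi_client.filter_api.get_templated_filter_by_id(
        FilterId("92be003d-6f5c-447a-baac-c9d420783fc8")
    )

    # Annotate matched sequences with a custom tag (hypothetical tag ID 52001).
    annotation = sequence_annotation([template_tag(tag_id=TagId(52001), key="value_to_add")])

    # Two match columns plus the value column, mirroring the CSV-based example.
    metadata_frame = pd.DataFrame(
        [
            ["Heavy", "true", "Heavy and productive"],
            ["Kappa", "false", "Kappa and unproductive"],
        ],
        columns=["match_chain", "match_productive", "value_to_add"],
    )

    enpi_client.collection_api.add_metadata_from_df(
        filter=metadata_filter,
        annotation=annotation,
        data_frame=metadata_frame,
    ).wait()
```
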
def get_as_zip(
    self,
    collection_ids: list[enpi_api.l2.types.collection.CollectionId],
    filter: enpi_api.l2.types.filter.Filter | None = None,
    tag_ids: list[enpi_api.l2.types.tag.TagId] = [2035, 2040, 2083, 2084, 2036, 2060, 1001, 1077, 38, 144, 24, 154, 110],
    output_directory: str | pathlib.Path | None = None,
) -> enpi_api.l2.types.execution.Execution[Path]:
650    def get_as_zip(
651        self,
652        collection_ids: list[CollectionId],
653        filter: Filter | None = None,
654        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
655        output_directory: str | Path | None = None,
656    ) -> Execution[Path]:
657        """Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.
658
659        Args:
660            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
661            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
662                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
663            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
664            output_directory (str | Path | None): The directory path under which the file will be exported. If
665              not provided, a temporary directory will be used.
666
667        Returns:
668            enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when
669              awaited.
670
671        Raises:
672            enpi_api.l2.types.api_error.ApiError: If API request fails.
673
674        Example:
675
676            ```python
677            with EnpiApiClient() as enpi_client:
678
679                collection_id = CollectionId(1234)
680
681                # Example assumes you have a filter
682                collection_filter: Filter = ...
683
684                path: Path = enpi_client.collection_api.get_as_zip(
685                    collection_ids=[collection_id],
686                    filter=collection_filter,
687                    tag_ids=[
688                        CollectionTags.Name,
689                        CollectionTags.Organism,
690                        CollectionTags.Complexity,
691                        CollectionTags.Receptor,
692                        SequenceTags.Chain,
693                        SequenceTags.Productive,
694                    ],
695                    output_directory="example/export_result/"
696                ).wait()
697            ```
698        """
699
700        # Create the collection filter if it wasn't provided; it will match and
701        # get all the clones from the target collections
702        if filter is None:
703            filter_api = FilterApi(self._inner_api_client, self._log_level)
704            filter = filter_api.create_filter(
705                name=f"all-collection-clones-filter-{uuid4()}",  # Unique name to avoid collision
706                condition=MatchIds(
707                    target=MatchIdTarget.COLLECTION,
708                    ids=collection_ids,  # Match all collection IDs passed to this function
709                ),
710            )
711
712        # Start the collection export; this starts a task, so we'll wait for it to complete
713        export_collection_request = openapi_client.ExportRequest(
714            payload=openapi_client.ExportPayload(
715                collection_ids=[int(collection_id) for collection_id in collection_ids],
716                filter=openapi_client.FilterIdOptionalVersion(id=filter.id, version=filter.version),
717                tag_ids=[int(tag_id) for tag_id in tag_ids],
718            )
719        )
720        collection_api_instance = openapi_client.CollectionApi(self._inner_api_client)
721
722        with ApiErrorContext():
723            export_collection_response = collection_api_instance.export(export_collection_request)
724            assert export_collection_response.workflow_execution_id is not None
725
726            workflow_execution_id = WorkflowExecutionId(export_collection_response.workflow_execution_id)
727
728            def on_complete(task_id: WorkflowExecutionTaskId, task_state: TaskState) -> Path:
729                file_api = FileApi(self._inner_api_client, self._log_level)
730                file_path = file_api.download_export_by_workflow_execution_task_id(task_id=task_id, output_directory=output_directory)
731
732                logger.success("Collection(s) export has succeeded.")
733                return file_path
734
735            waitable = WorkflowExecutionTaskWaitable[Path](
736                workflow_execution_id=workflow_execution_id,
737                on_complete=on_complete,
738                task_template_name=WorkflowTaskTemplateName.ENPI_APP_COLLECTION_EXPORT,
739            )
740
741            return Execution(wait=waitable.wait_and_return_result, check_execution_state=waitable.check_execution_state)

Export collection(s) into a zip file. Inside of the archive, each collection is exported to a separate TSV file.

Arguments:
  • collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
  • filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export. If it's None, a new filter that matches all the collection_ids provided above will be created and used.
  • tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
  • output_directory (str | Path | None): The directory path under which the file will be exported. If not provided, a temporary directory will be used.
Returns:

enpi_api.l2.types.execution.Execution[Path]: An awaitable that returns the full path to the exported file when awaited.

Raises:

enpi_api.l2.types.api_error.ApiError: If API request fails.

Example:
with EnpiApiClient() as enpi_client:

    collection_id = CollectionId(1234)

    # Example assumes you have a filter
    collection_filter: Filter = ...

    path: Path = enpi_client.collection_api.get_as_zip(
        collection_ids=[collection_id],
        filter=collection_filter,
        tag_ids=[
            CollectionTags.Name,
            CollectionTags.Organism,
            CollectionTags.Complexity,
            CollectionTags.Receptor,
            SequenceTags.Chain,
            SequenceTags.Productive,
        ],
        output_directory="example/export_result/"
    ).wait()
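
Because each collection is written to its own TSV file inside the archive, a short sketch for inspecting the result could look like this; the archive file name below is a placeholder, and in practice zip_path is the Path returned by get_as_zip(...).wait().

```python
from pathlib import Path
from zipfile import ZipFile

import pandas as pd

# Placeholder: normally the Path returned by get_as_zip(...).wait().
zip_path = Path("example/export_result/collections_export.zip")

with ZipFile(zip_path) as archive:
    # One TSV member per exported collection.
    tsv_members = [name for name in archive.namelist() if name.endswith(".tsv")]
    print(f"Archive contains {len(tsv_members)} collection TSV file(s): {tsv_members}")

    # Read the first collection's TSV straight from the archive into a DataFrame.
    with archive.open(tsv_members[0]) as handle:
        df = pd.read_csv(handle, delimiter="\t")

print(df.head())
```
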
def get_as_df(
    self,
    collection_ids: list[enpi_api.l2.types.collection.CollectionId],
    filter: enpi_api.l2.types.filter.Filter | None = None,
    tag_ids: list[enpi_api.l2.types.tag.TagId] = [2035, 2040, 2083, 2084, 2036, 2060, 1001, 1077, 38, 144, 24, 154, 110],
) -> enpi_api.l2.types.execution.Execution[DataFrame]:
743    def get_as_df(
744        self,
745        collection_ids: list[CollectionId],
746        filter: Filter | None = None,
747        tag_ids: list[TagId] = DEFAULT_EXPORT_TAG_IDS,
748    ) -> Execution[pd.DataFrame]:
749        """Export collection(s) to a Pandas DataFrame.
750
751        Args:
752            collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
753            filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export.
754                If it's `None`, a new filter that matches all the `collection_ids` provided above will be created and used.
755            tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
756
757        Returns:
758            Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection.
759
760        Raises:
761            enpi_api.l2.types.api_error.ApiError: If API request fails.
762
763        Example:
764
765            ```python
766            with EnpiApiClient() as enpi_client:
767                # Example assumes you have a filter
768                filter: Filter = ...
769
770                df: pd.DataFrame = enpi_client.collection_api.get_as_df(
771                    collection_ids=[CollectionId(1)],
772                    filter=filter,
773                    tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.Cdr3AminoAcids],
774                ).wait()
775            ```
776        """
777        tmp_dir = tempfile.TemporaryDirectory()
778        get_as_zip_execution = self.get_as_zip(collection_ids=collection_ids, filter=filter, tag_ids=tag_ids, output_directory=tmp_dir.name)
779
780        def wait() -> pd.DataFrame:
781            zip_path = get_as_zip_execution.wait()
782
783            # Extract all TSV files from the ZIP archive
784            with ZipFile(zip_path, "r") as zip_ref:
785                zip_ref.extractall(tmp_dir.name)
786
787            # Read all TSV files into a single DataFrame
788            all_dfs = []
789            for root, _, files in os.walk(tmp_dir.name):
790                for file in files:
791                    if file.endswith(".tsv"):
792                        file_path = os.path.join(root, file)
793                        df = pd.read_csv(file_path, delimiter="\t")
794                        all_dfs.append(df)
795
796            return pd.concat(all_dfs)
797
798        return Execution(wait=wait, check_execution_state=get_as_zip_execution.check_execution_state)

Export collection(s) to a Pandas DataFrame.

Arguments:
  • collection_ids (list[enpi_api.l2.types.collection.CollectionId]): The collection IDs to export.
  • filter (enpi_api.l2.types.filter.Filter | None): The filter to narrow down which collections, clones or sequences you wish to export. If it's None, a new filter that matches all the collection_ids provided above will be created and used.
  • tag_ids (list[enpi_api.l2.types.tag.TagId]): The tag IDs to include in the export.
Returns:

Execution[pd.DataFrame]: An awaitable that will return a DataFrame with the exported collection.

Raises:

enpi_api.l2.types.api_error.ApiError: If API request fails.

Example:
with EnpiApiClient() as enpi_client:
    # Example assumes you have a filter
    filter: Filter = ...

    df: pd.DataFrame = enpi_client.collection_api.get_as_df(
        collection_ids=[CollectionId(1)],
        filter=filter,
        tag_ids=[CollectionTags.Name, CloneTags.TenXBarcode, SequenceTags.Cdr3AminoAcids],
    ).wait()
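
As a rough sketch of downstream use, the returned DataFrame behaves like any other pandas frame; note that the column name "Name" below is an assumption for the column produced by CollectionTags.Name when the default export tags are used.

```python
with EnpiApiClient() as enpi_client:
    df: pd.DataFrame = enpi_client.collection_api.get_as_df(
        collection_ids=[CollectionId(1), CollectionId(2)],
    ).wait()

# Count exported rows per collection; "Name" is assumed to be the column
# written for CollectionTags.Name (included in the default export tags).
print(df.groupby("Name").size())
```
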