enpi_api.l2.client.api.file_api
import os
import time
from pathlib import Path
from typing import Generator, Sequence
from urllib.parse import urlparse

from loguru import logger
from typing_extensions import assert_never

from enpi_api.l1 import openapi_client
from enpi_api.l2.types.api_error import ApiError
from enpi_api.l2.types.execution import Execution
from enpi_api.l2.types.file import FederatedCredentials, File, FileId, FileStatus, OnCollisionAction
from enpi_api.l2.types.log import LogLevel
from enpi_api.l2.types.tag import Tag, TagId
from enpi_api.l2.types.task import TaskState
from enpi_api.l2.types.workflow import WorkflowExecutionTaskId
from enpi_api.l2.util.file import download_file, unique_temp_dir, upload_file_to_s3
from enpi_api.l2.util.tag import tags_to_api_payload


class NameEmpty(Exception):
    """Thrown when the name of a file is empty, which is not allowed."""

    def __init__(self) -> None:
        """@private"""
        super().__init__("Name cannot be empty")


class S3UploadFailed(Exception):
    """Indicates that the upload to S3 failed."""

    def __init__(self, file_path: str | Path, error: Exception) -> None:
        """@private"""
        super().__init__(f"Failed to upload file `{file_path}` to S3, error: {error}")


class FileApi:
    """File operations of the platform, built on top of the generated `openapi_client.FileApi`.

    Every `openapi_client.ApiException` raised by the generated client is re-raised as
    `enpi_api.l2.types.api_error.ApiError`.
    """

    _inner_api_client: openapi_client.ApiClient
    _log_level: LogLevel

    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
        """@private"""
        self._inner_api_client = inner_api_client
        self._log_level = log_level

    def get_files(self, filename: str | None = None) -> Generator[File, None, None]:
        """Get a generator through all available files in the platform.

        Args:
            filename (str | None): Optional filename for search by case-insensitive substring matching

        Returns:
            Generator[enpi_api.l2.types.file.File, None, None]: A generator through all files in the platform,
              restricted to the files matching `filename` when it is given.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                for file in enpi_client.file_api.get_files():
                    print(file)
            ```
        """

        logger.info("Getting a generator through all files")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        # Fetch the first page, there is always a first page, it may be empty.
        # The filter is passed here too, so the first page is requested with the same
        # `filename` as every following page.
        try:
            get_files_response = file_api_instance.get_files(filename=filename)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        while True:
            for file in get_files_response.files:
                yield File.from_raw(file)

            # Check if we need to fetch a next page
            cursor = get_files_response.cursor
            if cursor is None:
                logger.debug("No more pages of files")
                return  # No more pages

            # We have a cursor, so we need to get a next page
            logger.debug("Fetching next page of files")
            try:
                get_files_response = file_api_instance.get_files(cursor=cursor, filename=filename)
            except openapi_client.ApiException as e:
                raise ApiError(e) from e

    def get_file_by_id(self, file_id: FileId) -> File:
        """Get a single file by its ID.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to get.

        Returns:
            enpi_api.l2.types.file.File: The file, with all its metadata.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                example_file_id = FileId("00000000-0000-0000-0000-000000000000")
                file: File = enpi_client.file_api.get_file_by_id(file_id=example_file_id)
            ```
        """

        logger.info(f"Getting file with ID `{file_id}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        try:
            get_file_response = file_api_instance.get_file(file_id)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        return File.from_raw(get_file_response.file)

    def delete_file_by_id(self, file_id: FileId) -> None:
        """Delete a single file by its ID.

        This will remove the file from the ENPICOM Platform.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to delete.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                example_file_id = FileId("00000000-0000-0000-0000-000000000000")
                enpi_client.file_api.delete_file_by_id(file_id=example_file_id)
            ```
        """

        logger.info(f"Deleting file with ID `{file_id}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        try:
            file_api_instance.delete_file(file_id=str(file_id))
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        logger.info(f"File with ID `{file_id}` successfully deleted")

    def upload_file(
        self,
        file_path: str | Path,
        tags: Sequence[Tag] = (),
        on_collision: OnCollisionAction = OnCollisionAction.ERROR,
    ) -> Execution[File]:
        """Upload a file to the platform.

        Args:
            file_path (str | Path): The path to the file to upload.
            tags (Sequence[enpi_api.l2.types.tag.Tag]): The tags to add to the file.
            on_collision (enpi_api.l2.types.file.OnCollisionAction): The action to take when uploading a file with the same name as an existing file.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.file.File]: An awaitable that returns the uploaded file
              or an existing one if the `OnCollisionAction` is set to `SKIP`.

        Raises:
            enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
            enpi_api.l2.client.api.file_api.S3UploadFailed: If the upload to S3 failed.
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                file: File = enpi_client.file_api.upload_file(file_path="path/to/file.txt").wait()
            ```
        """

        logger.info(f"Uploading file with path `{file_path}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        # Uploading a file is a two-step process:
        # 1. Request temporary S3 credentials used for the upload
        logger.debug("Requesting temporary upload credentials.")

        name = os.path.basename(file_path).strip()
        if not name:
            raise NameEmpty()

        upload_file_request = openapi_client.UploadFileRequest(name=name, tags=tags_to_api_payload(tags), on_collision=on_collision)

        try:
            upload_file_response = file_api_instance.upload_file(upload_file_request)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        file_id = FileId(upload_file_response.id)

        # In the event that the file already exists, and we chose to SKIP, then we can return the existing file
        if upload_file_response.credentials is None:
            logger.info(f"File with name `{name}` already exists, and `on_collision` is set to `SKIP`")
            return Execution(wait=lambda: self.get_file_by_id(file_id), check_execution_state=lambda: TaskState.SUCCEEDED)

        s3_federated_credentials = FederatedCredentials.model_validate(
            upload_file_response.credentials,
            from_attributes=True,
        )

        # 2. Upload the file by using temporary credentials and boto3 client
        try:
            upload_file_to_s3(file_path, s3_federated_credentials)
        except Exception as err:
            # Keep the original failure attached as the cause of the domain error
            raise S3UploadFailed(file_path, err) from err

        def wait() -> File:
            # A file is not immediately usable after uploading, it needs to be processed first
            # So before you can use a file you need to wait for it to be processed
            self.wait_for_file_to_be_processed(file_id)

            logger.success(f"File uploaded with ID `{file_id}`")

            return self.get_file_by_id(file_id)

        return Execution(wait=wait, check_execution_state=lambda: TaskState.SUCCEEDED)

    @staticmethod
    def _resolve_download_path(download_url: str, output_directory: str | Path | None, name: str | None) -> str:
        """Work out where a download is stored locally and make sure the target directory exists.

        Args:
            download_url (str): The URL the file is downloaded from, used to derive a name when `name` is `None`.
            output_directory (str | Path | None): The directory to save into. If `None`, `unique_temp_dir()` is used.
            name (str | None): The file name to use. If `None`, the last path segment of `download_url` is used.

        Returns:
            str: `output_directory` joined with the resolved name.

        Raises:
            enpi_api.l2.client.api.file_api.NameEmpty: If the resolved name is empty.
        """

        # If no name is provided, we parse the URL to get the file name
        if name is None:
            name = Path(urlparse(download_url).path).name

        if not name:
            raise NameEmpty()

        # Ensure that the directory exists
        if output_directory is None:
            output_directory = unique_temp_dir()

        os.makedirs(output_directory, exist_ok=True)

        return os.path.join(output_directory, name)

    def download_file_by_id(
        self,
        file_id: FileId,
        output_directory: str | Path | None = None,
        name: str | None = None,
    ) -> Path:
        """Download a single file by its ID into the specified directory.

        Download a file from the platform to your local machine. The file will be saved in the specified directory with
        the name of the file as it was in the ENPICOM Platform. Alternatively you can overwrite the name by providing one
        yourself as the `name` argument.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to download.
            output_directory (str | Path | None): The directory to save the file to. If left empty, a temporary directory will be used.
            name (str | None): The name of the file. If not provided, the last path segment of the download URL is used.

        Returns:
            Path: The path to the downloaded file.

        Raises:
            enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                my_directory = f"/path/to/files"
                example_file_id = FileId("00000000-0000-0000-0000-000000000000")

                # Assume the file has the name `my_data.fastq`
                full_file_path = enpi_client.file_api.download_file_by_id(
                    file_id=example_file_id,
                    output_directory=my_directory
                )
                # `full_file_path` will now be `/path/to/files/my_data.fastq`
            ```
        """

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        try:
            download_file_response = file_api_instance.download_file(file_id=str(file_id))
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        full_path = self._resolve_download_path(download_file_response.download_url, output_directory, name)

        logger.info(f"Downloading file with ID `{file_id}` to `{full_path}`")
        downloaded_file_path = download_file(download_file_response.download_url, full_path)
        logger.success(f"File with ID `{file_id}` successfully downloaded to `{downloaded_file_path}`")

        return downloaded_file_path

    def download_export_by_workflow_execution_task_id(
        self, task_id: WorkflowExecutionTaskId, output_directory: str | Path | None = None, name: str | None = None
    ) -> Path:
        """Download an export by its workflow execution task ID to the specified directory.

        Download an export from a job to your local machine. The export will be saved in the specified directory with
        the name of the file as it was in the job. Alternatively you can overwrite the name by providing one
        yourself as the `name` argument.

        Args:
            task_id (enpi_api.l2.types.workflow.WorkflowExecutionTaskId): The ID of the workflow execution task to download an export from.
            output_directory (str | Path | None): The directory to save the file to. If none is provided, a temporary
              directory will be used.
            name (str | None): The name of the file. If not provided, the last path segment of the download URL is used.

        Returns:
            Path: The path to the downloaded file.

        Raises:
            enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                my_directory = f"/path/to/files"
                example_task_id = TaskId(1234)

                # Assume the file has the name `my_data.fastq`
                full_file_path = enpi_client.file_api.download_export_by_workflow_execution_task_id(
                    task_id=example_task_id,
                    output_directory=my_directory
                )
                # `full_file_path` will now be `/path/to/files/my_data.fastq`
            ```
        """

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        try:
            download_file_response = file_api_instance.download_export(job_id=task_id)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        full_path = self._resolve_download_path(download_file_response.download_url, output_directory, name)

        logger.info(f"Downloading export from task with ID `{task_id}` to `{full_path}`")
        downloaded_file_path = download_file(download_file_response.download_url, full_path)
        logger.success(f"Export from task with ID `{task_id}` successfully downloaded to `{downloaded_file_path}`")

        return downloaded_file_path

    def update_tags(self, file_id: FileId, tags: list[Tag]) -> None:
        """Update the tags of a file.

        Adds and updates the given tags to the file. If a tag is already present on the file, the value will be
        overwritten with the given value for the same tag.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to update.
            tags (list[enpi_api.l2.types.tag.Tag]): The tags that will be updated or added if they are not already present.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                enpi_client.file_api.update_tags(
                    file_id=FileId("00000000-0000-0000-0000-000000000000"),
                    tags=[
                        Tag(id=TagId(FileTags.CampaignId), value="my new value"),
                        Tag(id=TagId(FileTags.ProjectId), value="another value")
                    ]
                )
            ```
        """

        logger.info(f"Updating tags for file with ID `{file_id}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        update_file_tags_request = openapi_client.UpdateTagsRequest(tags=tags_to_api_payload(tags))

        try:
            file_api_instance.update_tags(file_id=str(file_id), update_tags_request=update_file_tags_request)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        logger.success(f"Tags updated for file with ID `{file_id}`")

    def remove_tags(self, file_id: FileId, tags: list[TagId]) -> None:
        """Remove the specified tags from a file.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to update.
            tags (list[enpi_api.l2.types.tag.TagId]): The tags that will be removed from the file.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                enpi_client.file_api.remove_tags(
                    file_id=FileId("00000000-0000-0000-0000-000000000000"),
                    tags=[TagId(FileTags.CampaignId), TagId(FileTags.ProjectId)]
                )
            ```
        """

        logger.info(f"Removing tags: {tags} from file with ID `{file_id}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        delete_tags_request = openapi_client.DeleteTagsRequest(tags=[int(x) for x in tags])

        try:
            file_api_instance.delete_tags(file_id=str(file_id), delete_tags_request=delete_tags_request)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        logger.success(f"Tags removed from file with ID `{file_id}`")

    def wait_for_file_to_be_processed(self, file_id: FileId) -> None:
        """Wait for a file to be processed.

        Files are not immediately usable after uploading, they need to be processed first. This convenience method
        polls the file once per second and returns as soon as its status is `PROCESSED`.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to wait for.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                enpi_client.file_api.wait_for_file_to_be_processed(file_id=FileId("00000000-0000-0000-0000-000000000000"))
            ```
        """

        logger.info(f"Waiting for file with ID `{file_id}` to be processed")

        poll_interval_seconds = 1

        # We do not know how long it will take for a file to be processed, so we poll the file until it is processed
        while True:
            file = self.get_file_by_id(file_id)

            if file.status == FileStatus.PROCESSED:
                logger.success(f"File with ID `{file_id}` has been processed")
                return
            elif file.status == FileStatus.PROCESSING:
                logger.debug(f"File with ID `{file_id}` is still being processed. Waiting for {poll_interval_seconds} seconds")
                time.sleep(poll_interval_seconds)
            else:
                # Any other status is treated as unreachable
                assert_never(file.status)
class NameEmpty(Exception):
    """Raised when a file name turns out to be empty, which is not allowed."""

    def __init__(self) -> None:
        """@private"""
        message = "Name cannot be empty"
        super().__init__(message)
Thrown when the name of a file is empty, which is not allowed.
31class S3UploadFailed(Exception): 32 """Indicates that the upload to S3 failed.""" 33 34 def __init__(self, file_path: str | Path, error: Exception): 35 """@private""" 36 super().__init__(f"Failed to upload file `{file_path}` to S3, error: {error}")
Indicates that the upload to S3 failed.
class FileApi:
    """File operations of the platform, built on top of the generated `openapi_client.FileApi`.

    Every `openapi_client.ApiException` raised by the generated client is re-raised as
    `enpi_api.l2.types.api_error.ApiError`.
    """

    _inner_api_client: openapi_client.ApiClient
    _log_level: LogLevel

    def __init__(self, inner_api_client: openapi_client.ApiClient, log_level: LogLevel):
        """@private"""
        self._inner_api_client = inner_api_client
        self._log_level = log_level

    def get_files(self, filename: str | None = None) -> Generator[File, None, None]:
        """Get a generator through all available files in the platform.

        Args:
            filename (str | None): Optional filename for search by case-insensitive substring matching

        Returns:
            Generator[enpi_api.l2.types.file.File, None, None]: A generator through all files in the platform,
              restricted to the files matching `filename` when it is given.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                for file in enpi_client.file_api.get_files():
                    print(file)
            ```
        """

        logger.info("Getting a generator through all files")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        # Fetch the first page, there is always a first page, it may be empty.
        # The filter is passed here too, so the first page is requested with the same
        # `filename` as every following page.
        try:
            get_files_response = file_api_instance.get_files(filename=filename)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        while True:
            for file in get_files_response.files:
                yield File.from_raw(file)

            # Check if we need to fetch a next page
            cursor = get_files_response.cursor
            if cursor is None:
                logger.debug("No more pages of files")
                return  # No more pages

            # We have a cursor, so we need to get a next page
            logger.debug("Fetching next page of files")
            try:
                get_files_response = file_api_instance.get_files(cursor=cursor, filename=filename)
            except openapi_client.ApiException as e:
                raise ApiError(e) from e

    def get_file_by_id(self, file_id: FileId) -> File:
        """Get a single file by its ID.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to get.

        Returns:
            enpi_api.l2.types.file.File: The file, with all its metadata.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                example_file_id = FileId("00000000-0000-0000-0000-000000000000")
                file: File = enpi_client.file_api.get_file_by_id(file_id=example_file_id)
            ```
        """

        logger.info(f"Getting file with ID `{file_id}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        try:
            get_file_response = file_api_instance.get_file(file_id)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        return File.from_raw(get_file_response.file)

    def delete_file_by_id(self, file_id: FileId) -> None:
        """Delete a single file by its ID.

        This will remove the file from the ENPICOM Platform.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to delete.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                example_file_id = FileId("00000000-0000-0000-0000-000000000000")
                enpi_client.file_api.delete_file_by_id(file_id=example_file_id)
            ```
        """

        logger.info(f"Deleting file with ID `{file_id}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        try:
            file_api_instance.delete_file(file_id=str(file_id))
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        logger.info(f"File with ID `{file_id}` successfully deleted")

    def upload_file(
        self,
        file_path: str | Path,
        tags: Sequence[Tag] = (),
        on_collision: OnCollisionAction = OnCollisionAction.ERROR,
    ) -> Execution[File]:
        """Upload a file to the platform.

        Args:
            file_path (str | Path): The path to the file to upload.
            tags (Sequence[enpi_api.l2.types.tag.Tag]): The tags to add to the file.
            on_collision (enpi_api.l2.types.file.OnCollisionAction): The action to take when uploading a file with the same name as an existing file.

        Returns:
            enpi_api.l2.types.execution.Execution[enpi_api.l2.types.file.File]: An awaitable that returns the uploaded file
              or an existing one if the `OnCollisionAction` is set to `SKIP`.

        Raises:
            enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
            enpi_api.l2.client.api.file_api.S3UploadFailed: If the upload to S3 failed.
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                file: File = enpi_client.file_api.upload_file(file_path="path/to/file.txt").wait()
            ```
        """

        logger.info(f"Uploading file with path `{file_path}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        # Uploading a file is a two-step process:
        # 1. Request temporary S3 credentials used for the upload
        logger.debug("Requesting temporary upload credentials.")

        name = os.path.basename(file_path).strip()
        if not name:
            raise NameEmpty()

        upload_file_request = openapi_client.UploadFileRequest(name=name, tags=tags_to_api_payload(tags), on_collision=on_collision)

        try:
            upload_file_response = file_api_instance.upload_file(upload_file_request)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        file_id = FileId(upload_file_response.id)

        # In the event that the file already exists, and we chose to SKIP, then we can return the existing file
        if upload_file_response.credentials is None:
            logger.info(f"File with name `{name}` already exists, and `on_collision` is set to `SKIP`")
            return Execution(wait=lambda: self.get_file_by_id(file_id), check_execution_state=lambda: TaskState.SUCCEEDED)

        s3_federated_credentials = FederatedCredentials.model_validate(
            upload_file_response.credentials,
            from_attributes=True,
        )

        # 2. Upload the file by using temporary credentials and boto3 client
        try:
            upload_file_to_s3(file_path, s3_federated_credentials)
        except Exception as err:
            # Keep the original failure attached as the cause of the domain error
            raise S3UploadFailed(file_path, err) from err

        def wait() -> File:
            # A file is not immediately usable after uploading, it needs to be processed first
            # So before you can use a file you need to wait for it to be processed
            self.wait_for_file_to_be_processed(file_id)

            logger.success(f"File uploaded with ID `{file_id}`")

            return self.get_file_by_id(file_id)

        return Execution(wait=wait, check_execution_state=lambda: TaskState.SUCCEEDED)

    @staticmethod
    def _resolve_download_path(download_url: str, output_directory: str | Path | None, name: str | None) -> str:
        """Work out where a download is stored locally and make sure the target directory exists.

        Args:
            download_url (str): The URL the file is downloaded from, used to derive a name when `name` is `None`.
            output_directory (str | Path | None): The directory to save into. If `None`, `unique_temp_dir()` is used.
            name (str | None): The file name to use. If `None`, the last path segment of `download_url` is used.

        Returns:
            str: `output_directory` joined with the resolved name.

        Raises:
            enpi_api.l2.client.api.file_api.NameEmpty: If the resolved name is empty.
        """

        # If no name is provided, we parse the URL to get the file name
        if name is None:
            name = Path(urlparse(download_url).path).name

        if not name:
            raise NameEmpty()

        # Ensure that the directory exists
        if output_directory is None:
            output_directory = unique_temp_dir()

        os.makedirs(output_directory, exist_ok=True)

        return os.path.join(output_directory, name)

    def download_file_by_id(
        self,
        file_id: FileId,
        output_directory: str | Path | None = None,
        name: str | None = None,
    ) -> Path:
        """Download a single file by its ID into the specified directory.

        Download a file from the platform to your local machine. The file will be saved in the specified directory with
        the name of the file as it was in the ENPICOM Platform. Alternatively you can overwrite the name by providing one
        yourself as the `name` argument.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to download.
            output_directory (str | Path | None): The directory to save the file to. If left empty, a temporary directory will be used.
            name (str | None): The name of the file. If not provided, the last path segment of the download URL is used.

        Returns:
            Path: The path to the downloaded file.

        Raises:
            enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                my_directory = f"/path/to/files"
                example_file_id = FileId("00000000-0000-0000-0000-000000000000")

                # Assume the file has the name `my_data.fastq`
                full_file_path = enpi_client.file_api.download_file_by_id(
                    file_id=example_file_id,
                    output_directory=my_directory
                )
                # `full_file_path` will now be `/path/to/files/my_data.fastq`
            ```
        """

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        try:
            download_file_response = file_api_instance.download_file(file_id=str(file_id))
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        full_path = self._resolve_download_path(download_file_response.download_url, output_directory, name)

        logger.info(f"Downloading file with ID `{file_id}` to `{full_path}`")
        downloaded_file_path = download_file(download_file_response.download_url, full_path)
        logger.success(f"File with ID `{file_id}` successfully downloaded to `{downloaded_file_path}`")

        return downloaded_file_path

    def download_export_by_workflow_execution_task_id(
        self, task_id: WorkflowExecutionTaskId, output_directory: str | Path | None = None, name: str | None = None
    ) -> Path:
        """Download an export by its workflow execution task ID to the specified directory.

        Download an export from a job to your local machine. The export will be saved in the specified directory with
        the name of the file as it was in the job. Alternatively you can overwrite the name by providing one
        yourself as the `name` argument.

        Args:
            task_id (enpi_api.l2.types.workflow.WorkflowExecutionTaskId): The ID of the workflow execution task to download an export from.
            output_directory (str | Path | None): The directory to save the file to. If none is provided, a temporary
              directory will be used.
            name (str | None): The name of the file. If not provided, the last path segment of the download URL is used.

        Returns:
            Path: The path to the downloaded file.

        Raises:
            enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                my_directory = f"/path/to/files"
                example_task_id = TaskId(1234)

                # Assume the file has the name `my_data.fastq`
                full_file_path = enpi_client.file_api.download_export_by_workflow_execution_task_id(
                    task_id=example_task_id,
                    output_directory=my_directory
                )
                # `full_file_path` will now be `/path/to/files/my_data.fastq`
            ```
        """

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        try:
            download_file_response = file_api_instance.download_export(job_id=task_id)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        full_path = self._resolve_download_path(download_file_response.download_url, output_directory, name)

        logger.info(f"Downloading export from task with ID `{task_id}` to `{full_path}`")
        downloaded_file_path = download_file(download_file_response.download_url, full_path)
        logger.success(f"Export from task with ID `{task_id}` successfully downloaded to `{downloaded_file_path}`")

        return downloaded_file_path

    def update_tags(self, file_id: FileId, tags: list[Tag]) -> None:
        """Update the tags of a file.

        Adds and updates the given tags to the file. If a tag is already present on the file, the value will be
        overwritten with the given value for the same tag.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to update.
            tags (list[enpi_api.l2.types.tag.Tag]): The tags that will be updated or added if they are not already present.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                enpi_client.file_api.update_tags(
                    file_id=FileId("00000000-0000-0000-0000-000000000000"),
                    tags=[
                        Tag(id=TagId(FileTags.CampaignId), value="my new value"),
                        Tag(id=TagId(FileTags.ProjectId), value="another value")
                    ]
                )
            ```
        """

        logger.info(f"Updating tags for file with ID `{file_id}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        update_file_tags_request = openapi_client.UpdateTagsRequest(tags=tags_to_api_payload(tags))

        try:
            file_api_instance.update_tags(file_id=str(file_id), update_tags_request=update_file_tags_request)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        logger.success(f"Tags updated for file with ID `{file_id}`")

    def remove_tags(self, file_id: FileId, tags: list[TagId]) -> None:
        """Remove the specified tags from a file.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to update.
            tags (list[enpi_api.l2.types.tag.TagId]): The tags that will be removed from the file.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                enpi_client.file_api.remove_tags(
                    file_id=FileId("00000000-0000-0000-0000-000000000000"),
                    tags=[TagId(FileTags.CampaignId), TagId(FileTags.ProjectId)]
                )
            ```
        """

        logger.info(f"Removing tags: {tags} from file with ID `{file_id}`")

        file_api_instance = openapi_client.FileApi(self._inner_api_client)

        delete_tags_request = openapi_client.DeleteTagsRequest(tags=[int(x) for x in tags])

        try:
            file_api_instance.delete_tags(file_id=str(file_id), delete_tags_request=delete_tags_request)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e

        logger.success(f"Tags removed from file with ID `{file_id}`")

    def wait_for_file_to_be_processed(self, file_id: FileId) -> None:
        """Wait for a file to be processed.

        Files are not immediately usable after uploading, they need to be processed first. This convenience method
        polls the file once per second and returns as soon as its status is `PROCESSED`.

        Args:
            file_id (enpi_api.l2.types.file.FileId): The ID of the file to wait for.

        Raises:
            enpi_api.l2.types.api_error.ApiError: If API request fails.

        Example:

            ```python
            with EnpiApiClient() as enpi_client:
                enpi_client.file_api.wait_for_file_to_be_processed(file_id=FileId("00000000-0000-0000-0000-000000000000"))
            ```
        """

        logger.info(f"Waiting for file with ID `{file_id}` to be processed")

        poll_interval_seconds = 1

        # We do not know how long it will take for a file to be processed, so we poll the file until it is processed
        while True:
            file = self.get_file_by_id(file_id)

            if file.status == FileStatus.PROCESSED:
                logger.success(f"File with ID `{file_id}` has been processed")
                return
            elif file.status == FileStatus.PROCESSING:
                logger.debug(f"File with ID `{file_id}` is still being processed. Waiting for {poll_interval_seconds} seconds")
                time.sleep(poll_interval_seconds)
            else:
                # Any other status is treated as unreachable
                assert_never(file.status)
def get_files(self, filename: str | None = None) -> Generator[File, None, None]:
    """Get a generator through all available files in the platform.

    Pages are fetched lazily: the next page is only requested once every file of the
    current page has been consumed from the generator.

    Args:
        filename (str | None): Optional filename for search by case-insensitive substring matching.
            The filter is sent with every page request, including the first one.

    Returns:
        Generator[enpi_api.l2.types.file.File, None, None]: A generator through all files in the platform.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            for file in enpi_client.file_api.get_files():
                print(file)
        ```
    """

    logger.info("Getting a generator through all files")

    file_api_instance = openapi_client.FileApi(self._inner_api_client)

    # Fetch the first page, there is always a first page, it may be empty.
    # The filename filter has to be part of this request as well, otherwise the
    # first page would contain unfiltered results.
    try:
        get_files_response = file_api_instance.get_files(filename=filename)
    except openapi_client.ApiException as e:
        raise ApiError(e) from e

    while True:
        for file in get_files_response.files:
            yield File.from_raw(file)

        # Check if we need to fetch a next page
        cursor = get_files_response.cursor
        if cursor is None:
            logger.debug("No more pages of files")
            return  # No more pages

        # We have a cursor, so we need to get a next page
        logger.debug("Fetching next page of files")
        try:
            get_files_response = file_api_instance.get_files(cursor=cursor, filename=filename)
        except openapi_client.ApiException as e:
            raise ApiError(e) from e
Get a generator through all available files in the platform.
Arguments:
- filename (str | None): Optional filename for search by case-insensitive substring matching
Returns:
Generator[enpi_api.l2.types.file.File, None, None]: A generator through all files in the platform.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client: for file in enpi_client.file_api.get_files(): print(file)
def get_file_by_id(self, file_id: FileId) -> File:
    """Fetch one file, including its metadata, by its ID.

    Args:
        file_id (enpi_api.l2.types.file.FileId): The ID of the file to get.

    Returns:
        enpi_api.l2.types.file.File: The file, with all its metadata.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            example_file_id = FileId("00000000-0000-0000-0000-000000000000")
            file: File = enpi_client.file_api.get_file_by_id(file_id=example_file_id)
        ```
    """

    logger.info(f"Getting file with ID `{file_id}`")

    client = openapi_client.FileApi(self._inner_api_client)

    try:
        response = client.get_file(file_id)
    except openapi_client.ApiException as api_exception:
        raise ApiError(api_exception)

    # Convert the raw API model into the l2 `File` type
    return File.from_raw(response.file)
Get a single file by its ID.
Arguments:
- file_id (enpi_api.l2.types.file.FileId): The ID of the file to get.
Returns:
enpi_api.l2.types.file.File: The file, with all its metadata.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client: example_file_id = FileId("00000000-0000-0000-0000-000000000000") file: File = enpi_client.file_api.get_file_by_id(file_id=example_file_id)
def delete_file_by_id(self, file_id: FileId) -> None:
    """Delete one file by its ID.

    The file is removed from the ENPICOM Platform.

    Args:
        file_id (enpi_api.l2.types.file.FileId): The ID of the file to delete.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            example_file_id = FileId("00000000-0000-0000-0000-000000000000")
            enpi_client.file_api.delete_file_by_id(file_id=example_file_id)
        ```
    """

    logger.info(f"Deleting file with ID `{file_id}`")

    client = openapi_client.FileApi(self._inner_api_client)

    try:
        client.delete_file(file_id=str(file_id))
    except openapi_client.ApiException as api_exception:
        raise ApiError(api_exception)

    logger.info(f"File with ID `{file_id}` successfully deleted")
Delete a single file by its ID.
This will remove the file from the ENPICOM Platform.
Arguments:
- file_id (enpi_api.l2.types.file.FileId): The ID of the file to delete.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    example_file_id = FileId("00000000-0000-0000-0000-000000000000")
    enpi_client.file_api.delete_file_by_id(file_id=example_file_id)
def upload_file(
    self,
    file_path: str | Path,
    tags: Sequence[Tag] = (),
    on_collision: OnCollisionAction = OnCollisionAction.ERROR,
) -> Execution[File]:
    """Upload a local file to the platform.

    The upload itself happens synchronously inside this call; the returned execution's
    `wait()` only blocks until the platform has finished processing the uploaded file.
    The execution state reported by the returned object is always `TaskState.SUCCEEDED`.

    Args:
        file_path (str | Path): The path to the file to upload.
        tags (Sequence[enpi_api.l2.types.tag.Tag]): The tags to add to the file.
        on_collision (enpi_api.l2.types.file.OnCollisionAction): The action to take when uploading a file with the
            same name as an existing file.

    Returns:
        enpi_api.l2.types.execution.Execution[enpi_api.l2.types.file.File]: An awaitable that returns the uploaded file
            or an existing one if the `OnCollisionAction` is set to `SKIP`.

    Raises:
        enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
        enpi_api.l2.client.api.file_api.S3UploadFailed: If the upload to S3 failed.
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            file: File = enpi_client.file_api.upload_file(file_path="path/to/file.txt").wait()
        ```
    """

    logger.info(f"Uploading file with path `{file_path}`")

    client = openapi_client.FileApi(self._inner_api_client)

    # Uploading a file is a two-step process:
    # 1. Request temporary S3 credentials that are used for the upload
    logger.debug("Requesting temporary upload credentials.")

    # Only the final path component, without surrounding whitespace, is used as the file name
    file_name = os.path.basename(file_path).strip()
    if not file_name:
        raise NameEmpty()

    request = openapi_client.UploadFileRequest(
        name=file_name,
        tags=tags_to_api_payload(tags),
        on_collision=on_collision,
    )

    try:
        response = client.upload_file(request)
    except openapi_client.ApiException as api_exception:
        raise ApiError(api_exception)

    file_id = FileId(response.id)

    def always_succeeded() -> TaskState:
        return TaskState.SUCCEEDED

    # No credentials in the response means that the file already exists and we chose to SKIP,
    # so the existing file can be returned without uploading anything
    if response.credentials is None:
        logger.info(f"File with name `{file_name}` already exists, and `on_collision` is set to `SKIP`")

        def fetch_existing() -> File:
            return self.get_file_by_id(file_id)

        return Execution(wait=fetch_existing, check_execution_state=always_succeeded)

    federated_credentials = FederatedCredentials.model_validate(
        response.credentials,
        from_attributes=True,
    )

    # 2. Upload the file to S3 with the temporary credentials
    try:
        upload_file_to_s3(file_path, federated_credentials)
    except Exception as upload_error:
        raise S3UploadFailed(file_path, upload_error)

    def wait_until_processed() -> File:
        # A file is not immediately usable after uploading, it needs to be processed first,
        # so block until processing is done before handing the file back
        self.wait_for_file_to_be_processed(file_id)

        logger.success(f"File uploaded with ID `{file_id}`")

        return self.get_file_by_id(file_id)

    return Execution(wait=wait_until_processed, check_execution_state=always_succeeded)
Upload a file to the platform.
Arguments:
- file_path (str | Path): The path to the file to upload.
- tags (Sequence[enpi_api.l2.types.tag.Tag]): The tags to add to the file.
- on_collision (enpi_api.l2.types.file.OnCollisionAction): The action to take when uploading a file with the same name as an existing file.
Returns:
enpi_api.l2.types.execution.Execution[enpi_api.l2.types.file.File]: An awaitable that returns the uploaded file or an existing one if the `OnCollisionAction` is set to `SKIP`.
Raises:
- enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
- enpi_api.l2.client.api.file_api.S3UploadFailed: If the upload to S3 failed.
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client: file: File = enpi_client.file_api.upload_file(file_path="path/to/file.txt").wait()
def download_file_by_id(
    self,
    file_id: FileId,
    output_directory: str | Path | None = None,
    name: str | None = None,
) -> Path:
    """Download a single file by its ID into the specified directory.

    Download a file from the platform to your local machine. The file will be saved in the specified directory with
    the name of the file as it was in the ENPICOM Platform. Alternatively you can overwrite the name by providing one
    yourself as the `name` argument.

    Args:
        file_id (enpi_api.l2.types.file.FileId): The ID of the file to download.
        output_directory (str | Path | None): The directory to save the file to. It is created if it does not exist.
            If none is provided, a temporary directory will be used.
        name (str | None): The name of the file. If not provided, the name is taken from the last path segment
            of the download URL.

    Returns:
        Path: The path to the downloaded file.

    Raises:
        enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            my_directory = "/path/to/files"
            example_file_id = FileId("00000000-0000-0000-0000-000000000000")

            # Assume the file has the name `my_data.fastq`
            full_file_path = enpi_client.file_api.download_file_by_id(
                file_id=example_file_id,
                output_directory=my_directory
            )
            # `full_file_path` will now be `/path/to/files/my_data.fastq`
        ```
    """
    # Imported locally so that this method does not depend on a change to the module imports
    from urllib.parse import unquote

    file_api_instance = openapi_client.FileApi(self._inner_api_client)

    try:
        download_file_response = file_api_instance.download_file(file_id=str(file_id))
    except openapi_client.ApiException as e:
        raise ApiError(e) from e

    # If no name is provided, we parse the URL to get the file name
    if name is None:
        parsed_url = urlparse(download_file_response.download_url)
        # `urlparse` leaves percent-escapes untouched, so decode them to get the real file name
        # (e.g. `my%20data.fastq` -> `my data.fastq`). Decoding happens before taking the last
        # path component, so an escaped separator cannot end up inside the name.
        name = Path(unquote(parsed_url.path)).name

    if not name:
        raise NameEmpty()

    # Ensure that the directory exists
    if output_directory is None:
        output_directory = unique_temp_dir()

    os.makedirs(output_directory, exist_ok=True)

    full_path = os.path.join(output_directory, name)

    logger.info(f"Downloading file with ID `{file_id}` to `{full_path}`")
    downloaded_file_path = download_file(download_file_response.download_url, full_path)
    logger.success(f"File with ID `{file_id}` successfully downloaded to `{downloaded_file_path}`")

    return downloaded_file_path
Download a single file by its ID into the specified directory.
Download a file from the platform to your local machine. The file will be saved in the specified directory with
the name of the file as it was in the ENPICOM Platform. Alternatively you can overwrite the name by providing one
yourself as the name argument.
Arguments:
- file_id (enpi_api.l2.types.file.FileId): The ID of the file to download.
- output_directory (str | Path | None): The directory to save the file to. If left empty, a temporary directory will be used.
- name (str | None): The name of the file. If not provided, the name is taken from the download URL.
Returns:
Path: The path to the downloaded file.
Raises:
- enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    my_directory = "/path/to/files"
    example_file_id = FileId("00000000-0000-0000-0000-000000000000")

    # Assume the file has the name `my_data.fastq`
    full_file_path = enpi_client.file_api.download_file_by_id(
        file_id=example_file_id,
        output_directory=my_directory
    )
    # `full_file_path` will now be `/path/to/files/my_data.fastq`
def download_export_by_workflow_execution_task_id(
    self, task_id: WorkflowExecutionTaskId, output_directory: str | Path | None = None, name: str | None = None
) -> Path:
    """Download the export of a workflow execution task to the specified directory.

    Download an export from a task to your local machine. The export will be saved in the specified directory with
    the name of the file as it was in the task. Alternatively you can overwrite the name by providing one
    yourself as the `name` argument.

    Args:
        task_id (enpi_api.l2.types.workflow.WorkflowExecutionTaskId): The ID of the workflow execution task to
            download an export from.
        output_directory (str | Path | None): The directory to save the file to. It is created if it does not exist.
            If none is provided, a temporary directory will be used.
        name (str | None): The name of the file. If not provided, the name is taken from the last path segment
            of the download URL.

    Returns:
        Path: The path to the downloaded file.

    Raises:
        enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
        enpi_api.l2.types.api_error.ApiError: If API request fails.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            my_directory = "/path/to/files"
            example_task_id = WorkflowExecutionTaskId(1234)

            # Assume the file has the name `my_data.fastq`
            full_file_path = enpi_client.file_api.download_export_by_workflow_execution_task_id(
                task_id=example_task_id,
                output_directory=my_directory
            )
            # `full_file_path` will now be `/path/to/files/my_data.fastq`
        ```
    """
    # Imported locally so that this method does not depend on a change to the module imports
    from urllib.parse import unquote

    file_api_instance = openapi_client.FileApi(self._inner_api_client)

    try:
        # The underlying endpoint names this identifier `job_id`
        download_file_response = file_api_instance.download_export(job_id=task_id)
    except openapi_client.ApiException as e:
        raise ApiError(e) from e

    # If no name is provided, we parse the URL to get the file name
    if name is None:
        parsed_url = urlparse(download_file_response.download_url)
        # `urlparse` leaves percent-escapes untouched, so decode them to get the real file name
        # (e.g. `my%20data.fastq` -> `my data.fastq`). Decoding happens before taking the last
        # path component, so an escaped separator cannot end up inside the name.
        name = Path(unquote(parsed_url.path)).name

    if not name:
        raise NameEmpty()

    # Ensure that the directory exists
    if output_directory is None:
        output_directory = unique_temp_dir()

    os.makedirs(output_directory, exist_ok=True)

    full_path = os.path.join(output_directory, name)

    logger.info(f"Downloading export from task with ID `{task_id}` to `{full_path}`")
    downloaded_file_path = download_file(download_file_response.download_url, full_path)
    logger.success(f"Export from task with ID `{task_id}` successfully downloaded to `{downloaded_file_path}`")

    return downloaded_file_path
Download a single file by its job ID to the specified directory.
Download an export from a job to your local machine. The export will be saved in the specified directory with
the name of the file as it was in the job. Alternatively you can overwrite the name by providing one
yourself as the name argument.
Arguments:
- task_id (enpi_api.l2.types.workflow.WorkflowExecutionTaskId): The ID of the workflow execution task to download an export from.
- output_directory (str | Path | None): The directory to save the file to. If none is provided, a temporary directory will be used.
- name (str | None): The name of the file. If not provided, the name is taken from the download URL.
Returns:
Path: The path to the downloaded file.
Raises:
- enpi_api.l2.client.api.file_api.NameEmpty: If the name of the file is empty.
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client:
    my_directory = "/path/to/files"
    example_task_id = WorkflowExecutionTaskId(1234)

    # Assume the file has the name `my_data.fastq`
    full_file_path = enpi_client.file_api.download_export_by_workflow_execution_task_id(
        task_id=example_task_id,
        output_directory=my_directory
    )
    # `full_file_path` will now be `/path/to/files/my_data.fastq`
def wait_for_file_to_be_processed(
    self,
    file_id: FileId,
    poll_interval_seconds: float = 1,
    timeout_seconds: float | None = None,
) -> None:
    """Wait for a file to be processed.

    Files are not immediately usable after uploading, they need to be processed first. This convenience method
    polls the file until its status is `FileStatus.PROCESSED`.

    Args:
        file_id (enpi_api.l2.types.file.FileId): The ID of the file to wait for.
        poll_interval_seconds (float): The number of seconds to sleep between two status checks. Defaults to 1.
        timeout_seconds (float | None): The maximum number of seconds to wait. If `None` (the default), this
            method waits indefinitely.

    Raises:
        enpi_api.l2.types.api_error.ApiError: If API request fails.
        TimeoutError: If `timeout_seconds` is given and the file is still being processed once it has elapsed.

    Example:

        ```python
        with EnpiApiClient() as enpi_client:
            enpi_client.file_api.wait_for_file_to_be_processed(file_id=FileId("00000000-0000-0000-0000-000000000000"))
        ```
    """

    logger.info(f"Waiting for file with ID `{file_id}` to be processed")

    # A monotonic clock is used so that wall-clock adjustments cannot shorten or extend the timeout
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds

    # We do not know how long it will take for a file to be processed, so we poll the file until it is processed
    while True:
        file = self.get_file_by_id(file_id)

        if file.status == FileStatus.PROCESSED:
            logger.success(f"File with ID `{file_id}` has been processed")
            return
        elif file.status == FileStatus.PROCESSING:
            # The deadline is only checked after a status request, so the file is always polled at least once
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(f"File with ID `{file_id}` was not processed within {timeout_seconds} seconds")
            logger.debug(f"File with ID `{file_id}` is still being processed. Waiting for {poll_interval_seconds} seconds")
            time.sleep(poll_interval_seconds)
        else:
            # Exhaustiveness check: a status that is not handled above ends up here
            assert_never(file.status)
Wait for a file to be processed.
Files are not immediately usable after uploading, they need to be processed first. This convenience method waits for a file to be processed.
Arguments:
- file_id (enpi_api.l2.types.file.FileId): The ID of the file to wait for.
Raises:
- enpi_api.l2.types.api_error.ApiError: If API request fails.
Example:
with EnpiApiClient() as enpi_client: enpi_client.file_api.wait_for_file_to_be_processed(file_id=FileId("00000000-0000-0000-0000-000000000000"))