LargeFile#

from great_ai.large_file import *

For more details about using LargeFiles, check out the how-to guide.

LargeFileLocal #

Bases: LargeFileBase

LargeFile implementation using local filesystem as a backend.

Store large files remotely using the familiar API of open(). With built-in versioning, pruning and local cache.

IMPORTANT: If LargeFileLocal.max_cache_size is too small to hold all your files, cache pruning can end up deleting them.

See parent for more details.

Examples:

>>> LargeFileLocal.cache_path = Path(".cache")
>>> LargeFileLocal.max_cache_size = "30GB"
>>> with LargeFileLocal("my_test.txt", "w", keep_last_n=2) as f:
...     f.write('test')
4
>>> with LargeFileLocal("my_test.txt") as f:
...     print(f.read())
test
Source code in great_ai/large_file/large_file/large_file_local.py
class LargeFileLocal(LargeFileBase):
    """LargeFile implementation using local filesystem as a backend.

    Store large files remotely using the familiar API of `open()`. With built-in
    versioning, pruning and local cache.

    IMPORTANT: If LargeFileLocal.max_cache_size is too small to hold all your files,
    cache pruning can end up deleting them.

    See parent for more details.

    Examples:
        >>> LargeFileLocal.cache_path = Path(".cache")

        >>> LargeFileLocal.max_cache_size = "30GB"

        >>> with LargeFileLocal("my_test.txt", "w", keep_last_n=2) as f:
        ...     f.write('test')
        4

        >>> with LargeFileLocal("my_test.txt") as f:
        ...     print(f.read())
        test
    """

    def __init__(
        self,
        name: str,
        mode: str = "r",
        *,
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        version: Optional[int] = None,
        keep_last_n: Optional[int] = None,
    ):
        super().__init__(
            name,
            mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
            version=version,
            keep_last_n=keep_last_n,
            cache_only_mode=True,
        )
        super().configure_credentials()

    def _find_remote_instances(self) -> List[DataInstance]:
        return []

    def _download(
        self, remote_path: Any, local_path: Path, hide_progress: bool
    ) -> None:
        # This will never be called because the file must be in the cache
        raise NotImplementedError()

    def _upload(self, local_path: Path, hide_progress: bool) -> None:
        pass  # the "upload" is already done py the parent's caching mechanism

    def _delete_old_remote_versions(self) -> None:
        if self._keep_last_n is not None:
            for i in (
                self._instances[: -self._keep_last_n]
                if self._keep_last_n > 0
                else self._instances
            ):
                logger.info(
                    f"Removing old version (keep_last_n={self._keep_last_n}): {i.remote_path}"
                )
                i.remote_path.unlink()

LargeFileS3 #

Bases: LargeFileBase

LargeFile implementation using S3-compatible storage as a backend.

Store large files remotely using the familiar API of open(). With built-in versioning, pruning and local cache.

See parent for more details.
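
For illustration, a hedged usage sketch in the same doctest style as LargeFileLocal (the credential values are placeholders and assume a reachable S3-compatible bucket):

>>> LargeFileS3.configure_credentials(
...     aws_region_name="us-east-1",
...     aws_access_key_id="MY_ACCESS_KEY_ID",
...     aws_secret_access_key="MY_SECRET_ACCESS_KEY",
...     large_files_bucket_name="my-bucket",
... )
>>> with LargeFileS3("my_test.txt", "w", keep_last_n=2) as f:
...     f.write('test')
4
>>> with LargeFileS3("my_test.txt") as f:
...     print(f.read())
test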

Source code in great_ai/large_file/large_file/large_file_s3.py
class LargeFileS3(LargeFileBase):
    """LargeFile implementation using S3-compatible storage as a backend.

    Store large files remotely using the familiar API of `open()`. With built-in
    versioning, pruning and local cache.

    See parent for more details.
    """

    region_name = None
    access_key_id = None
    secret_access_key = None
    bucket_name = None
    endpoint_url = None

    @classmethod
    def configure_credentials(  # type: ignore
        cls,
        *,
        aws_region_name: str,
        aws_access_key_id: str,
        aws_secret_access_key: str,
        large_files_bucket_name: str,
        aws_endpoint_url: Optional[str] = None,
        **_: Any,
    ) -> None:
        cls.region_name = aws_region_name
        cls.access_key_id = aws_access_key_id
        cls.secret_access_key = aws_secret_access_key
        cls.bucket_name = large_files_bucket_name
        cls.endpoint_url = aws_endpoint_url
        super().configure_credentials()

    @cached_property
    def _client(self) -> boto3.client:
        if (
            self.region_name is None
            or self.access_key_id is None
            or self.secret_access_key is None
            or self.bucket_name is None
        ):
            raise ValueError(
                "Please configure the S3 access options by calling LargeFileS3.configure_credentials or set cache_only_mode=True in the constructor."
            )

        return boto3.client(
            "s3",
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
            region_name=self.region_name,
            endpoint_url=self.endpoint_url,
        )

    def _find_remote_instances(self) -> List[DataInstance]:
        logger.debug(f"Fetching S3 versions of {self._name}")

        found_objects = self._client.list_objects_v2(
            Bucket=self.bucket_name, Prefix=self._name
        )
        return (
            [
                DataInstance(
                    name=o["Key"].split(S3_NAME_VERSION_SEPARATOR)[0],
                    version=int(o["Key"].split(S3_NAME_VERSION_SEPARATOR)[-1]),
                    remote_path=o["Key"],
                )
                for o in found_objects["Contents"]
                if o["Key"].split(S3_NAME_VERSION_SEPARATOR)[0] == self._name
            ]
            if "Contents" in found_objects
            else []
        )

    def _download(
        self, remote_path: Any, local_path: Path, hide_progress: bool
    ) -> None:
        logger.info(f"Downloading {remote_path} from S3")

        size = self._client.head_object(Bucket=self.bucket_name, Key=remote_path)[
            "ContentLength"
        ]

        self._client.download_file(
            Bucket=self.bucket_name,
            Key=remote_path,
            Filename=str(local_path),
            Callback=None
            if hide_progress
            else DownloadProgressBar(name=str(remote_path), size=size, logger=logger),
        )

    def _upload(self, local_path: Path, hide_progress: bool) -> None:
        key = f"{self._name}/{self.version}"
        logger.info(f"Uploading {self._local_name} to S3 as {key}")

        self._client.upload_file(
            Filename=str(local_path),
            Bucket=self.bucket_name,
            Key=key,
            Callback=None
            if hide_progress
            else UploadProgressBar(path=local_path, logger=logger),
        )

    def _delete_old_remote_versions(self) -> None:
        if self._keep_last_n is not None:
            for i in (
                self._instances[: -self._keep_last_n]
                if self._keep_last_n > 0
                else self._instances
            ):
                logger.info(
                    f"Removing old version from S3 (keep_last_n={self._keep_last_n}): {i.remote_path}"
                )
                self._client.delete_object(Bucket=self.bucket_name, Key=i.remote_path)

LargeFileMongo #

Bases: LargeFileBase

LargeFile implementation using GridFS (MongoDB) as a backend.

Store large files remotely using the familiar API of open(). With built-in versioning, pruning and local cache.

See parent for more details.
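
For illustration, a hedged usage sketch in the same doctest style (the connection string and database name are placeholders and assume a reachable MongoDB instance):

>>> LargeFileMongo.configure_credentials(
...     mongo_connection_string="mongodb://localhost:27017",
...     mongo_database="my_database",
... )
>>> with LargeFileMongo("my_test.txt", "w", keep_last_n=2) as f:
...     f.write('test')
4
>>> with LargeFileMongo("my_test.txt") as f:
...     print(f.read())
test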

Source code in great_ai/large_file/large_file/large_file_mongo.py
class LargeFileMongo(LargeFileBase):
    """LargeFile implementation using GridFS (MongoDB) as a backend.

    Store large files remotely using the familiar API of `open()`. With built-in
    versioning, pruning and local cache.

    See parent for more details.
    """

    mongo_connection_string = None
    mongo_database = None

    @classmethod
    def configure_credentials(  # type: ignore
        cls,
        *,
        mongo_connection_string: str,
        mongo_database: str,
        **_: Any,
    ) -> None:
        cls.mongo_connection_string = mongo_connection_string
        cls.mongo_database = mongo_database
        super().configure_credentials()

    @cached_property
    def _client(self) -> GridFSBucket:
        if self.mongo_connection_string is None or self.mongo_database is None:
            raise ValueError(
                "Please configure the MongoDB access options by calling LargeFileMongo.configure_credentials or set cache_only_mode=True in the constructor."
            )

        db: Database = MongoClient(self.mongo_connection_string)[self.mongo_database]
        return GridFSBucket(db)

    def _find_remote_instances(self) -> List[DataInstance]:
        logger.debug(f"Fetching Mongo (GridFS) versions of {self._name}")

        return [
            DataInstance(
                name=MONGO_NAME_VERSION_SEPARATOR.join(
                    f.name.split(MONGO_NAME_VERSION_SEPARATOR)[:-1]
                ),
                version=int(f.name.split(MONGO_NAME_VERSION_SEPARATOR)[-1]),
                remote_path=(f._id, f.length),
            )
            for f in self._client.find(
                {
                    "filename": re.compile(
                        re.escape(self._name + MONGO_NAME_VERSION_SEPARATOR) + ".*"
                    )
                }
            )
        ]

    def _download(
        self, remote_path: Any, local_path: Path, hide_progress: bool
    ) -> None:
        logger.info(f"Downloading {remote_path[0]} from Mongo (GridFS)")

        progress = (
            DownloadProgressBar(
                name=str(remote_path[0]), size=remote_path[1], logger=logger
            )
            if not hide_progress
            else None
        )
        with self._client.open_download_stream(remote_path[0]) as stream:
            with open(local_path, "wb") as f:
                while True:
                    content = stream.read(DEFAULT_CHUNK_SIZE)
                    f.write(content)

                    if progress:
                        progress(len(content))
                    if len(content) < DEFAULT_CHUNK_SIZE:
                        break

    def _upload(self, local_path: Path, hide_progress: bool) -> None:
        logger.info(f"Uploading {local_path} to Mongo (GridFS)")

        progress = (
            UploadProgressBar(path=local_path, logger=logger)
            if not hide_progress
            else None
        )
        with self._client.open_upload_stream(
            f"{self._name}{MONGO_NAME_VERSION_SEPARATOR}{self.version}"
        ) as stream:
            with open(local_path, "rb") as f:
                while True:
                    content = f.read(DEFAULT_CHUNK_SIZE)
                    stream.write(content)

                    if progress:
                        progress(len(content))
                    if len(content) < DEFAULT_CHUNK_SIZE:
                        break

    def _delete_old_remote_versions(self) -> None:
        if self._keep_last_n is not None:
            for i in (
                self._instances[: -self._keep_last_n]
                if self._keep_last_n > 0
                else self._instances
            ):
                logger.info(
                    f"Removing old version from MongoDB (GridFS) (keep_last_n={self._keep_last_n}): {i.name}{MONGO_NAME_VERSION_SEPARATOR}{i.version}"
                )
                self._client.delete(i.remote_path[0])

LargeFileBase #

Bases: ABC

Base for LargeFile implementations with different backends.

Store large files remotely using the familiar API of open(). With built-in versioning, pruning and local cache.

By default, files are stored in the ".cache" folder and the least recently used files are deleted once the overall size exceeds 30 GB.

Examples:

>>> LargeFileBase.cache_path = Path(".cache")
>>> LargeFileBase.max_cache_size = "30GB"

Attributes:

initialized: Tells whether configure_credentials or configure_credentials_from_file has already been called.

cache_path: Storage location for cached files.

max_cache_size (Optional[str]): Delete files until the folder at cache_path is smaller than this value. Examples: "5 GB", "10MB", "0.3 TB". Set to None for no automatic cache-pruning.

Source code in great_ai/large_file/large_file/large_file_base.py
class LargeFileBase(ABC):
    """Base for LargeFile implementations with different backends.

    Store large files remotely using the familiar API of `open()`. With built-in
    versioning, pruning and local cache.

    By default, files are stored in the ".cache" folder and the least recently used
    files are deleted once the overall size exceeds 30 GB.

    Examples:
        >>> LargeFileBase.cache_path = Path(".cache")

        >>> LargeFileBase.max_cache_size = "30GB"

    Attributes:
        initialized: Tells whether `configure_credentials` or
            `configure_credentials_from_file` has already been called.
        cache_path: Storage location for cached files.
        max_cache_size: Delete files until the folder at `cache_path` is smaller than
            this value. Examples: "5 GB", "10MB", "0.3 TB". Set to `None` for no
            automatic cache-pruning.
    """

    initialized = False
    cache_path = Path(".cache")
    max_cache_size: Optional[str] = "30GB"

    def __init__(
        self,
        name: str,
        mode: str = "r",
        *,
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        version: Optional[int] = None,
        keep_last_n: Optional[int] = None,
        cache_only_mode: bool = False,
    ):
        clean_name = re.sub(r"[^!a-zA-Z0-9._-]+", "", name)
        if clean_name != name:
            raise ValueError(
                f"Given name contains illegal characters, consider changing it to: `{clean_name}`"
            )

        self._name = name
        self._version = version
        self._mode = mode
        self._keep_last_n = keep_last_n
        self._cache_only_mode = cache_only_mode

        self._buffering = buffering
        self._encoding = encoding

        if errors is not None and sys.version_info[1] < 8:
            raise RuntimeError(
                "The `errors` kwarg is only supported in 3.8 <= Python versions."
            )
        self._errors = errors

        self._newline = newline

        LargeFileBase.cache_path.mkdir(parents=True, exist_ok=True)

        self._find_instances()
        self._check_mode_and_set_version()

    @classmethod
    def configure_credentials_from_file(
        cls,
        secrets: Union[Path, str, ConfigFile],
    ) -> None:
        """Load file and feed its content to `configure_credentials`.

        Extra keys are ignored.
        """

        if not isinstance(secrets, ConfigFile):
            secrets = ConfigFile(secrets)
        cls.configure_credentials(**{k.lower(): v for k, v in secrets.items()})

    @classmethod
    def configure_credentials(cls, **kwargs: str) -> None:
        """Configure required credentials for the LargeFile backend."""

        cls.initialized = True

    def __enter__(self) -> IO:
        params = dict(
            mode=self._mode,
            buffering=self._buffering,
            encoding=self._encoding,
            newline=self._newline,
            delete=False,
            prefix="large_file-",
        )

        if sys.version_info[1] >= 8:
            params["errors"] = self._errors

        self._file: IO[Any] = (
            tempfile.NamedTemporaryFile(**params)  # type: ignore
            if "w" in self._mode
            else open(
                self.get(),
                mode=self._mode,
                buffering=self._buffering,
                encoding=self._encoding,
                newline=self._newline,
                errors=self._errors,
            )
        )

        return self._file

    def __exit__(
        self,
        type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Literal[False]:
        self._file.close()

        if type is None:
            if "w" in self._mode:
                self.push(Path(self._file.name))
                os.unlink(self._file.name)
        else:
            logger.exception("Could not finish operation.")

        return False

    @property
    def version(self) -> int:
        """Numeric version of the file proxied by this LargeFile instance."""

        return cast(int, self._version)

    @lru_cache(1)
    def get(self, hide_progress: bool = False) -> Path:
        """Return path to the proxy of a file (or directory).

        If not available in the local cache, an attempt is made to download it.

        Args:
            hide_progress: Do not show a progress update after each 10% of progress.
        """

        remote_path = next(
            i.remote_path for i in self._instances if i.version == self._version
        )

        destination = self.cache_path / self._local_name
        if not destination.exists():
            logger.info(f"File {self._local_name} does not exist locally")

            with tempfile.TemporaryDirectory() as tmp:
                local_root_path = Path(tmp)
                tmp_file_archive = (
                    local_root_path / f"{self._local_name}{ARCHIVE_EXTENSION}"
                )
                self._download(
                    remote_path, tmp_file_archive, hide_progress=hide_progress
                )

                logger.info(f"Decompressing {self._local_name}")
                shutil.unpack_archive(str(tmp_file_archive), tmp, COMPRESSION_ALGORITHM)
                shutil.move(str(local_root_path / self._local_name), str(destination))
        else:
            logger.info(f"File {self._local_name} found in cache")

        return destination

    def push(self, path: Union[Path, str], hide_progress: bool = False) -> None:
        """Upload a file (or directory) as a new version of `key`.

        The file/directory is compressed before upload.

        Args:
            hide_progress: Do not show a progress update after each 10% of progress.
        """

        if isinstance(path, str):
            path = Path(path)

        with tempfile.TemporaryDirectory() as tmp:
            if path.is_file():
                logger.info(f"Copying file for {self._local_name}")
                copy: Any = shutil.copy
            else:
                logger.info(f"Copying directory for {self._local_name}")
                copy = shutil.copytree

            try:
                # Make local copy in the cache
                shutil.rmtree(self.cache_path / self._local_name, ignore_errors=True)
                copy(str(path), str(self.cache_path / self._local_name))
            except shutil.SameFileError:
                pass  # No worries

            copy(str(path), str(Path(tmp) / self._local_name))

            with tempfile.TemporaryDirectory() as tmp2:
                # A directory has to be zipped and it cannot contain the output of the zipping
                logger.info(f"Compressing {self._local_name}")
                shutil.make_archive(
                    str(Path(tmp2) / self._local_name),
                    COMPRESSION_ALGORITHM,
                    tmp,
                )

                file_to_be_uploaded = (
                    Path(tmp2) / f"{self._local_name}{ARCHIVE_EXTENSION}"
                )
                self._upload(file_to_be_uploaded, hide_progress=hide_progress)

        self.clean_up()

    def delete(self) -> None:
        """Delete all versions of the files under this `key`."""

        self._keep_last_n = 0
        self._delete_old_remote_versions()

    @property
    def versions_pretty(self) -> str:
        """Formatted string of all available versions."""
        return ", ".join((str(i.version) for i in self._instances))

    def clean_up(self) -> None:
        """Delete local and remote versions according to currently set cache and retention policy."""

        self._delete_old_remote_versions()
        self._prune_cache()

    @property
    def _local_name(self) -> str:
        return f"{self._name}{CACHE_NAME_VERSION_SEPARATOR}{self.version}"

    def _find_instances(self) -> None:
        if self._cache_only_mode:
            self._instances = self._find_instances_from_cache()
        else:
            self._instances = self._find_remote_instances()

        self._instances = sorted(self._instances, key=lambda i: i.version)

    def _find_instances_from_cache(self) -> List[DataInstance]:
        logger.info(f"Fetching cached versions of {self._name}")

        candidates = [
            DataInstance(
                name=CACHE_NAME_VERSION_SEPARATOR.join(
                    f.name.split(CACHE_NAME_VERSION_SEPARATOR)[:-1]
                ),
                version=int(f.name.split(CACHE_NAME_VERSION_SEPARATOR)[-1]),
                remote_path=f,
            )
            for f in self.cache_path.glob(
                f"{self._name}{CACHE_NAME_VERSION_SEPARATOR}*"
            )
        ]

        return [c for c in candidates if c.name == self._name]

    def _check_mode_and_set_version(self) -> None:
        if "+" in self._mode:
            raise ValueError(
                f"File mode `{self._mode}` is not allowed3, remove the `+`."
            )

        if "w" in self._mode:
            if self._version is not None:
                raise ValueError("Providing a version is not allowed in write mode.")

            self._version = self._instances[-1].version + 1 if self._instances else 0

        elif "r" in self._mode:
            if not self._instances:
                raise FileNotFoundError(
                    f"File {self._name} not found. No versions are available."
                )

            if self._version is None:
                self._version = self._instances[-1].version
                logger.info(
                    f"Latest version of {self._name} is {self._version} "
                    + f"(from versions: {self.versions_pretty})"
                )
            elif self._version not in [i.version for i in self._instances]:
                raise FileNotFoundError(
                    f"File {self._name} not found with version {self._version}. "
                    + f"(from versions: {self.versions_pretty})"
                )
        else:
            raise ValueError("Unsupported file mode.")

    def _prune_cache(self) -> None:
        self.cache_path.mkdir(parents=True, exist_ok=True)

        if self.max_cache_size is None:
            return

        allowed_size = human_readable_to_byte(self.max_cache_size)
        assert allowed_size >= 0

        least_recently_read = sorted(
            [f for f in self.cache_path.glob("*")], key=lambda f: f.stat().st_atime
        )

        while sum(os.path.getsize(f) for f in least_recently_read) > allowed_size:
            file = least_recently_read.pop(0)
            logger.info(
                f"Deleting file from cache to meet quota (max_cache_size={self.max_cache_size}): {file}"
            )
            os.unlink(file)

    @abstractmethod
    def _find_remote_instances(self) -> List[DataInstance]:
        pass

    @abstractmethod
    def _download(
        self, remote_path: Any, local_path: Path, hide_progress: bool
    ) -> None:
        pass

    @abstractmethod
    def _upload(self, local_path: Path, hide_progress: bool) -> None:
        pass

    @abstractmethod
    def _delete_old_remote_versions(self) -> None:
        pass

version: int property #

Numeric version of the file proxied by this LargeFile instance.
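
In read mode, the latest available version is selected unless an explicit version argument is passed to the constructor; in write mode, the next version number is assigned automatically. A hedged sketch, assuming versions 0 and 1 of "my_test.txt" already exist in the backend:

>>> LargeFileLocal("my_test.txt").version  # latest version is selected by default
1
>>> with LargeFileLocal("my_test.txt", "r", version=0) as f:
...     _ = f.read()  # read a specific older version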

versions_pretty: str property #

Formatted string of all available versions.

clean_up() #

Delete local and remote versions according to currently set cache and retention policy.

Source code in great_ai/large_file/large_file/large_file_base.py
def clean_up(self) -> None:
    """Delete local and remote versions according to currently set cache and retention policy."""

    self._delete_old_remote_versions()
    self._prune_cache()

configure_credentials(**kwargs) classmethod #

Configure required credentials for the LargeFile backend.

Source code in great_ai/large_file/large_file/large_file_base.py
@classmethod
def configure_credentials(cls, **kwargs: str) -> None:
    """Configure required credentials for the LargeFile backend."""

    cls.initialized = True

configure_credentials_from_file(secrets) classmethod #

Load file and feed its content to configure_credentials.

Extra keys are ignored.
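
A hedged sketch (the file name and its contents are illustrative; keys are lower-cased before being forwarded to configure_credentials):

>>> # secrets.ini is assumed to contain entries such as AWS_REGION_NAME,
>>> # AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and LARGE_FILES_BUCKET_NAME
>>> LargeFileS3.configure_credentials_from_file("secrets.ini")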

Source code in great_ai/large_file/large_file/large_file_base.py
@classmethod
def configure_credentials_from_file(
    cls,
    secrets: Union[Path, str, ConfigFile],
) -> None:
    """Load file and feed its content to `configure_credentials`.

    Extra keys are ignored.
    """

    if not isinstance(secrets, ConfigFile):
        secrets = ConfigFile(secrets)
    cls.configure_credentials(**{k.lower(): v for k, v in secrets.items()})

delete() #

Delete all versions of the files under this key.
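
A hedged one-liner, assuming at least one version of the key exists (otherwise the constructor raises FileNotFoundError in read mode):

>>> LargeFileLocal("my_test.txt").delete()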

Source code in great_ai/large_file/large_file/large_file_base.py
def delete(self) -> None:
    """Delete all versions of the files under this `key`."""

    self._keep_last_n = 0
    self._delete_old_remote_versions()

get(hide_progress=False) cached #

Return path to the proxy of a file (or directory).

If not available in the local cache, an attempt is made to download it.

Parameters:

hide_progress (bool, default False): Do not show a progress update after each 10% of progress.
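
get is useful when a filesystem path is needed instead of an open file object, for example to hand a directory over to another library. A hedged sketch, assuming "my_test.txt" from the earlier LargeFileLocal example is available:

>>> path = LargeFileLocal("my_test.txt").get()
>>> path.read_text()
'test'
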
Source code in great_ai/large_file/large_file/large_file_base.py
@lru_cache(1)
def get(self, hide_progress: bool = False) -> Path:
    """Return path to the proxy of a file (or directory).

    If not available in the local cache, an attempt is made to download it.

    Args:
        hide_progress: Do not show a progress update after each 10% of progress.
    """

    remote_path = next(
        i.remote_path for i in self._instances if i.version == self._version
    )

    destination = self.cache_path / self._local_name
    if not destination.exists():
        logger.info(f"File {self._local_name} does not exist locally")

        with tempfile.TemporaryDirectory() as tmp:
            local_root_path = Path(tmp)
            tmp_file_archive = (
                local_root_path / f"{self._local_name}{ARCHIVE_EXTENSION}"
            )
            self._download(
                remote_path, tmp_file_archive, hide_progress=hide_progress
            )

            logger.info(f"Decompressing {self._local_name}")
            shutil.unpack_archive(str(tmp_file_archive), tmp, COMPRESSION_ALGORITHM)
            shutil.move(str(local_root_path / self._local_name), str(destination))
    else:
        logger.info(f"File {self._local_name} found in cache")

    return destination

push(path, hide_progress=False) #

Upload a file (or directory) as a new version of key.

The file/directory is compressed before upload.

Parameters:

hide_progress (bool, default False): Do not show a progress update after each 10% of progress.
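
push can also be called directly to upload an existing file or directory as a new version, without going through the context-manager API. A hedged sketch (the path is a placeholder):

>>> LargeFileLocal("my_model", "w").push("path/to/model_directory")
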
Source code in great_ai/large_file/large_file/large_file_base.py
def push(self, path: Union[Path, str], hide_progress: bool = False) -> None:
    """Upload a file (or directory) as a new version of `key`.

    The file/directory is compressed before upload.

    Args:
        hide_progress: Do not show a progress update after each 10% of progress.
    """

    if isinstance(path, str):
        path = Path(path)

    with tempfile.TemporaryDirectory() as tmp:
        if path.is_file():
            logger.info(f"Copying file for {self._local_name}")
            copy: Any = shutil.copy
        else:
            logger.info(f"Copying directory for {self._local_name}")
            copy = shutil.copytree

        try:
            # Make local copy in the cache
            shutil.rmtree(self.cache_path / self._local_name, ignore_errors=True)
            copy(str(path), str(self.cache_path / self._local_name))
        except shutil.SameFileError:
            pass  # No worries

        copy(str(path), str(Path(tmp) / self._local_name))

        with tempfile.TemporaryDirectory() as tmp2:
            # A directory has to be zipped and it cannot contain the output of the zipping
            logger.info(f"Compressing {self._local_name}")
            shutil.make_archive(
                str(Path(tmp2) / self._local_name),
                COMPRESSION_ALGORITHM,
                tmp,
            )

            file_to_be_uploaded = (
                Path(tmp2) / f"{self._local_name}{ARCHIVE_EXTENSION}"
            )
            self._upload(file_to_be_uploaded, hide_progress=hide_progress)

    self.clean_up()

Last update: July 16, 2022