"""Amazon S3 List Module (PRIVATE).""" import datetime import fnmatch import logging from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Sequence, Union import boto3 import botocore.exceptions from awswrangler import _utils, exceptions from awswrangler._distributed import engine from awswrangler.s3 import _fs if TYPE_CHECKING: from mypy_boto3_s3 import S3Client _logger: logging.Logger = logging.getLogger(__name__) def _path2list( path: Union[str, Sequence[str]], s3_client: "S3Client", s3_additional_kwargs: Optional[Dict[str, Any]], last_modified_begin: Optional[datetime.datetime] = None, last_modified_end: Optional[datetime.datetime] = None, suffix: Union[str, List[str], None] = None, ignore_suffix: Union[str, List[str], None] = None, ignore_empty: bool = False, ) -> List[str]: """Convert Amazon S3 path to list of objects.""" _suffix: Optional[List[str]] = [suffix] if isinstance(suffix, str) else suffix _ignore_suffix: Optional[List[str]] = [ignore_suffix] if isinstance(ignore_suffix, str) else ignore_suffix if isinstance(path, str): # prefix paths: List[str] = [ path for paths in _list_objects( path=path, s3_client=s3_client, suffix=_suffix, ignore_suffix=_ignore_suffix, last_modified_begin=last_modified_begin, last_modified_end=last_modified_end, ignore_empty=ignore_empty, s3_additional_kwargs=s3_additional_kwargs, ) for path in paths ] _logger.debug("Listed %s paths", len(paths)) elif isinstance(path, list): if last_modified_begin or last_modified_end: raise exceptions.InvalidArgumentCombination( "Specify a list of files or (last_modified_begin and last_modified_end)" ) paths = path if _suffix is None else [x for x in path if x.endswith(tuple(_suffix))] paths = path if _ignore_suffix is None else [x for x in paths if x.endswith(tuple(_ignore_suffix)) is False] else: raise exceptions.InvalidArgumentType(f"{type(path)} is not a valid path type. Please, use str or List[str].") return paths def _validate_datetimes( last_modified_begin: Optional[datetime.datetime] = None, last_modified_end: Optional[datetime.datetime] = None ) -> None: if (last_modified_begin is not None) and (last_modified_begin.tzinfo is None): raise exceptions.InvalidArgumentValue("Timezone is not defined for last_modified_begin.") if (last_modified_end is not None) and (last_modified_end.tzinfo is None): raise exceptions.InvalidArgumentValue("Timezone is not defined for last_modified_end.") if (last_modified_begin is not None) and (last_modified_end is not None): if last_modified_begin > last_modified_end: raise exceptions.InvalidArgumentValue("last_modified_begin is bigger than last_modified_end.") def _prefix_cleanup(prefix: str) -> str: for n, c in enumerate(prefix): if c in ["*", "?", "["]: return prefix[:n] return prefix def _list_objects( path: str, s3_client: "S3Client", delimiter: Optional[str] = None, s3_additional_kwargs: Optional[Dict[str, Any]] = None, suffix: Union[str, List[str], None] = None, ignore_suffix: Union[str, List[str], None] = None, last_modified_begin: Optional[datetime.datetime] = None, last_modified_end: Optional[datetime.datetime] = None, ignore_empty: bool = False, ) -> Iterator[List[str]]: suffix: Union[List[str], None] = [suffix] if isinstance(suffix, str) else suffix ignore_suffix: Union[List[str], None] = [ignore_suffix] if isinstance(ignore_suffix, str) else ignore_suffix _validate_datetimes(last_modified_begin=last_modified_begin, last_modified_end=last_modified_end) bucket, pattern = _utils.parse_path(path=path) prefix: str = _prefix_cleanup(prefix=pattern) return _list_objects_paginate( bucket=bucket, pattern=pattern, prefix=prefix, s3_client=s3_client, delimiter=delimiter, suffix=suffix, ignore_suffix=ignore_suffix, last_modified_begin=last_modified_begin, last_modified_end=last_modified_end, ignore_empty=ignore_empty, s3_additional_kwargs=s3_additional_kwargs, ) @engine.dispatch_on_engine def _list_objects_paginate( # pylint: disable=too-many-branches bucket: str, pattern: str, prefix: str, s3_client: "S3Client", delimiter: Optional[str], s3_additional_kwargs: Optional[Dict[str, Any]], suffix: Union[List[str], None], ignore_suffix: Union[List[str], None], last_modified_begin: Optional[datetime.datetime], last_modified_end: Optional[datetime.datetime], ignore_empty: bool, ) -> Iterator[List[str]]: default_pagination: Dict[str, int] = {"PageSize": 1000} extra_kwargs: Dict[str, Any] = {"PaginationConfig": default_pagination} if s3_additional_kwargs: extra_kwargs = _fs.get_botocore_valid_kwargs( function_name="list_objects_v2", s3_additional_kwargs=s3_additional_kwargs ) extra_kwargs["PaginationConfig"] = ( s3_additional_kwargs["PaginationConfig"] if "PaginationConfig" in s3_additional_kwargs else default_pagination ) paginator = s3_client.get_paginator("list_objects_v2") args: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, **extra_kwargs} if delimiter is not None: args["Delimiter"] = delimiter _logger.debug("args: %s", args) response_iterator = paginator.paginate(**args) paths: List[str] = [] for page in response_iterator: # pylint: disable=too-many-nested-blocks if delimiter is None: contents = page.get("Contents") if contents is not None: for content in contents: key: str = content["Key"] if ignore_empty and content.get("Size", 0) == 0: _logger.debug("Skipping empty file: %s", f"s3://{bucket}/{key}") elif (content is not None) and ("Key" in content): if (suffix is None) or key.endswith(tuple(suffix)): if last_modified_begin is not None: if content["LastModified"] < last_modified_begin: continue if last_modified_end is not None: if content["LastModified"] > last_modified_end: continue paths.append(f"s3://{bucket}/{key}") else: prefixes = page.get("CommonPrefixes") if prefixes is not None: for pfx in prefixes: if (pfx is not None) and ("Prefix" in pfx): key = pfx["Prefix"] paths.append(f"s3://{bucket}/{key}") if prefix != pattern: paths = fnmatch.filter(paths, f"s3://{bucket}/{pattern}") if ignore_suffix is not None: paths = [p for p in paths if p.endswith(tuple(ignore_suffix)) is False] if paths: yield paths paths = [] def does_object_exist( path: str, s3_additional_kwargs: Optional[Dict[str, Any]] = None, boto3_session: Optional[boto3.Session] = None, version_id: Optional[str] = None, ) -> bool: """Check if object exists on S3. Parameters ---------- path: str S3 path (e.g. s3://bucket/key). s3_additional_kwargs : Optional[Dict[str, Any]] Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. version_id: str, optional Specific version of the object that should exist. Returns ------- bool True if exists, False otherwise. Examples -------- Using the default boto3 session >>> import awswrangler as wr >>> wr.s3.does_object_exist('s3://bucket/key_real') True >>> wr.s3.does_object_exist('s3://bucket/key_unreal') False Using a custom boto3 session >>> import boto3 >>> import awswrangler as wr >>> wr.s3.does_object_exist('s3://bucket/key_real', boto3_session=boto3.Session()) True >>> wr.s3.does_object_exist('s3://bucket/key_unreal', boto3_session=boto3.Session()) False """ s3_client = _utils.client(service_name="s3", session=boto3_session) bucket: str key: str bucket, key = _utils.parse_path(path=path) if s3_additional_kwargs: extra_kwargs: Dict[str, Any] = _fs.get_botocore_valid_kwargs( function_name="head_object", s3_additional_kwargs=s3_additional_kwargs ) else: extra_kwargs = {} try: if version_id: extra_kwargs["VersionId"] = version_id s3_client.head_object(Bucket=bucket, Key=key, **extra_kwargs) return True except botocore.exceptions.ClientError as ex: if ex.response["ResponseMetadata"]["HTTPStatusCode"] == 404: return False raise ex @_utils.validate_distributed_kwargs( unsupported_kwargs=["boto3_session", "s3_additional_kwargs"], ) def list_directories( path: str, chunked: bool = False, s3_additional_kwargs: Optional[Dict[str, Any]] = None, boto3_session: Optional[boto3.Session] = None, ) -> Union[List[str], Iterator[List[str]]]: """List Amazon S3 objects from a prefix. This function accepts Unix shell-style wildcards in the path argument. * (matches everything), ? (matches any single character), [seq] (matches any character in seq), [!seq] (matches any character not in seq). If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`), you can use `glob.escape(path)` before passing the path to this function. Parameters ---------- path : str S3 path (e.g. s3://bucket/prefix). chunked: bool If True returns iterator, and a single list otherwise. False by default. s3_additional_kwargs : Optional[Dict[str, Any]] Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. Returns ------- Union[List[str], Iterator[List[str]]] List of objects paths. Examples -------- Using the default boto3 session >>> import awswrangler as wr >>> wr.s3.list_directories('s3://bucket/prefix/') ['s3://bucket/prefix/dir0/', 's3://bucket/prefix/dir1/', 's3://bucket/prefix/dir2/'] Using a custom boto3 session >>> import boto3 >>> import awswrangler as wr >>> wr.s3.list_directories('s3://bucket/prefix/', boto3_session=boto3.Session()) ['s3://bucket/prefix/dir0/', 's3://bucket/prefix/dir1/', 's3://bucket/prefix/dir2/'] """ s3_client = _utils.client(service_name="s3", session=boto3_session) result_iterator = _list_objects( path=path, delimiter="/", s3_client=s3_client, s3_additional_kwargs=s3_additional_kwargs, ) if chunked: return result_iterator return [path for paths in result_iterator for path in paths] @_utils.validate_distributed_kwargs( unsupported_kwargs=["boto3_session", "s3_additional_kwargs"], ) def list_objects( path: str, suffix: Union[str, List[str], None] = None, ignore_suffix: Union[str, List[str], None] = None, last_modified_begin: Optional[datetime.datetime] = None, last_modified_end: Optional[datetime.datetime] = None, ignore_empty: bool = False, chunked: bool = False, s3_additional_kwargs: Optional[Dict[str, Any]] = None, boto3_session: Optional[boto3.Session] = None, ) -> Union[List[str], Iterator[List[str]]]: """List Amazon S3 objects from a prefix. This function accepts Unix shell-style wildcards in the path argument. * (matches everything), ? (matches any single character), [seq] (matches any character in seq), [!seq] (matches any character not in seq). If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`), you can use `glob.escape(path)` before passing the path to this function. Note ---- The filter by last_modified begin last_modified end is applied after list all S3 files Parameters ---------- path : str S3 path (e.g. s3://bucket/prefix). suffix: Union[str, List[str], None] Suffix or List of suffixes for filtering S3 keys. ignore_suffix: Union[str, List[str], None] Suffix or List of suffixes for S3 keys to be ignored. last_modified_begin Filter the s3 files by the Last modified date of the object. The filter is applied only after list all s3 files. last_modified_end: datetime, optional Filter the s3 files by the Last modified date of the object. The filter is applied only after list all s3 files. ignore_empty: bool Ignore files with 0 bytes. chunked: bool If True returns iterator, and a single list otherwise. False by default. s3_additional_kwargs : Optional[Dict[str, Any]] Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. Returns ------- Union[List[str], Iterator[List[str]]] List of objects paths. Examples -------- Using the default boto3 session >>> import awswrangler as wr >>> wr.s3.list_objects('s3://bucket/prefix') ['s3://bucket/prefix0', 's3://bucket/prefix1', 's3://bucket/prefix2'] Using a custom boto3 session >>> import boto3 >>> import awswrangler as wr >>> wr.s3.list_objects('s3://bucket/prefix', boto3_session=boto3.Session()) ['s3://bucket/prefix0', 's3://bucket/prefix1', 's3://bucket/prefix2'] """ s3_client = _utils.client(service_name="s3", session=boto3_session) # On top of user provided ignore_suffix input, add "/" ignore_suffix_acc = set("/") if isinstance(ignore_suffix, str): ignore_suffix_acc.add(ignore_suffix) elif isinstance(ignore_suffix, list): ignore_suffix_acc.update(ignore_suffix) result_iterator = _list_objects( path=path, suffix=suffix, ignore_suffix=list(ignore_suffix_acc), last_modified_begin=last_modified_begin, last_modified_end=last_modified_end, ignore_empty=ignore_empty, s3_client=s3_client, s3_additional_kwargs=s3_additional_kwargs, ) if chunked: return result_iterator return [path for paths in result_iterator for path in paths] def list_buckets(boto3_session: Optional[boto3.Session] = None) -> List[str]: """List Amazon S3 buckets. Parameters ---------- boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session to use, default to None. Returns ------- List[str] List of bucket names. """ client_s3 = _utils.client(service_name="s3", session=boto3_session) buckets = client_s3.list_buckets()["Buckets"] return [bucket["Name"] for bucket in buckets]