import errno
import logging
from contextlib import contextmanager, AsyncExitStack
from botocore.exceptions import ClientError
# Module-level logger registered under the fixed name "s3fs" (not __name__).
logger = logging.getLogger("s3fs")
@contextmanager
def ignoring(*exceptions):
    """
    Context manager that swallows the given exception types.

    Parameters
    ----------
    exceptions : exception classes
        Any exception raised inside the ``with`` body that is an instance of
        one of these classes is suppressed; every other exception propagates
        unchanged.
    """
    try:
        yield
    except exceptions:
        # Ending the generator here tells ``contextmanager`` to suppress
        # the exception that was thrown into it.
        return
class S3BucketRegionCache:
    """
    Cache of S3 clients, one general client plus one client per bucket region.

    Clients are created from ``session.create_client("s3", ...)`` and entered
    into a single ``AsyncExitStack`` so that ``clear()`` (or leaving the
    ``async with`` block) closes all of them together.

    Parameters
    ----------
    session : object exposing ``create_client(service_name, **kwargs)`` that
        returns an async context manager yielding a client
    client_kwargs : extra keyword arguments forwarded to every
        ``create_client`` call
    """

    # See https://github.com/aio-libs/aiobotocore/issues/866
    # for details.

    def __init__(self, session, **client_kwargs):
        self._session = session
        # Every client created by this cache is entered here, so closing the
        # stack closes all of them.
        self._stack = AsyncExitStack()
        self._client = None
        self._client_kwargs = client_kwargs
        # bucket name -> client, and region name -> client (shared by buckets).
        self._buckets = {}
        self._regions = {}

    @staticmethod
    def _region_from_response(response):
        """Return the ``x-amz-bucket-region`` header of a response, or None."""
        return (
            response.get("ResponseMetadata", {})
            .get("HTTPHeaders", {})
            .get("x-amz-bucket-region")
        )

    async def get_bucket_client(self, bucket_name=None):
        """
        Return a client suitable for ``bucket_name``.

        The bucket's region is looked up with a ``head_bucket`` call on the
        general client. When the region cannot be determined (the call fails
        without a region header, or succeeds without one), the general client
        is returned and nothing is cached for the bucket.

        Parameters
        ----------
        bucket_name : bucket to resolve, or None for the general client
        """
        if bucket_name in self._buckets:
            return self._buckets[bucket_name]

        general_client = await self.get_client()
        if bucket_name is None:
            return general_client

        try:
            response = await general_client.head_bucket(Bucket=bucket_name)
        except ClientError as e:
            # An error response may still carry the bucket's region header.
            region = self._region_from_response(e.response)
            if not region:
                logger.debug(
                    "RC: HEAD_BUCKET call for %r has failed, returning the general client",
                    bucket_name,
                )
                return general_client
        else:
            # Do not assume the header is present on success: fall back to
            # the general client instead of raising KeyError.
            region = self._region_from_response(response)
            if not region:
                logger.debug(
                    "RC: HEAD_BUCKET response for %r has no bucket region, "
                    "returning the general client",
                    bucket_name,
                )
                return general_client

        if region not in self._regions:
            logger.debug(
                "RC: Creating a new regional client for %r on the region %r",
                bucket_name,
                region,
            )
            self._regions[region] = await self._stack.enter_async_context(
                self._session.create_client(
                    "s3", region_name=region, **self._client_kwargs
                )
            )

        client = self._buckets[bucket_name] = self._regions[region]
        return client

    async def get_client(self):
        """Return the general (region-agnostic) client, creating it on first use."""
        if self._client is None:
            self._client = await self._stack.enter_async_context(
                self._session.create_client("s3", **self._client_kwargs)
            )
        return self._client

    async def clear(self):
        """Forget every cached client and close them all via the exit stack."""
        logger.debug("RC: discarding all clients")
        self._buckets.clear()
        self._regions.clear()
        self._client = None
        await self._stack.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_args):
        await self.clear()
class FileExpired(IOError):
    """
    Raised when the content of a file was changed by another process after
    the file had been opened, so that reading it would produce invalid or
    inconsistent output.

    Outdated file information in the directory cache can trigger this as
    well. In that case ``S3FileSystem.invalidate_cache`` can be used to force
    the file information to be refreshed when the file is opened.
    """

    def __init__(self, filename: str, e_tag: str):
        message = (
            "The remote file corresponding to filename %s and Etag %s no longer exists."
            % (filename, e_tag)
        )
        # errno.EBUSY becomes ``self.errno``; the message becomes ``self.strerror``.
        super().__init__(errno.EBUSY, message)
def title_case(string):
    """
    Convert an underscore separated string to TitleCase.

    Each underscore-delimited part is passed through ``str.capitalize``
    (first character upper-cased, the rest lower-cased) and the parts are
    joined without a separator.

    Parameters
    ----------
    string : underscore separated string
    """
    parts = string.split("_")
    return "".join(map(str.capitalize, parts))
class ParamKwargsHelper(object):
    """
    Utility class to help extract the subset of keys that an s3 method is
    actually using

    Parameters
    ----------
    s3 : object exposing ``meta.service_model.operation_model(name)``
        (presumably a boto S3 client; confirm against the caller)
    """

    # Operation name -> set of accepted parameter names. Defined on the class,
    # so the cache is shared by every instance.
    _kwarg_cache = {}

    def __init__(self, s3):
        self.s3 = s3

    def _get_valid_keys(self, model_name):
        """Return the set of input parameter names of operation ``model_name``."""
        if model_name not in self._kwarg_cache:
            model = self.s3.meta.service_model.operation_model(model_name)
            # Operations without an input shape accept no parameters.
            valid_keys = (
                set(model.input_shape.members)
                if model.input_shape is not None
                else set()
            )
            self._kwarg_cache[model_name] = valid_keys
        return self._kwarg_cache[model_name]

    def filter_dict(self, method_name, d):
        """
        Return the entries of ``d`` whose keys ``method_name`` accepts.

        Parameters
        ----------
        method_name : snake_case method name, e.g. ``"put_object"``
        d : dict of keyword arguments, or an ``SSEParams`` instance (which is
            converted with ``to_kwargs()`` first)
        """
        model_name = title_case(method_name)
        valid_keys = self._get_valid_keys(model_name)
        if isinstance(d, SSEParams):
            d = d.to_kwargs()
        return {k: v for k, v in d.items() if k in valid_keys}
class SSEParams(object):
    """
    Container for server-side-encryption settings.

    Each constructor argument is stored under a CamelCase attribute name;
    ``to_kwargs`` uses those attribute names verbatim as dictionary keys.

    Parameters
    ----------
    server_side_encryption : stored as ``ServerSideEncryption``
    sse_customer_algorithm : stored as ``SSECustomerAlgorithm``
    sse_customer_key : stored as ``SSECustomerKey``
    sse_kms_key_id : stored as ``SSEKMSKeyId``
    """

    def __init__(
        self,
        server_side_encryption=None,
        sse_customer_algorithm=None,
        sse_customer_key=None,
        sse_kms_key_id=None,
    ):
        self.ServerSideEncryption = server_side_encryption
        self.SSECustomerAlgorithm = sse_customer_algorithm
        self.SSECustomerKey = sse_customer_key
        self.SSEKMSKeyId = sse_kms_key_id

    def to_kwargs(self):
        """Return the instance attributes that are not None as a dict."""
        return {k: v for k, v in self.__dict__.items() if v is not None}
def _get_brange(size, block):
"""
Chunk up a file into zero-based byte ranges
Parameters
----------
size : file size
block : block size
"""
for offset in range(0, size, block):
yield offset, min(offset + block - 1, size - 1)