development
mattsb42-aws 2018-06-05 16:21:48 -07:00
parent 05830a966c
commit 17ea873212
5 changed files with 136 additions and 153 deletions

View File

@ -5,66 +5,71 @@ import io
import os import os
import re import re
VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.]+)['"]''') VERSION_RE = re.compile(r"""__version__ = ['"]([0-9.]+)['"]""")
HERE = os.path.abspath(os.path.dirname(__file__)) HERE = os.path.abspath(os.path.dirname(__file__))
def read(*args): def read(*args):
"""Read complete file contents.""" """Read complete file contents."""
return io.open(os.path.join(HERE, *args), encoding='utf-8').read() return io.open(os.path.join(HERE, *args), encoding="utf-8").read()
def get_release(): def get_release():
"""Read the release (full three-part version number) from this module.""" """Read the release (full three-part version number) from this module."""
init = read('..', 'src', 'base64io', '__init__.py') init = read("..", "src", "base64io", "__init__.py")
return VERSION_RE.search(init).group(1) return VERSION_RE.search(init).group(1)
def get_version(): def get_version():
"""Read the version (MAJOR.MINOR) from this module.""" """Read the version (MAJOR.MINOR) from this module."""
_release = get_release() _release = get_release()
split_version = _release.split('.') split_version = _release.split(".")
if len(split_version) == 3: if len(split_version) == 3:
return '.'.join(split_version[:2]) return ".".join(split_version[:2])
return _release return _release
project = u'base64io' project = u"base64io"
version = get_version() version = get_version()
release = get_release() release = get_release()
# Add any Sphinx extension module names here, as strings. They can be extensions # Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones. # coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', extensions = [
'sphinx.ext.intersphinx', 'sphinx.ext.todo', "sphinx.ext.autodoc",
'sphinx.ext.coverage', 'sphinx.ext.autosummary', "sphinx.ext.doctest",
'sphinx.ext.napoleon'] "sphinx.ext.intersphinx",
"sphinx.ext.todo",
"sphinx.ext.coverage",
"sphinx.ext.autosummary",
"sphinx.ext.napoleon",
]
napoleon_include_special_with_doc = False napoleon_include_special_with_doc = False
# Add any paths that contain templates here, relative to this directory. # Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates'] templates_path = ["_templates"]
source_suffix = '.rst' # The suffix of source filenames. source_suffix = ".rst" # The suffix of source filenames.
master_doc = 'index' # The master toctree document. master_doc = "index" # The master toctree document.
copyright = u'%s, Amazon' % datetime.now().year # pylint: disable=redefined-builtin copyright = u"%s, Amazon" % datetime.now().year # pylint: disable=redefined-builtin
# List of directories, relative to source directory, that shouldn't be searched # List of directories, relative to source directory, that shouldn't be searched
# for source files. # for source files.
exclude_trees = ['_build'] exclude_trees = ["_build"]
pygments_style = 'sphinx' pygments_style = "sphinx"
autoclass_content = "both" autoclass_content = "both"
autodoc_default_flags = ['show-inheritance', 'members'] autodoc_default_flags = ["show-inheritance", "members"]
autodoc_member_order = 'bysource' autodoc_member_order = "bysource"
html_theme = 'sphinx_rtd_theme' html_theme = "sphinx_rtd_theme"
html_static_path = ['_static'] html_static_path = ["_static"]
htmlhelp_basename = '%sdoc' % project htmlhelp_basename = "%sdoc" % project
# Example configuration for intersphinx: refer to the Python standard library. # Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'http://docs.python.org/': None} intersphinx_mapping = {"http://docs.python.org/": None}
# autosummary # autosummary
autosummary_generate = True autosummary_generate = True

View File

@ -5,61 +5,57 @@ import re
from setuptools import find_packages, setup from setuptools import find_packages, setup
VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.]+)['"]''') VERSION_RE = re.compile(r"""__version__ = ['"]([0-9.]+)['"]""")
HERE = os.path.abspath(os.path.dirname(__file__)) HERE = os.path.abspath(os.path.dirname(__file__))
def read(*args): def read(*args):
"""Read complete file contents.""" """Read complete file contents."""
return io.open(os.path.join(HERE, *args), encoding='utf-8').read() return io.open(os.path.join(HERE, *args), encoding="utf-8").read()
def readme(): def readme():
"""Read and patch README.""" """Read and patch README."""
readme_text = read('README.rst') readme_text = read("README.rst")
# PyPI does not accept :class: references. # PyPI does not accept :class: references.
return readme_text.replace(':class:`base64io.Base64IO`', '``base64io.Base64IO``') return readme_text.replace(":class:`base64io.Base64IO`", "``base64io.Base64IO``")
def get_version(): def get_version():
"""Read the version from this module.""" """Read the version from this module."""
init = read('src', 'base64io', '__init__.py') init = read("src", "base64io", "__init__.py")
return VERSION_RE.search(init).group(1) return VERSION_RE.search(init).group(1)
setup( setup(
name='base64io', name="base64io",
version=get_version(), version=get_version(),
packages=find_packages('src'), packages=find_packages("src"),
package_dir={'': 'src'}, package_dir={"": "src"},
url='http://base64io.readthedocs.io/en/latest/', url="http://base64io.readthedocs.io/en/latest/",
author='Amazon Web Services', author="Amazon Web Services",
author_email='aws-cryptools@amazon.com', author_email="aws-cryptools@amazon.com",
maintainer='Amazon Web Services', maintainer="Amazon Web Services",
long_description=readme(), long_description=readme(),
keywords='base64 stream', keywords="base64 stream",
data_files=[ data_files=["README.rst", "CHANGELOG.rst", "LICENSE"],
'README.rst', license="Apache License 2.0",
'CHANGELOG.rst',
'LICENSE'
],
license='Apache License 2.0',
install_requires=[], install_requires=[],
classifiers=[ classifiers=[
'Development Status :: 5 - Production/Stable', "Development Status :: 5 - Production/Stable",
'Intended Audience :: Developers', "Intended Audience :: Developers",
'Natural Language :: English', "Natural Language :: English",
'License :: OSI Approved :: Apache Software License', "License :: OSI Approved :: Apache Software License",
'Programming Language :: Python', "Programming Language :: Python",
'Programming Language :: Python :: 2', "Programming Language :: Python :: 2",
'Programming Language :: Python :: 2.6', "Programming Language :: Python :: 2.6",
'Programming Language :: Python :: 2.7', "Programming Language :: Python :: 2.7",
'Programming Language :: Python :: 3', "Programming Language :: Python :: 3",
'Programming Language :: Python :: 3.3', "Programming Language :: Python :: 3.3",
'Programming Language :: Python :: 3.4', "Programming Language :: Python :: 3.4",
'Programming Language :: Python :: 3.5', "Programming Language :: Python :: 3.5",
'Programming Language :: Python :: 3.6', "Programming Language :: Python :: 3.6",
'Programming Language :: Python :: 3.7', "Programming Language :: Python :: 3.7",
'Programming Language :: Python :: Implementation :: CPython' "Programming Language :: Python :: Implementation :: CPython",
] ],
) )

View File

@ -19,7 +19,7 @@ import logging
import string import string
import sys import sys
LOGGER_NAME = 'base64io' LOGGER_NAME = "base64io"
try: # Python 3.5.0 and 3.5.1 have incompatible typing modules try: # Python 3.5.0 and 3.5.1 have incompatible typing modules
from types import TracebackType # noqa pylint: disable=unused-import from types import TracebackType # noqa pylint: disable=unused-import
@ -28,8 +28,8 @@ except ImportError: # pragma: no cover
# We only actually need these imports when running the mypy checks # We only actually need these imports when running the mypy checks
pass pass
__all__ = ('Base64IO',) __all__ = ("Base64IO",)
__version__ = '1.0.0' __version__ = "1.0.0"
_LOGGER = logging.getLogger(LOGGER_NAME) _LOGGER = logging.getLogger(LOGGER_NAME)
@ -72,13 +72,13 @@ class Base64IO(io.IOBase):
:raises TypeError: if ``wrapped`` does not have attributes needed to determine the stream's state :raises TypeError: if ``wrapped`` does not have attributes needed to determine the stream's state
""" """
required_attrs = ('read', 'write', 'close', 'closed', 'flush') required_attrs = ("read", "write", "close", "closed", "flush")
if not all(hasattr(wrapped, attr) for attr in required_attrs): if not all(hasattr(wrapped, attr) for attr in required_attrs):
raise TypeError('Base64IO wrapped object must have attributes: %s' % (repr(sorted(required_attrs)),)) raise TypeError("Base64IO wrapped object must have attributes: %s" % (repr(sorted(required_attrs)),))
super(Base64IO, self).__init__() super(Base64IO, self).__init__()
self.__wrapped = wrapped self.__wrapped = wrapped
self.__read_buffer = b'' self.__read_buffer = b""
self.__write_buffer = b'' self.__write_buffer = b""
def __enter__(self): def __enter__(self):
# type: () -> Base64IO # type: () -> Base64IO
@ -100,7 +100,7 @@ class Base64IO(io.IOBase):
""" """
if self.__write_buffer: if self.__write_buffer:
self.__wrapped.write(base64.b64encode(self.__write_buffer)) self.__wrapped.write(base64.b64encode(self.__write_buffer))
self.__write_buffer = b'' self.__write_buffer = b""
self.closed = True self.closed = True
def _passthrough_interactive_check(self, method_name, mode): def _passthrough_interactive_check(self, method_name, mode):
@ -136,7 +136,7 @@ class Base64IO(io.IOBase):
:rtype: bool :rtype: bool
""" """
return self._passthrough_interactive_check('writable', 'w') return self._passthrough_interactive_check("writable", "w")
def readable(self): def readable(self):
# type: () -> bool # type: () -> bool
@ -147,7 +147,7 @@ class Base64IO(io.IOBase):
:rtype: bool :rtype: bool
""" """
return self._passthrough_interactive_check('readable', 'r') return self._passthrough_interactive_check("readable", "r")
def flush(self): def flush(self):
# type: () -> None # type: () -> None
@ -172,14 +172,14 @@ class Base64IO(io.IOBase):
:raises IOError: if underlying stream is not writable :raises IOError: if underlying stream is not writable
""" """
if self.closed: if self.closed:
raise ValueError('I/O operation on closed file.') raise ValueError("I/O operation on closed file.")
if not self.writable(): if not self.writable():
raise IOError('Stream is not writable') raise IOError("Stream is not writable")
# Load any stashed bytes and clear the buffer # Load any stashed bytes and clear the buffer
_bytes_to_write = self.__write_buffer + b _bytes_to_write = self.__write_buffer + b
self.__write_buffer = b'' self.__write_buffer = b""
# If an even base64 chunk or finalizing the stream, write through. # If an even base64 chunk or finalizing the stream, write through.
if len(_bytes_to_write) % 3 == 0: if len(_bytes_to_write) % 3 == 0:
@ -218,7 +218,7 @@ class Base64IO(io.IOBase):
return data return data
_data_buffer = io.BytesIO() _data_buffer = io.BytesIO()
_data_buffer.write(b''.join(data.split())) _data_buffer.write(b"".join(data.split()))
_remaining_bytes_to_read = total_bytes_to_read - _data_buffer.tell() _remaining_bytes_to_read = total_bytes_to_read - _data_buffer.tell()
while _remaining_bytes_to_read > 0: while _remaining_bytes_to_read > 0:
@ -227,7 +227,7 @@ class Base64IO(io.IOBase):
# No more data to read from wrapped stream. # No more data to read from wrapped stream.
break break
_data_buffer.write(b''.join(_raw_additional_data.split())) _data_buffer.write(b"".join(_raw_additional_data.split()))
_remaining_bytes_to_read = total_bytes_to_read - _data_buffer.tell() _remaining_bytes_to_read = total_bytes_to_read - _data_buffer.tell()
return _data_buffer.getvalue() return _data_buffer.getvalue()
@ -245,10 +245,10 @@ class Base64IO(io.IOBase):
:rtype: bytes :rtype: bytes
""" """
if self.closed: if self.closed:
raise ValueError('I/O operation on closed file.') raise ValueError("I/O operation on closed file.")
if not self.readable(): if not self.readable():
raise IOError('Stream is not readable') raise IOError("Stream is not readable")
if b is not None and b < 0: if b is not None and b < 0:
b = None b = None
@ -256,13 +256,13 @@ class Base64IO(io.IOBase):
if b is not None: if b is not None:
# Calculate number of encoded bytes that must be read to get b raw bytes. # Calculate number of encoded bytes that must be read to get b raw bytes.
_bytes_to_read = int((b - len(self.__read_buffer)) * 4 / 3) _bytes_to_read = int((b - len(self.__read_buffer)) * 4 / 3)
_bytes_to_read += (4 - _bytes_to_read % 4) _bytes_to_read += 4 - _bytes_to_read % 4
# Read encoded bytes from wrapped stream. # Read encoded bytes from wrapped stream.
data = self.__wrapped.read(_bytes_to_read) data = self.__wrapped.read(_bytes_to_read)
# Remove whitespace from read data and attempt to read more data to get the desired # Remove whitespace from read data and attempt to read more data to get the desired
# number of bytes. # number of bytes.
if any([char.encode('utf-8') in data for char in string.whitespace]): if any([char.encode("utf-8") in data for char in string.whitespace]):
data = self._read_additional_data_removing_whitespace(data, _bytes_to_read) data = self._read_additional_data_removing_whitespace(data, _bytes_to_read)
results = io.BytesIO() results = io.BytesIO()

View File

@ -18,8 +18,8 @@ import pytest
from base64io import Base64IO from base64io import Base64IO
hypothesis = pytest.importorskip('hypothesis') hypothesis = pytest.importorskip("hypothesis")
hypothesis_strategies = pytest.importorskip('hypothesis.strategies') hypothesis_strategies = pytest.importorskip("hypothesis.strategies")
pytestmark = [pytest.mark.functional] pytestmark = [pytest.mark.functional]
@ -28,12 +28,12 @@ HYPOTHESIS_SETTINGS = hypothesis.settings(
hypothesis.HealthCheck.too_slow, hypothesis.HealthCheck.too_slow,
hypothesis.HealthCheck.data_too_large, hypothesis.HealthCheck.data_too_large,
hypothesis.HealthCheck.hung_test, hypothesis.HealthCheck.hung_test,
hypothesis.HealthCheck.large_base_example hypothesis.HealthCheck.large_base_example,
), ),
timeout=hypothesis.unlimited, timeout=hypothesis.unlimited,
deadline=None, deadline=None,
max_examples=1000, max_examples=1000,
max_iterations=1500 max_iterations=1500,
) )
BINARY = hypothesis_strategies.binary() BINARY = hypothesis_strategies.binary()

View File

@ -31,16 +31,16 @@ def test_base64io_bad_wrap():
with pytest.raises(TypeError) as excinfo: with pytest.raises(TypeError) as excinfo:
Base64IO(7) Base64IO(7)
excinfo.match(r'Base64IO wrapped object must have attributes: *') excinfo.match(r"Base64IO wrapped object must have attributes: *")
def test_base64io_write_after_closed(): def test_base64io_write_after_closed():
with Base64IO(io.BytesIO()) as test: with Base64IO(io.BytesIO()) as test:
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
test.close() test.close()
test.write(b'aksdhjf') test.write(b"aksdhjf")
excinfo.match(r'I/O operation on closed file.') excinfo.match(r"I/O operation on closed file.")
def test_base64io_read_after_closed(): def test_base64io_read_after_closed():
@ -49,22 +49,17 @@ def test_base64io_read_after_closed():
test.close() test.close()
test.read() test.read()
excinfo.match(r'I/O operation on closed file.') excinfo.match(r"I/O operation on closed file.")
@pytest.mark.parametrize('method_name', ('isatty', 'seekable')) @pytest.mark.parametrize("method_name", ("isatty", "seekable"))
def test_base64io_always_false_methods(method_name): def test_base64io_always_false_methods(method_name):
test = Base64IO(io.BytesIO()) test = Base64IO(io.BytesIO())
assert not getattr(test, method_name)() assert not getattr(test, method_name)()
@pytest.mark.parametrize('method_name, args', ( @pytest.mark.parametrize("method_name, args", (("fileno", ()), ("seek", (None,)), ("tell", ()), ("truncate", ())))
('fileno', ()),
('seek', (None,)),
('tell', ()),
('truncate', ())
))
def test_unsupported_methods(method_name, args): def test_unsupported_methods(method_name, args):
test = Base64IO(io.BytesIO()) test = Base64IO(io.BytesIO())
@ -72,7 +67,7 @@ def test_unsupported_methods(method_name, args):
getattr(test, method_name)(*args) getattr(test, method_name)(*args)
@pytest.mark.parametrize('method_name', ('flush', 'writable', 'readable')) @pytest.mark.parametrize("method_name", ("flush", "writable", "readable"))
def test_passthrough_methods_present(monkeypatch, method_name): def test_passthrough_methods_present(monkeypatch, method_name):
wrapped = io.BytesIO() wrapped = io.BytesIO()
monkeypatch.setattr(wrapped, method_name, lambda: sentinel.passthrough) monkeypatch.setattr(wrapped, method_name, lambda: sentinel.passthrough)
@ -81,7 +76,7 @@ def test_passthrough_methods_present(monkeypatch, method_name):
assert getattr(wrapper, method_name)() is sentinel.passthrough assert getattr(wrapper, method_name)() is sentinel.passthrough
@pytest.mark.parametrize('method_name', ('writable', 'readable')) @pytest.mark.parametrize("method_name", ("writable", "readable"))
def test_passthrough_methods_not_present(monkeypatch, method_name): def test_passthrough_methods_not_present(monkeypatch, method_name):
wrapped = MagicMock() wrapped = MagicMock()
monkeypatch.delattr(wrapped, method_name, False) monkeypatch.delattr(wrapped, method_name, False)
@ -90,15 +85,13 @@ def test_passthrough_methods_not_present(monkeypatch, method_name):
assert not getattr(wrapper, method_name)() assert not getattr(wrapper, method_name)()
@pytest.mark.parametrize('mode, method_name, expected', ( @pytest.mark.parametrize(
('wb', 'writable', True), "mode, method_name, expected",
('rb', 'readable', True), (("wb", "writable", True), ("rb", "readable", True), ("rb", "writable", False), ("wb", "readable", False)),
('rb', 'writable', False), )
('wb', 'readable', False)
))
def test_passthrough_methods_file(tmpdir, method_name, mode, expected): def test_passthrough_methods_file(tmpdir, method_name, mode, expected):
source = tmpdir.join('source') source = tmpdir.join("source")
source.write('some data') source.write("some data")
with open(str(source), mode) as reader: with open(str(source), mode) as reader:
with Base64IO(reader) as b64: with Base64IO(reader) as b64:
@ -110,10 +103,7 @@ def test_passthrough_methods_file(tmpdir, method_name, mode, expected):
assert not test assert not test
@pytest.mark.parametrize('patch_method, call_method, call_arg', ( @pytest.mark.parametrize("patch_method, call_method, call_arg", (("writable", "write", b""), ("readable", "read", 0)))
('writable', 'write', b''),
('readable', 'read', 0)
))
def test_non_interactive_error(monkeypatch, patch_method, call_method, call_arg): def test_non_interactive_error(monkeypatch, patch_method, call_method, call_arg):
wrapped = io.BytesIO() wrapped = io.BytesIO()
monkeypatch.setattr(wrapped, patch_method, lambda: False) monkeypatch.setattr(wrapped, patch_method, lambda: False)
@ -122,7 +112,7 @@ def test_non_interactive_error(monkeypatch, patch_method, call_method, call_arg)
with pytest.raises(IOError) as excinfo: with pytest.raises(IOError) as excinfo:
getattr(wrapper, call_method)(call_arg) getattr(wrapper, call_method)(call_arg)
excinfo.match(r'Stream is not ' + patch_method) excinfo.match(r"Stream is not " + patch_method)
def build_test_cases(): def build_test_cases():
@ -148,15 +138,14 @@ def build_test_cases():
@pytest.mark.parametrize( @pytest.mark.parametrize(
'bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect', "bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect", build_test_cases()
build_test_cases()
) )
def test_base64io_decode(bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect): def test_base64io_decode(bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect):
plaintext_source = os.urandom(bytes_to_generate) plaintext_source = os.urandom(bytes_to_generate)
plaintext_b64 = io.BytesIO(base64.b64encode(plaintext_source)) plaintext_b64 = io.BytesIO(base64.b64encode(plaintext_source))
plaintext_wrapped = Base64IO(plaintext_b64) plaintext_wrapped = Base64IO(plaintext_b64)
test = b'' test = b""
for _round in range(number_of_rounds): for _round in range(number_of_rounds):
test += plaintext_wrapped.read(bytes_per_round) test += plaintext_wrapped.read(bytes_per_round)
@ -165,8 +154,7 @@ def test_base64io_decode(bytes_to_generate, bytes_per_round, number_of_rounds, t
@pytest.mark.parametrize( @pytest.mark.parametrize(
'bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect', "bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect", build_test_cases()
build_test_cases()
) )
def test_base64io_encode_partial(bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect): def test_base64io_encode_partial(bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect):
plaintext_source = os.urandom(bytes_to_generate) plaintext_source = os.urandom(bytes_to_generate)
@ -200,7 +188,7 @@ def test_base64io_encode_partial(bytes_to_generate, bytes_per_round, number_of_r
assert plaintext_b64.startswith(target_stream.getvalue()) assert plaintext_b64.startswith(target_stream.getvalue())
@pytest.mark.parametrize('source_bytes', [case[0] for case in build_test_cases()]) @pytest.mark.parametrize("source_bytes", [case[0] for case in build_test_cases()])
def test_base64io_encode_context_manager(source_bytes): def test_base64io_encode_context_manager(source_bytes):
plaintext_source = os.urandom(source_bytes) plaintext_source = os.urandom(source_bytes)
plaintext_b64 = base64.b64encode(plaintext_source) plaintext_b64 = base64.b64encode(plaintext_source)
@ -225,7 +213,7 @@ def test_base64io_encode_context_manager_reuse():
with stream as plaintext_wrapped: with stream as plaintext_wrapped:
plaintext_wrapped.read() plaintext_wrapped.read()
excinfo.match(r'I/O operation on closed file.') excinfo.match(r"I/O operation on closed file.")
def test_base64io_encode_use_after_context_manager_exit(): def test_base64io_encode_use_after_context_manager_exit():
@ -242,10 +230,10 @@ def test_base64io_encode_use_after_context_manager_exit():
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
stream.read() stream.read()
excinfo.match(r'I/O operation on closed file.') excinfo.match(r"I/O operation on closed file.")
@pytest.mark.parametrize('source_bytes', [case[0] for case in build_test_cases()]) @pytest.mark.parametrize("source_bytes", [case[0] for case in build_test_cases()])
def test_base64io_encode(source_bytes): def test_base64io_encode(source_bytes):
plaintext_source = os.urandom(source_bytes) plaintext_source = os.urandom(source_bytes)
plaintext_b64 = base64.b64encode(plaintext_source) plaintext_b64 = base64.b64encode(plaintext_source)
@ -260,12 +248,9 @@ def test_base64io_encode(source_bytes):
assert plaintext_stream.getvalue() == plaintext_b64 assert plaintext_stream.getvalue() == plaintext_b64
@pytest.mark.parametrize('bytes_to_read, expected_bytes_read', ( @pytest.mark.parametrize(
(-1, io.DEFAULT_BUFFER_SIZE), "bytes_to_read, expected_bytes_read", ((-1, io.DEFAULT_BUFFER_SIZE), (0, io.DEFAULT_BUFFER_SIZE), (1, 1), (10, 10))
(0, io.DEFAULT_BUFFER_SIZE), )
(1, 1),
(10, 10)
))
def test_base64io_decode_readline(bytes_to_read, expected_bytes_read): def test_base64io_decode_readline(bytes_to_read, expected_bytes_read):
source_plaintext = os.urandom(io.DEFAULT_BUFFER_SIZE * 2) source_plaintext = os.urandom(io.DEFAULT_BUFFER_SIZE * 2)
source_stream = io.BytesIO(base64.b64encode(source_plaintext)) source_stream = io.BytesIO(base64.b64encode(source_plaintext))
@ -279,10 +264,9 @@ def test_base64io_decode_readline(bytes_to_read, expected_bytes_read):
def build_b64_with_whitespace(source_bytes, line_length): def build_b64_with_whitespace(source_bytes, line_length):
plaintext_source = os.urandom(source_bytes) plaintext_source = os.urandom(source_bytes)
b64_plaintext = io.BytesIO(base64.b64encode(plaintext_source)) b64_plaintext = io.BytesIO(base64.b64encode(plaintext_source))
b64_plaintext_with_whitespace = b'\n'.join([ b64_plaintext_with_whitespace = b"\n".join(
line for line [line for line in iter(functools.partial(b64_plaintext.read, line_length), b"")]
in iter(functools.partial(b64_plaintext.read, line_length), b'') )
])
return plaintext_source, b64_plaintext_with_whitespace return plaintext_source, b64_plaintext_with_whitespace
@ -293,18 +277,18 @@ def build_whitespace_testcases():
# first read is mostly whitespace # first read is mostly whitespace
plaintext, b64_plaintext = build_b64_with_whitespace(100, 20) plaintext, b64_plaintext = build_b64_with_whitespace(100, 20)
b64_plaintext = (b' ' * 80) + b64_plaintext b64_plaintext = (b" " * 80) + b64_plaintext
scenarios.append((plaintext, b64_plaintext, 100)) scenarios.append((plaintext, b64_plaintext, 100))
# first several reads are entirely whitespace # first several reads are entirely whitespace
plaintext, b64_plaintext = build_b64_with_whitespace(100, 20) plaintext, b64_plaintext = build_b64_with_whitespace(100, 20)
b64_plaintext = (b' ' * 500) + b64_plaintext b64_plaintext = (b" " * 500) + b64_plaintext
scenarios.append((plaintext, b64_plaintext, 100)) scenarios.append((plaintext, b64_plaintext, 100))
return scenarios return scenarios
@pytest.mark.parametrize('plaintext_source, b64_plaintext_with_whitespace, read_bytes', build_whitespace_testcases()) @pytest.mark.parametrize("plaintext_source, b64_plaintext_with_whitespace, read_bytes", build_whitespace_testcases())
def test_base64io_decode_with_whitespace(plaintext_source, b64_plaintext_with_whitespace, read_bytes): def test_base64io_decode_with_whitespace(plaintext_source, b64_plaintext_with_whitespace, read_bytes):
with Base64IO(io.BytesIO(b64_plaintext_with_whitespace)) as decoder: with Base64IO(io.BytesIO(b64_plaintext_with_whitespace)) as decoder:
test = decoder.read(read_bytes) test = decoder.read(read_bytes)
@ -312,9 +296,9 @@ def test_base64io_decode_with_whitespace(plaintext_source, b64_plaintext_with_wh
assert test == plaintext_source[:read_bytes] assert test == plaintext_source[:read_bytes]
@pytest.mark.parametrize('plaintext_source, b64_plaintext_with_whitespace, read_bytes', ( @pytest.mark.parametrize(
(b'\x00\x00\x00', b'AAAA', 3), "plaintext_source, b64_plaintext_with_whitespace, read_bytes", ((b"\x00\x00\x00", b"AAAA", 3),)
)) )
def test_base64io_decode_parametrized_null_bytes(plaintext_source, b64_plaintext_with_whitespace, read_bytes): def test_base64io_decode_parametrized_null_bytes(plaintext_source, b64_plaintext_with_whitespace, read_bytes):
# Verifies that pytest is handling null bytes correctly (broken in 3.3.0) # Verifies that pytest is handling null bytes correctly (broken in 3.3.0)
# https://github.com/pytest-dev/pytest/issues/2957 # https://github.com/pytest-dev/pytest/issues/2957
@ -325,7 +309,7 @@ def test_base64io_decode_parametrized_null_bytes(plaintext_source, b64_plaintext
def test_base64io_decode_read_only_from_buffer(): def test_base64io_decode_read_only_from_buffer():
plaintext_source = b'12345' plaintext_source = b"12345"
plaintext_b64 = io.BytesIO(base64.b64encode(plaintext_source)) plaintext_b64 = io.BytesIO(base64.b64encode(plaintext_source))
plaintext_wrapped = Base64IO(plaintext_b64) plaintext_wrapped = Base64IO(plaintext_b64)
@ -333,9 +317,9 @@ def test_base64io_decode_read_only_from_buffer():
test_2 = plaintext_wrapped.read(1) test_2 = plaintext_wrapped.read(1)
test_3 = plaintext_wrapped.read() test_3 = plaintext_wrapped.read()
assert test_1 == b'1' assert test_1 == b"1"
assert test_2 == b'2' assert test_2 == b"2"
assert test_3 == b'345' assert test_3 == b"345"
def test_base64io_decode_context_manager(): def test_base64io_decode_context_manager():
@ -350,12 +334,10 @@ def test_base64io_decode_context_manager():
assert test.getvalue() == source_plaintext assert test.getvalue() == source_plaintext
@pytest.mark.parametrize('hint_bytes, expected_bytes_read', ( @pytest.mark.parametrize(
(-1, 102400), "hint_bytes, expected_bytes_read",
(0, 102400), ((-1, 102400), (0, 102400), (1, io.DEFAULT_BUFFER_SIZE), (io.DEFAULT_BUFFER_SIZE + 99, io.DEFAULT_BUFFER_SIZE * 2)),
(1, io.DEFAULT_BUFFER_SIZE), )
(io.DEFAULT_BUFFER_SIZE + 99, io.DEFAULT_BUFFER_SIZE * 2)
))
def test_base64io_decode_readlines(hint_bytes, expected_bytes_read): def test_base64io_decode_readlines(hint_bytes, expected_bytes_read):
source_plaintext = os.urandom(102400) source_plaintext = os.urandom(102400)
source_stream = io.BytesIO(base64.b64encode(source_plaintext)) source_stream = io.BytesIO(base64.b64encode(source_plaintext))
@ -371,7 +353,7 @@ def test_base64io_decode_readlines(hint_bytes, expected_bytes_read):
def test_base64io_encode_writelines(): def test_base64io_encode_writelines():
source_plaintext = [os.urandom(1024) for _ in range(100)] source_plaintext = [os.urandom(1024) for _ in range(100)]
b64_plaintext = base64.b64encode(b''.join(source_plaintext)) b64_plaintext = base64.b64encode(b"".join(source_plaintext))
test = io.BytesIO() test = io.BytesIO()
with Base64IO(test) as encoder: with Base64IO(test) as encoder:
@ -382,18 +364,18 @@ def test_base64io_encode_writelines():
def test_base64io_decode_file(tmpdir): def test_base64io_decode_file(tmpdir):
source_plaintext = os.urandom(1024 * 1024) source_plaintext = os.urandom(1024 * 1024)
b64_plaintext = tmpdir.join('base64_plaintext') b64_plaintext = tmpdir.join("base64_plaintext")
b64_plaintext.write(base64.b64encode(source_plaintext)) b64_plaintext.write(base64.b64encode(source_plaintext))
decoded_plaintext = tmpdir.join('decoded_plaintext') decoded_plaintext = tmpdir.join("decoded_plaintext")
with open(str(b64_plaintext), 'rb') as source: with open(str(b64_plaintext), "rb") as source:
# Separate lines to accommodate 2.6 # Separate lines to accommodate 2.6
with open(str(decoded_plaintext), 'wb') as raw: with open(str(decoded_plaintext), "wb") as raw:
with Base64IO(source) as decoder: with Base64IO(source) as decoder:
for chunk in decoder: for chunk in decoder:
raw.write(chunk) raw.write(chunk)
with open(str(decoded_plaintext), 'rb') as raw: with open(str(decoded_plaintext), "rb") as raw:
decoded = raw.read() decoded = raw.read()
assert decoded == source_plaintext assert decoded == source_plaintext
@ -402,20 +384,20 @@ def test_base64io_decode_file(tmpdir):
def test_base64io_encode_file(tmpdir): def test_base64io_encode_file(tmpdir):
source_plaintext = os.urandom(1024 * 1024) source_plaintext = os.urandom(1024 * 1024)
plaintext_b64 = base64.b64encode(source_plaintext) plaintext_b64 = base64.b64encode(source_plaintext)
plaintext = tmpdir.join('plaintext') plaintext = tmpdir.join("plaintext")
b64_plaintext = tmpdir.join('base64_plaintext') b64_plaintext = tmpdir.join("base64_plaintext")
with open(str(plaintext), 'wb') as file: with open(str(plaintext), "wb") as file:
file.write(source_plaintext) file.write(source_plaintext)
with open(str(plaintext), 'rb') as source: with open(str(plaintext), "rb") as source:
# Separate lines to accommodate 2.6 # Separate lines to accommodate 2.6
with open(str(b64_plaintext), 'wb') as target: with open(str(b64_plaintext), "wb") as target:
with Base64IO(target) as encoder: with Base64IO(target) as encoder:
for chunk in source: for chunk in source:
encoder.write(chunk) encoder.write(chunk)
with open(str(b64_plaintext), 'rb') as file2: with open(str(b64_plaintext), "rb") as file2:
encoded = file2.read() encoded = file2.read()
assert encoded == plaintext_b64 assert encoded == plaintext_b64