From f2252f1826892d0f1762f9f8c92f4903e8fa1994 Mon Sep 17 00:00:00 2001 From: mattsb42-aws Date: Wed, 16 May 2018 09:18:29 -0700 Subject: [PATCH 1/3] add black-check testenv --- tox.ini | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tox.ini b/tox.ini index 2c027cd..5dec086 100644 --- a/tox.ini +++ b/tox.ini @@ -93,6 +93,17 @@ commands = --ignore F811,D103 \ test/ +[testenv:black-check] +basepython = {[testenv:flake8]basepython} +deps = + black +commands = + black --line-length 120 --diff \ + src/base64io/ \ + setup.py \ + doc/conf.py \ + test/ + [testenv:pylint] basepython = {[testenv:default-python]basepython} deps = From 05830a966cbd48119d7db1798e94fd94a21f27c1 Mon Sep 17 00:00:00 2001 From: mattsb42-aws Date: Tue, 5 Jun 2018 16:15:56 -0700 Subject: [PATCH 2/3] add black testenvs, CI stage, and contributing requirement --- .travis.yml | 2 ++ CONTRIBUTING.md | 1 + tox.ini | 13 ++++++++++--- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 3fde6e4..47c50b0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,5 +36,7 @@ matrix: env: TOXENV=flake8-tests - python: 3.6 env: TOXENV=pylint-tests + - python: 3.6 + env: TOXENV=black-check install: pip install tox script: tox diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b48fe40..b69a666 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -26,6 +26,7 @@ Contributions via pull requests are much appreciated. Before sending us a pull r 1. You are working against the latest source on the *master* branch. 2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already. 3. You open an issue to discuss any significant work - we would hate for your time to be wasted. +4. If you modified any source code, run `tox -e blacken` and commit any changes black makes. To send us a pull request, please: diff --git a/tox.ini b/tox.ini index 5dec086..045d570 100644 --- a/tox.ini +++ b/tox.ini @@ -93,17 +93,24 @@ commands = --ignore F811,D103 \ test/ -[testenv:black-check] -basepython = {[testenv:flake8]basepython} +[testenv:blacken] +basepython = python3 deps = black commands = - black --line-length 120 --diff \ + black --line-length 120 \ src/base64io/ \ setup.py \ doc/conf.py \ test/ +[testenv:black-check] +basepython = {[testenv:blacken]basepython} +deps = + {[testenv:blacken]deps} +commands = + {[testenv:blacken]commands} --diff + [testenv:pylint] basepython = {[testenv:default-python]basepython} deps = From 17ea87321247e2e71cb587d8da7ccfdd2b809de7 Mon Sep 17 00:00:00 2001 From: mattsb42-aws Date: Tue, 5 Jun 2018 16:21:48 -0700 Subject: [PATCH 3/3] blacken --- doc/conf.py | 49 +++++---- setup.py | 66 ++++++------ src/base64io/__init__.py | 38 +++---- test/functional/test_f_base64_stream.py | 8 +- test/unit/test_base64_stream.py | 128 ++++++++++-------------- 5 files changed, 136 insertions(+), 153 deletions(-) diff --git a/doc/conf.py b/doc/conf.py index add7efc..ef6f7ab 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -5,66 +5,71 @@ import io import os import re -VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.]+)['"]''') +VERSION_RE = re.compile(r"""__version__ = ['"]([0-9.]+)['"]""") HERE = os.path.abspath(os.path.dirname(__file__)) def read(*args): """Read complete file contents.""" - return io.open(os.path.join(HERE, *args), encoding='utf-8').read() + return io.open(os.path.join(HERE, *args), encoding="utf-8").read() def get_release(): """Read the release (full three-part version number) from this module.""" - init = read('..', 'src', 'base64io', '__init__.py') + init = read("..", "src", "base64io", "__init__.py") return VERSION_RE.search(init).group(1) def get_version(): """Read the version (MAJOR.MINOR) from this module.""" _release = get_release() - split_version = _release.split('.') + split_version = _release.split(".") if len(split_version) == 3: - return '.'.join(split_version[:2]) + return ".".join(split_version[:2]) return _release -project = u'base64io' +project = u"base64io" version = get_version() release = get_release() # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. -extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', - 'sphinx.ext.intersphinx', 'sphinx.ext.todo', - 'sphinx.ext.coverage', 'sphinx.ext.autosummary', - 'sphinx.ext.napoleon'] +extensions = [ + "sphinx.ext.autodoc", + "sphinx.ext.doctest", + "sphinx.ext.intersphinx", + "sphinx.ext.todo", + "sphinx.ext.coverage", + "sphinx.ext.autosummary", + "sphinx.ext.napoleon", +] napoleon_include_special_with_doc = False # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] -source_suffix = '.rst' # The suffix of source filenames. -master_doc = 'index' # The master toctree document. +source_suffix = ".rst" # The suffix of source filenames. +master_doc = "index" # The master toctree document. -copyright = u'%s, Amazon' % datetime.now().year # pylint: disable=redefined-builtin +copyright = u"%s, Amazon" % datetime.now().year # pylint: disable=redefined-builtin # List of directories, relative to source directory, that shouldn't be searched # for source files. -exclude_trees = ['_build'] +exclude_trees = ["_build"] -pygments_style = 'sphinx' +pygments_style = "sphinx" autoclass_content = "both" -autodoc_default_flags = ['show-inheritance', 'members'] -autodoc_member_order = 'bysource' +autodoc_default_flags = ["show-inheritance", "members"] +autodoc_member_order = "bysource" -html_theme = 'sphinx_rtd_theme' -html_static_path = ['_static'] -htmlhelp_basename = '%sdoc' % project +html_theme = "sphinx_rtd_theme" +html_static_path = ["_static"] +htmlhelp_basename = "%sdoc" % project # Example configuration for intersphinx: refer to the Python standard library. -intersphinx_mapping = {'http://docs.python.org/': None} +intersphinx_mapping = {"http://docs.python.org/": None} # autosummary autosummary_generate = True diff --git a/setup.py b/setup.py index 876e7b1..8cd73ed 100644 --- a/setup.py +++ b/setup.py @@ -5,61 +5,57 @@ import re from setuptools import find_packages, setup -VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.]+)['"]''') +VERSION_RE = re.compile(r"""__version__ = ['"]([0-9.]+)['"]""") HERE = os.path.abspath(os.path.dirname(__file__)) def read(*args): """Read complete file contents.""" - return io.open(os.path.join(HERE, *args), encoding='utf-8').read() + return io.open(os.path.join(HERE, *args), encoding="utf-8").read() def readme(): """Read and patch README.""" - readme_text = read('README.rst') + readme_text = read("README.rst") # PyPI does not accept :class: references. - return readme_text.replace(':class:`base64io.Base64IO`', '``base64io.Base64IO``') + return readme_text.replace(":class:`base64io.Base64IO`", "``base64io.Base64IO``") def get_version(): """Read the version from this module.""" - init = read('src', 'base64io', '__init__.py') + init = read("src", "base64io", "__init__.py") return VERSION_RE.search(init).group(1) setup( - name='base64io', + name="base64io", version=get_version(), - packages=find_packages('src'), - package_dir={'': 'src'}, - url='http://base64io.readthedocs.io/en/latest/', - author='Amazon Web Services', - author_email='aws-cryptools@amazon.com', - maintainer='Amazon Web Services', + packages=find_packages("src"), + package_dir={"": "src"}, + url="http://base64io.readthedocs.io/en/latest/", + author="Amazon Web Services", + author_email="aws-cryptools@amazon.com", + maintainer="Amazon Web Services", long_description=readme(), - keywords='base64 stream', - data_files=[ - 'README.rst', - 'CHANGELOG.rst', - 'LICENSE' - ], - license='Apache License 2.0', + keywords="base64 stream", + data_files=["README.rst", "CHANGELOG.rst", "LICENSE"], + license="Apache License 2.0", install_requires=[], classifiers=[ - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'Natural Language :: English', - 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.6', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: Implementation :: CPython' - ] + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Natural Language :: English", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 2.6", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.3", + "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: Implementation :: CPython", + ], ) diff --git a/src/base64io/__init__.py b/src/base64io/__init__.py index 833bf25..729f23a 100644 --- a/src/base64io/__init__.py +++ b/src/base64io/__init__.py @@ -19,7 +19,7 @@ import logging import string import sys -LOGGER_NAME = 'base64io' +LOGGER_NAME = "base64io" try: # Python 3.5.0 and 3.5.1 have incompatible typing modules from types import TracebackType # noqa pylint: disable=unused-import @@ -28,8 +28,8 @@ except ImportError: # pragma: no cover # We only actually need these imports when running the mypy checks pass -__all__ = ('Base64IO',) -__version__ = '1.0.0' +__all__ = ("Base64IO",) +__version__ = "1.0.0" _LOGGER = logging.getLogger(LOGGER_NAME) @@ -72,13 +72,13 @@ class Base64IO(io.IOBase): :raises TypeError: if ``wrapped`` does not have attributes needed to determine the stream's state """ - required_attrs = ('read', 'write', 'close', 'closed', 'flush') + required_attrs = ("read", "write", "close", "closed", "flush") if not all(hasattr(wrapped, attr) for attr in required_attrs): - raise TypeError('Base64IO wrapped object must have attributes: %s' % (repr(sorted(required_attrs)),)) + raise TypeError("Base64IO wrapped object must have attributes: %s" % (repr(sorted(required_attrs)),)) super(Base64IO, self).__init__() self.__wrapped = wrapped - self.__read_buffer = b'' - self.__write_buffer = b'' + self.__read_buffer = b"" + self.__write_buffer = b"" def __enter__(self): # type: () -> Base64IO @@ -100,7 +100,7 @@ class Base64IO(io.IOBase): """ if self.__write_buffer: self.__wrapped.write(base64.b64encode(self.__write_buffer)) - self.__write_buffer = b'' + self.__write_buffer = b"" self.closed = True def _passthrough_interactive_check(self, method_name, mode): @@ -136,7 +136,7 @@ class Base64IO(io.IOBase): :rtype: bool """ - return self._passthrough_interactive_check('writable', 'w') + return self._passthrough_interactive_check("writable", "w") def readable(self): # type: () -> bool @@ -147,7 +147,7 @@ class Base64IO(io.IOBase): :rtype: bool """ - return self._passthrough_interactive_check('readable', 'r') + return self._passthrough_interactive_check("readable", "r") def flush(self): # type: () -> None @@ -172,14 +172,14 @@ class Base64IO(io.IOBase): :raises IOError: if underlying stream is not writable """ if self.closed: - raise ValueError('I/O operation on closed file.') + raise ValueError("I/O operation on closed file.") if not self.writable(): - raise IOError('Stream is not writable') + raise IOError("Stream is not writable") # Load any stashed bytes and clear the buffer _bytes_to_write = self.__write_buffer + b - self.__write_buffer = b'' + self.__write_buffer = b"" # If an even base64 chunk or finalizing the stream, write through. if len(_bytes_to_write) % 3 == 0: @@ -218,7 +218,7 @@ class Base64IO(io.IOBase): return data _data_buffer = io.BytesIO() - _data_buffer.write(b''.join(data.split())) + _data_buffer.write(b"".join(data.split())) _remaining_bytes_to_read = total_bytes_to_read - _data_buffer.tell() while _remaining_bytes_to_read > 0: @@ -227,7 +227,7 @@ class Base64IO(io.IOBase): # No more data to read from wrapped stream. break - _data_buffer.write(b''.join(_raw_additional_data.split())) + _data_buffer.write(b"".join(_raw_additional_data.split())) _remaining_bytes_to_read = total_bytes_to_read - _data_buffer.tell() return _data_buffer.getvalue() @@ -245,10 +245,10 @@ class Base64IO(io.IOBase): :rtype: bytes """ if self.closed: - raise ValueError('I/O operation on closed file.') + raise ValueError("I/O operation on closed file.") if not self.readable(): - raise IOError('Stream is not readable') + raise IOError("Stream is not readable") if b is not None and b < 0: b = None @@ -256,13 +256,13 @@ class Base64IO(io.IOBase): if b is not None: # Calculate number of encoded bytes that must be read to get b raw bytes. _bytes_to_read = int((b - len(self.__read_buffer)) * 4 / 3) - _bytes_to_read += (4 - _bytes_to_read % 4) + _bytes_to_read += 4 - _bytes_to_read % 4 # Read encoded bytes from wrapped stream. data = self.__wrapped.read(_bytes_to_read) # Remove whitespace from read data and attempt to read more data to get the desired # number of bytes. - if any([char.encode('utf-8') in data for char in string.whitespace]): + if any([char.encode("utf-8") in data for char in string.whitespace]): data = self._read_additional_data_removing_whitespace(data, _bytes_to_read) results = io.BytesIO() diff --git a/test/functional/test_f_base64_stream.py b/test/functional/test_f_base64_stream.py index 066c457..dd62015 100644 --- a/test/functional/test_f_base64_stream.py +++ b/test/functional/test_f_base64_stream.py @@ -18,8 +18,8 @@ import pytest from base64io import Base64IO -hypothesis = pytest.importorskip('hypothesis') -hypothesis_strategies = pytest.importorskip('hypothesis.strategies') +hypothesis = pytest.importorskip("hypothesis") +hypothesis_strategies = pytest.importorskip("hypothesis.strategies") pytestmark = [pytest.mark.functional] @@ -28,12 +28,12 @@ HYPOTHESIS_SETTINGS = hypothesis.settings( hypothesis.HealthCheck.too_slow, hypothesis.HealthCheck.data_too_large, hypothesis.HealthCheck.hung_test, - hypothesis.HealthCheck.large_base_example + hypothesis.HealthCheck.large_base_example, ), timeout=hypothesis.unlimited, deadline=None, max_examples=1000, - max_iterations=1500 + max_iterations=1500, ) BINARY = hypothesis_strategies.binary() diff --git a/test/unit/test_base64_stream.py b/test/unit/test_base64_stream.py index 62b1e68..4adc580 100644 --- a/test/unit/test_base64_stream.py +++ b/test/unit/test_base64_stream.py @@ -31,16 +31,16 @@ def test_base64io_bad_wrap(): with pytest.raises(TypeError) as excinfo: Base64IO(7) - excinfo.match(r'Base64IO wrapped object must have attributes: *') + excinfo.match(r"Base64IO wrapped object must have attributes: *") def test_base64io_write_after_closed(): with Base64IO(io.BytesIO()) as test: with pytest.raises(ValueError) as excinfo: test.close() - test.write(b'aksdhjf') + test.write(b"aksdhjf") - excinfo.match(r'I/O operation on closed file.') + excinfo.match(r"I/O operation on closed file.") def test_base64io_read_after_closed(): @@ -49,22 +49,17 @@ def test_base64io_read_after_closed(): test.close() test.read() - excinfo.match(r'I/O operation on closed file.') + excinfo.match(r"I/O operation on closed file.") -@pytest.mark.parametrize('method_name', ('isatty', 'seekable')) +@pytest.mark.parametrize("method_name", ("isatty", "seekable")) def test_base64io_always_false_methods(method_name): test = Base64IO(io.BytesIO()) assert not getattr(test, method_name)() -@pytest.mark.parametrize('method_name, args', ( - ('fileno', ()), - ('seek', (None,)), - ('tell', ()), - ('truncate', ()) -)) +@pytest.mark.parametrize("method_name, args", (("fileno", ()), ("seek", (None,)), ("tell", ()), ("truncate", ()))) def test_unsupported_methods(method_name, args): test = Base64IO(io.BytesIO()) @@ -72,7 +67,7 @@ def test_unsupported_methods(method_name, args): getattr(test, method_name)(*args) -@pytest.mark.parametrize('method_name', ('flush', 'writable', 'readable')) +@pytest.mark.parametrize("method_name", ("flush", "writable", "readable")) def test_passthrough_methods_present(monkeypatch, method_name): wrapped = io.BytesIO() monkeypatch.setattr(wrapped, method_name, lambda: sentinel.passthrough) @@ -81,7 +76,7 @@ def test_passthrough_methods_present(monkeypatch, method_name): assert getattr(wrapper, method_name)() is sentinel.passthrough -@pytest.mark.parametrize('method_name', ('writable', 'readable')) +@pytest.mark.parametrize("method_name", ("writable", "readable")) def test_passthrough_methods_not_present(monkeypatch, method_name): wrapped = MagicMock() monkeypatch.delattr(wrapped, method_name, False) @@ -90,15 +85,13 @@ def test_passthrough_methods_not_present(monkeypatch, method_name): assert not getattr(wrapper, method_name)() -@pytest.mark.parametrize('mode, method_name, expected', ( - ('wb', 'writable', True), - ('rb', 'readable', True), - ('rb', 'writable', False), - ('wb', 'readable', False) -)) +@pytest.mark.parametrize( + "mode, method_name, expected", + (("wb", "writable", True), ("rb", "readable", True), ("rb", "writable", False), ("wb", "readable", False)), +) def test_passthrough_methods_file(tmpdir, method_name, mode, expected): - source = tmpdir.join('source') - source.write('some data') + source = tmpdir.join("source") + source.write("some data") with open(str(source), mode) as reader: with Base64IO(reader) as b64: @@ -110,10 +103,7 @@ def test_passthrough_methods_file(tmpdir, method_name, mode, expected): assert not test -@pytest.mark.parametrize('patch_method, call_method, call_arg', ( - ('writable', 'write', b''), - ('readable', 'read', 0) -)) +@pytest.mark.parametrize("patch_method, call_method, call_arg", (("writable", "write", b""), ("readable", "read", 0))) def test_non_interactive_error(monkeypatch, patch_method, call_method, call_arg): wrapped = io.BytesIO() monkeypatch.setattr(wrapped, patch_method, lambda: False) @@ -122,7 +112,7 @@ def test_non_interactive_error(monkeypatch, patch_method, call_method, call_arg) with pytest.raises(IOError) as excinfo: getattr(wrapper, call_method)(call_arg) - excinfo.match(r'Stream is not ' + patch_method) + excinfo.match(r"Stream is not " + patch_method) def build_test_cases(): @@ -148,15 +138,14 @@ def build_test_cases(): @pytest.mark.parametrize( - 'bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect', - build_test_cases() + "bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect", build_test_cases() ) def test_base64io_decode(bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect): plaintext_source = os.urandom(bytes_to_generate) plaintext_b64 = io.BytesIO(base64.b64encode(plaintext_source)) plaintext_wrapped = Base64IO(plaintext_b64) - test = b'' + test = b"" for _round in range(number_of_rounds): test += plaintext_wrapped.read(bytes_per_round) @@ -165,8 +154,7 @@ def test_base64io_decode(bytes_to_generate, bytes_per_round, number_of_rounds, t @pytest.mark.parametrize( - 'bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect', - build_test_cases() + "bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect", build_test_cases() ) def test_base64io_encode_partial(bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect): plaintext_source = os.urandom(bytes_to_generate) @@ -200,7 +188,7 @@ def test_base64io_encode_partial(bytes_to_generate, bytes_per_round, number_of_r assert plaintext_b64.startswith(target_stream.getvalue()) -@pytest.mark.parametrize('source_bytes', [case[0] for case in build_test_cases()]) +@pytest.mark.parametrize("source_bytes", [case[0] for case in build_test_cases()]) def test_base64io_encode_context_manager(source_bytes): plaintext_source = os.urandom(source_bytes) plaintext_b64 = base64.b64encode(plaintext_source) @@ -225,7 +213,7 @@ def test_base64io_encode_context_manager_reuse(): with stream as plaintext_wrapped: plaintext_wrapped.read() - excinfo.match(r'I/O operation on closed file.') + excinfo.match(r"I/O operation on closed file.") def test_base64io_encode_use_after_context_manager_exit(): @@ -242,10 +230,10 @@ def test_base64io_encode_use_after_context_manager_exit(): with pytest.raises(ValueError) as excinfo: stream.read() - excinfo.match(r'I/O operation on closed file.') + excinfo.match(r"I/O operation on closed file.") -@pytest.mark.parametrize('source_bytes', [case[0] for case in build_test_cases()]) +@pytest.mark.parametrize("source_bytes", [case[0] for case in build_test_cases()]) def test_base64io_encode(source_bytes): plaintext_source = os.urandom(source_bytes) plaintext_b64 = base64.b64encode(plaintext_source) @@ -260,12 +248,9 @@ def test_base64io_encode(source_bytes): assert plaintext_stream.getvalue() == plaintext_b64 -@pytest.mark.parametrize('bytes_to_read, expected_bytes_read', ( - (-1, io.DEFAULT_BUFFER_SIZE), - (0, io.DEFAULT_BUFFER_SIZE), - (1, 1), - (10, 10) -)) +@pytest.mark.parametrize( + "bytes_to_read, expected_bytes_read", ((-1, io.DEFAULT_BUFFER_SIZE), (0, io.DEFAULT_BUFFER_SIZE), (1, 1), (10, 10)) +) def test_base64io_decode_readline(bytes_to_read, expected_bytes_read): source_plaintext = os.urandom(io.DEFAULT_BUFFER_SIZE * 2) source_stream = io.BytesIO(base64.b64encode(source_plaintext)) @@ -279,10 +264,9 @@ def test_base64io_decode_readline(bytes_to_read, expected_bytes_read): def build_b64_with_whitespace(source_bytes, line_length): plaintext_source = os.urandom(source_bytes) b64_plaintext = io.BytesIO(base64.b64encode(plaintext_source)) - b64_plaintext_with_whitespace = b'\n'.join([ - line for line - in iter(functools.partial(b64_plaintext.read, line_length), b'') - ]) + b64_plaintext_with_whitespace = b"\n".join( + [line for line in iter(functools.partial(b64_plaintext.read, line_length), b"")] + ) return plaintext_source, b64_plaintext_with_whitespace @@ -293,18 +277,18 @@ def build_whitespace_testcases(): # first read is mostly whitespace plaintext, b64_plaintext = build_b64_with_whitespace(100, 20) - b64_plaintext = (b' ' * 80) + b64_plaintext + b64_plaintext = (b" " * 80) + b64_plaintext scenarios.append((plaintext, b64_plaintext, 100)) # first several reads are entirely whitespace plaintext, b64_plaintext = build_b64_with_whitespace(100, 20) - b64_plaintext = (b' ' * 500) + b64_plaintext + b64_plaintext = (b" " * 500) + b64_plaintext scenarios.append((plaintext, b64_plaintext, 100)) return scenarios -@pytest.mark.parametrize('plaintext_source, b64_plaintext_with_whitespace, read_bytes', build_whitespace_testcases()) +@pytest.mark.parametrize("plaintext_source, b64_plaintext_with_whitespace, read_bytes", build_whitespace_testcases()) def test_base64io_decode_with_whitespace(plaintext_source, b64_plaintext_with_whitespace, read_bytes): with Base64IO(io.BytesIO(b64_plaintext_with_whitespace)) as decoder: test = decoder.read(read_bytes) @@ -312,9 +296,9 @@ def test_base64io_decode_with_whitespace(plaintext_source, b64_plaintext_with_wh assert test == plaintext_source[:read_bytes] -@pytest.mark.parametrize('plaintext_source, b64_plaintext_with_whitespace, read_bytes', ( - (b'\x00\x00\x00', b'AAAA', 3), -)) +@pytest.mark.parametrize( + "plaintext_source, b64_plaintext_with_whitespace, read_bytes", ((b"\x00\x00\x00", b"AAAA", 3),) +) def test_base64io_decode_parametrized_null_bytes(plaintext_source, b64_plaintext_with_whitespace, read_bytes): # Verifies that pytest is handling null bytes correctly (broken in 3.3.0) # https://github.com/pytest-dev/pytest/issues/2957 @@ -325,7 +309,7 @@ def test_base64io_decode_parametrized_null_bytes(plaintext_source, b64_plaintext def test_base64io_decode_read_only_from_buffer(): - plaintext_source = b'12345' + plaintext_source = b"12345" plaintext_b64 = io.BytesIO(base64.b64encode(plaintext_source)) plaintext_wrapped = Base64IO(plaintext_b64) @@ -333,9 +317,9 @@ def test_base64io_decode_read_only_from_buffer(): test_2 = plaintext_wrapped.read(1) test_3 = plaintext_wrapped.read() - assert test_1 == b'1' - assert test_2 == b'2' - assert test_3 == b'345' + assert test_1 == b"1" + assert test_2 == b"2" + assert test_3 == b"345" def test_base64io_decode_context_manager(): @@ -350,12 +334,10 @@ def test_base64io_decode_context_manager(): assert test.getvalue() == source_plaintext -@pytest.mark.parametrize('hint_bytes, expected_bytes_read', ( - (-1, 102400), - (0, 102400), - (1, io.DEFAULT_BUFFER_SIZE), - (io.DEFAULT_BUFFER_SIZE + 99, io.DEFAULT_BUFFER_SIZE * 2) -)) +@pytest.mark.parametrize( + "hint_bytes, expected_bytes_read", + ((-1, 102400), (0, 102400), (1, io.DEFAULT_BUFFER_SIZE), (io.DEFAULT_BUFFER_SIZE + 99, io.DEFAULT_BUFFER_SIZE * 2)), +) def test_base64io_decode_readlines(hint_bytes, expected_bytes_read): source_plaintext = os.urandom(102400) source_stream = io.BytesIO(base64.b64encode(source_plaintext)) @@ -371,7 +353,7 @@ def test_base64io_decode_readlines(hint_bytes, expected_bytes_read): def test_base64io_encode_writelines(): source_plaintext = [os.urandom(1024) for _ in range(100)] - b64_plaintext = base64.b64encode(b''.join(source_plaintext)) + b64_plaintext = base64.b64encode(b"".join(source_plaintext)) test = io.BytesIO() with Base64IO(test) as encoder: @@ -382,18 +364,18 @@ def test_base64io_encode_writelines(): def test_base64io_decode_file(tmpdir): source_plaintext = os.urandom(1024 * 1024) - b64_plaintext = tmpdir.join('base64_plaintext') + b64_plaintext = tmpdir.join("base64_plaintext") b64_plaintext.write(base64.b64encode(source_plaintext)) - decoded_plaintext = tmpdir.join('decoded_plaintext') + decoded_plaintext = tmpdir.join("decoded_plaintext") - with open(str(b64_plaintext), 'rb') as source: + with open(str(b64_plaintext), "rb") as source: # Separate lines to accommodate 2.6 - with open(str(decoded_plaintext), 'wb') as raw: + with open(str(decoded_plaintext), "wb") as raw: with Base64IO(source) as decoder: for chunk in decoder: raw.write(chunk) - with open(str(decoded_plaintext), 'rb') as raw: + with open(str(decoded_plaintext), "rb") as raw: decoded = raw.read() assert decoded == source_plaintext @@ -402,20 +384,20 @@ def test_base64io_decode_file(tmpdir): def test_base64io_encode_file(tmpdir): source_plaintext = os.urandom(1024 * 1024) plaintext_b64 = base64.b64encode(source_plaintext) - plaintext = tmpdir.join('plaintext') - b64_plaintext = tmpdir.join('base64_plaintext') + plaintext = tmpdir.join("plaintext") + b64_plaintext = tmpdir.join("base64_plaintext") - with open(str(plaintext), 'wb') as file: + with open(str(plaintext), "wb") as file: file.write(source_plaintext) - with open(str(plaintext), 'rb') as source: + with open(str(plaintext), "rb") as source: # Separate lines to accommodate 2.6 - with open(str(b64_plaintext), 'wb') as target: + with open(str(b64_plaintext), "wb") as target: with Base64IO(target) as encoder: for chunk in source: encoder.write(chunk) - with open(str(b64_plaintext), 'rb') as file2: + with open(str(b64_plaintext), "rb") as file2: encoded = file2.read() assert encoded == plaintext_b64