Merge pull request #8 from mattsb42-aws/black

Blacken!
development
Matt Bullock 2018-06-11 17:28:20 -07:00 committed by GitHub
commit c3a41c5020
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 157 additions and 153 deletions

View File

@@ -37,5 +37,7 @@ matrix:
env: TOXENV=flake8-tests
- python: 3.6
env: TOXENV=pylint-tests
- python: 3.6
env: TOXENV=black-check
install: pip install tox
script: tox

View File

@@ -26,6 +26,7 @@ Contributions via pull requests are much appreciated. Before sending us a pull r
1. You are working against the latest source on the *master* branch.
2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already.
3. You open an issue to discuss any significant work - we would hate for your time to be wasted.
4. If you modified any source code, run `tox -e blacken` and commit any changes black makes.
To send us a pull request, please:

View File

@@ -5,66 +5,71 @@ import io
import os
import re
VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.]+)['"]''')
VERSION_RE = re.compile(r"""__version__ = ['"]([0-9.]+)['"]""")
HERE = os.path.abspath(os.path.dirname(__file__))
def read(*args):
"""Read complete file contents."""
return io.open(os.path.join(HERE, *args), encoding='utf-8').read()
return io.open(os.path.join(HERE, *args), encoding="utf-8").read()
def get_release():
"""Read the release (full three-part version number) from this module."""
init = read('..', 'src', 'base64io', '__init__.py')
init = read("..", "src", "base64io", "__init__.py")
return VERSION_RE.search(init).group(1)
def get_version():
"""Read the version (MAJOR.MINOR) from this module."""
_release = get_release()
split_version = _release.split('.')
split_version = _release.split(".")
if len(split_version) == 3:
return '.'.join(split_version[:2])
return ".".join(split_version[:2])
return _release
project = u'base64io'
project = u"base64io"
version = get_version()
release = get_release()
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest',
'sphinx.ext.intersphinx', 'sphinx.ext.todo',
'sphinx.ext.coverage', 'sphinx.ext.autosummary',
'sphinx.ext.napoleon']
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.doctest",
"sphinx.ext.intersphinx",
"sphinx.ext.todo",
"sphinx.ext.coverage",
"sphinx.ext.autosummary",
"sphinx.ext.napoleon",
]
napoleon_include_special_with_doc = False
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
source_suffix = '.rst' # The suffix of source filenames.
master_doc = 'index' # The master toctree document.
source_suffix = ".rst" # The suffix of source filenames.
master_doc = "index" # The master toctree document.
copyright = u'%s, Amazon' % datetime.now().year # pylint: disable=redefined-builtin
copyright = u"%s, Amazon" % datetime.now().year # pylint: disable=redefined-builtin
# List of directories, relative to source directory, that shouldn't be searched
# for source files.
exclude_trees = ['_build']
exclude_trees = ["_build"]
pygments_style = 'sphinx'
pygments_style = "sphinx"
autoclass_content = "both"
autodoc_default_flags = ['show-inheritance', 'members']
autodoc_member_order = 'bysource'
autodoc_default_flags = ["show-inheritance", "members"]
autodoc_member_order = "bysource"
html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static']
htmlhelp_basename = '%sdoc' % project
html_theme = "sphinx_rtd_theme"
html_static_path = ["_static"]
htmlhelp_basename = "%sdoc" % project
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'http://docs.python.org/': None}
intersphinx_mapping = {"http://docs.python.org/": None}
# autosummary
autosummary_generate = True

View File

@@ -5,61 +5,57 @@ import re
from setuptools import find_packages, setup
VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.]+)['"]''')
VERSION_RE = re.compile(r"""__version__ = ['"]([0-9.]+)['"]""")
HERE = os.path.abspath(os.path.dirname(__file__))
def read(*args):
"""Read complete file contents."""
return io.open(os.path.join(HERE, *args), encoding='utf-8').read()
return io.open(os.path.join(HERE, *args), encoding="utf-8").read()
def readme():
"""Read and patch README."""
readme_text = read('README.rst')
readme_text = read("README.rst")
# PyPI does not accept :class: references.
return readme_text.replace(':class:`base64io.Base64IO`', '``base64io.Base64IO``')
return readme_text.replace(":class:`base64io.Base64IO`", "``base64io.Base64IO``")
def get_version():
"""Read the version from this module."""
init = read('src', 'base64io', '__init__.py')
init = read("src", "base64io", "__init__.py")
return VERSION_RE.search(init).group(1)
setup(
name='base64io',
name="base64io",
version=get_version(),
packages=find_packages('src'),
package_dir={'': 'src'},
url='http://base64io.readthedocs.io/en/latest/',
author='Amazon Web Services',
author_email='aws-cryptools@amazon.com',
maintainer='Amazon Web Services',
packages=find_packages("src"),
package_dir={"": "src"},
url="http://base64io.readthedocs.io/en/latest/",
author="Amazon Web Services",
author_email="aws-cryptools@amazon.com",
maintainer="Amazon Web Services",
long_description=readme(),
keywords='base64 stream',
data_files=[
'README.rst',
'CHANGELOG.rst',
'LICENSE'
],
license='Apache License 2.0',
keywords="base64 stream",
data_files=["README.rst", "CHANGELOG.rst", "LICENSE"],
license="Apache License 2.0",
install_requires=[],
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Natural Language :: English',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: Implementation :: CPython'
]
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Natural Language :: English",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: Implementation :: CPython",
],
)
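For reference, a minimal self-contained sketch (sample text invented for illustration, not part of this PR) of how the VERSION_RE pattern above pulls the version out of module source:

import re

VERSION_RE = re.compile(r"""__version__ = ['"]([0-9.]+)['"]""")
# Illustrative stand-in for the contents of src/base64io/__init__.py
sample_init = '__all__ = ("Base64IO",)\n__version__ = "1.0.0"\n'
assert VERSION_RE.search(sample_init).group(1) == "1.0.0"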

View File

@@ -19,7 +19,7 @@ import logging
import string
import sys
LOGGER_NAME = 'base64io'
LOGGER_NAME = "base64io"
try: # Python 3.5.0 and 3.5.1 have incompatible typing modules
from types import TracebackType # noqa pylint: disable=unused-import
@@ -28,8 +28,8 @@ except ImportError: # pragma: no cover
# We only actually need these imports when running the mypy checks
pass
__all__ = ('Base64IO',)
__version__ = '1.0.0'
__all__ = ("Base64IO",)
__version__ = "1.0.0"
_LOGGER = logging.getLogger(LOGGER_NAME)
@@ -72,13 +72,13 @@ class Base64IO(io.IOBase):
:raises TypeError: if ``wrapped`` does not have attributes needed to determine the stream's state
"""
required_attrs = ('read', 'write', 'close', 'closed', 'flush')
required_attrs = ("read", "write", "close", "closed", "flush")
if not all(hasattr(wrapped, attr) for attr in required_attrs):
raise TypeError('Base64IO wrapped object must have attributes: %s' % (repr(sorted(required_attrs)),))
raise TypeError("Base64IO wrapped object must have attributes: %s" % (repr(sorted(required_attrs)),))
super(Base64IO, self).__init__()
self.__wrapped = wrapped
self.__read_buffer = b''
self.__write_buffer = b''
self.__read_buffer = b""
self.__write_buffer = b""
def __enter__(self):
# type: () -> Base64IO
@@ -100,7 +100,7 @@ class Base64IO(io.IOBase):
"""
if self.__write_buffer:
self.__wrapped.write(base64.b64encode(self.__write_buffer))
self.__write_buffer = b''
self.__write_buffer = b""
self.closed = True
def _passthrough_interactive_check(self, method_name, mode):
@@ -136,7 +136,7 @@
:rtype: bool
"""
return self._passthrough_interactive_check('writable', 'w')
return self._passthrough_interactive_check("writable", "w")
def readable(self):
# type: () -> bool
@@ -147,7 +147,7 @@
:rtype: bool
"""
return self._passthrough_interactive_check('readable', 'r')
return self._passthrough_interactive_check("readable", "r")
def flush(self):
# type: () -> None
@@ -172,14 +172,14 @@
:raises IOError: if underlying stream is not writable
"""
if self.closed:
raise ValueError('I/O operation on closed file.')
raise ValueError("I/O operation on closed file.")
if not self.writable():
raise IOError('Stream is not writable')
raise IOError("Stream is not writable")
# Load any stashed bytes and clear the buffer
_bytes_to_write = self.__write_buffer + b
self.__write_buffer = b''
self.__write_buffer = b""
# If an even base64 chunk or finalizing the stream, write through.
if len(_bytes_to_write) % 3 == 0:
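A tiny illustrative calculation (not code from this PR) of why write() aligns on 3-byte boundaries before writing through: every 3 raw bytes map to exactly 4 base64 characters, so only a multiple of 3 can be encoded without baking padding into the middle of the stream; anything left over is what gets stashed in __write_buffer for the next write() or close().

import base64

chunk = b"hello"                                   # 5 bytes: only the first 3 can be encoded now
aligned, remainder = chunk[:3], chunk[3:]
assert base64.b64encode(aligned) == b"aGVs"        # 3 raw bytes -> 4 encoded characters, no padding
assert base64.b64encode(chunk).endswith(b"=")      # encoding all 5 at once would emit padding mid-stream
# remainder == b"lo" is the kind of tail a wrapper like this buffers until more data arrives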
@@ -218,7 +218,7 @@
return data
_data_buffer = io.BytesIO()
_data_buffer.write(b''.join(data.split()))
_data_buffer.write(b"".join(data.split()))
_remaining_bytes_to_read = total_bytes_to_read - _data_buffer.tell()
while _remaining_bytes_to_read > 0:
@@ -227,7 +227,7 @@
# No more data to read from wrapped stream.
break
_data_buffer.write(b''.join(_raw_additional_data.split()))
_data_buffer.write(b"".join(_raw_additional_data.split()))
_remaining_bytes_to_read = total_bytes_to_read - _data_buffer.tell()
return _data_buffer.getvalue()
@@ -245,10 +245,10 @@
:rtype: bytes
"""
if self.closed:
raise ValueError('I/O operation on closed file.')
raise ValueError("I/O operation on closed file.")
if not self.readable():
raise IOError('Stream is not readable')
raise IOError("Stream is not readable")
if b is not None and b < 0:
b = None
@@ -256,13 +256,13 @@
if b is not None:
# Calculate number of encoded bytes that must be read to get b raw bytes.
_bytes_to_read = int((b - len(self.__read_buffer)) * 4 / 3)
_bytes_to_read += (4 - _bytes_to_read % 4)
_bytes_to_read += 4 - _bytes_to_read % 4
# Read encoded bytes from wrapped stream.
data = self.__wrapped.read(_bytes_to_read)
# Remove whitespace from read data and attempt to read more data to get the desired
# number of bytes.
if any([char.encode('utf-8') in data for char in string.whitespace]):
if any([char.encode("utf-8") in data for char in string.whitespace]):
data = self._read_additional_data_removing_whitespace(data, _bytes_to_read)
results = io.BytesIO()
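A worked instance (illustrative only) of the sizing arithmetic in read() above, assuming the internal read buffer is empty: to return b raw bytes, the wrapper reads roughly 4/3 as many encoded bytes, rounded up to a whole 4-character base64 group.

requested = 10                               # raw bytes asked for
encoded_to_read = int(requested * 4 / 3)     # 13
encoded_to_read += 4 - encoded_to_read % 4   # round up to a whole 4-character group
assert encoded_to_read == 16                 # 16 encoded bytes decode to 12 raw bytes, covering the request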

View File

@@ -18,8 +18,8 @@ import pytest
from base64io import Base64IO
hypothesis = pytest.importorskip('hypothesis')
hypothesis_strategies = pytest.importorskip('hypothesis.strategies')
hypothesis = pytest.importorskip("hypothesis")
hypothesis_strategies = pytest.importorskip("hypothesis.strategies")
pytestmark = [pytest.mark.functional]
@@ -28,12 +28,12 @@ HYPOTHESIS_SETTINGS = hypothesis.settings(
hypothesis.HealthCheck.too_slow,
hypothesis.HealthCheck.data_too_large,
hypothesis.HealthCheck.hung_test,
hypothesis.HealthCheck.large_base_example
hypothesis.HealthCheck.large_base_example,
),
timeout=hypothesis.unlimited,
deadline=None,
max_examples=1000,
max_iterations=1500
max_iterations=1500,
)
BINARY = hypothesis_strategies.binary()
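The property-test bodies fall outside this hunk; a hypothetical sketch of how BINARY and HYPOTHESIS_SETTINGS above would be applied (test name and body invented for illustration, assuming the io/pytest/hypothesis/Base64IO names available in this module) might look like:

@pytest.mark.functional
@HYPOTHESIS_SETTINGS
@hypothesis.given(source=BINARY)
def test_base64io_round_trip(source):
    # Write arbitrary bytes through an encoding wrapper, then read them back
    # through a decoding wrapper and expect the original bytes.
    encoded = io.BytesIO()
    with Base64IO(encoded) as encoder:
        encoder.write(source)
    encoded.seek(0)
    with Base64IO(encoded) as decoder:
        assert decoder.read() == source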

View File

@@ -31,16 +31,16 @@ def test_base64io_bad_wrap():
with pytest.raises(TypeError) as excinfo:
Base64IO(7)
excinfo.match(r'Base64IO wrapped object must have attributes: *')
excinfo.match(r"Base64IO wrapped object must have attributes: *")
def test_base64io_write_after_closed():
with Base64IO(io.BytesIO()) as test:
with pytest.raises(ValueError) as excinfo:
test.close()
test.write(b'aksdhjf')
test.write(b"aksdhjf")
excinfo.match(r'I/O operation on closed file.')
excinfo.match(r"I/O operation on closed file.")
def test_base64io_read_after_closed():
@@ -49,22 +49,17 @@ def test_base64io_read_after_closed():
test.close()
test.read()
excinfo.match(r'I/O operation on closed file.')
excinfo.match(r"I/O operation on closed file.")
@pytest.mark.parametrize('method_name', ('isatty', 'seekable'))
@pytest.mark.parametrize("method_name", ("isatty", "seekable"))
def test_base64io_always_false_methods(method_name):
test = Base64IO(io.BytesIO())
assert not getattr(test, method_name)()
@pytest.mark.parametrize('method_name, args', (
('fileno', ()),
('seek', (None,)),
('tell', ()),
('truncate', ())
))
@pytest.mark.parametrize("method_name, args", (("fileno", ()), ("seek", (None,)), ("tell", ()), ("truncate", ())))
def test_unsupported_methods(method_name, args):
test = Base64IO(io.BytesIO())
@@ -72,7 +67,7 @@ def test_unsupported_methods(method_name, args):
getattr(test, method_name)(*args)
@pytest.mark.parametrize('method_name', ('flush', 'writable', 'readable'))
@pytest.mark.parametrize("method_name", ("flush", "writable", "readable"))
def test_passthrough_methods_present(monkeypatch, method_name):
wrapped = io.BytesIO()
monkeypatch.setattr(wrapped, method_name, lambda: sentinel.passthrough)
@@ -81,7 +76,7 @@ def test_passthrough_methods_present(monkeypatch, method_name):
assert getattr(wrapper, method_name)() is sentinel.passthrough
@pytest.mark.parametrize('method_name', ('writable', 'readable'))
@pytest.mark.parametrize("method_name", ("writable", "readable"))
def test_passthrough_methods_not_present(monkeypatch, method_name):
wrapped = MagicMock()
monkeypatch.delattr(wrapped, method_name, False)
@@ -90,15 +85,13 @@ def test_passthrough_methods_not_present(monkeypatch, method_name):
assert not getattr(wrapper, method_name)()
@pytest.mark.parametrize('mode, method_name, expected', (
('wb', 'writable', True),
('rb', 'readable', True),
('rb', 'writable', False),
('wb', 'readable', False)
))
@pytest.mark.parametrize(
"mode, method_name, expected",
(("wb", "writable", True), ("rb", "readable", True), ("rb", "writable", False), ("wb", "readable", False)),
)
def test_passthrough_methods_file(tmpdir, method_name, mode, expected):
source = tmpdir.join('source')
source.write('some data')
source = tmpdir.join("source")
source.write("some data")
with open(str(source), mode) as reader:
with Base64IO(reader) as b64:
@@ -110,10 +103,7 @@ def test_passthrough_methods_file(tmpdir, method_name, mode, expected):
assert not test
@pytest.mark.parametrize('patch_method, call_method, call_arg', (
('writable', 'write', b''),
('readable', 'read', 0)
))
@pytest.mark.parametrize("patch_method, call_method, call_arg", (("writable", "write", b""), ("readable", "read", 0)))
def test_non_interactive_error(monkeypatch, patch_method, call_method, call_arg):
wrapped = io.BytesIO()
monkeypatch.setattr(wrapped, patch_method, lambda: False)
@@ -122,7 +112,7 @@ def test_non_interactive_error(monkeypatch, patch_method, call_method, call_arg)
with pytest.raises(IOError) as excinfo:
getattr(wrapper, call_method)(call_arg)
excinfo.match(r'Stream is not ' + patch_method)
excinfo.match(r"Stream is not " + patch_method)
def build_test_cases():
@@ -148,15 +138,14 @@ def build_test_cases():
@pytest.mark.parametrize(
'bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect',
build_test_cases()
"bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect", build_test_cases()
)
def test_base64io_decode(bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect):
plaintext_source = os.urandom(bytes_to_generate)
plaintext_b64 = io.BytesIO(base64.b64encode(plaintext_source))
plaintext_wrapped = Base64IO(plaintext_b64)
test = b''
test = b""
for _round in range(number_of_rounds):
test += plaintext_wrapped.read(bytes_per_round)
@@ -165,8 +154,7 @@ def test_base64io_decode(bytes_to_generate, bytes_per_round, number_of_rounds, t
@pytest.mark.parametrize(
'bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect',
build_test_cases()
"bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect", build_test_cases()
)
def test_base64io_encode_partial(bytes_to_generate, bytes_per_round, number_of_rounds, total_bytes_to_expect):
plaintext_source = os.urandom(bytes_to_generate)
@@ -200,7 +188,7 @@ def test_base64io_encode_partial(bytes_to_generate, bytes_per_round, number_of_r
assert plaintext_b64.startswith(target_stream.getvalue())
@pytest.mark.parametrize('source_bytes', [case[0] for case in build_test_cases()])
@pytest.mark.parametrize("source_bytes", [case[0] for case in build_test_cases()])
def test_base64io_encode_context_manager(source_bytes):
plaintext_source = os.urandom(source_bytes)
plaintext_b64 = base64.b64encode(plaintext_source)
@@ -225,7 +213,7 @@ def test_base64io_encode_context_manager_reuse():
with stream as plaintext_wrapped:
plaintext_wrapped.read()
excinfo.match(r'I/O operation on closed file.')
excinfo.match(r"I/O operation on closed file.")
def test_base64io_encode_use_after_context_manager_exit():
@@ -242,10 +230,10 @@ def test_base64io_encode_use_after_context_manager_exit():
with pytest.raises(ValueError) as excinfo:
stream.read()
excinfo.match(r'I/O operation on closed file.')
excinfo.match(r"I/O operation on closed file.")
@pytest.mark.parametrize('source_bytes', [case[0] for case in build_test_cases()])
@pytest.mark.parametrize("source_bytes", [case[0] for case in build_test_cases()])
def test_base64io_encode(source_bytes):
plaintext_source = os.urandom(source_bytes)
plaintext_b64 = base64.b64encode(plaintext_source)
@@ -260,12 +248,9 @@ def test_base64io_encode(source_bytes):
assert plaintext_stream.getvalue() == plaintext_b64
@pytest.mark.parametrize('bytes_to_read, expected_bytes_read', (
(-1, io.DEFAULT_BUFFER_SIZE),
(0, io.DEFAULT_BUFFER_SIZE),
(1, 1),
(10, 10)
))
@pytest.mark.parametrize(
"bytes_to_read, expected_bytes_read", ((-1, io.DEFAULT_BUFFER_SIZE), (0, io.DEFAULT_BUFFER_SIZE), (1, 1), (10, 10))
)
def test_base64io_decode_readline(bytes_to_read, expected_bytes_read):
source_plaintext = os.urandom(io.DEFAULT_BUFFER_SIZE * 2)
source_stream = io.BytesIO(base64.b64encode(source_plaintext))
@@ -279,10 +264,9 @@ def test_base64io_decode_readline(bytes_to_read, expected_bytes_read):
def build_b64_with_whitespace(source_bytes, line_length):
plaintext_source = os.urandom(source_bytes)
b64_plaintext = io.BytesIO(base64.b64encode(plaintext_source))
b64_plaintext_with_whitespace = b'\n'.join([
line for line
in iter(functools.partial(b64_plaintext.read, line_length), b'')
])
b64_plaintext_with_whitespace = b"\n".join(
[line for line in iter(functools.partial(b64_plaintext.read, line_length), b"")]
)
return plaintext_source, b64_plaintext_with_whitespace
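As an illustrative picture (not from the PR) of what build_b64_with_whitespace produces, the iter(functools.partial(...), b"") idiom above re-reads the encoded stream in fixed-size slices and joins them with newlines, which is exactly the whitespace the decoder has to skip:

import base64
import functools
import io

encoded = io.BytesIO(base64.b64encode(b"\x00" * 9))      # b"AAAAAAAAAAAA"
lines = iter(functools.partial(encoded.read, 4), b"")     # 4-byte slices until b"" is returned
assert b"\n".join(lines) == b"AAAA\nAAAA\nAAAA"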
@@ -293,18 +277,18 @@ def build_whitespace_testcases():
# first read is mostly whitespace
plaintext, b64_plaintext = build_b64_with_whitespace(100, 20)
b64_plaintext = (b' ' * 80) + b64_plaintext
b64_plaintext = (b" " * 80) + b64_plaintext
scenarios.append((plaintext, b64_plaintext, 100))
# first several reads are entirely whitespace
plaintext, b64_plaintext = build_b64_with_whitespace(100, 20)
b64_plaintext = (b' ' * 500) + b64_plaintext
b64_plaintext = (b" " * 500) + b64_plaintext
scenarios.append((plaintext, b64_plaintext, 100))
return scenarios
@pytest.mark.parametrize('plaintext_source, b64_plaintext_with_whitespace, read_bytes', build_whitespace_testcases())
@pytest.mark.parametrize("plaintext_source, b64_plaintext_with_whitespace, read_bytes", build_whitespace_testcases())
def test_base64io_decode_with_whitespace(plaintext_source, b64_plaintext_with_whitespace, read_bytes):
with Base64IO(io.BytesIO(b64_plaintext_with_whitespace)) as decoder:
test = decoder.read(read_bytes)
@@ -312,9 +296,9 @@ def test_base64io_decode_with_whitespace(plaintext_source, b64_plaintext_with_wh
assert test == plaintext_source[:read_bytes]
@pytest.mark.parametrize('plaintext_source, b64_plaintext_with_whitespace, read_bytes', (
(b'\x00\x00\x00', b'AAAA', 3),
))
@pytest.mark.parametrize(
"plaintext_source, b64_plaintext_with_whitespace, read_bytes", ((b"\x00\x00\x00", b"AAAA", 3),)
)
def test_base64io_decode_parametrized_null_bytes(plaintext_source, b64_plaintext_with_whitespace, read_bytes):
# Verifies that pytest is handling null bytes correctly (broken in 3.3.0)
# https://github.com/pytest-dev/pytest/issues/2957
@@ -325,7 +309,7 @@ def test_base64io_decode_parametrized_null_bytes(plaintext_source, b64_plaintext
def test_base64io_decode_read_only_from_buffer():
plaintext_source = b'12345'
plaintext_source = b"12345"
plaintext_b64 = io.BytesIO(base64.b64encode(plaintext_source))
plaintext_wrapped = Base64IO(plaintext_b64)
@@ -333,9 +317,9 @@ def test_base64io_decode_read_only_from_buffer():
test_2 = plaintext_wrapped.read(1)
test_3 = plaintext_wrapped.read()
assert test_1 == b'1'
assert test_2 == b'2'
assert test_3 == b'345'
assert test_1 == b"1"
assert test_2 == b"2"
assert test_3 == b"345"
def test_base64io_decode_context_manager():
@@ -350,12 +334,10 @@ def test_base64io_decode_context_manager():
assert test.getvalue() == source_plaintext
@pytest.mark.parametrize('hint_bytes, expected_bytes_read', (
(-1, 102400),
(0, 102400),
(1, io.DEFAULT_BUFFER_SIZE),
(io.DEFAULT_BUFFER_SIZE + 99, io.DEFAULT_BUFFER_SIZE * 2)
))
@pytest.mark.parametrize(
"hint_bytes, expected_bytes_read",
((-1, 102400), (0, 102400), (1, io.DEFAULT_BUFFER_SIZE), (io.DEFAULT_BUFFER_SIZE + 99, io.DEFAULT_BUFFER_SIZE * 2)),
)
def test_base64io_decode_readlines(hint_bytes, expected_bytes_read):
source_plaintext = os.urandom(102400)
source_stream = io.BytesIO(base64.b64encode(source_plaintext))
@@ -371,7 +353,7 @@ def test_base64io_decode_readlines(hint_bytes, expected_bytes_read):
def test_base64io_encode_writelines():
source_plaintext = [os.urandom(1024) for _ in range(100)]
b64_plaintext = base64.b64encode(b''.join(source_plaintext))
b64_plaintext = base64.b64encode(b"".join(source_plaintext))
test = io.BytesIO()
with Base64IO(test) as encoder:
@@ -382,18 +364,18 @@ def test_base64io_encode_writelines():
def test_base64io_decode_file(tmpdir):
source_plaintext = os.urandom(1024 * 1024)
b64_plaintext = tmpdir.join('base64_plaintext')
b64_plaintext = tmpdir.join("base64_plaintext")
b64_plaintext.write(base64.b64encode(source_plaintext))
decoded_plaintext = tmpdir.join('decoded_plaintext')
decoded_plaintext = tmpdir.join("decoded_plaintext")
with open(str(b64_plaintext), 'rb') as source:
with open(str(b64_plaintext), "rb") as source:
# Separate lines to accommodate 2.6
with open(str(decoded_plaintext), 'wb') as raw:
with open(str(decoded_plaintext), "wb") as raw:
with Base64IO(source) as decoder:
for chunk in decoder:
raw.write(chunk)
with open(str(decoded_plaintext), 'rb') as raw:
with open(str(decoded_plaintext), "rb") as raw:
decoded = raw.read()
assert decoded == source_plaintext
@@ -402,20 +384,20 @@ def test_base64io_decode_file(tmpdir):
def test_base64io_encode_file(tmpdir):
source_plaintext = os.urandom(1024 * 1024)
plaintext_b64 = base64.b64encode(source_plaintext)
plaintext = tmpdir.join('plaintext')
b64_plaintext = tmpdir.join('base64_plaintext')
plaintext = tmpdir.join("plaintext")
b64_plaintext = tmpdir.join("base64_plaintext")
with open(str(plaintext), 'wb') as file:
with open(str(plaintext), "wb") as file:
file.write(source_plaintext)
with open(str(plaintext), 'rb') as source:
with open(str(plaintext), "rb") as source:
# Separate lines to accommodate 2.6
with open(str(b64_plaintext), 'wb') as target:
with open(str(b64_plaintext), "wb") as target:
with Base64IO(target) as encoder:
for chunk in source:
encoder.write(chunk)
with open(str(b64_plaintext), 'rb') as file2:
with open(str(b64_plaintext), "rb") as file2:
encoded = file2.read()
assert encoded == plaintext_b64

tox.ini (18 changed lines)
View File

@@ -93,6 +93,24 @@ commands =
--ignore F811,D103 \
test/
[testenv:blacken]
basepython = python3
deps =
black
commands =
black --line-length 120 \
src/base64io/ \
setup.py \
doc/conf.py \
test/
[testenv:black-check]
basepython = {[testenv:blacken]basepython}
deps =
{[testenv:blacken]deps}
commands =
{[testenv:blacken]commands} --diff
[testenv:pylint]
basepython = {[testenv:default-python]basepython}
deps =