Source code for intercom_test.utils

# Copyright 2018 PayTrace, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

from contextlib import contextmanager, ExitStack
import enum
import functools
import inspect
import shutil
import tempfile

[docs]def def_enum(fn): """Decorator allowing a function to DRYly define an enumeration The decorated function should not require any arguments and should return an enumeration source, which will be passed to :class:`enum.Enum` along with the name of the decorated function. The resulting :class:`enum.Enum`-derived class will be returned. The value returned by *fn* can be any kind of *source* accepted by the functional API of :class:`enum.Enum`. """ return enum.Enum(fn.__name__, fn(), module=fn.__module__, qualname=fn.__qualname__)
[docs]@contextmanager def open_temp_copy(path, binary=False, *, blocksize=None): """Make a temporary copy of *path* and return the opened file The returned file object will be opened with mode ``'w+'`` or ``'w+b'`` (depending on *binary*) and will be positioned at the beginning of the file contents. If specified, *blocksize* indicates the size of the buffer to use (in bytes) when making the copy. """ bflag = 'b' if binary else '' with tempfile.TemporaryFile('w+' + bflag) as copied_file: with open(path, 'r' + bflag) as content: copy_kwargs = {} if blocksize is not None: copy_kwargs['length'] = blocksize shutil.copyfileobj(content, copied_file, **copy_kwargs) yield copied_file
[docs]class FilteredDictView: """:class:`dict`-like access to a key-filtered and value-transformed :class:`dict` Only _viewing_ methods are supported, not modifications. """
[docs] class Keys: def __init__(self, dview): super().__init__() self._dkeys = dview._d.keys() self._dview = dview def _key_filter(self, k): return self._dview._key_filter(k) def __iter__(self, ): return ( k for k in self._dkeys if self._key_filter(k) ) def __contains__(self, k): return k in self._dkeys and self._key_filter(k) def __repr__(self, ): return "dict_keys({!r})".format(list(self))
[docs] class Values: def __init__(self, dview): super().__init__() self._ditems = dview._d.items() self._dview = dview def _key_filter(self, k): return self._dview._key_filter(k) def _value_transform(self, v): return self._dview._value_transform(v) def __iter__(self, ): return ( self._value_transform(v) for k, v in self._ditems if self._key_filter(k) ) def __contains__(self, v): return any(cv == v for cv in self) def __repr__(self, ): return "dict_values({!r})".format(list(self))
[docs] class Items: def __init__(self, dview): super().__init__() self._ditems = dview._d.items() self._dview = dview def _key_filter(self, k): return self._dview._key_filter(k) def _value_transform(self, v): return self._dview._value_transform(v) def __iter__(self, ): return ( (k, self._value_transform(v)) for k, v in self._ditems if self._key_filter(k) ) def __contains__(self, v): return any(cv == v for cv in self) def __repr__(self, ): return "dict_items({!r})".format(list(self))
def __init__(self, d, *, key_filter=None, value_transform=None): """ :param d: A :class:`dict`-like object :keyword key_filter: A callable predicate for keys :keyword value_transform: A callable transform for values """ super().__init__() self._d = d self._key_filter = key_filter or self._unfiltered self._value_transform = value_transform or self._untransformed @staticmethod def _unfiltered(k): return True @staticmethod def _untransformed(v): return v
[docs] def get(self, k, defval=None): if k not in self._d or not self._key_filter(k): return defval return self._value_transform(self._d.get(k, defval))
def __getitem__(self, k): if not self._key_filter(k): raise KeyError(k) return self._value_transform(self._d[k]) def __contains__(self, k): return k in self._d and self._key_filter(k)
[docs] def items(self, ): return self.Items(self)
[docs] def keys(self, ): return self.Keys(self)
[docs] def values(self, ): return self.Values(self)
def __hash__(self, ): raise TypeError("unhashable type: '{}'".format(type(self).__qualname__))
[docs]def attributed_error(cls): """Expose exception instance constructor arguments (or specified names) as properties If the only purpose of the exception class constructor would be to generate properties from the argument names, the ``ATTRIBUTES`` attribute of the class can be assigned with either an iterable of :class:`str` or a single :class:`str` (which will be :meth:`str.split`) to more concisely specify the names to map to the arguments passed to the constructor. """ ctor_sig = inspect.signature(cls.__init__) prop_names = list( name for name, arg_info in ctor_sig.parameters.items() if arg_info.kind not in (arg_info.KEYWORD_ONLY, arg_info.VAR_KEYWORD) )[1:] if len(prop_names) == 1 and ctor_sig.parameters[prop_names[0]].kind is inspect.Parameter.VAR_POSITIONAL: prop_names = cls.ATTRIBUTES if isinstance(prop_names, str): prop_names = prop_names.split() else: prop_names = list(prop_names) for i, prop_name in enumerate(prop_names): value_extractor = functools.partial(_arg_extractor, i) value_extractor.__doc__ = f"Index {i} argument to the constructor" setattr(cls, prop_name, property(value_extractor)) return cls
def _arg_extractor(arg, exception): return exception.args[arg]
[docs]def optional_key(mapping, key): """Syntactic sugar for working with dict keys that *might* be present Typical usage:: for value in optional_key(d, 'answer'): # Body executed once, with *value* assigned ``d['answer']``, if # *d* contains ``'answer'``. The body of the ``for`` is not # executed at all, otherwise. print(f"The answer: {value}") """ if key in mapping: yield mapping[key]
[docs]def complex_test_context(fn): """Decorator to facilitate building a test environment through context managers The callable decorated should accept a *test case* and a *context entry callable*. The test case is simply passed through from the wrapper. The context entry callable should be called on a context manager to enter its context, and all contexts will be exited in reverse order when the decorated function exits. This compares to the ``defer`` statement in the Go programming language or ``scope(exit)`` at the function level for the D programming language. Example:: @complex_test_context def around_interface_case(case, setup): setup(database_fixtures(case)) setup(stubs(case)) yield """ @functools.wraps(fn) def wrapper(case): with ExitStack() as env: yield from fn(case, env.enter_context) return wrapper