# Copyright 2018 PayTrace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from base64 import b64encode
from codecs import ascii_decode
import hashlib
import itertools
import yaml
from .exceptions import DataParseError
from .json_asn1.convert import asn1_der
from .utils import def_enum
from .yaml_tools import value_from_event_stream as _value_from_events
[docs]def hash_from_fields(test_case):
"""Compute a string hash from any acyclic, JSON-ic :class:`dict`
:param dict test_case: test case data to be hashed
:returns: a repeatably generatable hash of *test_case*
:rtype: str
The hash is computed by encoding *test_case* in ASN1 DER (see
:const:`.json_asn1.types.ASN1_SOURCE` for the ASN1 syntax of the data
format), then hashing with SHA-256, and finally Base64 encoding to get the
result.
Note that this function hashes **all** key/value pairs of *test_case*.
"""
key = test_case if isinstance(test_case, dict) else dict(test_case)
key = asn1_der(key)
key = hashlib.sha256(key).digest()
key = ascii_decode(b64encode(key))[0]
return key
[docs]class IdentificationListReader:
"""Utility class to read case ID and associated events from a YAML event stream
This class is used internally to identify test cases when correlating the
test case with existing augmentation data for editing. In that case, both
the Python-native representation of the test case (for
:func:`hash_from_fields`) and the YAML event stream for the key/value pairs
(to preserve as much format from the source file) are needed.
"""
safe_loading = True
[docs] @def_enum
def State():
return "header content tail"
def __init__(self, key_fields, *, safe_loading=None):
super().__init__()
if safe_loading is not None and safe_loading is not self.safe_loading:
self.safe_loading = safe_loading
self._key_fields = frozenset(key_fields)
self._state = self.State.header
[docs] def read(self, event):
self._event = event
return getattr(self, '_read_from_' + self._state.name)(event)
def _read_from_header(self, event):
if not isinstance(event, yaml.NodeEvent):
pass
else:
self._expect(yaml.SequenceStartEvent)
self._state = self.State.content
self._depth = 0
self._ignoring_entry = 0
def _read_from_content(self, event):
emit = False
if isinstance(event, yaml.CollectionStartEvent):
if self._depth == 0:
self._accumulated_events = []
self._accumulating_mapping = isinstance(event, yaml.MappingStartEvent)
self._reading_assoc_value = False
self._depth += 1
elif isinstance(event, yaml.CollectionEndEvent):
self._depth -= 1
if self._depth == 0:
emit = True
if self._depth < 0:
self._state = self.State.tail
elif self._ignoring_entry:
if self._depth == 1:
self._ignoring_entry -= 1
elif (
self._depth == 1
and not self._reading_assoc_value
and isinstance(event, yaml.ScalarEvent)
and self._accumulating_mapping
):
if event.value in self._key_fields:
self._accumulated_events.append(event)
self._reading_assoc_value = True
else:
self._ignoring_entry = 1
# and don't append event to self._accumulated_events
elif (
self._depth == 2
and not self._reading_assoc_value
and isinstance(event, yaml.CollectionStartEvent)
and self._accumulating_mapping
):
self._ignoring_entry = 2 # we have to wait for self._depth to drop back to 1 *twice* to ignore this key and its corresponding value
# and don't append event to self._accumulated_events
else:
self._accumulated_events.append(event)
self._reading_assoc_value = False
if emit:
events = self._accumulated_events
del self._accumulated_events
return (self._key_from_events(events), events[1:-1])
def _read_from_tail(self, event):
pass
def _key_from_events(self, events):
given_values = _value_from_events(events, safe_loading=self.safe_loading)
return hash_from_fields(dict(
entry for entry in given_values.items()
if entry[0] in self._key_fields
))
def _expect(self, event_type):
if isinstance(self._event, event_type):
return
raise DataParseError(
"{} where {} expected"
" in line {} while reading {}".format(
type(self._event).__name__,
event_type.__name__,
self._event.start_mark.line,
self._state.name.replace('_', ' '),
)
)