Source code for intercom_test.http_best_matches

"""Module for finding nearest imperfect match for HTTP request"""

from abc import ABC, abstractmethod
from base64 import b64encode
from collections import namedtuple
from collections.abc import Mapping, Sequence
from difflib import SequenceMatcher
from enum import IntEnum
from heapq import heappush, heappop
import itertools
import json
import Levenshtein
import math
from operator import itemgetter
import time
from typing import Iterable, Tuple, Sequence as SequenceType
from urllib.parse import urlparse, parse_qsl
from intercom_test.cases import hash_from_fields as case_hash
from intercom_test.utils import FilteredDictView

[docs]class Database: def __init__(self, cases: Iterable[dict], *, add_request_keys=()): super().__init__() if not isinstance(cases, Sequence): cases = list(cases) self._additional_request_keys = frozenset(add_request_keys) self._responses = dict((self._case_key(case), case) for case in cases) self._reqlines = _group_dict(cases, _reqline) self._urls = _group_dict(cases, _request_url) self._paths = _group_dict(cases, _request_url_path)
[docs] def get_case(self, request: dict): return self._responses.get(self._case_key(request))
[docs] def best_matches(self, request: dict, *, timeout: float = 0.3) -> dict: """Given HTTP request parameters, find the best known match""" return _Reporter(self, request, deadline=time.time() + timeout).report()
[docs] def json_exchange(self, request_json, reply_stream): request = json.loads(request_json) case = self.get_case(request) if case is not None: # Shallow copy, but only to ensure setting 'response status' # doesn't change the test case response = dict(case) # Ensure there is _always_ a 'response status' when a case is returned response.setdefault('response status', 200) else: response = self.best_matches(request).as_jsonic_data() # Ensure there is _never_ a 'response status' when diffs are returned response.pop('response status', None) json.dump(response, reply_stream)
def _case_key(self, request: dict): def request_key(k): return ( k in ('method', 'url', 'request body') or k in self._additional_request_keys ) def value_lens(v): import sys; from IPython.core.debugger import Pdb; sys.stdout.isatty() and Pdb().set_trace() if isinstance(v, bytes): return ('binary', str(v)) return v hash_input = FilteredDictView( request, key_filter=request_key, value_transform=value_lens, ) return case_hash(hash_input)
class _Reporter: def __init__(self, database, request, *, deadline=math.inf): super().__init__() self.database = database self.request = request self.deadline = deadline @property def request_body(self): return self.request.get('request body') def report(self, ): db = self.database request = self.request if _reqline(request) in db._reqlines: # TODO: Could be a missing additional field or unknown value in an additional field addnl_fields_mismatch = self._get_additional_fields_mismatch() if addnl_fields_mismatch: return addnl_fields_mismatch return self._report_closest_request_bodies() if _request_url(request) in db._urls: return self._report_available_methods() request_path = _request_url_path(request) if request_path in db._paths: return self._report_closest_query_params() def dist_from_request_path(url): return Levenshtein.distance(url, request_path) # TODO: Use a min-priority queue on dist_from_request_url and add items # to it until the deadline (or all items added), then pop the top 5 closest_paths = sorted(db._paths, key=dist_from_request_path)[:5] return AvailablePathsReport(list( (path, db._paths[path]) for path in closest_paths )) def _get_additional_fields_mismatch(self, ): cases = self.database._reqlines[_reqline(self.request)] addnl_request_keys = set(self.database._additional_request_keys).difference({'method', 'url', 'request body'}) def is_addnl_field(k): return isinstance(k, str) and k in addnl_request_keys possible_value_sets = [] for case in cases: fvals = sorted( FilteredDictView(case, key_filter=is_addnl_field).items() ) if fvals not in possible_value_sets: possible_value_sets.append(fvals) request_addnl_fields = sorted( FilteredDictView(self.request, key_filter=is_addnl_field).items() ) if request_addnl_fields in possible_value_sets: return None def dist_from_request_addnl_fields(case_addnl_fields): sm = SequenceMatcher(None, request_addnl_fields, case_addnl_fields) opcodes = sm.get_opcodes() return sum( max(i2 - i1, j2 - j1) for op_type, i1, i2, j1, j2 in opcodes if op_type != 'equal' ) closest_addnl_fields = sorted( possible_value_sets, key=dist_from_request_addnl_fields )[:5] return AvailableAdditionalFieldsReport(closest_addnl_fields) def _report_closest_request_bodies(self, ): # Report items in the database that have the most similar request bodies request_body = self.request_body reqbody_is_jsonic = _is_jsonic_body(request_body) reqbody_type = type(request_body) available_cases = list( case for case in self.database._reqlines[_reqline(self.request)] if type(case.get('request body')) == reqbody_type ) if reqbody_is_jsonic: jcomp = JsonComparer(self.request_body) near_reqbodies_heap = [] for i, case in enumerate(available_cases): if time.time() >= self.deadline: break diff = jcomp.diff(case.get('request body')) heappush(near_reqbodies_heap, ( diff.edit_distance(), i, diff, case )) closest_reqbody_entries = list( heappop(near_reqbodies_heap)[2:] for _ in range(min(5, len(near_reqbodies_heap))) ) return AvailableJsonRequestBodiesReport(closest_reqbody_entries) else: # Look for lowest Levenshtein distance between request_body and case['request body'] def distance_from_request_body(case): return Levenshtein.distance(request_body, case['request body']) # TODO: Use a min-priority queue on distance_from_request_body and # add items to it until the deadline (or all items added), then pop # the top 5 closest_reqbodies = sorted( available_cases, key=distance_from_request_body )[:5] return AvailableScalarRequestBodiesReport(closest_reqbodies) def _report_available_methods(self, ): # Report the HTTP methods that are valid for this URL (path and query-string) return AvailableHttpMethodsReport( _http_method(case) for case in self.database._urls[_request_url(self.request)] ) def _report_closest_query_params(self, ): # Report the query parameter deltas that would produce a known URL with # the same path part request_path, request_qsparams = _request_url(self.request) qscomp = QStringComparer(request_qsparams) near_urls_heap = [] for i, case in enumerate(self.database._paths[request_path]): if time.time() >= self.deadline: break diff = qscomp.diff(_request_url(case)[1]) heappush(near_urls_heap, ( diff.edits, i, diff, case )) closest_url_entries = list( heappop(near_urls_heap)[2:] for _ in range(min(5, len(near_urls_heap))) ) return AvailableQueryStringParamsetsReport(closest_url_entries)
[docs]class Report(ABC):
[docs] @abstractmethod def as_jsonic_data(self, ): """Convert this report to JSON data"""
[docs]class AvailableAdditionalFieldsReport(Report): def __init__(self, available_value_sets): super().__init__() self.available_value_sets = available_value_sets
[docs] def as_jsonic_data(self, ): return { # 'available additional test case field value sets': self.available_value_sets, 'available additional test case field value sets': list( dict(value_set) for value_set in self.available_value_sets ) }
[docs]class AvailablePathsReport(Report): def __init__(self, test_case_groups): super().__init__() self.test_case_groups = test_case_groups
[docs] def as_jsonic_data(self, ): return { 'closest URL paths': list( (path, list( case.get('description') for case in cases if 'description' in case )) for path, cases in self.test_case_groups ) }
[docs]class AvailableJsonRequestBodiesReport(Report): def __init__(self, diff_case_pairs): super().__init__() self.diff_case_pairs = diff_case_pairs
[docs] def as_jsonic_data(self, ): return { 'minimal JSON request body deltas': list( { 'case description': case.get('description'), 'diff': self._json_diff_json(diff), } for diff, case in self.diff_case_pairs ) }
@classmethod def _json_diff_json(cls, json_diff: 'JsonComparer.Delta'): if json_diff.structure_diffs: return {'alter substructures': list( cls._json_mod_json(mod) for mod in json_diff.structure_diffs )} if json_diff.structure_location_diffs: return {'rearrange substructures': list( cls._json_mod_json(mod) for mod in json_diff.structure_location_diffs )} if json_diff.scalar_diffs: return {'alter scalar values': list( cls._json_mod_json(mod) for mod in json_diff.scalar_diffs )} @classmethod def _json_mod_json(cls, mod): result = {} for k in set(mod).intersection({'alt', 'del'}): result[k] = mod[k] if any(k in mod for k in ('alt', 'add')): result.update( (k, cls._json_struct_desc(v)) for k, v in mod.items() if k in ('to', 'add') ) if 'set' in mod: result.update((k, mod[k]) for k in ('set', 'to')) return result @classmethod def _json_struct_desc(cls, sig): coll_type, element_type_info = sig if coll_type is JsonType.list: return list(jt.construct() for jt in element_type_info) if coll_type is JsonType.dict: return dict((name, jt.construct()) for name, jt in element_type_info)
[docs]class AvailableScalarRequestBodiesReport(Report): def __init__(self, test_cases): super().__init__() self.test_cases = test_cases
[docs] def as_jsonic_data(self, ): return {'closest request bodies': list( { 'case description': case.get('description'), **self._body_json(case['request body']), } for case in self.test_cases )}
@classmethod def _body_json(cls, body): main_key = 'request body' if isinstance(body, bytes): return { main_key: b64encode(body).decode('ASCII'), 'isBase64Encoded': True, } else: return {main_key: body}
[docs]class AvailableHttpMethodsReport(Report): def __init__(self, methods): super().__init__() self.methods = set(methods)
[docs] def as_jsonic_data(self, ): return {'available HTTP methods': list(self.methods)}
[docs]class AvailableQueryStringParamsetsReport(Report): def __init__(self, deltas): super().__init__() self.deltas = deltas
[docs] def as_jsonic_data(self, ): return { 'minimal query string deltas': list( { 'case description': case.get('description'), 'diff': self._qstring_diff_json(diff), } for diff, case in self.deltas ) }
def _qstring_diff_json(self, qsparam_diff: 'QStringComparer.Delta'): return { 'params with differing value sequences': qsparam_diff.params, 'mods': qsparam_diff.mods, }
def _request_url_path(case): return urlparse(case['url']).path def _request_url(case): url = urlparse(case['url']) qsparams = sorted( parse_qsl(url.query), # key=lambda i: (i[1][0], i[0]) key=itemgetter(0) ) return (url.path, tuple(qsparams)) def _http_method(case): return case.get('method', 'get').lower() def _reqline(case): return (_http_method(case), case['url']) def _group_dict(a: Iterable, key, *, value=lambda x: x) -> dict: result = {} for item in a: result.setdefault(key(item), []).append(value(item)) return result
[docs]class JsonWalker:
[docs] def walk(self, json_data): yield from self._walk_node(json_data, ())
def _walk_node(self, node, key_path): if isinstance(node, Mapping): yield from self._visit_dict(node, key_path) elif isinstance(node, (list, tuple)): yield from self._visit_list(node, key_path) else: yield from self._visit_scalar(node, key_path) def _visit_scalar(self, value, key_path): yield ((lookup_json_type(type(value)), value), value, key_path) def _visit_list(self, L, key_path): yield ((JsonType.list, tuple(lookup_json_type(type(i)) for i in L)), L, key_path) yield from ( event for i, item in enumerate(L) for event in self._walk_node(item, key_path + (i,)) ) def _visit_dict(self, d, key_path): yield ((JsonType.dict, frozenset((k, lookup_json_type(type(v))) for k, v in d.items())), d, key_path) yield from ( event for i, k in enumerate(sorted(d)) for event in self._walk_node(d[k], key_path + (k,)) )
[docs]class JsonMap: def __init__(self, json_data): super().__init__() self._root = json_data self._json_index = json_index = list(JsonWalker().walk(json_data)) @property def substruct_signatures(self): """Indexes correspond with :meth:`.substruct_key_paths`""" if not hasattr(self, '_substruct_signatures'): self._index_substructures() return self._substruct_signatures @property def substruct_key_paths(self): """Indexes correspond with :meth:`.substruct_signatures`""" if not hasattr(self, '_substruct_key_paths'): self._index_substructures() return self._substruct_key_paths @property def substruct_locations(self, ): if not hasattr(self, '_substruct_locations'): self._substruct_locations = tuple( (item[2], item[0]) for item in self._json_index if item[0][0].is_collection ) return self._substruct_locations
[docs] def items_from_signature(self, sig): if not hasattr(self, '_items_by_signature'): self._items_by_signature = _group_dict( self._json_index, key=itemgetter(0), value=itemgetter(1), ) return self._items_by_signature.get(sig, [])
@property def scalars(self): """Indexes correspond with :meth:`.scalar_key_paths`""" if not hasattr(self, '_scalars'): self._scalars = tuple( (item[2], item[1]) for item in self._json_index if not item[0][0].is_collection ) return self._scalars def _index_substructures(self, ): type_sorted_substructures = sorted( ( item for item in self._json_index if item[0][0].is_collection ), key=itemgetter(0) ) self._substruct_signatures = tuple( item[0] for item in type_sorted_substructures ) self._substruct_key_paths = tuple( item[2] for item in type_sorted_substructures )
[docs]class JsonComparer: """Utility to compare one JSON document with several others""" CONGRUENT_DATA = 'congruent options' def __init__(self, ref): super().__init__() self.ref_map = JsonMap(ref)
[docs] class Delta(tuple): """Differences between two JSON documents This difference always consists of three parts, in decreasing order of precedence: * Changes to which substructures are present, * Changes to where substructures are located, and * Changes to scalar values. Only one of these three will be non-empty. Use :meth:`.distance` to get a sortable distance measure for this delta. """ __slots__ = [] def __new__(cls, struct, struct_loc, scalar): return tuple.__new__(cls, (struct, struct_loc, scalar)) @property def structure_diffs(self): return self[0] @property def structure_location_diffs(self): return self[1] @property def scalar_diffs(self): return self[2]
[docs] def edit_distance(self, ): return tuple(map(len, self))
[docs] def diff(self, case) -> Delta: """Diffs two JSON documents The difference evaluation proceeds in three steps, with each of the later steps proceeding only if the earlier step produced no differences. The steps are: * All substructures (lists and dicts, with correct subitem signatures) are present. * All substructures are in the correct locations. * Each scalar value location holds the expected scalar value. To reflect this, the differences are returned as a tuple of three :class:`.Sequence`s wrapped in a :class:`.JsonComparer.Delta`, only one of which will contain any items: * Changes to which substructures are present. * Changes to where substructures are located. * Changes to scalar values. """ ref_map = self.ref_map case_map = JsonMap(case) # First, diff sorted substructures diffs = tuple(self._substruct_signature_diffs(ref_map, case_map)) if diffs: return self.Delta(diffs, (), ()) # Second, diff (key_path, substruct_sig) pairs diffs = tuple(self._substruct_location_diffs(ref_map, case_map)) if diffs: return self.Delta((), diffs, ()) # Finally, diff scalars in document order diffs = tuple(self._scalar_diffs(ref_map, case_map)) return self.Delta((), (), diffs)
def _substruct_signature_diffs(self, ref_map: JsonMap, case_map: JsonMap): sm = SequenceMatcher(None, ref_map.substruct_signatures, case_map.substruct_signatures) for op_type, i1, i2, j1, j2 in sm.get_opcodes(): if op_type == 'equal': continue current = (ref_map.substruct_key_paths[i] for i in range(i1, i2)) target = (case_map.substruct_signatures[j] for j in range(j1, j2)) # Use weird range iterator to prevent pulling an extra item from current while zipping for _, key_path, struct_sig in zip(_alt_range(i1, i2, j1, j2), current, target): yield {'alt': key_path, 'to': struct_sig, self.CONGRUENT_DATA: case_map.items_from_signature(struct_sig)} for key_path in current: yield {'del': key_path} for struct_sig in target: yield {'add': struct_sig, self.CONGRUENT_DATA: case_map.items_from_signature(struct_sig)} def _substruct_location_diffs(self, ref_map: JsonMap, case_map: JsonMap): sm = SequenceMatcher(None, ref_map.substruct_locations, case_map.substruct_locations) for op_type, i1, i2, j1, j2 in sm.get_opcodes(): if op_type == 'equal': continue current = (ref_map.substruct_locations[i][0] for i in range(i1, i2)) target = (case_map.substruct_locations[j][1] for j in range(j1, j2)) # Use weird range iterator to prevent pulling an extra item from current while zipping for _, key_path, struct_sig in zip(_alt_range(i1, i2, j1, j2), current, target): yield {'alt': key_path, 'to': struct_sig, self.CONGRUENT_DATA: case_map.items_from_signature(struct_sig)} for key_path in current: yield {'del': key_path} for struct_sig in target: yield {'add': struct_sig, self.CONGRUENT_DATA: case_map.items_from_signature(struct_sig)} def _scalar_diffs(self, ref_map: JsonMap, case_map: JsonMap): sm = SequenceMatcher(None, ref_map.scalars, case_map.scalars) opcodes = sm.get_opcodes() for op_type, i1, i2, j1, j2 in opcodes: if op_type == 'equal': continue current = (ref_map.scalars[i][0] for i in range(i1, i2)) target = (case_map.scalars[j][1] for j in range(j1, j2)) # Use weird range iterator to prevent pulling an extra item from current while zipping for _, key_path, value in zip(_alt_range(i1, i2, j1, j2), current, target): # For scalars, this is intentionally different than substructure yield {'set': key_path, 'to': value} for key_path in current: yield {'del': key_path} for value in target: yield {'ins': value}
[docs]class QStringComparer: """Utility to compare one URL query string with several others"""
[docs] class Delta(tuple): __slots__ = [] def __new__(cls, edits, params, mods): return tuple.__new__(cls, (edits, params, mods)) @property def edits(self): return self[0] @property def params(self): return self[1] @property def mods(self): return self[2]
def __init__(self, qsparams: SequenceType[Tuple[str, str]]): super().__init__() self.ref_qsparams = qsparams
[docs] def diff(self, case_qsparams: SequenceType[Tuple[str, str]]): sm = SequenceMatcher(None, self.ref_qsparams, case_qsparams) opcodes = sm.get_opcodes() delta_params = set() edits = 0 for op_type, i1, i2, j1, j2 in opcodes: if op_type == 'equal': continue delta_params.update(self.ref_qsparams[i][0] for i in range(i1, i2)) delta_params.update(case_qsparams[j][0] for j in range(j1, j2)) edits += max(i2 - i1, j2 - j1) delta_param_values = _group_dict( ( param for param in case_qsparams if param[0] in delta_params ), key=itemgetter(0), value=itemgetter(1), ) return self.Delta( edits, delta_param_values, tuple(self._mods(case_qsparams, opcodes)) )
def _mods(self, case_qsparams, opcodes): for op_type, i1, i2, j1, j2 in opcodes: if op_type == 'equal': continue current = (self.ref_qsparams[i] for i in range(i1, i2)) target = (case_qsparams[j] for j in range(j1, j2)) for _, cur, targ in zip(_alt_range(i1, i2, j1, j2), current, target): if cur[0] == targ[0]: yield {'field': cur[0], 'chg': cur[1], 'to': targ[1]} else: yield {'field': cur[0], 'del': cur[1]} yield {'field': targ[0], 'add': targ[1]} for field, value in current: yield {'field': field, 'del': value} for field, value in target: yield {'field': field, 'add': value}
JSON_TYPES = (type(None), str, int, float, list, dict) # list and dict must be the last two, in that order JsonType = IntEnum('JsonType', list(t.__name__ for t in JSON_TYPES)) JsonType.is_collection = property(lambda self: self >= self.list) JsonType.construct = lambda self: JSON_TYPES[self.value - 1]() lookup_json_type = dict(zip(JSON_TYPES, JsonType)).get def _is_jsonic_body(body): return not isinstance(body, (str, bytes)) def _alt_range(i1, i2, j1, j2): return range(min(i2 - i1, j2 - j1))