Viewing file: exclusions.py (12.47 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# testing/exclusions.py # Copyright (C) 2005-2019 the SQLAlchemy authors and contributors # <see AUTHORS file> # # This module is part of SQLAlchemy and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php
import contextlib import operator import re
from . import config from .. import util from ..util import decorator from ..util.compat import inspect_getfullargspec
def skip_if(predicate, reason=None): rule = compound() pred = _as_predicate(predicate, reason) rule.skips.add(pred) return rule
def fails_if(predicate, reason=None): rule = compound() pred = _as_predicate(predicate, reason) rule.fails.add(pred) return rule
class compound(object): def __init__(self): self.fails = set() self.skips = set() self.tags = set()
def __add__(self, other): return self.add(other)
def add(self, *others): copy = compound() copy.fails.update(self.fails) copy.skips.update(self.skips) copy.tags.update(self.tags) for other in others: copy.fails.update(other.fails) copy.skips.update(other.skips) copy.tags.update(other.tags) return copy
def not_(self): copy = compound() copy.fails.update(NotPredicate(fail) for fail in self.fails) copy.skips.update(NotPredicate(skip) for skip in self.skips) copy.tags.update(self.tags) return copy
@property def enabled(self): return self.enabled_for_config(config._current)
def enabled_for_config(self, config): for predicate in self.skips.union(self.fails): if predicate(config): return False else: return True
def matching_config_reasons(self, config): return [ predicate._as_string(config) for predicate in self.skips.union(self.fails) if predicate(config) ]
def include_test(self, include_tags, exclude_tags): return bool( not self.tags.intersection(exclude_tags) and (not include_tags or self.tags.intersection(include_tags)) )
def _extend(self, other): self.skips.update(other.skips) self.fails.update(other.fails) self.tags.update(other.tags)
def __call__(self, fn): if hasattr(fn, "_sa_exclusion_extend"): fn._sa_exclusion_extend._extend(self) return fn
@decorator def decorate(fn, *args, **kw): return self._do(config._current, fn, *args, **kw)
decorated = decorate(fn) decorated._sa_exclusion_extend = self return decorated
@contextlib.contextmanager def fail_if(self): all_fails = compound() all_fails.fails.update(self.skips.union(self.fails))
try: yield except Exception as ex: all_fails._expect_failure(config._current, ex) else: all_fails._expect_success(config._current)
def _do(self, cfg, fn, *args, **kw): for skip in self.skips: if skip(cfg): msg = "'%s' : %s" % (fn.__name__, skip._as_string(cfg)) config.skip_test(msg)
try: return_value = fn(*args, **kw) except Exception as ex: self._expect_failure(cfg, ex, name=fn.__name__) else: self._expect_success(cfg, name=fn.__name__) return return_value
def _expect_failure(self, config, ex, name="block"): for fail in self.fails: if fail(config): print( ( "%s failed as expected (%s): %s " % (name, fail._as_string(config), str(ex)) ) ) break else: util.raise_from_cause(ex)
def _expect_success(self, config, name="block"): if not self.fails: return for fail in self.fails: if not fail(config): break else: raise AssertionError( "Unexpected success for '%s' (%s)" % ( name, " and ".join( fail._as_string(config) for fail in self.fails ), ) )
def requires_tag(tagname): return tags([tagname])
def tags(tagnames): comp = compound() comp.tags.update(tagnames) return comp
def only_if(predicate, reason=None): predicate = _as_predicate(predicate) return skip_if(NotPredicate(predicate), reason)
def succeeds_if(predicate, reason=None): predicate = _as_predicate(predicate) return fails_if(NotPredicate(predicate), reason)
class Predicate(object): @classmethod def as_predicate(cls, predicate, description=None): if isinstance(predicate, compound): return cls.as_predicate(predicate.enabled_for_config, description) elif isinstance(predicate, Predicate): if description and predicate.description is None: predicate.description = description return predicate elif isinstance(predicate, (list, set)): return OrPredicate( [cls.as_predicate(pred) for pred in predicate], description ) elif isinstance(predicate, tuple): return SpecPredicate(*predicate) elif isinstance(predicate, util.string_types): tokens = re.match( r"([\+\w]+)\s*(?:(>=|==|!=|<=|<|>)\s*([\d\.]+))?", predicate ) if not tokens: raise ValueError( "Couldn't locate DB name in predicate: %r" % predicate ) db = tokens.group(1) op = tokens.group(2) spec = ( tuple(int(d) for d in tokens.group(3).split(".")) if tokens.group(3) else None )
return SpecPredicate(db, op, spec, description=description) elif util.callable(predicate): return LambdaPredicate(predicate, description) else: assert False, "unknown predicate type: %s" % predicate
def _format_description(self, config, negate=False): bool_ = self(config) if negate: bool_ = not negate return self.description % { "driver": config.db.url.get_driver_name() if config else "<no driver>", "database": config.db.url.get_backend_name() if config else "<no database>", "doesnt_support": "doesn't support" if bool_ else "does support", "does_support": "does support" if bool_ else "doesn't support", }
def _as_string(self, config=None, negate=False): raise NotImplementedError()
class BooleanPredicate(Predicate): def __init__(self, value, description=None): self.value = value self.description = description or "boolean %s" % value
def __call__(self, config): return self.value
def _as_string(self, config, negate=False): return self._format_description(config, negate=negate)
class SpecPredicate(Predicate): def __init__(self, db, op=None, spec=None, description=None): self.db = db self.op = op self.spec = spec self.description = description
_ops = { "<": operator.lt, ">": operator.gt, "==": operator.eq, "!=": operator.ne, "<=": operator.le, ">=": operator.ge, "in": operator.contains, "between": lambda val, pair: val >= pair[0] and val <= pair[1], }
def __call__(self, config): engine = config.db
if "+" in self.db: dialect, driver = self.db.split("+") else: dialect, driver = self.db, None
if dialect and engine.name != dialect: return False if driver is not None and engine.driver != driver: return False
if self.op is not None: assert driver is None, "DBAPI version specs not supported yet"
version = _server_version(engine) oper = ( hasattr(self.op, "__call__") and self.op or self._ops[self.op] ) return oper(version, self.spec) else: return True
def _as_string(self, config, negate=False): if self.description is not None: return self._format_description(config) elif self.op is None: if negate: return "not %s" % self.db else: return "%s" % self.db else: if negate: return "not %s %s %s" % (self.db, self.op, self.spec) else: return "%s %s %s" % (self.db, self.op, self.spec)
class LambdaPredicate(Predicate): def __init__(self, lambda_, description=None, args=None, kw=None): spec = inspect_getfullargspec(lambda_) if not spec[0]: self.lambda_ = lambda db: lambda_() else: self.lambda_ = lambda_ self.args = args or () self.kw = kw or {} if description: self.description = description elif lambda_.__doc__: self.description = lambda_.__doc__ else: self.description = "custom function"
def __call__(self, config): return self.lambda_(config)
def _as_string(self, config, negate=False): return self._format_description(config)
class NotPredicate(Predicate): def __init__(self, predicate, description=None): self.predicate = predicate self.description = description
def __call__(self, config): return not self.predicate(config)
def _as_string(self, config, negate=False): if self.description: return self._format_description(config, not negate) else: return self.predicate._as_string(config, not negate)
class OrPredicate(Predicate): def __init__(self, predicates, description=None): self.predicates = predicates self.description = description
def __call__(self, config): for pred in self.predicates: if pred(config): return True return False
def _eval_str(self, config, negate=False): if negate: conjunction = " and " else: conjunction = " or " return conjunction.join( p._as_string(config, negate=negate) for p in self.predicates )
def _negation_str(self, config): if self.description is not None: return "Not " + self._format_description(config) else: return self._eval_str(config, negate=True)
def _as_string(self, config, negate=False): if negate: return self._negation_str(config) else: if self.description is not None: return self._format_description(config) else: return self._eval_str(config)
_as_predicate = Predicate.as_predicate
def _is_excluded(db, op, spec): return SpecPredicate(db, op, spec)(config._current)
def _server_version(engine): """Return a server_version_info tuple."""
# force metadata to be retrieved conn = engine.connect() version = getattr(engine.dialect, "server_version_info", None) if version is None: version = () conn.close() return version
def db_spec(*dbs): return OrPredicate([Predicate.as_predicate(db) for db in dbs])
def open(): # noqa return skip_if(BooleanPredicate(False, "mark as execute"))
def closed(): return skip_if(BooleanPredicate(True, "marked as skip"))
def fails(reason=None): return fails_if(BooleanPredicate(True, reason or "expected to fail"))
@decorator def future(fn, *arg): return fails_if(LambdaPredicate(fn), "Future feature")
def fails_on(db, reason=None): return fails_if(db, reason)
def fails_on_everything_except(*dbs): return succeeds_if(OrPredicate([Predicate.as_predicate(db) for db in dbs]))
def skip(db, reason=None): return skip_if(db, reason)
def only_on(dbs, reason=None): return only_if( OrPredicate( [Predicate.as_predicate(db, reason) for db in util.to_list(dbs)] ) )
def exclude(db, op, spec, reason=None): return skip_if(SpecPredicate(db, op, spec), reason)
def against(config, *queries): assert queries, "no queries sent!" return OrPredicate([Predicate.as_predicate(query) for query in queries])( config )
|