Browse Source

Added DjangoBackend, refactored other code

master
Johann Schmitz 5 years ago
parent
commit
ef5a21bd5b
Signed by: ercpe GPG Key ID: A084064277C501ED
  1. 1
      .travis.yml
  2. 1
      requirements_optional.txt
  3. 17
      src/phylter/backends/__init__.py
  4. 32
      src/phylter/backends/base.py
  5. 47
      src/phylter/backends/django_backend.py
  6. 18
      src/phylter/backends/objects.py
  7. 81
      tests/test_backends.py

1
.travis.yml

@ -10,6 +10,7 @@ python:
install:
- pip install -r requirements.txt
- pip install -r requirements_optional.txt
- pip install -r requirements_dev.txt
script: make travis

1
requirements_optional.txt

@ -0,0 +1 @@
django

17
src/phylter/backends/__init__.py

@ -1,10 +1,19 @@
# -*- coding: utf-8 -*-
from phylter.backends.objects import ObjectsBackend
backends = [
ObjectsBackend
]
backends = None
if backends is None:
backends = []
try:
from phylter.backends.django_backend import DjangoBackend
backends.append(DjangoBackend)
except ImportError:
pass
backends.append(ObjectsBackend)
def get_backend(o):
for b in backends:

32
src/phylter/backends/base.py

@ -1,6 +1,19 @@
# -*- coding: utf-8 -*-
import re
import sys
class Backend(object): # pragma: nocover
number_re = re.compile("^-?\d+(\.\d+)?$")
def digit_or_float(s):
return s.isdigit() or number_re.match(s) is not None
if sys.version_info[0] == 2:
str_types = (str, unicode, bytes)
else:
str_types = (str, bytes)
class Backend(object): # pragma: nocover
@staticmethod
def supports(o):
@ -8,3 +21,20 @@ class Backend(object): # pragma: nocover
def apply(self, query, iterable):
raise NotImplementedError
def get_compatible_value(self, value, field_type=None):
if field_type in str_types or (field_type is None and isinstance(value, str_types)):
if value[0] in ("'", '"') and value[0] == value[-1]:
# quoted string
return value[1:-1]
if field_type in (int, float) or (field_type is None and isinstance(value, (int, float))):
if isinstance(value, (int, float)):
return value
if isinstance(value, str_types) and digit_or_float(value):
return float(value)
return value
return value

47
src/phylter/backends/django_backend.py

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
from django.db.models import Q
from django.db.models.manager import Manager
from django.db.models.query import QuerySet
from phylter.backends.base import Backend
from phylter.conditions import Condition, EqualsCondition, GreaterThanCondition, GreaterThanOrEqualCondition, \
LessThanCondition, LessThanOrEqualCondition, AndOperator, OrOperator
class DjangoBackend(Backend):
@staticmethod
def supports(o):
return isinstance(o, QuerySet) or isinstance(o, Manager)
def apply(self, query, iterable):
django_query = iterable if isinstance(iterable, QuerySet) else iterable.all()
for o in query.query:
django_query = self.apply_django_filter(django_query, o)
return django_query
def apply_django_filter(self, django_query, obj):
return django_query.filter(self.to_q(obj))
def to_q(self, obj):
if isinstance(obj, Condition):
suffix = {
EqualsCondition: "",
GreaterThanCondition: "__gt",
GreaterThanOrEqualCondition: "__gte",
LessThanCondition: "__lt",
LessThanOrEqualCondition: "__lte",
}[obj.__class__]
f = "%s%s" % (obj.left, suffix)
return Q(**{f: self.get_compatible_value(obj.right)})
if isinstance(obj, AndOperator):
return Q(self.to_q(obj.left), self.to_q(obj.right))
if isinstance(obj, OrOperator):
return Q(self.to_q(obj.left)) | Q(self.to_q(obj.right))
raise Exception("Unexpected item found in query: %s" % obj)

18
src/phylter/backends/objects.py

@ -4,10 +4,6 @@ from phylter.backends.base import Backend
from phylter.conditions import Condition, OrOperator, AndOperator, EqualsCondition, \
GreaterThanOrEqualCondition, LessThanCondition, LessThanOrEqualCondition, GreaterThanCondition
number_re = re.compile("^-?\d+(\.\d+)?$")
def digit_or_float(s):
return s.isdigit() or number_re.match(s) is not None
class ObjectsBackend(Backend):
@ -25,8 +21,8 @@ class ObjectsBackend(Backend):
def eval_op(self, op, item):
if isinstance(op, Condition):
left_value = self.get_item_value(item, op.left)
right_value = self.get_item_value(item, op.right)
left_value = getattr(item, op.left)
right_value = self.get_compatible_value(op.right, type(left_value))
return {
EqualsCondition: lambda a, b: a == b,
@ -43,13 +39,3 @@ class ObjectsBackend(Backend):
return self.eval_op(op.left, item) or self.eval_op(op.right, item)
raise Exception("Unexpected item found in query: %s" % op)
def get_item_value(self, item, name_or_value):
if isinstance(name_or_value, (int, float)) or digit_or_float(name_or_value):
return float(name_or_value)
if name_or_value[0] in ("'", '"') and name_or_value[0] == name_or_value[-1]:
# quoted string
return name_or_value[1:-1]
return getattr(item, name_or_value)

81
tests/test_backends.py

@ -1,7 +1,13 @@
# -*- coding: utf-8 -*-
import pytest
import sys
from django.db.models import Q
from django.db.models.manager import Manager
from django.db.models.query import QuerySet
from phylter.backends import backends, get_backend
from phylter.backends.base import Backend, str_types
from phylter.backends.django_backend import DjangoBackend
from phylter.backends.objects import ObjectsBackend
from phylter.conditions import EqualsCondition, GreaterThanCondition, GreaterThanOrEqualCondition, LessThanCondition, \
LessThanOrEqualCondition, AndOperator, OrOperator
@ -12,6 +18,7 @@ class TestBackends(object):
def test_objectbackend_exists(self):
assert ObjectsBackend in backends
assert DjangoBackend in backends
def test_get_backend(self):
class Foo(object):
@ -20,6 +27,29 @@ class TestBackends(object):
be = get_backend(Foo())
assert isinstance(be, ObjectsBackend)
@pytest.mark.skipif(sys.version_info >= (3, 0), reason="requires python 2")
def test_str_types_py2(self):
assert str_types == (str, unicode, bytes)
@pytest.mark.skipif(sys.version_info < (3, 0), reason="requires python 3")
def test_str_types_py3(self):
assert str_types == (str, bytes)
def test_get_compatible_value(self):
ob = Backend()
assert ob.get_compatible_value('"test"') == "test"
assert ob.get_compatible_value(10) == 10
assert ob.get_compatible_value("10", int) == 10
assert ob.get_compatible_value("10.0", int) == 10
assert ob.get_compatible_value("10.0", float) == 10
assert ob.get_compatible_value("-10", int) == -10
assert ob.get_compatible_value("-10.0", int) == -10
assert ob.get_compatible_value("-10.0", float) == -10
assert ob.get_compatible_value(True) == True
class TestObjectBackend(object):
def test_supports(self):
@ -30,24 +60,6 @@ class TestObjectBackend(object):
assert ObjectsBackend.supports(True)
assert ObjectsBackend.supports(Foo())
def test_get_item_value(self):
ob = ObjectsBackend()
assert ob.get_item_value(None, '"test"') == "test"
assert ob.get_item_value(None, "10") == 10
assert ob.get_item_value(None, "10.0") == 10.0
assert ob.get_item_value(None, "-10") == -10
assert ob.get_item_value(None, "-10.0") == -10.0
class Foo(object):
def __init__(self):
self.bar = 'baz'
assert ob.get_item_value(Foo(), 'bar') == 'baz'
with pytest.raises(AttributeError) as e:
assert ob.get_item_value(Foo(), 'bat')
def test_eval_op_condition(self):
ob = ObjectsBackend()
@ -84,3 +96,36 @@ class TestObjectBackend(object):
query = Query([EqualsCondition('a', 1)])
assert list(ob.apply(query, [Foo()])) == [Foo()]
assert list(query.apply([Foo()])) == [Foo()]
class TestDjangoBackend(object):
def test_supports(self):
assert DjangoBackend.supports(QuerySet())
assert DjangoBackend.supports(Manager())
assert not DjangoBackend.supports(Q())
def test_get_backend(self):
assert isinstance(get_backend(QuerySet()), DjangoBackend)
assert isinstance(get_backend(Manager()), DjangoBackend)
def test_to_q(self):
db = DjangoBackend()
assert db.to_q(EqualsCondition('a', 1)).children == [('a', 1)]
assert db.to_q(GreaterThanCondition('a', 0)).children == [('a__gt', 0)]
assert db.to_q(GreaterThanOrEqualCondition('a', 1)).children == [('a__gte', 1)]
assert db.to_q(LessThanCondition('a', 2)).children == [('a__lt', 2)]
assert db.to_q(LessThanOrEqualCondition('a', 1)).children == [('a__lte', 1)]
q = db.to_q(AndOperator(EqualsCondition('a', 1), GreaterThanCondition('a', 0)))
assert len(q.children) == 2
assert q.children[0].children == [('a', 1)]
assert q.children[1].children == [('a__gt', 0)]
assert q.connector == 'AND'
q = db.to_q(OrOperator(EqualsCondition('a', 1), GreaterThanCondition('a', 0)))
assert len(q.children) == 2
assert q.children[0].children == [('a', 1)]
assert q.children[1].children == [('a__gt', 0)]
assert q.connector == 'OR'