Browse Source

Converted tabs to spaces

master
Johann Schmitz 3 years ago
parent
commit
61130b79de
Signed by: ercpe <johann@j-schmitz.net> GPG Key ID: A084064277C501ED
10 changed files with 1117 additions and 1117 deletions
  1. +12
    -12
      setup.py
  2. +23
    -23
      src/maxd/__main__.py
  3. +174
    -174
      src/maxd/config.py
  4. +39
    -39
      src/maxd/daemon.py
  5. +26
    -26
      src/maxd/fetcher.py
  6. +307
    -307
      src/maxd/worker.py
  7. +170
    -170
      tests/test_config.py
  8. +2
    -2
      tests/test_daemon.py
  9. +43
    -43
      tests/test_fetcher.py
  10. +321
    -321
      tests/test_worker.py

+ 12
- 12
setup.py View File

@@ -6,16 +6,16 @@ from setuptools import setup, find_packages
VERSION="0.1"

setup(
name='pymaxd',
version=VERSION,
description='Daemon for the MAX Cube',
author='Johann Schmitz',
author_email='johann@j-schmitz.net',
url='https://ercpe.de/projects/pymaxd',
download_url='https://code.not-your-server.de/pymaxd.git/tags/%s.tar.gz' % VERSION,
packages=find_packages('src'),
package_dir={'': 'src'},
include_package_data=True,
zip_safe=False,
license='GPL-3',
name='pymaxd',
version=VERSION,
description='Daemon for the MAX Cube',
author='Johann Schmitz',
author_email='johann@j-schmitz.net',
url='https://ercpe.de/projects/pymaxd',
download_url='https://code.not-your-server.de/pymaxd.git/tags/%s.tar.gz' % VERSION,
packages=find_packages('src'),
package_dir={'': 'src'},
include_package_data=True,
zip_safe=False,
license='GPL-3',
)

+ 23
- 23
src/maxd/__main__.py View File

@@ -9,34 +9,34 @@ logger = logging.getLogger(__name__)
from maxd.daemon import Daemon

if __name__ == "__main__": # pragma: nocover
parser = ArgumentParser()
parser.add_argument('-c', '--config', default='/etc/maxd.cfg', help="Config file to use (default: %(default)s)")
parser.add_argument('-v', '--verbose', action="count", default=1, help='Increase verbosity')
parser.add_argument('-d', '--debug', action='store_true', default=False, help="Enabled debug messages from the pymax library")
parser = ArgumentParser()
parser.add_argument('-c', '--config', default='/etc/maxd.cfg', help="Config file to use (default: %(default)s)")
parser.add_argument('-v', '--verbose', action="count", default=1, help='Increase verbosity')
parser.add_argument('-d', '--debug', action='store_true', default=False, help="Enabled debug messages from the pymax library")

parser.add_argument('--log-target', default='syslog')
parser.add_argument('--log-target', default='syslog')

args = parser.parse_args()
args = parser.parse_args()

if args.log_target == 'syslog':
logging.basicConfig(level=logging.FATAL - (10 * args.verbose), format='maxd: [%(levelname)s] %(message)s',
handlers=(SysLogHandler('/dev/log', facility=SysLogHandler.LOG_DAEMON), ))
else:
logging.basicConfig(level=logging.FATAL - (10 * args.verbose), format='%(asctime)s %(levelname)-7s %(message)s')
if args.log_target == 'syslog':
logging.basicConfig(level=logging.FATAL - (10 * args.verbose), format='maxd: [%(levelname)s] %(message)s',
handlers=(SysLogHandler('/dev/log', facility=SysLogHandler.LOG_DAEMON), ))
else:
logging.basicConfig(level=logging.FATAL - (10 * args.verbose), format='%(asctime)s %(levelname)-7s %(message)s')

if not args.debug:
pymax_logger = logging.getLogger('pymax')
pymax_logger.setLevel(logging.WARNING)
if not args.debug:
pymax_logger = logging.getLogger('pymax')
pymax_logger.setLevel(logging.WARNING)

daemon = None
daemon = None

def stop_daemon(signum, frame):
if daemon:
logger.debug("Stopping daemon")
daemon.stop()
def stop_daemon(signum, frame):
if daemon:
logger.debug("Stopping daemon")
daemon.stop()

signal.signal(signal.SIGTERM, stop_daemon)
signal.signal(signal.SIGINT, stop_daemon)
signal.signal(signal.SIGTERM, stop_daemon)
signal.signal(signal.SIGINT, stop_daemon)

daemon = Daemon(args.config)
daemon.run()
daemon = Daemon(args.config)
daemon.run()

+ 174
- 174
src/maxd/config.py View File

@@ -6,210 +6,210 @@ import re
from functools import wraps

try:
from ConfigParser import ConfigParser
from ConfigParser import ConfigParser
except ImportError: # pragma: no cover
from configparser import ConfigParser
from configparser import ConfigParser

logger = logging.getLogger(__name__)

def timediff(func):
def _wrapper(*args):
result = func(*args)
def _wrapper(*args):
result = func(*args)

if isinstance(result, datetime.timedelta):
return result
if isinstance(result, datetime.timedelta):
return result

if isinstance(result, int) or (isinstance(result, str) and result.isdigit()):
return datetime.timedelta(minutes=int(result))
if isinstance(result, int) or (isinstance(result, str) and result.isdigit()):
return datetime.timedelta(minutes=int(result))

m = re.match("(\d{1,2})\:(\d{1,2})", result)
if m:
hours, minutes = m.groups()
return datetime.timedelta(hours=int(hours), minutes=int(minutes))
m = re.match("(\d{1,2})\:(\d{1,2})", result)
if m:
hours, minutes = m.groups()
return datetime.timedelta(hours=int(hours), minutes=int(minutes))

raise ValueError("Unparsable time diff: %s" % result)
raise ValueError("Unparsable time diff: %s" % result)

return _wrapper
return _wrapper

def max_value(max, allow_none=True):
def _inner(func):
def _wrapper(*args):
value = func(*args)
def _inner(func):
def _wrapper(*args):
value = func(*args)

if value is None:
if allow_none:
return value
else:
logger.info("Limiting value of %s to max %s" % (value, max))
return max
if value is None:
if allow_none:
return value
else:
logger.info("Limiting value of %s to max %s" % (value, max))
return max

if value > max:
logger.info("Limiting value of %s to max %s" % (value, max))
return max
if value > max:
logger.info("Limiting value of %s to max %s" % (value, max))
return max

return value
return _wrapper
return _inner
return value
return _wrapper
return _inner

def min_value(min, allow_none=True):
def _inner(func):
def _wrapper(*args):
value = func(*args)
def _inner(func):
def _wrapper(*args):
value = func(*args)

if value is None:
if allow_none:
return value
else:
logger.info("Limiting value of %s to min %s" % (value, min))
return min
if value is None:
if allow_none:
return value
else:
logger.info("Limiting value of %s to min %s" % (value, min))
return min

if value < min:
logger.info("Limiting value of %s to min %s" % (value, min))
return min
if value < min:
logger.info("Limiting value of %s to min %s" % (value, min))
return min

return value
return _wrapper
return _inner
return value
return _wrapper
return _inner


def time_range(s):
if (s or "").strip():
periods = [x.strip() for x in s.split(',')]
if (s or "").strip():
periods = [x.strip() for x in s.split(',')]

for p in periods:
if not p:
continue
m = re.match(r"(\d{1,2}):(\d{1,2})\s*-\s*(\d{1,2}):(\d{1,2})", p)
if not m:
raise ValueError("'%s' does not match 'hh:mm - hh:mm'" % p)
yield int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4))
for p in periods:
if not p:
continue
m = re.match(r"(\d{1,2}):(\d{1,2})\s*-\s*(\d{1,2}):(\d{1,2})", p)
if not m:
raise ValueError("'%s' does not match 'hh:mm - hh:mm'" % p)
yield int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4))


class CalendarConfig(collections.namedtuple('CalendarConfig', ('name', 'url', 'username', 'password', 'filter'))):

def __new__(cls, **kwargs):
kwargs.setdefault('username', None)
kwargs.setdefault('password', None)
kwargs.setdefault('filter', None)
return super(CalendarConfig, cls).__new__(cls, **kwargs)
def __new__(cls, **kwargs):
kwargs.setdefault('username', None)
kwargs.setdefault('password', None)
kwargs.setdefault('filter', None)
return super(CalendarConfig, cls).__new__(cls, **kwargs)

@property
def auth(self):
return bool(self.username and self.password)
@property
def auth(self):
return bool(self.username and self.password)


class Configuration(object):

def __init__(self, config_path):
self.path = config_path
self.cfg_parser = None
self._calendar = None
self._static = None
self.reload()
def reload(self):
self.cfg_parser = ConfigParser()
if not self.cfg_parser.read(self.path) == [self.path]:
raise Exception("Failed to read configuration file %s" % self.path)
def get_option(self, section, option, default=None):
return self.cfg_parser.get(section, option) if self.cfg_parser.has_option(section, option) else default
def get_int(self, section, option, default=None):
return self.cfg_parser.getint(section, option) if self.cfg_parser.has_option(section, option) else default
@property
def calendars(self):
if self._calendar is None:
self._calendar = []
names = [x.strip() for x in self.get_option('GENERAL', 'calendars', '').split(',') if x.strip()]
for section_name in names:
url = self.get_option(section_name, 'url')
if not url:
logger.warning("Ignoring calendar '%s' (missing url)" % section_name)
continue
calconf = CalendarConfig(name=section_name, url=url,
username=self.get_option(section_name, 'username'),
password=self.get_option(section_name, 'password'))
self._calendar.append(calconf)
return self._calendar
@property
@max_value(datetime.timedelta(minutes=180))
@timediff
def warmup_duration(self):
return self.get_int('GENERAL', 'warmup', 30)
@property
@max_value(30)
@min_value(5)
def high_temperature(self):
return self.get_int('GENERAL', 'high_temperature', 24)
@property
@max_value(30)
@min_value(5)
def low_temperature(self):
return self.get_int('GENERAL', 'low_temperature', 10)
@property
def cube_serial(self):
return self.get_option('cube', 'serial')
@property
def cube_address(self):
return self.get_option('cube', 'address')
@property
def cube_port(self):
return self.get_int('cube', 'port', None)
@property
def cube_timezone(self):
return self.get_option('cube', 'timezone')
@property
def static_schedule(self):
if self._static is None:
schedule = {}
for weekday, name in (
(0, 'monday'),
(1, 'tuesday'),
(2, 'wednesday'),
(3, 'thursday'),
(4, 'friday'),
(5, 'saturday'),
(6, 'sunday'),
):
s = self.get_option('static', name)
schedule[weekday] = [
(datetime.time(a, b), datetime.time(c, d)) for a, b, c, d in time_range(s)
]
self._static = schedule
return self._static
@property
def room_id(self):
return self.get_int('room', 'id')
@property
def room_name(self):
return self.get_int('room', 'name')
@property
def room_rf_addr(self):
return self.get_int('room', 'rf_addr')
@property
def has_room_settings(self):
return self.room_id or self.room_name or self.room_rf_addr
@property
def allday_range(self):
a, b, c, d = list(time_range(self.get_option('GENERAL', 'allday', '06:00 - 23:00')))[0]
return datetime.time(a, b), datetime.time(c, d)
def __init__(self, config_path):
self.path = config_path
self.cfg_parser = None
self._calendar = None
self._static = None
self.reload()
def reload(self):
self.cfg_parser = ConfigParser()
if not self.cfg_parser.read(self.path) == [self.path]:
raise Exception("Failed to read configuration file %s" % self.path)
def get_option(self, section, option, default=None):
return self.cfg_parser.get(section, option) if self.cfg_parser.has_option(section, option) else default
def get_int(self, section, option, default=None):
return self.cfg_parser.getint(section, option) if self.cfg_parser.has_option(section, option) else default
@property
def calendars(self):
if self._calendar is None:
self._calendar = []
names = [x.strip() for x in self.get_option('GENERAL', 'calendars', '').split(',') if x.strip()]
for section_name in names:
url = self.get_option(section_name, 'url')
if not url:
logger.warning("Ignoring calendar '%s' (missing url)" % section_name)
continue
calconf = CalendarConfig(name=section_name, url=url,
username=self.get_option(section_name, 'username'),
password=self.get_option(section_name, 'password'))
self._calendar.append(calconf)
return self._calendar
@property
@max_value(datetime.timedelta(minutes=180))
@timediff
def warmup_duration(self):
return self.get_int('GENERAL', 'warmup', 30)
@property
@max_value(30)
@min_value(5)
def high_temperature(self):
return self.get_int('GENERAL', 'high_temperature', 24)
@property
@max_value(30)
@min_value(5)
def low_temperature(self):
return self.get_int('GENERAL', 'low_temperature', 10)
@property
def cube_serial(self):
return self.get_option('cube', 'serial')
@property
def cube_address(self):
return self.get_option('cube', 'address')
@property
def cube_port(self):
return self.get_int('cube', 'port', None)
@property
def cube_timezone(self):
return self.get_option('cube', 'timezone')
@property
def static_schedule(self):
if self._static is None:
schedule = {}
for weekday, name in (
(0, 'monday'),
(1, 'tuesday'),
(2, 'wednesday'),
(3, 'thursday'),
(4, 'friday'),
(5, 'saturday'),
(6, 'sunday'),
):
s = self.get_option('static', name)
schedule[weekday] = [
(datetime.time(a, b), datetime.time(c, d)) for a, b, c, d in time_range(s)
]
self._static = schedule
return self._static
@property
def room_id(self):
return self.get_int('room', 'id')
@property
def room_name(self):
return self.get_int('room', 'name')
@property
def room_rf_addr(self):
return self.get_int('room', 'rf_addr')
@property
def has_room_settings(self):
return self.room_id or self.room_name or self.room_rf_addr
@property
def allday_range(self):
a, b, c, d = list(time_range(self.get_option('GENERAL', 'allday', '06:00 - 23:00')))[0]
return datetime.time(a, b), datetime.time(c, d)

+ 39
- 39
src/maxd/daemon.py View File

@@ -11,50 +11,50 @@ logger = logging.getLogger(__name__)

class WorkerThread(threading.Thread):

def __init__(self, config_file, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.config_file = config_file
self.timer = None
self.exit = threading.Event()
def __init__(self, config_file, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.config_file = config_file
self.timer = None
self.exit = threading.Event()

def run(self):
worker = Worker(Configuration(self.config_file))
def run(self):
worker = Worker(Configuration(self.config_file))

def _exec():
try:
worker.execute()
except:
logger.exception("Worker failure")
def _exec():
try:
worker.execute()
except:
logger.exception("Worker failure")

_exec()
while not self.exit.wait(10):
_exec()
_exec()
while not self.exit.wait(10):
_exec()

logger.info("worker thread exiting")
logger.info("worker thread exiting")


class Daemon(object):

def __init__(self, config_file, *args, **kwargs):
super(Daemon, self).__init__(*args, **kwargs)
self.config_file = config_file
self.worker_thread = None
def run(self):
logger.info("Starting worker thread")
self.worker_thread = WorkerThread(self.config_file)
self.worker_thread.daemon = True
self.worker_thread.start()
while True:
time.sleep(1)
if self.worker_thread.exit.is_set():
logger.debug("Waiting for worker thread to exit..")
self.worker_thread.join()
break
def stop(self):
logger.debug("Stopping worker thread")
self.worker_thread.exit.set()
self.worker_thread.join()
logger.debug("Worker Thread join()ed")
def __init__(self, config_file, *args, **kwargs):
super(Daemon, self).__init__(*args, **kwargs)
self.config_file = config_file
self.worker_thread = None
def run(self):
logger.info("Starting worker thread")
self.worker_thread = WorkerThread(self.config_file)
self.worker_thread.daemon = True
self.worker_thread.start()
while True:
time.sleep(1)
if self.worker_thread.exit.is_set():
logger.debug("Waiting for worker thread to exit..")
self.worker_thread.join()
break
def stop(self):
logger.debug("Stopping worker thread")
self.worker_thread.exit.set()
self.worker_thread.join()
logger.debug("Worker Thread join()ed")

+ 26
- 26
src/maxd/fetcher.py View File

@@ -6,42 +6,42 @@ from requests.auth import HTTPBasicAuth

class EventFetcher(object):

def fetch(self, calendar_config):
raise NotImplementedError # pragma: nocover
def fetch(self, calendar_config):
raise NotImplementedError # pragma: nocover


class LocalCalendarEventFetcher(EventFetcher):

def fetch(self, calendar_config):
with open(calendar_config.url, 'r') as f:
calendar = Calendar.from_ical(f.read())
for item in calendar.walk():
if item.name != "VEVENT":
continue
def fetch(self, calendar_config):
with open(calendar_config.url, 'r') as f:
calendar = Calendar.from_ical(f.read())
for item in calendar.walk():
if item.name != "VEVENT":
continue

yield item
yield item


class HTTPCalendarEventFetcher(EventFetcher):

def __init__(self):
self.session = CacheControl(requests.session())
def __init__(self):
self.session = CacheControl(requests.session())

def fetch(self, calendar_config):
req_kwargs = {
'headers': {
'Accept': 'text/calendar'
}
}
if calendar_config.auth:
req_kwargs['auth'] = HTTPBasicAuth(calendar_config.username, calendar_config.password)
def fetch(self, calendar_config):
req_kwargs = {
'headers': {
'Accept': 'text/calendar'
}
}
if calendar_config.auth:
req_kwargs['auth'] = HTTPBasicAuth(calendar_config.username, calendar_config.password)

response = self.session.get(calendar_config.url, **req_kwargs)
response.raise_for_status()
response = self.session.get(calendar_config.url, **req_kwargs)
response.raise_for_status()

calendar = Calendar.from_ical(response.content)
for item in calendar.walk():
if item.name != "VEVENT":
continue
calendar = Calendar.from_ical(response.content)
for item in calendar.walk():
if item.name != "VEVENT":
continue

yield item
yield item

+ 307
- 307
src/maxd/worker.py View File

@@ -15,353 +15,353 @@ from maxd.fetcher import HTTPCalendarEventFetcher
from maxd.fetcher import LocalCalendarEventFetcher

try:
from urlparse import urlsplit
from urlparse import urlsplit
except ImportError: # pragma: nocover
from urllib.parse import urlsplit
from urllib.parse import urlsplit

logger = logging.getLogger(__name__)

weekday_names = ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')

def _to_utc_datetime(dt):
if dt is None:
return None
if dt is None:
return None

if dt.tzinfo == pytz.UTC:
return dt
return dt.astimezone(pytz.UTC)
if dt.tzinfo == pytz.UTC:
return dt
return dt.astimezone(pytz.UTC)


class Event(collections.namedtuple('Event', ('name', 'start', 'end'))):

def __new__(cls, **kwargs):
kwargs['start'] = _to_utc_datetime(kwargs.pop('start', None))
kwargs['end'] = _to_utc_datetime(kwargs.pop('end', None))
def __new__(cls, **kwargs):
kwargs['start'] = _to_utc_datetime(kwargs.pop('start', None))
kwargs['end'] = _to_utc_datetime(kwargs.pop('end', None))

return super(Event, cls).__new__(cls, **kwargs)
return super(Event, cls).__new__(cls, **kwargs)


class Schedule(object):

def __init__(self, weekday_events={}):
self.events = weekday_events or {}
def __init__(self, weekday_events={}):
self.events = weekday_events or {}

def __add__(self, other):
if not isinstance(other, Schedule):
raise ValueError("Cannot add %s instance to %s" % (other.__class__.__name__, self.__class__.__name__))
def __add__(self, other):
if not isinstance(other, Schedule):
raise ValueError("Cannot add %s instance to %s" % (other.__class__.__name__, self.__class__.__name__))

for k, v in other.items():
self.events[k] = self.events.get(k, []) + v
for k, v in other.items():
self.events[k] = self.events.get(k, []) + v

return self
return self

def items(self):
return self.events.items()
def items(self):
return self.events.items()

def effective(self):
new = {}
def effective(self):
new = {}

for weekday, periods in self.events.items():
periods = sorted(periods)
new_periods = []
for weekday, periods in self.events.items():
periods = sorted(periods)
new_periods = []

while periods:
current = periods.pop(0)
other = periods
while periods:
current = periods.pop(0)
other = periods

# remove current period if another period starts before and ends after
if any((p[0] < current[0] and p[1] > current[1] for p in other)) or \
any((p[0] < current[0] and p[1] > current[1] for p in new_periods)):
logger.debug("Removing period %s because it's contained in a larger period" % (current, ))
continue
# remove current period if another period starts before and ends after
if any((p[0] < current[0] and p[1] > current[1] for p in other)) or \
any((p[0] < current[0] and p[1] > current[1] for p in new_periods)):
logger.debug("Removing period %s because it's contained in a larger period" % (current, ))
continue

new_periods.append(current)
new_periods.append(current)

# extend periods (change start or end)
periods = sorted(new_periods)
new_periods = []
while periods:
current = periods.pop(0)
other = periods
# extend periods (change start or end)
periods = sorted(new_periods)
new_periods = []
while periods:
current = periods.pop(0)
other = periods

candidates = [
(s, e) for s, e in other
if (current[0] <= s <= current[1]) or (current[0] <= e <= current[1])
]
if candidates:
new_start = min(p[0] for p in candidates + [current])
new_end = max(p[1] for p in candidates + [current])
logger.debug("Replacing %s and %s with new: %s to %s" % (current, candidates, new_start, new_end))
new_periods.append((new_start, new_end))
for c in candidates:
del periods[periods.index(c)]
else:
new_periods.append(current)
candidates = [
(s, e) for s, e in other
if (current[0] <= s <= current[1]) or (current[0] <= e <= current[1])
]
if candidates:
new_start = min(p[0] for p in candidates + [current])
new_end = max(p[1] for p in candidates + [current])
logger.debug("Replacing %s and %s with new: %s to %s" % (current, candidates, new_start, new_end))
new_periods.append((new_start, new_end))
for c in candidates:
del periods[periods.index(c)]
else:
new_periods.append(current)

new[weekday] = new_periods
new[weekday] = new_periods

return Schedule(new)
return Schedule(new)

def as_timezone(self, tz):
for wd, periods in self.events.items():
self.events[wd] = [
(start.astimezone(tz), end.astimezone(tz)) for start, end in periods
]
def as_timezone(self, tz):
for wd, periods in self.events.items():
self.events[wd] = [
(start.astimezone(tz), end.astimezone(tz)) for start, end in periods
]

def to_program(self, weekday, low_temp, high_temp):
periods = self.events[weekday]
def to_program(self, weekday, low_temp, high_temp):
periods = self.events[weekday]

start = datetime.time()
for pstart, pend in periods:
yield ProgramSchedule(low_temp, start, pstart.time())
yield ProgramSchedule(high_temp, pstart.time(), pend.time())
start = pend.time()
start = datetime.time()
for pstart, pend in periods:
yield ProgramSchedule(low_temp, start, pstart.time())
yield ProgramSchedule(high_temp, pstart.time(), pend.time())
start = pend.time()

end_of_day = 1440
if ((start.hour * 60) + start.minute) < end_of_day:
yield ProgramSchedule(low_temp, start, end_of_day)
end_of_day = 1440
if ((start.hour * 60) + start.minute) < end_of_day:
yield ProgramSchedule(low_temp, start, end_of_day)

def __eq__(self, other):
return isinstance(other, Schedule) and self.events == other.events
def __eq__(self, other):
return isinstance(other, Schedule) and self.events == other.events


class Worker(object):

def __init__(self, config):
self.config = config
self.exception = None
self._current_schedule = None
def __init__(self, config):
self.config = config
self.exception = None
self._current_schedule = None

def execute(self):
logger.info("Running...")
def execute(self):
logger.info("Running...")

start = datetime.datetime.now(tz=pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)
end = start + datetime.timedelta(days=7) - datetime.timedelta(seconds=1)
start = datetime.datetime.now(tz=pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)
end = start + datetime.timedelta(days=7) - datetime.timedelta(seconds=1)

logger.info("Start: %s, end: %s" % (start, end))
logger.info("Start: %s, end: %s" % (start, end))

events = []
for calendar_config in self.config.calendars:
try:
events.extend(self.fetch_events(calendar_config, start, end))
except:
logger.exception("Failed to read events from %s" % calendar_config.name)
static_schedule = self.get_static_schedule(start)
calendar_schedule = self.create_schedule(events)
if logger.isEnabledFor(logging.DEBUG):
def _debug_schedule(schedule):
for wd in sorted(schedule.events.keys()):
logger.debug(" %s:" % weekday_names[wd])
for start, end in sorted(schedule.events[wd]):
logger.debug(" %s -> %s" % (start, end))
logger.debug("Static schedule:")
_debug_schedule(static_schedule)
logger.debug("Calendar events schedule:")
_debug_schedule(calendar_schedule)
self.apply_schedule(static_schedule + calendar_schedule)
def get_static_schedule(self, start):
d = {}
for day in range(0, 7):
# use start (of the week we are looking at), reset to midnight and add x days
dt = start.astimezone(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0) + datetime.timedelta(days=day)
# static schedules are always considered the local timezone
local_dt = dt.astimezone(dateutil.tz.tzlocal())
weekday = dt.weekday()
d[weekday] = []
for event_start, event_end in self.config.static_schedule.get(weekday, []):
# use the *local* datetime of midnight of day x in our window for start and end.
s = local_dt.replace(hour=event_start.hour, minute=event_start.minute).astimezone(pytz.UTC) - self.config.warmup_duration
e = local_dt.replace(hour=event_end.hour, minute=event_end.minute).astimezone(pytz.UTC)
d[weekday].append((s, e))
return Schedule(d)
def fetch_events(self, calendar_config, start, end):
chunks = urlsplit(calendar_config.url)
if chunks.scheme and chunks.netloc:
fetcher = HTTPCalendarEventFetcher()
else:
fetcher = LocalCalendarEventFetcher()
# fetch all ical event for this calendar
events = fetcher.fetch(calendar_config)
# filter the fetched events for the current period and convert them to Event instances
logger.info("Applying range filter to fetched events from %s" % calendar_config.name)
events = self.apply_range_filter(events, start, end)
if calendar_config.filter is not None:
logger.info("Applying user filter \"%s\" to %s events" % (calendar_config.filter, len(events)))
events = self.apply_user_filter(calendar_config.filter, events)
logger.debug("Event list contains now %s events from calendar %s" % (len(events), calendar_config.name))
else:
logger.debug("Filter query not set in calendar config")
return events
def apply_user_filter(self, query_string, events):
q = Parser().parse(query_string)
return q.apply(events)
def apply_range_filter(self, events, start, end):
start = (start.astimezone(pytz.UTC) if start.tzinfo else start).replace(hour=0, minute=0, second=0)
end = (end.astimezone(pytz.UTC) if end.tzinfo else end).replace(hour=23, minute=59, second=59)
def _to_all_day(date):
allday_start, allday_end = self.config.allday_range
day_start = datetime.datetime.combine(date, allday_start).replace(tzinfo=dateutil.tz.tzlocal())
day_end = datetime.datetime.combine(date, allday_end).replace(tzinfo=dateutil.tz.tzlocal())
return day_start.astimezone(pytz.UTC), day_end.astimezone(pytz.UTC)
def _build_all_events():
for cal_event in events:
try:
all_day = cal_event['DTSTART'].dt.__class__ == datetime.date
all_day_start, all_day_end = _to_all_day(cal_event['DTSTART'].dt)
if 'RRULE' in cal_event:
if all_day:
event_start_utc = all_day_start
else:
event_start_utc = cal_event['DTSTART'].dt.astimezone(pytz.UTC)
rule = rrule.rrulestr(cal_event.get('RRULE').to_ical().decode('utf-8'), dtstart=event_start_utc.replace(tzinfo=None))
if rule._until:
# The until identifier in the RRULE may contain a timezone (even if it's UTC).
# Make sure its UTC and remove the tzinfo
rule._until = rule._until.astimezone(pytz.UTC).replace(tzinfo=None)
for dt in rule.between(start.replace(tzinfo=None), end.replace(tzinfo=None), inc=True):
if all_day:
s, e = _to_all_day(dt.date())
yield Event(name=str(cal_event['SUMMARY']), start=s, end=e)
else:
dt = dt.replace(tzinfo=pytz.UTC)
if 'duration' in cal_event:
duration = cal_event['duration'].dt # it's already a timedelta
else:
duration = cal_event['DTEND'].dt - cal_event['DTSTART'].dt
yield Event(name=str(cal_event['SUMMARY']), start=dt, end=dt + duration)
else:
if all_day:
s, e = _to_all_day(cal_event['DTSTART'].dt)
yield Event(name=str(cal_event['SUMMARY']), start=s, end=e)
else:
yield Event(name=str(cal_event['SUMMARY']), start=cal_event['DTSTART'].dt.astimezone(pytz.UTC), end=cal_event['DTEND'].dt.astimezone(pytz.UTC))
except:
logger.exception("Failed to apply range filter to event %s" % cal_event)
for event in _build_all_events():
if start <= event.start <= end:
yield event
def create_schedule(self, events):
schedule = {}
warmup = self.config.warmup_duration
for event in events:
logger.debug(event)
start = event.start - warmup
if start.date() != event.start.date():
start = event.start.replace(hour=0, minute=0, second=0)
end = event.end
if end.date() != start.date():
end = event.end.replace(hour=23, minute=59, second=59)
schedule[start.weekday()] = schedule.get(start.weekday(), []) + [(start, end)]
return Schedule(schedule)
def apply_schedule(self, schedule):
effective_schedule = schedule.effective()
if logger.isEnabledFor(logging.INFO):
logger.info("Effective schedule:")
for weekday_num, items in effective_schedule.items():
logger.info("%10s: %s" % (weekday_names[weekday_num], ', '.join("%s to %s" % x for x in items)))
if self._current_schedule == schedule:
logger.info("Schedule unchanged")
return
with self.connect_to_cube() as cube:
# i would like to use the 'v' message to get the timezone from the cube
# unfortunately, at least my cube doesn't set the timezone properly when using the max cube software
if self.config.cube_timezone:
cube_tz = pytz.timezone(self.config.cube_timezone)
else:
cube_tz = dateutil.tz.tzlocal()
logger.info("Cube time zone: %s" % cube_tz)
effective_schedule.as_timezone(cube_tz)
if self.config.has_room_settings:
rooms = []
for r in cube.rooms:
if (self.config.room_id and r.room_id == self.config.room_id) or \
(self.config.room_name and self.config.room_name == r.name) or \
(self.config.room_rf_addr and self.config.room_rf_addr == r.rf_address):
rooms.append(r)
else:
rooms = [r for r in cube.rooms]
if rooms:
logger.info("Writing program to cube for rooms %s" % rooms)
low_temp = self.config.low_temperature
high_temp = self.config.high_temperature
for weekday_num in effective_schedule.events.keys():
programs = list(effective_schedule.to_program(weekday_num, low_temp, high_temp))
logger.info("%10s: %s" % (weekday_names[weekday_num], ', '.join(["%s-%s (%s)" % (x.begin_minutes, x.end_minutes, x.temperature) for x in programs])))
for room in rooms:
logger.debug("Setting program for room %s, rf addr: %s on day %s" % (room.room_id, room.rf_address, weekday_num))
cube.set_program(room.room_id, room.rf_address, weekday_num, programs)
else:
logger.warning("Could not find any rooms to write the program for")
self._current_schedule = schedule
def connect_to_cube(self):
cube_addr = None
cube_port = self.config.cube_port
if self.config.cube_address:
cube_addr = self.config.cube_address
if not cube_addr:
logger.info("Using discovery to find cube")
d = Discovery()
cube_serial = self.config.cube_serial
if not cube_serial:
logger.info("Making IDENTIFY discovery to find available cubes")
response = Discovery().discover()
logger.info("Got IDENTIFY response: %s" % response)
if response:
cube_serial = response.serial
else:
raise Exception("No cube found with IDENTIFY discovery")
# use network configuration discovery
logger.info("Using NETWORK CONFIG discovery for cube %s" % cube_serial)
discovery_response = d.discover(cube_serial=cube_serial, discovery_type=Discovery.DISCOVERY_TYPE_NETWORK_CONFIG)
if discovery_response:
cube_addr = discovery_response.ip_address
else:
raise Exception("Cube %s did not answer with network configuration" % cube_serial)
logger.info("Cube at %s, port %s" % (cube_addr, cube_port))
return Cube(address=cube_addr, port=cube_port)
events = []
for calendar_config in self.config.calendars:
try:
events.extend(self.fetch_events(calendar_config, start, end))
except:
logger.exception("Failed to read events from %s" % calendar_config.name)
static_schedule = self.get_static_schedule(start)
calendar_schedule = self.create_schedule(events)
if logger.isEnabledFor(logging.DEBUG):
def _debug_schedule(schedule):
for wd in sorted(schedule.events.keys()):
logger.debug(" %s:" % weekday_names[wd])
for start, end in sorted(schedule.events[wd]):
logger.debug(" %s -> %s" % (start, end))
logger.debug("Static schedule:")
_debug_schedule(static_schedule)
logger.debug("Calendar events schedule:")
_debug_schedule(calendar_schedule)
self.apply_schedule(static_schedule + calendar_schedule)
def get_static_schedule(self, start):
d = {}
for day in range(0, 7):
# use start (of the week we are looking at), reset to midnight and add x days
dt = start.astimezone(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0) + datetime.timedelta(days=day)
# static schedules are always considered the local timezone
local_dt = dt.astimezone(dateutil.tz.tzlocal())
weekday = dt.weekday()
d[weekday] = []
for event_start, event_end in self.config.static_schedule.get(weekday, []):
# use the *local* datetime of midnight of day x in our window for start and end.
s = local_dt.replace(hour=event_start.hour, minute=event_start.minute).astimezone(pytz.UTC) - self.config.warmup_duration
e = local_dt.replace(hour=event_end.hour, minute=event_end.minute).astimezone(pytz.UTC)
d[weekday].append((s, e))
return Schedule(d)
def fetch_events(self, calendar_config, start, end):
chunks = urlsplit(calendar_config.url)
if chunks.scheme and chunks.netloc:
fetcher = HTTPCalendarEventFetcher()
else:
fetcher = LocalCalendarEventFetcher()
# fetch all ical event for this calendar
events = fetcher.fetch(calendar_config)
# filter the fetched events for the current period and convert them to Event instances
logger.info("Applying range filter to fetched events from %s" % calendar_config.name)
events = self.apply_range_filter(events, start, end)
if calendar_config.filter is not None:
logger.info("Applying user filter \"%s\" to %s events" % (calendar_config.filter, len(events)))
events = self.apply_user_filter(calendar_config.filter, events)
logger.debug("Event list contains now %s events from calendar %s" % (len(events), calendar_config.name))
else:
logger.debug("Filter query not set in calendar config")
return events
def apply_user_filter(self, query_string, events):
q = Parser().parse(query_string)
return q.apply(events)
def apply_range_filter(self, events, start, end):
start = (start.astimezone(pytz.UTC) if start.tzinfo else start).replace(hour=0, minute=0, second=0)
end = (end.astimezone(pytz.UTC) if end.tzinfo else end).replace(hour=23, minute=59, second=59)
def _to_all_day(date):
allday_start, allday_end = self.config.allday_range
day_start = datetime.datetime.combine(date, allday_start).replace(tzinfo=dateutil.tz.tzlocal())
day_end = datetime.datetime.combine(date, allday_end).replace(tzinfo=dateutil.tz.tzlocal())
return day_start.astimezone(pytz.UTC), day_end.astimezone(pytz.UTC)
def _build_all_events():
for cal_event in events:
try:
all_day = cal_event['DTSTART'].dt.__class__ == datetime.date
all_day_start, all_day_end = _to_all_day(cal_event['DTSTART'].dt)
if 'RRULE' in cal_event:
if all_day:
event_start_utc = all_day_start
else:
event_start_utc = cal_event['DTSTART'].dt.astimezone(pytz.UTC)
rule = rrule.rrulestr(cal_event.get('RRULE').to_ical().decode('utf-8'), dtstart=event_start_utc.replace(tzinfo=None))
if rule._until:
# The until identifier in the RRULE may contain a timezone (even if it's UTC).
# Make sure its UTC and remove the tzinfo
rule._until = rule._until.astimezone(pytz.UTC).replace(tzinfo=None)
for dt in rule.between(start.replace(tzinfo=None), end.replace(tzinfo=None), inc=True):
if all_day:
s, e = _to_all_day(dt.date())
yield Event(name=str(cal_event['SUMMARY']), start=s, end=e)
else:
dt = dt.replace(tzinfo=pytz.UTC)
if 'duration' in cal_event:
duration = cal_event['duration'].dt # it's already a timedelta
else:
duration = cal_event['DTEND'].dt - cal_event['DTSTART'].dt
yield Event(name=str(cal_event['SUMMARY']), start=dt, end=dt + duration)
else:
if all_day:
s, e = _to_all_day(cal_event['DTSTART'].dt)
yield Event(name=str(cal_event['SUMMARY']), start=s, end=e)
else:
yield Event(name=str(cal_event['SUMMARY']), start=cal_event['DTSTART'].dt.astimezone(pytz.UTC), end=cal_event['DTEND'].dt.astimezone(pytz.UTC))
except:
logger.exception("Failed to apply range filter to event %s" % cal_event)
for event in _build_all_events():
if start <= event.start <= end:
yield event
def create_schedule(self, events):
schedule = {}
warmup = self.config.warmup_duration
for event in events:
logger.debug(event)
start = event.start - warmup
if start.date() != event.start.date():
start = event.start.replace(hour=0, minute=0, second=0)
end = event.end
if end.date() != start.date():
end = event.end.replace(hour=23, minute=59, second=59)
schedule[start.weekday()] = schedule.get(start.weekday(), []) + [(start, end)]
return Schedule(schedule)
def apply_schedule(self, schedule):
effective_schedule = schedule.effective()
if logger.isEnabledFor(logging.INFO):
logger.info("Effective schedule:")
for weekday_num, items in effective_schedule.items():
logger.info("%10s: %s" % (weekday_names[weekday_num], ', '.join("%s to %s" % x for x in items)))
if self._current_schedule == schedule:
logger.info("Schedule unchanged")
return
with self.connect_to_cube() as cube:
# i would like to use the 'v' message to get the timezone from the cube
# unfortunately, at least my cube doesn't set the timezone properly when using the max cube software
if self.config.cube_timezone:
cube_tz = pytz.timezone(self.config.cube_timezone)
else:
cube_tz = dateutil.tz.tzlocal()
logger.info("Cube time zone: %s" % cube_tz)
effective_schedule.as_timezone(cube_tz)
if self.config.has_room_settings:
rooms = []
for r in cube.rooms:
if (self.config.room_id and r.room_id == self.config.room_id) or \
(self.config.room_name and self.config.room_name == r.name) or \
(self.config.room_rf_addr and self.config.room_rf_addr == r.rf_address):
rooms.append(r)
else:
rooms = [r for r in cube.rooms]
if rooms:
logger.info("Writing program to cube for rooms %s" % rooms)
low_temp = self.config.low_temperature
high_temp = self.config.high_temperature
for weekday_num in effective_schedule.events.keys():
programs = list(effective_schedule.to_program(weekday_num, low_temp, high_temp))
logger.info("%10s: %s" % (weekday_names[weekday_num], ', '.join(["%s-%s (%s)" % (x.begin_minutes, x.end_minutes, x.temperature) for x in programs])))
for room in rooms:
logger.debug("Setting program for room %s, rf addr: %s on day %s" % (room.room_id, room.rf_address, weekday_num))
cube.set_program(room.room_id, room.rf_address, weekday_num, programs)
else:
logger.warning("Could not find any rooms to write the program for")
self._current_schedule = schedule
def connect_to_cube(self):
cube_addr = None
cube_port = self.config.cube_port
if self.config.cube_address:
cube_addr = self.config.cube_address
if not cube_addr:
logger.info("Using discovery to find cube")
d = Discovery()
cube_serial = self.config.cube_serial
if not cube_serial:
logger.info("Making IDENTIFY discovery to find available cubes")
response = Discovery().discover()
logger.info("Got IDENTIFY response: %s" % response)
if response:
cube_serial = response.serial
else:
raise Exception("No cube found with IDENTIFY discovery")
# use network configuration discovery
logger.info("Using NETWORK CONFIG discovery for cube %s" % cube_serial)
discovery_response = d.discover(cube_serial=cube_serial, discovery_type=Discovery.DISCOVERY_TYPE_NETWORK_CONFIG)
if discovery_response:
cube_addr = discovery_response.ip_address
else:
raise Exception("Cube %s did not answer with network configuration" % cube_serial)
logger.info("Cube at %s, port %s" % (cube_addr, cube_port))
return Cube(address=cube_addr, port=cube_port)

+ 170
- 170
tests/test_config.py View File

@@ -2,161 +2,161 @@
import datetime
import pytest
try:
from StringIO import StringIO
from StringIO import StringIO
except ImportError:
from io import StringIO
from io import StringIO
from maxd.config import Configuration, timediff, max_value, min_value, time_range


class TestConfig(object):

def test_missing_file(self):
with pytest.raises(Exception):
Configuration('/file/does/not/exist')
def test_reload(self):
cfg = Configuration('/dev/null')
old_id = id(cfg.cfg_parser)
cfg.reload()
assert old_id != id(cfg.cfg_parser), "Configuration did not reload"
assert cfg._calendar is None, "Calendar property not cleared on reload"
def test_calendar_property_init(self):
cfg = Configuration('tests/fixtures/config/basic.cfg')
cals = cfg.calendars
assert cals == [] # first access => initialize
assert id(cals) == id(cfg.calendars) # second access => cached value
def test_calendar_property_missing_values(self):
cfg = Configuration('tests/fixtures/config/missing_values.cfg')
assert len(cfg.calendars) == 0 # missing url
def test_calendar_property(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert len(cfg.calendars) == 2
assert cfg.calendars[0].name == 'testcal1'
assert cfg.calendars[0].url == 'http://localhost/test.ics'
assert not cfg.calendars[0].auth
assert cfg.calendars[0].username == cfg.calendars[0].password == None
assert cfg.calendars[1].name == 'testcal2'
assert cfg.calendars[1].url == 'http://localhost/test.ics'
assert cfg.calendars[1].auth
assert cfg.calendars[1].username == 'foo'
assert cfg.calendars[1].password == 'bar'
def test_basic_config(self):
cfg = Configuration('tests/fixtures/config/basic.cfg')
assert cfg.cfg_parser is not None
def test_get_option(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert cfg.get_option('GENERAL', 'calendars') == 'testcal1, testcal2'
def test_get_option_default(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert cfg.get_option('GENERAL', 'foo') is None
assert cfg.get_option('GENERAL', 'foo', 'bar') == 'bar'
assert cfg.get_option('does not exist', 'foo') is None
assert cfg.get_option('does not exist', 'foo', 'bar') == 'bar'
def test_get_option_default(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert cfg.get_int('GENERAL', 'foo', 'bar') == 'bar'
assert cfg.get_int('does not exist', 'foo', 'bar') == 'bar'
def test_timediff_decorator(self):
for s, td in [
(10, datetime.timedelta(minutes=10)),
(360, datetime.timedelta(minutes=360)),
("20", datetime.timedelta(minutes=20)),
("02:30", datetime.timedelta(hours=2, minutes=30)),
("2:3", datetime.timedelta(hours=2, minutes=3)),
(datetime.timedelta(minutes=1), datetime.timedelta(minutes=1))
]:
assert timediff(lambda: s)() == td, "'%s' does not parse into %s" % (s, td)
def test_timediff_decorator_unparsable(self):
with pytest.raises(ValueError):
assert timediff(lambda: "lalala")()
def test_warmup_duration(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert cfg.warmup_duration == datetime.timedelta(minutes=30) # default value
def test_max_value_decorator(self):
@max_value(20)
def test():
return 10
assert test() == 10
def test_max_value_decorator_limit_to_max(self):
@max_value(10)
def test():
return 20
assert test() == 10
def test_max_value_decorator_allow_none(self):
@max_value(10)
def test():
return None
assert test() is None
@max_value(10, allow_none=False)
def test():
return None
assert test() == 10
def test_min_value_decorator(self):
@min_value(5)
def test():
return 10
assert test() == 10
def test_min_value_decorator_limit_to_min(self):
@min_value(10)
def test():
return 5
assert test() == 10
def test_min_value_decorator_none(self):
@min_value(5)
def test():
return None
assert test() is None
@min_value(5, allow_none=False)
def test2():
return None
assert test2() == 5
def test_get_high_temperature_not_set(self):
cfg = Configuration('/dev/null')
assert cfg.high_temperature == 24
def test_get_high_temperature_limited(self):
cfg1 = Configuration('tests/fixtures/config/temperatures_high.cfg')
assert cfg1.high_temperature is 30
cfg2 = Configuration('tests/fixtures/config/temperatures_low.cfg')
assert cfg2.high_temperature == 5
def test_get_low_temperature_limited(self):
cfg1 = Configuration('tests/fixtures/config/temperatures_high.cfg')
assert cfg1.low_temperature is 30
cfg2 = Configuration('tests/fixtures/config/temperatures_low.cfg')
assert cfg2.low_temperature == 5
def test_get_static_schedule_not_set(self):
cfg1 = Configuration('/dev/null')
assert cfg1.static_schedule == { 0: [], 1: [], 2: [], 3: [], 4: [], 5: [], 6: [], }
cfg2 = Configuration('/dev/null')
cfg2.cfg_parser.readfp(StringIO("""
def test_missing_file(self):
with pytest.raises(Exception):
Configuration('/file/does/not/exist')
def test_reload(self):
cfg = Configuration('/dev/null')
old_id = id(cfg.cfg_parser)
cfg.reload()
assert old_id != id(cfg.cfg_parser), "Configuration did not reload"
assert cfg._calendar is None, "Calendar property not cleared on reload"
def test_calendar_property_init(self):
cfg = Configuration('tests/fixtures/config/basic.cfg')
cals = cfg.calendars
assert cals == [] # first access => initialize
assert id(cals) == id(cfg.calendars) # second access => cached value
def test_calendar_property_missing_values(self):
cfg = Configuration('tests/fixtures/config/missing_values.cfg')
assert len(cfg.calendars) == 0 # missing url
def test_calendar_property(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert len(cfg.calendars) == 2
assert cfg.calendars[0].name == 'testcal1'
assert cfg.calendars[0].url == 'http://localhost/test.ics'
assert not cfg.calendars[0].auth
assert cfg.calendars[0].username == cfg.calendars[0].password == None
assert cfg.calendars[1].name == 'testcal2'
assert cfg.calendars[1].url == 'http://localhost/test.ics'
assert cfg.calendars[1].auth
assert cfg.calendars[1].username == 'foo'
assert cfg.calendars[1].password == 'bar'
def test_basic_config(self):
cfg = Configuration('tests/fixtures/config/basic.cfg')
assert cfg.cfg_parser is not None
def test_get_option(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert cfg.get_option('GENERAL', 'calendars') == 'testcal1, testcal2'
def test_get_option_default(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert cfg.get_option('GENERAL', 'foo') is None
assert cfg.get_option('GENERAL', 'foo', 'bar') == 'bar'
assert cfg.get_option('does not exist', 'foo') is None
assert cfg.get_option('does not exist', 'foo', 'bar') == 'bar'
def test_get_option_default(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert cfg.get_int('GENERAL', 'foo', 'bar') == 'bar'
assert cfg.get_int('does not exist', 'foo', 'bar') == 'bar'
def test_timediff_decorator(self):
for s, td in [
(10, datetime.timedelta(minutes=10)),
(360, datetime.timedelta(minutes=360)),
("20", datetime.timedelta(minutes=20)),
("02:30", datetime.timedelta(hours=2, minutes=30)),
("2:3", datetime.timedelta(hours=2, minutes=3)),
(datetime.timedelta(minutes=1), datetime.timedelta(minutes=1))
]:
assert timediff(lambda: s)() == td, "'%s' does not parse into %s" % (s, td)
def test_timediff_decorator_unparsable(self):
with pytest.raises(ValueError):
assert timediff(lambda: "lalala")()
def test_warmup_duration(self):
cfg = Configuration('tests/fixtures/config/basic2.cfg')
assert cfg.warmup_duration == datetime.timedelta(minutes=30) # default value
def test_max_value_decorator(self):
@max_value(20)
def test():
return 10
assert test() == 10
def test_max_value_decorator_limit_to_max(self):
@max_value(10)
def test():
return 20
assert test() == 10
def test_max_value_decorator_allow_none(self):
@max_value(10)
def test():
return None
assert test() is None
@max_value(10, allow_none=False)
def test():
return None
assert test() == 10
def test_min_value_decorator(self):
@min_value(5)
def test():
return 10
assert test() == 10
def test_min_value_decorator_limit_to_min(self):
@min_value(10)
def test():
return 5
assert test() == 10
def test_min_value_decorator_none(self):
@min_value(5)
def test():
return None
assert test() is None
@min_value(5, allow_none=False)
def test2():
return None
assert test2() == 5
def test_get_high_temperature_not_set(self):
cfg = Configuration('/dev/null')
assert cfg.high_temperature == 24
def test_get_high_temperature_limited(self):
cfg1 = Configuration('tests/fixtures/config/temperatures_high.cfg')
assert cfg1.high_temperature is 30
cfg2 = Configuration('tests/fixtures/config/temperatures_low.cfg')
assert cfg2.high_temperature == 5
def test_get_low_temperature_limited(self):
cfg1 = Configuration('tests/fixtures/config/temperatures_high.cfg')
assert cfg1.low_temperature is 30
cfg2 = Configuration('tests/fixtures/config/temperatures_low.cfg')
assert cfg2.low_temperature == 5
def test_get_static_schedule_not_set(self):
cfg1 = Configuration('/dev/null')
assert cfg1.static_schedule == { 0: [], 1: [], 2: [], 3: [], 4: [], 5: [], 6: [], }
cfg2 = Configuration('/dev/null')
cfg2.cfg_parser.readfp(StringIO("""
[static]
monday =
tuesday =
@@ -166,33 +166,33 @@ friday =
saturday =
sunday =
"""))
assert cfg2.static_schedule == { 0: [], 1: [], 2: [], 3: [], 4: [], 5: [], 6: [], }
assert cfg2.static_schedule == { 0: [], 1: [], 2: [], 3: [], 4: [], 5: [], 6: [], }

def test_get_static_schedule(self):
cfg1 = Configuration('/dev/null')
cfg1.cfg_parser.readfp(StringIO("""
def test_get_static_schedule(self):
cfg1 = Configuration('/dev/null')
cfg1.cfg_parser.readfp(StringIO("""
[static]
monday = 08:00 - 10:00
"""))
assert cfg1.static_schedule == { 0: [
(datetime.time(8, 0), datetime.time(10, 0)),
], 1: [], 2: [], 3: [], 4: [], 5: [], 6: [], }
assert cfg1.static_schedule == { 0: [
(datetime.time(8, 0), datetime.time(10, 0)),
], 1: [], 2: [], 3: [], 4: [], 5: [], 6: [], }

def test_time_range(self):
assert list(time_range('')) == []
assert list(time_range(None)) == []
def test_time_range(self):
assert list(time_range('')) == []
assert list(time_range(None)) == []

for bad in ('foobar', 'ab:cd', '123:456', '123:456 789:012', '1 2', '12:34 56:78'):
with pytest.raises(ValueError):
list(time_range(bad))
for bad in ('foobar', 'ab:cd', '123:456', '123:456 789:012', '1 2', '12:34 56:78'):
with pytest.raises(ValueError):
list(time_range(bad))

assert list(time_range('10:00-11:00')) == [(10, 00, 11, 00)]
assert list(time_range('10:00 - 11:00')) == [(10, 00, 11, 00)]
assert list(time_range('77:88-99:00')) == [(77, 88, 99, 00)]
assert list(time_range('10:00-11:00')) == [(10, 00, 11, 00)]
assert list(time_range('10:00 - 11:00')) == [(10, 00, 11, 00)]
assert list(time_range('77:88-99:00')) == [(77, 88, 99, 00)]

assert list(time_range('08:30 - 09:00, 05:15-17:00')) == [
(8, 30, 9, 0),
(5, 15, 17, 00),
]
assert list(time_range('08:30 - 09:00, 05:15-17:00')) == [
(8, 30, 9, 0),
(5, 15, 17, 00),
]

assert list(time_range('08:30 - 09:00,')) == [(8, 30, 9, 0)]
assert list(time_range('08:30 - 09:00,')) == [(8, 30, 9, 0)]

+ 2
- 2
tests/test_daemon.py View File

@@ -4,6 +4,6 @@ from maxd.__main__ import Daemon

class TestDaemon(object):

def test_constructor(self):
d = Daemon('tests/fixtures/config/basic.cfg')
def test_constructor(self):
d = Daemon('tests/fixtures/config/basic.cfg')


+ 43
- 43
tests/test_fetcher.py View File

@@ -7,62 +7,62 @@ import pytz
import sys

if sys.version_info.major == 2 or (sys.version_info.major == 3 and sys.version_info.minor <= 2):
from mock import Mock
from mock import Mock
else:
from unittest.mock import Mock
from unittest.mock import Mock

class TestLocalFetcher(object):

def test_local_fetcher(self):
f = LocalCalendarEventFetcher()
events = list(f.fetch(CalendarConfig(name='test', url='tests/fixtures/calendars/single_event.ics')))
assert len(events) == 1
def test_local_fetcher(self):
f = LocalCalendarEventFetcher()
events = list(f.fetch(CalendarConfig(name='test', url='tests/fixtures/calendars/single_event.ics')))
assert len(events) == 1

event = events[0]
event = events[0]

assert event['SUMMARY'] == 'Test Event'
assert event['DTSTART'].dt == datetime.datetime(2015, 12, 20, 9, 0, tzinfo=pytz.UTC)
assert event['DTEND'].dt == datetime.datetime(2015, 12, 20, 10, 0, tzinfo=pytz.UTC)
assert event['SUMMARY'] == 'Test Event'
assert event['DTSTART'].dt == datetime.datetime(2015, 12, 20, 9, 0, tzinfo=pytz.UTC)
assert event['DTEND'].dt == datetime.datetime(2015, 12, 20, 10, 0, tzinfo=pytz.UTC)


class TestHTTPFetcher(object):

def test_constructor(self):
f = HTTPCalendarEventFetcher()
assert isinstance(f.session, requests.sessions.Session)
def test_constructor(self):
f = HTTPCalendarEventFetcher()
assert isinstance(f.session, requests.sessions.Session)

def test_fetch_without_auth(self):
response_mock = Mock()
with open('tests/fixtures/calendars/single_event.ics', 'r') as f:
response_mock.content = f.read()
def test_fetch_without_auth(self):
response_mock = Mock()
with open('tests/fixtures/calendars/single_event.ics', 'r') as f:
response_mock.content = f.read()

f = HTTPCalendarEventFetcher()
f.session = Mock()
f.session.get = Mock(return_value=response_mock)
f = HTTPCalendarEventFetcher()
f.session = Mock()
f.session.get = Mock(return_value=response_mock)

response = list(f.fetch(CalendarConfig(name='test', url='http://example.com/test.ics')))
f.session.get.assert_called_with('http://example.com/test.ics', headers={
'Accept': 'text/calendar'
})
assert len(response) == 1
response = list(f.fetch(CalendarConfig(name='test', url='http://example.com/test.ics')))
f.session.get.assert_called_with('http://example.com/test.ics', headers={
'Accept': 'text/calendar'
})
assert len(response) == 1

def test_fetch_with_auth(self):
f = HTTPCalendarEventFetcher()
f.session = Mock()
def test_fetch_with_auth(self):
f = HTTPCalendarEventFetcher()
f.session = Mock()

def get_mock(*args, **kwargs): # stupid way to get around the not implemented __eq__ for HttpBasicAuth
assert len(args) == 1 and args[0] == 'http://example.com/test.ics'
assert len(kwargs) == 2 and \
('auth' in kwargs and kwargs['auth'].username == 'foo' and kwargs['auth'].password == 'bar') and \
('headers' in kwargs and kwargs['headers'] == {
'Accept': 'text/calendar'
}), "A HTTPBasicAuth instance should be passed to requests"
response_mock = Mock()
with open('tests/fixtures/calendars/single_event.ics', 'r') as f:
response_mock.content = f.read()
return response_mock
def get_mock(*args, **kwargs): # stupid way to get around the not implemented __eq__ for HttpBasicAuth
assert len(args) == 1 and args[0] == 'http://example.com/test.ics'
assert len(kwargs) == 2 and \
('auth' in kwargs and kwargs['auth'].username == 'foo' and kwargs['auth'].password == 'bar') and \
('headers' in kwargs and kwargs['headers'] == {
'Accept': 'text/calendar'
}), "A HTTPBasicAuth instance should be passed to requests"
response_mock = Mock()
with open('tests/fixtures/calendars/single_event.ics', 'r') as f:
response_mock.content = f.read()
return response_mock

f.session.get = Mock(side_effect=get_mock)
response = list(f.fetch(CalendarConfig(name='test', url='http://example.com/test.ics', username='foo', password='bar')))
assert f.session.get.called
assert len(response) == 1
f.session.get = Mock(side_effect=get_mock)
response = list(f.fetch(CalendarConfig(name='test', url='http://example.com/test.ics', username='foo', password='bar')))
assert f.session.get.called
assert len(response) == 1

+ 321
- 321
tests/test_worker.py View File

@@ -10,149 +10,149 @@ from pymax.objects import ProgramSchedule
from maxd.config import CalendarConfig

try:
from StringIO import StringIO
from StringIO import StringIO
except ImportError:
from io import StringIO
from io import StringIO
from maxd.config import Configuration
from maxd.worker import Worker, Schedule, _to_utc_datetime, Event

if sys.version_info.major == 2 or (sys.version_info.major == 3 and sys.version_info.minor <= 2):
from mock import Mock, patch
from mock import Mock, patch
else:
from unittest.mock import Mock, patch
from unittest.mock import Mock, patch

class TestWorker(object):

def test_apply_range_filter(self):
w = Worker(Configuration('/dev/null'))
with open('tests/fixtures/calendars/repeating.ics', 'r') as f:
events = [o for o in icalendar.Calendar.from_ical(f.read()).walk() if o.name == 'VEVENT']
filtered = w.apply_range_filter(events, datetime.datetime(2015, 12, 21, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 27, tzinfo=pytz.UTC))
filtered = list(filtered)
assert len(filtered) == 4
def test_apply_range_filter_exclude(self):
w = Worker(Configuration('/dev/null'))
with open('tests/fixtures/calendars/repeating.ics', 'r') as f:
events = [o for o in icalendar.Calendar.from_ical(f.read()).walk() if o.name == 'VEVENT']
filtered = w.apply_range_filter(events,
datetime.datetime(2015, 12, 28, tzinfo=pytz.UTC),
datetime.datetime(2016, 1, 1, tzinfo=pytz.UTC))
filtered = list(filtered)
# weekly event: once (2015-12-29)
# daily event: 4 (2015-12-28 till 2015-12-31)
assert len(filtered) == 5
def test_apply_user_filter(self):
w = Worker(Configuration('/dev/null'))
with open('tests/fixtures/calendars/repeating.ics', 'r') as f:
events = [o for o in icalendar.Calendar.from_ical(f.read()).walk() if o.name == 'VEVENT']
filtered = w.apply_range_filter(events,
datetime.datetime(2015, 12, 28, tzinfo=pytz.UTC),
datetime.datetime(2016, 1, 1, tzinfo=pytz.UTC))
filtered = list(filtered)
filtered = w.apply_user_filter("name == 'Ending repeating event'", filtered)
filtered = list(filtered)
assert len(filtered) == 4
def test_create_schedule(self):
w = Worker(Configuration('/dev/null'))
with open('tests/fixtures/calendars/repeating.ics', 'r') as f:
events = [o for o in icalendar.Calendar.from_ical(f.read()).walk() if o.name == 'VEVENT']
vevents = w.apply_range_filter(events,
start=datetime.datetime(2015, 12, 24, tzinfo=pytz.UTC),
end=datetime.datetime(2015, 12, 30, 23, 59, 59, tzinfo=pytz.UTC))
schedule = w.create_schedule(vevents)
for wd, periods in sorted(schedule.items()):
print("Weekday %s:" % wd)
for start, end in sorted(periods):
print(" %s -> %s" % (start, end))
d = {
3: [ # 2015-12-24
# repeating daily
(datetime.datetime(2015, 12, 24, 7, 30, 00, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 24, 9, 0, 00, tzinfo=pytz.UTC)),
],
4: [ # 2015-12-25
# repeating daily
(datetime.datetime(2015, 12, 25, 7, 30, 00, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 25, 9, 0, 00, tzinfo=pytz.UTC)),
],
5: [ # 2015-12-26
# repeating daily
(datetime.datetime(2015, 12, 26, 7, 30, 00, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 26, 9, 0, 00, tzinfo=pytz.UTC)),
],
6: [ # 2015-12-27
# repeating daily
(datetime.datetime(2015, 12, 27, 7, 30, 00, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 27, 9, 0, 00, tzinfo=pytz.UTC)),
],
0: [ # 2015-12-28
# repeating daily
(datetime.datetime(2015, 12, 28, 7, 30, 00, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 28, 9, 0, 00, tzinfo=pytz.UTC)),
],
1: [ # 2015-12-29
# repeating weekly
(datetime.datetime(2015, 12, 29, 8, 30, 00, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 29, 10, 0, 00, tzinfo=pytz.UTC)),
# repeating daily
(datetime.datetime(2015, 12, 29, 7, 30, 00, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 29, 9, 0, 00, tzinfo=pytz.UTC)),
],
2: [ # 2015-12-30
(datetime.datetime(2015, 12, 30, 7, 30, 00, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 30, 9, 0, 00, tzinfo=pytz.UTC)),
]
}
assert schedule == Schedule(d)
def test_create_schedule_all_day_events(self, monkeypatch):
# the static schedule is always in local time, so monkeypatch dateutil.tz.tzlocal() to return a constant
# timezone to avoid test failures in different timezones
def faketz():
return pytz.timezone('Europe/Berlin')
import dateutil.tz
monkeypatch.setattr(dateutil.tz, 'tzlocal', lambda: faketz())
w = Worker(Configuration('/dev/null'))
with open('tests/fixtures/calendars/feiertage.ics', 'r') as f:
events = [o for o in icalendar.Calendar.from_ical(f.read()).walk() if o.name == 'VEVENT']
vevents = w.apply_range_filter(events,
start=datetime.datetime(2015, 12, 21, tzinfo=pytz.UTC),
end=datetime.datetime(2015, 12, 27, 23, 59, 59, tzinfo=pytz.UTC))
schedule = w.create_schedule(vevents)
assert schedule.events == {
4: [
(datetime.datetime(2015, 12, 25, 5, 30, 0, tzinfo=pytz.timezone('Europe/Berlin')), datetime.datetime(2015, 12, 25, 23, 0, 0, tzinfo=pytz.timezone('Europe/Berlin'))),
],
5: [
(datetime.datetime(2015, 12, 26, 5, 30, 0, tzinfo=pytz.timezone('Europe/Berlin')), datetime.datetime(2015, 12, 26, 23, 0, 0, tzinfo=pytz.timezone('Europe/Berlin'))),
],
}
def test_get_static_schedule(self, monkeypatch):
# the static schedule is always in local time, so monkeypatch dateutil.tz.tzlocal() to return a constant
# timezone to avoid test failures in different timezones
def faketz():
return pytz.timezone('Europe/Berlin')
import dateutil.tz
monkeypatch.setattr(dateutil.tz, 'tzlocal', lambda: faketz())
w = Worker(Configuration('/dev/null'))
w.config.cfg_parser.readfp(StringIO("""
def test_apply_range_filter(self):
w = Worker(Configuration('/dev/null'))
with open('tests/fixtures/calendars/repeating.ics', 'r') as f:
events = [o for o in icalendar.Calendar.from_ical(f.read()).walk() if o.name == 'VEVENT']
filtered = w.apply_range_filter(events, datetime.datetime(2015, 12, 21, tzinfo=pytz.UTC), datetime.datetime(2015, 12, 27, tzinfo=pytz.UTC))
filtered = list(filtered)
assert len(filtered) == 4
def test_apply_range_filter_exclude(self):
w = Worker(Configuration('/dev/null'))
with open('tests/fixtures/calendars/repeating.ics', 'r') as f:
events = [o for o in icalendar.Calendar.from_ical(f.read()).walk() if o.name == 'VEVENT']
filtered = w.apply_range_filter(events,
datetime.datetime(2015, 12, 28, tzinfo=pytz.UTC),
datetime.datetime(2016, 1, 1, tzinfo=pytz.UTC))
filtered = list(filtered)
# weekly event: once (2015-12-29)
# daily event: 4 (2015-12-28 till 2015-12-31)
assert len(filtered) == 5
def test_apply_user_filter(self):
w = Worker(Configuration('/dev/null'))
with open('tests/fixtures/calendars/repeating.ics', 'r') as f:
events = [o for o in icalendar.Calendar.from_ical(f.read()).walk() if o.name == 'VEVENT']
filtered = w.apply_range_filter(events,
datetime.datetime(2015, 12, 28, tzinfo=pytz.UTC),
datetime.datetime(2016, 1, 1, tzinfo=pytz.UTC))
filtered = list(filtered)
filtered = w.apply_user_filter("name == 'Ending repeating event'", filtered)
filtered = list(filtered)
assert len(filtered) == 4
def test_create_schedule(self):
w = Worker(Configuration('/dev/null'))
with open('tests/fixtures/calendars/repeating.ics', 'r') as f:
events = [o for o in icalendar.Calendar.from_ical(f.read()).walk() if o.name == 'VEVENT']
vevents = w.apply_range_filter(events,
start=datetime.datetime(2015, 12, 24, tzinfo=pytz.UTC),
end=datetime.datetime(2015, 12, 30, 23, 59, 59, tzinfo=pytz.UTC))