瀏覽代碼

Cleanup

tags/0.2
Johann Schmitz 2 年之前
父節點
當前提交
70b08dd74c
簽署人: Johann Schmitz <johann@j-schmitz.net> GPG 金鑰 ID: A084064277C501ED
共有 14 個檔案被更改,包括 720 行新增687 行删除
  1. 4
    2
      .gitignore
  2. 31
    3
      Makefile
  3. 1
    0
      pypiwik/__init__.py
  4. 二進制
      pypiwik/__init__.pyo
  5. 28
    0
      pypiwik/decorators.py
  6. 二進制
      pypiwik/decorators.pyo
  7. 329
    0
      pypiwik/tracker.py
  8. 二進制
      pypiwik/tracker.pyo
  9. 15
    13
      setup.py
  10. 0
    28
      src/pypiwik/decorators.py
  11. 0
    329
      src/pypiwik/tracker.py
  12. 107
    107
      tests/builder.py
  13. 51
    51
      tests/decorators.py
  14. 154
    154
      tests/tracker.py

+ 4
- 2
.gitignore 查看文件

@@ -1,4 +1,6 @@
.idea
*.pyc
*.py?
__pycache__
src/test.py
.coverage
.coverage
coverage.xml

+ 31
- 3
Makefile 查看文件

@@ -1,16 +1,44 @@
TARGET?=tests

SRC_PATH := pypiwik

VERSION := $(shell grep -Po '"(.*)"' $(SRC_PATH)/__init__.py | sed -e 's/"//g')

test_default_python:
PYTHONPATH="." python tests/ -v

test_py2:
@echo Executing test with python2
PYTHONPATH=".:./src" python2 tests/
PYTHONPATH="." python2 tests/ -v

test_py3:
@echo Executing test with python3
PYTHONPATH=".:./src" python3 tests/
PYTHONPATH="." python3 tests/ -v

test: test_py2 test_py3

compile:
@echo Compiling python code
python -m compileall $(SRC_PATH)/

compile_optimized:
@echo Compiling python code optimized
python -O -m compileall $(SRC_PATH)/

coverage:
coverage erase
PYTHONPATH=".:./src" coverage run --source='src' --omit='src/test.py' --branch tests/__main__.py
PYTHONPATH="." coverage run --source='$(SRC_PATH)' --branch tests/__main__.py
coverage xml -i
coverage report -m

sonar:
/usr/local/bin/sonar-scanner/bin/sonar-scanner -Dsonar.projectVersion=$(VERSION)

clean:
find -name "*.py?" -delete
rm -f coverage.xml testresults.xml
rm -fr htmlcov dist build *.egg-info

travis: compile compile_optimized test_default_python coverage

jenkins: travis sonar

src/pypiwik/__init__.py → pypiwik/__init__.py 查看文件

@@ -1,2 +1,3 @@
# -*- coding: utf-8 -*-

VERSION="0.1.5.1"

二進制
pypiwik/__init__.pyo 查看文件


+ 28
- 0
pypiwik/decorators.py 查看文件

@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
from pypiwik.tracker import PiwikTracker

ON_SUCCESS = 1
ON_ERROR = 2
ALWAYS = 3

def track_page_view(tracker=None, piwik_url=None, site_id=None, when=ON_SUCCESS, **tracker_kwargs):
if not ((piwik_url and site_id) or tracker):
raise ValueError("Either 'tracker' or 'piwik_url' and 'site_id' must be set")

def decorator_wrapper(func):
def inner(*args, **kwargs):
pt = tracker or PiwikTracker(piwik_url, site_id, **tracker_kwargs)

try:
ret = func(*args, **kwargs)

if when in (ON_SUCCESS, ALWAYS):
pt.track_page_view(**kwargs)

return ret
except:
if when in (ON_ERROR, ALWAYS):
pt.track_page_view(**kwargs)
raise
return inner
return decorator_wrapper

二進制
pypiwik/decorators.pyo 查看文件


+ 329
- 0
pypiwik/tracker.py 查看文件

@@ -0,0 +1,329 @@
# -*- coding: utf-8 -*-
from hashlib import md5
import json
import logging
import os
import time
import datetime
import requests

try:
from urllib.parse import urljoin, urlencode
except ImportError:
from urlparse import urljoin
from urllib import urlencode

PARAMETERS = {
# required parameters
'url': 'url', # The full URL for the current action.

# recommended parameters
'action_name': 'action_name', # The title of the action being tracked.
'referer': 'urlref', # The full HTTP Referrer URL.
'visit_custom_vars': '_cvar', # Visit scope custom variables.
'visit_count': '_idvc', # The current count of visits for this visitor.
'view_timestamp': '_viewts', # The UNIX timestamp of this visitor's previous visit.
'first_visit_timestamp': '_idts', # The UNIX timestamp of this visitor's first visit.
'campaign_name': '_rcn', # The Campaign name (see Tracking Campaigns).
'campaign_keywords': '_rck', # The Campaign Keyword (see Tracking Campaigns).
'resolution': 'res', # The resolution of the device the visitor is using, eg 1280x1024.
'hour': 'h', # The current hour (local time).
'minute': 'm', # The current minute (local time).
'second': 's', # The current second (local time).
'flash': 'fla', # Flash,
'java': 'java', # Java
'director': 'dir', # Director,
'quicktime': 'qt', # Quicktime,
'real_player': 'realp', # Real Player,
'pdf': 'pdf', # PDF
'wma': 'wma', # Windows Media
'gears': 'gears', # Gears
'silverlight': 'ag', # Silverlight
'cookie': 'cookie', # when set to 1, the visitor's client is known to support cookies.
'user_agent': 'ua', # An override value for the User-Agent HTTP header field.
'lang': 'lang', # An override value for the Accept-Language HTTP header field. This value is used to detect the visitor's country if GeoIP is not enabled.
'user_id': 'uid', # defines the User ID for this request. User ID is any non empty unique string identifying the user (such as an email address or a username).
'visitor_id': 'cid', # defines the visitor ID for this request.
'new_visit': 'new_visit', # If set to 1, will force a new visit to be created for this action. This feature is also available in Javascript.

# 'Optional Action info (measure Page view, Outlink, Download, Site search)',
'page_custom_vars': 'cvar', # Page scope custom variables.
'link': 'link', # An external URL the user has opened. Used for tracking outlink clicks. We recommend to also set the url parameter to this same value.
'download': 'download', # URL of a file the user has downloaded. Used for tracking downloads. We recommend to also set the url parameter to this same value.
'search_keyword': 'search', # The Site Search keyword. When specified, the request will not be tracked as a normal pageview but will instead be tracked as a Site Search request.
'search_category': 'search_cat', # when search is specified, you can optionally specify a search category with this parameter.
'search_count': 'search_count', # when search is specified, we also recommend to set the search_count to the number of search results displayed on the results page.

'goal_id': 'idgoal', # If specified, the tracking request will trigger a conversion for the goal of the website being tracked with this ID.
'revenue': 'revenue', # A monetary value that was generated as revenue by this goal conversion. Only used if idgoal is specified in the request.
'gt_ms': 'gt_ms', # The amount of time it took the server to generate this action, in milliseconds.
'charset': 'cs', # The charset of the page being tracked. Specify the charset if the data you send to Piwik is encoded in a different character set than the default utf-8.

# Optional Event Tracking info
'event_category': 'e_c', # The event category. Must not be empty. (eg. Videos, Music, Games...)
'event_action': 'e_a', # The event action. Must not be empty. (eg. Play, Pause, Duration, Add Playlist, Downloaded, Clicked...)
'event_name': 'e_n', # The event name. (eg. a Movie name, or Song name, or File name...)
'event_value': 'e_v', # The event value. Must be a float or integer value (numeric), not a string.

# Optional Content Tracking info
'content_name': 'c_n', # The name of the content. For instance 'Ad Foo Bar'
'content_piece': 'c_p', # The actual content piece. For instance the path to an image, video, audio, any text
'content_target': 'c_t', # The target of the content. For instance the URL of a landing page
'content_interaction': 'c_i', # The name of the interaction with the content. For instance a 'click'

# Other parameters (require authentication via token_auth)
'token_auth': 'token_auth', # 32 character authorization key used to authenticate the API request.
'client_ip': 'cip', # Override value for the visitor IP (both IPv4 and IPv6 notations supported).
'client_dt': 'cdt', # Override for the datetime of the request (normally the current time is used).
'country': 'country', # An override value for the country. Should be set to the two letter country code of the visitor (lowercase), eg fr, de, us.
'region': 'region', # An override value for the region. Should be set to the two letter region code as defined by MaxMind's GeoIP databases.
'city': 'city', # An override value for the city. The name of the city the visitor is located in, eg, Tokyo.
'lat': 'lat', # An override value for the visitor's latitude, eg 22.456.
'long': 'long', # An override value for the visitor's longitude, eg 22.456.

'track_bots': 'bots', # Set to true to track bots

'heartbeat_timer': None, # Set to a positive integer to enable the heartbeat timer
}

AUTH_RESTRICTED_PARAMS = ('token_auth', 'client_ip', 'client_dt', 'country', 'region', 'city', 'lat', 'long')

class PiwikTracker(object):
API_VERSION = 1

def __init__(self, piwik_url, site_id, request=None, values=None, **kwargs):
super(PiwikTracker, self).__init__()
self.piwik_url = piwik_url
self.idsite = site_id

# initialize all tracking variables on this instance
values = values or {}
values.update(kwargs)
self.update(dict((p, values.get(p, None)) for p in PARAMETERS.keys()))

self.visit_custom_vars = {}
self.page_custom_vars = {}

self.spoof_request = True

# defaults for the requests module
self.request_headers = {}
self.requests_arguments = {
'timeout': 3
}

# default filenames for the tracker file and the js file
self.piwik_php_file = 'piwik.php'
self.piwik_js_file = 'piwik.js'

self.update_from_request(request)

def update(self, values):
for property_name in PARAMETERS.keys():
if property_name in values:
setattr(self, property_name, values[property_name])

@property
def php_url(self):
return urljoin(self.piwik_url, self.piwik_php_file)

@property
def js_url(self):
return urljoin(self.piwik_url, self.piwik_js_file)

def update_from_request(self, request):
"""
Initializes the current tracker instance from a Django-like requests object.
If the request argument is None or does not have a dict as the META attribute, this function does nothing.
"""
if not request:
return

meta = getattr(request, 'META', {})

if not isinstance(meta, dict):
return

self.user_agent = meta.get('HTTP_USER_AGENT', None)
self.referer = meta.get('HTTP_REFERER', None)
self.lang = meta.get('HTTP_ACCEPT_LANGUAGE', None)

if hasattr(request, 'build_absolute_uri'):
bau = request.build_absolute_uri
if callable(bau):
self.url = bau()

def _get_client_ip():
if 'HTTP_X_FORWARDED_FOR' in meta:
return meta['HTTP_X_FORWARDED_FOR'].split(",")[0]
else:
return meta.get('REMOTE_ADDR', None)

self.client_ip = _get_client_ip()

def _build_cvars(self, value):
"""
Converts a custom vars dictionary to it's JSON representation usable for the Piwik API.
"""
if not value:
return None

d = {}

for i, item in enumerate(value.items(), start=1):
d[i] = list(item)

return json.dumps(d)

def _build_parameters(self, **kwargs):
d = {
'idsite': self.idsite,
'rec': '1',
'apiv': PiwikTracker.API_VERSION,
}

for property_name, parameter_name in PARAMETERS.items():
if not parameter_name:
continue

value = kwargs.get(property_name, None) or getattr(self, property_name, None)

token_auth = kwargs.get('token_auth', None) or getattr(self, 'token_auth', None)
if value and property_name in AUTH_RESTRICTED_PARAMS and not token_auth:
logging.info("Skipping %s because token_auth not set" % property_name)
continue

if value is None:
continue

if isinstance(value, bool):
value = 1 if value else 0
elif isinstance(value, datetime.datetime):
if not value.tzinfo:
logging.warning("Passing a naive datetime may result in wrong data. Make sure you pass a datetime object with UTC timezone")
value = value.strftime('%Y-%m-%d %H:%M:%S')

if property_name in ('page_custom_vars', 'visit_custom_vars'):
value = self._build_cvars(value)
if not value:
continue

d[parameter_name] = value

return d

def build_request_headers(self, params):
headers = {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
}
headers.update(self.request_headers)

if self.spoof_request:
# this is only used for server-to-server calls. By putting the values into the HTTP headers
# and dropping them from the payload we will transfer less to the server while carrying the same information.
for p, h in (('ua', 'User-Agent'), ('lang', 'Accept-Language'), ('urlref', 'Referer')):
if p in params:
headers[h] = params[p]
del params[p]

return headers

def track_page_view(self, **kwargs):
params = self._build_parameters(**kwargs)
if '_id' not in params:
params['_id'] = md5(os.urandom(16)).hexdigest()[:15]
if '_idts' not in params:
params['_idts'] = int(time.time())

headers = self.build_request_headers(params)
logging.debug("Tracking variables: %s" % params)
logging.debug("Tracking headers: %s" % headers)
try:
response = requests.post(self.php_url, data=params, headers=headers, **self.requests_arguments)
logging.debug("Tracking response: %s" % response)
except:
logging.exception("Tracking request failed")

def tracking_code(self, **kwargs):
return TrackingCodeBuilder(self).render(self._build_parameters(**kwargs))


class TrackingCodeBuilder(object):
template = """<script type="text/javascript">
var _paq = _paq || [];
{custom_vars}
{event_tracking}
{js_vars}
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {{
_paq.push(['setTrackerUrl', '{tracker_url}']);
_paq.push(['setSiteId', {idsite}]);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.type='text/javascript'; g.async=true; g.defer=true; g.src='{javascript_url}'; s.parentNode.insertBefore(g,s);
}})();
</script>
<noscript><p><img src="{tracker_url}?{tracking_args}" style="border:0;" alt="" /></p></noscript>"""

def __init__(self, tracker):
self.tracker = tracker

def _paq_push(self, l):
# python3's filter() does not return a list
return "_paq.push(%s);" % json.dumps(list(l))

def _event_tracker(self):
if not (self.tracker.event_category and self.tracker.event_action):
return ""
l = ['trackEvent', self.tracker.event_category, self.tracker.event_action, self.tracker.event_name, self.tracker.event_value]
return self._paq_push(filter(lambda x: x, l))

def _custom_vars(self):
def _inner():
for d, scope in (self.tracker.page_custom_vars, 'page'), (self.tracker.visit_custom_vars, 'visit'):
for i, item in enumerate(d.items(), start=1):
if i > 5:
break
k, v = item
l = ['setCustomVariable', i, k, v, scope]
yield self._paq_push(l)
return '\n'.join(_inner())

def _common_vars(self, params):
def _inner():
extra_tracking_params = {}
if 'url' in params:
yield self._paq_push(['setCustomUrl', params['url']])
if 'urlref' in params:
yield self._paq_push(['setReferrerUrl', params['urlref']])
if 'action_name' in params:
yield self._paq_push(['setDocumentTitle', params['action_name']])
if 'new_visit' in params and params['new_visit']: # http://piwik.org/faq/how-to/#faq_187
extra_tracking_params['new_visit'] = 1
yield self._paq_push(["deleteCookies"])
if self.tracker.heartbeat_timer and int(self.tracker.heartbeat_timer) > 0:
yield self._paq_push(['enableHeartBeatTimer', self.tracker.heartbeat_timer])
if 'bots' in params and params['bots']:
extra_tracking_params['bots'] = 1

if extra_tracking_params:
yield self._paq_push(['appendToTrackingUrl', urlencode(extra_tracking_params)])

return '\n'.join(_inner())

def render(self, params):
# remove all tracking variables which doesn't do any good when used with the image or javascript
# tracking api
for x in ['url', 'ua', 'lang'] + [PARAMETERS[x] for x in AUTH_RESTRICTED_PARAMS]:
if x in params:
del params[x]

return TrackingCodeBuilder.template.format(tracker_url=self.tracker.php_url,
javascript_url=self.tracker.js_url,
idsite=self.tracker.idsite,
tracking_args=urlencode(params),
event_tracking=self._event_tracker(),
custom_vars=self._custom_vars(),
js_vars=self._common_vars(params),
)

二進制
pypiwik/tracker.pyo 查看文件


+ 15
- 13
setup.py 查看文件

@@ -3,18 +3,20 @@

from setuptools import setup, find_packages

from pypiwik import VERSION

setup(
name='python-piwik', # The name is 'pypiwik' but it's taken on PyPI by an abandoned project
version='0.1.5.1',
description='Python implementation of the Piwik HTTP API',
# long_description=open('README.md').read(),
author='Johann Schmitz',
author_email='johann@j-schmitz.net',
url='https://ercpe.de/projects/pypiwik',
download_url='https://code.not-your-server.de/pypiwik.git/tags/',
packages=find_packages('src'),
package_dir={'': 'src'},
include_package_data=True,
zip_safe=False,
license='GPL-3',
name='python-piwik', # The name is 'pypiwik' but it's taken on PyPI by an abandoned project
version=VERSION,
description='Python implementation of the Piwik HTTP API',
# long_description=open('README.md').read(),
author='Johann Schmitz',
author_email='johann@j-schmitz.net',
url='https://ercpe.de/projects/pypiwik',
download_url='https://code.not-your-server.de/pypiwik.git/tags/',
packages=find_packages('src'),
package_dir={'': 'src'},
include_package_data=True,
zip_safe=False,
license='GPL-3',
)

+ 0
- 28
src/pypiwik/decorators.py 查看文件

@@ -1,28 +0,0 @@
# -*- coding: utf-8 -*-
from pypiwik.tracker import PiwikTracker

ON_SUCCESS = 1
ON_ERROR = 2
ALWAYS = 3

def track_page_view(tracker=None, piwik_url=None, site_id=None, when=ON_SUCCESS, **tracker_kwargs):
if not ((piwik_url and site_id) or tracker):
raise ValueError("Either 'tracker' or 'piwik_url' and 'site_id' must be set")

def decorator_wrapper(func):
def inner(*args, **kwargs):
pt = tracker or PiwikTracker(piwik_url, site_id, **tracker_kwargs)

try:
ret = func(*args, **kwargs)

if when in (ON_SUCCESS, ALWAYS):
pt.track_page_view(**kwargs)

return ret
except:
if when in (ON_ERROR, ALWAYS):
pt.track_page_view(**kwargs)
raise
return inner
return decorator_wrapper

+ 0
- 329
src/pypiwik/tracker.py 查看文件

@@ -1,329 +0,0 @@
# -*- coding: utf-8 -*-
from hashlib import md5
import json
import logging
import os
import time
import datetime
import requests

try:
from urllib.parse import urljoin, urlencode
except ImportError:
from urlparse import urljoin
from urllib import urlencode

PARAMETERS = {
# required parameters
'url': 'url', # The full URL for the current action.

# recommended parameters
'action_name': 'action_name', # The title of the action being tracked.
'referer': 'urlref', # The full HTTP Referrer URL.
'visit_custom_vars': '_cvar', # Visit scope custom variables.
'visit_count': '_idvc', # The current count of visits for this visitor.
'view_timestamp': '_viewts', # The UNIX timestamp of this visitor's previous visit.
'first_visit_timestamp': '_idts', # The UNIX timestamp of this visitor's first visit.
'campaign_name': '_rcn', # The Campaign name (see Tracking Campaigns).
'campaign_keywords': '_rck', # The Campaign Keyword (see Tracking Campaigns).
'resolution': 'res', # The resolution of the device the visitor is using, eg 1280x1024.
'hour': 'h', # The current hour (local time).
'minute': 'm', # The current minute (local time).
'second': 's', # The current second (local time).
'flash': 'fla', # Flash,
'java': 'java', # Java
'director': 'dir', # Director,
'quicktime': 'qt', # Quicktime,
'real_player': 'realp', # Real Player,
'pdf': 'pdf', # PDF
'wma': 'wma', # Windows Media
'gears': 'gears', # Gears
'silverlight': 'ag', # Silverlight
'cookie': 'cookie', # when set to 1, the visitor's client is known to support cookies.
'user_agent': 'ua', # An override value for the User-Agent HTTP header field.
'lang': 'lang', # An override value for the Accept-Language HTTP header field. This value is used to detect the visitor's country if GeoIP is not enabled.
'user_id': 'uid', # defines the User ID for this request. User ID is any non empty unique string identifying the user (such as an email address or a username).
'visitor_id': 'cid', # defines the visitor ID for this request.
'new_visit': 'new_visit', # If set to 1, will force a new visit to be created for this action. This feature is also available in Javascript.

# 'Optional Action info (measure Page view, Outlink, Download, Site search)',
'page_custom_vars': 'cvar', # Page scope custom variables.
'link': 'link', # An external URL the user has opened. Used for tracking outlink clicks. We recommend to also set the url parameter to this same value.
'download': 'download', # URL of a file the user has downloaded. Used for tracking downloads. We recommend to also set the url parameter to this same value.
'search_keyword': 'search', # The Site Search keyword. When specified, the request will not be tracked as a normal pageview but will instead be tracked as a Site Search request.
'search_category': 'search_cat', # when search is specified, you can optionally specify a search category with this parameter.
'search_count': 'search_count', # when search is specified, we also recommend to set the search_count to the number of search results displayed on the results page.

'goal_id': 'idgoal', # If specified, the tracking request will trigger a conversion for the goal of the website being tracked with this ID.
'revenue': 'revenue', # A monetary value that was generated as revenue by this goal conversion. Only used if idgoal is specified in the request.
'gt_ms': 'gt_ms', # The amount of time it took the server to generate this action, in milliseconds.
'charset': 'cs', # The charset of the page being tracked. Specify the charset if the data you send to Piwik is encoded in a different character set than the default utf-8.

# Optional Event Tracking info
'event_category': 'e_c', # The event category. Must not be empty. (eg. Videos, Music, Games...)
'event_action': 'e_a', # The event action. Must not be empty. (eg. Play, Pause, Duration, Add Playlist, Downloaded, Clicked...)
'event_name': 'e_n', # The event name. (eg. a Movie name, or Song name, or File name...)
'event_value': 'e_v', # The event value. Must be a float or integer value (numeric), not a string.

# Optional Content Tracking info
'content_name': 'c_n', # The name of the content. For instance 'Ad Foo Bar'
'content_piece': 'c_p', # The actual content piece. For instance the path to an image, video, audio, any text
'content_target': 'c_t', # The target of the content. For instance the URL of a landing page
'content_interaction': 'c_i', # The name of the interaction with the content. For instance a 'click'

# Other parameters (require authentication via token_auth)
'token_auth': 'token_auth', # 32 character authorization key used to authenticate the API request.
'client_ip': 'cip', # Override value for the visitor IP (both IPv4 and IPv6 notations supported).
'client_dt': 'cdt', # Override for the datetime of the request (normally the current time is used).
'country': 'country', # An override value for the country. Should be set to the two letter country code of the visitor (lowercase), eg fr, de, us.
'region': 'region', # An override value for the region. Should be set to the two letter region code as defined by MaxMind's GeoIP databases.
'city': 'city', # An override value for the city. The name of the city the visitor is located in, eg, Tokyo.
'lat': 'lat', # An override value for the visitor's latitude, eg 22.456.
'long': 'long', # An override value for the visitor's longitude, eg 22.456.

'track_bots': 'bots', # Set to true to track bots

'heartbeat_timer': None, # Set to a positive integer to enable the heartbeat timer
}

AUTH_RESTRICTED_PARAMS = ('token_auth', 'client_ip', 'client_dt', 'country', 'region', 'city', 'lat', 'long')

class PiwikTracker(object):
API_VERSION = 1

def __init__(self, piwik_url, site_id, request=None, values=None, **kwargs):
super(PiwikTracker, self).__init__()
self.piwik_url = piwik_url
self.idsite = site_id

# initialize all tracking variables on this instance
values = values or {}
values.update(kwargs)
self.update(dict((p, values.get(p, None)) for p in PARAMETERS.keys()))

self.visit_custom_vars = {}
self.page_custom_vars = {}

self.spoof_request = True

# defaults for the requests module
self.request_headers = {}
self.requests_arguments = {
'timeout': 3
}

# default filenames for the tracker file and the js file
self.piwik_php_file = 'piwik.php'
self.piwik_js_file = 'piwik.js'

self.update_from_request(request)

def update(self, values):
for property_name in PARAMETERS.keys():
if property_name in values:
setattr(self, property_name, values[property_name])

@property
def php_url(self):
return urljoin(self.piwik_url, self.piwik_php_file)

@property
def js_url(self):
return urljoin(self.piwik_url, self.piwik_js_file)

def update_from_request(self, request):
"""
Initializes the current tracker instance from a Django-like requests object.
If the request argument is None or does not have a dict as the META attribute, this function does nothing.
"""
if not request:
return

meta = getattr(request, 'META', {})

if not isinstance(meta, dict):
return

self.user_agent = meta.get('HTTP_USER_AGENT', None)
self.referer = meta.get('HTTP_REFERER', None)
self.lang = meta.get('HTTP_ACCEPT_LANGUAGE', None)

if hasattr(request, 'build_absolute_uri'):
bau = request.build_absolute_uri
if callable(bau):
self.url = bau()

def _get_client_ip():
if 'HTTP_X_FORWARDED_FOR' in meta:
return meta['HTTP_X_FORWARDED_FOR'].split(",")[0]
else:
return meta.get('REMOTE_ADDR', None)

self.client_ip = _get_client_ip()

def _build_cvars(self, value):
"""
Converts a custom vars dictionary to it's JSON representation usable for the Piwik API.
"""
if not value:
return None

d = {}

for i, item in enumerate(value.items(), start=1):
d[i] = list(item)

return json.dumps(d)

def _build_parameters(self, **kwargs):
d = {
'idsite': self.idsite,
'rec': '1',
'apiv': PiwikTracker.API_VERSION,
}

for property_name, parameter_name in PARAMETERS.items():
if not parameter_name:
continue

value = kwargs.get(property_name, None) or getattr(self, property_name, None)

token_auth = kwargs.get('token_auth', None) or getattr(self, 'token_auth', None)
if value and property_name in AUTH_RESTRICTED_PARAMS and not token_auth:
logging.info("Skipping %s because token_auth not set" % property_name)
continue

if value is None:
continue

if isinstance(value, bool):
value = 1 if value else 0
elif isinstance(value, datetime.datetime):
if not value.tzinfo:
logging.warning("Passing a naive datetime may result in wrong data. Make sure you pass a datetime object with UTC timezone")
value = value.strftime('%Y-%m-%d %H:%M:%S')

if property_name in ('page_custom_vars', 'visit_custom_vars'):
value = self._build_cvars(value)
if not value:
continue

d[parameter_name] = value

return d

def build_request_headers(self, params):
headers = {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
}
headers.update(self.request_headers)

if self.spoof_request:
# this is only used for server-to-server calls. By putting the values into the HTTP headers
# and dropping them from the payload we will transfer less to the server while carrying the same information.
for p, h in (('ua', 'User-Agent'), ('lang', 'Accept-Language'), ('urlref', 'Referer')):
if p in params:
headers[h] = params[p]
del params[p]

return headers

def track_page_view(self, **kwargs):
params = self._build_parameters(**kwargs)
if '_id' not in params:
params['_id'] = md5(os.urandom(16)).hexdigest()[:15]
if '_idts' not in params:
params['_idts'] = int(time.time())

headers = self.build_request_headers(params)
logging.debug("Tracking variables: %s" % params)
logging.debug("Tracking headers: %s" % headers)
try:
response = requests.post(self.php_url, data=params, headers=headers, **self.requests_arguments)
logging.debug("Tracking response: %s" % response)
except:
logging.exception("Tracking request failed")

def tracking_code(self, **kwargs):
return TrackingCodeBuilder(self).render(self._build_parameters(**kwargs))


class TrackingCodeBuilder(object):
template = """<script type="text/javascript">
var _paq = _paq || [];
{custom_vars}
{event_tracking}
{js_vars}
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {{
_paq.push(['setTrackerUrl', '{tracker_url}']);
_paq.push(['setSiteId', {idsite}]);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.type='text/javascript'; g.async=true; g.defer=true; g.src='{javascript_url}'; s.parentNode.insertBefore(g,s);
}})();
</script>
<noscript><p><img src="{tracker_url}?{tracking_args}" style="border:0;" alt="" /></p></noscript>"""

def __init__(self, tracker):
self.tracker = tracker

def _paq_push(self, l):
# python3's filter() does not return a list
return "_paq.push(%s);" % json.dumps(list(l))

def _event_tracker(self):
if not (self.tracker.event_category and self.tracker.event_action):
return ""
l = ['trackEvent', self.tracker.event_category, self.tracker.event_action, self.tracker.event_name, self.tracker.event_value]
return self._paq_push(filter(lambda x: x, l))

def _custom_vars(self):
def _inner():
for d, scope in (self.tracker.page_custom_vars, 'page'), (self.tracker.visit_custom_vars, 'visit'):
for i, item in enumerate(d.items(), start=1):
if i > 5:
break
k, v = item
l = ['setCustomVariable', i, k, v, scope]
yield self._paq_push(l)
return '\n'.join(_inner())

def _common_vars(self, params):
def _inner():
extra_tracking_params = {}
if 'url' in params:
yield self._paq_push(['setCustomUrl', params['url']])
if 'urlref' in params:
yield self._paq_push(['setReferrerUrl', params['urlref']])
if 'action_name' in params:
yield self._paq_push(['setDocumentTitle', params['action_name']])
if 'new_visit' in params and params['new_visit']: # http://piwik.org/faq/how-to/#faq_187
extra_tracking_params['new_visit'] = 1
yield self._paq_push(["deleteCookies"])
if self.tracker.heartbeat_timer and int(self.tracker.heartbeat_timer) > 0:
yield self._paq_push(['enableHeartBeatTimer', self.tracker.heartbeat_timer])
if 'bots' in params and params['bots']:
extra_tracking_params['bots'] = 1

if extra_tracking_params:
yield self._paq_push(['appendToTrackingUrl', urlencode(extra_tracking_params)])

return '\n'.join(_inner())

def render(self, params):
# remove all tracking variables which doesn't do any good when used with the image or javascript
# tracking api
for x in ['url', 'ua', 'lang'] + [PARAMETERS[x] for x in AUTH_RESTRICTED_PARAMS]:
if x in params:
del params[x]

return TrackingCodeBuilder.template.format(tracker_url=self.tracker.php_url,
javascript_url=self.tracker.js_url,
idsite=self.tracker.idsite,
tracking_args=urlencode(params),
event_tracking=self._event_tracker(),
custom_vars=self._custom_vars(),
js_vars=self._common_vars(params),
)

+ 107
- 107
tests/builder.py 查看文件

@@ -4,11 +4,11 @@ import difflib
import unittest

try:
# py3
from urllib.parse import urlparse, parse_qs, urljoin, urlunparse, urlencode
# py3
from urllib.parse import urlparse, parse_qs, urljoin, urlunparse, urlencode
except ImportError:
from urlparse import urljoin, urlparse, parse_qs, urlunparse
from urllib import urlencode
from urlparse import urljoin, urlparse, parse_qs, urlunparse
from urllib import urlencode

from pypiwik.tracker import TrackingCodeBuilder, PiwikTracker
import re
@@ -17,48 +17,48 @@ img_src_re = re.compile('img src="(.*?)"', re.IGNORECASE)

class TrackingCodeBuilderTest(unittest.TestCase):

def assertTrackingCodeEquals(self, first, second):
def assertTrackingCodeEquals(self, first, second):

def _clean(s):
m = img_src_re.search(s)
assert m and len(m.groups()) == 1, "String does not have an image tracker code"
def _clean(s):
m = img_src_re.search(s)
assert m and len(m.groups()) == 1, "String does not have an image tracker code"

url = m.group(1)
urlparts = urlparse(url)
query = parse_qs(urlparts.query)
d = OrderedDict(sorted(((k, v[0]) for k, v in query.items()), key=lambda x: x[0]))
new_url = urlunparse((urlparts.scheme, urlparts.netloc, urlparts.path, urlparts.params, urlencode(d), urlparts.fragment))
url = m.group(1)
urlparts = urlparse(url)
query = parse_qs(urlparts.query)
d = OrderedDict(sorted(((k, v[0]) for k, v in query.items()), key=lambda x: x[0]))
new_url = urlunparse((urlparts.scheme, urlparts.netloc, urlparts.path, urlparts.params, urlencode(d), urlparts.fragment))

s2 = img_src_re.sub(new_url, s)
return [x.strip(' ') for x in s2.splitlines(True) if x.strip(' ') if x.strip(' ') != '\n']
s2 = img_src_re.sub(new_url, s)
return [x.strip(' ') for x in s2.splitlines(True) if x.strip(' ') if x.strip(' ') != '\n']

t1_lines = _clean(first)
t2_lines = _clean(second)
t1_lines = _clean(first)
t2_lines = _clean(second)

assert difflib.SequenceMatcher(None, t1_lines, t2_lines).ratio() == 1.0, \
"Tracking codes do not match: %s" % ''.join(difflib.ndiff(t1_lines, t2_lines))
assert difflib.SequenceMatcher(None, t1_lines, t2_lines).ratio() == 1.0, \
"Tracking codes do not match: %s" % ''.join(difflib.ndiff(t1_lines, t2_lines))

def assertImageTrackingVars(self, tracking_code, expected_vars):
m = img_src_re.search(tracking_code)
assert m and len(m.groups()) == 1, "String does not have an image tracker code"
def assertImageTrackingVars(self, tracking_code, expected_vars):
m = img_src_re.search(tracking_code)
assert m and len(m.groups()) == 1, "String does not have an image tracker code"

url = m.group(1)
urlparts = urlparse(url)
query = parse_qs(urlparts.query)
url = m.group(1)
urlparts = urlparse(url)
query = parse_qs(urlparts.query)

for k, v in expected_vars.items():
assert k in query, "Image tracker query string does not contain '%s'" % k
assert query[k] == [v], \
"Image tracker variable %s does not match expected value '%s'. It's '%s'" % (k, v, query[k])
for k, v in expected_vars.items():
assert k in query, "Image tracker query string does not contain '%s'" % k
assert query[k] == [v], \
"Image tracker variable %s does not match expected value '%s'. It's '%s'" % (k, v, query[k])


def test_simple_tracking_code(self):
tracker = PiwikTracker('http://localhost', 1)
builder = TrackingCodeBuilder(tracker)
def test_simple_tracking_code(self):
tracker = PiwikTracker('http://localhost', 1)
builder = TrackingCodeBuilder(tracker)

s = builder.render(tracker._build_parameters())
s = builder.render(tracker._build_parameters())

self.assertTrackingCodeEquals(s, """<script type="text/javascript">
self.assertTrackingCodeEquals(s, """<script type="text/javascript">
var _paq = _paq || [];
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
@@ -71,15 +71,15 @@ _paq.push(['enableLinkTracking']);
</script>
<noscript><p><img src="http://localhost/piwik.php?idsite=1&rec=1&apiv=1" style="border:0;" alt="" /></p></noscript>""")

def test_event_tracking_code(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.event_category = "event_category"
tracker.event_action = "event_action"
builder = TrackingCodeBuilder(tracker)
def test_event_tracking_code(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.event_category = "event_category"
tracker.event_action = "event_action"
builder = TrackingCodeBuilder(tracker)

s = builder.render(tracker._build_parameters())
s = builder.render(tracker._build_parameters())

self.assertTrackingCodeEquals(s, """<script type="text/javascript">
self.assertTrackingCodeEquals(s, """<script type="text/javascript">
var _paq = _paq || [];
_paq.push(["trackEvent", "event_category", "event_action"]);
_paq.push(['trackPageView']);
@@ -93,69 +93,69 @@ _paq.push(['enableLinkTracking']);
</script>
<noscript><p><img src="http://localhost/piwik.php?idsite=1&rec=1&apiv=1&e_a=event_action&e_c=event_category" style="border:0;" alt="" /></p></noscript>""")

def test_method_call_generation(self):
# test new_visit=True
tracker = PiwikTracker('http://localhost', 1)
tracker.new_visit = True
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["deleteCookies"]);' in s
assert '_paq.push(["appendToTrackingUrl", "new_visit=1"]);' in s
self.assertImageTrackingVars(s, {
'new_visit': '1'
})
# test track_bots=True
tracker = PiwikTracker('http://localhost', 1)
tracker.track_bots = True
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["appendToTrackingUrl", "bots=1"]);' in s
self.assertImageTrackingVars(s, {
'bots': '1'
})
# test action_name
tracker = PiwikTracker('http://localhost', 1)
tracker.action_name = "Foo / Bar"
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["setDocumentTitle", "Foo / Bar"]);' in s
self.assertImageTrackingVars(s, {
'action_name': 'Foo / Bar'
})
# test action_name
tracker = PiwikTracker('http://localhost', 1)
tracker.referer = 'http://foobar.local'
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["setReferrerUrl", "http://foobar.local"]);' in s
self.assertImageTrackingVars(s, {
'urlref': 'http://foobar.local'
})
def test_heartbeat_timer(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.heartbeat_timer = None
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert 'enableHeartBeatTimer' not in s, "Heartbeat timer enabled but set to None"
tracker = PiwikTracker('http://localhost', 1)
tracker.heartbeat_timer = 0
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert 'enableHeartBeatTimer' not in s, "Heartbeat timer enabled but set to 0"
tracker = PiwikTracker('http://localhost', 1)
tracker.heartbeat_timer = 10
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["enableHeartBeatTimer", 10]);' in s, "heartbeat_timer set but not enabled in tracking code"
def test_method_call_generation(self):
# test new_visit=True
tracker = PiwikTracker('http://localhost', 1)
tracker.new_visit = True
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["deleteCookies"]);' in s
assert '_paq.push(["appendToTrackingUrl", "new_visit=1"]);' in s
self.assertImageTrackingVars(s, {
'new_visit': '1'
})
# test track_bots=True
tracker = PiwikTracker('http://localhost', 1)
tracker.track_bots = True
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["appendToTrackingUrl", "bots=1"]);' in s
self.assertImageTrackingVars(s, {
'bots': '1'
})
# test action_name
tracker = PiwikTracker('http://localhost', 1)
tracker.action_name = "Foo / Bar"
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["setDocumentTitle", "Foo / Bar"]);' in s
self.assertImageTrackingVars(s, {
'action_name': 'Foo / Bar'
})
# test action_name
tracker = PiwikTracker('http://localhost', 1)
tracker.referer = 'http://foobar.local'
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["setReferrerUrl", "http://foobar.local"]);' in s
self.assertImageTrackingVars(s, {
'urlref': 'http://foobar.local'
})
def test_heartbeat_timer(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.heartbeat_timer = None
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert 'enableHeartBeatTimer' not in s, "Heartbeat timer enabled but set to None"
tracker = PiwikTracker('http://localhost', 1)
tracker.heartbeat_timer = 0
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert 'enableHeartBeatTimer' not in s, "Heartbeat timer enabled but set to 0"
tracker = PiwikTracker('http://localhost', 1)
tracker.heartbeat_timer = 10
builder = TrackingCodeBuilder(tracker)
s = builder.render(tracker._build_parameters())
assert '_paq.push(["enableHeartBeatTimer", 10]);' in s, "heartbeat_timer set but not enabled in tracking code"

+ 51
- 51
tests/decorators.py 查看文件

@@ -7,74 +7,74 @@ from pypiwik.tracker import PiwikTracker

class DecoratorTest(unittest.TestCase):

def test_args(self):
self.assertRaises(ValueError, track_page_view)
def test_args(self):
self.assertRaises(ValueError, track_page_view)

self.assertRaises(ValueError, track_page_view, piwik_url='test')
self.assertRaises(ValueError, track_page_view, piwik_url='test')

self.assertRaises(ValueError, track_page_view, site_id=1)
self.assertRaises(ValueError, track_page_view, site_id=1)

@unittest.skipIf(sys.version_info < (3,0), "Mock not available in python 2.x")
def test_when_success(self):
from unittest.mock import MagicMock
tracker = PiwikTracker('http://localhost', 1)
tracker.track_page_view = MagicMock()
@unittest.skipIf(sys.version_info < (3,0), "Mock not available in python 2.x")
def test_when_success(self):
from unittest.mock import MagicMock
tracker = PiwikTracker('http://localhost', 1)
tracker.track_page_view = MagicMock()

@track_page_view(tracker=tracker, when=ON_SUCCESS)
def my_func():
pass
my_func()
@track_page_view(tracker=tracker, when=ON_SUCCESS)
def my_func():
pass
my_func()

tracker.track_page_view.assert_called()
tracker.track_page_view.assert_called()

tracker.track_page_view.reset_mock()
tracker.track_page_view.reset_mock()

@track_page_view(tracker=tracker, when=ON_SUCCESS)
def my_func2():
raise Exception("lalala")
@track_page_view(tracker=tracker, when=ON_SUCCESS)
def my_func2():
raise Exception("lalala")

self.assertRaises(Exception, my_func2)
assert not tracker.track_page_view.called, "track_page_view() should not be called due to an raised exception"
self.assertRaises(Exception, my_func2)
assert not tracker.track_page_view.called, "track_page_view() should not be called due to an raised exception"

@unittest.skipIf(sys.version_info < (3,0), "Mock not available in python 2.x")
def test_when_success(self):
from unittest.mock import MagicMock
tracker = PiwikTracker('http://localhost', 1)
tracker.track_page_view = MagicMock()
@unittest.skipIf(sys.version_info < (3,0), "Mock not available in python 2.x")
def test_when_success(self):
from unittest.mock import MagicMock
tracker = PiwikTracker('http://localhost', 1)
tracker.track_page_view = MagicMock()

@track_page_view(tracker=tracker, when=ON_ERROR)
def my_func():
raise Exception("lalala")
@track_page_view(tracker=tracker, when=ON_ERROR)
def my_func():
raise Exception("lalala")

self.assertRaises(Exception, my_func)
tracker.track_page_view.assert_called()
self.assertRaises(Exception, my_func)
tracker.track_page_view.assert_called()

tracker.track_page_view.reset_mock()
tracker.track_page_view.reset_mock()

@track_page_view(tracker=tracker, when=ON_ERROR)
def my_func2():
pass
@track_page_view(tracker=tracker, when=ON_ERROR)
def my_func2():
pass

assert not tracker.track_page_view.called, "track_page_view() should not be called because no exception was raised"
assert not tracker.track_page_view.called, "track_page_view() should not be called because no exception was raised"

@unittest.skipIf(sys.version_info < (3,0), "Mock not available in python 2.x")
def test_when_success(self):
from unittest.mock import MagicMock
tracker = PiwikTracker('http://localhost', 1)
tracker.track_page_view = MagicMock()
@unittest.skipIf(sys.version_info < (3,0), "Mock not available in python 2.x")
def test_when_success(self):
from unittest.mock import MagicMock
tracker = PiwikTracker('http://localhost', 1)
tracker.track_page_view = MagicMock()

@track_page_view(tracker=tracker, when=ALWAYS)
def my_func():
pass
my_func()
@track_page_view(tracker=tracker, when=ALWAYS)
def my_func():
pass
my_func()

tracker.track_page_view.assert_called()
tracker.track_page_view.assert_called()

tracker.track_page_view.reset_mock()
tracker.track_page_view.reset_mock()

@track_page_view(tracker=tracker, when=ALWAYS)
def my_func2():
raise Exception("lalala")
@track_page_view(tracker=tracker, when=ALWAYS)
def my_func2():
raise Exception("lalala")

self.assertRaises(Exception, my_func2)
tracker.track_page_view.assert_called()
self.assertRaises(Exception, my_func2)
tracker.track_page_view.assert_called()

+ 154
- 154
tests/tracker.py 查看文件

@@ -8,167 +8,167 @@ from pypiwik.tracker import PiwikTracker, PARAMETERS, AUTH_RESTRICTED_PARAMS

class TrackerTest(unittest.TestCase):

def test_init(self):
tracker = PiwikTracker('http://localhost', 1)
def test_init(self):
tracker = PiwikTracker('http://localhost', 1)

for property_name in PARAMETERS.keys():
assert hasattr(tracker, property_name), "Missing property on %s: %s" % (tracker, property_name)
for property_name in PARAMETERS.keys():
assert hasattr(tracker, property_name), "Missing property on %s: %s" % (tracker, property_name)

def test_from_request(self):
class FakeRequest(object):
META = {
'HTTP_USER_AGENT': 'ua',
'HTTP_REFERER': 'ref',
'HTTP_ACCEPT_LANGUAGE': 'lang'
}
def test_from_request(self):
class FakeRequest(object):
META = {
'HTTP_USER_AGENT': 'ua',
'HTTP_REFERER': 'ref',
'HTTP_ACCEPT_LANGUAGE': 'lang'
}

tracker = PiwikTracker('http://localhost', 1, FakeRequest())
assert tracker.user_agent == 'ua'
assert tracker.referer == 'ref'
assert tracker.lang == 'lang'
assert tracker.url is None
assert tracker.client_ip is None
tracker = PiwikTracker('http://localhost', 1, FakeRequest())
assert tracker.user_agent == 'ua'
assert tracker.referer == 'ref'
assert tracker.lang == 'lang'
assert tracker.url is None
assert tracker.client_ip is None

def test_from_request_with_url(self):
class FakeRequest(object):
def build_absolute_uri(self):
return "http://foobar.local/test123"
def test_from_request_with_url(self):
class FakeRequest(object):
def build_absolute_uri(self):
return "http://foobar.local/test123"

tracker = PiwikTracker('http://localhost', 1, FakeRequest())
assert tracker.url == "http://foobar.local/test123"
def test_from_request_client_ip(self):
class FakeRequest(object):
META = { 'HTTP_X_FORWARDED_FOR': '127.1.2.3' }
tracker = PiwikTracker('http://localhost', 1, FakeRequest())
assert tracker.client_ip == '127.1.2.3'
def test_from_bad_request(self):
class FakeRequest(object):
pass
tracker = PiwikTracker('http://localhost', 1, FakeRequest())
self.assertDictEqual({'apiv': PiwikTracker.API_VERSION, 'idsite': 1, 'rec': '1'}, tracker._build_parameters())
class FakeRequest2(object):
META = "test"
tracker = PiwikTracker('http://localhost', 1, FakeRequest2())
self.assertDictEqual({'apiv': PiwikTracker.API_VERSION, 'idsite': 1, 'rec': '1'}, tracker._build_parameters())
def test_parameters(self):
site_id = 123
tracker = PiwikTracker('http://localhost', site_id)
for property_name in PARAMETERS.keys():
if property_name in ('page_custom_vars', 'visit_custom_vars'):
continue
setattr(tracker, property_name, 'test')
params = tracker._build_parameters()
assert 'apiv' in params and params['apiv'] == PiwikTracker.API_VERSION, "Wrong or missing API version"
assert 'idsite' in params and params['idsite'] == site_id, 'Wrong or missing site id'
assert 'rec' in params and params['rec'] == '1', 'Wrong or missing "rec" parameter'
for property_name, parameter_name in PARAMETERS.items():
if not parameter_name:
continue
if property_name in ('page_custom_vars', 'visit_custom_vars'):
continue
assert parameter_name in params, "Parameter %s not found in dict" % parameter_name
assert params[parameter_name] == getattr(tracker, property_name)
def test_custom_vars_conversion(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.page_custom_vars['foo'] = 'bar'
params = tracker._build_parameters()
cvars = json.loads(params['cvar'])
self.assertDictEqual(cvars, {
'1': ['foo', 'bar']
})
def test_boolean_parameter_conversion(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.track_bots = True
params = tracker._build_parameters()
assert params['bots'] == 1, "Boolean conversion from True to 1 failed"
def test_datetime_parameter_conversion(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.token_auth = 'foobar'
tracker.client_dt = datetime.datetime(2015, 7, 12, 11, 22, 33, tzinfo=pytz.UTC)
params = tracker._build_parameters()
assert params['cdt'] == '2015-07-12 11:22:33', "DateTime conversion to string failed"
def test_skip_auth_params_on_unauth_request(self):
tracker = PiwikTracker('http://localhost', 1)
for p in AUTH_RESTRICTED_PARAMS:
if p == "token_auth":
continue
setattr(tracker, p, 'test')
params = tracker._build_parameters()
assert not any((k in AUTH_RESTRICTED_PARAMS for k in params.keys())),\
"Tracking variables with authentication required should be filtered out if token_auth is not set"
def test_request_headers_with_spoof(self):
tracker = PiwikTracker('http://localhost', 1)
tracker = PiwikTracker('http://localhost', 1, FakeRequest())
assert tracker.url == "http://foobar.local/test123"
def test_from_request_client_ip(self):
class FakeRequest(object):
META = { 'HTTP_X_FORWARDED_FOR': '127.1.2.3' }
tracker = PiwikTracker('http://localhost', 1, FakeRequest())
assert tracker.client_ip == '127.1.2.3'
def test_from_bad_request(self):
class FakeRequest(object):
pass
tracker = PiwikTracker('http://localhost', 1, FakeRequest())
self.assertDictEqual({'apiv': PiwikTracker.API_VERSION, 'idsite': 1, 'rec': '1'}, tracker._build_parameters())
class FakeRequest2(object):
META = "test"
tracker = PiwikTracker('http://localhost', 1, FakeRequest2())
self.assertDictEqual({'apiv': PiwikTracker.API_VERSION, 'idsite': 1, 'rec': '1'}, tracker._build_parameters())
def test_parameters(self):
site_id = 123
tracker = PiwikTracker('http://localhost', site_id)
for property_name in PARAMETERS.keys():
if property_name in ('page_custom_vars', 'visit_custom_vars'):
continue
setattr(tracker, property_name, 'test')
params = tracker._build_parameters()
assert 'apiv' in params and params['apiv'] == PiwikTracker.API_VERSION, "Wrong or missing API version"
assert 'idsite' in params and params['idsite'] == site_id, 'Wrong or missing site id'
assert 'rec' in params and params['rec'] == '1', 'Wrong or missing "rec" parameter'
for property_name, parameter_name in PARAMETERS.items():
if not parameter_name:
continue
if property_name in ('page_custom_vars', 'visit_custom_vars'):
continue
assert parameter_name in params, "Parameter %s not found in dict" % parameter_name
assert params[parameter_name] == getattr(tracker, property_name)
def test_custom_vars_conversion(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.page_custom_vars['foo'] = 'bar'
params = tracker._build_parameters()
cvars = json.loads(params['cvar'])
self.assertDictEqual(cvars, {
'1': ['foo', 'bar']
})
def test_boolean_parameter_conversion(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.track_bots = True
params = tracker._build_parameters()
assert params['bots'] == 1, "Boolean conversion from True to 1 failed"
def test_datetime_parameter_conversion(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.token_auth = 'foobar'
tracker.client_dt = datetime.datetime(2015, 7, 12, 11, 22, 33, tzinfo=pytz.UTC)
params = tracker._build_parameters()
assert params['cdt'] == '2015-07-12 11:22:33', "DateTime conversion to string failed"
def test_skip_auth_params_on_unauth_request(self):
tracker = PiwikTracker('http://localhost', 1)
for p in AUTH_RESTRICTED_PARAMS:
if p == "token_auth":
continue
setattr(tracker, p, 'test')
params = tracker._build_parameters()
assert not any((k in AUTH_RESTRICTED_PARAMS for k in params.keys())),\
"Tracking variables with authentication required should be filtered out if token_auth is not set"
def test_request_headers_with_spoof(self):
tracker = PiwikTracker('http://localhost', 1)

headers = tracker.build_request_headers({})
self.assertDictEqual(headers, {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
})
headers = tracker.build_request_headers({})
self.assertDictEqual(headers, {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
})

def test_request_headers_with_spoof_params(self):
tracker = PiwikTracker('http://localhost', 1)
def test_request_headers_with_spoof_params(self):
tracker = PiwikTracker('http://localhost', 1)

headers = tracker.build_request_headers({'ua': 'fake user agent'})
self.assertDictEqual(headers, {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
'User-Agent': 'fake user agent'
})
headers = tracker.build_request_headers({'ua': 'fake user agent'})
self.assertDictEqual(headers, {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
'User-Agent': 'fake user agent'
})

def test_request_headers_with_spoof_headers(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.request_headers['foo'] = 'bar'
def test_request_headers_with_spoof_headers(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.request_headers['foo'] = 'bar'

headers = tracker.build_request_headers({})
self.assertDictEqual(headers, {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
'foo': 'bar'
})
def test_request_headers_without_spoof(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.spoof_request = False
headers = tracker.build_request_headers({'user_agent': 'fake user agent'})
self.assertDictEqual(headers, {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
})
def test_tracker_url_builder(self):
tracker = PiwikTracker('http://localhost', 1)
assert tracker.php_url == "http://localhost/piwik.php"
tracker.piwik_php_file = 'test123.php'
assert tracker.php_url == "http://localhost/test123.php"
def test_js_url_builder(self):
tracker = PiwikTracker('http://localhost', 1)
assert tracker.js_url == "http://localhost/piwik.js"
tracker.piwik_js_file = 'test123.js'
assert tracker.js_url == "http://localhost/test123.js"
headers = tracker.build_request_headers({})
self.assertDictEqual(headers, {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
'foo': 'bar'
})
def test_request_headers_without_spoof(self):
tracker = PiwikTracker('http://localhost', 1)
tracker.spoof_request = False
headers = tracker.build_request_headers({'user_agent': 'fake user agent'})
self.assertDictEqual(headers, {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
})
def test_tracker_url_builder(self):
tracker = PiwikTracker('http://localhost', 1)
assert tracker.php_url == "http://localhost/piwik.php"
tracker.piwik_php_file = 'test123.php'
assert tracker.php_url == "http://localhost/test123.php"
def test_js_url_builder(self):
tracker = PiwikTracker('http://localhost', 1)
assert tracker.js_url == "http://localhost/piwik.js"
tracker.piwik_js_file = 'test123.js'
assert tracker.js_url == "http://localhost/test123.js"

Loading…
取消
儲存