
Replaced tabs with spaces

master
Johann Schmitz, 5 years ago
commit 5febed60fc
Signed by: ercpe (GPG Key ID: A084064277C501ED)

All hunks below are whitespace-only: the affected lines are unchanged apart from their indentation, so each hunk is listed once in its new, space-indented form.
Changed files:

 1. src/gitbrowser/acl/base.py (20)
 2. src/gitbrowser/acl/gitolite.py (188)
 3. src/gitbrowser/conf.py (156)
 4. src/gitbrowser/middlewares.py (162)
 5. src/gitbrowser/utils/cache.py (20)
 6. src/gitbrowser/utils/deps.py (16)
 7. src/gitbrowser/utils/http.py (36)
 8. src/gitbrowser/utils/linking.py (42)
 9. src/gitbrowser/utils/lister.py (148)
10. src/gitbrowser/utils/misc.py (14)
11. src/gitbrowser/utils/perlcrap.py (178)
12. src/gitbrowser/utils/rendering.py (42)
13. src/gitbrowser/utils/repo.py (542)
14. src/gitbrowser/views/aux.py (62)
15. src/gitbrowser/views/core.py (38)
16. src/gitbrowser/views/misc.py (194)
17. src/gitbrowser/views/mixins.py (94)
18. src/gitbrowser/views/repository.py (254)
19. tests/test_perlcrap.py (174)
20. tests/test_utils.py (52)

src/gitbrowser/acl/base.py (20)

@@ -3,23 +3,23 @@ import logging
from django.core.cache import InvalidCacheBackendError, caches

try:
    acl_cache = caches['acl-cache']
except InvalidCacheBackendError:
    acl_cache = caches['default']


class ACL(object):
    def can_read(self, user, repo):
        raise NotImplementedError


class DenyAllACL(ACL):
    def can_read(self, user, repo):
        logging.info("Access DENIED for %s to %s" % (user, repo))
        return False


class AllowAllACL(ACL):
    def can_read(self, user, repo):
        logging.info("Access GRANTED for %s to %s" % (user, repo))
        return True
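
The three classes above define the whole ACL interface: an implementation only needs can_read(user, repo). A minimal sketch of a custom policy built on that interface (the staff-only rule is an invented example, not part of gitbrowser):

class StaffOnlyACL(ACL):
    def can_read(self, user, repo):
        # Example policy: only Django staff accounts may browse repositories.
        allowed = bool(getattr(user, 'is_staff', False))
        logging.info("Access %s for %s to %s" % ("GRANTED" if allowed else "DENIED", user, repo))
        return allowed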

src/gitbrowser/acl/gitolite.py (188)

@@ -9,132 +9,132 @@ from gitbrowser.utils.perlcrap import DataDumperReader
class ACLDebugLogWrapper(object):
    def null(self, *args, **kwargs):
        pass

    def __getattr__(self, item):
        if config.acl_debug:
            return getattr(orig_logging, item)
        else:
            return self.null


logging = ACLDebugLogWrapper()


class GitoliteACLDefinition(object):
    def __init__(self, acl):
        self.acl = acl

    @staticmethod
    def from_bigconf(bigconf, repo):
        repo_acl = {}

        # add stuff from big_conf
        for regex, enabled in bigconf.get('patterns', {}).get('groups', {}).items():
            if not enabled == '1':
                logging.info("Got regex in %%patterns, with value = %s. Skipping." % enabled)
                continue

            if not re.match(regex, repo.clean_relative_path):
                logging.info("Repository path %s does not match %s" % (repo.clean_relative_path, regex))
                continue

            for group_name in bigconf.get('groups', {}).get(regex, []):
                logging.info("%s belongs to repo group %s" % (repo, group_name))

                for user_or_group, ug_acl in bigconf.get('repos', {}).get(group_name, {}).items():
                    l = repo_acl.get(user_or_group, [])
                    l.extend([tuple(y) for y in ug_acl])
                    repo_acl[user_or_group] = l

        return GitoliteACLDefinition(repo_acl)

    @staticmethod
    def from_splitconf(repo):
        repo_acl = {}
        split_conf_path = os.path.join(repo.repo_path, 'gl-conf')
        split_conf_mtime = os.path.getmtime(split_conf_path)

        split_conf_cache_key = '%s-%s' % (repo.repo_path, split_conf_mtime)
        repo_conf = acl_cache.get(split_conf_cache_key)

        if repo_conf:
            logging.info("Using cached acl for %s (mtime %s)" % (repo, split_conf_mtime))
        else:
            logging.info("Reading split config from %s" % split_conf_path)
            repo_conf = DataDumperReader().read(split_conf_path)
            acl_cache.set(split_conf_cache_key, repo_conf)

        if 'one_repo' in repo_conf:
            one_repo = repo_conf['one_repo']
            assert len(one_repo) == 1, \
                "Expected a single repository in one_repo of %s; got %s" % (repo, len(repo_conf))

            for user_or_group, ug_acl in one_repo[one_repo.keys()[0]].items():
                l = repo_acl.get(user_or_group, [])
                l.extend([tuple(y) for y in ug_acl])
                repo_acl[user_or_group] = l

        return GitoliteACLDefinition(repo_acl)

    def __add__(self, other):
        return GitoliteACLDefinition(dict(self.acl.items() + other.acl.items()))

    def __contains__(self, item):
        return item in self.acl


class GitoliteACL(ACL):
    def __init__(self):
        self.user_groups = {}
        self._bigconf = None
        self.big_conf_path = os.path.join(config.gitolite_home, ".gitolite/conf/gitolite.conf-compiled.pm")

    @property
    def bigconf(self):
        if not self._bigconf:
            big_conf_mtime = os.path.getmtime(self.big_conf_path)
            bc_cache_key = "gl-bigconf-%s" % big_conf_mtime

            self._bigconf = acl_cache.get(bc_cache_key)
            if not self._bigconf:
                self._bigconf = DataDumperReader().read(self.big_conf_path)
                acl_cache.set(bc_cache_key, self._bigconf)

        return self._bigconf

    def read_gl_config(self, repo):
        repo_acl = GitoliteACLDefinition.from_bigconf(self.bigconf, repo)

        if repo.clean_relative_path in self.bigconf.get('split_conf', {}) and \
            self.bigconf.get('split_conf', {}).get(repo.clean_relative_path, '0') == '1':
            repo_acl = repo_acl + GitoliteACLDefinition.from_splitconf(repo)

        return repo_acl

    def can_read(self, user, repo):
        logging.info(">>>>>>> CHECKING permissions for %s <<<<<<<<<" % repo)
        effective_acl = self.read_gl_config(repo)

        if user.username in self.user_groups:
            user_group_names = self.user_groups[user.username]
        else:
            user_group_names = ["@%s" % g.name for g in user.groups.all()] + ['@all']
            self.user_groups[user.username] = user_group_names

        logging.info("Groups of %s: %s" % (user, user_group_names))

        if user.username in effective_acl:
            logging.info("Access GRANTED due to one of: %s" % effective_acl[user.username])
            return True

        if any(g in effective_acl for g in user_group_names):
            logging.info("Access GRANTED due to one of the groups: %s" % ', '.join(user_group_names))
            return True

        if user.is_anonymous() and 'gitweb' in effective_acl:
            logging.info("Access GRANTED due to anonymous access (via gitweb user)")
            return True

        logging.warning("Access DENIED for %s to %s" % (user, repo))
        return False
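
For orientation, a GitoliteACLDefinition is just a wrapper around a dict mapping gitolite users and groups to lists of rule tuples; a shape sketch with made-up names and rules:

effective_acl = GitoliteACLDefinition({
    'ercpe': [('0', 'RW+', 'refs/.*')],    # per-user rules (values shown here are invented)
    '@devs': [('1', 'R', 'refs/.*')],      # group rules, matched against "@<group>" names
})
assert 'ercpe' in effective_acl            # membership test used by can_read() above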

src/gitbrowser/conf.py (156)

@@ -5,22 +5,22 @@ from django.conf import settings
FEATURE_DEFAULTS = {
    'gravatar': True,
    'render_readme': True,
    'intercept_gitweb_links': True
}

CONFIG_DEFAULTS = {
    'lister': 'GitoliteProjectsFileRepositoryLister',
    'acl': 'AllowAllACL',
    'display': {
        'list_style': 'flat',
    },
    'features': FEATURE_DEFAULTS,
    'clone_url_templates': 'ssh://git@YOUR-SERVER-NAME/%(path)s',
}

LIST_STYLE_FLAT = "flat"

@@ -35,92 +35,92 @@ COMMIT_LIST_STYLES = (COMMIT_LIST_DEFAULT, COMMIT_LIST_CONDENSED)
class GitbrowserConf(object):
    def __init__(self):
        self.config_dict = None

    @property
    def gbconf(self):
        if not self.config_dict:
            self.config_dict = CONFIG_DEFAULTS
            self.config_dict.update(getattr(settings, 'GITBROWSER', {}))
        return self.config_dict

    @property
    def lister(self):
        from gitbrowser.utils import lister as lister_module
        lister_class_name = self.gbconf.get('lister', 'GitoliteProjectsFileRepositoryLister')
        lister_class = getattr(lister_module, lister_class_name)
        return lister_class(self.acl)

    @property
    def acl(self):
        from gitbrowser import acl as acl_module
        acl_class = self.gbconf.get('acl', 'DenyAllACL')
        return getattr(acl_module, acl_class)()

    @property
    def clone_url_templates(self):
        return self.gbconf.get('clone_url_templates', [])

    @property
    def clone_urls_builder(self):
        default = '%(path)s'
        value = self.gbconf.get('clone_url_templates', default)

        if isinstance(value, basestring):
            return lambda repo, user=None: [value % {'path': repo.relative_path}]

        assert callable(value), "Expected clone_url_templates to be a string or a callable, got %s" % value
        return value

    @property
    def list_flat(self):
        return self.list_style == LIST_STYLE_FLAT

    @property
    def list_style(self):
        cfg_value = self.gbconf.get('display', {}).get('list_style', LIST_STYLE_TREE)
        assert cfg_value in LIST_STYLES, 'list_style must be one of %s' % ', '.join(LIST_STYLES)
        return cfg_value

    @property
    def commit_list_style(self):
        cfg_value = self.gbconf.get('display', {}).get('commit_list_style', COMMIT_LIST_DEFAULT)
        assert cfg_value in COMMIT_LIST_STYLES, 'commit_list_style must be one of %s' % ', '.join(COMMIT_LIST_STYLES)
        return cfg_value

    @property
    def acl_debug(self):
        return self.gbconf.get('debug', {}).get('acl', False) is True

    @property
    def gitolite_home(self):
        return self.get('GL_HOME', os.path.expanduser('~'))

    @property
    def allow_anonymous(self):
        return self.get('allow_anonymous', True)

    @property
    def extra_scripts(self):
        x = self.gbconf.get('display', {}).get('extra_scripts', [])

        if not isinstance(x, (list, tuple)):
            x = (x, )

        return x

    @property
    def extra_html(self):
        return self.gbconf.get('display', {}).get('extra_html', "")

    def feature_enabled(self, feature_name):
        features = FEATURE_DEFAULTS
        features.update(self.gbconf.get('features', {}))
        return features.get(feature_name, False) is True

    def get(self, item, default=None):
        return self.gbconf.get(item, default)


config = GitbrowserConf()
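
All of these properties read from a single GITBROWSER dict in the Django settings, falling back to CONFIG_DEFAULTS. A minimal settings sketch; every value below is an example, the keys simply mirror the lookups above:

# settings.py
GITBROWSER = {
    'acl': 'GitoliteACL',
    'lister': 'GitoliteProjectsFileRepositoryLister',
    'clone_url_templates': 'ssh://git@example.org/%(path)s',
    'display': {'list_style': 'flat'},
    'features': {'gravatar': False, 'render_readme': True},
    'allow_anonymous': True,
    'GL_HOME': '/home/git',
    'debug': {'acl': False},
}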

src/gitbrowser/middlewares.py (162)

@@ -8,94 +8,94 @@ from gitbrowser.conf import config
class InterceptGitwebMiddleware(object):
    """Django Middleware to intercept gitweb-like URLs and issue a redirect
    to real gitbrowser urls"""

    def process_request(self, request):
        if not config.feature_enabled('intercept_gitweb_links'):
            return

        action = request.GET.get('a', None)
        project = request.GET.get('p', None)
        file_or_folder = request.GET.get('f', '')
        head_base = request.GET.get('hb', 'master')
        commit = request.GET.get('h', None)

        if not (project and action):
            return

        redirect_url = None
        if action == "summary":
            redirect_url = reverse('gitbrowser:overview', args=(project, ))
        elif action == 'tree':
            redirect_url = reverse('gitbrowser:browse_ref', args=(project, head_base, file_or_folder))
        elif action == 'blob':
            redirect_url = reverse('gitbrowser:browse_blob', args=(project, head_base, file_or_folder))
        elif action == 'shortlog':
            redirect_url = reverse('gitbrowser:commits', args=(project, head_base))
        elif action in ('commit', 'commitdiff', ):
            redirect_url = reverse('gitbrowser:commit', args=(project, commit))
        elif action == 'blob_plain':
            redirect_url = reverse('gitbrowser:raw', args=(project, head_base, file_or_folder))
        elif action == "history":
            redirect_url = reverse('gitbrowser:history', args=(project, head_base, file_or_folder))
        elif action == "atom" or action == "rss":
            redirect_url = reverse('gitbrowser:feed', args=(project, ))

        if redirect_url:
            logging.info("Intercepted gitweb url. Redirecting to %s" % redirect_url)
            return HttpResponsePermanentRedirect(redirect_url)

        logging.warning("Could not find a redirect url for p=%s and a=%s" % (project, action))
        raise Http404


class LoginRequiredMiddleware(object):
    def process_request(self, request):
        exempt_urls = [re.compile(settings.LOGIN_URL.lstrip('/'))]
        if hasattr(settings, 'LOGIN_EXEMPT_URLS'):
            exempt_urls += [re.compile(expr) for expr in settings.LOGIN_EXEMPT_URLS]

        path = request.path_info.lstrip('/')
        if not any(m.match(path) for m in exempt_urls):
            if (not config.allow_anonymous) and request.user.is_anonymous():
                return HttpResponseRedirect(settings.LOGIN_URL)


class ContentSecurityPolicyMiddleware(object):
    def process_response(self, request, response):
        my_url = request.build_absolute_uri('/')
        csp_urls = {
            'script-src': [my_url],
            'img-src': ["'self'", 'data:', my_url, "*.gravatar.com", "*"],
            'style-src': ['self', "'unsafe-inline'", my_url],
            'connect-src': [my_url],
            'font-src': [my_url],
        }

        conf_urls = getattr(settings, 'CONTENT_SECURITY_POLICY_URLS', None)
        if conf_urls:
            if isinstance(conf_urls, (list, tuple)):
                # one iterable of strings: add them to each key
                for k in csp_urls.keys():
                    csp_urls[k].extend(conf_urls)
            elif isinstance(conf_urls, dict):
                # merge dicts
                for conf_k in conf_urls.keys():
                    if conf_k not in csp_urls:
                        continue
                    urls = conf_urls[conf_k] if isinstance(conf_urls[conf_k], (list, tuple)) else [conf_urls[conf_k]]
                    csp_urls[conf_k].extend(urls)
            else:
                # consider it a string
                for k in csp_urls.keys():
                    csp_urls[k].append(conf_urls)

        response['Content-Security-Policy'] = '; '.join(["%s %s" % (k, ' '.join(v)) for k, v in csp_urls.items()])
        return response
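
As the branches above show, CONTENT_SECURITY_POLICY_URLS may be a single string, a flat list (appended to every directive), or a per-directive dict. A settings sketch with made-up hosts:

# settings.py -- flat list form: added to script-src, img-src, style-src, connect-src and font-src
CONTENT_SECURITY_POLICY_URLS = ['https://cdn.example.org']

# settings.py -- dict form: merged per directive; keys the middleware does not know are ignored
CONTENT_SECURITY_POLICY_URLS = {
    'script-src': ['https://cdn.example.org'],
    'img-src': 'https://media.example.org',
}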

src/gitbrowser/utils/cache.py (20)

@@ -5,17 +5,17 @@ from django.core.cache.backends.base import InvalidCacheBackendError
g_keyprefix = ''


def gen_cache_key(key, config_prefix, version):
    global g_keyprefix
    return ':'.join([config_prefix, g_keyprefix, str(version), key])


def gitbrowser_cache(name, keyprefix=''):
    global g_keyprefix
    g_keyprefix = keyprefix

    cache = None
    try:
        cache = caches[name]
    except InvalidCacheBackendError:
        cache = caches['default']
    return cache
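
gitbrowser_cache() resolves a named Django cache backend and falls back to 'default' when that name is not configured, so dedicated entries are optional. A settings sketch:

# settings.py
CACHES = {
    'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'},
    # optional dedicated backends; without them the helpers above use 'default'
    'acl-cache': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'},
    'repository-commit-count': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'},
}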

src/gitbrowser/utils/deps.py (16)

@@ -2,17 +2,17 @@
from pkg_resources import parse_requirements


class PkgResourceWrapper(object):
    def __init__(self, req):
        self.requirement = req

    def url(self):
        return "https://pypi.python.org/pypi/%s" % self.requirement.project_name

    def __str__(self):
        return str(self.requirement)


class PythonRequirements(object):
    def parse(self, content):
        return [PkgResourceWrapper(req) for req in parse_requirements('\n'.join([line for line in content.splitlines() if not line.startswith('-e')]))]
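
PythonRequirements.parse() drops editable ('-e') lines and wraps each requirement so templates can link to PyPI. A usage sketch with a made-up requirements file:

content = "Django>=1.6\n-e git+https://example.org/pkg.git#egg=pkg\nsix"
reqs = PythonRequirements().parse(content)
urls = [r.url() for r in reqs]   # e.g. "https://pypi.python.org/pypi/Django"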

src/gitbrowser/utils/http.py (36)

@@ -2,26 +2,26 @@
import re


def parse_accept(accept_header):
    if not accept_header:
        return []

    l = []
    for content_types, q in re.findall("([\w\d/\+,\.\*]+)(?:;q=([\d\.]+),?)?", accept_header):
        for ct in (x.strip() for x in content_types.split(',') if x.strip()):
            q = q or '1.0'
            l.append((ct, float(q)))
    return l


def bestof(accept_header, *args):
    wanted_content_types = []
    for x in args:
        if isinstance(x, (list, tuple)):
            wanted_content_types += list(x)
        else:
            wanted_content_types.append(x)

    accepted_content_types = sorted(parse_accept(accept_header), key=lambda x: x[1], reverse=True)
    for ct, q in accepted_content_types:
        if ct in wanted_content_types:
            return ct
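
bestof() picks the wanted content type with the highest q value from an Accept header. A quick sketch (the header value is invented):

header = "application/json;q=0.8,text/html;q=0.9"
parse_accept(header)                              # [('application/json', 0.8), ('text/html', 0.9)]
bestof(header, 'text/html', 'application/json')   # 'text/html' -- highest q value wins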

src/gitbrowser/utils/linking.py (42)

@@ -6,31 +6,31 @@ from gitdb.exc import BadName
from six import u


class Autolinker(object):
    # http://daringfireball.net/2010/07/improved_regex_for_matching_urls
    GRUBER_URLINTEXT_PAT = re.compile(u(r"""(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))"""))

    def link(self, raw_text, repository):
        message = escape(raw_text)

        def link_urls(match):
            return format_html('<a href="{0}">{0}</a>', match.group(1))

        message = self.GRUBER_URLINTEXT_PAT.sub(link_urls, message)

        commit_id_re = re.compile(r'(?P<pre_text>(?:(commit|fixes):?\s+))(?P<commit_id>[0-9a-f]{7,42})\b', re.IGNORECASE)

        def build_commit_link(match):
            cid = match.group('commit_id')

            try:
                commit = repository.get_commit(cid)
                return format_html('{0}<a href="{1}" title="{2}">{3}</a>',
                    match.group('pre_text'),
                    reverse('gitbrowser:commit', args=(repository.relative_path, commit.hexsha)),
                    commit.summary, cid
                )
            except BadName:
                return match.string

        message = commit_id_re.sub(build_commit_link, message)

        return message
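
Autolinker.link() escapes the text and then runs two substitution passes: plain URLs, and "commit"/"fixes" references resolved against the repository. A usage sketch ('repo' stands for a gitbrowser GitRepository instance; the message is invented):

html = Autolinker().link("Fixes 5febed60fc, details at http://example.org/issue/1", repo)
# the URL becomes an <a> tag; the sha becomes a link to the commit view if repo.get_commit() resolves it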

src/gitbrowser/utils/lister.py (148)

@@ -11,87 +11,87 @@ repository_dir_re = re.compile('\.git$')
class GitRepositoryContainer(object):
    def __init__(self, name, relative_path):
        self.name = name
        self.relative_path = relative_path

    def __unicode__(self):
        return self.name


class RepositoryLister(object):
    def __init__(self, acl, *args, **kwargs):
        assert isinstance(acl, ACL)
        self.acl = acl

    def list(self, user, path=''):
        raise NotImplementedError


class GitoliteProjectsFileRepositoryLister(RepositoryLister):
    def __init__(self, acl, *args, **kwargs):
        super(GitoliteProjectsFileRepositoryLister, self).__init__(acl, *args, **kwargs)
        self.repositories_path = os.path.join(config.gitolite_home, 'repositories')
        self.projects_file_path = os.path.join(config.gitolite_home, 'projects.list')

    def list(self, user, path='', flat=True):
        logging.info("Listing repositories for %s in '%s' (flat: %s)" % (user, path, flat))
        readable_repositories = []

        if path and not path.endswith('.git'):
            path = path.rstrip('/') + '/'

        with open(self.projects_file_path, 'r') as f:
            for line in f:
                rel_path = line.strip()
                if not rel_path.startswith(path):
                    logging.debug("Repository path %s out of scope for path %s" % (rel_path, path))
                    continue

                repo = GitRepository(os.path.join(self.repositories_path, line.strip()), rel_path, user)
                if self.acl.can_read(user, repo):
                    readable_repositories.append(repo)

        if flat:
            for x in readable_repositories:
                yield x
            return

        def my_key(obj):
            start = len(path)
            end = obj.relative_path.index('/', start+1) \
                if '/' in obj.relative_path[start:] else \
                len(os.path.basename(obj.relative_path)) * -1
            return obj.relative_path[start:end]

        def repo_or_dir_iter():
            for directory, repos_in_dir in itertools.groupby(readable_repositories, key=my_key):
                if directory == '':
                    for r in repos_in_dir:
                        yield r
                else:
                    yield GitRepositoryContainer(name=directory, relative_path=path + directory.rstrip('/'))

        def repo_dir_cmp(a, b):
            if (isinstance(a, GitRepository) and isinstance(b, GitRepository)) or \
                (isinstance(a, GitRepositoryContainer) and isinstance(b, GitRepositoryContainer)):
                return cmp(a.name, b.name)

            if isinstance(a, GitRepository):
                return 1
            return -1

        for x in sorted(repo_or_dir_iter(), cmp=repo_dir_cmp):
            yield x

    def get_repository(self, user, path):
        logging.info("get_repository for user %s and path '%s'" % (user, path))
        for repo in self.list(user, path=path, flat=True):
            logging.info("Checking: %s" % repo.relative_path)
            if repo.relative_path == path:
                return repo

        logging.error("No repository found in '%s' for user %s" % (path, user))
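
The lister walks gitolite's projects.list (one bare-repository path per line under the repositories directory) and filters it through the ACL. A usage sketch ('some_user' stands for a Django user; the repository path is an example):

lister = GitoliteProjectsFileRepositoryLister(AllowAllACL())
visible = list(lister.list(some_user, path='', flat=True))       # GitRepository objects
repo = lister.get_repository(some_user, 'tools/gitbrowser.git')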

src/gitbrowser/utils/misc.py (14)

@@ -3,13 +3,13 @@ from gitbrowser.conf import config
def generate_breadcrumb_path(path):
    l = []
    for chunk in path.split('/'):
        if not chunk:
            continue
        l.append(chunk)
        yield '/'.join(l), chunk


def clone_urls(repo, user):
    return config.clone_urls_builder(repo, user)
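
generate_breadcrumb_path() yields (cumulative path, segment) pairs for the navigation breadcrumbs:

list(generate_breadcrumb_path('foo/bar/baz.git'))
# -> [('foo', 'foo'), ('foo/bar', 'bar'), ('foo/bar/baz.git', 'baz.git')]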

src/gitbrowser/utils/perlcrap.py (178)

@@ -4,124 +4,124 @@ import shlex
class DataDumperReader(object):
    """Parser for dump files created by perl's Data::Dumper."""

    def read(self, filename):
        with open(filename, 'r') as f:
            file_content = f.read()
        return self.parse(file_content)

    def parse(self, raw):
        """Parses the content of `raw` into python data structures (lists, dictionaries, primitives)
        using a mix of regular expressions and the `shlex` module.

        Returns a dictionary"""
        data = raw

        py_data = {}

        while ';' in data:
            m = re.match('(.*?);$', data, re.MULTILINE | re.DOTALL)
            current_data = m.group(1).strip()

            py_data.update([self.parse_block(current_data)])

            data = data[m.end(1) + 1:]

        assert data.strip() == "", 'Leftovers found: %s' % data

        return py_data

    def parse_block(self, block_data):
        """Parses a single block of markup into a python data structure"""
        lexer = shlex.shlex(block_data, posix=True)
        lexer.quotes = "'"
        lexer.wordchars += '"'
        lexer.escapedquotes = "'"
        return self.parse_structure(lexer)

    def unquote(self, s):
        if len(s or "") <= 2:
            return s

        if s[0] == s[-1] and s[0] == "'":
            return s[1:-1]
        return s

    def parse_structure(self, lexer):
        """Starts the parsing process for a concrete data structure by looking at the next token.
        Primitives are returned directly, for lists and dicts :func:`parse_dict` or :func:`parse_list`
        are called
        """
        token = lexer.get_token()

        if token == '%':
            variable_name = lexer.get_token()
            nt = lexer.get_token()
            assert nt == "=", "Expected equal sign; got %s" % nt
            return self.unquote(variable_name), self.parse_dict(lexer)

        if token == '(' or token == '{':
            lexer.push_token(token)
            return self.parse_dict(lexer)

        if token == '[':
            lexer.push_token(token)
            return self.parse_list(lexer)

        if token == '$':
            variable_name = lexer.get_token()
            nt = lexer.get_token()
            assert nt == "=", "Expected equal sign; got %s" % nt
            return variable_name, lexer.get_token()

        return self.unquote(token)

    def parse_dict(self, lexer):
        """Parses the current structure block into a python dict. Calls :func:`parse_structure` for each
        value in the dictionary"""
        token = lexer.get_token()
        assert token in ('(', '{'), "Expected '(' or '{'; got %s" % token

        d = {}

        for token in lexer:
            if token == ')' or token == '}':
                break
            if token == ',':
                continue

            variable_name = token
            nt = lexer.get_token() + lexer.get_token()
            assert nt == "=>", "Expected '=>'; got %s" % nt
            d[self.unquote(variable_name)] = self.parse_structure(lexer)

        return d

    def parse_list(self, lexer):
        """Parses the current structure block into a python list. Calls :func:`parse_structure` for each
        value."""
        token = lexer.get_token()
        assert token == '[', "Expected '['; got %s" % token

        l = []

        for token in lexer:
            if token == ']':
                return l
            if token == ',':
                continue

            lexer.push_token(token)
            list_item = self.parse_structure(lexer)
            l.append(list_item)

            nt = lexer.get_token()
            if nt == ']':
                break
            assert nt == ",", "Expected ','; got %s" % nt
        return l
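
A usage sketch parsing a small Data::Dumper-style dump of the kind gitolite writes (repository and user names are made up; note that scalars, including numbers, come back as strings):

dump = """%one_repo = (
  'gitbrowser.git' => {
    'ercpe' => [ [ 0, 'RW+', 'refs/.*' ] ]
  }
);
"""
parsed = DataDumperReader().parse(dump)
# parsed == {'one_repo': {'gitbrowser.git': {'ercpe': [['0', 'RW+', 'refs/.*']]}}}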

src/gitbrowser/utils/rendering.py (42)

@@ -8,40 +8,40 @@ from markdown.extensions.headerid import HeaderIdExtension
class Renderer(object):
    def render(self, markup):
        raise NotImplementedError


class PlainTextRenderer(Renderer):
    def render(self, markup):
        return linebreaks(markup)


class MarkdownRenderer(Renderer):
    def render(self, markup):
        extensions = [
            HeaderIdExtension([]),
        ]
        md = markdown.Markdown(extensions, output_format='html5')
        return md.convert(bleach.clean(markup))


class RestructuredTextRenderer(Renderer):
    def render(self, markup):
        return publish_parts(bleach.clean(markup), writer_name='html')['html_body']


def get_renderer_by_name(name, fallback_renderer_name='text'):
    if name == 'text':
        return PlainTextRenderer()
    if name == 'markdown':
        return MarkdownRenderer()
    if name == 'rest':
        return RestructuredTextRenderer()

    if fallback_renderer_name:
        return get_renderer_by_name(fallback_renderer_name, None)
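
get_renderer_by_name() falls back to the plain-text renderer for unknown names; the markup below is just an example:

html = get_renderer_by_name('markdown').render("# Readme\n\nhello")
fallback = get_renderer_by_name('creole')    # unknown name -> PlainTextRenderer instance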

src/gitbrowser/utils/repo.py (542)

@@ -30,14 +30,14 @@ repo_commit_count_cache = gitbrowser_cache('repository-commit-count', 'count')
README_CHOICES = [
    ('README.md', 'markdown',),
    ('README.rst', 'rest'),
    ('README', 'text'),
    ('README.txt', 'text'),
]

DEPENDENCY_CHOICES = (
    ('requirements.txt', PythonRequirements),
)

###

@@ -45,19 +45,19 @@ DEPENDENCY_CHOICES = (
###
def changes(self):
    if self.parents:
        for x in self.parents[0].diff(self, create_patch=True):
            yield x
    else:
        tree = self.tree
        for item in tree:
            if not isinstance(item, Blob):
                continue

            yield {
                'a_blob': {'path': item.path},
                'diff': item.data_stream.read()
            }


Commit.message_without_summary = lambda self: self.message[len(self.summary):].strip()

@@ -68,37 +68,37 @@ Commit.stats_iter = lambda self: ((k, self.stats.files[k]['insertions'], self.st
Commit.shorthexsha = lambda self: self.hexsha[:7]


def tag_for_commit(self):
    tags = tag_commit_cache.get(self.repo.head.commit.hexsha)

    if not tags:
        tags = dict([(x.commit.hexsha, x.path) for x in self.repo.tags])
        tag_commit_cache.set(self.repo.head.commit.hexsha, tags)

    for _t in self.repo.tags:
        if self.hexsha == _t.commit:
            return self.repo.tag(tags[self.hexsha])

Commit.tag = tag_for_commit


def latest_commit_patch(self):
    latest_commit_hexsha = repo_commit_cache.get(self.hexsha)
    if latest_commit_hexsha:
        logging.debug("Cache HIT for %s" % self.hexsha)
        return git.Commit.new(self.repo, latest_commit_hexsha)

    latest_commit = self.repo.iter_commits(paths=self.path, max_count=1).next()
    repo_commit_cache.set(self.hexsha, latest_commit.hexsha)
    return latest_commit


###
### Monkey patching of git.objects.blob.Blob
###
ACCEPT_MIMETYPES_LAMBDAS = (
    lambda mt: mt.startswith('text/'),
    lambda mt: mt.startswith('application/xml'),
    lambda mt: mt.startswith('application/x-javascript'),
    lambda mt: mt.startswith('application/x-sql'),
)
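
A sketch of how such a mimetype whitelist is typically consumed (the helper name below is an assumption, not taken from repo.py):

def looks_textual(mimetype):
    # True for text/*, XML, JavaScript and SQL blobs per the lambdas above
    return any(check(mimetype) for check in ACCEPT_MIMETYPES_LAMBDAS)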