Browse Source

Added basic implementation of the lookup api

master
Johann Schmitz 3 years ago
parent
commit
5be123e416
Signed by: ercpe <johann@j-schmitz.net> GPG Key ID: A084064277C501ED
5 changed files with 377 additions and 2 deletions
  1. +159
    -1
      fuglu_safebrowsing/lookup.py
  2. +1
    -0
      requirements_dev.txt
  3. +67
    -0
      tests/01-sample.eml
  4. +143
    -1
      tests/test_lookup.py
  5. +7
    -0
      tests/tests.cfg

+ 159
- 1
fuglu_safebrowsing/lookup.py View File

@ -1,5 +1,19 @@
# -*- coding: utf-8 -*-
from fuglu.shared import ScannerPlugin
import requests
from fuglu.shared import ScannerPlugin, DUNNO, string_to_actioncode
from fuglu_safebrowsing import VERSION
DOMAINMAGIC_AVAILABLE = False
try:
import domainmagic
import domainmagic.extractor
DOMAINMAGIC_AVAILABLE = True
except ImportError:
pass
SAFEBROWSING_API_URL = "https://safebrowsing.googleapis.com/v4/threatMatches:find"
class SafebrowsingLookupPlugin(ScannerPlugin):
@ -11,5 +25,149 @@ class SafebrowsingLookupPlugin(ScannerPlugin):
'default': '',
'description': 'API Key for Safebrowsing API',
'confidential': True,
},
'action': {
'default': '',
'description': 'Default action to take on positive result',
}
}
self.logger = self._logger()
@property
def api_key(self):
if self.config.has_option(self.section, 'api-key'):
return self.config.get(self.section, 'api-key')
return None
@property
def default_action(self):
return string_to_actioncode(self.config.get(self.section, 'action'))
@property
def threat_types(self):
if self.config.has_option(self.section, 'threat-types'):
config_values = self.config.get(self.section, 'threat-types')
if config_values:
return [x.strip().upper() for x in config_values.split()]
return [
"THREAT_TYPE_UNSPECIFIED", # Unknown.
"MALWARE", # Malware threat type.
"SOCIAL_ENGINEERING", # Social engineering threat type.
"UNWANTED_SOFTWARE", # Unwanted software threat type.
"POTENTIALLY_HARMFUL_APPLICATION", # Potentially harmful application threat type.
]
@property
def threat_platforms(self):
if self.config.has_option(self.section, 'threat-platforms'):
config_values = self.config.get(self.section, 'threat-platforms')
if config_values:
return [x.strip().upper() for x in config_values.split()]
return [
"ANY_PLATFORM", # Threat posed to at least one of the defined platforms.
]
@property
def requests_kwargs(self):
return {
'headers': {
'User-Agent': 'fuglu-safebrowsing %s' % VERSION
},
'timeout': float(self.request_timeout)
}
@property
def request_timeout(self):
if self.config.has_option(self.section, 'timeout'):
return self.config.getint(self.section, 'timeout')
return 10
def examine(self, suspect):
self.logger.info("Examine suspect %s", suspect)
urls = self.extract_urls(suspect)
if not urls:
return DUNNO
self.logger.info("Checking %s urls against Safebrowsing API", len(urls))
result = self.check_safebrowsing(urls)
if result:
self.logger.info("Got %s results from Safebrowsing", len(result))
return self.default_action
return DUNNO
def lint(self):
x = super(SafebrowsingLookupPlugin, self).lint()
if not DOMAINMAGIC_AVAILABLE:
print("domainmagic lib or one of it's dependencies(dnspython/pygeoip) is not installed!")
return x and DOMAINMAGIC_AVAILABLE
def check_safebrowsing(self, urls):
if not (self.api_key and urls):
return
request_data = {
'client': {
'clientId': 'fuglu-safebrowsing',
'clientVersion': VERSION
},
"threatInfo": {
"threatTypes": self.threat_types,
"platformTypes": self.threat_platforms,
"threatEntryTypes": ["URL"],
"threatEntries": [{"url": u} for u in urls]
}
}
try:
response = requests.post(SAFEBROWSING_API_URL + '?key=%s' % self.api_key,
json=request_data,
**self.requests_kwargs)
response.raise_for_status()
return response.json()
except:
self.logger.exception("Request to Safebrowsing API failed")
def extract_urls(self, suspect):
extractor = domainmagic.extractor.URIExtractor()
textparts = " ".join(self.get_decoded_textparts(suspect.get_message_rep()))
return extractor.extracturis(textparts)
# copied from uriextract.py
def get_decoded_textparts(self, messagerep):
"""Returns a list of all text contents"""
parts = []
for part in messagerep.walk():
if part.is_multipart():
continue
filename = (part.get_filename(None) or "").lower()
content_type = part.get_content_type()
if content_type.startswith('text/') or filename.endswith(".txt") or \
filename.endswith(".html") or filename.endswith(".htm"):
payload = part.get_payload(None, True)
if 'html' in content_type or '.htm' in filename:
# remove newlines from html so we get uris spanning multiple lines
payload = payload.replace('\n', '').replace('\r', '')
parts.append(payload)
if content_type == 'multipart/alternative':
try:
text = str(part.get_payload(None, True))
parts.append(text)
except:
pass
return parts

+ 1
- 0
requirements_dev.txt View File

@ -1,3 +1,4 @@
# Project dependencies for development
pytest
coveralls
mock

+ 67
- 0
tests/01-sample.eml View File

@ -0,0 +1,67 @@
From: <sender@example.com>
To: <recipient@exmple.com>
Subject: ***SPAM*** hi!
Content-Type: multipart/alternative;
boundary="------------010800030001020207070302"
This is a multi-part message in MIME format.
--------------010800030001020207070302
Content-Type: text/plain; charset=CP-850; format=flowed
Content-Transfer-Encoding: quoted-printable
Hello,
Following the evaluation of your resume, we would like to let you know =
that we've opened a new=20
position that will be perfectly suitable for you, based on your skills =
and our requirements.
Key requirements & benefits:
- International environment in a famous multinational company;
- Possibility to work from home with flexible schedule;
- Salary from $4.000 up $8000 with bonuses.
To to review the details about this vacancy and get in touch with our =
human resource manager, please=20
visit our site.
Looking forward to getting in touch with you.
Kind regards,
HR Department
--------------010800030001020207070302
Content-Type: text/html; charset="CP-850"
Content-Transfer-Encoding: quoted-printable
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<meta http-equiv=3D"content-type" content=3D"text/html; =
charset=3DCP-850">
</head>
<body text=3D"#000000" bgcolor=3D"#ffffff">
Hello,<br>
<br>
Following the evaluation of your resume, we would like to let you know =
that we've opened a new <br> position that will be perfectly suitable =
for you, based on your skills and our requirements.<br>
<br>
<u>Key requirements & benefits</u>:<br>
- International environment in a famous multinational company;<br>
- Possibility to work from home with flexible schedule;<br>
- Salary <u>from $4.000 up $8000 with bonuses</u>.<br>
<br>
To to review the details about this vacancy and get in touch with our =
human resource manager, please <a =
href=3D"http://diechatburg.de/media/editors/tinymce/plugins/advlist/"><b>=
visit our site</b></a>.<br>
<br>
Looking forward to getting in touch with you.<br>
<br>
Kind regards,<br>
HR Department
</body>
</html>
--------------010800030001020207070302--

+ 143
- 1
tests/test_lookup.py View File

@ -1,6 +1,14 @@
# -*- coding: utf-8 -*-
import sys
import os
import pytest
from fuglu.shared import Suspect, DUNNO, DELETE
from mock import mock
from fuglu_safebrowsing import VERSION
if sys.version_info < (3, 0, 0):
from ConfigParser import SafeConfigParser as ConfigParser
else:
@ -9,8 +17,142 @@ else:
from fuglu_safebrowsing.lookup import SafebrowsingLookupPlugin
@pytest.fixture(name='config')
def config_fixture():
cfg = ConfigParser()
cfg.read([os.path.join(os.path.dirname(__file__), 'tests.cfg')])
return cfg
@pytest.fixture(name='plugin')
def plugin_fixture():
return SafebrowsingLookupPlugin(config_fixture())
@pytest.fixture(name='suspect')
def suspect_fixture():
return Suspect("sender@example.com", "recipient@example.com",
os.path.join(os.path.dirname(__file__), '01-sample.eml'))
class TestLookup(object):
def test_lint(self):
plugin = SafebrowsingLookupPlugin(ConfigParser())
assert not plugin.lint()
def test_properties_no_config(self):
cfg = ConfigParser()
cfg.add_section('SafebrowsingLookupPlugin')
plugin = SafebrowsingLookupPlugin(cfg)
assert plugin.threat_platforms == ["ANY_PLATFORM"]
assert plugin.threat_types == [
"THREAT_TYPE_UNSPECIFIED",
"MALWARE",
"SOCIAL_ENGINEERING",
"UNWANTED_SOFTWARE",
"POTENTIALLY_HARMFUL_APPLICATION",
]
assert plugin.request_timeout == 10
def test_properties_config(self, plugin):
assert plugin.threat_platforms == ["ALL_PLATFORMS"]
assert plugin.threat_types == [
"MALWARE",
"SOCIAL_ENGINEERING",
]
assert plugin.request_timeout == 1
def test_requests_kwargs(self):
cfg = ConfigParser()
cfg.add_section('SafebrowsingLookupPlugin')
plugin = SafebrowsingLookupPlugin(cfg)
assert plugin.requests_kwargs == {
'headers': {
'User-Agent': 'fuglu-safebrowsing %s' % VERSION
},
'timeout': 10.0
}
def test_examine_no_urls(self, plugin, suspect):
plugin.check_safebrowsing = mock.MagicMock()
plugin.extract_urls = mock.MagicMock(return_value=[])
assert plugin.examine(suspect) == DUNNO
plugin.check_safebrowsing.assert_not_called()
@mock.patch("fuglu_safebrowsing.lookup.requests")
def test_check_safebrowsing_no_apikey(self, requests_mock):
cfg = ConfigParser()
cfg.add_section('SafebrowsingLookupPlugin')
plugin = SafebrowsingLookupPlugin(cfg)
assert not plugin.check_safebrowsing(["http://example.com"])
requests_mock.post.assert_not_called()
@mock.patch("fuglu_safebrowsing.lookup.requests")
def test_check_safebrowsing(self, requests_mock, plugin):
response_mock = mock.MagicMock()
response_mock.json = mock.MagicMock(return_value=[])
requests_mock.post = mock.MagicMock(return_value=response_mock)
assert plugin.check_safebrowsing(["http://example.com"]) == []
requests_mock.post.assert_called_with(
'https://safebrowsing.googleapis.com/v4/threatMatches:find?key=some-api-key', json={
'client': {
'clientId': 'fuglu-safebrowsing',
'clientVersion': VERSION
},
"threatInfo": {
"threatTypes": ["MALWARE", "SOCIAL_ENGINEERING"],
"platformTypes": ["ALL_PLATFORMS"],
"threatEntryTypes": ["URL"],
"threatEntries": [{"url": "http://example.com"}]
}
}, headers={
'User-Agent': 'fuglu-safebrowsing %s' % VERSION
}, timeout=1.0
)
@mock.patch("fuglu_safebrowsing.lookup.requests")
def test_check_safebrowsing_no_result(self, requests_mock, plugin, suspect):
response_mock = mock.MagicMock()
response_mock.json = mock.MagicMock(return_value=[])
requests_mock.post = mock.MagicMock(return_value=response_mock)
plugin.extract_urls = mock.MagicMock(return_value=["http://example.com"])
assert plugin.examine(suspect) == DUNNO
@mock.patch("fuglu_safebrowsing.lookup.requests")
def test_check_safebrowsing_positive_result(self, requests_mock, plugin, suspect):
response_mock = mock.MagicMock()
response_mock.json = mock.MagicMock(return_value={
"matches": [{
"threatType": "MALWARE",
"platformType": "WINDOWS",
"threatEntryType": "URL",
"threat": {"url": "http://example.com/"},
"threatEntryMetadata": {
"entries": [{
"key": "malware_threat_type",
"value": "landing"
}]
},
"cacheDuration": "300.000s"
}]
})
requests_mock.post = mock.MagicMock(return_value=response_mock)
plugin.extract_urls = mock.MagicMock(return_value=["http://example.com"])
assert plugin.examine(suspect) == DELETE
def test_extract_urls(self, plugin, suspect):
assert plugin.extract_urls(suspect) == [
"http://diechatburg.de/media/editors/tinymce/plugins/advlist/"
]

+ 7
- 0
tests/tests.cfg View File

@ -0,0 +1,7 @@
[SafebrowsingLookupPlugin]
api-key = some-api-key
threat-types = malware social_engineering
threat-platforms = all_platforms
timeout = 1
action = delete