Browse Source

Updated test

as-daemon
Johann Schmitz 3 years ago
parent
commit
cdd54d5746
Signed by: ercpe <johann@j-schmitz.net> GPG Key ID: A084064277C501ED
6 changed files with 164 additions and 31 deletions
  1. +4
    -0
      amavisvt/amavisvtc.py
  2. +28
    -26
      amavisvt/amavisvtd.py
  3. +1
    -1
      amavisvt/config.py
  4. +2
    -2
      amavisvt/daemon.py
  5. +93
    -2
      tests/test_daemon.py
  6. +36
    -0
      tests/test_database.py

+ 4
- 0
amavisvt/amavisvtc.py View File

@@ -59,6 +59,10 @@ if __name__ == "__main__":
for h in logger.handlers:
h.setLevel(logging.ERROR)

if not args.command.lower() in ('ping', 'scan', 'report'):
print("Invalid command: %s" % args.command)
sys.exit(1)

error = False
try:
client = AmavisVTClient(args.socket)


+ 28
- 26
amavisvt/amavisvtd.py View File

@@ -11,30 +11,7 @@ from amavisvt.daemon import AmavisVTDaemon

logger = logging.getLogger(__file__)

if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument('-v', '--verbose', action='count', help='Increase verbosity', default=2)
parser.add_argument('-d', '--debug', action='store_true', default=False, help='Send verbose log messages to stdout too')
parser.add_argument('-s', '--socket', help='Socket path')

args = parser.parse_args()

logging.basicConfig(
level=logging.FATAL - (10 * args.verbose),
format='%(asctime)s %(levelname)-7s [%(threadName)s] %(message)s',
)

logger = logging.getLogger()

if not args.debug:
for h in logger.handlers:
h.setLevel(logging.ERROR)

handler = SysLogHandler(address='/dev/log')
formatter = logging.Formatter('amavisvt: %(threadName)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

def main(args):
logger.info("Starting up")
daemon = None
shutdown_sig = threading.Event()
@@ -59,7 +36,6 @@ if __name__ == "__main__":
if shutdown_sig.is_set():
break
except KeyboardInterrupt:
pass
error = True
except:
logger.exception("Server error")
@@ -68,4 +44,30 @@ if __name__ == "__main__":
if daemon:
daemon.stop()

sys.exit(int(error))
return error

if __name__ == "__main__": # pragma: no cover
parser = ArgumentParser()
parser.add_argument('-v', '--verbose', action='count', help='Increase verbosity', default=2)
parser.add_argument('-d', '--debug', action='store_true', default=False, help='Send verbose log messages to stdout too')
parser.add_argument('-s', '--socket', help='Socket path')

args = parser.parse_args()

logging.basicConfig(
level=logging.FATAL - (10 * args.verbose),
format='%(asctime)s %(levelname)-7s [%(threadName)s] %(message)s',
)

logger = logging.getLogger()

if not args.debug:
for h in logger.handlers:
h.setLevel(logging.ERROR)

handler = SysLogHandler(address='/dev/log')
formatter = logging.Formatter('amavisvt: %(threadName)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

sys.exit(int(main(args)))

+ 1
- 1
amavisvt/config.py View File

@@ -5,7 +5,7 @@ import os

try:
from ConfigParser import SafeConfigParser as ConfigParser
except ImportError:
except ImportError: # pragma: no cover
from configparser import ConfigParser

logger = logging.getLogger(__name__)


+ 2
- 2
amavisvt/daemon.py View File

@@ -12,7 +12,7 @@ logger = logging.getLogger(__file__)

try:
import SocketServer as socketserver
except ImportError:
except ImportError: # pragma: no cover
import socketserver

BUFFER_SIZE = 4096
@@ -20,7 +20,7 @@ BUFFER_SIZE = 4096

class ThreadedRequestHandler(socketserver.BaseRequestHandler):

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs): # pragma: no cover
self.config = Configuration()
socketserver.BaseRequestHandler.__init__(self, *args, **kwargs)



+ 93
- 2
tests/test_daemon.py View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
import mock
from amavisvt.client import VTResponse
from amavisvt.client import VTResponse, Resource

from amavisvt.daemon import ThreadedRequestHandler, AmavisVTDaemon
from tests.test_client import DummyResource, RAW_DUMMY_RESPONSE
@@ -86,6 +86,33 @@ class TestRequestHandler(object):
handler = PyTestableThreadedRequestHandler(request_mock, 'foo', server_mock)
handler.request.sendall.assert_called_with("ERROR: Wrong argument '/tmp/this-directory-does-not-exist'")

def test_handle_report_command(self):
request_mock = mock.MagicMock()
server_mock = mock.MagicMock()
request_mock.recv.return_value = 'REPORT /tmp\n'
handler = PyTestableThreadedRequestHandler(request_mock, 'foo', server_mock)
handler.request.sendall.assert_called_with("ERROR: File does not exist or is inaccessible: '/tmp'")
@mock.patch('amavisvt.daemon.AmavisVT')
def test_handle_error_in_command(self, avt):
request_mock = mock.MagicMock()
server_mock = mock.MagicMock()
request_mock.recv.return_value = 'REPORT %s\n' % __file__
# return value of the mocked AmavisVT constructor
inner_mock = mock.MagicMock()
avt.return_value = inner_mock

report_to_vt_mock = mock.MagicMock()
report_to_vt_mock.side_effect = Exception
inner_mock.report_to_vt = report_to_vt_mock

handler = PyTestableThreadedRequestHandler(request_mock, 'foo', server_mock)
handler.request.sendall.assert_called_with('ERROR: Command error')
@mock.patch('amavisvt.daemon.AmavisVT')
def test_contscan_command(self, avt):
@@ -142,7 +169,71 @@ class TestRequestHandler(object):
assert lines[1] == 'other.zip: Not scanned by virustotal'
assert lines[2] == 'broken.zip: Error (broken)'
assert lines[3].startswith('infected.zip: Detected as ')
@mock.patch('amavisvt.daemon.AmavisVT')
def test_report_command_invalid_argument(self, avt):
for invalid_path in (
'/tmp/this-file-does-not-exist', # does not exist
'/', # is a directory
'/root/', # no permissions
):
request_mock = mock.MagicMock()
server_mock = mock.MagicMock()
handler = NoHandleRequestHandler(request_mock, None, server_mock)
handler.do_report(invalid_path)
assert not avt.called
@mock.patch('amavisvt.daemon.AmavisVT')
def test_report_command_valid_argument_no_response(self, avt):
request_mock = mock.MagicMock()
server_mock = mock.MagicMock()

# return value of the mocked AmavisVT constructor
inner_mock = mock.MagicMock()
avt.return_value = inner_mock

report_to_vt_mock = mock.MagicMock()
report_to_vt_mock.return_value = None
inner_mock.report_to_vt = report_to_vt_mock

handler = NoHandleRequestHandler(request_mock, None, server_mock)
handler.do_report(__file__)

assert avt.called

assert report_to_vt_mock.called
call_args, call_kwargs = report_to_vt_mock.call_args
assert len(call_args) == 1
assert isinstance(call_args[0], Resource)
assert call_args[0].path == __file__
request_mock.sendall.assert_called_with('No response')

@mock.patch('amavisvt.daemon.AmavisVT')
def test_report_command_valid_argument_with_response(self, avt):
request_mock = mock.MagicMock()
server_mock = mock.MagicMock()

# return value of the mocked AmavisVT constructor
inner_mock = mock.MagicMock()
avt.return_value = inner_mock

report_to_vt_mock = mock.MagicMock()
report_to_vt_mock.return_value = VTResponse(RAW_DUMMY_RESPONSE)
inner_mock.report_to_vt = report_to_vt_mock

handler = NoHandleRequestHandler(request_mock, None, server_mock)
handler.do_report(__file__)

assert avt.called

assert report_to_vt_mock.called
call_args, call_kwargs = report_to_vt_mock.call_args
assert len(call_args) == 1
assert isinstance(call_args[0], Resource)
assert call_args[0].path == __file__
request_mock.sendall.assert_called_with('99017f6eebbac24f351415dd410d522d: Scan finished, scan information embedded in this object')
def test_parse_command_invalid(self):
handler = NoHandleRequestHandler(None, None, None)


+ 36
- 0
tests/test_database.py View File

@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import mock
import pytest
import datetime

@@ -296,3 +297,38 @@ class TestAmavisVTDatabase(object):
(u'foo-bar-123.zip', u'foo-bar-[RANDOM]-zip', 0, FAKE_TIME_S, u'sha256'),
(u'foo-bar-baz.zip', None, 0, FAKE_TIME_S, u'sha256')
])

def test_update_result_no_vtresponse(self, testdb):
testdb.conn = mock.MagicMock()
cursor_mock = mock.MagicMock()
testdb.conn.cursor = cursor_mock

testdb.update_result(None)
assert not cursor_mock.called

def test_update_result_no_vtresponse_not_infected(self, testdb):
testdb.conn = mock.MagicMock()
cursor_mock = mock.MagicMock()
testdb.conn.cursor = cursor_mock
testdb.update_result(DummyVTResult(False))
assert not cursor_mock.called
def test_update_result_nothing_to_update(self, testdb):
testdb.conn = mock.MagicMock()
cursor_mock = mock.MagicMock()
testdb.conn.cursor = cursor_mock
testdb.update_result(DummyVTResult(True))
assert cursor_mock.called
self.validate_filenames_in_database(testdb, [])

def test_update_result_update_filename(self, testdb, frozen_datetime):
testdb.add_resource(DummyResource('file1'))

testdb.update_result(DummyVTResult(True))
# filename, pattern, infected, timestamp, sha256
self.validate_filenames_in_database(testdb, [
('file1', None, 1, FAKE_TIME_S, 'sha256'),
])

Loading…
Cancel
Save