Unit tests

This commit is contained in:
Jason Solomon 2020-11-30 15:23:27 +11:00
parent 2a02528ff5
commit 560d370e33
2 changed files with 154 additions and 24 deletions

View file

@ -14,11 +14,16 @@
# limitations under the License.
"""Tests for image processor."""
import os
from subprocess import CalledProcessError
import unittest
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.path import factory as path_spec_factory
import mock
from dfdewey.datastore.postgresql import PostgresqlDataStore
from dfdewey.utils.image_processor import ImageProcessor, ImageProcessorOptions
from dfdewey.utils.image_processor import (
_StringRecord, ImageProcessor, ImageProcessorOptions)
TEST_CASE = 'testcase'
TEST_IMAGE = 'test.dd'
@ -37,29 +42,19 @@ class ImageProcessorTest(unittest.TestCase):
image_processor_options = ImageProcessorOptions()
image_processor = ImageProcessor(
TEST_CASE, TEST_IMAGE, image_processor_options)
image_processor.image_hash = TEST_IMAGE_HASH
with mock.patch('psycopg2.connect') as _:
postgresql = PostgresqlDataStore()
image_processor.postgresql = postgresql
return image_processor
@mock.patch(
'dfdewey.utils.image_processor.ImageProcessor._initialise_database')
@mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore.value_exists')
@mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore.table_exists')
@mock.patch(
'dfdewey.datastore.postgresql.PostgresqlDataStore.query_single_row')
@mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore.execute')
def test_already_parsed(
self, mock_execute, mock_query_single_row, mock_table_exists,
mock_value_exists, mock_initialise_database):
@mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore')
def test_already_parsed(self, mock_postgresql, mock_initialise_database):
"""Test already parsed method."""
image_processor = self._get_image_processor()
# Test if new database
mock_table_exists.return_value = False
mock_postgresql.table_exists.return_value = False
image_processor.postgresql = mock_postgresql
result = image_processor._already_parsed()
mock_initialise_database.assert_called_once()
@ -72,27 +67,154 @@ class ImageProcessorTest(unittest.TestCase):
'INSERT INTO image_case (case_id, image_hash) '
'VALUES (\'{0:s}\', \'{1:s}\')').format(TEST_CASE, TEST_IMAGE_HASH))
]
mock_execute.assert_has_calls(calls)
mock_postgresql.execute.assert_has_calls(calls)
self.assertEqual(result, False)
# Test database exists, image already in case
mock_table_exists.return_value = True
mock_value_exists.return_value = True
mock_query_single_row.return_value = (1,)
mock_execute.reset_mock()
mock_postgresql.table_exists.return_value = True
mock_postgresql.value_exists.return_value = True
mock_postgresql.query_single_row.return_value = (1,)
mock_postgresql.execute.reset_mock()
image_processor.postgresql = mock_postgresql
result = image_processor._already_parsed()
mock_execute.assert_not_called()
mock_postgresql.execute.assert_not_called()
self.assertEqual(result, True)
# Test database exists, image exists, but not in case
mock_query_single_row.return_value = None
mock_postgresql.query_single_row.return_value = None
image_processor.postgresql = mock_postgresql
result = image_processor._already_parsed()
mock_execute.assert_called_once_with((
mock_postgresql.execute.assert_called_once_with((
'INSERT INTO image_case (case_id, image_hash) '
'VALUES (\'{0:s}\', \'{1:s}\')').format(TEST_CASE, TEST_IMAGE_HASH))
self.assertEqual(result, True)
@mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore')
def test_create_filesystem_database(self, mock_postgresql):
"""Test create filesystem database method."""
image_processor = self._get_image_processor()
image_processor.postgresql = mock_postgresql
image_processor._create_filesystem_database()
calls = [
mock.call((
'CREATE TABLE blocks (block INTEGER, inum INTEGER, part TEXT, '
'PRIMARY KEY (block, inum, part))')),
mock.call((
'CREATE TABLE files (inum INTEGER, filename TEXT, part TEXT, '
'PRIMARY KEY (inum, filename, part))'))
]
mock_postgresql.execute.assert_has_calls(calls)
@mock.patch('subprocess.check_output')
def test_extract_strings(self, mock_subprocess):
"""Test extract strings method."""
image_processor = self._get_image_processor()
image_processor.output_path = '/tmp/tmpxaemz75r'
image_processor.image_hash = None
# Test with default options
mock_subprocess.return_value = 'MD5 of Disk Image: {0:s}'.format(
TEST_IMAGE_HASH).encode('utf-8')
image_processor._extract_strings()
mock_subprocess.assert_called_once_with([
'bulk_extractor', '-o', '/tmp/tmpxaemz75r', '-x', 'all', '-e',
'wordlist', '-e', 'base64', '-e', 'gzip', '-e', 'zip', '-S',
'strings=YES', '-S', 'word_max=1000000', TEST_IMAGE
])
self.assertEqual(image_processor.image_hash, TEST_IMAGE_HASH)
# Test options
mock_subprocess.reset_mock()
mock_subprocess.return_value = 'MD5 of Disk Image: {0:s}'.format(
TEST_IMAGE_HASH).encode('utf-8')
image_processor.options.base64 = False
image_processor.options.gunzip = False
image_processor.options.unzip = False
image_processor._extract_strings()
mock_subprocess.assert_called_once_with([
'bulk_extractor', '-o', '/tmp/tmpxaemz75r', '-x', 'all', '-e',
'wordlist', '-S', 'strings=YES', '-S', 'word_max=1000000', TEST_IMAGE
])
# Test error in processing
mock_subprocess.reset_mock()
mock_subprocess.side_effect = CalledProcessError(1, 'bulk_extractor')
with self.assertRaises(RuntimeError):
image_processor._extract_strings()
def test_get_volume_details(self):
"""Test get volume details method."""
image_processor = self._get_image_processor()
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=TEST_IMAGE)
raw_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_RAW, parent=os_path_spec)
tsk_partition_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION, parent=raw_path_spec,
location='/p1', part_index=2, start_offset=2048)
tsk_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_NTFS, parent=tsk_partition_spec,
location='/')
location, start_offset = image_processor._get_volume_details(tsk_spec)
self.assertEqual(location, '/p1')
self.assertEqual(start_offset, 2048)
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore')
def test_index_record(self, mock_elasticsearch):
"""Test index record method."""
image_processor = self._get_image_processor()
index_name = ''.join(('es', TEST_IMAGE_HASH))
string_record = _StringRecord()
string_record.image = TEST_IMAGE_HASH
string_record.offset = 1234567
string_record.data = 'test string'
image_processor.elasticsearch = mock_elasticsearch
image_processor._index_record(index_name, string_record)
json_record = {
'image': string_record.image,
'offset': string_record.offset,
'file_offset': string_record.file_offset,
'data': string_record.data
}
mock_elasticsearch.import_event.assert_called_once_with(
index_name, event=json_record)
@mock.patch('elasticsearch.client.IndicesClient.create')
@mock.patch('dfdewey.utils.image_processor.ImageProcessor._index_record')
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.index_exists')
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.import_event')
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.create_index')
def test_index_strings(
self, mock_create_index, mock_import_event, mock_index_exists,
mock_index_record, _):
"""Test index strings method."""
image_processor = self._get_image_processor()
current_path = os.path.abspath(os.path.dirname(__file__))
image_processor.output_path = os.path.join(
current_path, '..', '..', 'test_data')
# Test index already exists
mock_index_exists.return_value = True
image_processor._index_strings()
mock_index_record.assert_not_called()
# Test new index
mock_index_exists.return_value = False
mock_index_record.return_value = 10000000
image_processor._index_strings()
mock_create_index.assert_called_once_with(
index_name=''.join(('es', TEST_IMAGE_HASH)))
self.assertEqual(mock_index_record.call_count, 3)
mock_import_event.assert_called_once()
if __name__ == '__main__':
unittest.main()

8
test_data/wordlist.txt Normal file
View file

@ -0,0 +1,8 @@
# BANNER FILE NOT PROVIDED (-b option)
# BULK_EXTRACTOR-Version: 1.6.0 ($Rev: 10844 $)
# Feature-Recorder: wordlist
# Filename: test.dd
# Feature-File-Version: 1.1
2681139 Quoth the Raven
2681170 Nevermore.
19998720-ZIP-516 I doubted if I should ever come back.