Merge pull request #9 from google/unit-tests

Unit tests
This commit is contained in:
Jason 2020-12-04 16:20:55 +11:00 committed by GitHub
commit 10242940ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 537 additions and 18 deletions

View file

@ -106,16 +106,6 @@ class ElasticsearchDataStore():
The number of events processed.
"""
if event:
for key, value in event.items():
if not isinstance(key, six.text_type):
key = codecs.decode(key, 'utf8')
# Make sure we have decoded strings in the event dict.
if isinstance(value, six.binary_type):
value = codecs.decode(value, 'utf8')
event[key] = value
# Header needed by Elasticsearch when bulk inserting.
header = {'index': {'_index': index_name}}

View file

@ -33,7 +33,6 @@ class ElasticTest(unittest.TestCase):
Returns:
Mock elasticsearch datastore.
"""
# with mock.patch('psycopg2.connect') as _:
es = ElasticsearchDataStore()
return es

View file

@ -20,7 +20,7 @@ import logging
import os
import sys
from dfdewey.utils.image_processor import ImageProcessorOptions, ImageProcessor
from dfdewey.utils.image_processor import ImageProcessor, ImageProcessorOptions
from dfdewey.utils.index_searcher import IndexSearcher
STRING_INDEXING_LOG_INTERVAL = 10000000

View file

@ -179,7 +179,8 @@ class FileEntryScanner(volume_scanner.VolumeScanner):
))
for data_stream in file_entry.data_streams:
if not data_stream.IsDefault():
filename = ':'.join((filename, data_stream.name))
filename = self._get_display_path(
file_entry.path_spec, path_segments, data_stream.name)
self._rows.append((
inode,
filename,

View file

@ -0,0 +1,326 @@
# -*- coding: utf-8 -*-
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for image processor."""
import os
from subprocess import CalledProcessError
import unittest
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.path import factory as path_spec_factory
import mock
from dfdewey.utils.image_processor import (
_StringRecord, FileEntryScanner, ImageProcessor, ImageProcessorOptions,
UnattendedVolumeScannerMediator)
TEST_CASE = 'testcase'
TEST_IMAGE = 'test.dd'
TEST_IMAGE_HASH = 'd41d8cd98f00b204e9800998ecf8427e'
class FileEntryScannerTest(unittest.TestCase):
  """Tests for file entry scanner."""

  def _get_file_entry_scanner(self):
    """Create a file entry scanner for testing.

    Returns:
      A FileEntryScanner wired up with an unattended mediator so no user
      interaction is required during volume scanning.
    """
    return FileEntryScanner(mediator=UnattendedVolumeScannerMediator())

  @mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore')
  def test_parse_file_entries(self, mock_datastore):
    """Test parse file entries method."""
    test_data_dir = os.path.join(
        os.path.abspath(os.path.dirname(__file__)), '..', '..', 'test_data')

    # Parse a regular volume image and check what was flushed to the
    # datastore: one full batch of 1500 rows followed by the remaining 3.
    volume_scanner = self._get_file_entry_scanner()
    volume_image = os.path.join(test_data_dir, 'test_volume.dd')
    volume_path_specs = volume_scanner.GetBasePathSpecs(volume_image)
    volume_scanner.parse_file_entries(volume_path_specs, mock_datastore)
    self.assertEqual(mock_datastore.bulk_insert.call_count, 2)
    insert_calls = mock_datastore.bulk_insert.mock_calls
    self.assertEqual(len(insert_calls[0].args[1]), 1500)
    self.assertEqual(len(insert_calls[1].args[1]), 3)

    # Test APFS: the scanner should recognise the APFS volume but insert no
    # rows for it.
    mock_datastore.reset_mock()
    apfs_scanner = self._get_file_entry_scanner()
    apfs_image = os.path.join(test_data_dir, 'test.dmg')
    apfs_path_specs = apfs_scanner.GetBasePathSpecs(apfs_image)
    self.assertEqual(
        getattr(apfs_path_specs[0].parent, 'location', None), '/apfs1')
    apfs_scanner.parse_file_entries(apfs_path_specs, mock_datastore)
    mock_datastore.bulk_insert.assert_not_called()
class ImageProcessorTest(unittest.TestCase):
  """Tests for image processor."""

  def _get_image_processor(self):
    """Get a test image processor.

    Returns:
      Test image processor configured with the test case, test image and
      default options, with a fixed image hash.
    """
    image_processor_options = ImageProcessorOptions()
    image_processor = ImageProcessor(
        TEST_CASE, TEST_IMAGE, image_processor_options)
    image_processor.image_hash = TEST_IMAGE_HASH
    return image_processor

  @mock.patch(
      'dfdewey.utils.image_processor.ImageProcessor._initialise_database')
  @mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore')
  def test_already_parsed(self, mock_postgresql, mock_initialise_database):
    """Test already parsed method."""
    image_processor = self._get_image_processor()

    # Test if new database: the database should be initialised and both the
    # image and its case linkage recorded.
    mock_postgresql.table_exists.return_value = False
    image_processor.postgresql = mock_postgresql
    result = image_processor._already_parsed()
    mock_initialise_database.assert_called_once()
    calls = [
        mock.call((
            'INSERT INTO images (image_path, image_hash) '
            'VALUES (\'{0:s}\', \'{1:s}\')').format(
                TEST_IMAGE, TEST_IMAGE_HASH)),
        mock.call((
            'INSERT INTO image_case (case_id, image_hash) '
            'VALUES (\'{0:s}\', \'{1:s}\')').format(TEST_CASE, TEST_IMAGE_HASH))
    ]
    mock_postgresql.execute.assert_has_calls(calls)
    self.assertEqual(result, False)

    # Test database exists, image already in case: no INSERTs expected.
    mock_postgresql.table_exists.return_value = True
    mock_postgresql.value_exists.return_value = True
    mock_postgresql.query_single_row.return_value = (1,)
    mock_postgresql.execute.reset_mock()
    image_processor.postgresql = mock_postgresql
    result = image_processor._already_parsed()
    mock_postgresql.execute.assert_not_called()
    self.assertEqual(result, True)

    # Test database exists, image exists, but not in case: only the
    # image_case link should be inserted.
    mock_postgresql.query_single_row.return_value = None
    image_processor.postgresql = mock_postgresql
    result = image_processor._already_parsed()
    mock_postgresql.execute.assert_called_once_with((
        'INSERT INTO image_case (case_id, image_hash) '
        'VALUES (\'{0:s}\', \'{1:s}\')').format(TEST_CASE, TEST_IMAGE_HASH))
    self.assertEqual(result, True)

  @mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore')
  def test_create_filesystem_database(self, mock_postgresql):
    """Test create filesystem database method."""
    image_processor = self._get_image_processor()
    image_processor.postgresql = mock_postgresql
    image_processor._create_filesystem_database()
    # Both the blocks and files tables should be created.
    calls = [
        mock.call((
            'CREATE TABLE blocks (block INTEGER, inum INTEGER, part TEXT, '
            'PRIMARY KEY (block, inum, part))')),
        mock.call((
            'CREATE TABLE files (inum INTEGER, filename TEXT, part TEXT, '
            'PRIMARY KEY (inum, filename, part))'))
    ]
    mock_postgresql.execute.assert_has_calls(calls)

  @mock.patch('subprocess.check_output')
  def test_extract_strings(self, mock_subprocess):
    """Test extract strings method."""
    image_processor = self._get_image_processor()
    image_processor.output_path = '/tmp/tmpxaemz75r'
    image_processor.image_hash = None

    # Test with default options: base64, gzip and zip extraction enabled.
    mock_subprocess.return_value = 'MD5 of Disk Image: {0:s}'.format(
        TEST_IMAGE_HASH).encode('utf-8')
    image_processor._extract_strings()
    mock_subprocess.assert_called_once_with([
        'bulk_extractor', '-o', '/tmp/tmpxaemz75r', '-x', 'all', '-e',
        'wordlist', '-e', 'base64', '-e', 'gzip', '-e', 'zip', '-S',
        'strings=YES', '-S', 'word_max=1000000', TEST_IMAGE
    ])
    # The image hash should be parsed out of the bulk_extractor output.
    self.assertEqual(image_processor.image_hash, TEST_IMAGE_HASH)

    # Test options: disabling base64 / gunzip / unzip drops the
    # corresponding -e flags from the command line.
    mock_subprocess.reset_mock()
    mock_subprocess.return_value = 'MD5 of Disk Image: {0:s}'.format(
        TEST_IMAGE_HASH).encode('utf-8')
    image_processor.options.base64 = False
    image_processor.options.gunzip = False
    image_processor.options.unzip = False
    image_processor._extract_strings()
    mock_subprocess.assert_called_once_with([
        'bulk_extractor', '-o', '/tmp/tmpxaemz75r', '-x', 'all', '-e',
        'wordlist', '-S', 'strings=YES', '-S', 'word_max=1000000', TEST_IMAGE
    ])

    # Test error in processing: a bulk_extractor failure is surfaced as a
    # RuntimeError.
    mock_subprocess.reset_mock()
    mock_subprocess.side_effect = CalledProcessError(1, 'bulk_extractor')
    with self.assertRaises(RuntimeError):
      image_processor._extract_strings()

  def test_get_volume_details(self):
    """Test get volume details method."""
    image_processor = self._get_image_processor()
    # Build an OS -> RAW -> TSK partition -> NTFS path spec chain by hand.
    os_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=TEST_IMAGE)
    raw_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_RAW, parent=os_path_spec)
    tsk_partition_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION, parent=raw_path_spec,
        location='/p1', part_index=2, start_offset=2048)
    tsk_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_NTFS, parent=tsk_partition_spec,
        location='/')

    # Location and offset should come from the partition path spec.
    location, start_offset = image_processor._get_volume_details(tsk_spec)
    self.assertEqual(location, '/p1')
    self.assertEqual(start_offset, 2048)

  @mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore')
  def test_index_record(self, mock_elasticsearch):
    """Test index record method."""
    image_processor = self._get_image_processor()
    index_name = ''.join(('es', TEST_IMAGE_HASH))
    string_record = _StringRecord()
    string_record.image = TEST_IMAGE_HASH
    string_record.offset = 1234567
    string_record.data = 'test string'

    image_processor.elasticsearch = mock_elasticsearch
    image_processor._index_record(index_name, string_record)
    # The record should be flattened into a JSON-style dict for indexing.
    json_record = {
        'image': string_record.image,
        'offset': string_record.offset,
        'file_offset': string_record.file_offset,
        'data': string_record.data
    }
    mock_elasticsearch.import_event.assert_called_once_with(
        index_name, event=json_record)

  @mock.patch('elasticsearch.client.IndicesClient.create')
  @mock.patch('dfdewey.utils.image_processor.ImageProcessor._index_record')
  @mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.index_exists')
  @mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.import_event')
  @mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.create_index')
  def test_index_strings(
      self, mock_create_index, mock_import_event, mock_index_exists,
      mock_index_record, _):
    """Test index strings method."""
    image_processor = self._get_image_processor()
    current_path = os.path.abspath(os.path.dirname(__file__))
    # Use the checked-in bulk_extractor output in test_data as input.
    image_processor.output_path = os.path.join(
        current_path, '..', '..', 'test_data')

    # Test index already exists: nothing should be indexed again.
    mock_index_exists.return_value = True
    image_processor._index_strings()
    mock_index_record.assert_not_called()

    # Test new index: the wordlist in test_data contains 3 records.
    mock_index_exists.return_value = False
    mock_index_record.return_value = 10000000
    image_processor._index_strings()
    mock_create_index.assert_called_once_with(
        index_name=''.join(('es', TEST_IMAGE_HASH)))
    self.assertEqual(mock_index_record.call_count, 3)
    mock_import_event.assert_called_once()

  @mock.patch('psycopg2.connect')
  @mock.patch('dfdewey.utils.image_processor.ImageProcessor._already_parsed')
  @mock.patch(
      'dfdewey.datastore.postgresql.PostgresqlDataStore.switch_database')
  @mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore.execute')
  @mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore.bulk_insert')
  def test_parse_filesystems(
      self, mock_bulk_insert, mock_execute, mock_switch_database,
      mock_already_parsed, _):
    """Test parse filesystems method."""
    image_processor = self._get_image_processor()

    # Test image already parsed: no database work expected.
    mock_already_parsed.return_value = True
    image_processor._parse_filesystems()
    mock_execute.assert_not_called()

    # Test image not parsed
    current_path = os.path.abspath(os.path.dirname(__file__))
    image_processor.image_path = os.path.join(
        current_path, '..', '..', 'test_data', 'test.dd')
    mock_already_parsed.return_value = False
    image_processor._parse_filesystems()
    self.assertEqual(mock_execute.call_count, 3)
    mock_switch_database.assert_called_once_with(
        db_name=''.join(('fs', TEST_IMAGE_HASH)))
    self.assertIsInstance(image_processor.scanner, FileEntryScanner)
    # test.dd is expected to contain an NTFS partition and a TSK partition.
    self.assertEqual(len(image_processor.path_specs), 2)
    ntfs_path_spec = image_processor.path_specs[0]
    tsk_path_spec = image_processor.path_specs[1]
    self.assertEqual(
        ntfs_path_spec.type_indicator, dfvfs_definitions.TYPE_INDICATOR_NTFS)
    self.assertEqual(
        tsk_path_spec.type_indicator, dfvfs_definitions.TYPE_INDICATOR_TSK)
    # The insert counts below are fixed by the contents of test.dd.
    self.assertEqual(mock_bulk_insert.call_count, 48)
    # Check number of blocks inserted for p1
    self.assertEqual(len(mock_bulk_insert.mock_calls[0].args[1]), 639)
    # Check number of files inserted for p1
    self.assertEqual(len(mock_bulk_insert.mock_calls[1].args[1]), 21)
    # Check number of blocks inserted for p3
    for mock_call in mock_bulk_insert.mock_calls[2:46]:
      self.assertEqual(len(mock_call.args[1]), 1500)
    self.assertEqual(len(mock_bulk_insert.mock_calls[46].args[1]), 1113)
    # Check number of files inserted for p3
    self.assertEqual(len(mock_bulk_insert.mock_calls[47].args[1]), 4)

    # Test missing image: should not raise.
    image_processor.image_path = TEST_IMAGE
    image_processor.path_specs = []
    image_processor._parse_filesystems()

    # Test unsupported volume: should not raise.
    image_processor.image_path = os.path.join(
        current_path, '..', '..', 'test_data', 'test.dmg')
    image_processor._parse_filesystems()

  @mock.patch('dfdewey.utils.image_processor.ImageProcessor._parse_filesystems')
  @mock.patch('dfdewey.utils.image_processor.ImageProcessor._index_strings')
  @mock.patch('dfdewey.utils.image_processor.ImageProcessor._extract_strings')
  def test_process_image(
      self, mock_extract_strings, mock_index_strings, mock_parse_filesystems):
    """Test process image method."""
    image_processor = self._get_image_processor()
    image_processor.process_image()
    # Each processing stage should run exactly once.
    mock_extract_strings.assert_called_once()
    mock_index_strings.assert_called_once()
    mock_parse_filesystems.assert_called_once()
# Allow running this test module directly.
if __name__ == '__main__':
  unittest.main()

View file

@ -97,7 +97,7 @@ class IndexSearcher():
location: Partition number
Returns:
Filename of given inode or None
Filename(s) of given inode or None
"""
results = self.postgresql.query((
'SELECT filename FROM files '
@ -107,8 +107,8 @@ class IndexSearcher():
filenames.append(result[0])
return filenames
def _get_filename_from_offset(self, image_path, image_hash, offset):
"""Gets filename given a byte offset within an image.
def _get_filenames_from_offset(self, image_path, image_hash, offset):
"""Gets filename(s) given a byte offset within an image.
Args:
image_path: source image path.
@ -116,7 +116,7 @@ class IndexSearcher():
offset: byte offset within the image.
Returns:
Filename allocated to the given offset, or None.
Filename(s) allocated to the given offset, or None.
"""
filenames = []
@ -295,7 +295,7 @@ class IndexSearcher():
file_offset = '\n'.join(file_offset)
offset = '\n'.join((offset, file_offset))
hit.offset = offset
filenames = self._get_filename_from_offset(
filenames = self._get_filenames_from_offset(
image_path, image_hash, result['_source']['offset'])
filenames = self._wrap_filenames(filenames)
hit.filename = '\n'.join(filenames)

View file

@ -0,0 +1,195 @@
# -*- coding: utf-8 -*-
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for index searcher."""
import os
import unittest
import mock
from dfdewey.utils.image_processor import FileEntryScanner
from dfdewey.utils.index_searcher import IndexSearcher
TEST_CASE = 'testcase'
TEST_IMAGE = 'test.dd'
TEST_IMAGE_HASH = 'd41d8cd98f00b204e9800998ecf8427e'
class IndexSearcherTest(unittest.TestCase):
  """Tests for index searcher."""

  def _get_index_searcher(self):
    """Get a test index searcher.

    Returns:
      Test index searcher.
    """
    # Patch the database connection so no real PostgreSQL server is needed,
    # and return a canned hash for the test image lookup.
    with mock.patch('psycopg2.connect'), mock.patch(
        'dfdewey.datastore.postgresql.PostgresqlDataStore.query_single_row'
    ) as mock_query_single_row:
      mock_query_single_row.return_value = (TEST_IMAGE_HASH,)
      index_searcher = IndexSearcher(TEST_CASE, TEST_IMAGE)

    return index_searcher

  @mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore.query')
  def test_get_case_images(self, mock_query):
    """Test get case images method."""
    mock_query.return_value = [(
        'hash1',
        'image1.dd',
    ), (
        'hash2',
        'image2.dd',
    )]
    # Passing 'all' as the image should load every image in the case.
    with mock.patch('psycopg2.connect'):
      index_searcher = IndexSearcher(TEST_CASE, 'all')
    self.assertEqual(index_searcher.images['hash1'], 'image1.dd')
    self.assertEqual(index_searcher.images['hash2'], 'image2.dd')

  @mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore.query')
  def test_get_filenames_from_inode(self, mock_query):
    """Test get filenames from inode method."""
    index_searcher = self._get_index_searcher()
    # One inode can map to multiple filenames (e.g. NTFS alternate data
    # streams).
    mock_query.return_value = [('test.txt',), ('test.txt:ads',)]
    filenames = index_searcher._get_filenames_from_inode(42, '/p1')
    self.assertEqual(len(filenames), 2)
    self.assertEqual(filenames[0], 'test.txt')
    self.assertEqual(filenames[1], 'test.txt:ads')

  @mock.patch('dfdewey.utils.index_searcher.IndexSearcher._get_inodes')
  @mock.patch(
      'dfdewey.utils.index_searcher.IndexSearcher._get_filenames_from_inode')
  @mock.patch(
      'dfdewey.datastore.postgresql.PostgresqlDataStore.switch_database')
  def test_get_filenames_from_offset(
      self, mock_switch_database, mock_get_filenames_from_inode,
      mock_get_inodes):
    """Test get filenames from offset method."""
    index_searcher = self._get_index_searcher()
    current_path = os.path.abspath(os.path.dirname(__file__))
    image_path = os.path.join(current_path, '..', '..', 'test_data', 'test.dd')

    # Test offset not within a file
    filenames = index_searcher._get_filenames_from_offset(
        image_path, TEST_IMAGE_HASH, 1048579)
    mock_switch_database.assert_called_once_with(
        db_name=''.join(('fs', TEST_IMAGE_HASH)))
    self.assertIsInstance(index_searcher.scanner, FileEntryScanner)
    mock_get_inodes.assert_called_once_with(0, '/p1')
    self.assertEqual(filenames, [])

    # Test offset within a file: the inode number is appended to the name.
    mock_get_inodes.reset_mock()
    mock_get_inodes.return_value = [(0,)]
    mock_get_filenames_from_inode.return_value = ['adams.txt']
    filenames = index_searcher._get_filenames_from_offset(
        image_path, TEST_IMAGE_HASH, 1133936)
    mock_get_inodes.assert_called_once_with(20, '/p1')
    mock_get_filenames_from_inode.assert_called_once_with(67, '/p1')
    self.assertEqual(filenames, ['adams.txt (67)'])

    # Test volume image: no filename resolved, only the inode is reported.
    mock_get_inodes.reset_mock()
    mock_get_inodes.return_value = [(2,)]
    mock_get_filenames_from_inode.reset_mock()
    mock_get_filenames_from_inode.return_value = []
    image_path = os.path.join(
        current_path, '..', '..', 'test_data', 'test_volume.dd')
    filenames = index_searcher._get_filenames_from_offset(
        image_path, TEST_IMAGE_HASH, 334216)
    mock_get_inodes.assert_called_once_with(326, '/')
    mock_get_filenames_from_inode.assert_called_once_with(2, '/')
    self.assertEqual(filenames, [' (2)'])

    # Test missing image
    index_searcher.scanner = None
    filenames = index_searcher._get_filenames_from_offset(
        'test.dd', TEST_IMAGE_HASH, 1048579)
    self.assertEqual(filenames, [])

  def test_wrap_filenames(self):
    """Test wrap filenames method."""
    index_searcher = self._get_index_searcher()
    # A 60-character name wrapped at width 20 yields three 20-char lines.
    filenames = ['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa']
    filenames = index_searcher._wrap_filenames(filenames, width=20)
    expected_filenames = [
        'aaaaaaaaaaaaaaaaaaaa\naaaaaaaaaaaaaaaaaaaa\naaaaaaaaaaaaaaaaaaaa'
    ]
    self.assertEqual(filenames, expected_filenames)

  @mock.patch('logging.Logger.info')
  @mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.search')
  def test_list_search(self, mock_search, mock_output):
    """Test list search."""
    index_searcher = self._get_index_searcher()
    index_searcher.images = {TEST_IMAGE_HASH: TEST_IMAGE}
    current_path = os.path.abspath(os.path.dirname(__file__))
    query_list = os.path.join(
        current_path, '..', '..', 'test_data', 'wordlist.txt')
    mock_search.return_value = {'hits': {'total': {'value': 1}}}
    index_searcher.list_search(query_list)
    # One search per line in the 8-line wordlist.
    self.assertEqual(mock_search.call_count, 8)
    mock_output.assert_called_once()
    self.assertEqual(mock_output.call_args.args[1], TEST_IMAGE)
    self.assertEqual(mock_output.call_args.args[2], TEST_IMAGE_HASH)
    self.assertEqual(mock_output.call_args.args[3], query_list)

    # Test no results
    mock_output.reset_mock()
    mock_search.return_value = {'hits': {'total': {'value': 0}}}
    index_searcher.list_search(query_list)
    mock_output.assert_called_once()
    self.assertEqual(mock_output.call_args.args[4], 'No results.')

  @mock.patch('logging.Logger.info')
  @mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore')
  @mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.search')
  def test_search(self, mock_search, mock_postgresql, mock_output):
    """Test search method."""
    index_searcher = self._get_index_searcher()
    current_path = os.path.abspath(os.path.dirname(__file__))
    image_path = os.path.join(current_path, '..', '..', 'test_data', 'test.dd')
    index_searcher.images = {TEST_IMAGE_HASH: image_path}
    index_searcher.postgresql = mock_postgresql
    # Canned Elasticsearch response with a single hit.
    mock_search.return_value = {
        'took': 2,
        'hits': {
            'total': {
                'value': 1
            },
            'hits': [{
                '_source': {
                    'offset': 12889600,
                    'file_offset': 'GZIP-0',
                    'data': 'test'
                }
            }]
        }
    }
    index_searcher.search('test')
    mock_search.assert_called_once()
    output_calls = mock_output.mock_calls
    self.assertEqual(output_calls[0].args[1], image_path)
    self.assertEqual(output_calls[0].args[2], TEST_IMAGE_HASH)
    self.assertEqual(output_calls[0].args[3], 'test')
    self.assertEqual(output_calls[1].args[1], 1)
    self.assertEqual(output_calls[1].args[2], 2)
    # Spot-check fixed column positions in the rendered results table.
    table_output = output_calls[1].args[3]
    self.assertEqual(table_output[137:145], '12889600')
    self.assertEqual(table_output[169:173], 'test')
    self.assertEqual(table_output[182:188], 'GZIP-0')
# Allow running this test module directly.
if __name__ == '__main__':
  unittest.main()

BIN
test_data/test.dd Normal file

Binary file not shown.

BIN
test_data/test.dmg Normal file

Binary file not shown.

BIN
test_data/test_volume.dd Normal file

Binary file not shown.

8
test_data/wordlist.txt Normal file
View file

@ -0,0 +1,8 @@
# BANNER FILE NOT PROVIDED (-b option)
# BULK_EXTRACTOR-Version: 1.6.0 ($Rev: 10844 $)
# Feature-Recorder: wordlist
# Filename: test.dd
# Feature-File-Version: 1.1
2681139 Quoth the Raven
2681170 Nevermore.
19998720-ZIP-516 I doubted if I should ever come back.