Merge branch 'master' into colour

This commit is contained in:
Jason 2021-08-18 11:07:29 +10:00 committed by GitHub
commit 1a34d99c7e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 37 additions and 129 deletions

2
.coveragerc Normal file
View file

@@ -0,0 +1,2 @@
[report]
show_missing = True

View file

@@ -205,7 +205,11 @@ class FileEntryScanner(volume_scanner.VolumeScanner):
Volume location / identifier, offset, and size for all volumes.
"""
if not self._volumes or self._source_path != image_path:
base_path_specs = self.GetBasePathSpecs(image_path)
options = volume_scanner.VolumeScannerOptions()
options.partitions = ['all']
options.volumes = ['all']
options.snapshots = ['none']
base_path_specs = self.GetBasePathSpecs(image_path, options=options)
for path_spec in base_path_specs:
partition_path_spec = self._get_tsk_partition_path_spec(path_spec)
@@ -482,12 +486,14 @@ class ImageProcessor():
self._create_filesystem_database()
# Scan image for volumes
dfvfs_definitions.PREFERRED_GPT_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_GPT)
mediator = UnattendedVolumeScannerMediator()
options = volume_scanner.VolumeScannerOptions()
options.partitions = ['all']
options.volumes = ['all']
options.snapshots = ['none']
try:
self.scanner = FileEntryScanner(mediator=mediator)
self.path_specs = self.scanner.GetBasePathSpecs(self.image_path)
self.scanner = FileEntryScanner()
self.path_specs = self.scanner.GetBasePathSpecs(
self.image_path, options=options)
log.info(
'Found %d volume%s in [%s]:', len(self.path_specs),
'' if len(self.path_specs) == 1 else 's', self.image_path)
@@ -499,8 +505,8 @@ class ImageProcessor():
log.info(
'%s: %s (Offset %d)', location, path_spec.type_indicator,
start_offset)
if path_spec.type_indicator in (dfvfs_definitions.TYPE_INDICATOR_NTFS,
dfvfs_definitions.TYPE_INDICATOR_TSK):
if path_spec.type_indicator in (dfvfs_definitions.TYPE_INDICATOR_EXT,
dfvfs_definitions.TYPE_INDICATOR_NTFS):
self._parse_inodes(location, start_offset)
self.scanner.parse_file_entries([path_spec], self.postgresql)
else:
@@ -572,96 +578,3 @@ class ImageProcessorOptions():
self.gunzip = gunzip
self.unzip = unzip
self.reindex = reindex
class UnattendedVolumeScannerMediator(volume_scanner.VolumeScannerMediator):
"""Unattended volume scanner mediator."""
def GetAPFSVolumeIdentifiers(self, volume_system, volume_identifiers):
"""Retrieves APFS volume identifiers.
In an unattended execution, this method returns all volume identifiers.
Args:
volume_system (APFSVolumeSystem): volume system.
volume_identifiers (list[str]): volume identifiers including prefix.
Returns:
list[str]: all volume identifiers including prefix.
"""
prefix = 'apfs'
return [
'{0:s}{1:d}'.format(prefix, volume_index)
for volume_index in range(1, volume_system.number_of_volumes + 1)
]
def GetLVMVolumeIdentifiers(self, volume_system, volume_identifiers):
"""Retrieves LVM volume identifiers.
This method can be used to prompt the user to provide LVM volume
identifiers.
Args:
volume_system (LVMVolumeSystem): volume system.
volume_identifiers (list[str]): volume identifiers including prefix.
Returns:
list[str]: selected volume identifiers including prefix or None.
"""
prefix = 'lvm'
return [
'{0:s}{1:d}'.format(prefix, volume_index)
for volume_index in range(1, volume_system.number_of_volumes + 1)
]
def GetPartitionIdentifiers(self, volume_system, volume_identifiers):
"""Retrieves partition identifiers.
In an unattended execution, this method returns all partition identifiers.
Args:
volume_system (TSKVolumeSystem): volume system.
volume_identifiers (list[str]): volume identifiers including prefix.
Returns:
list[str]: all volume identifiers including prefix.
"""
prefix = 'p'
return [
'{0:s}{1:d}'.format(prefix, volume_index)
for volume_index in range(1, volume_system.number_of_volumes + 1)
]
def GetVSSStoreIdentifiers(self, volume_system, volume_identifiers):
"""Retrieves VSS store identifiers.
Placeholder method for VSS support.
Args:
volume_system (VShadowVolumeSystem): volume system.
volume_identifiers (list[str]): volume identifiers including prefix.
Returns:
list[str]: None.
"""
return []
def UnlockEncryptedVolume(
self, source_scanner_object, scan_context, locked_scan_node, credentials):
"""Unlocks an encrypted volume.
Placeholder method for encrypted volume support.
Args:
source_scanner_object (SourceScanner): source scanner.
scan_context (SourceScannerContext): source scanner context.
locked_scan_node (SourceScanNode): locked scan node.
credentials (Credentials): credentials supported by the locked scan node.
Returns:
bool: True if the volume was unlocked.
"""
log.warning(
'Encrypted volumes are currently unsupported: %s',
locked_scan_node.path_spec.CopyToDict())
return False

View file

@@ -18,13 +18,13 @@ import os
from subprocess import CalledProcessError
import unittest
from dfvfs.helpers import volume_scanner
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.path import factory as path_spec_factory
import mock
from dfdewey.utils.image_processor import (
_StringRecord, FileEntryScanner, ImageProcessor, ImageProcessorOptions,
UnattendedVolumeScannerMediator)
_StringRecord, FileEntryScanner, ImageProcessor, ImageProcessorOptions)
TEST_CASE = 'testcase'
TEST_IMAGE = 'test.dd'
@@ -34,35 +34,29 @@ TEST_IMAGE_HASH = 'd41d8cd98f00b204e9800998ecf8427e'
class FileEntryScannerTest(unittest.TestCase):
"""Tests for file entry scanner."""
def _get_file_entry_scanner(self):
"""Get a test file entry scanner.
Returns:
Test file entry scanner.
"""
mediator = UnattendedVolumeScannerMediator()
scanner = FileEntryScanner(mediator=mediator)
return scanner
@mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore')
def test_parse_file_entries(self, mock_datastore):
"""Test parse file entries method."""
scanner = self._get_file_entry_scanner()
options = volume_scanner.VolumeScannerOptions()
options.partitions = ['all']
options.volumes = ['all']
options.snapshots = ['none']
scanner = FileEntryScanner()
current_path = os.path.abspath(os.path.dirname(__file__))
image_path = os.path.join(
current_path, '..', '..', 'test_data', 'test_volume.dd')
path_specs = scanner.GetBasePathSpecs(image_path)
path_specs = scanner.GetBasePathSpecs(image_path, options=options)
scanner.parse_file_entries(path_specs, mock_datastore)
self.assertEqual(mock_datastore.bulk_insert.call_count, 2)
insert_calls = mock_datastore.bulk_insert.mock_calls
self.assertEqual(len(insert_calls[0].args[1]), 1500)
self.assertEqual(len(insert_calls[1].args[1]), 3)
self.assertEqual(len(insert_calls[1].args[1]), 2)
# Test APFS
mock_datastore.reset_mock()
scanner = self._get_file_entry_scanner()
scanner = FileEntryScanner()
image_path = os.path.join(current_path, '..', '..', 'test_data', 'test.dmg')
path_specs = scanner.GetBasePathSpecs(image_path)
path_specs = scanner.GetBasePathSpecs(image_path, options=options)
self.assertEqual(getattr(path_specs[0].parent, 'location', None), '/apfs1')
scanner.parse_file_entries(path_specs, mock_datastore)
mock_datastore.bulk_insert.assert_not_called()
@@ -315,7 +309,7 @@ class ImageProcessorTest(unittest.TestCase):
self.assertEqual(
ntfs_path_spec.type_indicator, dfvfs_definitions.TYPE_INDICATOR_NTFS)
self.assertEqual(
tsk_path_spec.type_indicator, dfvfs_definitions.TYPE_INDICATOR_TSK)
tsk_path_spec.type_indicator, dfvfs_definitions.TYPE_INDICATOR_EXT)
self.assertEqual(mock_bulk_insert.call_count, 48)
# Check number of blocks inserted for p1
self.assertEqual(len(mock_bulk_insert.mock_calls[0].args[1]), 639)
@@ -326,7 +320,7 @@ class ImageProcessorTest(unittest.TestCase):
self.assertEqual(len(mock_call.args[1]), 1500)
self.assertEqual(len(mock_bulk_insert.mock_calls[46].args[1]), 1113)
# Check number of files inserted for p3
self.assertEqual(len(mock_bulk_insert.mock_calls[47].args[1]), 4)
self.assertEqual(len(mock_bulk_insert.mock_calls[47].args[1]), 3)
# Test missing image
image_processor.image_path = TEST_IMAGE

View file

@@ -25,8 +25,7 @@ from tabulate import tabulate
from dfdewey.datastore.elastic import ElasticsearchDataStore
from dfdewey.datastore.postgresql import PostgresqlDataStore
from dfdewey.utils.image_processor import (
FileEntryScanner, UnattendedVolumeScannerMediator)
from dfdewey.utils.image_processor import FileEntryScanner
DATA_COLUMN_WIDTH = 110
TEXT_HIGHLIGHT = '\u001b[31m\u001b[1m'
@@ -131,8 +130,7 @@ class IndexSearcher():
volume_extents = {}
try:
if not self.scanner:
mediator = UnattendedVolumeScannerMediator()
self.scanner = FileEntryScanner(mediator=mediator)
self.scanner = FileEntryScanner()
volume_extents = self.scanner.get_volume_extents(image_path)
except dfvfs_errors.ScannerError as e:
log.error('Error scanning for partitions: %s', e)

View file

@@ -2,19 +2,20 @@ pip >= 7.0.0
PyYAML >= 3.10
cffi >= 1.9.1
cryptography >= 2.0.2
dfdatetime >= 20200809
dfdatetime >= 20210509
dtfabric >= 20170524
idna >= 2.5
libbde-python >= 20140531
libewf-python >= 20131210
libfsapfs-python >= 20201107
libfsext-python >= 20200819
libfshfs-python >= 20201103
libfsext-python >= 20210424
libfshfs-python >= 20210530
libfsntfs-python >= 20200921
libfsxfs-python >= 20201114
libfvde-python >= 20160719
libfwnt-python >= 20160418
libluksde-python >= 20200101
libmodi-python >= 20210405
libqcow-python >= 20201213
libsigscan-python >= 20191221
libsmdev-python >= 20140529
@@ -24,4 +25,4 @@ libvmdk-python >= 20140421
libvsgpt-python >= 20210207
libvshadow-python >= 20160109
libvslvm-python >= 20160109
pytsk3 >= 20160721
pytsk3 >= 20210419

View file

@@ -1,4 +1,4 @@
dfvfs
dfvfs>=20210606
elasticsearch
psycopg2-binary
pytsk3