Add JSON output (#32)

* Move unit tests from nose to pytest

* Replace imp with importlib

* Add JSON output

* Update unit test GitHub Actions workflow (remove the Ubuntu 18.04 job, add an Ubuntu 22.04 / Python 3.10 job)
This commit is contained in:
Jason 2023-05-29 14:21:45 +10:00 committed by GitHub
parent 7aadd41ee2
commit 5da497d49c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 102 additions and 50 deletions

View file

@ -5,36 +5,6 @@ on:
types: [opened, synchronize, reopened]
jobs:
build-bionic:
runs-on: ubuntu-18.04
strategy:
matrix:
include:
- python-version: '3.6'
- python-version: '3.7'
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
env:
DEBIAN_FRONTEND: noninteractive
run: |
sudo apt update -q
sudo apt install -y software-properties-common
- name: Install dependencies
env:
DEBIAN_FRONTEND: noninteractive
run: |
sudo add-apt-repository -y ppa:gift/stable
sudo apt update -q
sudo apt install -y python${{ matrix.python-version }} python3-dfvfs python3-pip python3-setuptools
python3 -m pip install .[dev]
- name: Run unit tests
run: python3 run_tests.py
build-focal:
runs-on: ubuntu-20.04
strategy:
@ -64,3 +34,32 @@ jobs:
- name: Run unit tests
run: python3 run_tests.py
build-jammy:
runs-on: ubuntu-22.04
strategy:
matrix:
include:
- python-version: '3.10'
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
env:
DEBIAN_FRONTEND: noninteractive
run: |
sudo apt update -q
sudo apt install -y software-properties-common
- name: Install dependencies
env:
DEBIAN_FRONTEND: noninteractive
run: |
sudo add-apt-repository -y ppa:gift/stable
sudo apt update -q
sudo apt install -y python${{ matrix.python-version }} python3-dfvfs python3-pip python3-setuptools
python3 -m pip install .[dev]
- name: Run unit tests
run: python3 run_tests.py

View file

@ -14,7 +14,8 @@
# limitations under the License.
"""DFDewey Config."""
import imp
import importlib.machinery
import importlib.util
import logging
import os
@ -70,12 +71,16 @@ def load_config(config_file=None):
if config_file:
log.debug('Loading config from {0:s}'.format(config_file))
try:
config = imp.load_source('config', config_file)
except IOError as e:
spec = importlib.util.spec_from_loader(
'config', importlib.machinery.SourceFileLoader('config', config_file))
config = importlib.util.module_from_spec(spec)
spec.loader.exec_module(config)
except FileNotFoundError as e:
log.error(
'Could not load config file {0:s}: {1!s}'.format(config_file, e))
config = None
if not config:
log.warn('Config file not loaded. Using default datastore settings.')
log.warning('Config file not loaded. Using default datastore settings.')
return config

View file

@ -107,7 +107,8 @@ def main():
image_processor_options, args.config)
image_processor.process_image()
else:
index_searcher = IndexSearcher(args.case, image_id, args.image, args.config)
index_searcher = IndexSearcher(
args.case, image_id, args.image, args.json, args.config)
if args.search:
index_searcher.search(args.search, args.highlight)
elif args.search_list:
@ -150,6 +151,8 @@ def parse_args():
parser.add_argument(
'--highlight', help='highlight search term in results',
action='store_true')
parser.add_argument(
'--json', help='output results in JSON format', action='store_true')
parser.add_argument('-s', '--search', help='search query')
parser.add_argument('--search_list', help='file with search queries')

View file

@ -61,9 +61,8 @@ class FileEntryScanner(volume_scanner.VolumeScanner):
"""File entry scanner."""
_NON_PRINTABLE_CHARACTERS = list(range(0, 0x20)) + list(range(0x7f, 0xa0))
_ESCAPE_CHARACTERS = str.maketrans({
value: '\\x{0:02x}'.format(value) for value in _NON_PRINTABLE_CHARACTERS
})
_ESCAPE_CHARACTERS = str.maketrans(
{value: '\\x{0:02x}'.format(value) for value in _NON_PRINTABLE_CHARACTERS})
def __init__(self, mediator=None):
"""Initializes a file entry scanner.

View file

@ -14,6 +14,7 @@
# limitations under the License.
"""Index searcher."""
import json
import logging
import os
import re
@ -66,7 +67,7 @@ class _SearchHit():
class IndexSearcher():
"""Index Searcher class."""
def __init__(self, case, image_id, image, config_file=None):
def __init__(self, case, image_id, image, json=False, config_file=None):
"""Create an index searcher."""
super().__init__()
self.case = case
@ -75,6 +76,7 @@ class IndexSearcher():
self.image = image
self.image_id = image_id
self.images = {}
self.json = json
self.postgresql = None
self.scanner = None
@ -270,7 +272,11 @@ class IndexSearcher():
Args:
query_list (str): path to a text file containing multiple search terms.
"""
search_results = {}
for image_hash, image_path in self.images.items():
search_results[image_hash] = {}
search_results[image_hash]['image'] = image_path
search_results[image_hash]['results'] = {}
index = ''.join(('es', image_hash))
with open(query_list, 'r') as search_terms:
table_data = []
@ -279,14 +285,18 @@ class IndexSearcher():
results = self.opensearch.search(index, term)
hit_count = results['hits']['total']['value']
if hit_count > 0:
search_results[image_hash]['results'][term] = hit_count
table_data.append({'Search term': term, 'Hits': hit_count})
if table_data:
output = tabulate(table_data, headers='keys', tablefmt='simple')
else:
output = 'No results.'
log.info(
'Searched %s (%s) for terms in %s\n\n%s\n', image_path, image_hash,
query_list, output)
if not self.json:
log.info(
'Searched %s (%s) for terms in %s\n\n%s\n', image_path, image_hash,
query_list, output)
if self.json:
log.info('%s', json.JSONEncoder().encode(search_results))
def search(self, query, highlight=False):
"""Run a single query.
@ -295,7 +305,10 @@ class IndexSearcher():
query (str): query to run.
highlight (bool): flag to highlight search term in results.
"""
search_results = {}
for image_hash, image_path in self.images.items():
search_results[image_hash] = {}
search_results[image_hash]['image'] = image_path
log.info('Searching %s (%s) for "%s"', image_path, image_hash, query)
index = ''.join(('es', image_hash))
results = self.opensearch.search(index, query)
@ -329,7 +342,11 @@ class IndexSearcher():
hit.data = self._highlight_hit(hit.data, hit_positions)
hit.data = '\n'.join(hit.data)
hits.append(hit.copy_to_dict())
output = tabulate(hits, headers='keys', tablefmt='simple')
log.info(
'Returned %d results in %dms.\n\n%s\n', result_count, time_taken,
output)
search_results[image_hash][query] = hits
if not self.json:
output = tabulate(hits, headers='keys', tablefmt='simple')
log.info(
'Returned %d results in %dms.\n\n%s\n', result_count, time_taken,
output)
if self.json:
log.info('%s', json.JSONEncoder().encode(search_results))

View file

@ -140,17 +140,27 @@ class IndexSearcherTest(unittest.TestCase):
index_searcher.images = {TEST_IMAGE_HASH: TEST_IMAGE}
current_path = os.path.abspath(os.path.dirname(__file__))
query_list = os.path.join(
current_path, '..', '..', 'test_data', 'wordlist.txt')
current_path, '..', '..', 'test_data', 'searchlist.txt')
mock_search.return_value = {'hits': {'total': {'value': 1}}}
index_searcher.list_search(query_list)
self.assertEqual(mock_search.call_count, 8)
self.assertEqual(mock_search.call_count, 5)
mock_output.assert_called_once()
self.assertEqual(mock_output.call_args.args[1], TEST_IMAGE)
self.assertEqual(mock_output.call_args.args[2], TEST_IMAGE_HASH)
self.assertEqual(mock_output.call_args.args[3], query_list)
# Test JSON output
expected_output = '{"%s": {"image": "%s", "results": {"\\"list\\"": 1, "\\"of\\"": 1, "\\"test\\"": 1, "\\"search\\"": 1, "\\"terms\\"": 1}}}' % (
TEST_IMAGE_HASH, TEST_IMAGE)
mock_output.reset_mock()
index_searcher.json = True
index_searcher.list_search(query_list)
mock_output.assert_called_once()
self.assertEqual(mock_output.call_args.args[1], expected_output)
# Test no results
mock_output.reset_mock()
index_searcher.json = False
mock_search.return_value = {'hits': {'total': {'value': 0}}}
index_searcher.list_search(query_list)
mock_output.assert_called_once()
@ -211,6 +221,17 @@ class IndexSearcherTest(unittest.TestCase):
self.assertEqual(table_output[106:110], 'test')
self.assertEqual(table_output[111:117], 'GZIP-0')
# Test JSON output
expected_output = '{"%s": {"image": "%s", "test": [{"Offset": "12889600\\nGZIP-0", "Filename (inode)": "", "String": "test"}]}}' % (
TEST_IMAGE_HASH, image_path)
mock_search.reset_mock()
mock_output.reset_mock()
index_searcher.json = True
index_searcher.search('test')
mock_search.assert_called_once()
output_calls = mock_output.mock_calls
self.assertEqual(output_calls[1].args[1], expected_output)
def test_wrap_filenames(self):
"""Test wrap filenames method."""
index_searcher = self._get_index_searcher()

View file

@ -19,5 +19,8 @@ import subprocess
if __name__ == '__main__':
subprocess.check_call([
'nosetests', '-vv', '--with-coverage', '--cover-package=dfdewey', '--exe'
'coverage', 'run', '--source=dfdewey', '-m', 'pytest'
])
subprocess.check_call([
'coverage', 'report'
])

View file

@ -67,7 +67,7 @@ setup(
],
install_requires=requirements,
extras_require={
'dev': ['mock', 'nose', 'yapf', 'coverage']
'dev': ['mock', 'pytest', 'yapf', 'coverage']
},
entry_points={'console_scripts': ['dfdewey=dfdewey.dfdcli:main']},
python_requires='>=3.6',

5
test_data/searchlist.txt Normal file
View file

@ -0,0 +1,5 @@
list
of
test
search
terms