Migrate to OpenSearch (#27)

* Migrate to OpenSearch

* Minor fixes to support Python 3.6
This commit is contained in:
Jason 2021-12-20 11:08:29 +11:00 committed by GitHub
parent 367dc821b0
commit 561f9d840a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 135 additions and 91 deletions

View file

@ -5,8 +5,38 @@ on:
types: [opened, synchronize, reopened]
jobs:
build:
runs-on: ubuntu-latest
build-bionic:
runs-on: ubuntu-18.04
strategy:
matrix:
include:
- python-version: '3.6'
- python-version: '3.7'
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
env:
DEBIAN_FRONTEND: noninteractive
run: |
sudo apt update -q
sudo apt install -y software-properties-common
- name: Install dependencies
env:
DEBIAN_FRONTEND: noninteractive
run: |
sudo add-apt-repository -y ppa:gift/stable
sudo apt update -q
sudo apt install -y python${{ matrix.python-version }} python3-dfvfs python3-pip python3-setuptools
python3 -m pip install .[dev]
- name: Run unit tests
run: python3 run_tests.py
build-focal:
runs-on: ubuntu-20.04
strategy:
matrix:
include:

View file

@ -36,7 +36,7 @@ pip install -r dfvfs_requirements.txt
```
### Datastores
Elasticsearch and PostgreSQL are also required to store extracted data.
OpenSearch and PostgreSQL are also required to store extracted data.
These can be installed separately or started in Docker using `docker-compose`.
```shell

View file

@ -17,4 +17,4 @@
dfDewey is a digital forensics string extraction, indexing, and searching tool.
"""
__version__ = '20211201'
__version__ = '20211220'

View file

@ -19,7 +19,7 @@ import logging
import os
CONFIG_ENV = [
'PG_HOST', 'PG_PORT', 'PG_DB_NAME', 'ES_HOST', 'ES_PORT', 'ES_URL'
'PG_HOST', 'PG_PORT', 'PG_DB_NAME', 'OS_HOST', 'OS_PORT', 'OS_URL'
]
CONFIG_FILE = '.dfdeweyrc'
# Look in homedir first, then current dir
@ -51,7 +51,7 @@ def load_config(config_file=None):
for config_var in CONFIG_ENV:
config_env = os.environ.get('_'.join(('DFDEWEY', config_var)))
if not config_env:
if config_var == 'ES_URL':
if config_var == 'OS_URL':
config_str += '{0:s} = {1:s}\n'.format(config_var, 'None')
break
else:

View file

@ -19,9 +19,9 @@ PG_HOST = '127.0.0.1'
PG_PORT = 5432
PG_DB_NAME = 'dfdewey'
# Elasticsearch Config
ES_HOST = '127.0.0.1'
ES_PORT = 9200
# ES_URL can be used to specify a RFC-1738 formatted URL
# Example: ES_URL = 'https://user:secret@127.0.0.1:9200/'
ES_URL = None
# OpenSearch Config
OS_HOST = '127.0.0.1'
OS_PORT = 9200
# OS_URL can be used to specify a RFC-1738 formatted URL
# Example: OS_URL = 'https://user:secret@127.0.0.1:9200/'
OS_URL = None

View file

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2020 Google LLC
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -12,15 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Elasticsearch datastore."""
"""Opensearch datastore."""
import collections
from elasticsearch import Elasticsearch
from elasticsearch import exceptions
from opensearchpy import OpenSearch
from opensearchpy import exceptions
class ElasticsearchDataStore():
class OpenSearchDataStore():
"""Implements the datastore."""
# Number of events to queue up when bulk inserting events.
@ -28,24 +28,24 @@ class ElasticsearchDataStore():
DEFAULT_SIZE = 1000 # Max events to return
def __init__(self, host='127.0.0.1', port=9200, url=None):
"""Create an Elasticsearch client."""
"""Create an OpenSearch client."""
super().__init__()
if url:
self.client = Elasticsearch([url], timeout=30)
self.client = OpenSearch([url], timeout=30)
else:
self.client = Elasticsearch([{'host': host, 'port': port}], timeout=30)
self.client = OpenSearch([{'host': host, 'port': port}], timeout=30)
self.import_counter = collections.Counter()
self.import_events = []
@staticmethod
def build_query(query_string):
"""Build Elasticsearch DSL query.
"""Build OpenSearch DSL query.
Args:
query_string: Query string
Returns:
Elasticsearch DSL query as a dictionary
OpenSearch DSL query as a dictionary
"""
query_dsl = {
@ -80,7 +80,7 @@ class ElasticsearchDataStore():
return index_name
def delete_index(self, index_name):
"""Delete Elasticsearch index.
"""Delete OpenSearch index.
Args:
index_name: Name of the index to delete.
@ -93,10 +93,10 @@ class ElasticsearchDataStore():
def import_event(
self, index_name, event=None, flush_interval=DEFAULT_FLUSH_INTERVAL):
"""Add event to Elasticsearch.
"""Add event to OpenSearch.
Args:
index_name: Name of the index in Elasticsearch
index_name: Name of the index in OpenSearch
event: Event dictionary
flush_interval: Number of events to queue up before indexing
@ -104,7 +104,7 @@ class ElasticsearchDataStore():
The number of events processed.
"""
if event:
# Header needed by Elasticsearch when bulk inserting.
# Header needed by OpenSearch when bulk inserting.
header = {'index': {'_index': index_name}}
self.import_events.append(header)
@ -133,11 +133,11 @@ class ElasticsearchDataStore():
return self.client.indices.exists(index_name)
def search(self, index_id, query_string, size=DEFAULT_SIZE):
"""Search ElasticSearch.
"""Search OpenSearch.
This will take a query string from the UI together with a filter definition.
Based on this it will execute the search request on ElasticSearch and get
the result back.
Based on this it will execute the search request on OpenSearch and get the
result back.
Args:
index_id: Index to be searched
@ -150,7 +150,7 @@ class ElasticsearchDataStore():
query_dsl = self.build_query(query_string)
# Default search type for elasticsearch is query_then_fetch.
# Default search type for OpenSearch is query_then_fetch.
search_type = 'query_then_fetch'
# pylint: disable=unexpected-keyword-arg

View file

@ -12,28 +12,28 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for elasticsearch datastore."""
"""Tests for opensearch datastore."""
import unittest
import mock
from elasticsearch import exceptions
from opensearchpy import exceptions
from dfdewey.datastore.elastic import ElasticsearchDataStore
from dfdewey.datastore.opensearch import OpenSearchDataStore
TEST_INDEX_NAME = ''.join(('es', 'd41d8cd98f00b204e9800998ecf8427e'))
class ElasticTest(unittest.TestCase):
"""Tests for Elasticsearch datastore."""
class OpenSearchTest(unittest.TestCase):
"""Tests for OpenSearch datastore."""
def _get_datastore(self):
"""Get a mock elasticsearch datastore.
"""Get a mock opensearch datastore.
Returns:
Mock elasticsearch datastore.
Mock opensearch datastore.
"""
es = ElasticsearchDataStore()
es = OpenSearchDataStore()
return es
def test_build_query(self):
@ -56,8 +56,8 @@ class ElasticTest(unittest.TestCase):
self.assertEqual(query, query_dsl)
@mock.patch('elasticsearch.client.IndicesClient.create')
@mock.patch('elasticsearch.client.IndicesClient.exists')
@mock.patch('opensearchpy.client.IndicesClient.create')
@mock.patch('opensearchpy.client.IndicesClient.exists')
def test_create_index(self, mock_exists, mock_create):
"""Test create index method."""
es = self._get_datastore()
@ -71,8 +71,8 @@ class ElasticTest(unittest.TestCase):
with self.assertRaises(RuntimeError):
result = es.create_index(TEST_INDEX_NAME)
@mock.patch('elasticsearch.client.IndicesClient.delete')
@mock.patch('elasticsearch.client.IndicesClient.exists')
@mock.patch('opensearchpy.client.IndicesClient.delete')
@mock.patch('opensearchpy.client.IndicesClient.exists')
def test_delete_index(self, mock_exists, mock_delete):
"""Test delete index method."""
es = self._get_datastore()
@ -131,7 +131,7 @@ class ElasticTest(unittest.TestCase):
result = es.import_event(TEST_INDEX_NAME, test_event, flush_interval=1)
self.assertEqual(result, 1)
@mock.patch('elasticsearch.client.IndicesClient.exists')
@mock.patch('opensearchpy.client.IndicesClient.exists')
def test_index_exists(self, mock_exists):
"""Test index exists method."""
es = self._get_datastore()
@ -139,8 +139,8 @@ class ElasticTest(unittest.TestCase):
es.index_exists(TEST_INDEX_NAME)
mock_exists.assert_called_once_with(TEST_INDEX_NAME)
@mock.patch('elasticsearch.Elasticsearch.search')
@mock.patch('elasticsearch.client.IndicesClient.exists')
@mock.patch('opensearchpy.OpenSearch.search')
@mock.patch('opensearchpy.client.IndicesClient.exists')
def test_search(self, mock_exists, mock_search):
"""Test search method."""
es = self._get_datastore()

View file

@ -31,7 +31,7 @@ log = logging.getLogger('dfdewey')
class _StringRecord():
"""Elasticsearch string record.
"""OpenSearch string record.
Attributes:
image: Hash to identify the source image of the string
@ -68,7 +68,10 @@ def get_image_id(image_path):
with open(image_path, 'rb') as image_file:
hash = hashlib.md5()
hashed = 0
while chunk := image_file.read(8192):
while True:
chunk = image_file.read(8192)
if not chunk:
break
hash.update(chunk)
hashed += 1
if hashed == 262144:

View file

@ -30,7 +30,7 @@ from dfvfs.volume import tsk_volume_system
import pytsk3
import dfdewey.config as dfdewey_config
from dfdewey.datastore.elastic import ElasticsearchDataStore
from dfdewey.datastore.opensearch import OpenSearchDataStore
from dfdewey.datastore.postgresql import PostgresqlDataStore
BATCH_SIZE = 1500
@ -40,7 +40,7 @@ log = logging.getLogger('dfdewey.image_processor')
class _StringRecord():
"""Elasticsearch string record.
"""OpenSearch string record.
Attributes:
image: Hash to identify the source image of the string
@ -270,7 +270,7 @@ class ImageProcessor():
Attributes:
case (str): case ID.
elasticsearch (ElasticsearchDataStore): elasticsearch datastore.
opensearch (OpenSearchDataStore): opensearch datastore.
image_hash (str): MD5 hash of the image.
image_id (str): image identifier.
image_path (str): path to source image.
@ -286,7 +286,7 @@ class ImageProcessor():
super().__init__()
self.case = case
self.config = dfdewey_config.load_config(config_file=config_file)
self.elasticsearch = None
self.opensearch = None
self.image_hash = None
self.image_id = image_id
self.image_path = image_path
@ -416,7 +416,7 @@ class ImageProcessor():
"""Index a single record.
Args:
index_name: ID of the elasticsearch index.
index_name: ID of the opensearch index.
string_record: String record to be indexed.
Returns:
@ -428,27 +428,27 @@ class ImageProcessor():
'file_offset': string_record.file_offset,
'data': string_record.data
}
return self.elasticsearch.import_event(index_name, event=json_record)
return self.opensearch.import_event(index_name, event=json_record)
def _index_strings(self):
"""Index the extracted strings."""
if self.config:
self.elasticsearch = ElasticsearchDataStore(
self.opensearch = OpenSearchDataStore(
host=self.config.ES_HOST, port=self.config.ES_PORT,
url=self.config.ES_URL)
else:
self.elasticsearch = ElasticsearchDataStore()
self.opensearch = OpenSearchDataStore()
index_name = ''.join(('es', self.image_hash))
index_exists = self.elasticsearch.index_exists(index_name)
index_exists = self.opensearch.index_exists(index_name)
if index_exists:
log.info('Image already indexed: [%s]', self.image_path)
if self.options.reindex:
log.info('Reindexing.')
self.elasticsearch.delete_index(index_name)
self.opensearch.delete_index(index_name)
log.info('Index %s deleted.', index_name)
index_exists = False
if not index_exists:
index_name = self.elasticsearch.create_index(index_name=index_name)
index_name = self.opensearch.create_index(index_name=index_name)
log.info('Index %s created.', index_name)
string_list = os.path.join(self.output_path, 'wordlist.txt')
@ -482,7 +482,7 @@ class ImageProcessor():
if records % STRING_INDEXING_LOG_INTERVAL == 0:
log.info('Indexed %d records...', records)
# Flush the import buffer
records = self.elasticsearch.import_event(index_name)
records = self.opensearch.import_event(index_name)
log.info('Indexed %d records...', records)
def _initialise_database(self):

View file

@ -201,8 +201,8 @@ class ImageProcessorTest(unittest.TestCase):
self.assertEqual(location, '/p1')
self.assertEqual(start_offset, 1048576)
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore')
def test_index_record(self, mock_elasticsearch):
@mock.patch('dfdewey.datastore.opensearch.OpenSearchDataStore')
def test_index_record(self, mock_opensearch):
"""Test index record method."""
image_processor = self._get_image_processor()
@ -212,7 +212,7 @@ class ImageProcessorTest(unittest.TestCase):
string_record.offset = 1234567
string_record.data = 'test string'
image_processor.elasticsearch = mock_elasticsearch
image_processor.opensearch = mock_opensearch
image_processor._index_record(index_name, string_record)
json_record = {
@ -221,14 +221,14 @@ class ImageProcessorTest(unittest.TestCase):
'file_offset': string_record.file_offset,
'data': string_record.data
}
mock_elasticsearch.import_event.assert_called_once_with(
mock_opensearch.import_event.assert_called_once_with(
index_name, event=json_record)
@mock.patch('elasticsearch.client.IndicesClient')
@mock.patch('opensearchpy.client.IndicesClient')
@mock.patch('dfdewey.utils.image_processor.ImageProcessor._index_record')
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.index_exists')
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.import_event')
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.create_index')
@mock.patch('dfdewey.datastore.opensearch.OpenSearchDataStore.index_exists')
@mock.patch('dfdewey.datastore.opensearch.OpenSearchDataStore.import_event')
@mock.patch('dfdewey.datastore.opensearch.OpenSearchDataStore.create_index')
def test_index_strings(
self, mock_create_index, mock_import_event, mock_index_exists,
mock_index_record, _):

View file

@ -24,7 +24,7 @@ import pytsk3
from tabulate import tabulate
import dfdewey.config as dfdewey_config
from dfdewey.datastore.elastic import ElasticsearchDataStore
from dfdewey.datastore.opensearch import OpenSearchDataStore
from dfdewey.datastore.postgresql import PostgresqlDataStore
from dfdewey.utils.image_processor import FileEntryScanner
@ -71,7 +71,7 @@ class IndexSearcher():
super().__init__()
self.case = case
self.config = dfdewey_config.load_config(config_file)
self.elasticsearch = None
self.opensearch = None
self.image = image
self.image_id = image_id
self.images = {}
@ -82,12 +82,12 @@ class IndexSearcher():
self.postgresql = PostgresqlDataStore(
host=self.config.PG_HOST, port=self.config.PG_PORT,
db_name=self.config.PG_DB_NAME)
self.elasticsearch = ElasticsearchDataStore(
self.opensearch = OpenSearchDataStore(
host=self.config.ES_HOST, port=self.config.ES_PORT,
url=self.config.ES_URL)
else:
self.postgresql = PostgresqlDataStore()
self.elasticsearch = ElasticsearchDataStore()
self.opensearch = OpenSearchDataStore()
if image != 'all':
self.image = os.path.abspath(self.image)
@ -331,7 +331,7 @@ class IndexSearcher():
table_data = []
for term in search_terms:
term = ''.join(('"', term.strip(), '"'))
results = self.elasticsearch.search(index, term)
results = self.opensearch.search(index, term)
hit_count = results['hits']['total']['value']
if hit_count > 0:
table_data.append({'Search term': term, 'Hits': hit_count})
@ -353,7 +353,7 @@ class IndexSearcher():
for image_hash, image_path in self.images.items():
log.info('Searching %s (%s) for "%s"', image_path, image_hash, query)
index = ''.join(('es', image_hash))
results = self.elasticsearch.search(index, query)
results = self.opensearch.search(index, query)
result_count = results['hits']['total']['value']
time_taken = results['took']

View file

@ -142,7 +142,7 @@ class IndexSearcherTest(unittest.TestCase):
])
@mock.patch('logging.Logger.info')
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.search')
@mock.patch('dfdewey.datastore.opensearch.OpenSearchDataStore.search')
def test_list_search(self, mock_search, mock_output):
"""Test list search."""
index_searcher = self._get_index_searcher()
@ -167,7 +167,7 @@ class IndexSearcherTest(unittest.TestCase):
@mock.patch('logging.Logger.info')
@mock.patch('dfdewey.datastore.postgresql.PostgresqlDataStore')
@mock.patch('dfdewey.datastore.elastic.ElasticsearchDataStore.search')
@mock.patch('dfdewey.datastore.opensearch.OpenSearchDataStore.search')
def test_search(self, mock_search, mock_postgresql, mock_output):
"""Test search method."""
index_searcher = self._get_index_searcher()

View file

@ -1,9 +1,9 @@
dfvfs >= 20211107
dfvfs >= 20211017
pip >= 7.0.0
PyYAML >= 3.10
cffi >= 1.9.1
cryptography >= 2.0.2
dfdatetime >= 20211113
dfdatetime >= 20210509
dtfabric >= 20170524
libbde-python >= 20140531
libewf-python >= 20131210

View file

@ -11,28 +11,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2'
version: '3'
services:
elasticsearch:
opensearch:
image: opensearchproject/opensearch:latest
environment:
# Set to single node deployment
- discovery.type=single-node
# Java memory for Elasticsearch is set high for better performance when
# Disabling SSL for localhost only deployment
- plugins.security.disabled=true
# Java memory for OpenSearch is set high for better performance when
# indexing large volumes of data.
# If running on a system with less available memory, consider using
# something smaller, such as:
# - ES_JAVA_OPTS=-Xms512m -Xmx512m
- ES_JAVA_OPTS=-Xms32g -Xmx32g
image: elasticsearch:7.9.3
# - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
# Recommend setting to 50% of system RAM
- "OPENSEARCH_JAVA_OPTS=-Xms32g -Xmx32g"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
ports:
- "127.0.0.1:9200:9200"
- "127.0.0.1:9300:9300"
restart: always
postgres:
image: postgres
ports:
- "127.0.0.1:5432:5432"
image: postgres:latest
environment:
- POSTGRES_USER=dfdewey
- POSTGRES_PASSWORD=password
ports:
- "127.0.0.1:5432:5432"
restart: always

View file

@ -27,7 +27,7 @@ optional arguments:
## Docker
If using Elasticsearch and PostgreSQL in Docker, they can be started using
If using OpenSearch and PostgreSQL in Docker, they can be started using
[docker-compose](https://docs.docker.com/compose/install/) from the `docker`
folder.
@ -35,7 +35,7 @@ folder.
docker-compose up -d
```
Note: Java memory for Elasticsearch is set high to improve performance when
Note: Java memory for OpenSearch is set high to improve performance when
indexing large volumes of data. If running on a system with limited resources,
you can change the setting in `docker/docker-compose.yml`.
@ -57,7 +57,7 @@ docker build -t <docker_name> -f ./docker/Dockerfile .
```
When running dfDewey within a Docker container, we need to give the container
access to the host network so it will be able to access Elasticsearch and
access to the host network so it will be able to access OpenSearch and
PostgreSQL in their respective containers. We also need to map a folder in the
container to allow access to the image we want to process. For example:

View file

@ -1,4 +1,4 @@
elasticsearch
opensearch-py
psycopg2-binary
six
tabulate

View file

@ -49,6 +49,7 @@ setup(
name='dfDewey',
version=dfdewey.__version__,
description=DFDEWEY_DESCRIPTION,
long_description=DFDEWEY_DESCRIPTION,
license='Apache License, Version 2.0',
url='https://github.com/google/dfdewey',
maintainer='dfDewey development team',