diff --git a/dfdewey/datastore/elastic.py b/dfdewey/datastore/elastic.py
index 7872d22..e038494 100644
--- a/dfdewey/datastore/elastic.py
+++ b/dfdewey/datastore/elastic.py
@@ -74,7 +74,6 @@ class ElasticsearchDataStore():
 
     Returns:
       Index name in string format.
-      Document type in string format.
     """
     if not self.client.indices.exists(index_name):
       try:
@@ -100,41 +99,33 @@ class ElasticsearchDataStore():
       raise RuntimeError('Unable to connect to backend datastore.') from e
 
   def import_event(
-      self, index_name, event=None, event_id=None,
-      flush_interval=DEFAULT_FLUSH_INTERVAL):
+      self, index_name, event=None, flush_interval=DEFAULT_FLUSH_INTERVAL):
     """Add event to Elasticsearch.
 
     Args:
       index_name: Name of the index in Elasticsearch
       event: Event dictionary
-      event_id: Event Elasticsearch ID
       flush_interval: Number of events to queue up before indexing
 
     Returns:
       The number of events processed.
     """
     if event:
-      for k, v in event.items():
-        if not isinstance(k, six.text_type):
-          k = codecs.decode(k, 'utf8')
+      # Iterate over a snapshot: decoding a key changes the dict's key set.
+      for key, value in list(event.items()):
+        if not isinstance(key, six.text_type):
+          # Drop the undecoded key so only its decoded form is indexed.
+          del event[key]
+          key = codecs.decode(key, 'utf8')
 
         # Make sure we have decoded strings in the event dict.
-        if isinstance(v, six.binary_type):
-          v = codecs.decode(v, 'utf8')
+        if isinstance(value, six.binary_type):
+          value = codecs.decode(value, 'utf8')
 
-        event[k] = v
+        event[key] = value
 
       # Header needed by Elasticsearch when bulk inserting.
       header = {'index': {'_index': index_name}}
-      update_header = {'update': {'_index': index_name, '_id': event_id}}
-
-      if event_id:
-        # Event has "lang" defined if there is a script used for import.
-        if event.get('lang'):
-          event = {'script': event}
-        else:
-          event = {'doc': event}
-        header = update_header
 
       self.import_events.append(header)
       self.import_events.append(event)
@@ -171,5 +162,6 @@ class ElasticsearchDataStore():
     # Default search type for elasticsearch is query_then_fetch.
     search_type = 'query_then_fetch'
 
+    # pylint: disable=unexpected-keyword-arg
     return self.client.search(
         body=query_dsl, index=index_id, size=size, search_type=search_type)
diff --git a/dfdewey/datastore/elastic_test.py b/dfdewey/datastore/elastic_test.py
new file mode 100644
index 0000000..9bf42ee
--- /dev/null
+++ b/dfdewey/datastore/elastic_test.py
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests for elasticsearch datastore."""
+
+import unittest
+import mock
+
+from dfdewey.datastore.elastic import ElasticsearchDataStore
+
+TEST_INDEX_NAME = ''.join(('es', 'd41d8cd98f00b204e9800998ecf8427e'))
+
+
+class ElasticTest(unittest.TestCase):
+  """Tests for Elasticsearch datastore."""
+
+  def _get_datastore(self):
+    """Get an elasticsearch datastore to test against.
+
+    Returns:
+      ElasticsearchDataStore instance.
+    """
+    es = ElasticsearchDataStore()
+    return es
+
+  def test_build_query(self):
+    """Test build query method."""
+    es = self._get_datastore()
+    query_string = 'test'
+    query = es.build_query(query_string)
+
+    query_dsl = {
+        'query': {
+            'bool': {
+                'must': [{
+                    'query_string': {
+                        'query': 'test'
+                    }
+                }]
+            }
+        }
+    }
+
+    self.assertEqual(query, query_dsl)
+
+  @mock.patch('elasticsearch.client.IndicesClient.create')
+  @mock.patch('elasticsearch.client.IndicesClient.exists')
+  def test_create_index(self, mock_exists, _):
+    """Test create index method."""
+    es = self._get_datastore()
+
+    mock_exists.return_value = False
+
+    result = es.create_index(TEST_INDEX_NAME)
+    self.assertEqual(result, TEST_INDEX_NAME)
+
+  @mock.patch('elasticsearch.client.IndicesClient.delete')
+  @mock.patch('elasticsearch.client.IndicesClient.exists')
+  def test_delete_index(self, mock_exists, mock_delete):
+    """Test delete index method."""
+    es = self._get_datastore()
+
+    mock_exists.return_value = True
+
+    es.delete_index(TEST_INDEX_NAME)
+    mock_delete.assert_called_once_with(index=TEST_INDEX_NAME)
+
+  def test_import_event(self):
+    """Test import event method."""
+    es = self._get_datastore()
+
+    with mock.patch.object(es.client, 'bulk') as mock_bulk:
+      result = es.import_event(TEST_INDEX_NAME)
+      self.assertEqual(result, 0)
+      mock_bulk.assert_not_called()
+
+      # Queued bulk payload: each action header is followed by its document.
+      es.import_events = [{
+          'index': {
+              '_index': 'esd41d8cd98f00b204e9800998ecf8427e'
+          }
+      }, {
+          'image': 'd41d8cd98f00b204e9800998ecf8427e',
+          'offset': 1048579,
+          'file_offset': None,
+          'data': 'NTFS \n'
+      }, {
+          'index': {
+              '_index': 'esd41d8cd98f00b204e9800998ecf8427e'
+          }
+      }, {
+          'image': 'd41d8cd98f00b204e9800998ecf8427e',
+          'offset': 1048755,
+          'file_offset': None,
+          'data': 'press any key to try again ... \n'
+      }]
+      result = es.import_event(TEST_INDEX_NAME)
+      self.assertEqual(result, 0)
+      mock_bulk.assert_called_once()
+
+      test_event = {
+          'image': 'd41d8cd98f00b204e9800998ecf8427e',
+          'offset': 1048579,
+          'file_offset': None,
+          'data': 'NTFS \n'
+      }
+      result = es.import_event(TEST_INDEX_NAME, test_event, flush_interval=1)
+      self.assertEqual(result, 1)
+
+  @mock.patch('elasticsearch.Elasticsearch.search')
+  @mock.patch('elasticsearch.client.IndicesClient.exists')
+  def test_search(self, mock_exists, mock_search):
+    """Test search method."""
+    es = self._get_datastore()
+
+    mock_exists.return_value = True
+    search_results = {
+        'took': 24,
+        'timed_out': False,
+        '_shards': {
+            'total': 1,
+            'successful': 1,
+            'skipped': 0,
+            'failed': 0
+        },
+        'hits': {
+            'total': {
+                'value': 2,
+                'relation': 'eq'
+            },
+            'max_score':
+                13.436,
+            'hits': [{
+                '_index': 'esd41d8cd98f00b204e9800998ecf8427e',
+                '_type': '_doc',
+                '_id': '7gST1HUBuaTSqxk-XzDA',
+                '_score': 13.436,
+                '_source': {
+                    'image': 'd41d8cd98f00b204e9800998ecf8427e',
+                    'offset': 1048755,
+                    'file_offset': None,
+                    'data': 'press any key to try again ... \n'
+                }
+            }, {
+                '_index': 'esd41d8cd98f00b204e9800998ecf8427e',
+                '_type': '_doc',
+                '_id': 'oAST1HUBuaTSqxk-XzLD',
+                '_score': 13.436,
+                '_source': {
+                    'image': 'd41d8cd98f00b204e9800998ecf8427e',
+                    'offset': 10485427,
+                    'file_offset': None,
+                    'data': 'press any key to try again ... \n'
+                }
+            }]
+        }
+    }
+    mock_search.return_value = search_results
+
+    results = es.search(TEST_INDEX_NAME, '"any key"')
+    self.assertEqual(results, search_results)
+
+
+if __name__ == '__main__':
+  unittest.main()