elasticsearch datastore tests

This commit is contained in:
Jason Solomon 2020-11-17 16:18:19 +11:00
parent c73d40b74c
commit 14241b04f3
2 changed files with 188 additions and 19 deletions

View file

@ -74,7 +74,6 @@ class ElasticsearchDataStore():
Returns:
Index name in string format.
Document type in string format.
"""
if not self.client.indices.exists(index_name):
try:
@ -100,41 +99,30 @@ class ElasticsearchDataStore():
raise RuntimeError('Unable to connect to backend datastore.') from e
def import_event(
self, index_name, event=None, event_id=None,
flush_interval=DEFAULT_FLUSH_INTERVAL):
self, index_name, event=None, flush_interval=DEFAULT_FLUSH_INTERVAL):
"""Add event to Elasticsearch.
Args:
index_name: Name of the index in Elasticsearch
event: Event dictionary
event_id: Event Elasticsearch ID
flush_interval: Number of events to queue up before indexing
Returns:
The number of events processed.
"""
if event:
for k, v in event.items():
if not isinstance(k, six.text_type):
k = codecs.decode(k, 'utf8')
for key, value in event.items():
if not isinstance(key, six.text_type):
key = codecs.decode(key, 'utf8')
# Make sure we have decoded strings in the event dict.
if isinstance(v, six.binary_type):
v = codecs.decode(v, 'utf8')
if isinstance(value, six.binary_type):
value = codecs.decode(value, 'utf8')
event[k] = v
event[key] = value
# Header needed by Elasticsearch when bulk inserting.
header = {'index': {'_index': index_name}}
update_header = {'update': {'_index': index_name, '_id': event_id}}
if event_id:
# Event has "lang" defined if there is a script used for import.
if event.get('lang'):
event = {'script': event}
else:
event = {'doc': event}
header = update_header
self.import_events.append(header)
self.import_events.append(event)
@ -171,5 +159,6 @@ class ElasticsearchDataStore():
# Default search type for elasticsearch is query_then_fetch.
search_type = 'query_then_fetch'
# pylint: disable=unexpected-keyword-arg
return self.client.search(
body=query_dsl, index=index_id, size=size, search_type=search_type)

View file

@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for elasticsearch datastore."""
import unittest
import mock
from dfdewey.datastore.elastic import ElasticsearchDataStore
TEST_INDEX_NAME = ''.join(('es', 'd41d8cd98f00b204e9800998ecf8427e'))
class ElasticTest(unittest.TestCase):
"""Tests for Elasticsearch datastore."""
def _get_datastore(self):
"""Get a mock elasticsearch datastore.
Returns:
Mock elasticsearch datastore.
"""
# with mock.patch('psycopg2.connect') as _:
es = ElasticsearchDataStore()
return es
def test_build_query(self):
"""Test build query method."""
es = self._get_datastore()
query_string = 'test'
query = es.build_query(query_string)
query_dsl = {
'query': {
'bool': {
'must': [{
'query_string': {
'query': 'test'
}
}]
}
}
}
self.assertEqual(query, query_dsl)
@mock.patch('elasticsearch.client.IndicesClient.create')
@mock.patch('elasticsearch.client.IndicesClient.exists')
def test_create_index(self, mock_exists, _):
"""Test create index method."""
es = self._get_datastore()
mock_exists.return_value = False
result = es.create_index(TEST_INDEX_NAME)
self.assertEqual(result, TEST_INDEX_NAME)
@mock.patch('elasticsearch.client.IndicesClient.delete')
@mock.patch('elasticsearch.client.IndicesClient.exists')
def test_delete_index(self, mock_exists, mock_delete):
"""Test delete index method."""
es = self._get_datastore()
mock_exists.return_value = True
es.delete_index(TEST_INDEX_NAME)
mock_delete.assert_called_once_with(index=TEST_INDEX_NAME)
def test_import_event(self):
"""Test import event method."""
es = self._get_datastore()
with mock.patch.object(es.client, 'bulk') as mock_bulk:
result = es.import_event(TEST_INDEX_NAME)
self.assertEqual(result, 0)
mock_bulk.assert_not_called()
es.import_events = [{
'index': {
'_index': 'esd41d8cd98f00b204e9800998ecf8427e'
}
}, {
'image': 'd41d8cd98f00b204e9800998ecf8427e',
'offset': 1048579,
'file_offset': None,
'data': 'NTFS \n'
}, {
'index': {
'_index': 'esd41d8cd98f00b204e9800998ecf8427e'
}
}, {
'index': {
'_index': 'esd41d8cd98f00b204e9800998ecf8427e'
}
}, {
'image': 'd41d8cd98f00b204e9800998ecf8427e',
'offset': 1048755,
'file_offset': None,
'data': 'press any key to try again ... \n'
}]
result = es.import_event(TEST_INDEX_NAME)
self.assertEqual(result, 0)
mock_bulk.assert_called_once()
test_event = {
'image': 'd41d8cd98f00b204e9800998ecf8427e',
'offset': 1048579,
'file_offset': None,
'data': 'NTFS \n'
}
result = es.import_event(TEST_INDEX_NAME, test_event, flush_interval=1)
self.assertEqual(result, 1)
@mock.patch('elasticsearch.Elasticsearch.search')
@mock.patch('elasticsearch.client.IndicesClient.exists')
def test_search(self, mock_exists, mock_search):
"""Test search method."""
es = self._get_datastore()
mock_exists.return_value = True
search_results = {
'took': 24,
'timed_out': False,
'_shards': {
'total': 1,
'successful': 1,
'skipped': 0,
'failed': 0
},
'hits': {
'total': {
'value': 2,
'relation': 'eq'
},
'max_score':
13.436,
'hits': [{
'_index': 'esd41d8cd98f00b204e9800998ecf8427e',
'_type': '_doc',
'_id': '7gST1HUBuaTSqxk-XzDA',
'_score': 13.436,
'_source': {
'image': 'd41d8cd98f00b204e9800998ecf8427e',
'offset': 1048755,
'file_offset': None,
'data': 'press any key to try again ... \n'
}
}, {
'_index': 'esd41d8cd98f00b204e9800998ecf8427e',
'_type': '_doc',
'_id': 'oAST1HUBuaTSqxk-XzLD',
'_score': 13.436,
'_source': {
'image': 'd41d8cd98f00b204e9800998ecf8427e',
'offset': 10485427,
'file_offset': None,
'data': 'press any key to try again ... \n'
}
}]
}
}
mock_search.return_value = search_results
results = es.search(TEST_INDEX_NAME, '"any key"')
self.assertEqual(results, search_results)
if __name__ == '__main__':
unittest.main()