thoughts/data/an-openioc-graph-a-different-kind-of-rule-scheme.md

273 lines
11 KiB
Markdown
Raw Normal View History

2024-08-05 18:24:56 +00:00
Despite that I think that implementing a full-fledged
XML-editor is too complex for an operational scenario, I
believe the OpenIOC-format, which has been in the works at
Mandiant for a couple of years now, is quite good. They also
have the IOC Writer which was launched at last summers Black
Hat. OpenIOC can export to other expression languages, such
as Yara [1], as well.
I have been thinking of a way to combine graph knowledge
with exactly that for a while, an expressive detection
language based on a graph. If combining two things you love,
I have learned that it simply can't end badly, it must end
with something amazing. Let's give it a try!
So I went about it, starting off by importing a sample
Maltego-graph to Titan on HBase [2]. I basically set out
with five connected nodes in Maltego Tungsten. Nothing
malicious, just a national newspaper.
Running that through my Rexster migration script results in
a equivalent graph on the Rexster server.
It's nice considering if you'd like to put it in a larger
context with millions or billions of vertices you would like
to trigger on. That is out of bounds for Maltego, or your
desktop system in general.
## The OpenIOC Part
If looking at the graphs above, you will probably agree that
it isn't especially describing of certain incidents or other
contextual data. But what if we could combine the graph with
something like OpenIOC? Turns out that it's conceptually
similar. The weakness of OpenIOC is that it doesn't scale
when firing up an OpenIOC editor - like the one Mandiant
have created. On the other hand, if you could traverse a
graph with OpenIOC designed around the OpenIOC format..
Let's create a basic writer as a demonstration, which
operates on the root level (no nesting of rules in this
example).
from ioc_writer import ioc_api
from lxml import etree as et
class IOC:
def __init__(self):
self.IOC = ioc_api.IOC(name='Test',
description='An IOC generated from a Python script',
author='Someone')
2024-08-05 18:24:56 +00:00
self.IOC.set_created_date()
self.IOC.set_published_date()
self.IOC.set_lastmodified_date()
self.IOC.update_name('test_rexster')
self.IOC.update_description('A Test')
self.id = self.IOC.iocid
def addNode(self,label,text,type,indicator,condition='is'):
IndicatorItem_node = ioc_api.make_IndicatorItem_node(condition,
label, text, type, indicator)
2024-08-05 18:24:56 +00:00
current_guid = IndicatorItem_node.attrib['id']
print current_guid
self.IOC.top_level_indicator.append(IndicatorItem_node)
def __str__(self):
self.xml = et.tostring(self.IOC.root, encoding='utf-8',
xml_declaration=True, pretty_print=True)
2024-08-05 18:24:56 +00:00
return self.xml
This enables us to do something like this:
ioc = IOC()
ioc.addNode('test','Just a test','domain','vg.no')
print ioc
Which will again return the XML of the IOC.
<?xml version='1.0' encoding='utf-8'?>
<OpenIOC xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns="http://openioc.org/schemas/OpenIOC_1.1"
id="06fd70db-992c-4678-83e6-8f1b150e8bcf"
last-modified="2014-01-28T07:15:09"
published-date="2014-01-28T07:15:09">
2024-08-05 18:24:56 +00:00
<metadata>
<short_description>test</short_description>
<description>A Test</description>
<keywords/>
<authored_by>Someone</authored_by>
<authored_date>2014-01-28T07:15:09</authored_date>
<links/>
</metadata>
<criteria>
<Indicator id="fbbb2883-473a-4a1c-92c4-692e199adb61" operator="OR">
<IndicatorItem id="14a42d26-b056-4b2e-a327-7d6edb25457e"
condition="is" preserve-case="false" negate="false">
2024-08-05 18:24:56 +00:00
<Context document="test" search="Just a test" type="mir"/>
<Content type="domain">vg.no</Content>
<IndicatorItem id="dff6e0c5-613b-4bea-8bad-bb7a36b3ccdf"
condition="is" preserve-case="false" negate="false">
2024-08-05 18:24:56 +00:00
<Context document="test" search="Just a test" type="mir"/>
<Content type="ip">195.88.55.16</Content>
</IndicatorItem>
</IndicatorItem>
</Indicator>
</criteria>
<parameters/>
</OpenIOC>
Reviewing the XML above you might notice that the scheme is
pretty transferrable to a graph, perhaps even simplifying of
the IOC XML. Be especially aware on the following tags and
attributes:
* Content
* The IndicatorItem condition
* The content type
A nested IOC might look like this (relevant excerpt):
<Indicator id="b12f8c27-d168-49b5-bc75-cec86bf21d3f" operator="OR">
<IndicatorItem id="af4323dc-a967-4fe3-b62f-b461b90a3550" condition="is"
preserve-case="false" negate="false">
2024-08-05 18:24:56 +00:00
<Context document="test" search="Just a test" type="mir"/>
<Content type="domain">vg.no</Content>
<IndicatorItem id="2ff639ca-dcec-4967-ac06-f54989bf3dc4" condition="is"
preserve-case="false" negate="false">
2024-08-05 18:24:56 +00:00
<Context document="test" search="Just a test" type="mir"/>
<Content type="ip">195.88.55.16</Content>
</IndicatorItem>
</IndicatorItem>
</Indicator>
The above implies that the domain vg.no needs to be
accompanied with the IP-address ``195.88.55.16``.
## Merging the Best of Two Worlds
So now that we have had a look at the power in the structure
of a graph and the power of expression in the OpenIOC
XML-indicators, you might see why this is the best of two
worlds.
In the challenge of combining them both I perhaps
oversimplified the nesting and used the two previously
mentioned attributes in the graph, adding the content as the
value of the node and the condition. We will also have to
add the type attribute since that tells us what type of
OpenIOC entry we have when reversing the process later
on. We will have a small collision between Maltego and
OpenIOC, since for instance an IP-address type will
differ. So for now you will need two type attributes, one
for Maltego and one for OpenIOC (if you plan to go both
ways). This is left as an exersise for the reader.
Creating an OpenIOC-compatible graph is a breeze:
from rexpro import RexProConnection
class Graph:
def __init__(self):
self.graph = RexProConnection('localhost',8184,'titan')
def addVertice(self,content,content_type,condition):
vertice_id = self.graph.execute("""
def v1 = g.addVertex([content:content,
content_type:content_type,
condition:condition])
2024-08-05 18:24:56 +00:00
return v1""",
{'content':content, 'content_type':content_type, 'condition':condition})
return vertice_id
def addEdge(self,vid1,vid2,label):
edge = self.graph.execute("""
def v1 = g.v(vid1)
def v2 = g.v(vid2)
g.addEdge(v1, v2, label)
g.commit()""",{'vid1':vid1['_id'], 'vid2':vid2['_id'], 'label':label})
graph=Graph()
v1=graph.addVertice('vg.no','domain','is')
v2=graph.addVertice('195.88.55.16','ip','is')
graph.addEdge(v1,v2,'and')
If you'd like to go the other way again in order to talk to
other organisations perhaps, you will want to run the
process in reverse:
from rexpro import RexProConnection
class RexsterIOC:
def __init__(self):
self.graph = RexProConnection('localhost',8184,'titan')
self.IOC = ioc_api.IOC(name='Test',
description='A test IOC generated from Rexster',
author='Someone')
2024-08-05 18:24:56 +00:00
self.IOC.set_created_date()
self.IOC.set_published_date()
self.IOC.set_lastmodified_date()
#IOC.add_link('help', self.baseurl + url)
self.IOC.update_name('test')
self.IOC.update_description('A Test')
self.id = self.IOC.iocid
self.lastId=None
def addNode(self,
label,
text,
type,
indicator,
condition='is',
addToLast=False):
IndicatorItem_node = ioc_api.make_IndicatorItem_node(condition,
label,
text,
type,
indicator)
2024-08-05 18:24:56 +00:00
if addToLast and self.last:
self.last.append(IndicatorItem_node)
else:
self.IOC.top_level_indicator.append(IndicatorItem_node)
current_guid = IndicatorItem_node.attrib['id']
self.last = IndicatorItem_node
def traverse(self,rootNodeId):
root=self.graph.execute("""return g.v(80284)""",
{'vid':str(rootNodeId)})
2024-08-05 18:24:56 +00:00
self.addNode('test','Just a test',
root['_properties']['content_type'],
root['_properties']['content'],
root['_properties']['condition'])
one_level_out=self.graph.execute("""return g.v(vid).out""",
{'vid':str(rootNodeId)})
2024-08-05 18:24:56 +00:00
for vertex in one_level_out:
self.addNode('test','Just a test',
vertex['_properties']['content_type'],
vertex['_properties']['content'],
vertex['_properties']['condition'],addToLast=True)
def __str__(self):
self.xml = et.tostring(self.IOC.root,
encoding='utf-8',
xml_declaration=True,
pretty_print=True)
2024-08-05 18:24:56 +00:00
return self.xml
ioc = RexsterIOC()
ioc.traverse(80284) # the root node
print ioc
One thing that you can now do is to store the indicators
with the rest of your network data. This again will imply
that the edges are created automatically without any need to
actually run jobs to combine data for detecting stuff.
That's my small concept demonstration. I think it's pretty
cool!
I've put the scripts in a Gist for you if you'd like to give
it a try [3].
[1] Yara: https://github.com/mandiant/ioc_writer/tree/master/examples/openioc_to_yara
[2] Importing a sample Maltego-graph to Titan on HBase: https://gist.github.com/tommyskg/8166472
[3] the scripts out there: https://gist.github.com/tommyskg/8671318