Despite that I think that implementing a full-fledged XML-editor is too complex for an operational scenario, I believe the OpenIOC-format, which has been in the works at Mandiant for a couple of years now, is quite good. They also have the IOC Writer which was launched at last summers Black Hat. OpenIOC can export to other expression languages, such as Yara [1], as well. I have been thinking of a way to combine graph knowledge with exactly that for a while, an expressive detection language based on a graph. If combining two things you love, I have learned that it simply can't end badly, it must end with something amazing. Let's give it a try! So I went about it, starting off by importing a sample Maltego-graph to Titan on HBase [2]. I basically set out with five connected nodes in Maltego Tungsten. Nothing malicious, just a national newspaper. Running that through my Rexster migration script results in a equivalent graph on the Rexster server. It's nice considering if you'd like to put it in a larger context with millions or billions of vertices you would like to trigger on. That is out of bounds for Maltego, or your desktop system in general. ## The OpenIOC Part If looking at the graphs above, you will probably agree that it isn't especially describing of certain incidents or other contextual data. But what if we could combine the graph with something like OpenIOC? Turns out that it's conceptually similar. The weakness of OpenIOC is that it doesn't scale when firing up an OpenIOC editor - like the one Mandiant have created. On the other hand, if you could traverse a graph with OpenIOC designed around the OpenIOC format.. Let's create a basic writer as a demonstration, which operates on the root level (no nesting of rules in this example). from ioc_writer import ioc_api from lxml import etree as et class IOC: def __init__(self): self.IOC = ioc_api.IOC(name='Test', description='An IOC generated from a Python script', author='Someone') self.IOC.set_created_date() self.IOC.set_published_date() self.IOC.set_lastmodified_date() self.IOC.update_name('test_rexster') self.IOC.update_description('A Test') self.id = self.IOC.iocid def addNode(self,label,text,type,indicator,condition='is'): IndicatorItem_node = ioc_api.make_IndicatorItem_node(condition, label, text, type, indicator) current_guid = IndicatorItem_node.attrib['id'] print current_guid self.IOC.top_level_indicator.append(IndicatorItem_node) def __str__(self): self.xml = et.tostring(self.IOC.root, encoding='utf-8', xml_declaration=True, pretty_print=True) return self.xml This enables us to do something like this: ioc = IOC() ioc.addNode('test','Just a test','domain','vg.no') print ioc Which will again return the XML of the IOC. test A Test Someone 2014-01-28T07:15:09 vg.no 195.88.55.16 Reviewing the XML above you might notice that the scheme is pretty transferrable to a graph, perhaps even simplifying of the IOC XML. Be especially aware on the following tags and attributes: * Content * The IndicatorItem condition * The content type A nested IOC might look like this (relevant excerpt): vg.no 195.88.55.16 The above implies that the domain vg.no needs to be accompanied with the IP-address ``195.88.55.16``. ## Merging the Best of Two Worlds So now that we have had a look at the power in the structure of a graph and the power of expression in the OpenIOC XML-indicators, you might see why this is the best of two worlds. In the challenge of combining them both I perhaps oversimplified the nesting and used the two previously mentioned attributes in the graph, adding the content as the value of the node and the condition. We will also have to add the type attribute since that tells us what type of OpenIOC entry we have when reversing the process later on. We will have a small collision between Maltego and OpenIOC, since for instance an IP-address type will differ. So for now you will need two type attributes, one for Maltego and one for OpenIOC (if you plan to go both ways). This is left as an exersise for the reader. Creating an OpenIOC-compatible graph is a breeze: from rexpro import RexProConnection class Graph: def __init__(self): self.graph = RexProConnection('localhost',8184,'titan') def addVertice(self,content,content_type,condition): vertice_id = self.graph.execute(""" def v1 = g.addVertex([content:content,content_type:content_type,condition:condition]) return v1""", {'content':content, 'content_type':content_type, 'condition':condition}) return vertice_id def addEdge(self,vid1,vid2,label): edge = self.graph.execute(""" def v1 = g.v(vid1) def v2 = g.v(vid2) g.addEdge(v1, v2, label) g.commit()""",{'vid1':vid1['_id'], 'vid2':vid2['_id'], 'label':label}) graph=Graph() v1=graph.addVertice('vg.no','domain','is') v2=graph.addVertice('195.88.55.16','ip','is') graph.addEdge(v1,v2,'and') If you'd like to go the other way again in order to talk to other organisations perhaps, you will want to run the process in reverse: from rexpro import RexProConnection class RexsterIOC: def __init__(self): self.graph = RexProConnection('localhost',8184,'titan') self.IOC = ioc_api.IOC(name='Test', description='A test IOC generated from Rexster', author='Someone') self.IOC.set_created_date() self.IOC.set_published_date() self.IOC.set_lastmodified_date() #IOC.add_link('help', self.baseurl + url) self.IOC.update_name('test') self.IOC.update_description('A Test') self.id = self.IOC.iocid self.lastId=None def addNode(self,label,text,type,indicator,condition='is',addToLast=False): IndicatorItem_node = ioc_api.make_IndicatorItem_node(condition, label, text, type, indicator) if addToLast and self.last: self.last.append(IndicatorItem_node) else: self.IOC.top_level_indicator.append(IndicatorItem_node) current_guid = IndicatorItem_node.attrib['id'] self.last = IndicatorItem_node def traverse(self,rootNodeId): root=self.graph.execute("""return g.v(80284)""",{'vid':str(rootNodeId)}) self.addNode('test','Just a test', root['_properties']['content_type'], root['_properties']['content'], root['_properties']['condition']) one_level_out=self.graph.execute("""return g.v(vid).out""",{'vid':str(rootNodeId)}) for vertex in one_level_out: self.addNode('test','Just a test', vertex['_properties']['content_type'], vertex['_properties']['content'], vertex['_properties']['condition'],addToLast=True) def __str__(self): self.xml = et.tostring(self.IOC.root, encoding='utf-8', xml_declaration=True, pretty_print=True) return self.xml ioc = RexsterIOC() ioc.traverse(80284) # the root node print ioc One thing that you can now do is to store the indicators with the rest of your network data. This again will imply that the edges are created automatically without any need to actually run jobs to combine data for detecting stuff. That's my small concept demonstration. I think it's pretty cool! I've put the scripts in a Gist for you if you'd like to give it a try [3]. [1] Yara: https://github.com/mandiant/ioc_writer/tree/master/examples/openioc_to_yara [2] Importing a sample Maltego-graph to Titan on HBase: https://gist.github.com/tommyskg/8166472 [3] the scripts out there: https://gist.github.com/tommyskg/8671318