Mozdef_util Library

We provide the mozdef_util library for interacting with MozDef components.

Connecting to Elasticsearch

from mozdef_util.elasticsearch_client import ElasticsearchClient
es_client = ElasticsearchClient("http://127.0.0.1:9200")

Creating/Updating Documents

Create a new Event

event_dict = {
    "example_key": "example value"
}
es_client.save_event(body=event_dict)

Update an existing event

event_dict = {
    "example_key": "example new value"
}
# Assuming 12345 is the id of the existing entry
es_client.save_event(body=event_dict, doc_id="12345")

Create a new alert

alert_dict = {
    "example_key": "example value"
}
es_client.save_alert(body=alert_dict)

Update an existing alert

alert_dict = {
    "example_key": "example new value"
}
# Assuming 12345 is the id of the existing entry
es_client.save_alert(body=alert_dict, doc_id="12345")

Create a new generic document

document_dict = {
    "example_key": "example value"
}
es_client.save_object(index='randomindex', doc_type='randomtype', body=document_dict)

Update an existing document

document_dict = {
    "example_key": "example new value"
}
# Assuming 12345 is the id of the existing entry
es_client.save_object(index='randomindex', doc_type='randomtype', body=document_dict, doc_id="12345")

Bulk Importing

from mozdef_util.elasticsearch_client import ElasticsearchClient
es_client = ElasticsearchClient("http://127.0.0.1:9200", bulk_amount=30, bulk_refresh_time=5)
es_client.save_event(body={'key': 'value'}, bulk=True)
  • bulk_amount (defaults to 100): how many documents may sit in the bulk queue before they are written to Elasticsearch
  • bulk_refresh_time (defaults to 30): the maximum number of seconds to wait before a bulk flush is forced
  • bulk (defaults to False): whether the document should be added to the bulk queue instead of being written immediately
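The queueing behavior described above can be sketched in plain Python. This is an illustration of the bulk_amount/bulk_refresh_time semantics only, not mozdef_util's implementation; the class and attribute names here are invented:

```python
import time

class BulkQueue:
    """Toy queue illustrating bulk_amount / bulk_refresh_time semantics."""

    def __init__(self, bulk_amount=100, bulk_refresh_time=30):
        self.bulk_amount = bulk_amount              # flush once this many docs are queued
        self.bulk_refresh_time = bulk_refresh_time  # force a flush after this many seconds
        self.queue = []
        self.last_flush = time.monotonic()
        self.flushed_batches = []                   # stands in for bulk writes to Elasticsearch

    def add(self, doc):
        self.queue.append(doc)
        # Flush when the queue is full or the refresh interval has elapsed
        if (len(self.queue) >= self.bulk_amount
                or time.monotonic() - self.last_flush >= self.bulk_refresh_time):
            self.flush()

    def flush(self):
        if self.queue:
            self.flushed_batches.append(list(self.queue))
            self.queue.clear()
        self.last_flush = time.monotonic()

q = BulkQueue(bulk_amount=3, bulk_refresh_time=9999)
for i in range(7):
    q.add({'key': i})
# 7 docs with a threshold of 3 -> two full batches written, one doc still queued
print(len(q.flushed_batches), len(q.queue))  # 2 1
```

With a small threshold the size trigger dominates; in production the time trigger guarantees queued documents are never held longer than bulk_refresh_time seconds.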

Searching for documents

Match/Query Classes

ExistsMatch

Checks to see if a specific field exists in a document

from mozdef_util.query_models import ExistsMatch

ExistsMatch("randomfield")

TermMatch

Checks if a specific field exactly matches the given value

from mozdef_util.query_models import TermMatch

TermMatch("details.ip", "127.0.0.1")

TermsMatch

Checks if a specific field matches any of the given values

from mozdef_util.query_models import TermsMatch

TermsMatch("details.ip", ["127.0.0.1", "1.2.3.4"])

WildcardMatch

Uses wildcard patterns (* and ?) to find documents whose field contains all or part of a value

from mozdef_util.query_models import WildcardMatch

WildcardMatch('summary', 'test*')

PhraseMatch

Checks if a field contains a specific phrase (includes spaces)

from mozdef_util.query_models import PhraseMatch

PhraseMatch('summary', 'test run')

BooleanMatch

Used to combine specific “matchers” into a boolean query. It is unlikely to be used directly outside of SearchQuery.

from mozdef_util.query_models import ExistsMatch, TermMatch, BooleanMatch

must = [
    ExistsMatch('details.ip')
]
must_not = [
    TermMatch('_type', 'alert')
]

BooleanMatch(must=must, should=[], must_not=must_not)
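These matcher classes compile down to the standard Elasticsearch query DSL; the BooleanMatch above corresponds roughly to a bool query. The following is a hand-built illustration of that JSON, not mozdef_util's exact serialization:

```python
import json

# Roughly what ExistsMatch('details.ip') and TermMatch('_type', 'alert')
# translate to in Elasticsearch's query DSL
exists_match = {"exists": {"field": "details.ip"}}
term_match = {"term": {"_type": "alert"}}

# A bool query: the document must have details.ip and must not be an alert
query = {
    "query": {
        "bool": {
            "must": [exists_match],
            "should": [],
            "must_not": [term_match],
        }
    }
}

print(json.dumps(query, indent=2))
```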

MissingMatch

Checks if a field does not exist in a document

from mozdef_util.query_models import MissingMatch

MissingMatch('summary')

RangeMatch

Checks if a field value is within a specific range (mostly used to look for documents in a time frame)

from mozdef_util.query_models import RangeMatch

RangeMatch('utctimestamp', "2016-08-12T21:07:12.316450+00:00", "2016-08-13T21:07:12.316450+00:00")

QueryStringMatch

Uses a custom Lucene query string to generate the match (similar to the query bar in Kibana)

from mozdef_util.query_models import QueryStringMatch

QueryStringMatch('summary: test')

Aggregation

Used to aggregate results based on a specific field

from mozdef_util.query_models import Aggregation, SearchQuery, ExistsMatch

search_query = SearchQuery(hours=24)
must = [
    ExistsMatch('seenindicator')
]
search_query.add_must(must)
aggr = Aggregation('details.ip')
search_query.add_aggregation(aggr)
results = search_query.execute(es_client, indices=['events','events-previous'])
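The executed search returns the matching documents along with aggregation buckets. The sketch below pulls per-IP counts out of a response shaped like a raw Elasticsearch terms aggregation; mozdef_util may post-process results into a different shape, and the data here is fabricated sample data:

```python
# Fabricated example of a raw Elasticsearch terms-aggregation response
# for an aggregation on 'details.ip'
results = {
    "aggregations": {
        "details.ip": {
            "buckets": [
                {"key": "10.0.0.1", "doc_count": 42},
                {"key": "10.0.0.2", "doc_count": 7},
            ]
        }
    }
}

# Collapse the buckets into a {value: count} mapping
top_ips = {bucket["key"]: bucket["doc_count"]
           for bucket in results["aggregations"]["details.ip"]["buckets"]}
print(top_ips)  # {'10.0.0.1': 42, '10.0.0.2': 7}
```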