Tag Archives: Python

Python and ElasticSearch

I’ve recently been working with a project that involved getting data out of an AWS ElasticSearch instance. I hearken back to my early days with Lucene, before Big Data was a buzzword.

I chose to use Python to accomplish this task, because it was to be a ‘quick’ item, so I didn’t want to use something heavy, like Java. And I wanted to become more familiar with Python.

Googling it, and checking StackOverflow, Python ElasticSearch and ElasticSearch-DSL seemed the modules best suited for accomplishing my goal. Problem was, although the documentation seemed extensive, the examples were limited, making it often difficult to accomplish what I needed to. Now that I’ve created several successful scripts using this library, I wanted to share them with everyone.

First Attempt

My first task included a number of aggregations that would be determined at runtime. I couldn’t find a way of making this work with the standard syntax provided. But there is a way to create a JSON object, then convert it for use in Python ElasticSearch. I found this code, which allowed me to create dictionaries for trees. It can then be converted to JSON using json.dumps. And the JSON object can then be fed into ElasticSearch.

My loop. Using this structure, I could modify the values at runtime:

 for i in range (1,len(mn_timestamps)):
 aggs['aggregations']['by_account']['aggs']['by_month']['filters']['filters'][mn_timestamps[i -1].label]['range']['execution'] = 'index'
 aggs['aggregations']['by_account']['aggs']['by_month']['filters']['filters'][mn_timestamps[i -1].label]['range']['TIMESTAMP']['gte'] = mn_timestamps[i - 1].timestamp
 aggs['aggregations']['by_account']['aggs']['by_month']['filters']['filters'][mn_timestamps[i -1].label]['range']['TIMESTAMP']['lt'] = mn_timestamps[i].timestamp

As stated in the documentation, terms that don’t exist are created when listed:

 search['query']['filtered']['query']['match_all']

Here’s the other instance I found this handy at this point. I couldn’t figure out how to get multiple filters added to the query. The dictionary to json method allowed me to do this easily. Notice that there’s another runtime variable. But since it’s only used once, I didn’t need the loop strucutre:

 timestamp = "{\'range\': { \'TIMESTAMP\': { \'gte\':" + str(mn_timestamps[0].timestamp) + ", \'lt\':" + str(mn_timestamps[ len(mn_timestamps) - 1].timestamp) + "}, \'execution\': \'index\' } }"
 hierarchy = "{ \'terms\': { \'hierarchy\': [\'" + account + "\'] } }"
 terms = "{ \'terms\': { \'VIEW_TYPE\': [ \'t1\', \'t2\',\'t3\' ] } }"
 search['query']['filtered']['filter']['bool']['must'] = [timestamp, hierarchy, terms]
 aggs['aggregations']['by_account']['terms']['field'] = 'ACCT_ID'
 aggs['aggregations']['by_account']['terms']['size'] = 0

Now convert the query from a dictionary/tree object into JSON:

 query = json.dumps(search) + json.dumps(aggs)

Here’s the original output:

{"query": {"filtered": {"filter": {"bool": {"must": ["{'range': { 'TIMESTAMP': { 'gte':1420070400000,
 'lt':1435708800000}, 'execution': 'index' } }", "{ 'terms': { 'hierarchy': ['account'] } }", "
{ 'terms': { 'VIEW_TYPE': [ 't1', 't2','t3' ] } }"]}}, "query": {"match_all": {}}}}}{"aggregations": 
{"by_account": {"terms": {"field": "ACCT_ID", "size": 0}, "aggs": {"by_month": {"filters": {"filters": {"201502": 
{"range": {"TIMESTAMP": {"lt": 1425168000000, "gte": 1422748800000}, "execution": "index"}}, "201503": {"range": 
{"TIMESTAMP": {"lt": 1427846400000, "gte": 1425168000000}, "execution": "index"}}, "201501": {"range": {"TIMESTAMP": 
{"lt": 1422748800000, "gte": 1420070400000}, "execution": "index"}}, "201504": {"range": {"TIMESTAMP": {"lt": 
1430438400000, "gte": 1427846400000}, "execution": "index"}}, "201505": {"range": {"TIMESTAMP": {"lt": 1435708800000, 
"gte": 1430438400000}, "execution": "index"}}}}}}}}}

Now the odd thing. I found that the object produced couldn’t be used directly. A few substitutions had to be done:

 query = query.replace("\"{","{")
 query = query.replace("}\"","}")
 query = query.replace("\'","\"")
 query = query.replace("}{",",")


Here’s the changed output. Note the bolded differences from above:

{"query": {"filtered": {"filter": {"bool": {"must": [{"range": { "TIMESTAMP": { "gte":1420070400000, 
"lt":1435708800000}, "execution": "index" } }, { "terms": { "hierarchy": ["account"] } }, { "terms": { "VIEW_TYPE": [ 
"t1", "t2","t3" ] } }]}}, "query": {"match_all": {}}}},"aggregations": {"by_account": {"terms": {"field": "ACCT_ID", 
"size": 0}, "aggs": {"by_month": {"filters": {"filters": {"201502": {"range": {"TIMESTAMP": {"lt": 1425168000000, 
"gte": 1422748800000}, "execution": "index"}}, "201503": {"range": {"TIMESTAMP": {"lt": 1427846400000, "gte": 
1425168000000}, "execution": "index"}}, "201501": {"range": {"TIMESTAMP": {"lt": 1422748800000, "gte": 1420070400000}, 
"execution": "index"}}, "201504": {"range": {"TIMESTAMP": {"lt": 1430438400000, "gte": 1427846400000}, "execution": 
"index"}}, "201505": {"range": {"TIMESTAMP": {"lt": 1435708800000, "gte": 1430438400000}, "execution": "index"}}}}}}}}}

Get the dictionary into our elasticsearch ‘terminology’ and run:

 qdict = ast.literal_eval(query)
 
 s = elasticsearch_dsl.Search(using=eclient, index='my_index').from_dict(qdict)
 response = s.execute()
Easy Queries

As I mentioned before, I had experience with Lucene queries from the past. So the next item to tackle was pretty easy for me. I used a simple Lucene query to accomplish my task:

 

query = "hierarchy: \"{0}\" AND TIMESTAMP: [{1} TO {2}]  AND (VIEW_TYPE: t1 OR VIEW_TYPE: t2 OR VIEW_TYPE:t3)".format(account, t_start, t_end)
    res = eclient.search(index=indx, q=query)

This query was the same as one would use in a Kibana interface to ElasticSearch.

Getting Around Pre-Determined Limits

The next hurdle I had to overcome was the limit of how many records could be returned: 10. Here, I had to go to the ElasticSearch documentation for an answer.  Again, I had to create a custom JSON object to handle the issue:

    query = '{"size":0,"query": {"filtered": {"filter": {"range": {"TIMESTAMP":'\
        + ' {"lt":' + str(end_date) +', "gte":' + str(start_date) + '}}}, "query": '\
        + '{"bool": {"must_not": [{"terms": {"VIEW_TYPE": ["t1", ' \
        + '"t2", "t3", "t4", "t5", "t5", "t6", "t6"]}}], "must": '\
        + '[{"match": {"hierarchy": "' + account + '"}}]}}}}, "aggs": {"by_account":'\
        + ' {"terms": {"field": "hierarchy", "size":300}, "aggs": {"by_product": {"terms": '\
        + '{"field": "PRODUCT", "size":30}}}}}}'
        
    print(query)
        
    response = eclient.search(body=query, index=indx)

Notice the size parameter added to the query.

Did I ever use the built-in ElasticSearch-DSL query builder? Well, I played with it. But I couldn’t necessarily find the specific quirks I needed handled. Here’s an example:

search = elasticsearch_dsl.Search().using(eclient).index(indx
    ).query("match", hierarchy=account
    ).query("bool", must_not=Q('terms', VIEW_TYPE=['t1','t2','t3','t4','t5','t6','t7','t8'])
    ).filter("range", TIMESTAMP={'gte': start_date,'lt': end_date})
          
search.aggs.bucket('by_account', 'terms',field='hierarchy')\
    .bucket('by_product','terms', field='PRODUCT')

This was the query I used, before finding that I had a 10 record limit on returns.

Python SUDS Library Client

I was trying to create a ‘quick and dirty’ python SOAP client and was finding it difficult. The WSDL I was dealing with wasn’t a simple one, which, of course, made creating a client a bit more difficult. Add to the fact that I’m far from a python expert (have been tired of learning the ‘latest/greatest’ new toy), so finally started learning python reluctantly.

I found this post in StackOverflow that listed a number of python SOAP libraries and I started working through the examples. I finally settled on using SUDS. It’s not the most up-to-date library – the last development done on in was in 2010. But frankly, it’s the only one from which I could obtain results.

That being said, it wasn’t as straightforward as I would have liked. My first language is PERL; my second is Java. So maybe ‘thinking in python’ was not as easy for me. But still, I have found it a good language to work with, which is why I have been forcing myself to use it instead of PERL lately.

So, my code…..

There are groups of parameters in the WSDL I’m using, which required me to create dictionaries of dictionaries:

sort = dict(sortPolicyId='RELEVANCE',order='DESCENDING')
criteria = dict(searchPhrase='test', sortPolicy=sort)
params = dict(maximumResults=1000, searchTimeOut=24000)
contextParams = dict(ContextualCriteria=criteria,SearchCriteriaParameters=params)

The next part, passing them, does come directly from the documentation:

results = client.service.contextualSearch(**contextParams)

Then the next hard part: I couldn’t figure out how to get the values out. What I finally found was an example for pysimplesoap that I could use: Pull the results from the results. That is:

resultList = results.results

At this point, I had something from which I could pull values:

for item in resultList:
    print item.catalogEntry.id

… which gave me the values I needed.