
IID Generator Using Elasticsearch


An IID generator using elasticsearch to create bulks of iids.

At Lovely Systems we are using Crate as our database. Crate is an SQL database on top of elasticsearch and has no built-in support for auto-incrementing integer ids.

So we decided to build our own generator, which:

  • needs to be fast (though it does not have to be superfast)
  • needs to work on a distributed system
  • must be possible to back up

First I found this blog post from Clinton Gormley explaining how to use the version numbering built into elasticsearch to create ids. This approach is really simple, but it has a big disadvantage: it is not possible to create a backup using simple bulk files, because the _version cannot be written back into elasticsearch.

The iid value needs to be stored in the _source of the document to be able to dump the data.

Generator Explained (curl)

First we show how the elasticsearch requests will look when creating iids using curl.

Here is a simple document containing an integer value ("iid"):

>>> curl -XPUT http://localhost:9200/sequence/sequence/1?pretty=true -d '{"iid": 0}'
{
    "_index" : "sequence",
    "_type" : "sequence",
    "_id" : "1",
    "_version" : 1,
    "created" : true
}

To increment the iid we use the _update API, because it avoids a read/modify/write cycle of the document on the client:

>>> curl -XPOST http://localhost:9200/sequence/sequence/1/_update -d '
... {
...     "script": "ctx._source.iid += 1",
...     "lang": "groovy"
... }
... '
{
    "_index" : "sequence",
    "_type" : "sequence",
    "_id" : "1",
    "_version" : 2
}

After incrementing the iid we would like to know the new value. We add the fields parameter and specify that we want the iid field back:

>>> curl -XPOST http://localhost:9200/sequence/sequence/1/_update?fields=iid -d '
... {
...     "script": "ctx._source.iid += 1",
...     "lang": "groovy"
... }
... '
{
    "_index" : "sequence",
    "_type" : "sequence",
    "_id" : "1",
    "_version" : 3,
    "get" : {
        "found" : true,
        "fields" : {
            "iid" : [ 2 ]
        }
    }
}

All this works, but it does not perform very well because a request has to be made for every single id. What if we could request more than one iid per request? This is easy if we increment the iid by the bulk size we want to request. Instead of adding 1 to the existing iid we add our bulk size:

>>> curl -XPOST http://localhost:9200/sequence/sequence/1/_update?fields=iid -d '
... {
...     "script": "ctx._source.iid += bulk_size",
...     "params": {"bulk_size": 10},
...     "lang": "groovy"
... }
... '
{
    "_index" : "sequence",
    "_type" : "sequence",
    "_id" : "1",
    "_version" : 4,
    "get" : {
        "found" : true,
        "fields" : {
            "iid" : [ 12 ]
        }
    }
}

Now the returned iid marks the end of our bulk which will be in the range (iid-10+1 .. iid).
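For the example above, the allocated ids can be derived like this (a plain-Python sketch, no elasticsearch required):

```python
# The _update call returned iid = 12 after incrementing by bulk_size = 10.
# The allocated ids are the last bulk_size values up to and including iid.
iid = 12
bulk_size = 10

bulk_ids = list(range(iid - bulk_size + 1, iid + 1))
print(bulk_ids)  # [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
```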

Also note how the script is used this time. We pass the bulk size in params. This has the advantage that the script code is always the same, so elasticsearch can use the cached version of the script and work faster because it only needs to compile the code once.

The disadvantage of using bulks is that holes can appear in the sequence if an application is restarted before it has fully consumed its requested bulk. We decided that this is not a problem for our use case because we have long-running applications.
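To make the hole scenario concrete, here is a hypothetical simulation of the server-side counter (the function and variable names are ours, purely for illustration):

```python
# Hypothetical illustration of sequence holes with bulk allocation.
counter = 0

def request_bulk(size=10):
    """Simulate the server-side increment: return the allocated id range."""
    global counter
    counter += size
    return list(range(counter - size + 1, counter + 1))

first = request_bulk()    # ids 1..10
used = first[:3]          # the app consumes 1, 2, 3, then restarts
second = request_bulk()   # ids 11..20; ids 4..10 are lost ("holes")
```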

Optimizations for real world use

First we configure the index with a mapping. This needs to be done before the first write to the index.

Optimizations:

  • only one shard
  • no _all indexing
  • do not index the type
  • no dynamic mapping
  • do not index the iid property

auto_expand_replicas is set to "0-all" here. To minimize the number of replicas this should be reduced to a maximum of n/2+1 replicas, where n is the number of elasticsearch nodes. n/2+1 is the value normally used for the minimum_master_nodes setting.

Since index settings must be applied at creation time, we create the index and the mapping in one request:

>>> curl -XPUT http://localhost:9200/sequence -d '
... {
...     "settings": {
...         "number_of_shards": 1,
...         "auto_expand_replicas": "0-all"
...     },
...     "mappings": {
...         "sequence": {
...             "_all": {"enabled": false},
...             "_type": {"index": "no"},
...             "dynamic": "strict",
...             "properties": {
...                 "iid": {
...                     "type": "string",
...                     "index": "no"
...                 }
...             }
...         }
...     }
... }'

Then an optimized update request.

  • use retry_on_conflict to handle version conflicts on the server
  • add upsert to create the document initially if it doesn't exist

Here's the update request:

>>> curl -XPOST "http://localhost:9200/sequence/sequence/1/_update?fields=iid&retry_on_conflict=5" -d '
... {
...     "script": "ctx._source.iid += bulk_size",
...     "params": {"bulk_size": 10},
...     "lang": "groovy",
...     "upsert": {
...         'iid': 10
...     },
... }'

With Python

This is how it can be used with the elasticsearch Python client.

Get a client instance:

>>> from elasticsearch import Elasticsearch
>>> es = Elasticsearch()

Create the index with the mapping:

>>> es.indices.create(
...     'sequence',
...     body={
...         "settings": {
...             "number_of_shards": 1,
...             "auto_expand_replicas": "0-all"
...         },
...         "mappings": {
...             "sequence": {
...                 "_all": {"enabled": False},
...                 "_type": {"index": "no"},
...                 "dynamic": "strict",
...                 "properties": {
...                     "iid": {
...                         "type": "string",
...                         "index": "no",
...                     },
...                 },
...             }
...         }
...     },
...     ignore=400  # ignore "index already exists"
... )

Request a bulk:

>>> bulk_size = 10
>>> result = es.update(
...     index='sequence',
...     doc_type='sequence',
...     id='1',
...     body={
...         "script": "ctx._source.iid += bulk_size",
...         "lang": "groovy",
...         "params": {
...             "bulk_size": bulk_size
...         },
...         "upsert": {
...             "iid": bulk_size
...         },
...     },
...     retry_on_conflict=10,
...     fields='iid',
... )
>>> iid = result['get']['fields']['iid'][0]
>>> bulk = list(range(iid, iid - bulk_size, -1))

Now the ids can be retrieved from bulk until it is exhausted:

>>> id = bulk.pop()
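The refill logic can be wrapped in a small helper class. This is a sketch under the same assumptions as above (index sequence, type sequence, Groovy scripting enabled); the class name and structure are our own, not part of the elasticsearch client:

```python
class IidGenerator(object):
    """Hands out sequential ids, refilling from elasticsearch in bulks."""

    def __init__(self, es, index='sequence', doc_id='1', bulk_size=10):
        self.es = es
        self.index = index
        self.doc_id = doc_id
        self.bulk_size = bulk_size
        self._bulk = []

    def _refill(self):
        """Request a new bulk of ids from elasticsearch."""
        result = self.es.update(
            index=self.index,
            doc_type=self.index,
            id=self.doc_id,
            body={
                "script": "ctx._source.iid += bulk_size",
                "lang": "groovy",
                "params": {"bulk_size": self.bulk_size},
                "upsert": {"iid": self.bulk_size},
            },
            retry_on_conflict=10,
            fields='iid',
        )
        iid = result['get']['fields']['iid'][0]
        # store descending so pop() yields ids in ascending order
        self._bulk = list(range(iid, iid - self.bulk_size, -1))

    def next_id(self):
        if not self._bulk:
            self._refill()
        return self._bulk.pop()
```

Note that ids allocated but never handed out before a restart simply become holes in the sequence, as discussed above.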

Written by Jürgen Kartnaller

Posted on May 9, 2015
