Near Real-Time Indexing With ElasticSearch

Choosing your indexing strategy is hard. The Elasticsearch documentation does have some general recommendations, and there are some tips from other companies, but it also depends on the particular use case. In the typical scenario you have a database as the source of truth, and an index that makes things searchable. You can then use one of the following strategies:

  • Index as data comes – you insert in the database and index at the same time. It makes sense if there isn’t too much data; otherwise indexing becomes very inefficient.
  • Store in database, index with scheduled job – this is probably the most common approach and is also easy to implement. However, it can have issues if there’s a lot of data to index, as the data has to be fetched from the database with precise (from, to) criteria, and your index lags behind the actual data by the number of seconds (or minutes) between scheduled job runs.
  • Push to a message queue and write an indexing consumer – you can run something like RabbitMQ and have multiple consumers that poll data and index it. This is not straightforward to implement because you have to poll multiple items in order to leverage batch indexing, and then only mark them as consumed upon successful batch execution – somewhat transactional behaviour.
  • Queue items in memory and flush them regularly – this may be good and efficient, but you may lose data if a node dies, so you need some sort of health check based on the data in the database.
  • Hybrid – do a combination of the above; for example if you need to enrich the raw data and update the index at a later stage, you can queue items in memory and then use “store in database, index with scheduled job” to update the index and fill in any missing items. Or you can index some parts of the data as they come, and use another strategy for the more active types of data.
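
To make the (from, to) criteria of the scheduled-job strategy concrete, here is a minimal sketch. The class name WindowedIndexJob and the two callbacks are hypothetical: fetchWindow stands in for a database query selecting rows with from <= modified < to, and bulkIndexer stands in for a bulk index call. The idea is simply that each run covers the window since the previous successful run, so no row is skipped or indexed twice.

```java
import java.time.Instant;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Hypothetical sketch of "store in database, index with scheduled job". */
class WindowedIndexJob<T> {
    // Assumed database query: returns rows with from <= modified < to
    private final BiFunction<Instant, Instant, List<T>> fetchWindow;
    // Assumed bulk index call
    private final Consumer<List<T>> bulkIndexer;
    // Upper bound of the last window that was indexed successfully
    private Instant watermark;

    WindowedIndexJob(Instant start,
                     BiFunction<Instant, Instant, List<T>> fetchWindow,
                     Consumer<List<T>> bulkIndexer) {
        this.watermark = start;
        this.fetchWindow = fetchWindow;
        this.bulkIndexer = bulkIndexer;
    }

    /** One scheduled run; returns the number of rows sent for indexing. */
    synchronized int runOnce(Instant now) {
        List<T> rows = fetchWindow.apply(watermark, now);
        if (!rows.isEmpty()) {
            // If this throws, the watermark stays put and the next run retries the same window
            bulkIndexer.accept(rows);
        }
        watermark = now;
        return rows.size();
    }
}
```

In practice runOnce would be called by whatever scheduler the application already uses, and the index lag is bounded by the interval between runs.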

We have recently decided to implement the “queue in memory” approach (in combination with another one, as we have to do some scheduled post-processing anyway). And the first attempt was to use a class provided by the Elasticsearch client – the BulkProcessor. The logic is clear – accumulate index requests in memory and flush them to Elasticsearch in batches, either when a certain limit is reached or at a fixed time interval. So a batch index request is sent as soon as Y records have accumulated, and otherwise every X seconds if there is anything to send. That achieves near real-time indexing without putting too much stress on Elasticsearch. It also allows multiple bulk indexing requests at the same time, as per Elasticsearch recommendations.
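
The two flush triggers can be illustrated with a minimal sketch. This is not the BulkProcessor itself; SizeOrTimeBatcher and bulkSender are hypothetical names, bulkSender stands in for the actual bulk index call, and for brevity the buffer is guarded by a plain synchronized block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: buffers items and flushes by size ("Y records") or by time ("X seconds"). */
class SizeOrTimeBatcher<T> implements AutoCloseable {
    private final int maxBatchSize;
    private final Consumer<List<T>> bulkSender; // assumed stand-in for the bulk index request
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private List<T> buffer = new ArrayList<>();

    SizeOrTimeBatcher(int maxBatchSize, long flushIntervalMillis, Consumer<List<T>> bulkSender) {
        this.maxBatchSize = maxBatchSize;
        this.bulkSender = bulkSender;
        // Time trigger. Note: if flush() throws, the executor cancels further runs,
        // so a real bulkSender should handle its own errors.
        scheduler.scheduleAtFixedRate(this::flush, flushIntervalMillis, flushIntervalMillis,
                TimeUnit.MILLISECONDS);
    }

    void add(T item) {
        List<T> full = null;
        synchronized (this) {
            buffer.add(item);
            if (buffer.size() >= maxBatchSize) { // size trigger
                full = buffer;
                buffer = new ArrayList<>();
            }
        }
        if (full != null) {
            bulkSender.accept(full); // send outside the lock
        }
    }

    void flush() {
        List<T> pending;
        synchronized (this) {
            if (buffer.isEmpty()) {
                return;
            }
            pending = buffer;
            buffer = new ArrayList<>();
        }
        bulkSender.accept(pending);
    }

    @Override
    public void close() {
        scheduler.shutdown();
        flush(); // send whatever is left
    }
}
```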

However, we are using the REST API (via Jest), which is not supported by the BulkProcessor. We tried to plug in REST indexing logic in place of the native one, and although it almost worked, in the process we noticed something worrying – the internalAdd method, which gets invoked every time an index request is added to the bulk, is synchronized, which means threads will block, waiting for each other to add stuff to the bulk. This sounded suboptimal and risky for production environments, so we went for a separate implementation. It can be seen here – ESBulkProcessor.

It allows multiple threads to flush to Elasticsearch simultaneously, but only one thread at a time (using a lock) to consume from the queue in order to form the batches. Since this is a fast operation, it’s fine to have it serialized. The reason is not that the concurrent queue can’t handle multiple threads reading from it – it can; but if multiple threads reach the condition for forming the bulk at the same time, the result is several small batches rather than one big one, hence the need for only one consumer at a time. This is not a huge problem, so the lock could be removed. But it’s important to note that the lock is not blocking.
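
The following is a minimal sketch of that idea, not the actual ESBulkProcessor code. It assumes the non-blocking lock is a ReentrantLock acquired with tryLock, and the names TryLockBatcher, flushPool and bulkSender are hypothetical. Any thread may add items; whichever thread wins tryLock forms the batches, and the batches are handed to an executor so that several flushes can run in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Hypothetical sketch: many producers, one batch-former at a time, parallel flushes. */
class TryLockBatcher<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger queued = new AtomicInteger(); // approximate queue size
    private final ReentrantLock batchLock = new ReentrantLock();
    private final int batchSize;
    private final Executor flushPool;            // runs the bulk requests, possibly in parallel
    private final Consumer<List<T>> bulkSender;  // assumed stand-in for the REST bulk call

    TryLockBatcher(int batchSize, Executor flushPool, Consumer<List<T>> bulkSender) {
        this.batchSize = batchSize;
        this.flushPool = flushPool;
        this.bulkSender = bulkSender;
    }

    /** Never blocks: enqueue, then try to form a batch if the limit is reached. */
    void add(T item) {
        queue.add(item);
        if (queued.incrementAndGet() >= batchSize) {
            drain(false);
        }
    }

    /** Call with includePartial = true from a timer to flush leftovers. */
    void drain(boolean includePartial) {
        if (!batchLock.tryLock()) {
            return; // another thread is already forming batches; don't wait for it
        }
        try {
            while (queued.get() >= batchSize || (includePartial && queued.get() > 0)) {
                List<T> batch = new ArrayList<>(batchSize);
                T item;
                while (batch.size() < batchSize && (item = queue.poll()) != null) {
                    batch.add(item);
                    queued.decrementAndGet();
                }
                if (batch.isEmpty()) {
                    break;
                }
                flushPool.execute(() -> bulkSender.accept(batch));
            }
        } finally {
            batchLock.unlock();
        }
    }
}
```

If a thread loses the tryLock race just as the lock holder is finishing, its items simply stay in the queue until the next add or the next timed drain picks them up.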

This has been in production for a while now and doesn’t seem to have any issues. I will report back if that changes due to increased load or edge cases.

It’s important to reiterate the risk when this is the only indexing logic – your application node may fail and you may end up with missing data in Elasticsearch. We are not in that scenario, and I’m not sure which is the best approach to remedy it – be it a partial reindex of recent data in case of a failed server, or a batch process that checks whether there are mismatches between the database and the index. Of course, we should also say that you may not always have a database – sometimes Elasticsearch is all you have for data storage, and in that case some sort of queue persistence is needed.
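
As an illustration of the second option only, the core of such a mismatch check is a set difference. This is a sketch under assumptions: IndexReconciler is a hypothetical name, and both id collections are assumed to have been fetched for the same recent time window, one from the database and one from the index.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of a database-vs-index mismatch check. */
class IndexReconciler {
    /** Returns the ids that are present in the database but missing from the index. */
    static Set<String> findMissing(Collection<String> databaseIds, Collection<String> indexedIds) {
        Set<String> missing = new HashSet<>(databaseIds);
        missing.removeAll(new HashSet<>(indexedIds));
        return missing; // these are the records to reindex
    }
}
```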

The ultimate goal is to have near real-time indexing, as users will expect to see their data as soon as possible, while at the same time not overwhelming the Elasticsearch cluster.

The topic of “what’s the best way to index data” is huge and I hope I’ve clarified it at least a little bit and that our contribution makes sense for other scenarios as well.

The post Near Real-Time Indexing With ElasticSearch appeared first on Bozho's tech blog.