Bulk Indexing made easy
An important part of working with Elasticsearch is adding content. While the CRUD support is useful for manipulating individual objects in an index, it is not suitable for sending large amounts of data. For that, bulk indexing should be used. The bulk API is one of the more complex APIs in Elasticsearch. The Kotlin client provides a few key abstractions that make bulk indexing easy and robust.
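To see why bulk requests matter, here is a back-of-the-envelope model (plain Kotlin, no Elasticsearch involved): every request carries fixed network and coordination overhead, so batching divides that overhead across many documents.

```kotlin
// Toy model of request overhead: indexing n documents one by one costs
// n round trips, while bulk requests of size b cost ceil(n / b).
fun roundTrips(documents: Int, batchSize: Int): Int =
    (documents + batchSize - 1) / batchSize

fun main() {
    val docs = 100_000
    println("one by one:   ${roundTrips(docs, 1)} requests")
    println("bulk of 1000: ${roundTrips(docs, 1000)} requests")
}
```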
Using the Repository to bulk index
Again we use our Thing class and thingRepository:

```kotlin
data class Thing(val name: String, val amount: Long = 42)
```
To make this easy, the library comes with a BulkIndexingSession. This takes care of all the boilerplate of constructing and sending bulk requests. Of course, our IndexRepository provides a simple bulk method that creates a session for you:
```kotlin
// creates a BulkIndexingSession<Thing> and passes it to the block
repo.bulk {
    1.rangeTo(500).forEach {
        index("doc-$it", Thing("indexed $it", 666))
    }
}
println("Lets get one of them " + repo.get("doc-100"))
```
Captured Output:
Lets get one of them Thing(name=indexed 100, amount=666)
The BulkIndexingSession aggregates our index operations into BulkRequests and sends them to Elasticsearch for us. You can control how many operations are sent with each request by setting the bulkSize parameter. BulkIndexingSession implements AutoCloseable and will send the last request when it is closed. All of this is taken care of by the bulk method, of course.
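To make that lifecycle concrete, here is a self-contained sketch of the buffer-and-flush pattern (plain Kotlin with hypothetical names, not the library's actual implementation): operations accumulate in a buffer, every bulkSize items a batch goes out, and close() flushes the remainder, which is exactly why the session is AutoCloseable.

```kotlin
// ToySession is a stand-in for the idea behind a bulk session,
// not the library's BulkIndexingSession.
class ToySession(private val bulkSize: Int) : AutoCloseable {
    private val buffer = mutableListOf<String>()
    val flushedBatches = mutableListOf<List<String>>()

    fun index(id: String) {
        buffer.add(id)
        if (buffer.size >= bulkSize) flush()
    }

    private fun flush() {
        if (buffer.isNotEmpty()) {
            // a real session would send a BulkRequest here
            flushedBatches.add(buffer.toList())
            buffer.clear()
        }
    }

    // sends whatever is left; this is why close() must be called
    override fun close() = flush()
}

fun main() {
    val session = ToySession(bulkSize = 2)
    session.use { s ->
        (1..5).forEach { s.index("doc-$it") }
    }
    println(session.flushedBatches) // three batches: 2 + 2 + 1
}
```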
In addition to index, we have a few more operations:
```kotlin
repo.bulk(bulkSize = 50) {
    // setting create=false overwrites and is the appropriate thing
    // to do if you are replacing documents in bulk
    index("doc-1", Thing("upserted 1", 666), create = false)

    // you can do a safe bulk update similar to the CRUD update.
    // this has the disadvantage of doing 1 get per item and may not scale
    getAndUpdate("doc-2") { currentVersion ->
        // this works just like the update on the repository and it will retry a
        // configurable number of times.
        currentVersion.copy(name = "updated 2")
    }

    // if you already have the seqNo, primary term, and current version
    // you can skip the get. A good way to get these efficiently would be
    // a scrolling search.
    update(
        id = "doc-3",
        // yes, these two values are wrong; but it falls back to doing a
        // getAndUpdate.
        seqNo = 12,
        primaryTerms = 34,
        original = Thing("indexed 3", 666)
    ) { currentVersion ->
        currentVersion.copy(name = "safely updated 3")
    }

    // and of course you can delete items
    delete("doc-4")
}
println(repo.get("doc-1"))
println(repo.get("doc-2"))
println(repo.get("doc-3"))
// should print null
println(repo.get("doc-4"))
```
Captured Output:
Thing(name=upserted 1, amount=666)
Thing(name=updated 2, amount=666)
Thing(name=indexed 3, amount=666)
null
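The seqNo and primary term used in the safe update are Elasticsearch's optimistic locking mechanism: the update only applies if the document has not changed since you read it. A toy version of that check (plain Kotlin with a single version counter instead of seqNo/primaryTerm, no Elasticsearch involved):

```kotlin
// Toy optimistic locking: an update carries the version it was based on
// and is rejected if the stored version has moved on (a "conflict").
class ToyStore {
    // id -> (version, value)
    private val docs = mutableMapOf<String, Pair<Long, String>>()

    fun put(id: String, value: String) {
        val version = (docs[id]?.first ?: 0L) + 1
        docs[id] = version to value
    }

    fun get(id: String): Pair<Long, String>? = docs[id]

    // returns false on a version conflict, like a failed seqNo check
    fun updateIfUnchanged(id: String, expectedVersion: Long, newValue: String): Boolean {
        val (version, _) = docs[id] ?: return false
        if (version != expectedVersion) return false
        docs[id] = version + 1 to newValue
        return true
    }
}
```

On a conflict, the fix is what getAndUpdate does: re-read the document to get a fresh version and retry the update.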
Item Callbacks
An important aspect of bulk indexing is actually inspecting the response. The BulkIndexingSession uses a callback mechanism that allows you to respond to the result of each operation. The default implementation does two things:
- it logs failures
- it retries conflicting updates
For most users the default behavior should be fine, but if you want, you can do something custom:
```kotlin
repo.bulk(
    itemCallback = object : (BulkOperation<Thing>, BulkItemResponse) -> Unit {
        // Elasticsearch confirms what it did for each item in a bulk request
        // and you can implement this callback to do something custom
        override fun invoke(op: BulkOperation<Thing>, response: BulkItemResponse) {
            if (response.isFailed) {
                println(
                    "${op.id}: ${op.operation.opType().name} failed, " +
                        "code: ${response.failure.status}"
                )
            } else {
                println("${op.id}: ${op.operation.opType().name} succeeded!")
            }
        }
    }
) {
    update(
        id = "doc-2",
        // these values are wrong and this will now fail instead of retry
        seqNo = 12,
        primaryTerms = 34,
        original = Thing("updated 2", 666)
    ) { currentVersion ->
        currentVersion.copy(name = "safely updated 3")
    }
}
println("" + repo.get("doc-2"))
```
Other parameters
There are a few more parameters that you can override.
```kotlin
repo.bulk(
    // controls the number of items to send to Elasticsearch
    // what is optimal depends on the size of your documents and
    // your cluster setup.
    bulkSize = 10,
    // controls how often documents are retried by the default
    // item callback
    retryConflictingUpdates = 3,
    // controls how Elasticsearch refreshes and whether
    // the bulk request blocks until ES has refreshed or not
    refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE
) {
    delete("doc-1")
    update(
        id = "doc-2",
        // these values are wrong so this will be retried
        seqNo = 12,
        primaryTerms = 34,
        original = Thing("updated 2", 666)
    ) { currentVersion ->
        currentVersion.copy(name = "safely updated 3")
    }
}
```
Captured Output:
doc-2: UPDATE failed, code: CONFLICT
Thing(name=updated 2, amount=666)
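The retryConflictingUpdates behavior boils down to a bounded retry loop around each conflicting update. A self-contained sketch of that pattern (stand-in names, not the library's code):

```kotlin
class ConflictException : RuntimeException("version conflict")

// Retries the block up to maxRetries times on conflicts, then gives up.
// In the real client, each retry re-fetches the document so it has a
// fresh seqNo and primary term before trying again.
fun <T> withConflictRetry(maxRetries: Int, block: () -> T): T {
    var attempts = 0
    while (true) {
        try {
            return block()
        } catch (e: ConflictException) {
            if (++attempts > maxRetries) throw e
        }
    }
}
```

Bounding the retries matters: under heavy concurrent writes an unbounded loop could spin forever, so after maxRetries conflicts the failure is surfaced to the item callback instead.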