previous | index | next


Working with objects

To do anything with Elasticsearch we have to store documents in some index. The Java client provides everything you need to do this but using it the right way requires quite a bit of boiler plate as well as a deep understanding of what needs doing.

An important part of this library is providing a user friendly abstraction for this that should be familiar if you’ve ever written a database application using modern frameworks such as Spring, Ruby on Rails, etc. In such frameworks a Repository provides primitives for interacting with objects in a particular database table.

We provide a similar abstraction the IndexRepository. You create one for each index that you have and it allows you to do Create, Read, Update, and Delete (CRUD) operations as well as a few other things.

Since Elasticsearch stores Json documents, we’ll want to use a data class to represent on the Kotlin side and let the IndexRepository take care of serializing/deserializing.

Creating an IndexRepository

Lets use a simple data class with a few fields.

data class Thing(val name: String, val amount: Long = 42)

Now we can create an IndexRepository for our Thing using the indexRepository extension function:

// we pass in the index name
val repo = esClient.indexRepository<Thing>("things")

Creating the index

Before we store any objects, we should create the index. Note this is optional but using Elasticsearch in schema-less mode is probably not what you want. We use a simple mapping here.

repo.createIndex {
  // use our friendly DSL to configure the index
  configure {
    settings {
      replicas = 0
      shards = 1
    }
    mappings {
      // in the block you receive FieldMappings as this
      // a simple text field "title": {"type":"text"}
      text("name")
      // a numeric field with sub fields, use generics
      // to indicate what kind of number
      number<Long>("amount") {
        // we can customize the FieldMapping object
        // that we receive in the block
        fields {
          // we get another FieldMappings
          // lets add a keyword field
          keyword("somesubfield")
          // if you want, you can manipulate the
          // FieldMapping as a map
          // this is great for accessing features
          // not covered by our Kotlin DSL
          this["imadouble"] = mapOf("type" to "double")
          number<Double>("abetterway")
        }
      }
    }
  }
}

Note. The mapping DSL is a work in progress. The goal is not to support everything as you can simply fall back to Kotlin’s Map DSL with mapOf("myfield" to someValue). Both FieldMapping and FieldMappings extend a MapBackedProperties class that delegates to a MutableMap. This allows us to have type safe properties and helper methods and mix that with raw map access where our DSL misses features.

// stringify is a useful extension function we added to the response
println(repo.getSettings().stringify(true))

repo.getMappings().mappings()
  .forEach { (name, meta) ->
    print("$name -> ${meta.source().string()}")
  }

Captured Output:

{
  "test-ff4ade41-fa0e-4401-b5e4-67e183f06a9a" : {
  "settings" : {
    "index" : {
    "creation_date" : "1603517865461",
    "number_of_shards" : "1",
    "number_of_replicas" : "0",
    "uuid" : "TUODtxxhSXuEOYbvJaFqtA",
    "version" : {
      "created" : "7090099"
    },
    "provided_name" : "test-ff4ade41-fa0e-4401-b5e4-67e183f06a9a"
    }
  }
  }
}
test-ff4ade41-fa0e-4401-b5e4-67e183f06a9a -> {"_meta":{"content_hash":"ZLExK0PCG
9+CpgXySXotIQ==","timestamp":"2020-10-24T05:37:45.455478Z"},"properties":{"amoun
t":{"type":"long","fields":{"abetterway":{"type":"double"},"imadouble":{"type":"
double"},"somesubfield":{"type":"keyword"}}},"name":{"type":"text"}}}

Of course you can also simply set the settings json using source. This is useful if you maintain your mappings as separate json files.

// delete the previous version of our index
repo.deleteIndex()
// create a new one using json source
repo.createIndex {
  source(
    """
      {
        "settings": {
        "index": {
          "number_of_shards": 3,
          "number_of_replicas": 0,
          "blocks": {
          "read_only_allow_delete": "false"
          }
        }
        },
        "mappings": {
        "properties": {
          "name": {
          "type": "text"
          },
          "amount": {
          "type": "long"
          }
        }
        }
      }
    """
  )
}

CRUD operations

Now that we have an index, we can use the CRUD operations.

val id = "first"
println("Object does not exist: ${repo.get(id)}")
// so lets store something
repo.index(id, Thing("A thing", 42))

println("Now we get back our object: ${repo.get(id)}")

Captured Output:

Object does not exist: null
Now we get back our object: Thing(name=A thing, amount=42)

You can’t index an object twice unless you opt in to it being overwritten.

val id = "first"
try {
  repo.index(id, Thing("A thing", 42))
} catch (e: ElasticsearchStatusException) {
  println("we already had one of those and es returned ${e.status().status}")
}
// this how you do upserts
repo.index(id, Thing("Another thing", 666), create = false)
println("It was changed: ${repo.get(id)}")

Captured Output:

we already had one of those and es returned 409
It was changed: Thing(name=Another thing, amount=666)

Of course deleting an object is also possible.

repo.delete("1")
println(repo.get("1"))

Captured Output:

null

Updates with optimistic locking

One useful feature in Elasticsearch is that it can do optimistic locking. The way this works is that it keeps track of a sequenceNumber and primaryTerm for each document. If you provide both in index, it will check that what you provide matches what it has and only overwrite the document if that lines up.

This works as follows.

repo.index("2", Thing("Another thing"))

val (obj, rawGetResponse) = repo.getWithGetResponse("2")
  ?: throw IllegalStateException("We just created this?!")

println(
  "obj with name '${obj.name}' has id: ${rawGetResponse.id}, " +
    "primaryTerm: ${rawGetResponse.primaryTerm}, and " +
    "seqNo: ${rawGetResponse.seqNo}"
)
// This works
repo.index(
  "2",
  Thing("Another Thing"),
  seqNo = rawGetResponse.seqNo,
  primaryTerm = rawGetResponse.primaryTerm,
  create = false
)
try {
  // ... but if we use these values again it fails
  repo.index(
    "2",
    Thing("Another Thing"),
    seqNo = rawGetResponse.seqNo,
    primaryTerm = rawGetResponse.primaryTerm,
    create = false
  )
} catch (e: ElasticsearchStatusException) {
  println("Version conflict! Es returned ${e.status().status}")
}

Captured Output:

obj with name 'Another thing' has id: 2, primaryTerm: 1, and seqNo: 0
Version conflict! Es returned 409

While you can do this manually, the Kotlin client makes optimistic locking a bit easier by providing a robust update method instead.

repo.index("3", Thing("Yet another thing"))

repo.update("3") { currentThing ->
  currentThing.copy(name = "an updated thing", amount = 666)
}

println("It was updated: ${repo.get("3")}")

repo.update("3") { currentThing ->
  currentThing.copy(name = "we can do this again and again", amount = 666)
}

println("It was updated again ${repo.get("3")}")

Captured Output:

It was updated: Thing(name=an updated thing, amount=666)
It was updated again Thing(name=we can do this again and again, amount=666)

Update simply does the same as we did manually earlier: it gets the current version of the object along with the metadata. It then passes the current version to the update lambda function where you can do with it what you want. In this case we simply use Kotlin’s copy to create a copy and modify one of the fields and then we return it as the new value.

The update method traps the version conflict and retries a configurable number of times. Conflicts can happen if you have concurrent writes to the same object. The retry gets the latest version and applies the update lambda again and then attempts to store that.

To simulate what happens without retries, we can throw some threads at this and configure 0 retries:

repo.index("4", Thing("First version of the thing", amount = 0))

try {
  1.rangeTo(30).toList().parallelStream().forEach { n ->
    // the maxUpdateTries parameter is optional and has a default value of 2
    // so setting this to 0 and doing concurrent updates is going to fail
    repo.update("4", 0) { Thing("nr_$n") }
  }
} catch (e: Exception) {
  println("It failed because we disabled retries and we got a conflict")
}

Captured Output:

It failed because we disabled retries and we got a conflict

Doing the same with 10 retries, fixes the problem.

repo.index("5", Thing("First version of the thing", amount = 0))

1.rangeTo(30).toList().parallelStream().forEach { n ->
  // but if we let it retry a few times, it will be eventually consistent
  repo.update("5", 10) { Thing("nr_$n", amount = it.amount + 1) }
}
println("All updates succeeded! amount = ${repo.get("5")?.amount}.")

Captured Output:

All updates succeeded! amount = 30.

Custom serialization

If you want to customize how serialization and deserialization works, you can pass in a ModelReaderAndWriter implementation.

The default value of this is an instance of the included JacksonModelReaderAndWriter, which uses Jackson to serialize and deserialize our Thing objects.

If you don’t want the default Jackson based serialization, or if you want to customize the jackson object mapper, you simply create your own instance and pass it to the IndexRepository.

// this is what is used by default but you can use your own implementation
val modelReaderAndWriter = JacksonModelReaderAndWriter(
  Thing::class,
  ObjectMapper().findAndRegisterModules()
)

val thingRepository = esClient.indexRepository<Thing>(
  index = "things", modelReaderAndWriter = modelReaderAndWriter
)

Co-routine support

As with most of this library, the same functionality is also available in a co-routine friendly variant AsyncIndexRepository. To use that, you need to use esClient.asyncIndexRepository.

This works almost the same as the synchronous version except all of the functions are marked as suspend on the AsyncIndexRepository class. Additionally, the return type of the search method is different and makes use of teh Flow API.

For more details on how to use co-routines, see Co-routines


previous | index | next