Asynchronous IO with Co-routines
The RestHighLevelClient exposes asynchronous versions of most APIs that take a callback to process the response when it arrives. Using these directly requires a lot of boilerplate.
Luckily, Kotlin has co-routines for asynchronous programming, and this library provides co-routine friendly versions of these functions. These suspend functions work much the same way as their synchronous versions, except that they are marked as suspend and use a SuspendingActionListener that uses Kotlin’s suspendCancellableCoroutine to wrap the callback that the Rest High Level Client expects.
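As a rough illustration of the wrapping technique described above, here is a minimal sketch. The Listener interface and the fetchGreetingAsync function are hypothetical stand-ins for ActionListener and a callback-based client method; this is not the library's actual SuspendingActionListener, only the general pattern with suspendCancellableCoroutine from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical callback interface, shaped like Elasticsearch's ActionListener.
interface Listener<T> {
    fun onResponse(response: T)
    fun onFailure(e: Exception)
}

// Hypothetical callback-based API standing in for an *Async client method.
fun fetchGreetingAsync(name: String, listener: Listener<String>) {
    if (name.isBlank()) listener.onFailure(IllegalArgumentException("name is blank"))
    else listener.onResponse("hello $name")
}

// Suspend wrapper: the continuation is resumed from inside the callback,
// so callers see an ordinary return value or a thrown exception.
suspend fun fetchGreeting(name: String): String =
    suspendCancellableCoroutine { continuation ->
        fetchGreetingAsync(name, object : Listener<String> {
            override fun onResponse(response: String) = continuation.resume(response)
            override fun onFailure(e: Exception) = continuation.resumeWithException(e)
        })
    }

fun main() = runBlocking {
    println(fetchGreeting("world"))
}
```

The caller no longer passes a listener; a failure reported through onFailure surfaces as a regular exception at the call site.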
As of Elasticsearch 7.5.0, all asynchronous calls return a Cancellable object that allows you to cancel the task. The suspendCancellableCoroutine wrapper makes use of this, which means that if a failure occurs or you abort a coroutine scope, all the running tasks are cancelled.
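The cancellation behaviour can be sketched as follows. FakeTask is a hypothetical stand-in for the client's Cancellable, and neverEndingRequest is an invented function that never completes on its own; the only real API used is invokeOnCancellation on the continuation provided by suspendCancellableCoroutine.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical handle, playing the role of the client's Cancellable.
class FakeTask {
    val cancelled = AtomicBoolean(false)
    fun cancel() = cancelled.set(true)
}

// Hypothetical request that never completes on its own.
suspend fun neverEndingRequest(task: FakeTask): String =
    suspendCancellableCoroutine { continuation ->
        // When the coroutine is cancelled, cancel the underlying task too.
        continuation.invokeOnCancellation { task.cancel() }
    }

fun main() = runBlocking {
    val task = FakeTask()
    val job = launch { neverEndingRequest(task) }
    delay(50)            // give the request time to start and suspend
    job.cancelAndJoin()  // cancelling the coroutine triggers the handler
    println("task cancelled: ${task.cancelled.get()}")
}
```

Because the handler is registered on the continuation, cancelling the enclosing scope is enough; the caller does not need to keep track of the task handle.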
If you use an asynchronous server framework such as Ktor or Spring Boot 2.x (in reactive mode), you’ll want to use the asynchronous functions.
Generated suspend variants of async* methods
To support co-routines, this project uses a code generation plugin to generate co-routine friendly versions of each of the Rest High Level Client's async functions. At this point most of them are covered; there are more than a hundred of these.
As an example, here are three ways to use the reloadAnalyzers API:
// the synchronous version as provided by the RestHighLevel client
val ic = esClient.indices()
val response = ic.reloadAnalyzers(
    ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT
)

// the asynchronous version with a callback as provided by the
// RestHighLevel client
ic.reloadAnalyzersAsync(
    ReloadAnalyzersRequest("myindex"),
    RequestOptions.DEFAULT,
    object : ActionListener<ReloadAnalyzersResponse> {
        override fun onFailure(e: Exception) {
            println("it failed")
        }

        override fun onResponse(response: ReloadAnalyzersResponse) {
            println("it worked")
        }
    }
)

runBlocking {
    // the coroutine friendly version, using a function generated by the
    // code generator plugin. This is a suspend version, so we put it in
    // a runBlocking to get a coroutine scope. Use a more appropriate
    // scope in your own application, of course.
    val response2 = ic.reloadAnalyzersAsync(
        ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT
    )
}
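One possible alternative to runBlocking is to call suspend functions from inside a coroutineScope. The sketch below is only an illustration: reloadAnalyzers and reloadAll are hypothetical functions that simulate a client call with delay, and are not part of this library.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspend call standing in for a generated client function.
suspend fun reloadAnalyzers(index: String): String {
    delay(100) // simulate network latency without blocking a thread
    return "reloaded $index"
}

// Runs the calls concurrently; if one fails, the scope cancels the others.
suspend fun reloadAll(indices: List<String>): List<String> = coroutineScope {
    indices.map { index -> async { reloadAnalyzers(index) } }.awaitAll()
}

fun main() = runBlocking {
    // runBlocking is used here only to bridge from a plain main function.
    println(reloadAll(listOf("index1", "index2")))
}
```

In a server framework that already runs handlers in coroutines, a function like reloadAll could be called directly from the handler without any blocking bridge.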
AsyncIndexRepository
In addition to having suspend versions of most functions in the RestHighLevelClient, the IndexRepository also has an AsyncIndexRepository counterpart. Its API is similar to that of the regular repository.
// you can create a new repository via an extension function
val asyncRepo = esClient.asyncIndexRepository<Thing>("asyncthings")
// all functions on the asyncRepo are of course suspend so we
// need to run them in a co-routine
runBlocking {
    asyncRepo.createIndex {
        source(
            """
            {
              "settings": {
                "index": {
                  "number_of_shards": 3,
                  "number_of_replicas": 0,
                  "blocks": {
                    "read_only_allow_delete": "false"
                  }
                }
              },
              "mappings": {
                "properties": {
                  "title": {
                    "type": "text"
                  }
                }
              }
            }
            """,
            XContentType.JSON
        )
    }
}
// all functions on the asyncRepo are of course suspend so we
// need to run them in a co-routine scope
runBlocking {
    // all of these use asynchronous suspend functions
    asyncRepo.index("thing1", Thing("The first thing"),
        refreshPolicy = WriteRequest.RefreshPolicy.WAIT_UNTIL)
    // this uses the `AsyncBulkIndexingSession`
    asyncRepo.bulk(refreshPolicy = WriteRequest.RefreshPolicy.WAIT_UNTIL) {
        for (i in 2.rangeTo(10)) {
            index("thing_$i", Thing("thing $i"))
        }
    }
    // we don't need to refresh because of the refresh policy
    val count = asyncRepo.count { }
    println("indexed $count items")
}
Captured Output:
indexed 10 items
Asynchronous search
The asynchronous search API is very similar, except that it returns AsyncSearchResults. The results make use of the Flow API in the Kotlin Co-Routines library.
runBlocking {
    val results = asyncRepo.search(scrolling = true) {
        configure {
            query = matchAll()
        }
    }
    // hits is a Flow<Thing>
    println("Hits: ${results.mappedHits.count()}")
}
Captured Output:
Hits: 10
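To illustrate why a Flow is a natural fit for search results, here is a small sketch of a cold flow that produces hits page by page. The pagedHits function is hypothetical and only simulates paging; it does not reflect how AsyncSearchResults is actually implemented.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical paged source standing in for scrolling search results.
fun pagedHits(pageSize: Int, pages: Int): Flow<String> = flow {
    for (page in 0 until pages) {
        // A real implementation would fetch the next page from the server here.
        println("fetching page $page")
        for (i in 0 until pageSize) emit("hit ${page * pageSize + i}")
    }
}

fun main() = runBlocking {
    // Nothing is fetched until a terminal operator such as toList() runs,
    // and take(3) stops the flow once three hits have been collected.
    val firstThree = pagedHits(pageSize = 2, pages = 5).take(3).toList()
    println(firstThree)
}
```

In this sketch only pages 0 and 1 are fetched, because collection stops after the third hit; the remaining pages are never requested.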