Elasticsearch
Stores events in Elasticsearch.
Usage
build.gradle
compile 'com.sonymobile:lumbermill-elasticsearch-client:$version'
Groovy script
import lumbermill.api.Codecs
import static lumbermill.Core.*
import static lumbermill.Elasticsearch.elasticsearch
Observable.just(Codecs.TEXT_TO_JSON.from("hello"), Codecs.TEXT_TO_JSON.from("World"))
    .flatMap (
        fingerprint.md5('{message}')
    )
    .buffer (100) // Buffering is currently required. Pick a suitable amount.
    .flatMap (
        elasticsearch.client (
            basic_auth: 'user:passwd',       // Optional
            url: 'http(s)://host',           // Required
            index_prefix: 'myindex-',        // Required, supports pattern '{anIndex}-'
            type: 'a_type',                  // Required, supports pattern '{type}'
            document_id: '{fingerprint}',    // Optional, but recommended
            timestamp_field: '@timestamp',   // Optional, defaults to @timestamp
            retry: [                         // Optional, defaults to fixed, 2000, 20
                policy: 'linear',
                attempts: 20,
                delayMs: 500
            ],
            dispatcher: [                    // Optional
                max_concurrent_requests: 2,  // Optional, defaults to 5
                threadpool: <ExecutorService>, // Optional, placeholder for an ExecutorService instance
            ]
        )
    )
    .toBlocking()
    .subscribe()
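The script above derives document_id from an MD5 fingerprint of the message. One likely reason this is recommended is that a deterministic id makes re-sent events overwrite the earlier copy instead of creating duplicates. The sketch below shows how such a hex fingerprint can be computed with the JDK alone. FingerprintSketch and md5Hex are hypothetical names for illustration, and how lumbermill computes its fingerprint internally is not confirmed here.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of lumbermill.
class FingerprintSketch {

    // Returns the MD5 digest of the input as a 32-character lowercase hex string.
    static String md5Hex(String source) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(source.getBytes(StandardCharsets.UTF_8));
            // BigInteger(1, ...) treats the bytes as an unsigned number;
            // %032x pads with leading zeros to the full 32 hex digits.
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required to be present on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The same message always yields the same id.
        System.out.println(md5Hex("hello"));
        System.out.println(md5Hex("hello"));
    }
}
```

Because the id depends only on the message content, two events with identical messages map to the same document.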
Arguments
The elasticsearch client function requires a List<JsonEvent> as input, so you MUST buffer before sending. Each buffered list is converted into a single Bulk API request.
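To illustrate what a single Bulk API request means for a buffered list, here is a minimal sketch that turns already-serialized JSON documents into a bulk body (newline-delimited JSON with one action line per document). BulkBodySketch and bulkBody are hypothetical names, not lumbermill's API, and the real client may build its request differently. The sketch includes _type because the configuration above requires a type. Newer Elasticsearch versions no longer use it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of lumbermill.
class BulkBodySketch {

    // Builds a Bulk API body: for each document, an "index" action line
    // followed by the document source, each terminated by a newline.
    static String bulkBody(String index, String type, List<String> jsonDocs) {
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type).append("\"}}\n");
            // Each document must be a single line of JSON.
            body.append(doc).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        List<String> buffered = Arrays.asList(
            "{\"message\":\"hello\"}",
            "{\"message\":\"World\"}");
        // The index name here is an arbitrary example value.
        System.out.print(bulkBody("myindex-example", "a_type", buffered));
    }
}
```

A larger buffer means fewer HTTP requests per event, at the cost of larger request bodies.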
Returns
The function returns an Observable<ElasticSearchResponseEvent>. ElasticSearchResponseEvent extends JsonEvent and contains the raw response from Elasticsearch. If you want to continue working with the original events that were sent as arguments to the Elasticsearch function, you can get them with the arguments() method.
o.flatMap (
    elasticsearch.client (...)
)
// Assumes arguments() returns an Observable, as the use of flatMap suggests
.flatMap { response -> response.arguments() }
Errors
The Elasticsearch client is built to handle partial errors, where some entries in a request are not properly stored. Causes range from malformed content to shard failures. The client retries every failed entry except unrecoverable ones (400 BAD_REQUEST), which are ignored and not retried.
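The rule described above can be sketched as a small classification over the per-item status codes of a bulk response. This is an illustration only: PartialErrorSketch and retryablePositions are hypothetical names, and the assumption that every non-2xx status other than 400 is retried is read from the text, not from the client's source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of lumbermill.
class PartialErrorSketch {

    // Returns the positions of bulk items that should be sent again.
    static List<Integer> retryablePositions(int[] itemStatuses) {
        List<Integer> retry = new ArrayList<>();
        for (int i = 0; i < itemStatuses.length; i++) {
            int status = itemStatuses[i];
            if (status >= 200 && status < 300) {
                continue; // stored successfully
            }
            if (status == 400) {
                continue; // unrecoverable (e.g. malformed content): ignored
            }
            retry.add(i); // any other failure is assumed to be retryable
        }
        return retry;
    }

    public static void main(String[] args) {
        int[] statuses = {201, 400, 503, 200, 429};
        System.out.println(retryablePositions(statuses));
    }
}
```

Only the entries at the returned positions would need to go into the next attempt, so successful items are not indexed twice.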
Retries
The Elasticsearch client has a default retry policy with a fixed delay of 2 seconds and 20 attempts, so failed records are retried every 2 seconds, up to 20 times.
Once there are no more retries, a FatalIndexException is thrown to indicate that indexing failed and there is no point in continuing.
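A fixed-delay policy of this kind can be sketched as a simple loop. The names below (FixedRetrySketch, retryFixed, RetriesExhaustedException) are hypothetical. RetriesExhaustedException only stands in for the FatalIndexException mentioned above. The text does not say whether the 20 attempts include the first try, so this sketch assumes attempts counts the total number of calls.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of lumbermill.
class FixedRetrySketch {

    // Stand-in for the FatalIndexException named in the text.
    static class RetriesExhaustedException extends RuntimeException {
        RetriesExhaustedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Calls the task up to 'attempts' times, sleeping 'delayMs' between failures.
    static <T> T retryFixed(Callable<T> task, int attempts, long delayMs) {
        Exception last = null;
        for (int attempt = 1; attempt <= attempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < attempts) {
                    try {
                        Thread.sleep(delayMs); // same delay every time
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RetriesExhaustedException("interrupted while waiting", ie);
                    }
                }
            }
        }
        throw new RetriesExhaustedException("gave up after " + attempts + " attempts", last);
    }

    public static void main(String[] args) {
        // Values from the documented default: 20 attempts, 2000 ms apart.
        String result = retryFixed(() -> "indexed", 20, 2000L);
        System.out.println(result);
    }
}
```

With the default values, a record that never succeeds holds up processing for roughly 40 seconds before the failure surfaces.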
Limitations
- Currently only the index operation is used; create, update and delete are not supported.
- Only daily indices can be created.
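To make the daily-index limitation concrete, the sketch below derives an index name from a prefix and an event timestamp. Everything here is an assumption for illustration: DailyIndexSketch and indexFor are hypothetical names, and the yyyy.MM.dd suffix in UTC follows the common Logstash convention. The exact suffix lumbermill appends is not stated in this document.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of lumbermill.
class DailyIndexSketch {

    // Assumed suffix format (Logstash-style), evaluated in UTC.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy.MM.dd").withZone(ZoneOffset.UTC);

    // Combines the prefix with the day taken from an ISO-8601 timestamp.
    static String indexFor(String prefix, String isoTimestamp) {
        return prefix + DAY.format(Instant.parse(isoTimestamp));
    }

    public static void main(String[] args) {
        System.out.println(indexFor("myindex-", "2016-03-01T23:59:59Z"));
        System.out.println(indexFor("myindex-", "2016-03-02T00:00:00Z"));
    }
}
```

Under this scheme, two events one second apart can land in different indices when they straddle midnight.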
Performance
The client is a custom implementation based on OkHttp. We started out with Jest but could not get good enough throughput; OkHttp has performed much better.