Kinesis Client Library

Lumber-Mill can use the Kinesis Client Library (KCL) to consume data from a Kinesis stream. Each batch of records is handed to your handler as a stream, and the batch is checkpointed once the handler returns successfully. Delayed checkpointing is not currently supported.
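
The checkpoint rule described above can be pictured with a small, self-contained model. This is only a conceptual sketch in plain Groovy, not Lumber-Mill's or KCL's actual implementation: `processBatch`, `handler` and `checkpoint` are hypothetical names, and treating a thrown exception as "did not return successfully" is an assumption.

```groovy
// Conceptual model only; processBatch, handler and checkpoint are hypothetical names.
def processBatch(List records, Closure handler, Closure checkpoint, boolean dry = false) {
    try {
        handler(records)              // user code processes the whole batch
    } catch (Exception e) {
        return false                  // handler failed: the position is not advanced
    }
    if (!dry && records) {
        checkpoint(records.last())    // mark everything up to the last record as done
    }
    return true
}

def checkpoints = []

// Successful batch: checkpointed at its last record.
assert processBatch(['a', 'b', 'c'], { batch -> batch.size() }, { checkpoints << it })
assert checkpoints == ['c']

// Failing batch: nothing is checkpointed.
assert !processBatch(['d', 'e'], { throw new IllegalStateException('boom') }, { checkpoints << it })
assert checkpoints == ['c']

// Dry mode: the handler runs but no checkpoint is written.
assert processBatch(['f'], { batch -> batch.size() }, { checkpoints << it }, true)
assert checkpoints == ['c']
```

Because the checkpoint is written only after the handler has finished, a batch whose handler fails would be expected to be delivered again rather than skipped.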

Build

compile 'com.sonymobile:lumbermill-aws-kcl:$version'
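
On recent Gradle versions the `compile` configuration no longer exists (it was removed in Gradle 7), so the dependency would typically be declared with `implementation` instead. A minimal sketch, keeping `$version` as a placeholder for the release you use; the repository that hosts the artifact is not stated here and is left out.

```groovy
// build.gradle (sketch)
dependencies {
    // '$version' is a placeholder, as in the line above; replace it with a real release.
    implementation 'com.sonymobile:lumbermill-aws-kcl:$version'
}
```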

This sample subscribes to a Kinesis stream, prints the contents of each record, and then prints the total number of records in each batch.

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import lumbermill.api.BytesEvent
import lumbermill.aws.kcl.KCL
import static lumbermill.api.Sys.env
import static lumbermill.aws.kcl.KCL.workerId


// Uses minimal KCL configuration
KCL.create(
    new KinesisClientLibConfiguration(
        env('appName', 'testApp').string(),
        env('streamName', 'testStream').string(),
        new DefaultAWSCredentialsProviderChain(),
        workerId())
    .withRegionName(env("region", "eu-west-1").string()))

    .dry(env("dry", "false").bool()) // In dry mode nothing is checkpointed

    // Each batch is delivered as an observable of records
    .handleRecordBatch { records ->
        records
            .doOnNext { BytesEvent event -> println event.raw().utf8() } // Prints the contents of each record
            .count()
            .doOnNext { count -> println count } // Prints the total number of records received in the batch
    }
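
To see what the `doOnNext` / `count` chain in the handler does without connecting to Kinesis, the same operators can be tried on a small in-memory stream. The sketch below assumes the observable passed to the handler behaves like an RxJava 1.x `rx.Observable`; the string payloads are stand-ins for `BytesEvent` instances, and the `@Grab` line (which needs network access) is only there to make the script self-contained.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// Stand-in for one batch of records (assumption: plain strings instead of BytesEvent).
def batch = Observable.from(['first', 'second', 'third'])

def seen = []
def total = batch
    .doOnNext { String payload -> seen << payload } // side effect once per record
    .count()                                        // emits a single Integer when the batch completes
    .toBlocking()
    .single()

assert seen == ['first', 'second', 'third']
assert total == 3
println "processed $total records"
```

Note that `count()` replaces the stream of records with a single value, so any per-record work has to happen before it in the chain.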