Local Filesystem

The fs support is designed mainly for reading files once, for example temporary files downloaded from S3, or for one-time jobs that recursively iterate over a filesystem.

It does not support tailing a file; if you need that, there are better solutions.

The default codec is Codecs.TEXT_TO_JSON.

Read a file once

This creates the source Observable itself; it does not “hook” into an existing pipeline.

import lumbermill.api.Codecs

import static lumbermill.Core.file

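// keepWhen and console are assumed to be statically imported from
// lumbermill.Core as well; only file is imported explicitly above.
file.readFileAsLines (
        file:  '/tmp/afile',
        codec : Codecs.TEXT_TO_JSON)
// Keep only events whose message contains ERROR
.filter( keepWhen( "'{message}'.contains('ERROR')" ) )
.doOnNext( console.stdout('Errors: {message}') )
.subscribe()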
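
To make the "read once, no tail" behaviour concrete, here is a minimal plain-Groovy sketch of the same idea that does not use lumbermill at all. It is not how lumbermill is implemented; the temporary file, its contents and the errors list are hypothetical sample data for illustration only.

```groovy
// Plain-Groovy sketch (no lumbermill): read a file once and keep ERROR lines.
// The temp file and its contents are hypothetical sample data.
File tmp = File.createTempFile('afile', '.log')
tmp.deleteOnExit()
tmp.text = 'INFO started\nERROR disk full\nINFO done\nERROR timeout\n'

List<String> errors = []

// eachLine reads from the start of the file to end-of-file and then returns.
// Lines appended to the file afterwards are never seen (no tail behaviour).
tmp.eachLine { String line ->
    if (line.contains('ERROR')) {
        errors << line
    }
}

errors.each { println "Errors: ${it}" }
```

As in the pipeline above, the read ends when the end of the file is reached, which is why this style suits downloaded or temporary files rather than logs that keep growing.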

Read each line in an existing pipeline

If you have, for example, downloaded a file from S3, or are iterating over a number of files, you can use file.lines() to read each line and return the lines as Observables. It also accepts the codec parameter if required.

.flatMap (
    file.lines(file: '{s3_download_path}')
)