Version: Next

Input: s3

Stream data from an S3 object

Field Summary

| Field Name | Type | Description | Default |
|---|---|---|---|
| when | message_filter | Fire this input when a specific internal message occurs | - |
| interval | duration | How often to run the command | - |
| cron | cron | How often to run the command. Note that unlike standard Cron, Pipes use a Cron syntax that includes a column for seconds. See full discussion | - |
| immediate | bool | Run as soon as invoked, instead of waiting for the specified cron interval | false |
| random-offset | duration | Sets a random offset for the schedule, then keeps that offset fixed | 0s |
| window | Window | For resources that need a time window to be specified | - |
| block | bool | Block further input schedules from triggering if the pipe output is retrying | false |
| bucket-name | string | The storage container (bucket) that holds the objects | - |
| object-names | array of strings | The names of the objects | - |
| object-name-field | field | The field in which to store the object name returned by an operation | - |
| creation-time-field | field | The field in which to store the object's creation time | - |
| last-modified-field | field | The field in which to store the object's last-modified time | - |
| content-length-field | field | The field in which to store the object's content length | - |
| content-type-field | field | The field in which to store the object's content type | - |
| etag-field | field | The field in which to store the object's ETag | - |
| data-field | field | A field in which to nest the object data | - |
| endpoint | string | S3 endpoint | - |
| access-key | string | Access key ID | - |
| secret-key | string | Secret access key | - |
| security-token | string | Security token | - |
| session-token | string | Session token | - |
| timestamp-mode | S3ObjectTimestampMode | Strategy for deriving a timestamp for each object, used for filtering | - |
| maximum-age | MaxAgeSpecifier | Remove any object older than this age from the candidate list | - |
| mode | S3BlockInputMode | The operating mode for this input | - |
| fingerprinting | bool | Enable object fingerprinting, so that each object is downloaded only once | false |
| fingerprinting-db-path | path | Custom path for the fingerprinting database | - |
| maximum-fingerprint-age | MaxAgeSpecifier | Remove any object fingerprints older than this from the tracker | 30 days |
| preprocessors | PreProcessor | Preprocessors that process downloaded data before it is made available to the pipeline; they run in the order they are specified | - |



when

Type: message_filter

Fire this input when a specific internal message occurs

This field overrides time-based scheduling with a scheduler that fires on matching messages.


Pipe Language Snippet:

- pipe-idle
url: "http://localhost:8888"
raw: true
ignore-line-breaks: true


interval

Type: duration

How often to run the command

By default, interval is 0s, which means the command runs only once. Note that scheduled inputs set document markers. See full discussion


Pipe Language Snippet:

command: echo 'once a day'
interval: 1d


cron

Type: cron

How often to run the command. Note that unlike standard Cron, Pipes use a Cron syntax that includes a column for seconds. See full discussion

Example: Once a day

Pipe Language Snippet:

command: echo 'once a day'
cron: '0 0 0 * * *'

Example: Once a day, using a convenient shortcut

Pipe Language Snippet:

command: echo 'once a day'
cron: '@daily'


immediate

Type: bool

Default: false

Run as soon as invoked, instead of waiting for the specified cron interval

Example: Run immediately on invocation, and thereafter at 10:00 every morning

Pipe Language Snippet:

command: echo 'hello'
immediate: true
cron: '0 0 10 * * *'


random-offset

Type: duration

Default: 0s

Sets a random offset for the schedule, then keeps that offset fixed

This helps avoid the thundering herd problem, for example when many scheduled inputs would otherwise all hit the same service at exactly 00:00:00.

Example: Fire at a fixed random point up to one minute after every hour

Pipe Language Snippet:

command: echo 'hello'
random-offset: 1m
cron: '0 0 * * * *'


window

Type: Window

For resources that need a time window to be specified

| Field Name | Type | Description | Default |
|---|---|---|---|
| size | duration | Window size | - |
| offset | duration | Window offset | 0s |
| start-time | time | Allows the windowing to start at a specified time | - |
| highwatermark-file | path | File in which the high-watermark timestamp is stored, so that windowing can resume after the Pipe has been restarted | - |


size

Type: duration

Window size


Pipe Language Snippet:

command: echo 'one two'
size: 1m


offset

Type: duration

Default: 0s

Window offset


Pipe Language Snippet:

command: echo 'one two'
size: 1m
offset: 10s


start-time

Type: time

Allows the windowing to start at a specified time

It should be in the following format: 2019-07-10 18:45:00.000 +0200


Pipe Language Snippet:

command: echo 'one two'
size: 1m
start-time: '2019-07-10 18:45:00.000 +0200'


highwatermark-file

Type: path

File in which the high-watermark timestamp is stored, so that windowing can resume after the Pipe has been restarted


Pipe Language Snippet:

command: echo 'one two'
size: 1m
highwatermark-file: /tmp/mark.txt


block

Type: bool

Default: false

Block further input schedules from triggering if the pipe output is retrying
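A minimal sketch of block on a polling s3 input. It assumes the s3 fields are written as flat keys in the same style as the snippets above (the enclosing input definition is not shown), that mode takes the variant name as a plain string, and that my-bucket is a hypothetical bucket name.

```yaml
# Hypothetical bucket; the enclosing input definition is omitted.
bucket-name: my-bucket
mode: list-and-download-objects   # assumed: variant name written as a plain string
interval: 5m
# Do not trigger further scheduled runs while the pipe output is retrying.
block: true
```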


bucket-name

Type: string

The storage container (bucket) that holds the objects


object-names

Type: array of strings

The names of the objects
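A sketch of fetching specific objects by name. It assumes object-names is used together with the download-objects mode and that mode takes the variant name as a plain string; the bucket and object keys are hypothetical.

```yaml
bucket-name: my-bucket            # hypothetical bucket
mode: download-objects            # assumed: download only the listed objects
object-names:
  - logs/2019-07-10/app.log       # hypothetical object keys
  - logs/2019-07-11/app.log
```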


object-name-field

Type: field

The field in which to store the object name returned by an operation


creation-time-field

Type: field

The field in which to store the object's creation time


last-modified-field

Type: field

The field in which to store the object's last-modified time


content-length-field

Type: field

The field in which to store the object's content length


content-type-field

Type: field

The field in which to store the object's content type


etag-field

Type: field

The field in which to store the object's ETag
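When only listing objects, the metadata fields above decide where each piece of object metadata lands. A sketch, assuming list-objects produces one event per object and that mode takes the variant name as a plain string; the target field names (name, created, modified, size, content_type, etag) are hypothetical choices.

```yaml
bucket-name: my-bucket            # hypothetical bucket
mode: list-objects                # assumed: list only, no download
object-name-field: name           # hypothetical target field names below
creation-time-field: created
last-modified-field: modified
content-length-field: size
content-type-field: content_type
etag-field: etag
```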


data-field

Type: field

A field in which to nest the object data
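A sketch combining a download with data-field, so the object contents are nested under one field, presumably alongside the object name recorded by object-name-field. The field names source_object and payload are hypothetical, and the mode value syntax is assumed.

```yaml
bucket-name: my-bucket                  # hypothetical bucket
mode: list-and-download-objects         # assumed variant syntax
object-name-field: source_object        # hypothetical field: which object the data came from
data-field: payload                     # hypothetical field: object data is nested here
```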


Type: string



endpoint

Type: string

S3 endpoint


access-key

Type: string

Access key ID


secret-key

Type: string

Secret access key


security-token

Type: string

Security token


session-token

Type: string

Session token
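A sketch of explicit connection settings. The endpoint is a placeholder regional AWS endpoint, the bucket name is hypothetical, and the key pair is the example pair from the AWS documentation, not working credentials. Avoid committing real secrets to pipe definitions.

```yaml
bucket-name: my-bucket                          # hypothetical bucket
endpoint: https://s3.eu-west-1.amazonaws.com    # placeholder endpoint
access-key: AKIAIOSFODNN7EXAMPLE                # AWS documentation example key, not real
secret-key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```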


timestamp-mode

Type: S3ObjectTimestampMode

Strategy for deriving a timestamp for each object, used for filtering.

| Field Name | Type | Description | Default |
|---|---|---|---|
| none | | The default mode: do not filter objects by timestamp | - |
| last-modified | | Filter objects on the last-modified timestamp reported by the service | - |
| blob-name-pattern | string | Filter objects on a timestamp derived from the object name, for example: object-name-pattern: =(?P<Y>[\\d]{4,4})-(?P<m>[\\d]{2,2})-(?P<d>[\\d]{2,2})/ | - |


none

The default mode: do not filter objects by timestamp


last-modified

Filter objects on the last-modified timestamp reported by the service


blob-name-pattern

Type: string

Filter objects on a timestamp derived from the object name, for example: object-name-pattern: =(?P<Y>[\\d]{4,4})-(?P<m>[\\d]{2,2})-(?P<d>[\\d]{2,2})/
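A sketch selecting the last-modified strategy. It assumes a variant without a value, such as last-modified, is written as a plain string; blob-name-pattern carries a string value and may need a different shape, which is not shown here.

```yaml
bucket-name: my-bucket              # hypothetical bucket
mode: list-and-download-objects     # assumed variant syntax
# Use the last-modified timestamp reported by the service for filtering.
timestamp-mode: last-modified
```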


maximum-age

Type: MaxAgeSpecifier

Remove any object older than this age from the candidate list

| Field Name | Type | Description | Default |
|---|---|---|---|
| seconds | integer | Specify the maximum age in number of seconds | - |
| duration | string | Specify the maximum age as a human-readable duration (for example, 1 hour) | - |


seconds

Type: integer

Specify the maximum age in number of seconds


duration

Type: string

Specify the maximum age as a human-readable duration (for example, 1 hour)
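A sketch pairing maximum-age with a timestamp strategy, so that objects older than one hour drop out of the candidate list. It assumes MaxAgeSpecifier is written as a nested mapping with one of seconds or duration; the commented line shows the equivalent seconds form (1 hour = 3600 s).

```yaml
timestamp-mode: last-modified   # assumed variant syntax
maximum-age:
  duration: 1 hour
  # equivalent alternative (use one or the other):
  # seconds: 3600
```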


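mode

Type: S3BlockInputMode

The operating mode for this input

| Field Name | Type | Description | Default |
|---|---|---|---|
| list-objects | | List the objects in the bucket | - |
| download-objects | | Download the given objects | - |
| list-and-download-objects | | List the objects in the bucket and download them | - |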


list-objects

List the objects in the bucket


download-objects

Download the given objects


list-and-download-objects

List the objects in the bucket and download them


fingerprinting

Type: bool

Default: false

Enable object fingerprinting, so that each object is downloaded only once


fingerprinting-db-path

Type: path

Custom path for the fingerprinting database
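A sketch of a polling input that downloads each object only once. The bucket and database path are hypothetical, and the mode value syntax is assumed; without fingerprinting-db-path, a default location is used.

```yaml
bucket-name: my-bucket                                      # hypothetical bucket
mode: list-and-download-objects                             # assumed variant syntax
interval: 1m                                                # poll the bucket every minute
fingerprinting: true                                        # download each object only once
fingerprinting-db-path: /var/lib/pipes/s3-fingerprints.db   # hypothetical path
```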


maximum-fingerprint-age

Type: MaxAgeSpecifier

Default: 30 days

Remove any object fingerprints older than this from the tracker

| Field Name | Type | Description | Default |
|---|---|---|---|
| seconds | integer | Specify the maximum age in number of seconds | - |
| duration | string | Specify the maximum age as a human-readable duration (for example, 1 hour) | - |


seconds

Type: integer

Specify the maximum age in number of seconds


duration

Type: string

Specify the maximum age as a human-readable duration (for example, 1 hour)
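A sketch shortening fingerprint retention from the default 30 days to 7 days. It assumes the same nested MaxAgeSpecifier shape as maximum-age and that "7 days" is accepted as a human-readable duration, in line with the "1 hour" and "30 days" values on this page. Once a fingerprint expires, the object could presumably be downloaded again if it is still listed.

```yaml
fingerprinting: true
maximum-fingerprint-age:
  duration: 7 days     # assumed human-readable duration format
```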


preprocessors

Type: PreProcessor

Preprocessors that process downloaded data before it is made available to the pipeline. They run in the order they are specified.

| Field Name | Type | Description | Default |
|---|---|---|---|
| extension | | Preprocess the object based on the extension of its name (.gz, .parquet) | - |
| gzip | | Decompress (gunzip) the received data | - |
| parquet | | Extract the received data as JSON rows from a Parquet file | - |


extension

Preprocess the object based on the extension of its name (.gz, .parquet)


gzip

Decompress (gunzip) the received data


parquet

Extract the received data as JSON rows from a Parquet file
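A sketch of a preprocessor list, assuming each preprocessor is written by its variant name as a YAML list entry. Here extension picks the processing from each object's name, which suits a bucket holding a mix of .gz and .parquet files; for a bucket of only gzip-compressed objects, an explicit gzip entry would be the alternative. The bucket name is hypothetical and the mode value syntax is assumed.

```yaml
bucket-name: my-bucket              # hypothetical bucket
mode: list-and-download-objects     # assumed variant syntax
preprocessors:
  - extension    # choose gzip or Parquet handling from the object name (.gz, .parquet)
```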