Skip to main content
Version: Next

Input: redis

Read from Redis in-memory key-value store

Only reading key value (hash-value and hash) are scheduled

Field Summary

Field NameTypeDescriptionDefault
getRedisInOpRedis input configuration options-
batchBatchFor when a number of output events need to be marked as belonging to a distinct group-
jsonboolAssume incoming data is in JSON formatfalse
uristringRedis address to listen toredis://localhost/
whenmessage_filterFire this input when a specific internal message occurs-
intervaldurationHow often to run the command-
croncronHow often to run the command. Note that unlike standard Cron, Pipes use a Cron syntax that includes a column for seconds. See full discussion-
immediateboolRun as soon as invoked, instead of waiting for the specified cron intervalfalse
random-offsetdurationSets a random offset to the schedule, then sticks to it0s
windowWindowFor resources that need a time window to be specified-
blockboolBlock further input schedules from triggering if the pipe output is retryingfalse

Fields

get

Type: RedisInOp

Redis input configuration options

Field NameTypeDescriptionDefault
subscribearray of stringsSubscribe to Redis channels-
liststringName of a Redis list (queue)-
hash-valuearray of two stringsRead from a Redis hash-
hashstringRead a Redis hash as a JSON event-

  subscribe

Type: array of strings

Subscribe to Redis channels

Example: Can subscribe to multple channels

Pipe Language Snippet:

redis:
get:
subscribe:
- some-channel
- some-other-channel

Example: Can subscribe to all sub-channels of a particular channel

Pipe Language Snippet:

redis:
get:
subscribe:
- some-channel.*

  list

Type: string

Name of a Redis list (queue)

Example

Pipe Language Snippet:

redis:
get:
list: some-field

  hash-value

Type: array of two strings

Read from a Redis hash

This can be scheduled. Event field expansions allowed in the hash field

Example

Pipe Language Snippet:

redis:
get:
hash-value: [my-hash, the-field]
interval: 5m

  hash

Type: string

Read a Redis hash as a JSON event

Scheduled

Example

Pipe Language Snippet:

redis:
get:
hash: some-hash
interval: 2s

batch

Type: Batch

For when a number of output events need to be marked as belonging to a distinct group

Field NameTypeDescriptionDefault
uuid-fieldfieldField where generated uuid, the unique marker for the group, will be stored-
invocation-time-fieldfieldField where invocation time will be stored-
completion-time-fieldfieldField where completion (end of execution) time will be stored-
begin-marker-fieldfieldField used to mark first event in the group-
end-marker-fieldfieldField used to mark last event in the group-
line-count-fieldfieldField used to store the line count of the batch-
line-num-fieldfieldField used to store the line number of the batch-

  uuid-field

Type: field

Field where generated uuid, the unique marker for the group, will be stored

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
uuid-field: marker
interval: 1m

Output:

{"_raw":"foo","line-count":3,"line-num":1,"marker":"f3308aa9-6f56-4cc1-8782-c4231ff254b8"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"f3308aa9-6f56-4cc1-8782-c4231ff254b8"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"f3308aa9-6f56-4cc1-8782-c4231ff254b8"}

Example: For cases where event count is known, a simple counter is used, instead of uuid (useful for testing)

Pipe Language Snippet:

exec:
command: echo foo
no-strip-linefeeds: true
count: 3
batch:
uuid-field: marker
interval: 1m

Output:

{"_raw":"1","line-count":3,"line-num":1,"marker":"1"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"1"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"1"}
{"_raw":"1","line-count":3,"line-num":1,"marker":"2"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"2"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"2"}
{"_raw":"1","line-count":3,"line-num":1,"marker":"3"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"3"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"3"}

  invocation-time-field

Type: field

Field where invocation time will be stored

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
invocation-time-field: begin
interval: 1m

Output:

{"_raw":"1","line-count":3,"line-num":1,"begin":"2020-01-17T09:55:09.135Z"}
{"_raw":"2","line-count":3,"line-num":2,"begin":"2020-01-17T09:55:09.135Z"}
{"_raw":"3","line-count":3,"line-num":3,"begin":"2020-01-17T09:55:09.135Z"}

  completion-time-field

Type: field

Field where completion (end of execution) time will be stored

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
invocation-time-field: begin
completion-time-field: end
interval: 1m

Output:

{"_raw":"1","begin":"2020-01-17T10:02:14.302Z","end":"2020-01-17T10:02:14.301Z","line-count":3,"line-num":1}
{"_raw":"2","begin":"2020-01-17T10:02:14.302Z","end":"2020-01-17T10:02:14.301Z","line-count":3,"line-num":2}
{"_raw":"3","begin":"2020-01-17T10:02:14.302Z","end":"2020-01-17T10:02:14.301Z","line-count":3,"line-num":3}

  begin-marker-field

Type: field

Field used to mark first event in the group

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
begin-marker-field: begin
interval: 1m

Output:

{"_raw":"1","begin":true,"line-count":3,"line-num":1}
{"_raw":"2","line-count":3,"line-num":2}
{"_raw":"3","line-count":3,"line-num":3}

  end-marker-field

Type: field

Field used to mark last event in the group

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
begin-marker-field: begin
end-marker-field: end
interval: 1m

Output:

{"_raw":"1","begin":true,"line-count":3,"line-num":1}
{"_raw":"2","line-count":3,"line-num":2}
{"_raw":"3","end":true,"line-count":3,"line-num":3}

  line-count-field

Type: field

Field used to store the line count of the batch

  line-num-field

Type: field

Field used to store the line number of the batch

json

Type: bool
Alias: json
Default: false

Assume incoming data is in JSON format

Example

Pipe Language Snippet:

redis:
get:
list: queue
json: true

uri

Type: string

Default: redis://localhost/

Redis address to listen to

Example

Pipe Language Snippet:

redis:
get:
subscribe:
- topic
uri: redis://localhost/

when

Type: message_filter

Fire this input when a specific internal message occurs

This field overloads time-based scheduling with a scheduler that fires on matching messages.

Example

Pipe Language Snippet:

input:
http-poll:
when:
message-received:
filter-type:
- pipe-idle
url: "http://localhost:8888"
raw: true
ignore-line-breaks: true

interval

Type: duration

How often to run the command

By default, interval: 0s which means: once. Note that scheduled inputs set document markers. See full discussion

Example

Pipe Language Snippet:

exec:
command: echo 'once a day'
interval: 1d

cron

Type: cron

How often to run the command. Note that unlike standard Cron, Pipes use a Cron syntax that includes a column for seconds. See full discussion

Example: Once a day

Pipe Language Snippet:

exec:
command: echo 'once a day'
cron: '0 0 0 * * *'

Example: Once a day, using a convenient shortcut

Pipe Language Snippet:

exec:
command: echo 'once a day'
cron: '@daily'

immediate

Type: bool

Default: false

Run as soon as invoked, instead of waiting for the specified cron interval

Example: Run immediately on invocation, and thereafter at 10h every morning

Pipe Language Snippet:

exec:
command: echo 'hello'
immediate: true
cron: '0 0 10 * * *'

random-offset

Type: duration

Default: 0s

Sets a random offset to the schedule, then sticks to it

This can help avoid the thundering herd problem, where you do not, for example, want to overload some service at 00:00:00

Example: Would fire up to a minute after every hour

Pipe Language Snippet:

exec:
command: echo 'hello'
random-offset: 1m
cron: '0 0 * * * *'

window

Type: Window

For resources that need a time window to be specified

Field NameTypeDescriptionDefault
sizedurationWindow size-
offsetdurationWindow offset0s
start-timetimeAllows the windowing to start at a specified time-
highwatermark-filepathSpecify file where timestamp would be stored in order to resume, for when Pipe has been restarted-

  size

Type: duration

Window size

Example

Pipe Language Snippet:

exec:
command: echo 'one two'
window:
size: 1m

  offset

Type: duration

Default: 0s

Window offset

Example

Pipe Language Snippet:

exec:
command: echo 'one two'
window:
size: 1m
offset: 10s

  start-time

Type: time

Allows the windowing to start at a specified time

It should in the following format: 2019-07-10 18:45:00.000 +0200

Example

Pipe Language Snippet:

exec:
command: echo 'one two'
window:
size: 1m
start-time: 10s

  highwatermark-file

Type: path

Specify file where timestamp would be stored in order to resume, for when Pipe has been restarted

Example

Pipe Language Snippet:

exec:
command: echo 'one two'
window:
size: 1m
highwatermark-file:: /tmp/mark.txt

block

Type: bool

Default: false

Block further input schedules from triggering if the pipe output is retrying