Skip to main content
Version: 3.5.0

Input: amqp

Read from AMQP queues

Field Summary

Field NameTypeDescriptionDefault
batchBatchFor when a number of output events need to be marked as belonging to a distinct group-
uriurlAMQP address to listen toamqp://localhost//
queueQueueAn AMQP Queue-
routing-keystringUsed in conjunction with topic exchange, to route data-
exchangeExchangeAn AMQP Exchange-
persistentboolLet messages be cached and survive server restartfalse
durableboolLet Queue or Exchange survive server restartfalse
rawboolIf true, treat incoming events as JSON, else wrap in _raw field ({"_raw":...})false
ignore-line-breaksboolDo not treat separate lines as distinct eventsfalse

Fields

batch

Type: Batch

For when a number of output events need to be marked as belonging to a distinct group

Field NameTypeDescriptionDefault
uuid-fieldfieldField where generated uuid, the unique marker for the group, will be stored-
invocation-time-fieldfieldField where invocation time will be stored-
completion-time-fieldfieldField where completion (end of execution) time will be stored-
begin-marker-fieldfieldField used to mark first event in the group-
end-marker-fieldfieldField used to mark last event in the group-
line-count-fieldfieldField used to store the line count of the batch-
line-num-fieldfieldField used to store the line number of the batch-

  uuid-field

Type: field

Field where generated uuid, the unique marker for the group, will be stored

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
uuid-field: marker
interval: 1m

Output:

{"_raw":"foo","line-count":3,"line-num":1,"marker":"f3308aa9-6f56-4cc1-8782-c4231ff254b8"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"f3308aa9-6f56-4cc1-8782-c4231ff254b8"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"f3308aa9-6f56-4cc1-8782-c4231ff254b8"}

Example: For cases where event count is known, a simple counter is used, instead of uuid (useful for testing)

Pipe Language Snippet:

exec:
command: echo foo
no-strip-linefeeds: true
count: 3
batch:
uuid-field: marker
interval: 1m

Output:

{"_raw":"1","line-count":3,"line-num":1,"marker":"1"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"1"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"1"}
{"_raw":"1","line-count":3,"line-num":1,"marker":"2"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"2"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"2"}
{"_raw":"1","line-count":3,"line-num":1,"marker":"3"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"3"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"3"}

  invocation-time-field

Type: field

Field where invocation time will be stored

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
invocation-time-field: begin
interval: 1m

Output:

{"_raw":"1","line-count":3,"line-num":1,"begin":"2020-01-17T09:55:09.135Z"}
{"_raw":"2","line-count":3,"line-num":2,"begin":"2020-01-17T09:55:09.135Z"}
{"_raw":"3","line-count":3,"line-num":3,"begin":"2020-01-17T09:55:09.135Z"}

  completion-time-field

Type: field

Field where completion (end of execution) time will be stored

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
invocation-time-field: begin
completion-time-field: end
interval: 1m

Output:

{"_raw":"1","begin":"2020-01-17T10:02:14.302Z","end":"2020-01-17T10:02:14.301Z","line-count":3,"line-num":1}
{"_raw":"2","begin":"2020-01-17T10:02:14.302Z","end":"2020-01-17T10:02:14.301Z","line-count":3,"line-num":2}
{"_raw":"3","begin":"2020-01-17T10:02:14.302Z","end":"2020-01-17T10:02:14.301Z","line-count":3,"line-num":3}

  begin-marker-field

Type: field

Field used to mark first event in the group

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
begin-marker-field: begin
interval: 1m

Output:

{"_raw":"1","begin":true,"line-count":3,"line-num":1}
{"_raw":"2","line-count":3,"line-num":2}
{"_raw":"3","line-count":3,"line-num":3}

  end-marker-field

Type: field

Field used to mark last event in the group

Example

Pipe Language Snippet:

exec:
command: |
for n in $(seq 3)
do
echo $n
done
no-strip-linefeeds: true
batch:
begin-marker-field: begin
end-marker-field: end
interval: 1m

Output:

{"_raw":"1","begin":true,"line-count":3,"line-num":1}
{"_raw":"2","line-count":3,"line-num":2}
{"_raw":"3","end":true,"line-count":3,"line-num":3}

  line-count-field

Type: field

Field used to store the line count of the batch

  line-num-field

Type: field

Field used to store the line number of the batch

uri

Type: url

Default: amqp://localhost//

AMQP address to listen to

Example

Pipe Language Snippet:

amqp:
queue:
name: queue-name
uri: amqp://localhost//

queue

Type: Queue

An AMQP Queue

Field NameTypeDescriptionDefault
namestringQueue name-
passiveboolAssume Queue has already been declared, else errorfalse

  name

Type: string

Queue name

Example

Pipe Language Snippet:

amqp:
queue:
name: queue-name

  passive

Type: bool

Default: false

Assume Queue has already been declared, else error

Example

Pipe Language Snippet:

amqp:
queue:
name: some-queue
passive: true

routing-key

Type: string

Used in conjunction with topic exchange, to route data

exchange

Type: Exchange

An AMQP Exchange

Field NameTypeDescriptionDefault
namestringExchange name-
typestringExchange type-
passiveboolAssume Exchange has already been declared, else errorfalse

  name

Type: string

Exchange name

Example

Pipe Language Snippet:

amqp:
exchange:
name: some-exchange
type: fanout

  type

Type: string

Possible Values: fanout, topic, direct

Exchange type

There are 2 supported types:

  • fanout: For when same messages are to be consumed by multiple comsumers
  • topic: Topic-based pub-sub
  • direct: For when routing keys are to be used

Example

Pipe Language Snippet:

amqp:
exchange:
name: some-exchange
type: topic
routing-key: some.key

  passive

Type: bool

Default: false

Assume Exchange has already been declared, else error

Example

Pipe Language Snippet:

amqp:
exchange:
name: some-exchange
passive: true

persistent

Type: bool

Default: false

Let messages be cached and survive server restart

Example

Pipe Language Snippet:

amqp:
queue:
name: some-queue
persistent: true

durable

Type: bool

Default: false

Let Queue or Exchange survive server restart

Example

Pipe Language Snippet:

amqp:
queue:
name: some-queue
durable: true

raw

Type: bool
Alias: json
Default: false

If true, treat incoming events as JSON, else wrap in _raw field ({"_raw":...})

Example

Pipe Language Snippet:

amqp:
queue:
name: queue-name
raw: true

ignore-line-breaks

Type: bool
Alias: ignore-linebreaks
Default: false

Do not treat separate lines as distinct events

Example

Pipe Language Snippet:

amqp:
queue:
name: some-queue
ignore-line-breaks: true