Input: amqp
Read from AMQP queues
Field Summary
Field Name | Type | Description | Default |
---|---|---|---|
batch | Batch | For when a number of output events need to be marked as belonging to a distinct group | - |
uri | url | AMQP address to listen to | amqp://localhost// |
queue | Queue | An AMQP Queue | - |
routing-key | string | Used in conjunction with topic exchange, to route data | - |
exchange | Exchange | An AMQP Exchange | - |
persistent | bool | Let messages be cached and survive server restart | false |
durable | bool | Let Queue or Exchange survive server restart | false |
raw | bool | If true, treat incoming events as JSON, else wrap in _raw field ({"_raw":...}) | false |
ignore-line-breaks | bool | Do not treat separate lines as distinct events | false |
Fields
batch
Type: Batch
For when a number of output events need to be marked as belonging to a distinct group
Field Name | Type | Description | Default |
---|---|---|---|
uuid-field | field | Field where generated uuid, the unique marker for the group, will be stored | - |
invocation-time-field | field | Field where invocation time will be stored | - |
completion-time-field | field | Field where completion (end of execution) time will be stored | - |
begin-marker-field | field | Field used to mark first event in the group | - |
end-marker-field | field | Field used to mark last event in the group | - |
line-count-field | field | Field used to store the line count of the batch | - |
line-num-field | field | Field used to store the line number of the event within the batch | - |
uuid-field
Type: field
Field where generated uuid, the unique marker for the group, will be stored
Example
Pipe Language Snippet:
exec:
    command: |
        for n in $(seq 3)
        do
            echo $n
        done
    no-strip-linefeeds: true
    batch:
        uuid-field: marker
    interval: 1m
Output:
{"_raw":"1","line-count":3,"line-num":1,"marker":"f3308aa9-6f56-4cc1-8782-c4231ff254b8"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"f3308aa9-6f56-4cc1-8782-c4231ff254b8"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"f3308aa9-6f56-4cc1-8782-c4231ff254b8"}
Example: When the event count is known, a simple counter is used instead of a UUID (useful for testing)
Pipe Language Snippet:
exec:
    command: |
        for n in $(seq 3)
        do
            echo $n
        done
    no-strip-linefeeds: true
    count: 3
    batch:
        uuid-field: marker
    interval: 1m
Output:
{"_raw":"1","line-count":3,"line-num":1,"marker":"1"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"1"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"1"}
{"_raw":"1","line-count":3,"line-num":1,"marker":"2"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"2"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"2"}
{"_raw":"1","line-count":3,"line-num":1,"marker":"3"}
{"_raw":"2","line-count":3,"line-num":2,"marker":"3"}
{"_raw":"3","line-count":3,"line-num":3,"marker":"3"}
invocation-time-field
Type: field
Field where invocation time will be stored
Example
Pipe Language Snippet:
exec:
    command: |
        for n in $(seq 3)
        do
            echo $n
        done
    no-strip-linefeeds: true
    batch:
        invocation-time-field: begin
    interval: 1m
Output:
{"_raw":"1","line-count":3,"line-num":1,"begin":"2020-01-17T09:55:09.135Z"}
{"_raw":"2","line-count":3,"line-num":2,"begin":"2020-01-17T09:55:09.135Z"}
{"_raw":"3","line-count":3,"line-num":3,"begin":"2020-01-17T09:55:09.135Z"}
completion-time-field
Type: field
Field where completion (end of execution) time will be stored
Example
Pipe Language Snippet:
exec:
    command: |
        for n in $(seq 3)
        do
            echo $n
        done
    no-strip-linefeeds: true
    batch:
        invocation-time-field: begin
        completion-time-field: end
    interval: 1m
Output:
{"_raw":"1","begin":"2020-01-17T10:02:14.301Z","end":"2020-01-17T10:02:14.302Z","line-count":3,"line-num":1}
{"_raw":"2","begin":"2020-01-17T10:02:14.301Z","end":"2020-01-17T10:02:14.302Z","line-count":3,"line-num":2}
{"_raw":"3","begin":"2020-01-17T10:02:14.301Z","end":"2020-01-17T10:02:14.302Z","line-count":3,"line-num":3}
begin-marker-field
Type: field
Field used to mark first event in the group
Example
Pipe Language Snippet:
exec:
    command: |
        for n in $(seq 3)
        do
            echo $n
        done
    no-strip-linefeeds: true
    batch:
        begin-marker-field: begin
    interval: 1m
Output:
{"_raw":"1","begin":true,"line-count":3,"line-num":1}
{"_raw":"2","line-count":3,"line-num":2}
{"_raw":"3","line-count":3,"line-num":3}
end-marker-field
Type: field
Field used to mark last event in the group
Example
Pipe Language Snippet:
exec:
    command: |
        for n in $(seq 3)
        do
            echo $n
        done
    no-strip-linefeeds: true
    batch:
        begin-marker-field: begin
        end-marker-field: end
    interval: 1m
Output:
{"_raw":"1","begin":true,"line-count":3,"line-num":1}
{"_raw":"2","line-count":3,"line-num":2}
{"_raw":"3","end":true,"line-count":3,"line-num":3}
line-count-field
Type: field
Field used to store the line count of the batch
line-num-field
Type: field
Field used to store the line number of the event within the batch
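Neither line-count-field nor line-num-field has an example on this page. Judging from the outputs of the other batch examples, events carry line-count and line-num by default, so these two fields presumably rename them. The following is a sketch under that assumption; the names total and index are hypothetical, and no output is shown because it has not been verified.

```yaml
exec:
    command: |
        for n in $(seq 3)
        do
            echo $n
        done
    no-strip-linefeeds: true
    batch:
        # Hypothetical names, assumed to replace the default line-count / line-num
        line-count-field: total
        line-num-field: index
    interval: 1m
```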
uri
Type: url
Default: amqp://localhost//
AMQP address to listen to
Example
Pipe Language Snippet:
amqp:
    queue:
        name: queue-name
    uri: amqp://localhost//
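The default points at a broker on the local machine. For a remote broker, AMQP URIs generally take the form amqp://user:password@host:port/vhost. The sketch below assumes this input accepts credentials, a port, and a virtual host in uri, which this page does not confirm; the host, credentials, and vhost are placeholders.

```yaml
amqp:
    queue:
        name: queue-name
    # Placeholder host, credentials and vhost; 5672 is the standard AMQP port
    uri: amqp://user:password@broker.example.com:5672/events
```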
queue
Type: Queue
An AMQP Queue
Field Name | Type | Description | Default |
---|---|---|---|
name | string | Queue name | - |
passive | bool | Assume Queue has already been declared, else error | false |
name
Type: string
Queue name
Example
Pipe Language Snippet:
amqp:
    queue:
        name: queue-name
passive
Type: bool
Default: false
Assume Queue has already been declared, else error
Example
Pipe Language Snippet:
amqp:
    queue:
        name: some-queue
        passive: true
routing-key
Type: string
Used in conjunction with topic exchange, to route data
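With a topic exchange, standard AMQP binding keys are dot-separated words and may contain wildcards. The sketch below assumes routing-key is passed to the broker as the binding key, which this page does not state; the exchange name and key pattern are placeholders.

```yaml
amqp:
    exchange:
        name: some-exchange
        type: topic
    # In AMQP topic matching, '*' matches exactly one word and '#' matches zero or more words
    routing-key: "logs.*.error"
```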
exchange
Type: Exchange
An AMQP Exchange
Field Name | Type | Description | Default |
---|---|---|---|
name | string | Exchange name | - |
type | string | Exchange type | - |
passive | bool | Assume Exchange has already been declared, else error | false |
name
Type: string
Exchange name
Example
Pipe Language Snippet:
amqp:
    exchange:
        name: some-exchange
        type: fanout
type
Type: string
Possible Values: fanout, topic, direct
Exchange type
There are 3 supported types:
- fanout: For when the same messages are to be consumed by multiple consumers
- topic: Topic-based pub-sub
- direct: For when routing keys are to be used
Example
Pipe Language Snippet:
amqp:
    exchange:
        name: some-exchange
        type: topic
    routing-key: some.key
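For comparison, a direct exchange might be configured as follows. This is a sketch that reuses only fields documented on this page; the exchange name and key are placeholders, and the exact-match behaviour noted in the comment is standard AMQP rather than something this page states.

```yaml
amqp:
    exchange:
        name: some-exchange
        type: direct
    # A direct exchange delivers only messages whose routing key equals this value exactly
    routing-key: some.key
```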
passive
Type: bool
Default: false
Assume Exchange has already been declared, else error
Example
Pipe Language Snippet:
amqp:
    exchange:
        name: some-exchange
        passive: true
persistent
Type: bool
Default: false
Let messages be cached and survive server restart
Example
Pipe Language Snippet:
amqp:
    queue:
        name: some-queue
    persistent: true
durable
Type: bool
Default: false
Let Queue or Exchange survive server restart
Example
Pipe Language Snippet:
amqp:
    queue:
        name: some-queue
    durable: true
raw
Type: bool
Alias: json
Default: false
If true, treat incoming events as JSON, else wrap in _raw field ({"_raw":...})
Example
Pipe Language Snippet:
amqp:
    queue:
        name: queue-name
    raw: true
ignore-line-breaks
Type: bool
Alias: ignore-linebreaks
Default: false
Do not treat separate lines as distinct events
Example
Pipe Language Snippet:
amqp:
    queue:
        name: some-queue
    ignore-line-breaks: true
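The examples on this page set one field at a time. The fields can presumably be combined in a single input definition; the following is an unverified sketch that uses only the field names documented above, with a placeholder queue name.

```yaml
amqp:
    uri: amqp://localhost//
    queue:
        name: some-queue
    # Let the queue survive a server restart
    durable: true
    # Treat each incoming message as one JSON event, even if it spans several lines
    raw: true
    ignore-line-breaks: true
```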