Testing Pipes
It's useful to test Pipes, in whole or in part, before actually deploying them.
Consider this basic Pipe, which uses context expansion:
name: echo_msg
context:
  msg: hello
input:
  exec:
    command: 'echo {{msg}}'
output:
  write: console
Run the Pipe:
$> hotrod pipes run --file test.yml
{"_raw":"hello"}
To experiment with overriding the context, create echo_msg_context.yml, containing a new value for the context variable msg:
context:
  msg: goodbye
Then, override the Pipe context:
$> hotrod pipes run --file test.yml --context-files echo_msg_context.yml
{"_raw":"goodbye"}
Experimenting with Pipes in this way builds confidence. For more complicated Pipes, it is usually easier to troubleshoot problems on localhost than after deployment.
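The context override above can be pictured as a dictionary merge followed by placeholder substitution. The following Python sketch is only an illustration of that idea, not how hotrod implements it; the `expand` helper and the variable names are hypothetical.

```python
import re

def expand(template, context):
    """Replace each {{name}} placeholder with the matching context value.

    Hypothetical helper for illustration only; an unknown name raises KeyError.
    """
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )

pipe_context = {"msg": "hello"}    # the context block inside the Pipe
override = {"msg": "goodbye"}      # the contents of echo_msg_context.yml

# Without an override, the Pipe's own context is used.
default_command = expand("echo {{msg}}", pipe_context)

# With an override, later values replace earlier ones key by key.
overridden_command = expand("echo {{msg}}", {**pipe_context, **override})

print(default_command)
print(overridden_command)
```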
The following Pipe consumes a file, times.txt, containing times in Unix time (epoch) format. The Pipe will be used to detect any gaps in this time record:
1592928057
1592928058
1592928059
1592928060
1592928061
1592928062
1592928063
1592928064
1592928065
1592928066
Each time is read as a string in the default field _raw. We rename the field to epoch and convert it into a number:
name: epoch_stream
input:
  exec:
    command: cat times.txt
actions:
- rename:
  - _raw: epoch
- convert:
  - epoch: num
output:
  write: console
Run the Pipe:
$> hotrod pipes run -f epoch_stream.yml
{"epoch":1592928057}
{"epoch":1592928058}
{"epoch":1592928059}
{"epoch":1592928060}
{"epoch":1592928061}
{"epoch":1592928062}
{"epoch":1592928063}
{"epoch":1592928064}
{"epoch":1592928065}
{"epoch":1592928066}
Use the stream action to find the difference between times (just after convert):
name: epoch_stream
input:
  exec:
    command: cat times.txt
actions:
- rename:
  - _raw: epoch
- convert:
  - epoch: num
- stream:
    operation: delta
    watch: epoch
output:
  write: console
The resulting output:
{"epoch":1592928057,"delta":1592928057,"elapsed":0}
{"epoch":1592928058,"delta":1,"elapsed":0}
{"epoch":1592928059,"delta":1,"elapsed":0}
{"epoch":1592928060,"delta":1,"elapsed":0}
{"epoch":1592928061,"delta":1,"elapsed":0}
{"epoch":1592928062,"delta":1,"elapsed":0}
{"epoch":1592928063,"delta":1,"elapsed":0}
{"epoch":1592928064,"delta":1,"elapsed":0}
{"epoch":1592928065,"delta":1,"elapsed":0}
{"epoch":1592928066,"delta":1,"elapsed":0}
Since 1592928057 has no preceding time, its delta is 1592928057. Thereafter, delta is 1 for each subsequent line, because consecutive times are 1s apart.
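The delta values above can be reproduced with a few lines of Python. This is a sketch of the arithmetic only: it assumes the first event is compared against 0 (which matches the output shown), it does not model the elapsed field, and `add_delta` is a hypothetical name rather than part of hotrod.

```python
def add_delta(events, watch="epoch"):
    """Yield each event with a 'delta' field: the change in `watch` since the previous event."""
    previous = 0  # assumption: the first event has nothing before it, so it is compared with 0
    for event in events:
        current = event[watch]
        yield {**event, "delta": current - previous}
        previous = current

# Ten consecutive seconds, as in times.txt.
events = [{"epoch": 1592928057 + offset} for offset in range(10)]
result = list(add_delta(events))

for event in result:
    print(event)
```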
To detect gaps, use the filter action to pass through only those events where delta is not equal to 1, excluding the first event:
name: epoch_stream
input:
  exec:
    command: cat times.txt
actions:
- rename:
  - _raw: epoch
- convert:
  - epoch: num
- stream:
    operation: delta
    watch: epoch
- filter:
    condition: delta != 1 and count() > 1
output:
  write: console
With the complete file, the Pipe outputs nothing. We can provoke output by putting a gap into times.txt (1592928061 and 1592928062 have been removed):
1592928057
1592928058
1592928059
1592928060
1592928063
1592928064
1592928065
1592928066
The resulting output:
{"epoch":1592928063,"delta":3,"elapsed":0}
This is an example of a Pipe monitoring data and detecting anomalies. As the walkthrough shows, the best way to build non-trivial Pipes is step by step, checking the output after each action is added.
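The gap check can be mirrored in plain Python to confirm what the Pipe should report for a given input. This is a sketch under the same assumption as the output above (the first delta is measured from 0); `find_gaps` is a hypothetical helper, and the `count > 1` test stands in for the `count() > 1` part of the filter condition.

```python
def find_gaps(times, expected_step=1):
    """Return the events whose delta differs from expected_step, ignoring the first event."""
    gaps = []
    previous = 0  # assumption: the first event is compared with 0
    for count, current in enumerate(times, start=1):
        delta = current - previous
        # Same shape as the filter condition: delta != 1 and count() > 1.
        if delta != expected_step and count > 1:
            gaps.append({"epoch": current, "delta": delta})
        previous = current
    return gaps

complete = list(range(1592928057, 1592928067))
with_gap = [t for t in complete if t not in (1592928061, 1592928062)]

no_gaps = find_gaps(complete)
one_gap = find_gaps(with_gap)

print(no_gaps)
print(one_gap)
```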
It's possible to override the input of a Pipe. This is useful for testing Pipes that get their input from other sources, e.g. HTTP requests or TCP. For instance, the version of epoch_stream.yml that ends with the stream action can be fed a single timestamp:
$> hotrod pipes run -f epoch_stream.yml --input 1592928057
The resulting output:
{"epoch":1592928057,"delta":1592928057,"elapsed":0}
Using --input @times.txt gives us the same result as before. If the argument starts with @, it is treated as the name of a file containing the data (the same convention as curl; see man curl).
Use --output <filename> to save the output to a file.
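The @ convention for --input described above is simple to picture in code. The Python sketch below is an assumption about the general pattern, not hotrod's implementation: `resolve_input` is a hypothetical helper, and treating each line of the file as one event is inferred from the output shown earlier.

```python
import tempfile
from pathlib import Path

def resolve_input(argument):
    """Return input lines: file contents if the argument starts with '@', else the literal value."""
    if argument.startswith("@"):
        # Everything after '@' is taken as a file name; each line becomes one event.
        return Path(argument[1:]).read_text().splitlines()
    return [argument]

# Demonstrate both forms using a temporary stand-in for times.txt.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "times.txt"
    sample.write_text("1592928057\n1592928058\n")
    from_file = resolve_input(f"@{sample}")

literal = resolve_input("1592928057")

print(from_file)
print(literal)
```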
Running a Pipe with --trace shows how each step transforms the data. The steps are numbered as follows:
name: epoch_stream
input:
  # Step 0.
  exec:
    command: cat times.txt
actions:
# Step 1.
- rename:
  - _raw: epoch
# Step 2.
- convert:
  - epoch: num
# Step 3.
- stream:
    operation: delta
    watch: epoch
output:
  # Step 4.
  write: console
Run the last example with tracing enabled:
$> hotrod pipes run -f epoch_stream.yml --input 1592928057 --trace
[TRACE] action-rename step 1
LINE: {"_raw":"1592928057"} -> [{"epoch":"1592928057"}]
[TRACE] action-convert step 2
LINE: {"epoch":"1592928057"} -> [{"epoch":1592928057}]
[TRACE] action-stream step 3
LINE: {"epoch":1592928057} -> [{"epoch":1592928057,"delta":1592928057,"elapsed":0}]
[TRACE] output-exec step 4
LINE: {"epoch":1592928057,"delta":1592928057,"elapsed":0}
{"epoch":1592928057,"delta":1592928057,"elapsed":0}
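The idea behind the trace output, recording each event before and after every step, can be sketched in Python. This is an illustration only: the `rename`, `convert`, and `run_with_trace` functions are hypothetical stand-ins for the first two actions, and the printed format does not attempt to match hotrod's.

```python
def rename(event):
    """Stand-in for the rename action: move _raw to epoch."""
    event = dict(event)
    event["epoch"] = event.pop("_raw")
    return event

def convert(event):
    """Stand-in for the convert action: turn epoch into a number."""
    return {**event, "epoch": int(event["epoch"])}

def run_with_trace(event, steps):
    """Apply each step in order, recording (step number, name, before, after)."""
    trace = []
    for number, (name, step) in enumerate(steps, start=1):
        result = step(event)
        trace.append((number, name, event, result))
        event = result
    return event, trace

final, trace = run_with_trace(
    {"_raw": "1592928057"},
    [("rename", rename), ("convert", convert)],
)

for number, name, before, after in trace:
    print(f"step {number} {name}: {before} -> {after}")
```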