Version: 3.5.3

Testing Pipes

It's useful to test Pipes, either in their entirety or in sections, before actually deploying them.

Consider this basic Pipe that has the following context expansion:

name: echo_msg

context:
  msg: hello

input:
  exec:
    command: 'echo {{msg}}'

output:
  write: console

Save the Pipe as test.yml and run it:

$> hotrod pipes run --file test.yml

{"_raw":"hello"}

To experiment with overriding context, create echo_msg_context.yml, which sets a different value for msg:

context:
  msg: goodbye

Then, override the Pipe context:

$> hotrod pipes run --file test.yml --context-files echo_msg_context.yml

{"_raw":"goodbye"}
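The override behaviour seen above can be modelled in a few lines of Python. This is only a sketch of the observed result, not how hotrod implements context expansion; the names expand, pipe_context, override and merged are hypothetical, and it assumes that values from the context file simply replace values with the same key in the Pipe.

```python
import re


def expand(template, context):
    """Replace each {{name}} placeholder with its value from context."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


pipe_context = {"msg": "hello"}   # context block from the Pipe definition
override = {"msg": "goodbye"}     # context block from echo_msg_context.yml

# Assumption: later values win, matching the output shown above.
merged = {**pipe_context, **override}

print(expand("echo {{msg}}", pipe_context))  # echo hello
print(expand("echo {{msg}}", merged))        # echo goodbye
```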

Experimenting with Pipes in this way builds confidence. For more complicated Pipes, it is usually easier to troubleshoot problems locally before deploying.

The following Pipe consumes a file, times.txt, containing times in Unix time (epoch) format. The goal is to detect any gaps in this time record:

1592928057
1592928058
1592928059
1592928060
1592928061
1592928062
1592928063
1592928064
1592928065
1592928066

Each time arrives as a string in the default field _raw. We rename the field and convert its value to a number:

name: epoch_stream

input:
  exec:
    command: cat times.txt

actions:
  - rename:
      - _raw: epoch

  - convert:
      - epoch: num

output:
  write: console

Run the Pipe:

$> hotrod pipes run -f epoch_stream.yml

{"epoch":1592928057}
{"epoch":1592928058}
{"epoch":1592928059}
{"epoch":1592928060}
{"epoch":1592928061}
{"epoch":1592928062}
{"epoch":1592928063}
{"epoch":1592928064}
{"epoch":1592928065}
{"epoch":1592928066}

To find the difference between consecutive times, add a stream action just after convert:

name: epoch_stream

input:
  exec:
    command: cat times.txt

actions:
  - rename:
      - _raw: epoch

  - convert:
      - epoch: num

  - stream:
      operation: delta
      watch: epoch

output:
  write: console

The resulting output:

{"epoch":1592928057,"delta":1592928057,"elapsed":0}
{"epoch":1592928058,"delta":1,"elapsed":0}
{"epoch":1592928059,"delta":1,"elapsed":0}
{"epoch":1592928060,"delta":1,"elapsed":0}
{"epoch":1592928061,"delta":1,"elapsed":0}
{"epoch":1592928062,"delta":1,"elapsed":0}
{"epoch":1592928063,"delta":1,"elapsed":0}
{"epoch":1592928064,"delta":1,"elapsed":0}
{"epoch":1592928065,"delta":1,"elapsed":0}
{"epoch":1592928066,"delta":1,"elapsed":0}

Since 1592928057 has no preceding time, its delta is 1592928057. Every subsequent line has a delta of 1, because the time difference between consecutive lines is 1 s.
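The delta values can be reproduced with a small Python sketch. It is an illustration of the arithmetic only, not hotrod's stream action: add_delta is a hypothetical helper, the starting value of 0 is an assumption chosen to match the first output line, and the elapsed field is left out because it is not explained here.

```python
def add_delta(events, field="epoch"):
    """Yield a copy of each event with a 'delta' field added.

    delta is the watched value minus the previous watched value.
    Assumption: the previous value starts at 0, so the first delta
    equals the first value itself.
    """
    previous = 0
    for event in events:
        value = event[field]
        yield {**event, "delta": value - previous}
        previous = value


# Ten consecutive one-second timestamps, as in times.txt.
events = [{"epoch": t} for t in range(1592928057, 1592928067)]

for out in add_delta(events):
    print(out)
```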

To detect gaps, use the filter action to pass through only events where delta is not equal to 1, excluding the first event:

name: epoch_stream

input:
  exec:
    command: cat times.txt

actions:
  - rename:
      - _raw: epoch

  - convert:
      - epoch: num

  - stream:
      operation: delta
      watch: epoch

  - filter:
      condition: delta != 1 and count() > 1

output:
  write: console

The Pipe now outputs nothing, because the record has no gaps. To provoke output, introduce a gap into times.txt (1592928061 and 1592928062 have been removed):

1592928057
1592928058
1592928059
1592928060
1592928063
1592928064
1592928065
1592928066

The resulting output:

{"epoch":1592928063,"delta":3,"elapsed":0}

This is an example of a Pipe monitoring data and detecting anomalies. It also shows that the best way to build non-trivial Pipes is step by step, checking the output after each addition.
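When building a Pipe like this, it can help to cross-check the expected result independently. The following Python sketch applies the same test as the filter condition to both versions of the data. find_gaps is a hypothetical helper, and it assumes that count() is the 1-based number of events seen so far.

```python
def find_gaps(timestamps, expected_step=1):
    """Return the events that the filter condition would let through."""
    gaps = []
    previous = 0
    for count, current in enumerate(timestamps, start=1):
        delta = current - previous
        # Same test as the filter: delta != 1 and count() > 1
        if delta != expected_step and count > 1:
            gaps.append({"epoch": current, "delta": delta})
        previous = current
    return gaps


complete = list(range(1592928057, 1592928067))
with_gap = [t for t in complete if t not in (1592928061, 1592928062)]

print(find_gaps(complete))  # []
print(find_gaps(with_gap))  # [{'epoch': 1592928063, 'delta': 3}]
```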

It’s possible to override the input of a Pipe. This is useful for testing Pipes that get their input from other sources, e.g. HTTP requests or TCP. For instance, the version of epoch_stream.yml that ends with the stream action can be fed a single timestamp:

$> hotrod pipes run -f epoch_stream.yml --input 1592928057

The resulting output:

{"epoch":1592928057,"delta":1592928057,"elapsed":0}

Using --input @times.txt gives the same result as before: if the argument starts with @, it is treated as the name of a file that contains the data (the same convention as curl; see man curl).
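The @ convention is easy to picture as code. The sketch below is a hypothetical Python model of the rule just described, not hotrod's argument handling: resolve_input is an invented name, and splitting the file into lines is an assumption based on each line of times.txt becoming one event.

```python
import tempfile
from pathlib import Path


def resolve_input(argument):
    """Return the input lines for an --input style argument.

    A leading '@' means the rest is a file name whose lines are the data;
    anything else is used literally as a single line.
    """
    if argument.startswith("@"):
        return Path(argument[1:]).read_text().splitlines()
    return [argument]


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "times.txt"
    path.write_text("1592928057\n1592928058\n")

    print(resolve_input("1592928057"))  # ['1592928057']
    print(resolve_input(f"@{path}"))    # ['1592928057', '1592928058']
```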

Use --output <filename> to save the output to a file.

Running a Pipe with the --trace flag shows how each step transforms the data. The steps of the Pipe are numbered as follows:

name: epoch_stream

input:
  # Step 0.
  exec:
    command: cat times.txt

actions:
  # Step 1.
  - rename:
      - _raw: epoch

  # Step 2.
  - convert:
      - epoch: num

  # Step 3.
  - stream:
      operation: delta
      watch: epoch

output:
  # Step 4.
  write: console

Run the last example with tracing enabled:

$> hotrod pipes run -f epoch_stream.yml --input 1592928057 --trace

[TRACE] action-rename step 1
LINE: {"_raw":"1592928057"} -> [{"epoch":"1592928057"}]
[TRACE] action-convert step 2
LINE: {"epoch":"1592928057"} -> [{"epoch":1592928057}]
[TRACE] action-stream step 3
LINE: {"epoch":1592928057} -> [{"epoch":1592928057,"delta":1592928057,"elapsed":0}]
[TRACE] output-exec step 4
LINE: {"epoch":1592928057,"delta":1592928057,"elapsed":0}
{"epoch":1592928057,"delta":1592928057,"elapsed":0}
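The idea behind tracing, printing each event before and after every step, can be sketched in Python. This is a toy model of the first two steps only, with hypothetical function names; it imitates the layout of the trace lines above but is not hotrod's tracing code.

```python
import json


def rename(event):
    """Step 1: move the value of _raw to a field named epoch."""
    event = dict(event)
    event["epoch"] = event.pop("_raw")
    return event


def convert(event):
    """Step 2: convert the epoch string to a number."""
    return {**event, "epoch": int(event["epoch"])}


def compact(event):
    """Serialise without spaces, like the trace output shown above."""
    return json.dumps(event, separators=(",", ":"))


def run_with_trace(event, steps):
    """Apply each step in order, printing the event before and after it."""
    for number, (name, step) in enumerate(steps, start=1):
        result = step(event)
        print(f"[TRACE] {name} step {number}")
        print(f"LINE: {compact(event)} -> [{compact(result)}]")
        event = result
    return event


steps = [("action-rename", rename), ("action-convert", convert)]
final = run_with_trace({"_raw": "1592928057"}, steps)
print(compact(final))  # {"epoch":1592928057}
```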