Version: 3.3.0

Pipes

A Pipe, or data processing pipeline, represents a specific data processing workload and is described with the Hotrod Pipe Language.

To make data processing more repeatable and accessible, the pipe language provides a standardized approach to defining what a particular data processing pipeline must do.


Pipes are defined in YAML format. Here's a simple pipe that makes an HTTP request, parses a JSON response, renames a field, and writes some output to a file:

name: Simple HTTP Example

input:
  http-poll:
    address: "http://httpbin.org/get"
    raw: true

actions:
- rename:
    fields:
    - origin: source_ip

output:
  file:
    path: "/tmp/output.log"
    input-field: out

All pipe definitions follow the same pattern. An input specifies where data comes from. Any number of optional actions define the sequential transformations and manipulations applied to the input data. An output tells the pipe where the processed data must be delivered.
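To make the action stage concrete, here is a minimal Python sketch of what a rename step like the one in the example above does to a single event. It is not Hotrod's implementation: the function name `rename`, the list-of-mappings argument, and the sample event values are assumptions made for illustration.

```python
def rename(event, fields):
    """Return a copy of the event with fields renamed (old name -> new name)."""
    out = dict(event)
    for mapping in fields:
        for old, new in mapping.items():
            # Assumption: a field that is absent is simply skipped.
            if old in out:
                out[new] = out.pop(old)
    return out


# Hypothetical event, loosely modelled on an httpbin.org/get response.
event = {"origin": "203.0.113.7", "url": "http://httpbin.org/get"}
renamed = rename(event, [{"origin": "source_ip"}])
print(renamed)
```

The input event is left untouched, so each step in a chain of actions can be reasoned about on its own.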

A pipe definition is declarative: you specify what needs to happen and leave how it happens to the Hotrod runtime. In addition to data manipulation, transformation, parsing and extraction capabilities, advanced features for scheduling, batching and enrichment are also available to make dealing with streaming data simpler.

Pipe definitions are attached to one or more Hotrod Agents; an agent is the runtime that executes one or more pipes. Deploying pipes to agents is performed through Hotrod Server's web-based UI, its command line interface, or its HTTP API.

Here's another example of a pipe that executes a command on a Linux server:

name: uptime
input:
  exec:
    command: uptime
    interval: 10s
actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
output:
  write: console

In this example, input data comes from executing the Unix uptime command every 10 seconds. The extract action uses a regular expression to pull the load averages (1min, 5min, 15min) out of the input line. We then write the results out to the console.

So from a typical input '09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31' we get a JSON record that looks like {"m1":"0.08","m5":"0.23","m15":"0.31"}.
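The extraction described above can be reproduced outside Hotrod with a few lines of Python. This is only a sketch of the behaviour, not the runtime's code: the function name `extract` is illustrative, and passing non-matching events through unchanged is an assumption.

```python
import json
import re

# Same pattern and output fields as the extract action in the pipe.
PATTERN = re.compile(r"load average: (\S+), (\S+), (\S+)")
OUTPUT_FIELDS = ["m1", "m5", "m15"]


def extract(event):
    """Illustrative stand-in for the extract action."""
    match = PATTERN.search(event["_raw"])
    if match is None:
        # Assumption: events that do not match are passed through unchanged.
        return event
    # remove: true drops the input field from the result.
    result = {k: v for k, v in event.items() if k != "_raw"}
    result.update(zip(OUTPUT_FIELDS, match.groups()))
    return result


event = {"_raw": "09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31"}
extracted = extract(event)
print(json.dumps(extracted, separators=(",", ":")))
```

Note that the captured values are still strings at this point, which is why the record shows "0.08" in quotes.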

Time series events normally have an associated time, and must be in an appropriate form to be processed and queried by data analytics systems. If our action steps were:

- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
  - m1: num
  - m5: num
  - m15: num
- time:
    output-field: '@timestamp'

Then the result looks like this:

{"m1":0.08,"m5":0.23,"m15":0.31,"@timestamp":"2018-08-21T09:30:36.000Z"}

These records can then be indexed by Elasticsearch and queried with tools such as Kibana or Grafana.
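As a rough illustration of the convert and time steps, the Python sketch below turns the extracted strings into numbers and adds a timestamp field. The helper names `convert_num` and `add_time` are hypothetical, and it assumes the time step stamps each event with a UTC time formatted to millisecond precision, which is inferred from the sample record rather than stated.

```python
from datetime import datetime, timezone


def convert_num(event, fields):
    """Sketch of `convert` with type `num`: parse the named string fields as numbers."""
    out = dict(event)
    for name in fields:
        out[name] = float(out[name])
    return out


def add_time(event, now, output_field="@timestamp"):
    """Add an ISO 8601 UTC timestamp; `now` must be a timezone-aware datetime."""
    stamp = now.astimezone(timezone.utc).isoformat(timespec="milliseconds")
    out = dict(event)
    out[output_field] = stamp.replace("+00:00", "Z")
    return out


event = {"m1": "0.08", "m5": "0.23", "m15": "0.31"}
# A fixed time is used here so the result is reproducible.
now = datetime(2018, 8, 21, 9, 30, 36, tzinfo=timezone.utc)
record = add_time(convert_num(event, ["m1", "m5", "m15"]), now)
print(record)
```

Numeric fields and a proper timestamp are what let downstream tools aggregate and chart the values instead of treating them as text.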

So a pipe is "input -> action1 -> action2 -> ... -> output". Note that actions are optional, with the only required top-level fields being name, input, and output.
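The chain of actions can be pictured as a left-to-right function composition. The Python sketch below models that idea, along with a check for the required top-level fields. Everything in it (`missing_fields`, `run_actions`, and the two toy actions) is hypothetical and only illustrates the shape of a pipe, not how Hotrod validates or executes one.

```python
from functools import reduce

REQUIRED_FIELDS = ("name", "input", "output")


def missing_fields(pipe):
    """Return the required top-level fields absent from a pipe definition."""
    return [field for field in REQUIRED_FIELDS if field not in pipe]


def run_actions(event, actions):
    """Apply each action to the event in order: action1, then action2, and so on."""
    return reduce(lambda ev, action: action(ev), actions, event)


# Two toy actions, invented purely for this example.
def upper_raw(ev):
    return {**ev, "_raw": ev["_raw"].upper()}


def add_length(ev):
    return {**ev, "length": len(ev["_raw"])}


result = run_actions({"_raw": "hello"}, [upper_raw, add_length])
print(result)
print(missing_fields({"name": "uptime", "input": {}}))
```

An empty action list returns the event unchanged, which mirrors the point that actions are optional.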

Most action steps work with JSON data. Although real-world data usually arrives as raw text, the exec input will by default 'quote' it as JSON, like {"_raw":"output line"}. The various extraction steps work on this raw data and generate sensible event fields.
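The quoting step amounts to wrapping each raw line in a one-field JSON object. A minimal Python sketch follows; the function name `quote_raw` is illustrative, and stripping the trailing newline is an assumption about how lines are delimited.

```python
import json


def quote_raw(line):
    """Wrap a raw text line as a JSON event with a single _raw field."""
    # Assumption: the trailing newline is not part of the event.
    return json.dumps({"_raw": line.rstrip("\n")}, separators=(",", ":"))


quoted = quote_raw("output line\n")
print(quoted)
```

Using a JSON encoder rather than string concatenation means quotes and backslashes inside the raw line are escaped correctly.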

The available inputs, outputs, and processing steps (actions) are each documented separately.

Adding Pipes to the Hotrod System

Below, we follow the Hotrod command line process. To do this using the Hotrod User Interface instead, refer to our quickstart guide.

Assuming you are logged in, then:

$ hotrod agents add Joburg
agent Joburg has id e94ccdca-f379-447b-8c90-6976e77652ec
$ hotrod agents add Durban
agent Durban has id 8a1a0a29-d8f8-4098-a016-9d08f841f9a4
$ hotrod agents list
name   | id                                   | tags | pipes | last seen
-------+--------------------------------------+------+-------+-----------
Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      |       |
Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |       |

Unless you explicitly specify --id with agents add, new agents will be assigned a UUID as their unique identifier. Agent names and ids must be unique.

A pipe definition is a file NAME.yml, where NAME is the provided name of the pipe. Pipe names must be unique, and the hotrod tool will also enforce that the name of the file matches the pipe name. To enable versioning or annotation of a pipe definition, additional full stops with some text are allowed in front of the extension, for example: NAME.testing.yml or NAME.0.1.yml.
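The naming rule can be expressed as a small check. The Python sketch below is one plausible reading of it: the function `file_matches_pipe` is hypothetical, and the exact validation the hotrod tool performs (for instance, whether an empty annotation is rejected) is an assumption.

```python
from pathlib import PurePath

EXTENSION = ".yml"


def file_matches_pipe(filename, pipe_name):
    """Check that filename is NAME.yml or NAME.<annotation>.yml for the given pipe name."""
    base = PurePath(filename).name
    if not base.endswith(EXTENSION):
        return False
    stem = base[: -len(EXTENSION)]
    if stem == pipe_name:
        return True
    # Assumption: an annotation must be non-empty text after "NAME.".
    prefix = pipe_name + "."
    return stem.startswith(prefix) and len(stem) > len(prefix)


for candidate in ["uptime.yml", "uptime.testing.yml", "uptime.0.1.yml", "uptime2.yml"]:
    print(candidate, file_matches_pipe(candidate, "uptime"))
```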

A pipe is loaded with the pipes add subcommand:

$ hotrod pipes add --file uptime.yml
$ hotrod pipes list
name
------
uptime
$ hotrod pipes show uptime
name: uptime
input:
  exec:
    command: uptime
    interval: 2s
actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
  - m1: num
  - m5: num
  - m15: num
- time:
    output-field: '@timestamp'
output:
  write: console

This pipe is not associated with any agent yet, so we update a particular agent:

$ hotrod agents update Joburg --add-pipe uptime
$ hotrod agents list
name   | id                                   | tags | pipes  | last seen
-------+--------------------------------------+------+--------+-----------
Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      | uptime |
Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |        |

The pipe is now staged for the "Joburg" agent, and will be deployed in a short while using the configured hotrod agent.

If you run hotrod agents update Joburg --remove-pipe uptime, the pipe is removed from the staging area and stopped by the hotrod agent.