Skip to main content
Version: 3.5.2

Transactions and Transitions

The Pipe model involves a stream of inputs that are sent either from a command or an alternative source and these inputs occasionally need to be separated into groups.

Using exec batch

Consider a Pipe that monitors disk usage with df:

$> df -x tmpfs

Filesystem 1K-blocks Used Available Use% Mounted on
udev 8120300 0 8120300 0% /dev
/dev/sda2 122030736 102279252 13509644 89% /
/dev/sda1 523248 6228 517020 2% /boot/efi
/dev/sdb1 960380628 366946504 544579664 41% /home/steve/hd

This will run intermittently and extract the Mounted, Used and Available fields. exec batch is used here to mark the first and last field:

name: batch

input:
exec:
command: df -x tmpfs
interval: 1m
batch:
begin-marker-field: begin
end-marker-field: end

output:
write: console

# Output:
# {"_raw":"Filesystem 1K-blocks Used Available Use% Mounted on","begin":true}
# {"_raw":"udev 8120300 0 8120300 0% /dev"}
# {"_raw":"/dev/sda2 122030736 102287196 13501700 89% /"}
# {"_raw":"/dev/sda1 523248 6228 517020 2% /boot/efi"}
# {"_raw":"/dev/sdb1 960380628 367768744 543757424 41% /home/steve/hd","end":true}

expand can extract these fields with delim: ' ', which matches any whitespace characters. Use header: true to ignore the header line, but this will only work on first use. Do the following to drop every row that has column names:

name: df-cols

input:
exec:
command: df -x tmpfs
batch:
begin-marker-field: begin
interval: 2s

actions:
- expand:
input-field: _raw
delim: ' '
remove: true
begin-marker-field: begin
csv:
header: true
relaxed-schema: true

output:
write: console

# Output:
# {"Filesystem":"udev","1K-blocks":8120300,"Used":0,"Available":8120300,"Use%":"0%","Mounted":"/dev"}
# {"Filesystem":"/dev/sda2","1K-blocks":122030736,"Used":102288448,"Available":13500448,"Use%":"89%","Mounted":"/"}
# {"Filesystem":"/dev/sda1","1K-blocks":523248,"Used":6228,"Available":517020,"Use%":"2%","Mounted":"/boot/efi"}
# {"Filesystem":"/dev/sdb1","1K-blocks":960380628,"Used":367768868,"Available":543757300,"Use%":"41%","Mounted":"/home/steve/hd"}

begin-marker-field ensures that the column line is passed over. relaxed-schema removes the issue related to the extra column on.

Now we are able to drop or rename fields as needed.

Detecting Stalls And Grouping by Field

There are instances where data is not scheduled as with exec or http-poll. The tcp input allows data to arrive at any time — but it's useful to know if, and when, the data stops flowing.

name: stalled1

input:
tcp:
address: localhost:2020
plain: true

actions:
- stalled:
timeout: 2s
marker: [stalled]

output:
write: console

Another event is created if there are more than 2 seconds between events. This may appear as {"_marker":"stalled","streaming":"no"}. When the data resumes its flow, we receive: {"_marker":"stalled","streaming":"yes"}.

Now, imagine that various sources are connecting and writing events to TCP port 2020. Each of the sources are identified by a particular field value, and our task is detecting when a particular source stops sending data. In this example, the field is b and there are two sources: one and two, that send data every 100ms:

# Output:
# {"a":1,"b":"one"}
# {"a":2,"b":"one"}
# {"a":3,"b":"one"}
# {"a":4,"b":"one"}
# {"a":5,"b":"two"}
# {"a":6,"b":"two"}
# {"a":7,"b":"two"}
# {"a":8,"b":"one"}
# {"a":9,"b":"one"}
# {"a":10,"b":"one"}
# {"a":11,"b":"one"}

- stalled:
timeout: 200ms
marker: [stalled]
group-by: b

# Output:
# {"_marker":"stalled","streaming":"yes","stalled":"one"}
# {"a":1,"b":"one"}
# {"a":2,"b":"one"}
# {"a":3,"b":"one"}
# {"a":4,"b":"one"}
# {"_marker":"stalled","streaming":"yes","stalled":"two"}
# {"a":5,"b":"two"}
# {"a":6,"b":"two"}
# {"_marker":"stalled","streaming":"no","stalled":"one"}
# {"a":7,"b":"two"}
# {"_marker":"stalled","streaming":"yes","stalled":"one"}
# {"a":8,"b":"one"}
# {"a":9,"b":"one"}
# {"a":10,"b":"one"}
# {"_marker":"stalled","streaming":"no","stalled":"two"}
# {"a":11,"b":"one"}

The two sources are treated as separate groups — group one is flowing away, but group two was late.

Collecting Groups Together with Transaction

transaction collects similar records. Using the above output stalled allows us to collect sequences of events into arrays.

    - transaction:
timeout: [stalled]
marker: [TRANS]
group-by: b
- remove: [duration,complete]

# Output:
# {"_marker":"TRANS","recs":[
# {"a":1,"b":"one"},
# {"a":2,"b":"one"},
# {"a":3,"b":"one"},
# {"a":4,"b":"one"}
# ]}
# {"_marker":"TRANS","recs":[
# {"a":5,"b":"two"},
# {"a":6,"b":"two"},
# {"a":7,"b":"two"}
# ]}
# {"_marker":"TRANS","recs":[
# {"a":8,"b":"one"},
# {"a":9,"b":"one"},
# {"a":10,"b":"one"},
# {"a":11,"b":"one"}
# ]}

Note that transaction consumes the stalled events.

The transaction model is well-suited to sessions, as seen in the below log:

  {"action":"LOGIN","user":"Bob"}
{"action":"SEND","user":"Bob"}
{"action":"LOGIN","user":"Alice"}
{"action":"RECV","user":"Bob"}
{"action":"SEND","user":"Alice"}
{"action":"SEND-AGAIN","user":"Alice"}
{"action":"SEND-MORE","user":"Alice"}
{"action":"LOGOFF","user":"Alice"}

A session happens as follows:

  1. Users log in.
  2. Users perform actions.
  3. Users logout.

A session starts when the action field matches LOGIN and ends when it matches LOGOFF. start-end defines a pair of fields with a field name and a matching pattern.

A real-world complication here is that users often forget to logout. A timeout is needed to prevent this.

        timeout: 200ms
marker: [stalled]
group-by: user

- transaction:
timeout: [stalled]
marker: [TRANS]
start-end:
start: [action, LOGIN]
end: [action, LOGOFF]
group-by: user

- remove: [duration]

# Output:
# {"_marker":"TRANS","complete":false,"recs":[
# {"action":"LOGIN","user":"Bob"},
# {"action":"SEND","user":"Bob"},
# {"action":"RECV","user":"Bob"}
# ]}
# {"_marker":"TRANS","complete":true,"recs":[
# {"action":"LOGIN","user":"Alice"},
# {"action":"SEND","user":"Alice"},
# {"action":"SEND-AGAIN","user":"Alice"},
# {"action":"SEND-MORE","user":"Alice"},
# {"action":"LOGOFF","user":"Alice"}
# ]}

The example above shows that user Bob has forgotten to logout properly and timed out, resulting in the session returning "complete":false".