Version: 3.3.0

Transactions and Transitions

The pipe model processes a stream of events coming from a command or some other source. Sometimes we need to separate this stream into groups.

Using exec-batch

Consider a pipe that monitors disk usage with df:

scratch$ df -x tmpfs
Filesystem 1K-blocks Used Available Use% Mounted on
udev 8120300 0 8120300 0% /dev
/dev/sda2 122030736 102279252 13509644 89% /
/dev/sda1 523248 6228 517020 2% /boot/efi
/dev/sdb1 960380628 366946504 544579664 41% /home/steve/hd

We want to run this periodically (here, every minute) and extract the 'Mounted', 'Used' and 'Available' fields. The batch option of exec marks the first and last lines of each run specially:

name: batch
input:
  exec:
    command: df -x tmpfs
    interval: 1m
    batch:
      begin-marker-field: begin
      end-marker-field: end
output:
  write: console
# .....
{"_raw":"Filesystem 1K-blocks Used Available Use% Mounted on","begin":true}
{"_raw":"udev 8120300 0 8120300 0% /dev"}
{"_raw":"/dev/sda2 122030736 102287196 13501700 89% /"}
{"_raw":"/dev/sda1 523248 6228 517020 2% /boot/efi"}
{"_raw":"/dev/sdb1 960380628 367768744 543757424 41% /home/steve/hd","end":true}
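To make the batch markers concrete, here is a minimal Python sketch that reproduces the event shape shown above from captured df output. The function name mark_batch and the choice to skip blank lines are assumptions for illustration; this is not how exec itself is implemented.

```python
import json
from typing import Iterable


def mark_batch(lines: Iterable[str], begin_field: str = "begin",
               end_field: str = "end") -> list[dict]:
    """Wrap each output line as an event and flag the first and last ones.

    Hypothetical model of exec's batch option, inferred from the sample output.
    """
    # Assumption: blank lines are not turned into events.
    events = [{"_raw": line} for line in lines if line.strip()]
    if events:
        events[0][begin_field] = True   # first line of this run
        events[-1][end_field] = True    # last line of this run
    return events


DF_OUTPUT = """\
Filesystem 1K-blocks Used Available Use% Mounted on
udev 8120300 0 8120300 0% /dev
/dev/sda2 122030736 102279252 13509644 89% /
/dev/sda1 523248 6228 517020 2% /boot/efi
/dev/sdb1 960380628 366946504 544579664 41% /home/steve/hd
"""

for event in mark_batch(DF_OUTPUT.splitlines()):
    # Compact separators match the one-line JSON shown above.
    print(json.dumps(event, separators=(",", ":")))
```

If a run produces a single line, that one event carries both markers, which is one plausible reading of "first and last".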

expand can extract these fields with delim: ' ' (which matches any run of whitespace characters) and can be told to treat the first line as a header with header: true. However, that only works for the first run of the command. To drop the column-name row from every run:

name: df-cols
input:
  exec:
    command: df -x tmpfs
    batch:
      begin-marker-field: begin
    interval: 2s
actions:
- expand:
    input-field: _raw
    delim: ' '
    remove: true
    begin-marker-field: begin
    csv:
      header: true
      relaxed-schema: true
output:
  write: console
# .....
{"Filesystem":"udev","1K-blocks":8120300,"Used":0,"Available":8120300,"Use%":"0%","Mounted":"/dev"}
{"Filesystem":"/dev/sda2","1K-blocks":122030736,"Used":102288448,"Available":13500448,"Use%":"89%","Mounted":"/"}
{"Filesystem":"/dev/sda1","1K-blocks":523248,"Used":6228,"Available":517020,"Use%":"2%","Mounted":"/boot/efi"}
{"Filesystem":"/dev/sdb1","1K-blocks":960380628,"Used":367768868,"Available":543757300,"Use%":"41%","Mounted":"/home/steve/hd"}

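The begin-marker-field of expand ensures that the column-name line at the start of every batch is passed over, not just the first one. (relaxed-schema prevents the extra column "on", left over from the "Mounted on" header, from being a problem.)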

You can now drop/rename any fields.
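The following Python sketch models the expand behaviour described above, assuming that the line flagged by begin-marker-field supplies the column names for its batch and that relaxed-schema simply ignores header columns with no matching value. expand_batches and _convert are hypothetical names; the integer conversion mimics the numeric fields in the sample output.

```python
import json


def _convert(value: str):
    """Return an int for integer-looking text, otherwise the text unchanged."""
    try:
        return int(value)
    except ValueError:
        return value


def expand_batches(events: list[dict], begin_field: str = "begin") -> list[dict]:
    """Hypothetical model of expand with delim ' ', csv header and begin-marker-field."""
    header: list[str] = []
    records = []
    for event in events:
        cols = event["_raw"].split()  # split() treats any run of whitespace as one separator
        if event.get(begin_field):
            header = cols  # assumption: each batch's first line re-supplies the column names
            continue       # the column-name line itself is never emitted
        # "Relaxed schema" here just means zip() stops at the shorter list,
        # so the extra header column "on" is ignored.
        records.append({name: _convert(val) for name, val in zip(header, cols)})
    return records


HEADER = "Filesystem 1K-blocks Used Available Use% Mounted on"
one_run = [
    {"_raw": HEADER, "begin": True},
    {"_raw": "udev 8120300 0 8120300 0% /dev"},
    {"_raw": "/dev/sda1 523248 6228 517020 2% /boot/efi", "end": True},
]

# Two runs of the command: the header row is skipped both times.
for record in expand_batches(one_run + one_run):
    print(json.dumps(record))
```

Keying on the begin marker rather than on "the first line ever seen" is what makes the header handling repeat for every run.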

Detecting Stalls and Grouping by Field

Not all data arrives on a schedule, as it does with exec or http-poll. With the tcp input, data can arrive at any time, so it is useful to know if and when the data stops flowing.

name: stalled1
input:
  tcp:
    address: localhost:2020
    plain: true
actions:
- stalled:
    timeout: 2s
    marker: [STALL]
output:
  write: console

If more than 2s passes between events, a new event is created that looks like {"_marker":"STALL","streaming":"no"}. When the data starts flowing again, we get {"_marker":"STALL","streaming":"yes"}.
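A minimal offline Python sketch of the stall rule, assuming each event carries an arrival time in seconds. The real action runs on a live timer; detect_stalls (a hypothetical name) only sees arrivals, so an optional end_time stands in for "the clock kept running after the last event".

```python
from typing import Iterable, Optional


def detect_stalls(events: Iterable[tuple[float, dict]], timeout: float,
                  marker: str = "STALL",
                  end_time: Optional[float] = None) -> list[dict]:
    """Replay (arrival_time, record) pairs and insert stall/resume events.

    Hypothetical offline model: a stall is reported when the gap since the
    previous event is more than `timeout` seconds.
    """
    out: list[dict] = []
    last_seen: Optional[float] = None
    streaming = False
    for t, record in events:
        if streaming and t - last_seen > timeout:
            # A live pipe would emit this at last_seen + timeout, before `record` arrives.
            out.append({"_marker": marker, "streaming": "no"})
            streaming = False
        if not streaming:
            # Assumption: a resume event also precedes the very first record,
            # as in the group-by output in this section.
            out.append({"_marker": marker, "streaming": "yes"})
            streaming = True
        out.append(record)
        last_seen = t
    # Optionally report a stall that happens after the final event.
    if streaming and end_time is not None and end_time - last_seen > timeout:
        out.append({"_marker": marker, "streaming": "no"})
    return out


arrivals = [(0.0, {"n": 1}), (1.0, {"n": 2}), (5.0, {"n": 3}), (5.5, {"n": 4})]
for e in detect_stalls(arrivals, timeout=2.0):
    print(e)
```

The 4s gap between the second and third records exceeds the 2s timeout, so a "no" event and then a "yes" event are inserted there.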

Now, imagine that various sources are connecting and writing events to port 2020. Each of the sources identifies itself with a particular field value. The task is to detect when any particular source stops sending data. In this example, the field is b and there are two sources, "one" and "two", sending data every 100ms:

{"a":1,"b":"one"}
{"a":2,"b":"one"}
{"a":3,"b":"one"}
{"a":4,"b":"one"}
{"a":5,"b":"two"}
{"a":6,"b":"two"}
{"a":7,"b":"two"}
{"a":8,"b":"one"}
{"a":9,"b":"one"}
{"a":10,"b":"one"}
{"a":11,"b":"one"}
####
- stalled:
    timeout: 200ms
    marker: [stalled]
    group-by: b
####
{"_marker":"stalled","streaming":"yes","stalled":"one"}
{"a":1,"b":"one"}
{"a":2,"b":"one"}
{"a":3,"b":"one"}
{"a":4,"b":"one"}
{"_marker":"stalled","streaming":"yes","stalled":"two"}
{"a":5,"b":"two"}
{"a":6,"b":"two"}
{"_marker":"stalled","streaming":"no","stalled":"one"}
{"a":7,"b":"two"}
{"_marker":"stalled","streaming":"yes","stalled":"one"}
{"a":8,"b":"one"}
{"a":9,"b":"one"}
{"a":10,"b":"one"}
{"_marker":"stalled","streaming":"no","stalled":"two"}
{"a":11,"b":"one"}

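The two sources are treated as separate groups, each with its own timer: "one" is reported as stalled while "two" is sending, and "two" is reported as stalled once "one" starts sending again.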
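Giving each group its own timer yields a sketch of group-by. Times are integer milliseconds to avoid floating-point edge cases, detect_group_stalls is a hypothetical name, and the "stalled" field carrying the group value is copied from the output above. Because the real action fires on a wall-clock timer, the exact position of a stall event relative to other traffic can differ from this replay: with the input above it reports "two" stalling one event earlier than the sample output does.

```python
def detect_group_stalls(events: list[tuple[int, dict]], timeout_ms: int,
                        group_by: str, marker: str = "stalled") -> list[dict]:
    """Replay (arrival_ms, record) pairs with an independent stall timer per group.

    Hypothetical offline model of `stalled` with `group-by`.
    """
    out: list[dict] = []
    last_seen: dict[str, int] = {}
    streaming: dict[str, bool] = {}
    for t, record in events:
        # Report every group whose timer ran out before this record arrived,
        # in the order the timers expired.
        expired = sorted(
            (last_seen[g] + timeout_ms, g)
            for g, flowing in streaming.items()
            if flowing and t - last_seen[g] > timeout_ms
        )
        for _, g in expired:
            out.append({"_marker": marker, "streaming": "no", "stalled": g})
            streaming[g] = False
        key = record[group_by]
        if not streaming.get(key, False):
            out.append({"_marker": marker, "streaming": "yes", "stalled": key})
            streaming[key] = True
        out.append(record)
        last_seen[key] = t
    return out


# The input from above: one record every 100 ms.
SOURCES = ["one"] * 4 + ["two"] * 3 + ["one"] * 4
events = [(i * 100, {"a": i + 1, "b": src}) for i, src in enumerate(SOURCES)]
for e in detect_group_stalls(events, timeout_ms=200, group_by="b"):
    print(e)
```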

Collecting Groups Together with Transaction

transaction collects related records together. Here the stall events produced by stalled above act as the timeout: each sequence of events from a source is collected into an array, which is closed when that source stalls.

- transaction:
    timeout: [stalled]
    marker: [TRANS]
    group-by: b
- remove: [duration,complete]
####
{"_marker":"TRANS","recs":[
{"a":1,"b":"one"},
{"a":2,"b":"one"},
{"a":3,"b":"one"},
{"a":4,"b":"one"}
]}
{"_marker":"TRANS","recs":[
{"a":5,"b":"two"},
{"a":6,"b":"two"},
{"a":7,"b":"two"}
]}
{"_marker":"TRANS","recs":[
{"a":8,"b":"one"},
{"a":9,"b":"one"},
{"a":10,"b":"one"},
{"a":11,"b":"one"}
]}

Note that transaction consumes the stall events!
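Here is a Python sketch of the grouping shown above, fed with the stalled output from the previous example. It assumes that a group's transaction is closed when a "streaming":"no" marker for that group arrives, that all stall markers are consumed, and that any still-open group is flushed when input ends. collect_transactions is a hypothetical name, and duration and complete are left out, matching the remove step.

```python
import json


def collect_transactions(events: list[dict], group_by: str,
                         timeout_marker: str = "stalled",
                         marker: str = "TRANS") -> list[dict]:
    """Hypothetical model of `transaction` with `group-by` and a stall-based timeout."""
    open_groups: dict[str, list[dict]] = {}
    out: list[dict] = []
    for event in events:
        if event.get("_marker") == timeout_marker:
            # Stall events are consumed: they may close a group but are never passed on.
            if event.get("streaming") == "no":
                recs = open_groups.pop(event.get("stalled"), None)
                if recs:
                    out.append({"_marker": marker, "recs": recs})
            continue
        open_groups.setdefault(event[group_by], []).append(event)
    # Assumption: groups still open when the input ends are flushed.
    for recs in open_groups.values():
        out.append({"_marker": marker, "recs": recs})
    return out


STALLED_OUTPUT = [
    {"_marker": "stalled", "streaming": "yes", "stalled": "one"},
    *({"a": n, "b": "one"} for n in range(1, 5)),
    {"_marker": "stalled", "streaming": "yes", "stalled": "two"},
    {"a": 5, "b": "two"},
    {"a": 6, "b": "two"},
    {"_marker": "stalled", "streaming": "no", "stalled": "one"},
    {"a": 7, "b": "two"},
    {"_marker": "stalled", "streaming": "yes", "stalled": "one"},
    *({"a": n, "b": "one"} for n in range(8, 11)),
    {"_marker": "stalled", "streaming": "no", "stalled": "two"},
    {"a": 11, "b": "one"},
]

for trans in collect_transactions(STALLED_OUTPUT, "b"):
    print(json.dumps(trans))
```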

The transaction model also works well for sessions. Consider a log like this:

{"action":"LOGIN","user":"Bob"}
{"action":"SEND","user":"Bob"}
{"action":"LOGIN","user":"Alice"}
{"action":"RECV","user":"Bob"}
{"action":"SEND","user":"Alice"}
{"action":"SEND-AGAIN","user":"Alice"}
{"action":"SEND-MORE","user":"Alice"}
{"action":"LOGOFF","user":"Alice"}

Users log in, perform some actions, and then log off: this is a session. A session starts when the action field matches "LOGIN" and ends when it matches "LOGOFF". start-end defines these two conditions, each as a pair of a field name and a pattern that the field's value must match.

A real-world complication is that users often forget to log off, so we need a timeout as well:

- stalled:
    timeout: 200ms
    marker: [stalled]
    group-by: user
- transaction:
    timeout: [stalled]
    marker: [TRANS]
    start-end:
      start: [action, LOGIN]
      end: [action, LOGOFF]
    group-by: user
- remove: [duration]
####
{"_marker":"TRANS","complete":false,"recs":[
{"action":"LOGIN","user":"Bob"},
{"action":"SEND","user":"Bob"},
{"action":"RECV","user":"Bob"}
]}
{"_marker":"TRANS","complete":true,"recs":[
{"action":"LOGIN","user":"Alice"},
{"action":"SEND","user":"Alice"},
{"action":"SEND-AGAIN","user":"Alice"},
{"action":"SEND-MORE","user":"Alice"},
{"action":"LOGOFF","user":"Alice"}
]}

User Bob forgot to log off properly and his session timed out, so it has "complete":false.
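A Python sketch of start-end sessions combined with a stall timeout, under several assumptions: patterns are regular expressions matched with re.search, records for a user with no open session are dropped, and a second LOGIN closes the earlier session as incomplete. sessions is a hypothetical name, and the stall event for Bob is placed by hand where a 200ms timer might plausibly fire.

```python
import json
import re


def sessions(events: list[dict], group_by: str,
             start: tuple[str, str], end: tuple[str, str],
             timeout_marker: str = "stalled",
             marker: str = "TRANS") -> list[dict]:
    """Hypothetical model of `transaction` with `start-end` plus a stall timeout."""
    start_field, start_pat = start
    end_field, end_pat = end
    open_tx: dict[str, list[dict]] = {}
    out: list[dict] = []

    def close(key: str, complete: bool) -> None:
        out.append({"_marker": marker, "complete": complete,
                    "recs": open_tx.pop(key)})

    for event in events:
        if event.get("_marker") == timeout_marker:
            # A stall for a user with an open session ends it as incomplete.
            if event.get("streaming") == "no" and event.get("stalled") in open_tx:
                close(event["stalled"], complete=False)
            continue
        key = event[group_by]
        if re.search(start_pat, str(event.get(start_field, ""))):
            if key in open_tx:
                close(key, complete=False)  # assumption: a new LOGIN ends the old session
            open_tx[key] = [event]
        elif key in open_tx:
            open_tx[key].append(event)
            if re.search(end_pat, str(event.get(end_field, ""))):
                close(key, complete=True)
        # Assumption: records outside any open session are dropped.
    return out


LOG = [
    {"action": "LOGIN", "user": "Bob"},
    {"action": "SEND", "user": "Bob"},
    {"action": "LOGIN", "user": "Alice"},
    {"action": "RECV", "user": "Bob"},
    {"action": "SEND", "user": "Alice"},
    {"action": "SEND-AGAIN", "user": "Alice"},
    {"action": "SEND-MORE", "user": "Alice"},
    # Hypothetical placement: Bob's stall timer fires here.
    {"_marker": "stalled", "streaming": "no", "stalled": "Bob"},
    {"action": "LOGOFF", "user": "Alice"},
]

for t in sessions(LOG, "user", ("action", "LOGIN"), ("action", "LOGOFF")):
    print(json.dumps(t))
```

Whichever comes first for a user, the end pattern or a stall, closes that user's session; only the end pattern sets complete to true.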