Transactions and Transitions
The pipe model involves a stream of inputs coming from a command or some other source. We sometimes need to separate it into groups.
Using exec-batch
Consider a pipe that monitors disk usage with df:
scratch$ df -x tmpfs
Filesystem 1K-blocks Used Available Use% Mounted on
udev 8120300 0 8120300 0% /dev
/dev/sda2 122030736 102279252 13509644 89% /
/dev/sda1 523248 6228 517020 2% /boot/efi
/dev/sdb1 960380628 366946504 544579664 41% /home/steve/hd
This will run periodically and extract the 'Mounted', 'Used' and 'Available' fields.
The batch option of exec here marks the first and last records of each batch with special fields:
name: batch
input:
  exec:
    command: df -x tmpfs
    interval: 1m
    batch:
      begin-marker-field: begin
      end-marker-field: end
output:
  write: console
# .....
{"_raw":"Filesystem 1K-blocks Used Available Use% Mounted on","begin":true}
{"_raw":"udev 8120300 0 8120300 0% /dev"}
{"_raw":"/dev/sda2 122030736 102287196 13501700 89% /"}
{"_raw":"/dev/sda1 523248 6228 517020 2% /boot/efi"}
{"_raw":"/dev/sdb1 960380628 367768744 543757424 41% /home/steve/hd","end":true}
expand can extract these fields with delim: ' ' (which matches any run of whitespace) and can be told to ignore the header line with header: true. However, that only works for the first batch, so to drop every column-name row:
name: df-cols
input:
  exec:
    command: df -x tmpfs
    batch:
      begin-marker-field: begin
    interval: 2s
actions:
- expand:
    input-field: _raw
    delim: ' '
    remove: true
    begin-marker-field: begin
    csv:
      header: true
      relaxed-schema: true
output:
  write: console
# .....
{"Filesystem":"udev","1K-blocks":8120300,"Used":0,"Available":8120300,"Use%":"0%","Mounted":"/dev"}
{"Filesystem":"/dev/sda2","1K-blocks":122030736,"Used":102288448,"Available":13500448,"Use%":"89%","Mounted":"/"}
{"Filesystem":"/dev/sda1","1K-blocks":523248,"Used":6228,"Available":517020,"Use%":"2%","Mounted":"/boot/efi"}
{"Filesystem":"/dev/sdb1","1K-blocks":960380628,"Used":367768868,"Available":543757300,"Use%":"41%","Mounted":"/home/steve/hd"}
Giving expand the begin-marker-field ensures that the column line is passed over in every batch. (relaxed-schema prevents the extra column "on" from being a problem.) You can now drop/rename any fields.
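For example, a remove action (the same form used with transaction later in this section) could drop columns we don't need. The field names here come from the df output above; the rename syntax varies, so only remove is shown:

```yaml
# sketch: drop unneeded columns after expand;
# 'remove' takes a list of field names, as used later in this section
- remove: [1K-blocks, Use%]
```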
Detecting Stalls And Grouping by Field
Sometimes data is not scheduled, as it is with exec or http-poll. With the tcp input, data can arrive at any time. It is useful to know when and if the data stops flowing.
name: stalled1
input:
  tcp:
    address: localhost:2020
    plain: true
actions:
- stalled:
    timeout: 2s
    marker: [STALL]
output:
  write: console
If there is more than 2s between events, then another event is created that looks like {"_marker":"STALL","streaming":"no"}. When the data starts flowing again, we get {"_marker":"STALL","streaming":"yes"}.
Now, imagine that various sources are connecting and writing events to port 2020. Each of the
sources identifies itself with a particular field value. The task is to detect when any particular
source stops sending data. In this example, the field is b
and there are two sources, "one" and "two",
sending data every 100ms:
{"a":1,"b":"one"}
{"a":2,"b":"one"}
{"a":3,"b":"one"}
{"a":4,"b":"one"}
{"a":5,"b":"two"}
{"a":6,"b":"two"}
{"a":7,"b":"two"}
{"a":8,"b":"one"}
{"a":9,"b":"one"}
{"a":10,"b":"one"}
{"a":11,"b":"one"}
####
- stalled:
    timeout: 200ms
    marker: [stalled]
    group-by: b
####
{"_marker":"stalled","streaming":"yes","stalled":"one"}
{"a":1,"b":"one"}
{"a":2,"b":"one"}
{"a":3,"b":"one"}
{"a":4,"b":"one"}
{"_marker":"stalled","streaming":"yes","stalled":"two"}
{"a":5,"b":"two"}
{"a":6,"b":"two"}
{"_marker":"stalled","streaming":"no","stalled":"one"}
{"a":7,"b":"two"}
{"_marker":"stalled","streaming":"yes","stalled":"one"}
{"a":8,"b":"one"}
{"a":9,"b":"one"}
{"a":10,"b":"one"}
{"_marker":"stalled","streaming":"no","stalled":"two"}
{"a":11,"b":"one"}
The two sources are treated as separate groups - "one" is flowing away, but "two" was late.
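The per-group behaviour can be sketched offline in Python (the function name is hypothetical; note also that the real stalled action is event-driven and emits the "no" marker as soon as the timeout expires, whereas this batch sketch only notices when the group's next record arrives):

```python
def detect_stalls(events, key, timeout=0.2):
    """events: iterable of (timestamp, record) pairs.
    Tracks the last timestamp seen for each value of `key` and inserts
    stall markers per group, independently of the other groups."""
    last_seen = {}  # group value -> last timestamp seen
    out = []
    for t, rec in events:
        g = rec[key]
        prev = last_seen.get(g)
        if prev is not None and t - prev > timeout:
            # gap exceeded the timeout: this group stalled, then resumed
            out.append({"_marker": "stalled", "streaming": "no", "stalled": g})
            out.append({"_marker": "stalled", "streaming": "yes", "stalled": g})
        elif prev is None:
            # first record from this group: it has started streaming
            out.append({"_marker": "stalled", "streaming": "yes", "stalled": g})
        last_seen[g] = t
        out.append(rec)
    return out
```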
Collecting Groups Together with Transaction
transaction collects similar records together. Using the output of stalled above, we collect sequences of events into arrays.
- transaction:
    timeout: [stalled]
    marker: [TRANS]
    group-by: b
- remove: [duration,complete]
####
{"_marker":"TRANS","recs":[
{"a":1,"b":"one"},
{"a":2,"b":"one"},
{"a":3,"b":"one"},
{"a":4,"b":"one"}
]}
{"_marker":"TRANS","recs":[
{"a":5,"b":"two"},
{"a":6,"b":"two"},
{"a":7,"b":"two"}
]}
{"_marker":"TRANS","recs":[
{"a":8,"b":"one"},
{"a":9,"b":"one"},
{"a":10,"b":"one"},
{"a":11,"b":"one"}
]}
Note that transaction consumes the stall events!
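This collect-and-flush behaviour can be sketched in Python (hypothetical names; here a "streaming":"no" stall marker flushes its group's transaction, and every stall marker is swallowed rather than passed through):

```python
def collect(events, key, marker="stalled"):
    """Group consecutive records by `key` into transactions; a stall
    marker with streaming "no" flushes that group. Stall events are
    consumed: they never appear in the output."""
    open_groups = {}  # group value -> list of records
    out = []
    for rec in events:
        if rec.get("_marker") == marker:
            if rec.get("streaming") == "no":
                recs = open_groups.pop(rec["stalled"], None)
                if recs:
                    out.append({"_marker": "TRANS", "recs": recs})
            continue  # the marker itself is consumed either way
        open_groups.setdefault(rec[key], []).append(rec)
    for recs in open_groups.values():  # flush anything still open at the end
        out.append({"_marker": "TRANS", "recs": recs})
    return out
```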
The transaction model works well with sessions. We have a log like this:
{"action":"LOGIN","user":"Bob"}
{"action":"SEND","user":"Bob"}
{"action":"LOGIN","user":"Alice"}
{"action":"RECV","user":"Bob"}
{"action":"SEND","user":"Alice"}
{"action":"SEND-AGAIN","user":"Alice"}
{"action":"SEND-MORE","user":"Alice"}
{"action":"LOGOFF","user":"Alice"}
Users log in, perform actions, and then log out - this is a session. It starts when the field action matches "LOGIN", and ends when it matches "LOGOFF". start-end defines a pair of conditions, each a field name and a pattern that needs to match.
A real-world complication is that users often forget to logout, so we need a timeout as well.
- stalled:
    timeout: 200ms
    marker: [stalled]
    group-by: user
- transaction:
    timeout: [stalled]
    marker: [TRANS]
    start-end:
      start: [action, LOGIN]
      end: [action, LOGOFF]
    group-by: user
- remove: [duration]
####
{"_marker":"TRANS","complete":false,"recs":[
{"action":"LOGIN","user":"Bob"},
{"action":"SEND","user":"Bob"},
{"action":"RECV","user":"Bob"}
]}
{"_marker":"TRANS","complete":true,"recs":[
{"action":"LOGIN","user":"Alice"},
{"action":"SEND","user":"Alice"},
{"action":"SEND-AGAIN","user":"Alice"},
{"action":"SEND-MORE","user":"Alice"},
{"action":"LOGOFF","user":"Alice"}
]}
User Bob has forgotten to log out and his session timed out, so it has "complete":false.
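The start-end logic can be sketched in Python (hypothetical names; the timeout is not modelled here - instead, sessions still open at end of input are flushed as incomplete, which is what the timeout achieves in the real pipe):

```python
def sessions(events, key, start, end):
    """Group records into sessions per `key`. `start` and `end` are
    (field, value) pairs: a session opens when `start` matches and
    closes complete when `end` matches; anything left open at the end
    of input is flushed as incomplete."""
    open_sessions = {}  # group value -> list of records
    out = []
    for rec in events:
        g = rec[key]
        if rec.get(start[0]) == start[1]:
            open_sessions[g] = [rec]        # session begins
        elif g in open_sessions:
            open_sessions[g].append(rec)
            if rec.get(end[0]) == end[1]:   # session ends cleanly
                out.append({"_marker": "TRANS", "complete": True,
                            "recs": open_sessions.pop(g)})
    for recs in open_sessions.values():     # never logged off
        out.append({"_marker": "TRANS", "complete": False, "recs": recs})
    return out
```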