## Scheduling Inputs
The majority of inputs, such as `exec` and `http-poll`, are scheduled by time. Those that aren't are driven by external data, e.g., `tcp`.
### Scheduling with Intervals
`interval` allows for equal intervals between command invocations, with execution starting immediately. For example, if we start at 11:54:59 with `interval: 1m`, the next run will be at 11:55:59, and so forth.
```yaml
name: interval
input:
  exec:
    command: date
    interval: 1m
output:
  write: console

# Output:
# {"_raw":"Fri 31 Mar 2023 11:54:59 SAST"}
# {"_raw":"Fri 31 Mar 2023 11:55:59 SAST"}
# {"_raw":"Fri 31 Mar 2023 11:56:59 SAST"}
```
`cron` enables complete control for precise scheduling. The `cron` expression uses a 6-token format; the first token schedules seconds. Using standard 5-token cron expressions (see `man 5 crontab`) will result in a parse error.
```yaml
name: cron
input:
  exec:
    command: date
    cron: '13 * * * * *'
output:
  write: console

# Output:
# {"_raw":"Fri 31 Mar 2023 12:09:13 SAST"}
# {"_raw":"Fri 31 Mar 2023 12:10:13 SAST"}
# {"_raw":"Fri 31 Mar 2023 12:11:13 SAST"}
```
The command runs at second 13 of every minute.
There are pre-defined values such as `@hourly`, `@daily`, `@weekly`, and so on; see `man 5 crontab`.
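For instance, a minimal sketch using one of the pre-defined values (assuming they are accepted in the `cron` field, as in standard cron):

```yaml
input:
  exec:
    command: date
    # Pre-defined value: run at the top of every hour
    cron: '@hourly'
```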
As `cron` commands occur at precise clock times, it is often necessary to introduce a `random-offset`. This ensures that a thousand Agents do not overwhelm an endpoint by polling it at the same time:
```yaml
input:
  exec:
    command: date
    # Each minute: 16:00:00, 16:01:00, ...
    interval: 1m
    # Run at: 16:00:05, 16:01:05, ...
    random-offset: 5s
```
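The same idea applies to `cron` schedules; a sketch, assuming `random-offset` combines with `cron` as it does with `interval`:

```yaml
input:
  exec:
    command: date
    # Top of every minute: 16:00:00, 16:01:00, ...
    cron: '0 * * * * *'
    # Shifted to: 16:00:05, 16:01:05, ...
    random-offset: 5s
```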
Notice that `cron` scheduling also allows `immediate: true`, which forces an immediate first run regardless of the schedule.
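For example, a minimal sketch (assuming `immediate` sits alongside `cron` like the other scheduling options):

```yaml
input:
  exec:
    command: date
    # Second 13 of every minute...
    cron: '13 * * * * *'
    # ...plus one run right away at startup
    immediate: true
```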
### Windowing
Windowing is a useful feature for querying a resource that requires a specified time window.
```yaml
input:
  exec:
    command: 'echo ${start_time_secs} ${end_time_secs}'
    cron: '*/5 * * * * *'
    #interval: 5s
    window:
      size: 60s
```
The `end_time` is "now", expressed as a Unix timestamp. The `start_time` is "now" minus 60s.

`window` also has an `offset` option for cases where you need to wait for the data to settle, such as when Elasticsearch is ingesting real-time data. This shifts the `start_time`/`end_time` interval back by the `offset` (expressed in the same seconds unit as `interval` and `size`), so both `start_time` and `end_time` occur before the offset.
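For example, with `size: 60s` and `offset: 30s`, the arithmetic described above works out as follows (a worked sketch, not output from a real run):

```yaml
window:
  size: 60s
  offset: 30s
# At a tick where "now" is 12:00:00:
#   end_time   = 11:59:30  (now minus offset)
#   start_time = 11:58:30  (end_time minus size)
```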
- If `interval` or `cron` is set and `window` is set with any options, then `size` must be set.
- If `interval` or `cron` is set and `window` is set with no options, then `size` is set automatically.
In addition to windowed intervals, the scheduler (either `cron` or `interval`) supports `start-time`. This allows the windowing to start at a specified time, which must be in the following format: `2019-07-10 18:45:00.000 +02:00`. `start-time` requires `highwatermark-file`, which tracks the last successful window time so that the schedule can resume from that point.

This means that if a Pipe using `highwatermark-file` is stopped and restarted, the scheduler will "catch up" and run backfills for any intervals that were missed. `offset` is ignored with `start-time` until the backfill has caught up to the current time; from then on, `offset` is applied once again.
`start-time`:

- set in the future results in an error
- and `highwatermark-file` are mutually inclusive, else error
- results in `offset` being ignored during backfills
- is ignored if `highwatermark-file` contains a timestamp
`interval` and `cron` are mutually exclusive; both require `size` when `window` is set with any options.
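Putting this together, a sketch of a backfilling scheduler (assuming `start-time` and `highwatermark-file` sit alongside the other scheduling options; the file path is a hypothetical choice):

```yaml
input:
  exec:
    command: 'echo ${start_time_iso} ${end_time_iso}'
    interval: 1m
    window:
      size: 60s
    # Backfill 1m windows from this point on the first run...
    start-time: '2019-07-10 18:45:00.000 +02:00'
    # ...and resume from the last successful window after a restart
    highwatermark-file: /tmp/backfill.hwm
```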
### Scheduling Variables
Special variables are available, including some that support custom time formats; e.g., `${start_time_fmt %F %T}` will give a local date-time such as `2019-09-28 15:36:02`. The `start_*` and `end_*` variables refer to the current window.
- `now_time_secs`: current time in seconds since epoch
- `now_time_msecs`: current time in milliseconds since epoch
- `now_time_iso`: current time in "%Y-%m-%dT%H:%M:%SZ" format
- `start_time_secs`: start time in seconds since epoch
- `start_time_msecs`: start time in milliseconds since epoch
- `start_time_iso`: start time in "%Y-%m-%dT%H:%M:%SZ" format
- `end_time_secs`: end time in seconds since epoch
- `end_time_msecs`: end time in milliseconds since epoch
- `end_time_iso`: end time in "%Y-%m-%dT%H:%M:%SZ" format
- `now_time_fmt <fmt>`: e.g., `${now_time_fmt %F %T}`
- `start_time_fmt <fmt>`: e.g., `${start_time_fmt %Y-%m-%dT%H:%M:%SZ}`
- `end_time_fmt <fmt>`: e.g., `${end_time_fmt %s}`
This example shows these special fields in use; we need `start_time` and `end_time` in a particular format for a complex VoIP statistics endpoint:
```yaml
context:
  address: http://XXXX:4444
  parms:
    - filter.application=rtp
    - filter.custom_filter=ip=X.X.X.X
    - info.aggregate_level=precise
    - pager.nb_elements=10000
    - sorter.order_column=capture_begin
    - sorter.order_dir=sort_asc
    - filter.capture_begin='${start_time_fmt %F%%20%T}'
    - filter.capture_end='${end_time_fmt %F%%20%T}'
  path: /nevrax/network/conv_voip_details.html/voip_table_descriptor/table.csv
  password: XXX:XXX
  filter: "sed -e 's/-;/0;/g'|sed -e 's/;user/ and user/g'|sed -e 's/\"//g'"
input:
  exec:
    command: "curl -k -u {{password}} '{{address}}{{path}}?{{parms '&'}}' | {{filter}}"
    interval: 10s
    window:
      size: 10s
      offset: 2s
...
```
This illustrates how Context fields can make complicated commands clearer. Also note how the query parameters are constructed by joining the `parms` array with `'&'`.
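For illustration, `{{parms '&'}}` expands to the array items joined by `&`, roughly:

```
filter.application=rtp&filter.custom_filter=ip=X.X.X.X&info.aggregate_level=precise&...
```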
It is recommended to use `http-post` wherever possible; the principles, however, remain the same.
### Marking Batches
Scheduled commands introduce the concept of a `batch` of events that occur together at intervals. A `uuid` field is added to each batch. If the data is JSON (i.e., `json: true`), then the field is merged into the incoming event.
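For instance, a sketch of a JSON input with a batch `uuid` (the `echo` command is a stand-in for anything that emits one JSON object per line):

```yaml
input:
  exec:
    command: "echo '{\"level\":\"info\"}'"
    interval: 1m
    json: true
    batch:
      uuid-field: uuid
# With json: true, the uuid is merged into each event:
# {"level":"info","uuid":"..."}
```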
For example, Netter Corp has a secure Linux host, `secure.netter.lan`, with the following restrictions:

- console (`tty`) login only (no remote access)
- `auditd` log in `/var/log/audit/audit.log`
- host is rebooted every 24 hours to enforce security policy
The main issue is that Linux PAM (Pluggable Authentication Modules) session IDs begin at 1 after each reboot (`ses=1` in `audit.log`), so between reboots we have duplicate IDs. There is a need to assign a `uuid` to distinguish sessions with the same ID. The log is also rotated at each reboot.
Assume for now we obtain the session ID via an external mechanism (see `man pam_exec`) and pass it as a `context` variable (e.g., `session=9`) to the Pipe. `pam_exec` will obtain the session ID upon user logout, at session close time, and run `hotrod context tag ausearch session=11`. The Pipe carrying an `ausearch` tag will be updated with the fresh context and restarted by the Agent after polling the Server for updates. Strictly speaking, this is not a scheduled input like the above examples, as the input interval is determined outside of the Pipe.
If the session ID is not found (due to possible log lag or other delays), `Exited(1): <no matches>` is the `ausearch` output. The Pipe will `retry` for a `count` of `100` with a `pause` of `10s` between retries. There is no need for a command `count` or `interval`, as only one successful command run is required. The `ausearch` argument `--start $(date '+%x')` (see `man ausearch`) is today's date (locale date representation, see `man date`).
The Pipe definition:
```yaml
name: ausearch_batch
context:
  session: <int>
input:
  exec:
    command: sudo ausearch -m USER_START,USER_END,USER_CMD,USER_LOGIN,USER_LOGOUT,LOGIN --format text --session {{session}} --start $(date '+%x')
    retry:
      count: 100
      pause: 10s
    ignore-line-breaks: true
    no-strip-linefeeds: true
    batch:
      uuid-field: uuid
output:
  file:
    path: /tmp/ausearch_batch-${now_time_fmt %s}.log
```
Run the Pipe:

```shell
$> hotrod pipes run --file ausearch_batch.yml session=39
2023-05-02T11:42:08.846Z INFO pipe > shutting down aggregator
```
The resulting output in `/tmp/ausearch_batch-1683028631.log` (by default, `sudo` events are not logged):
{"_raw":"At 13:56:28 02/05/2023 system, acting as root, successfully changed-login-id-to robertg ","uuid":"cf54cb34-f216-4fc4-9333-be7b66c4e4e5"}
{"_raw":"At 13:56:28 02/05/2023 robertg, acting as root, successfully started-session /dev/tty1 using /usr/bin/login","uuid":"cf54cb34-f216-4fc4-9333-be7b66c4e4e5"}
{"_raw":"At 13:56:28 02/05/2023 robertg successfully logged-in tty1 using /usr/bin/login","uuid":"cf54cb34-f216-4fc4-9333-be7b66c4e4e5"}
{"_raw":"At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ","uuid":"cf54cb34-f216-4fc4-9333-be7b66c4e4e5"}
The available `batch` fields are listed below. While all are optional, at least one must be specified (a combined sketch follows the list):

- `uuid-field`: field containing a unique ID for each batch
- `invocation-time-field`: field containing the start time of the batch
- `completion-time-field`: field containing the end time of the batch
- `begin-marker-field`: field set to `true` in the first event
- `end-marker-field`: field set to `true` in the last event
- `line-count-field`: each event gets the size of the batch
- `line-num-field`: each event gets its line number within the batch
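As a combined sketch (the values on the right are arbitrary names for the keys added to each event):

```yaml
batch:
  uuid-field: uuid
  invocation-time-field: batch_start
  completion-time-field: batch_end
  begin-marker-field: first_line
  end-marker-field: last_line
  line-count-field: batch_size
  line-num-field: line_num
```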
The markers are particularly useful; for example, when you only want to see the last line from each run (e.g., the session end time):
```yaml
name: ausearch_batch
context:
  session: <int>
input:
  exec:
    command: sudo ausearch -m USER_START,USER_END,USER_CMD,USER_LOGIN,USER_LOGOUT,LOGIN --format text --session {{session}} --start $(date '+%x')
    retry:
      count: 100
      pause: 10s
    ignore-line-breaks: true
    no-strip-linefeeds: true
    batch:
      uuid-field: uuid
      end-marker-field: last_line
actions:
  - filter:
      condition: last_line
output:
  file:
    path: /tmp/ausearch_batch-${now_time_fmt %s}.log
```
The resulting output:

```json
{"_raw":"At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ","last_line":true,"uuid":"fc074085-5975-448d-932f-7ef5d7007167"}
```
The `filter` `condition` is a Lua expression.

There are work-arounds for certain restrictions imposed by the use of Lua here. For example, we cannot say `last-line` or `end`, because `last-line` isn't a valid Lua identifier and `end` is a Lua keyword.
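Since `condition` is ordinary Lua, larger expressions over event fields should also work; a hypothetical sketch:

```yaml
actions:
  - filter:
      # keep the last line only when a uuid is present
      condition: last_line and uuid ~= nil
```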
There is always another approach — we can filter by schema, which is not restricted by Lua rules:
```yaml
name: ausearch_batch
context:
  session: <int>
input:
  exec:
    command: sudo ausearch -m USER_START,USER_END,USER_CMD,USER_LOGIN,USER_LOGOUT,LOGIN --format text --session {{session}} --start $(date '+%x')
    retry:
      count: 100
      pause: 10s
    ignore-line-breaks: true
    no-strip-linefeeds: true
    batch:
      uuid-field: uuid
      end-marker-field: last_line
actions:
  - filter:
      schema:
        - _raw
        - last_line
output:
  file:
    path: /tmp/ausearch_batch-${now_time_fmt %s}.log
```
The resulting output is the same as using `condition`:

```json
{"_raw":"At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ","last_line":true,"uuid":"fc074085-5975-448d-932f-7ef5d7007167"}
```
Using both markers allows us to collect all the lines generated into an array, using `transaction`:
```yaml
name: ausearch_batch
context:
  session: <int>
input:
  exec:
    command: sudo ausearch -m USER_START,USER_END,USER_CMD,USER_LOGIN,USER_LOGOUT,LOGIN --format text --session {{session}} --start $(date '+%x')
    retry:
      count: 100
      pause: 10s
    ignore-line-breaks: true
    no-strip-linefeeds: true
    batch:
      uuid-field: uuid
      begin-marker-field: first_line
      end-marker-field: last_line
actions:
  - transaction:
      start-end:
        start: [first_line]
        end: [last_line]
        marker: [session]
output:
  file:
    path: /tmp/ausearch_batch-${now_time_fmt %s}.log
```
The resulting output:

```json
{"_marker":"session","complete":true,"duration":0,"recs":[{"_raw":"At 13:56:28 02/05/2023 system, acting as root, successfully changed-login-id-to robertg ","first_line":true,"uuid":"55819eec-4628-4684-abe9-0df4d97ff38e"},{"_raw":"At 13:56:28 02/05/2023 robertg, acting as root, successfully started-session /dev/tty1 using /usr/bin/login","uuid":"55819eec-4628-4684-abe9-0df4d97ff38e"},{"_raw":"At 13:56:28 02/05/2023 robertg successfully logged-in tty1 using /usr/bin/login","uuid":"55819eec-4628-4684-abe9-0df4d97ff38e"},{"_raw":"At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ","last_line":true,"uuid":"55819eec-4628-4684-abe9-0df4d97ff38e"}]}
```
More readable output:

```shell
$> jq . </tmp/ausearch_batch-1683031997.log
{
  "_marker": "session",
  "complete": true,
  "duration": 0,
  "recs": [
    {
      "_raw": "At 13:56:28 02/05/2023 system, acting as root, successfully changed-login-id-to robertg ",
      "first_line": true,
      "uuid": "55819eec-4628-4684-abe9-0df4d97ff38e"
    },
    {
      "_raw": "At 13:56:28 02/05/2023 robertg, acting as root, successfully started-session /dev/tty1 using /usr/bin/login",
      "uuid": "55819eec-4628-4684-abe9-0df4d97ff38e"
    },
    {
      "_raw": "At 13:56:28 02/05/2023 robertg successfully logged-in tty1 using /usr/bin/login",
      "uuid": "55819eec-4628-4684-abe9-0df4d97ff38e"
    },
    {
      "_raw": "At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ",
      "last_line": true,
      "uuid": "55819eec-4628-4684-abe9-0df4d97ff38e"
    }
  ]
}
```
This returns a single generated event `{"_marker":"session","recs":[...]}`, with the `recs` array containing the entire batch for the user login, session events, and logout.
Extending the example, Pipe and Agent tags could be used to update the context pushed from `secure.netter.lan` via the Server, and another Agent on `syslog.netter.lan` could poll for the updated context (`session=<int>`). That Pipe would then run to enrich the session events, write them to file, and possibly have them ingested for further analytics. For secure environments, off-host logging can be enabled and processed on the remote host via an Agent that receives a session ID upon logoff via PAM.