Scheduling Inputs
Many inputs, like exec and http-poll, are scheduled by time. (Others, such as tcp, wait for some external data to arrive.)
Scheduling with Intervals
With interval, the intervals between command invocations are equal, but execution starts immediately.
If we start at 12:30:15 with a one-minute interval, then the next run will be at 12:31:15, and so on.
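For example, a minimal sketch of interval scheduling (blub is just a placeholder command):

input:
  exec:
    command: blub
    interval: 1m   # first run immediately, then one minute apart
output:
  write: console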
If you need polls to happen at precise clock times, then use cron scheduling.
Hotrod's cron expressions use a 6-token format, to allow scheduling down to the second. Using standard 5-token cron expressions will result in a parse error.
name: cron
input:
  exec:
    command: date
    cron: '0 * * * * *'
output:
  write: console

# output:
{"_raw":"Thu Jun 6 12:55:00 SAST 2019"}
{"_raw":"Thu Jun 6 12:56:00 SAST 2019"}
{"_raw":"Thu Jun 6 12:57:00 SAST 2019"}
The command runs once a minute, on the minute.
See this guide for more. Note the special pre-defined values like '@hourly', '@daily', etc.
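For instance, a sketch using one of the pre-defined values (assuming it can be given directly in the cron field):

input:
  exec:
    command: date
    cron: '@hourly'   # assumed shorthand for '0 0 * * * *'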
Since cron commands occur at precise clock times, there's often a need to offset them by a random interval, so that (for instance) a thousand agents do not overwhelm an endpoint by polling it at the same time.
input:
  exec:
    command: date
    interval: 1m       # on every minute (16:00:00, 16:01:00, etc)
    random-offset: 5s  # so we'll actually run at, say, 16:00:02, and so forth
Please note that you can say immediate: true with cron scheduling. Whatever the schedule, it will be adjusted to start immediately.
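A minimal sketch (assuming immediate sits alongside cron in the exec section):

input:
  exec:
    command: date
    cron: '0 * * * * *'   # every minute, on the minute
    immediate: true       # first run happens right away, then on schedule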
Windowing
Windowing is a very useful feature if you are querying a resource that needs a time window to be specified.
input:
  exec:
    command: 'echo ${start_time_secs} ${end_time_secs}'
    cron: '0 * * * * *'
    window:
      size: 1m
The end time is now, expressed as a Unix timestamp, and the start time is now minus 60s.
window also has offset - this is for when you would like to wait for the data to settle (for instance, for Elasticsearch to ingest some real-time data). It shifts the (start_time, end_time) interval back by the offset (same units as interval and size).
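For example, a sketch where the window lags two seconds behind the clock:

input:
  exec:
    command: 'echo ${start_time_secs} ${end_time_secs}'
    cron: '0 * * * * *'
    window:
      size: 1m
      offset: 2s   # shift (start_time, end_time) back by 2s so the data can settle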
If you provide interval, then window.size is automatically set.
In addition to windowing intervals, cron scheduling supports a start-time, which allows the windowing to start at a specified time; this time has to be in the following format: 2019-07-10 18:45:00.000 +02:00. A highwatermark-file can be specified for cron to track the last successful window and resume from the high-water-mark point.
So, if a pipe using a high-water-mark file is stopped and restarted, then the scheduler will "catch up" and run for any intervals that were missed.
Note: start-time will be ignored unless combined with highwatermark-file. Also, start-time causes offset to be ignored.
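A sketch combining these options (the nesting of start-time and highwatermark-file alongside cron is an assumption, and the file path is hypothetical):

input:
  exec:
    command: 'echo ${start_time_iso} ${end_time_iso}'
    cron: '0 * * * * *'
    start-time: '2019-07-10 18:45:00.000 +02:00'   # windowing starts here; requires highwatermark-file
    highwatermark-file: /var/lib/hotrod/blub.hwm   # hypothetical path; tracks the last successful window
    window:
      size: 1m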
Scheduling Variables
Note that special variables are available; there is support for custom time formats. For example, ${start_time_fmt %F %T} will give a local date-time like "2019-09-28 15:36:02". The start and end refer to the current window.
now_time_secs => current time in seconds since epoch
now_time_msecs => current time in milliseconds since epoch
now_time_iso => current time in "%Y-%m-%dT%H:%M:%SZ" format
start_time_secs => start time in seconds since epoch
start_time_msecs => start time in milliseconds since epoch
start_time_iso => start time in "%Y-%m-%dT%H:%M:%SZ" format
end_time_secs => end time in seconds since epoch
end_time_msecs => end time in milliseconds since epoch
end_time_iso => end time in "%Y-%m-%dT%H:%M:%SZ" format
now_time_fmt <fmt> => e.g. ${now_time_fmt %F %T}
start_time_fmt <fmt> => e.g. ${start_time_fmt %Y-%m-%dT%H:%M:%SZ}
end_time_fmt <fmt> => e.g. ${end_time_fmt %s}
This example shows these special fields in use, where we need start_time and end_time in a particular format for a rather messy VOIP statistics endpoint.
context:
  address: http://XXXX:4444
  parms:
    - filter.application=rtp
    - filter.custom_filter=ip=X.X.X.X
    - info.aggregate_level=precise
    - pager.nb_elements=10000
    - sorter.order_column=capture_begin
    - sorter.order_dir=sort_asc
    - filter.capture_begin='${start_time_fmt %F%%20%T}'
    - filter.capture_end='${end_time_fmt %F%%20%T}'
  path: /nevrax/network/conv_voip_details.html/voip_table_descriptor/table.csv
  password: XXX:XXX
  filter: "sed -e 's/-;/0;/g'|sed -e 's/;user/ and user/g'|sed -e 's/\"//g'"
input:
  exec:
    command: "curl -k -u {{password}} '{{address}}{{path}}?{{parms '&'}}' | {{filter}}"
    interval: 10s
    window:
      size: 10s
      offset: 2s
...
This also shows how context fields can make complicated command-lines clearer - note how the query parameters are constructed by joining the parms array with a "&".
Of course, you should use http-poll wherever possible, but the principles remain the same.
Marking Batches
With scheduled commands comes the idea of a 'batch' of events that occur together at intervals. It's useful to mark these events as belonging to the same batch.
For example, this will attach a different UUID field to each batch. If the data is actually JSON (i.e. raw: true), then the field is merged into the incoming event.
input:
  exec:
    command: blub
    interval: 10s
    batch:
      uuid-field: uuid
Here are the available fields - all of them are optional, but at least one must be specified in batch:
uuid-field: field containing a UUID unique to each batch
invocation-time-field: field containing the start time of the batch
completion-time-field: field containing the end time of the batch
begin-marker-field: field set to true in the first event
end-marker-field: field set to true in the last event
line-count-field: each event gets the size of the batch
line-num-field: each event gets the line number within the batch
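For instance, a sketch attaching timing and line-numbering fields to each batch (the field names on the right-hand side are arbitrary):

input:
  exec:
    command: blub
    interval: 10s
    batch:
      invocation-time-field: batch_start   # start time of the batch
      completion-time-field: batch_end     # end time of the batch
      line-count-field: batch_size         # size of the batch
      line-num-field: line_no              # this event's line number within the batch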
The markers are particularly useful. Say we only want to see the last line from each run of blub:
input:
  exec:
    command: blub
    interval: 10s
    batch:
      end-marker-field: last_line
actions:
- filter:
    condition: last_line
...
Note that we have to work with some restrictions imposed by using Lua here - we cannot say last-line or end, because the first isn't a valid Lua identifier and the second is a Lua keyword.
There is always another approach - we can filter by schema (which does not care about Lua rules):
- filter:
    schema: [_raw,last_line]
Using both markers, we can collect all the lines generated by a scheduled command into an array using transaction:
input:
  exec:
    command: blub
    interval: 10s
    batch:
      begin-marker-field: begin
      end-marker-field: end
actions:
- transaction:
    start-end:
      start: [begin]
      end: [end]
      marker: [trans]
There will be a single generated event {"_marker":"trans","recs":[....]} with the recs array containing the whole batch.