Version: 3.3.0

Scheduling Inputs

Many inputs, such as exec and http-poll, run on a time schedule. (Others, such as tcp, wait for external data.)

Scheduling with Intervals

With interval, command invocations are equally spaced, and the first one runs immediately. If we start at 12:30:15 with a one-minute interval, the next run is at 12:31:15, and so on.
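The timing described above can be sketched in a few lines of Python. This is only an illustration of the arithmetic, not Hotrod code; `interval_runs` is a hypothetical helper.

```python
from datetime import datetime, timedelta

def interval_runs(start, interval, count):
    """Return the first `count` run times: one immediately, then one per interval."""
    return [start + i * interval for i in range(count)]

start = datetime(2019, 6, 6, 12, 30, 15)
for t in interval_runs(start, timedelta(minutes=1), 3):
    print(t.strftime("%H:%M:%S"))
# 12:30:15
# 12:31:15
# 12:32:15
```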

If you need polls to happen at precise clock times, use cron scheduling.

note

Hotrod's cron expressions use a 6-token format, to allow scheduling by seconds. Standard 5-token cron expressions will result in a parse error.

name: cron
input:
  exec:
    command: date
    cron: '0 * * * * *'
output:
  write: console
# output:
{"_raw":"Thu Jun 6 12:55:00 SAST 2019"}
{"_raw":"Thu Jun 6 12:56:00 SAST 2019"}
{"_raw":"Thu Jun 6 12:57:00 SAST 2019"}

The command runs at the start of each minute.
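To make the 6-token rule concrete, here is a minimal Python sketch that splits an expression into named fields and rejects any other length. It is not Hotrod's parser. The field order (seconds first) is an assumption, consistent with '0 * * * * *' firing at second 0 of each minute. Pre-defined values such as '@hourly' are not handled.

```python
# Assumed field order; the text above only states that there are six tokens.
FIELDS = ["second", "minute", "hour", "day_of_month", "month", "day_of_week"]

def split_cron(expr):
    """Split a 6-token cron expression into named fields, rejecting other lengths."""
    tokens = expr.split()
    if len(tokens) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} tokens, got {len(tokens)}: {expr!r}")
    return dict(zip(FIELDS, tokens))

print(split_cron("0 * * * * *")["second"])  # 0

try:
    split_cron("* * * * *")  # standard 5-token form
except ValueError as err:
    print("rejected:", err)
```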

See this guide for more. Note the special pre-defined values like '@hourly', '@daily', etc.

Since cron commands occur at precise clock times, there's often a need to offset them by a random interval, so that for instance a thousand agents do not overwhelm an endpoint by polling it at the same time.

input:
  exec:
    command: date
    interval: 1m # on every minute (16:00:00, 16:01:00, etc)
    random-offset: 5s # so we'll actually run at, say, 16:00:02, and so forth
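The effect of a random offset can be modelled as below. This is a sketch under assumptions: we assume the delay is drawn uniformly between zero and the configured maximum, which the text above does not state, and `with_random_offset` is a hypothetical helper rather than anything in Hotrod.

```python
import random
from datetime import datetime, timedelta

def with_random_offset(scheduled, max_offset, rng=random):
    """Delay a scheduled time by a random amount between 0 and max_offset."""
    # Assumption: uniform distribution over [0, max_offset].
    delay = rng.uniform(0, max_offset.total_seconds())
    return scheduled + timedelta(seconds=delay)

scheduled = datetime(2019, 6, 6, 16, 0, 0)
actual = with_random_offset(scheduled, timedelta(seconds=5), random.Random(1))
print(actual.strftime("%H:%M:%S"))
```

With many agents each drawing their own delay, the polls spread over a five-second span instead of all landing on the same second.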

Note that you can also set immediate: true with cron scheduling. Whatever the schedule, it will then be adjusted to start immediately.

Windowing

Windowing is a very useful feature if you are querying a resource that needs a time window to be specified.

input:
  exec:
    command: 'echo ${start_time_secs} ${end_time_secs}'
    cron: '0 * * * * *'
    window:
      size: 1m

The end time is now, expressed as a Unix timestamp, and the start time is now minus 60s.

window also takes an offset, for when you need to wait for the data to settle (for instance, for Elasticsearch to ingest some real-time data). The offset shifts the (start_time, end_time) interval back by that amount, and uses the same units as interval and size.
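The window arithmetic can be written out as a short Python sketch. It only models the description above (end is now, start is end minus size, and both shift back by the offset); `window` is a hypothetical helper and all values are in Unix seconds.

```python
def window(now_secs, size_secs, offset_secs=0):
    """Return (start, end) in Unix seconds for a window ending at now minus offset."""
    end = now_secs - offset_secs
    start = end - size_secs
    return start, end

now = 1559818500
print(window(now, 60))      # (1559818440, 1559818500)
print(window(now, 60, 10))  # (1559818430, 1559818490)
```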

If you provide interval, then window.size is automatically set.

In addition to windowing intervals, cron supports a start-time, which lets windowing begin at a specified time. This time must be in the following format: 2019-07-10 18:45:00.000 +02:00. A highwatermark-file can also be specified, which cron uses to track the last successful window and resume from the high-water mark.

So, if a pipe using a high-water mark file is stopped and restarted, the scheduler will "catch up" and run for any intervals that were missed.
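The catch-up behaviour can be pictured with the following Python sketch. It assumes the high-water mark records the end of the last successful window, in Unix seconds; that detail, and the helper name `missed_windows`, are assumptions for illustration rather than a description of Hotrod's file format.

```python
def missed_windows(high_water_mark, now, size):
    """Yield (start, end) for each complete window between the mark and now."""
    start = high_water_mark
    while start + size <= now:
        yield start, start + size
        start += size

for w in missed_windows(high_water_mark=1000, now=1200, size=60):
    print(w)
# (1000, 1060)
# (1060, 1120)
# (1120, 1180)
```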

Note: start-time will be ignored unless combined with highwatermark-file. Setting start-time also causes offset to be ignored.

Scheduling Variables

Note that special variables are available; there is support for custom time formats. For example, ${start_time_fmt %F %T} will give a local date-time like "2019-09-28 15:36:02". The start and end refer to the current window.

  • now_time_secs => current time in seconds since epoch
  • now_time_msecs => current time in milliseconds since epoch
  • now_time_iso => current time in "%Y-%m-%dT%H:%M:%SZ" format
  • start_time_secs => start time in seconds since epoch
  • start_time_msecs => start time in milliseconds since epoch
  • start_time_iso => start time in "%Y-%m-%dT%H:%M:%SZ" format
  • end_time_secs => end time in seconds since epoch
  • end_time_msecs => end time in milliseconds since epoch
  • end_time_iso => end time in "%Y-%m-%dT%H:%M:%SZ" format
  • now_time_fmt <fmt> => e.g. ${now_time_fmt %F %T}
  • start_time_fmt <fmt> => e.g. ${start_time_fmt %Y-%m-%dT%H:%M:%SZ}
  • end_time_fmt <fmt> => e.g. ${end_time_fmt %s}
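As a rough picture of how the secs, msecs and iso variants relate, here is a Python sketch that derives all three from one timestamp. It assumes the iso variant is rendered in UTC (suggested by the trailing Z, but not stated above); `time_vars` is a hypothetical helper.

```python
from datetime import datetime, timezone

def time_vars(prefix, t):
    """Build the secs, msecs and iso variants for one timestamp (assumed UTC)."""
    secs = int(t.timestamp())
    return {
        f"{prefix}_time_secs": secs,
        # Sub-second precision is dropped in this sketch.
        f"{prefix}_time_msecs": secs * 1000,
        f"{prefix}_time_iso": t.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }

end = datetime(2019, 9, 28, 13, 36, 2, tzinfo=timezone.utc)
for name, value in time_vars("end", end).items():
    print(name, value)
```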

This example shows these special fields in use, where we need start_time and end_time in a particular format for a rather messy VOIP statistics endpoint.

context:
  address: http://XXXX:4444
  parms:
  - filter.application=rtp
  - filter.custom_filter=ip=X.X.X.X
  - info.aggregate_level=precise
  - pager.nb_elements=10000
  - sorter.order_column=capture_begin
  - sorter.order_dir=sort_asc
  - filter.capture_begin='${start_time_fmt %F%%20%T}'
  - filter.capture_end='${end_time_fmt %F%%20%T}'
  path: /nevrax/network/conv_voip_details.html/voip_table_descriptor/table.csv
  password: XXX:XXX
  filter: "sed -e 's/-;/0;/g'|sed -e 's/;user/ and user/g'|sed -e 's/\"//g'"
input:
  exec:
    command: "curl -k -u {{password}} '{{address}}{{path}}?{{parms '&'}}' | {{filter}}"
    interval: 10s
    window:
      size: 10s
      offset: 2s
...

This also shows how context fields can make complicated command-lines clearer - note how the query parameters are constructed by joining the parms array with a "&".
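The two ideas in this example, formatting the window bounds and joining the parameters, can be mimicked in Python as follows. This sketch assumes the format strings follow strftime conventions, where %F and %T are shorthand for %Y-%m-%d and %H:%M:%S and %% emits a literal percent sign, so %%20 becomes a URL-encoded space. `build_query` is a hypothetical helper, not part of Hotrod.

```python
from datetime import datetime

FMT = "%Y-%m-%d%%20%H:%M:%S"  # spelled-out equivalent of %F%%20%T

def build_query(parms, start, end, sep="&"):
    """Join query parameters with `sep`, appending the formatted window bounds."""
    bounds = [
        f"filter.capture_begin={start.strftime(FMT)}",
        f"filter.capture_end={end.strftime(FMT)}",
    ]
    return sep.join(parms + bounds)

parms = ["filter.application=rtp", "pager.nb_elements=10000"]
start = datetime(2019, 9, 28, 15, 36, 2)
end = datetime(2019, 9, 28, 15, 36, 12)
print(build_query(parms, start, end))
```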

Of course, you should use http-poll wherever possible, but the principles remain the same.

Marking Batches

With scheduled commands comes the idea of a 'batch' of events that occur together at intervals. It's useful to mark these events as belonging to the same batch.

For example, this will attach a different UUID field to each batch. If the data was actually JSON (i.e. raw: true) then the field is merged into the incoming event.

input:
  exec:
    command: blub
    interval: 10s
    batch:
      uuid-field: uuid

Here are the available fields - all of them are optional but at least one must be specified in batch:

  • uuid-field: field containing UUID unique for each batch
  • invocation-time-field: field containing the start time of the batch
  • completion-time-field: field containing the end time of the batch
  • begin-marker-field: field set to true in first event
  • end-marker-field: field set to true in last event
  • line-count-field: each event gets the size of the batch
  • line-num-field: each event gets the line number within the batch
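To show what marked events might look like, here is a small Python model of batch marking. It is a sketch, not Hotrod's implementation. The field names uuid, begin and last_line follow the examples on this page, count is a hypothetical name for a line-count-field, and we assume marker fields are simply absent on events that are not first or last.

```python
import uuid

def mark_batch(lines):
    """Wrap each line in an event dict and attach batch fields."""
    batch_id = str(uuid.uuid4())  # one UUID shared by the whole batch
    events = []
    for i, line in enumerate(lines):
        event = {"_raw": line, "uuid": batch_id, "count": len(lines)}
        if i == 0:
            event["begin"] = True      # begin-marker-field
        if i == len(lines) - 1:
            event["last_line"] = True  # end-marker-field
        events.append(event)
    return events

for event in mark_batch(["one", "two", "three"]):
    print(event)
```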

The markers are particularly useful. Say we only want to see the last line from each run of blub:

input:
  exec:
    command: blub
    interval: 10s
    batch:
      end-marker-field: last_line
actions:
- filter:
    condition: last_line
...

Note that we have to work with some restrictions imposed by using Lua here: we cannot say last-line or end, because the first isn't a valid Lua identifier and the second is a Lua keyword.

There is always another approach - we can filter by schema (which does not care about Lua rules):

- filter:
    schema: [_raw,last_line]

Using both markers we can collect all the lines generated by a scheduled command into an array using transaction:

input:
  exec:
    command: blub
    interval: 10s
    batch:
      begin-marker-field: begin
      end-marker-field: end
actions:
- transaction:
start-end:
start: [begin]
end: [end]
marker: [trans]

There will be a single generated event {"_marker":"trans","recs":[....]} with the recs array containing the whole batch.
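The grouping step can be modelled in Python like this. It is only a sketch of the idea: events between a begin marker and an end marker are gathered into one output event. We assume recs holds the complete marked events; `collect_batches` is a hypothetical helper, not the transaction action itself.

```python
def collect_batches(events, begin="begin", end="end"):
    """Group events between begin and end markers into one event per batch."""
    recs = None
    for event in events:
        if event.get(begin):
            recs = []  # a new batch starts here
        if recs is not None:
            recs.append(event)
            if event.get(end):
                yield {"_marker": "trans", "recs": recs}
                recs = None

events = [
    {"_raw": "one", "begin": True},
    {"_raw": "two"},
    {"_raw": "three", "end": True},
]
for out in collect_batches(events):
    print(out["_marker"], len(out["recs"]))  # trans 3
```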