Version: 3.5.3

Redis and RabbitMQ

This section illustrates typical HOTROD usage patterns for Redis and RabbitMQ.

Lists and Queues

A Redis list behaves much like a RabbitMQ queue: in both cases you push items onto one end and pop them off the other.

Push a value to a Redis list:

name: redis-write-queue

input:
    text: '{"msg":"hello"}'

output:
    redis:
        set:
            list: q

Pop a value from a Redis list:

name: redis-read-queue

input:
    redis:
        get:
            list: q
        raw: true

output:
    write: console

Assuming you have a Redis server listening on 127.0.0.1:6379:

$> hotrod pipes run --file redis-write-queue.yml 

$> hotrod pipes run --file redis-read-queue.yml

{"msg":"hello"}

The input will now wait for anything new to be pushed to the queue. As with the other inputs, raw indicates that the data is passed through unchanged. Without it, each line would be quoted inside a _raw field, e.g., {"_raw":"{\"msg\":\"hello\"}"}.
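The difference between raw and quoted data can be illustrated with a small Python sketch. This is not HOTROD code: the function name wrap_line is hypothetical and only mimics the quoting shown in the example above.

```python
import json

def wrap_line(line: str, raw: bool) -> str:
    """Return the line unchanged when raw is true, else quote it inside a _raw field."""
    if raw:
        return line
    # Compact separators mirror the compact JSON used elsewhere in this section.
    return json.dumps({"_raw": line}, separators=(",", ":"))

line = '{"msg":"hello"}'
print(wrap_line(line, raw=True))   # the JSON document itself
print(wrap_line(line, raw=False))  # the same text, escaped as a string value
```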

To see what happens when a queue has multiple consumers, run redis-read-queue in two separate terminals, then send new messages as shown below:

$> cat redis-input.json

{"msg":"dolly"}
{"msg":"so nice"}
{"msg":"to see you back"}
{"msg":"where you belong"}

Now write to the queue:

$> hotrod pipes run --file redis-write-queue.yml --json --input @redis-input.json

First terminal output:

{"msg":"dolly"}
{"msg":"to see you back"}

Second terminal output:

{"msg":"so nice"}
{"msg":"where you belong"}

Each consumer gets the next available item in turn, which is round-robin behaviour.
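The distribution seen in the two terminals can be reproduced with an idealised in-memory sketch. It assumes strict alternation between consumers; a real server hands each item to whichever waiting consumer is next in line, so the exact split may differ. The function name round_robin is hypothetical.

```python
from itertools import cycle

def round_robin(messages, consumer_names):
    """Hand each message to the next consumer in turn."""
    delivered = {name: [] for name in consumer_names}
    # cycle() repeats the consumer names; zip() stops when messages run out.
    for message, name in zip(messages, cycle(consumer_names)):
        delivered[name].append(message)
    return delivered

messages = ["dolly", "so nice", "to see you back", "where you belong"]
result = round_robin(messages, ["first", "second"])
print(result["first"])
print(result["second"])
```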

The RabbitMQ equivalent would be this pair of Pipes:

name: rabbit-write-queue

input:
    text: '{"msg":"hello"}'

output:
    amqp:
        queue:
            name: some-name

name: rabbit-read-queue

input:
    amqp:
        raw: true
        queue:
            name: some-name

output:
    write: console

Here again, multiple consumers will get the items in round-robin order.

RabbitMQ provides greater control, which is why it may be preferable to Redis. The cost is that there are more fields to consider, such as the important passive field:

queue:
    name: some-name
    passive: true

This setting indicates that we should not attempt to redeclare this queue. You will need it when the queue's properties have already been defined on the RabbitMQ server.

The AMQP model involves both queues and exchanges, and exchanges are what we need in order to change the default round-robin behaviour. If an exchange is specified, then a queue will be defined implicitly.

To get fan-out, where each consumer receives a copy of every message, we must explicitly define an exchange:

name: rabbit-write-fanout

input:
    text: '{"msg":"hello"}'

output:
    amqp:
        exchange:
            name: some-name
            type: fanout

name: rabbit-read-fanout

input:
    amqp:
        raw: true
        exchange:
            name: some-name
            type: fanout

output:
    write: console
Note: passive also applies to exchanges, as the default behaviour is to attempt to declare them if they do not exist.
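Fan-out delivery can be sketched in plain Python to contrast it with the round-robin queue. This is an in-memory illustration only, not RabbitMQ or HOTROD code, and the class name FanoutExchange is hypothetical. It also shows that a message published before any queue is bound reaches nobody.

```python
from collections import deque

class FanoutExchange:
    """In-memory stand-in for a fanout exchange with one queue per consumer."""

    def __init__(self):
        self.queues = {}

    def bind(self, consumer_name):
        # Each consumer gets its own queue bound to the exchange.
        self.queues[consumer_name] = deque()

    def publish(self, message):
        # Every bound queue receives its own copy of the message.
        for queue in self.queues.values():
            queue.append(message)

exchange = FanoutExchange()
exchange.publish('{"msg":"too early"}')  # no queues bound yet, so nobody receives this
exchange.bind("first")
exchange.bind("second")
exchange.publish('{"msg":"hello"}')

print(list(exchange.queues["first"]))
print(list(exchange.queues["second"]))
```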

Publish and Subscribe

Much like the queue consumers above, subscribers listen to an exchange. However, they are only interested in certain topics. If a publisher puts out a message with a routing key that matches the topic specified by the subscriber, then the subscriber gets the message.

The subscription topic is a set of words separated by dots (.). An asterisk (*) means "exactly one word" and a hash (#) means "zero or more words".
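The wildcard rules can be made concrete with a short matcher. This is a minimal sketch of the matching rules as described here, not the broker's actual implementation; the function name topic_matches is hypothetical.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Topic match: '*' is exactly one word, '#' is zero or more words."""

    def match(p, k):
        if not p:
            # Pattern exhausted: succeed only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' either absorbs nothing, or absorbs one word and stays active.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))

for key in ["animal.dog", "animal.dog.puppy", "animal", "plant.fern"]:
    print(key, topic_matches("animal.*", key), topic_matches("animal.#", key))
```

Note that animal.* rejects both animal and animal.dog.puppy, while animal.# accepts them.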

In the pair of Pipes found in the example below, we have used Context Expansion for variety. In real-world cases, this will allow you to customize the Pipe parameters for each Agent. Here we can experiment with different routing keys without editing the Pipes:

name: rabbit-publish

context:
    TOPIC: animal.dog

input:
    text: '{"msg":"hello","my_topic":"{{TOPIC}}"}'

output:
    amqp:
        exchange:
            name: topic-exchange
            type: topic
        routing-key-field: my_topic

name: rabbit-subscribe

context:
    TOPIC: animal.*

input:
    amqp:
        raw: true
        exchange:
            name: topic-exchange
            type: topic
            passive: true
        routing-key: '{{TOPIC}}'

output:
    write: console

The first Pipe publishes a message that contains its own topic name. This exploits the fact that the routing key can be taken from the value of a field, here my_topic, using routing-key-field:

$> hotrod pipes run --file rabbit-publish.yml

$> hotrod pipes run --file rabbit-publish.yml TOPIC='animal.cat'

In another terminal:

$> hotrod pipes run --file rabbit-subscribe.yml

{"msg":"hello","my_topic":"animal.dog"}
{"msg":"hello","my_topic":"animal.cat"}

With its default TOPIC of animal.*, the subscribing Pipe receives every message whose routing key is animal followed by exactly one more word.

Redis also supports publish-subscribe. Note that topics are matched with glob patterns (there is no # wildcard) and that a subscriber can subscribe to multiple topics.

name: redis-publish

context:
    TOPIC: animal.dog

input:
    exec:
        raw: true
        command: |
            echo '{"msg":"hello","my_topic":"{{TOPIC}}"}'

output:
    redis:
        set:
            publish: '${my_topic}'

name: redis-subscribe

input:
    redis:
        get:
            subscribe:
            - animal.*
        raw: true

output:
    write: console
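Glob matching differs from the word-based topic matching used by RabbitMQ. The sketch below approximates it with Python's fnmatch module, which is an assumption made for illustration: Redis has its own glob matcher and the two differ in some details, such as escaping. The function name subscribed is hypothetical.

```python
from fnmatch import fnmatchcase

def subscribed(patterns, channel):
    """True if the channel matches any of the subscriber's glob patterns."""
    return any(fnmatchcase(channel, pattern) for pattern in patterns)

patterns = ["animal.*"]
for channel in ["animal.dog", "animal.dog.puppy", "plant.fern"]:
    print(channel, subscribed(patterns, channel))
```

Because a glob * matches any run of characters, including dots, animal.* also accepts animal.dog.puppy here, which the RabbitMQ topic pattern animal.* would not.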

Redis Hash Operations

Originally, Redis was designed as a pure in-memory, key-value store where values could be set and retrieved by key. However, these values have types, such as the previously detailed list type, which operates like a queue. Let’s explore the hash type, which is a map of fields to values.

output:
    redis:
        set:
            hash-value: [key, field]

For example, the hash may have the key employees and the field bob. The value is a string, which is usually interpreted as JSON.

Reading a value back is similar:

input:
    redis:
        get:
            hash-value: [key, field]

Flat JSON events map well to Redis hashes, a feature that can be used to merge JSON from different sources:

# Input: {"one":1}, {"two":"hello"}

output:
    redis:
        set:
            hash: keyname

input:
    redis:
        interval: 30s
        get:
            hash: keyname

# Output: {"one":1,"two":"hello"}

For example, several small Pipes can each report one specialised aspect of a system and all write to the same hash. A coordinating Pipe then reads this hash and sends the combined event to its destination. This scheme is especially powerful because the Pipes do not have to coordinate their scheduling: Redis acts as the shared cache.
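The merging behaviour can be sketched with a plain dictionary standing in for Redis. This is an illustration under that assumption, not HOTROD or Redis code; the names hashes, write_event, and read_event are hypothetical.

```python
import json

hashes = {}  # key name -> {field: value}, standing in for Redis

def write_event(key, event):
    """Merge a flat event into the hash, one field per top-level JSON key."""
    hashes.setdefault(key, {}).update(event)

def read_event(key):
    """Return the whole hash as a single combined event."""
    return dict(hashes.get(key, {}))

# Two independent writers, in either order and at any time.
write_event("keyname", {"one": 1})
write_event("keyname", {"two": "hello"})

print(json.dumps(read_event("keyname"), separators=(",", ":")))
```

A later write to an existing field simply replaces its value, so the reader always sees the most recent report from each writer.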

We can also ask for the values inside the hash:

input:
    redis:
        interval: 1m
        get:
            hash-values: keyname
        raw: true

# Output: 1, hello

This is useful for inter-Pipe communication. On a server, events from different Agents can come in and be written to a hash keyed by the Agent ID. A collector Pipe can then access all these events at chosen intervals with Redis as the event cache.
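The event-cache idea can be sketched as follows, again with a dictionary standing in for the Redis hash. The Agent IDs, the cpu field, and the function names store_event and collect are all hypothetical, chosen only for illustration.

```python
import json

agent_events = {}  # hash fields keyed by Agent ID; values are JSON strings

def store_event(agent_id, event):
    """Keep only the most recent event for each Agent."""
    agent_events[agent_id] = json.dumps(event, separators=(",", ":"))

def collect():
    """Return every cached event, as a collector would at each interval."""
    return [json.loads(value) for value in agent_events.values()]

store_event("agent-1", {"cpu": 10})
store_event("agent-2", {"cpu": 55})
store_event("agent-1", {"cpu": 12})  # replaces the earlier agent-1 event

print(collect())
```

The collector sees one event per Agent regardless of how often each Agent reports. The order of values read from a real hash should not be relied on.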