Simple OpenNMS/Minion Environment Using Kafka in Azure

Sharing this article by Alejandro Galue, Senior Manager, Services and Support at The OpenNMS Group.

This lab starts an OpenNMS instance and a 3-node Zookeeper/Kafka cluster in Azure, plus two Minions on your machine via Multipass, using Kafka for the communication between them, for learning purposes.

The lab doesn’t cover security by default (user authentication and encryption), which is crucial if you ever want to expose your Kafka cluster to the Internet. A separate section covers the required changes for this.

Keep in mind that nothing prevents you from skipping the cloud provider and doing everything with Multipass (or VirtualBox, or Hyper-V, or VMware). The reason for using a cloud provider is to prove that OpenNMS can monitor unreachable devices via Minion. Similarly, you could use any other cloud provider instead of Azure; however, I won't explain how to port the solution here.

Time synchronization across all the instances involved in this solution is mandatory. Failing to keep the clocks in sync could lead to undesired side effects.
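
For instance, on the Ubuntu VMs and Minions used here (which rely on systemd-timesyncd by default), a quick check looks like this:

# Confirm the system clock is being synchronized via NTP
timedatectl status | grep -i synchronized
# Enable NTP synchronization if it is reported as "no"
sudo timedatectl set-ntp true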

Requirements

Make sure to log into Azure using az login prior to creating the VMs.

If you have a restricted account in Azure, make sure your Azure AD account has the Network Contributor and Virtual Machine Contributor roles for the resource group in which you would like to create the VMs. Of course, Owner or Contributor at the resource group level also works.

Tune the VM sizes accordingly.

Create common Environment Variables

export RG_NAME="OpenNMS"
export LOCATION="eastus" # Azure Region
export VNET_CIDR="13.0.0.0/16"
export VNET_SUBNET="13.0.1.0/24"
export VNET_NAME="ONMS-vnet"
export VNET_SUBNET_NAME="subnet1"
export VM_USERNAME="agalue"
export VM_PASSWORD="0p3nNM5Rules;"
export ONMS_VM_SIZE="Standard_D2s_v3" # 2 VCPU, 8 GB of RAM
export ONMS_HEAP_SIZE="4096" # Expressed in MB and must fit ONMS_VM_SIZE
export KAFKA_VM_SIZE="Standard_D2s_v3" # 2 VCPU, 8 GB of RAM
export ZK_HEAP_SIZE="1G" # Must fit KAFKA_VM_SIZE
export KAFKA_HEAP_SIZE="2G" # Must fit KAFKA_VM_SIZE
export KAFKA_PARTITIONS="9" # Must be greater than the number of Minions
export KAFKA_RF="2" # Must be less than the cluster size
export ONMS_VM_NAME="onms01"
export MINION_LOCATION="Durham"
export MINION_ID1="minion01"
export MINION_ID2="minion02"
export MINION_HEAP_SIZE="1G" # Must fit VM RAM

Feel free to change the content if needed.

Do not confuse the Azure Location or Region with the Minion Location; they are unrelated concepts.

Create the Azure Resource Group

az group create -n $RG_NAME -l $LOCATION

This is a necessary step, as every resource in Azure must belong to a resource group and a location.

Create the Virtual Network

az network vnet create -g $RG_NAME \
  --name $VNET_NAME \
  --address-prefix $VNET_CIDR \
  --subnet-name $VNET_SUBNET_NAME \
  --subnet-prefix $VNET_SUBNET

Create cloud-init configuration template for Kafka

The following cloud-init template assumes a 3-node cluster, where each VM runs Zookeeper and Kafka on Ubuntu LTS.

For simplicity, Zookeeper and Kafka will be running on each machine. In production, each cluster should have its own instances, as Zookeeper should not grow the same way Kafka does, for multiple reasons: a ZK cluster should always have an odd number of members (which is not the case for Kafka), and traffic across ZK members grows exponentially with the number of instances (a ZK cluster of 5 members can manage multiple dozens of Kafka brokers, with 7 it can manage hundreds, and with 9 it can manage thousands).

Each VM must be named as follows:

  • kafka-1
  • kafka-2
  • kafka-3

Keep in mind that each VM in Azure is reachable within the same VNet from any other VM through its hostname.

In the following template, for server.properties, a script must replace the placeholder PUBLIC_IP with the public IP of the VM when configuring the application, before running it for the first time. With that in mind, there will be two listeners: one used within the VNet (which is what OpenNMS will use, on port 9092), and another associated with the Public IP (on port 9094), to be used by external Minions (outside Azure).

Similarly, a script must replace the placeholder ID with the numeric value extracted from the hostname, used for the broker.id in Kafka and the myid file in Zookeeper, which uniquely identify each instance in its respective cluster. That's the reason for the naming convention mentioned above for the VM instances.

The script in question is setup-kafka-client.sh, which is also part of the following template.

The number of topic partitions must be greater than the number of Minions on a given location and greater than the number of brokers in the cluster.
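
For reference, you can check or grow the partition count of an existing topic later with the standard Kafka tooling; a minimal sketch from any broker, using a hypothetical topic name:

# Inspect the current partition count (SomeTopic is a placeholder)
/opt/kafka/bin/kafka-topics.sh --bootstrap-server $(hostname):9092 \
  --describe --topic SomeTopic
# Partitions can be increased (never decreased) if more Minions are added later
/opt/kafka/bin/kafka-topics.sh --bootstrap-server $(hostname):9092 \
  --alter --topic SomeTopic --partitions 12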

cat <<EOF >/tmp/kafka-template.yaml
#cloud-config
package_upgrade: true
users:
  - name: kafka
packages:
  - openjdk-11-jre-headless
  - jq
write_files:
  - owner: root:root
    path: /etc/security/limits.d/kafka.conf
    content: |
      * soft nofile 100000
      * hard nofile 100000
  - owner: root:root
    path: /etc/systemd/system/zookeeper.service
    content: |
      [Unit]
      Description=Apache Zookeeper server
      Documentation=http://zookeeper.apache.org
      Wants=network-online.target
      After=network-online.target
      [Service]
      Type=simple
      User=kafka
      Group=kafka
      Environment="KAFKA_HEAP_OPTS=-Xmx$ZK_HEAP_SIZE -Xms$ZK_HEAP_SIZE"
      ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
      ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
      [Install]
      WantedBy=multi-user.target
  - owner: root:root
    path: /etc/systemd/system/kafka.service
    content: |
      [Unit]
      Description=Apache Kafka Server
      Documentation=http://kafka.apache.org
      Wants=zookeeper.service
      After=zookeeper.service network-online.target
      [Service]
      Type=simple
      User=kafka
      Group=kafka
      Environment="KAFKA_HEAP_OPTS=-Xmx$KAFKA_HEAP_SIZE -Xms$KAFKA_HEAP_SIZE"
      Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.rmi.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=%H -Djava.net.preferIPv4Stack=true"
      Environment="JMX_PORT=9999"
      ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
      ExecStop=/opt/kafka/bin/kafka-server-stop.sh
      [Install]
      WantedBy=multi-user.target
  - owner: root:root
    path: /tmp/zookeeper.properties # Designed for a 3-node cluster
    content: |
      dataDir=/data/zookeeper
      tickTime=2000
      clientPort=2181
      initLimit=10
      syncLimit=5
      server.1=kafka-1:2888:3888;2181
      server.2=kafka-2:2888:3888;2181
      server.3=kafka-3:2888:3888;2181
  - owner: root:root
    path: /tmp/server.properties # This is a template only
    content: |
      # Make sure to adjust broker.id and replace PUBLIC_IP
      broker.id=0
      log.dirs=/data/kafka
      log.retention.hours=168
      log.segment.bytes=1073741824
      zookeeper.connect=kafka-1:2181,kafka-2:2181,kafka-3:2181
      zookeeper.connection.timeout.ms=30000
      advertised.listeners=INSIDE://:9092,OUTSIDE://PUBLIC_IP:9094
      listeners=INSIDE://:9092,OUTSIDE://:9094
      listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      inter.broker.listener.name=INSIDE
      message.max.bytes=5000000
      replica.fetch.max.bytes=5000000
      compression.type=producer
      num.partitions=$KAFKA_PARTITIONS
      offsets.topic.replication.factor=$KAFKA_RF
      default.replication.factor=$KAFKA_RF
      min.insync.replicas=1
      auto.create.topics.enable=true
      delete.topic.enable=true
  - owner: root:root
    permissions: '0750'
    path: /tmp/setup-kafka-client.sh # Updates server.properties
    content: |
      #!/bin/bash
      INSTANCE_ID=\${HOSTNAME##*-}
      echo \$INSTANCE_ID > /data/zookeeper/myid
      sed -i -r "/broker.id/s/=.*/=\$INSTANCE_ID/" /opt/kafka/config/server.properties
      PUBLIC_IP=\$(curl -H Metadata:true --noproxy "*" "http://169.254.169.254/metadata/instance/network/interface/0/ipv4/ipAddress/0?api-version=2020-09-01" 2>/dev/null | jq -r ".publicIpAddress")
      sed -i -r "s/PUBLIC_IP/\$PUBLIC_IP/" /opt/kafka/config/server.properties
runcmd:
  - cd /opt
  - wget https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
  - tar -xvzf kafka_2.13-2.7.0.tgz
  - ln -s kafka_2.13-2.7.0 kafka
  - mv -f /tmp/*.properties /opt/kafka/config/
  - mkdir -p /data/zookeeper /data/kafka
  - chown -R kafka:kafka /data /opt/kafka*
  - /tmp/setup-kafka-client.sh
  - systemctl daemon-reload
  - systemctl --now enable zookeeper
  - systemctl --now enable kafka
EOF

As mentioned above,

  • The setup-kafka-client.sh script shows how to extract the numeric ID from the hostname (and use it for the broker.id in Kafka and the ZK's myid file).
  • The curl call against the Azure Metadata API shows how to extract the public IP from within the VM, required for the advertised.listeners so the Minions on your network can reach the Kafka cluster.

Start Broker Instances

for i in {1..3}; do
  VM_NAME="kafka-$i"
  echo "Creating VM $VM_NAME..."

  az vm create --resource-group $RG_NAME --name $VM_NAME \
    --size $KAFKA_VM_SIZE \
    --image UbuntuLTS \
    --admin-username "$VM_USERNAME" \
    --admin-password "$VM_PASSWORD" \
    --vnet-name $VNET_NAME \
    --subnet $VNET_SUBNET_NAME \
    --public-ip-address-allocation static \
    --custom-data /tmp/kafka-template.yaml

  az vm open-port -g $RG_NAME -n $VM_NAME \
    --port 9094 --priority 100 --output table
done

The above will start all the VMs using static public IP addresses, to avoid access problems with external Minions and reconfiguration issues with the Kafka advertised listeners. However, Azure will use dynamic private IPs. This is not going to be a problem as we’re going to use DNS to access Kafka.

To obtain the Public IP addresses of each of them:

for i in {1..3}; do
  VM_NAME=kafka-$i
  IP=$(az vm show -d -g $RG_NAME -n $VM_NAME --query publicIps -o tsv 2>/dev/null)
  echo $VM_NAME = $IP
done

Or,

az network public-ip list -g $RG_NAME -o table

Keep in mind that the cloud-init process starts once the VM is running, meaning you should wait about 5 minutes after the az vm create is finished.

In case there is a problem, SSH into the VM using the public IP and the provided credentials and check /var/log/cloud-init-output.log to verify the progress and the status of the cloud-init execution.
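
If you'd rather not guess the timing, cloud-init can report its own progress; for example, after SSHing into the VM:

# Block until cloud-init finishes and print the overall result
cloud-init status --wait
# Or follow the provisioning output in real time
tail -f /var/log/cloud-init-output.log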

Validate Zookeeper and Kafka status

To make sure the Zookeeper cluster started, you can use the “4 letter words” commands via the embedded web server, for instance:

curl http://kafka-1:8080/commands/monitor

The above gives you general information, including the server_state, which can be leader or follower.

To get statistics:

curl http://kafka-1:8080/commands/stats

From Kafka’s perspective, you can verify how each broker has registered via Zookeeper or follow this guide to create a topic and use the console producer and consumer to validate its functionality.

List Broker IDs:

/opt/kafka/bin/zookeeper-shell.sh kafka-1 ls /brokers/ids

You should get:

[1, 2, 3]

If that’s not the case, SSH into the broker that is not listed and make sure Kafka is running. It is possible that Kafka did not register properly with Zookeeper and failed to start, due to how the VMs are initialized: the whole Zookeeper cluster should start first, then Kafka, but as we’re not orchestrating that, some instances might fail to start on their own. The procedure was designed to avoid this situation as much as possible.
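
A quick sketch of what to check on the missing broker, using the systemd units defined in the cloud-init template:

# Verify both services are active and review Kafka's recent logs
sudo systemctl status zookeeper kafka
sudo journalctl -u kafka --no-pager | tail -n 50
# If Kafka gave up before the Zookeeper cluster was ready, restarting it usually fixes the registration
sudo systemctl restart kafka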

Get the broker basic configuration:

/opt/kafka/bin/zookeeper-shell.sh kafka-1 get /brokers/ids/1 | egrep '^\{' | jq

You should get:

{
  "features": {},
  "listener_security_protocol_map": {
    "INSIDE": "PLAINTEXT",
    "OUTSIDE": "PLAINTEXT"
  },
  "endpoints": [
    "INSIDE://kafka-1.internal.cloudapp.net:9092",
    "OUTSIDE://52.154.73.103:9094"
  ],
  "jmx_port": 9999,
  "port": 9092,
  "host": "kafka-1.internal.cloudapp.net",
  "version": 5,
  "timestamp": "1616265688431"
}

Note the two listeners. Clients within Azure, like OpenNMS, would use the INSIDE one on port 9092, pointing to the local FQDN of the VM (and remember that hostnames are resolvable via DNS within the same VNet). In contrast, clients outside Azure, like Minions, would use the OUTSIDE one on port 9094, pointing to the Public IP of each Kafka instance (accessible thanks to the NSG associated with each VM).

Kafka defaults to the hostname or FQDN of the primary interface when you don’t explicitly specify one on the listener. For this reason, if you’re using another cloud provider or bare metal, make sure to either have DNS working across all the VMs, or change the INSIDE listener similarly to how the OUTSIDE one is configured.

As mentioned, DNS is guaranteed in Azure. Note that the FQDN contains the hostname we gave the broker, which is resolvable by all the VMs within the same VNet. If you’re using another cloud provider or working on-prem, make sure DNS works the same way, or update the INSIDE listener to force the broker’s private IP address and use IP addresses to access Kafka when configuring OpenNMS.
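
For illustration only (not needed in Azure), forcing the INSIDE listener to advertise a private IP instead of relying on DNS would look like this in server.properties, where 10.0.1.11 is a hypothetical private address of that broker:

# Hypothetical example: advertise the broker's private IP on the INSIDE listener
advertised.listeners=INSIDE://10.0.1.11:9092,OUTSIDE://PUBLIC_IP:9094
listeners=INSIDE://:9092,OUTSIDE://:9094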

Another way to verify the behavior is using the console producer and console consumer to verify that you can send and receive messages through a given topic.

To do that, let’s create a Test topic:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server $(hostname):9092 \
  --create --topic Test --replication-factor 2 --partitions 3

Then, start a console producer from one of the brokers:

/opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server $(hostname):9092 --topic Test

From another broker (separate SSH session), start a console consumer:

/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server $(hostname):9092 --topic Test

Go back to the terminal on which the console producer is running, type a message, and hit enter. Then, switch to the console consumer terminal, and you should see the message sent. Use Ctrl+C to stop the producer and consumer.

A more comprehensive test would be to download Kafka locally on your machine and run either the producer or the consumer there (use port 9094 and the public IP of one of the brokers). That serves to test connectivity from the Internet.
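
For example, after extracting the same Kafka tarball on your machine, something along these lines should work, where 52.154.73.103 stands in for the public IP of one of your brokers:

# Consume the Test topic from outside Azure through the OUTSIDE listener
./bin/kafka-console-consumer.sh \
  --bootstrap-server 52.154.73.103:9094 --topic Test --from-beginning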

Create an Azure VM for OpenNMS

Create a cloud-init script to deploy OpenNMS in Ubuntu LTS:

cat <<EOF >/tmp/opennms.yaml
#cloud-config
package_upgrade: true
write_files:
  - owner: root:root
    path: /etc/opennms-overlay/featuresBoot.d/features.boot
    content: |
      opennms-kafka-producer
  - owner: root:root
    path: /etc/opennms-overlay/opennms.properties.d/kafka.properties
    content: |
      # Disable internal ActiveMQ
      org.opennms.activemq.broker.disable=true
      # Sink
      org.opennms.core.ipc.sink.strategy=kafka
      org.opennms.core.ipc.sink.kafka.bootstrap.servers=kafka-1:9092,kafka-2:9092
      # RPC
      org.opennms.core.ipc.rpc.strategy=kafka
      org.opennms.core.ipc.rpc.kafka.bootstrap.servers=kafka-1:9092,kafka-2:9092
      org.opennms.core.ipc.rpc.kafka.ttl=30000
      org.opennms.core.ipc.rpc.kafka.single-topic=true
      org.opennms.core.ipc.rpc.kafka.auto.offset.reset=latest
  - owner: root:root
    path: /etc/opennms-overlay/org.opennms.features.kafka.producer.client.cfg
    content: |
      bootstrap.servers=kafka-1:9092,kafka-2:9092
      compression.type=gzip
      timeout.ms=30000
      max.request.size=5000000
  - owner: root:root
    path: /etc/opennms-overlay/org.opennms.features.kafka.producer.cfg
    content: |
      topologyProtocols=bridge,cdp,isis,lldp,ospf
      suppressIncrementalAlarms=true
      forward.metrics=true
      nodeRefreshTimeoutMs=300000
      alarmSyncIntervalMs=300000
      kafkaSendQueueCapacity=1000
      nodeTopic=OpenNMS_nodes
      alarmTopic=OpenNMS_alarms
      eventTopic=OpenNMS_events
      metricTopic=OpenNMS_metrics
      alarmFeedbackTopic=OpenNMS_alarms_feedback
      topologyVertexTopic=OpenNMS_topology_vertices
      topologyEdgeTopic=OpenNMS_edges
apt:
  preserve_sources_list: true
  sources:
    opennms:
      source: deb https://debian.opennms.org stable main main
packages:
  - opennms
  - opennms-webapp-hawtio
bootcmd:
  - curl -s https://debian.opennms.org/OPENNMS-GPG-KEY | apt-key add -
runcmd:
  - systemctl --now enable postgresql
  - sudo -u postgres createuser opennms
  - sudo -u postgres psql -c "ALTER USER postgres WITH PASSWORD 'postgres';"
  - sudo -u postgres psql -c "ALTER USER opennms WITH PASSWORD 'opennms';"
  - sed -r -i 's/password=""/password="postgres"/' /etc/opennms/opennms-datasources.xml
  - sed -r -i '/enabled="false"/{\$!{N;s/ enabled="false"[>]\n(.*OpenNMS:Name=Syslogd.*)/>\n\1/}}' /etc/opennms/service-configuration.xml
  - echo "JAVA_HEAP_SIZE=$ONMS_HEAP_SIZE" > /etc/opennms/opennms.conf
  - rsync -avr /etc/opennms-overlay/ /etc/opennms/
  - /usr/share/opennms/bin/runjava -s
  - /usr/share/opennms/bin/install -dis
  - systemctl --now enable opennms
EOF

You don’t need to specify the whole list of Kafka brokers in the bootstrap.servers entry. The whole topology will be discovered through the first broker that responds, and the client will use what’s configured as the advertised listener to talk to each broker. I added two in case the first one is unavailable (as a backup).

The above installs the latest OpenJDK 11, the latest PostgreSQL, and the latest OpenNMS Horizon. I added the most basic configuration for PostgreSQL to work with authentication. Kafka will be enabled for Sink/RPC as well as the Kafka Producer. As mentioned, Azure VMs can reach each other through hostnames.

Create an Ubuntu VM for OpenNMS with a static public IP:

az vm create --resource-group $RG_NAME --name $ONMS_VM_NAME \
  --size $ONMS_VM_SIZE \
  --image UbuntuLTS \
  --admin-username "$VM_USERNAME" \
  --admin-password "$VM_PASSWORD" \
  --vnet-name $VNET_NAME \
  --subnet $VNET_SUBNET_NAME \
  --public-ip-address-allocation static \
  --custom-data /tmp/opennms.yaml \
  --output table

az vm open-port -g $RG_NAME -n $ONMS_VM_NAME \
  --port 8980 --priority 200 --output table

Once finished, the above command should show the Public IP assigned to the VM, required to configure the Minions. Here is how to obtain it:

az vm show -d -g $RG_NAME -n $ONMS_VM_NAME --query publicIps -o tsv

Keep in mind that the cloud-init process starts once the VM is running, meaning you should wait about 5 minutes after the az vm create is finished to see OpenNMS up and running.

In case there is a problem, SSH into the VM using the public IP and the provided credentials and check /var/log/cloud-init-output.log to verify the progress and the status of the cloud-init execution.

Create Minion VMs using multipass

After verifying that OpenNMS is up and running, you can proceed to create the Minions.

The first step is to create the cloud-init configuration for the first Minion on your machine:

ONMS_IP=$(az vm show -d -g $RG_NAME -n $ONMS_VM_NAME --query publicIps -o tsv 2>/dev/null)
KAFKA1_IP=$(az vm show -d -g $RG_NAME -n kafka-1 --query publicIps -o tsv 2>/dev/null)
KAFKA2_IP=$(az vm show -d -g $RG_NAME -n kafka-2 --query publicIps -o tsv 2>/dev/null)

cat <<EOF > /tmp/$MINION_ID1.yaml
#cloud-config
package_upgrade: true
write_files:
  - owner: root:root
    path: /etc/minion-overlay/org.opennms.minion.controller.cfg
    content: |
      location=$MINION_LOCATION
      id=$MINION_ID1
      http-url=http://$ONMS_IP:8980/opennms
  - owner: root:root
    path: /etc/minion-overlay/featuresBoot.d/kafka.boot
    content: |
      !minion-jms
      !opennms-core-ipc-sink-camel
      !opennms-core-ipc-rpc-jms
      opennms-core-ipc-sink-kafka
      opennms-core-ipc-rpc-kafka
  - owner: root:root
    path: /etc/minion-overlay/org.opennms.core.ipc.sink.kafka.cfg
    content: |
      bootstrap.servers=$KAFKA1_IP:9094,$KAFKA2_IP:9094
  - owner: root:root
    path: /etc/minion-overlay/org.opennms.core.ipc.rpc.kafka.cfg
    content: |
      bootstrap.servers=$KAFKA1_IP:9094,$KAFKA2_IP:9094
      single-topic=true
apt:
  preserve_sources_list: true
  sources:
    opennms:
      source: deb https://debian.opennms.org stable main main
packages:
  - opennms-minion
bootcmd:
  - curl -s https://debian.opennms.org/OPENNMS-GPG-KEY | apt-key add -
runcmd:
  - rsync -avr /etc/minion-overlay/ /etc/minion/
  - sed -i -r 's/# export JAVA_MIN_MEM=.*/export JAVA_MIN_MEM="$MINION_HEAP_SIZE"/' /etc/default/minion
  - sed -i -r 's/# export JAVA_MAX_MEM=.*/export JAVA_MAX_MEM="$MINION_HEAP_SIZE"/' /etc/default/minion
  - /usr/share/minion/bin/scvcli set opennms.http admin admin
  - /usr/share/minion/bin/scvcli set opennms.broker admin admin
  - systemctl --now enable minion
EOF

Use the same content for bootstrap.servers as in OpenNMS, but make sure to use the Public IPs instead of hostnames here, as the Minions won’t be running in Azure.

Then, start the new Minion via multipass :

multipass launch -c 1 -m 2G -n $MINION_ID1 --cloud-init /tmp/$MINION_ID1.yaml

Optionally, create a cloud-init configuration for a second Minion on your machine based on the work we did for the first one (assuming the same location):

sed "s/$MINION_ID1/$MINION_ID2/" /tmp/$MINION_ID1.yaml > /tmp/$MINION_ID2.yaml

Wait until the first Minion has been successfully registered with OpenNMS. Then, start the second Minion via multipass:

multipass launch -c 1 -m 2G -n $MINION_ID2 --cloud-init /tmp/$MINION_ID2.yaml

In case there is a problem, access the VM (e.g., multipass shell $MINION_ID1) and check /var/log/cloud-init-output.log to verify the progress and the status of the cloud-init execution.

Feel free to change the CPU and memory settings for your Minion, but make sure it is consistent with MINION_HEAP_SIZE . Make sure to validate communication using the health-check command from the Karaf Shell.
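
For reference, a minimal sketch of how to reach the Minion’s Karaf Shell for that check (default credentials admin/admin):

multipass shell $MINION_ID1
ssh -p 8201 admin@localhost   # Minion's Karaf Shell; password is admin
# Then, inside the Karaf Shell, run health-check (opennms:health-check on newer versions)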

When there are multiple Minions per location, they become, from Kafka’s perspective, part of a consumer group for the RPC request topic, as shown below.
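
You can list those consumer groups from any broker with the standard tooling (the group names depend on the Instance ID and location):

# List the consumer groups known to the cluster
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --list
# Describe one of them to see partition assignments and lag (GroupName is a placeholder)
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 \
  --describe --group GroupName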

Test

As you can see, the location name is Durham (a.k.a. $MINION_LOCATION), and you should see the Minions at that location registered in OpenNMS.

SSH into the OpenNMS server and create a requisition with a node in the same network as the Minion VMs, and make sure to associate it with the appropriate location. For instance,

/usr/share/opennms/bin/provision.pl requisition add Test
/usr/share/opennms/bin/provision.pl node add Test srv01 srv01
/usr/share/opennms/bin/provision.pl node set Test srv01 location Durham
/usr/share/opennms/bin/provision.pl interface add Test srv01 192.168.0.40
/usr/share/opennms/bin/provision.pl interface set Test srv01 192.168.0.40 snmp-primary P
/usr/share/opennms/bin/provision.pl requisition import Test

Make sure to replace 192.168.0.40 with the IP of a working server in your network (reachable from the Minion VM), and do not forget to use the same location as defined in $MINION_LOCATION .

Please keep in mind that the Minions are VMs on your machine. 192.168.0.40 is the IP of my machine, which is why the Minions can reach it (and vice versa). To access an external machine on your network, make sure to define static routes on that machine so it can reach the Minions through your machine (assuming you’re running Linux or macOS).

OpenNMS, which runs in Azure and has no direct access to 192.168.0.40, should be able to collect data and monitor that node through any of the Minions. In fact, you can stop one of them, and OpenNMS will continue monitoring the node.

To test asynchronous messages, you can send SNMP traps or Syslog messages to one of the Minions. Usually, you could put a Load Balancer in front of the Minions and use its IP when sending messages from the monitored devices. Alternatively, you could use udpgen for this purpose.

The machine that will be running udpgen must be part of the OpenNMS inventory. Find the IP of the Minion using multipass list, then execute the following from the machine added as a node above (the examples assume the IP of the Minion is 192.168.75.16):

To send SNMP Traps:

udpgen -h 192.168.75.16 -x snmp -r 1 -p 1162

To send Syslog Messages:

udpgen -h 192.168.75.16 -x syslog -r 1 -p 1514

The C++ version of udpgen only works on Linux. If you’re on macOS or Windows, you can use the Go version of it.

The Hawtio UI in OpenNMS can help to visualize the Camel and ActiveMQ internals, to understand what’s circulating between OpenNMS and the Minions.

For OpenNMS, Hawtio is available through http://$ONMS_IP:8980/hawtio if the package opennms-webapp-hawtio was installed (which is the case with the cloud-init template used).

For Minions, Hawtio is available through http://$MINION_IP1:8181/hawtio and http://$MINION_IP2:8181/hawtio respectively.

Troubleshooting

As mentioned, if time is not synchronized across all the instances, the Heartbeat messages sent by the Minions via the Sink API won’t be processed properly by OpenNMS, leading to the Minion not being registered or to outages in the Minion-Heartbeat service.

You can inspect the traffic on the topics to see if the Minion is sending (or receiving) traffic to Kafka. However, as the payload is encoded within a Protobuf message, using the console consumer might not be as useful as you’d expect. Still, it works for troubleshooting purposes. For instance, from one of the Kafka brokers, you can do:

/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server $(hostname):9092 \
  --topic OpenNMS.Sink.Heartbeat

And you’ll see:

$bce7b13e-d575-40b9-989a-3b5c6e7432c2 ~<minion>
   <id>minion01</id>
   <location>Durham</location>
   <timestamp>2021-03-26T12:19:55.752-07:00</timestamp>
</minion>

As you can see, the actual payload within the Protobuf message is an indented XML.

The following application can be used to properly inspect the content without worrying about the non-readable content due to the Protobuf format:

For RPC in particular, you can access the Karaf Shell from the OpenNMS instance and use the opennms:stress-rpc command to verify communication against the Minions at a given location or against a specific Minion and, as the command name implies, to perform stress tests.
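
For example, a minimal way to get there from the OpenNMS VM (default credentials admin/admin):

ssh -p 8101 admin@localhost   # OpenNMS Karaf Shell
# Inside the Karaf Shell, review the available options before launching a test:
# opennms:stress-rpc --help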

Sharing Kafka across multiple OpenNMS-Minion sets

In big environments, it is common to have multiple OpenNMS instances, each of them with its own fleet of Minions to monitor one of the multiple data centers or a section of it. In those scenarios, it is common to have a centralized Kafka cluster that can be shared across all of them (for more information, follow this link).

The above solution has to be modified to ensure each set of OpenNMS and Minions uses its own set of topics in Kafka to avoid collisions.

The topics’ prefix (which defaults to OpenNMS) can be controlled via a system-wide property called Instance ID (a.k.a. org.opennms.instance.id). You must configure this property in both places: for OpenNMS, add it to a property file inside $OPENNMS_HOME/etc/opennms.properties.d; for a Minion, add it to $MINION_HOME/etc/custom.system.properties.
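
For example, assuming a hypothetical Instance ID of NMS1 and the Debian package locations used in this guide:

# On the OpenNMS server
echo "org.opennms.instance.id=NMS1" | \
  sudo tee /etc/opennms/opennms.properties.d/instance-id.properties
# On each Minion of that set
echo "org.opennms.instance.id=NMS1" | \
  sudo tee -a /etc/minion/custom.system.properties
# Restart OpenNMS and the Minions so the new topic prefix (NMS1 instead of OpenNMS) takes effect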

Add a Load Balancer in front of the Minions (Optional)

In production, when having multiple Minions per location, it is a good practice to put a Load Balancer in front of them so that the devices can use a single destination for SNMP Traps, Syslog, and Flows.

The following creates a basic LB using nginx through multipass for SNMP Traps (with a listener on port 162) and Syslog Messages (with a listener on port 514):

MINION_IP1=$(multipass info $MINION_ID1 | grep IPv4 | awk '{print $2}' 2>/dev/null)
MINION_IP2=$(multipass info $MINION_ID2 | grep IPv4 | awk '{print $2}' 2>/dev/null)

cat <<EOF > nginx.yaml
#cloud-config
package_upgrade: true
packages:
  - nginx
write_files:
  - owner: root:root
    path: /etc/nginx/nginx.conf
    content: |
      user www-data;
      worker_processes auto;
      pid /run/nginx.pid;
      include /etc/nginx/modules-enabled/*.conf;
      events {
        worker_connections 768;
      }
      stream {
        upstream syslog_udp  {
          server $MINION_IP1:1514;
          server $MINION_IP2:1514;
        }
        upstream trap_udp  {
          server $MINION_IP1:1162;
          server $MINION_IP2:1162;
        }
        server {
          listen 514 udp;
          proxy_pass syslog_udp;
          proxy_responses 0;
        }
        server {
          listen 162 udp;
          proxy_pass trap_udp;
          proxy_responses 0;
        }
      }
runcmd:
  - systemctl restart nginx
EOF

multipass launch -n nginx --cloud-init nginx.yaml
echo "Load Balancer $(multipass info nginx | grep IPv4)"

Flows are outside the scope of this test as that requires more configuration on Minions and OpenNMS besides having an Elasticsearch cluster up and running with the required plugin in place.

Securing Zookeeper and Kafka

Work in progress…

The above procedure uses Kafka and Zookeeper in plain text without authentication or encryption. That works for testing purposes or private clusters, where access to the servers is restricted or audited.

This example, in particular, exposes Kafka to the Internet, which requires having at least authentication in place.

This section explains how to enable authentication using SASL with SCRAM-SHA-512 for Kafka and DIGEST for Zookeeper (as Zookeeper doesn’t support SCRAM). Because this guide’s intention is learning, I decided to add security as a separate or optional module. That’s due to the extra complexity associated with this advanced topic.

Here are the high-level changes:

  • Create the SCRAM credentials for Kafka through one of the brokers. The credentials are stored in Zookeeper.
  • Update server.properties and the systemd service definition on each Kafka broker to enable and use SASL.
  • Update zookeeper.properties and the systemd service definition on each ZK instance to enable and use SASL.
  • Stop Kafka Cluster, restart Zookeeper cluster, start Kafka Cluster.
  • Update OpenNMS to use SASL for the Sink API, the RPC API, and the Kafka Producer and restart.
  • Update Minion to use SASL for the Sink API and the RPC API and restart.

Access one of the brokers and execute the following command:

ONMS_USER="opennms"    # To be used by Kafka, OpenNMS and Minions
ONMS_PASSWD="0p3nNM5;" # To be used by Kafka, OpenNMS and Minions

/opt/kafka/bin/kafka-configs.sh --bootstrap-server $(hostname):9092 \
  --alter \
  --add-config "SCRAM-SHA-512=[password=$ONMS_PASSWD]" \
  --entity-type users \
  --entity-name $ONMS_USER
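
To confirm the credentials were stored (only the SCRAM mechanism and iteration count are shown, not the password itself):

/opt/kafka/bin/kafka-configs.sh --bootstrap-server $(hostname):9092 \
  --describe --entity-type users --entity-name $ONMS_USER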

On each Zookeeper instance, update zookeeper.properties to enable SASL:

cat <<EOF | sudo tee -a /opt/kafka/config/zookeeper.properties
authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
EOF

On each Kafka broker instance, update server.properties to enable SASL/SCRAM:

sudo sed -i -r '/listener.security.protocol.map/d' /opt/kafka/config/server.properties

cat <<EOF | sudo tee -a /opt/kafka/config/server.properties
# Enable Security
zookeeper.set.acl=true
listener.security.protocol.map=INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
EOF

Note that listener.security.protocol.map already exists in that file, which is why I removed it prior to adding the required changes.

On each Zookeeper instance, create the JAAS configuration file with the credentials:

ZK_USER="zkonms"
ZK_PASSWD="zk0p3nNM5;"

cat <<EOF | sudo tee /opt/kafka/config/zookeeper_jaas.conf
Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  user_$ZK_USER="$ZK_PASSWD";
};
EOF
sudo chown kafka:kafka /opt/kafka/config/zookeeper_jaas.conf
sudo chmod 0600 /opt/kafka/config/zookeeper_jaas.conf

On each Kafka broker, create the JAAS configuration file with the credentials:

ZK_USER="zkonms"       # Must match zookeeper_jaas.conf
ZK_PASSWD="zk0p3nNM5;" # Must match zookeeper_jaas.conf

ONMS_USER="opennms"    # Must match scram user
ONMS_PASSWD="0p3nNM5;" # Must match scram user

cat <<EOF | sudo tee /opt/kafka/config/kafka_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="$ONMS_USER"
  password="$ONMS_PASSWD";
};

Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="$ZK_USER"
  password="$ZK_PASSWD";
};
EOF
sudo chown kafka:kafka /opt/kafka/config/kafka_jaas.conf
sudo chmod 0600 /opt/kafka/config/kafka_jaas.conf

On each Zookeeper instance, update the systemd service definition to load the JAAS settings via KAFKA_OPTS :

OPTS='Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/zookeeper_jaas.conf"'
sudo sed -i -r -e "/^ExecStart=.*/i $OPTS" /etc/systemd/system/zookeeper.service
sudo systemctl daemon-reload 

On each Kafka broker, update the systemd service definition to load the JAAS settings via KAFKA_OPTS :

OPTS='Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf"'
sudo sed -i -r -e "/^ExecStart=.*/i $OPTS" /etc/systemd/system/kafka.service
sudo systemctl daemon-reload 

Restart the cluster in the following order:

  • Stop Kafka on each server.
  • Restart Zookeeper on each server.
  • Start Kafka on each server.
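
With the systemd units defined earlier, that translates to the following, completing each step across the whole cluster before moving to the next:

sudo systemctl stop kafka         # step 1, on every broker
sudo systemctl restart zookeeper  # step 2, on every Zookeeper instance
sudo systemctl start kafka        # step 3, on every broker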

On the OpenNMS instance, update /etc/opennms/opennms.properties.d/kafka.properties and /etc/opennms/org.opennms.features.kafka.producer.client.cfg to use SASL, for instance:

ONMS_USER="opennms"    # Must match scram user
ONMS_PASSWD="0p3nNM5;" # Must match scram user

cat <<EOF | sudo tee -a /etc/opennms/opennms.properties.d/kafka.properties
# Security for Sink
org.opennms.core.ipc.sink.kafka.security.protocol=SASL_PLAINTEXT
org.opennms.core.ipc.sink.kafka.sasl.mechanism=SCRAM-SHA-512
org.opennms.core.ipc.sink.kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="$ONMS_USER" password="$ONMS_PASSWD";
# Security for RPC
org.opennms.core.ipc.rpc.kafka.security.protocol=SASL_PLAINTEXT
org.opennms.core.ipc.rpc.kafka.sasl.mechanism=SCRAM-SHA-512
org.opennms.core.ipc.rpc.kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="$ONMS_USER" password="$ONMS_PASSWD";
EOF

cat <<EOF | sudo tee -a /etc/opennms/org.opennms.features.kafka.producer.client.cfg
# Security
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="$ONMS_USER" password="$ONMS_PASSWD";
EOF

Then restart OpenNMS.

On each Minion, update /etc/minion/org.opennms.core.ipc.sink.kafka.cfg and /etc/minion/org.opennms.core.ipc.rpc.kafka.cfg to use SASL, for instance:

ONMS_USER="opennms"    # Must match scram user
ONMS_PASSWD="0p3nNM5;" # Must match scram user

cat <<EOF | sudo tee -a /etc/minion/org.opennms.core.ipc.sink.kafka.cfg
# Security
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="$ONMS_USER" password="$ONMS_PASSWD";
EOF

cat <<EOF | sudo tee -a /etc/minion/org.opennms.core.ipc.rpc.kafka.cfg
# Security
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="$ONMS_USER" password="$ONMS_PASSWD";
EOF

Then restart each Minion.

At this point, we have SASL authentication enabled using SCRAM-SHA-512 for Kafka and DIGEST for Zookeeper, meaning the credentials might be hard to crack when intercepting traffic (but perhaps not impossible). However, to make it more secure, encryption is recommended. I’ll explain how to do that soon.

The solution works from the OpenNMS and Minion perspective, despite the following message appearing repeatedly in /opt/kafka/logs/server.log on all brokers:

[2021-04-11 12:35:56,486] INFO [SocketServer brokerId=2] Failed authentication with /13.0.1.7 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

Where 13.0.1.7 is the IP of the OpenNMS server.

As a challenge to the reader, modify the cloud-init templates to start the solution with SASL from scratch.

Clean Up

When you’re done, make sure to delete the cloud resources:

az group delete -g $RG_NAME

Then clean the local resources:

multipass delete $MINION_ID1 $MINION_ID2
multipass purge

Remember to remove the nginx instance if you decided to use it.
