Streaming Tranalyzer flows into Apache Kafka

Prerequisites

This tutorial assumes a basic knowledge of Tranalyzer and that the file t2_aliases has been sourced in ~/.bashrc or ~/.bash_aliases as follows (refer to How to install Tranalyzer for more details):

# $HOME/.bashrc

if [ -f "$T2HOME/scripts/t2_aliases" ]; then
    . "$T2HOME/scripts/t2_aliases"             # Note the leading `.'
fi

Make sure to replace $T2HOME with the actual path (e.g., $HOME/tranalyzer2-0.9.1).

Dependencies

The kafkaSink plugin uses the librdkafka library, which may be installed as follows:

Ubuntu          sudo apt-get install librdkafka-dev
Arch            sudo pacman -S librdkafka
Gentoo          sudo emerge librdkafka
openSUSE        sudo zypper install librdkafka-devel
Red Hat/Fedora  sudo dnf install librdkafka-devel (or sudo yum install librdkafka-devel)
macOS           brew install librdkafka
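
To confirm that the library is visible to the build, a quick check along the following lines may help. This is only a sketch: it assumes pkg-config is installed and that librdkafka registers itself under the pkg-config module name rdkafka. check_librdkafka is a hypothetical helper name, not part of Tranalyzer.

```shell
# Hypothetical helper (not part of Tranalyzer): report whether pkg-config
# can locate librdkafka. Assumes the pkg-config module name is "rdkafka".
check_librdkafka() {
    if ! command -v pkg-config >/dev/null 2>&1; then
        echo "pkg-config not available, cannot check for librdkafka"
        return 1
    fi
    if pkg-config --exists rdkafka; then
        echo "librdkafka found (version $(pkg-config --modversion rdkafka))"
    else
        echo "librdkafka not found by pkg-config"
        return 1
    fi
}

check_librdkafka || echo "Install it with one of the commands listed above"
```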

Required plugins

The only required plugin is the kafkaSink plugin. For this tutorial, we will also load the basicFlow, basicStats and tcpStates plugins. Although not required, those plugins provide useful information, such as source and destination addresses and ports, protocols and basic statistics about packets and bytes. They can be built by running:

t2build basicFlow basicStats kafkaSink tcpStates

Services initialization

The kafkaSink plugin requires a running ZooKeeper service and a Kafka broker reachable at the address(es) configured in KAFKA_BROKERS, e.g., 127.0.0.1:9092. The current value can be displayed as follows:

t2conf kafkaSink -G KAFKA_BROKERS

KAFKA_BROKERS = "127.0.0.1:9092"

  • Start the ZooKeeper server and send it to the background:

    zookeeper-server-start.sh /etc/kafka/zookeeper.properties &

  • Start the Kafka server and send it to the background:

    kafka-server-start.sh /etc/kafka/server.properties &
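
Since both services were sent to the background, it can be useful to verify that something is actually listening on the broker port before running t2. The following bash sketch only tests whether a TCP connection can be opened; it does not prove that the listener is a healthy Kafka broker. broker_reachable is a hypothetical helper, and the host and port are the default KAFKA_BROKERS value.

```shell
# Hypothetical helper (not part of Tranalyzer or Kafka): return 0 if a TCP
# connection to host:port can be opened, non-zero otherwise.
# Relies on bash's /dev/tcp pseudo-device.
broker_reachable() {
    local host=$1 port=$2
    # The subshell opens the socket on fd 3 and closes it again on exit.
    (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null
}

if broker_reachable 127.0.0.1 9092; then
    echo "Something is listening on 127.0.0.1:9092"
else
    echo "Nothing is listening on 127.0.0.1:9092 yet"
fi
```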

Plugin and core configuration

Let’s first look at the default configuration of the kafkaSink plugin:

kafkaSink

vi src/kafkaSink.h

...
/* ========================================================================== */
/* ------------------------ USER CONFIGURATION FLAGS ------------------------ */
/* ========================================================================== */

#define KAFKA_DEBUG     0                  // Print debug messages
#define KAFKA_RETRIES   3                  // Max. number of retries when message production failed [0 - 255]

/* +++++++++++++++++++++ ENV / RUNTIME - conf Variables +++++++++++++++++++++ */

#define KAFKA_BROKERS   "127.0.0.1:9092"   // Broker address(es)
                                           // (comma separated list of host[:port])
#define KAFKA_TOPIC     "tranalyzer.flows" // Topic to produce to

#define KAFKA_PARTITION -1                 // Target partition:
                                           //    - >= 0: fixed partition
                                           //    -   -1: automatic partitioning (unassigned)

/* ========================================================================== */
/* ------------------------- DO NOT EDIT BELOW HERE ------------------------- */
/* ========================================================================== */
...

For this tutorial, we will use the default values.
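
The host[:port] format accepted by KAFKA_BROKERS can be illustrated with a small shell sketch that splits such a list into host and port pairs. split_brokers is a hypothetical helper and the host names are made up. The fallback to port 9092 when no port is given is an assumption based on the usual Kafka default, and IPv6 literals are not handled.

```shell
# Hypothetical helper: print one "host port" pair per entry of a
# comma-separated broker list such as the value of KAFKA_BROKERS.
# Assumption: entries without an explicit port use 9092.
split_brokers() {
    local list=$1 entry host port
    local IFS=','
    for entry in $list; do
        host=${entry%%:*}
        if [ "$host" = "$entry" ]; then
            port=9092            # no ":port" suffix present
        else
            port=${entry##*:}
        fi
        printf '%s %s\n' "$host" "$port"
    done
}

# Made-up host names, for illustration only
split_brokers 'broker1.example:9093,broker2.example'
# prints:
#   broker1.example 9093
#   broker2.example 9092
```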

The kafkaSink plugin only sends the flows to Kafka. What if you also want to keep track of the errors, warnings and other information produced by Tranalyzer? We’ll look into that in this tutorial, but first, let us switch off the coloring of the T2 report and make sure to rebuild everything:

t2conf tranalyzer2 -D T2_LOG_COLOR=0

t2build -R

Time to stream

In this tutorial, we will work with a PCAP file, but you could also process the traffic directly from an interface.

Start by downloading the PCAP file we will be using. Make sure your ZooKeeper and Kafka servers are up and running (see the Services initialization section)!

Now run t2:

t2 -r faf-exercise.pcap

It is as simple as that!

Let’s make sure Kafka has received our data!

# Consume messages for tranalyzer.flows topic
$ kafka-console-consumer              \
    --bootstrap-server localhost:9092 \
    --from-beginning                  \
    --topic tranalyzer.flows

Sending stdout and stderr

Sometimes it is interesting to store the logs produced by T2. The errors ([ERR]), warnings ([WRN]) and information ([INF]) are particularly useful. Errors are sent to stderr, while warnings and information are sent to stdout. We will mirror that in Kafka by sending them to two different topics, namely tranalyzer.err and tranalyzer.out. For simplicity, we will use the kcat (formerly kafkacat) tool. Let’s look at its syntax (or at least at the parts we will need):

kcat

...
General options:
  -C | -P | -L | -Q  Mode: Consume, Produce, Metadata List, Query mode
  -G <group-id>      Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)
                     Expects a list of topics to subscribe to
  -t <topic>         Topic to consume from, produce to, or list
  -p <partition>     Partition
  -b <brokers,..>    Bootstrap broker(s) (host[:port])
...

For our tutorial, we will need the -P, -b and -t options. The -P option indicates we want to produce data, the -b option specifies the address of the Kafka broker(s) and the -t option selects the topic.

Now that this is out of the way, let us look at the full command to see how we can redirect stderr and stdout separately:

$ t2 -r faf-exercise.pcap \
    1> >(grep -F -e "[INF]" -e "[WRN]" | kcat -P -b 127.0.0.1:9092 -t tranalyzer.out) \
    2> >(kcat -P -b 127.0.0.1:9092 -t tranalyzer.err)
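
The redirection can be tried out without a broker. In the bash sketch below, the hypothetical function fake_t2 stands in for t2 and prints made-up sample lines, while plain files named after the topics stand in for the two kcat producers. The -F option makes grep treat [INF] and [WRN] as literal strings rather than bracket expressions. The extra 3>&1 | cat wrapper is not part of the command above: the process substitutions run asynchronously, so the sketch uses the wrapper to wait until both have finished before reading the files.

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for t2: the lines below are made up for illustration.
fake_t2() {
    echo '[INF] example information line'
    echo 'untagged report line'
    echo '[WRN] example warning line'
    echo '[ERR] example error line' >&2
}

tmp=$(mktemp -d)
trap 'rm -rf "$tmp"' EXIT

# Both process substitutions inherit fd 3 (a copy of the pipe to cat),
# so cat only sees end-of-file once both of them have exited.
{
    fake_t2 \
        1> >(grep -F -e '[INF]' -e '[WRN]' > "$tmp/tranalyzer.out") \
        2> >(cat > "$tmp/tranalyzer.err")
} 3>&1 | cat

echo "Would be produced to tranalyzer.out:"
cat "$tmp/tranalyzer.out"
echo "Would be produced to tranalyzer.err:"
cat "$tmp/tranalyzer.err"
```

The untagged line is dropped by the grep filter, and the [ERR] line ends up only in the file standing in for tranalyzer.err.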

We can access the tranalyzer.flows topic as before:

# Consume messages for tranalyzer.flows topic
$ kafka-console-consumer              \
    --bootstrap-server localhost:9092 \
    --from-beginning                  \
    --topic tranalyzer.flows

If we want to consume messages from another topic, we just have to specify a different --topic option.

  • Consume messages for tranalyzer.err topic:

    $ kafka-console-consumer              \
        --bootstrap-server localhost:9092 \
        --from-beginning                  \
        --topic tranalyzer.err

  • Consume messages for tranalyzer.out topic:

    $ kafka-console-consumer              \
        --bootstrap-server localhost:9092 \
        --from-beginning                  \
        --topic tranalyzer.out

Conclusion

Don’t forget to reset the plugin configuration for the next tutorial.

t2conf tranalyzer2 -D T2_LOG_COLOR=1

Have fun analyzing!