Getting started with Kafka and Rust: Part 1


This is a two-part series to help you get started with Rust and Kafka. We will be using the rust-rdkafka crate which itself is based on librdkafka (C library).

In this post we will cover the Kafka Producer API.

Initial setup

Make sure you have a Kafka broker available - a local setup should suffice. Of course, you will need Rust installed as well (version 1.45 or above).
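If you don't already have a broker running, one convenient option (an assumption on my part - this article doesn't prescribe a particular setup) is a single-node Kafka in Docker:

docker run -d --name kafka -p 9092:9092 apache/kafka:latest

Any reachable broker works; just point bootstrap.servers at it.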

Before you begin, clone the GitHub repo:

git clone https://github.com/abhirockzz/rust-kafka-101
cd rust-kafka-101/part1

Check the Cargo.toml file:

...
[dependencies]
rdkafka = { version = "0.25", features = ["cmake-build","ssl"] }
...

Note on the cmake-build feature

rust-rdkafka provides a couple of ways to resolve the librdkafka dependency. I chose static linking, wherein librdkafka is compiled from source as part of the build (that's what the cmake-build feature enables). You could opt for dynamic linking to use a locally installed version instead.

For more details, please refer to the rust-rdkafka documentation.

Ok, let's start off with the basics.

Simple producer

Here is a simple producer based on BaseProducer:

use rdkafka::config::ClientConfig;
use rdkafka::producer::BaseProducer;

    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        // SASL/SSL settings - only needed if your broker requires authentication
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanisms", "PLAIN")
        .set("sasl.username", "<update>")
        .set("sasl.password", "<update>")
        .create()
        .expect("invalid producer config");
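If you are running a plain local broker without authentication, the SASL/SSL settings can be dropped entirely - a minimal sketch:

    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("invalid producer config");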

Use the send method to start producing messages. Here it's called in a tight loop with a thread::sleep in between (not something you would do in production) to make it easier to follow the results. The key, value (payload), and destination Kafka topic are represented in the form of a BaseRecord:

use rdkafka::producer::BaseRecord;
use std::thread;
use std::time::Duration;

    for i in 1..100 {
        println!("sending message");
        producer
            .send(
                BaseRecord::to("rust")
                    .key(&format!("key-{}", i))
                    .payload(&format!("value-{}", i)),
            )
            .expect("failed to send message");
        thread::sleep(Duration::from_secs(3));
    }
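Note that send only enqueues the record into a local buffer, and .expect will panic if even that fails (e.g. the local queue is full). A hypothetical, more forgiving variant might look like this:

    if let Err((e, _record)) = producer.send(
        BaseRecord::to("rust")
            .key(&format!("key-{}", i))
            .payload(&format!("value-{}", i)),
    ) {
        eprintln!("enqueue failed: {}", e);
        // give librdkafka a chance to drain its queue and fire callbacks
        producer.poll(Duration::from_millis(100));
    }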

You can check the entire code in the file src/1_producer_simple.rs

To test if the producer is working ...

Run the program:

  • simply rename the file src/1_producer_simple.rs to main.rs
  • execute cargo run

You should see this output:

sending message
sending message
sending message
...

What's going on? To figure it out - connect to your Kafka topic (I have used rust as the name of the Kafka topic in the above example) using the Kafka CLI consumer (or any other consumer client e.g. kafkacat). You should see the messages flowing in.

For example:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rust --from-beginning

Producer callback

We are flying blind right now! Unless we explicitly create a consumer to look at our messages, we have no clue whether they are being sent to Kafka. Let's fix that by implementing a ProducerContext (trait) to hook into the produce event - it's like a callback.

Start by creating a struct and an empty implementation for the ClientContext trait (this is mandatory).

struct ProducerCallbackLogger;

impl ClientContext for ProducerCallbackLogger {}

Now comes the main part where we implement the delivery function in the ProducerContext trait.

impl ProducerContext for ProducerCallbackLogger {
    type DeliveryOpaque = ();

    fn delivery(
        &self,
        delivery_result: &rdkafka::producer::DeliveryResult<'_>,
        _delivery_opaque: Self::DeliveryOpaque,
    ) {
        let dr = delivery_result.as_ref();
        match dr {
            Ok(msg) => {
                let key: &str = msg.key_view().unwrap().unwrap();
                println!(
                    "produced message with key {} in offset {} of partition {}",
                    key,
                    msg.offset(),
                    msg.partition()
                )
            }
            Err(producer_err) => {
                let key: &str = producer_err.1.key_view().unwrap().unwrap();

                println!(
                    "failed to produce message with key {} - {}",
                    key, producer_err.0,
                )
            }
        }
    }
}

We match against the DeliveryResult (which is a Result after all) to account for success (Ok) and failure (Err) scenarios. All we do is simply log the message in both cases, since this is just an example. You could do pretty much anything you wanted to here (don't go crazy though!)
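The unwrap() calls above assume every message has a key and that it's valid UTF-8. If that's not guaranteed, a more defensive variant (my own sketch, not from the repo) could be:

    let key = msg
        .key_view::<str>()
        .and_then(|r| r.ok())
        .unwrap_or("<no key>");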

We've ignored DeliveryOpaque which is an associated type of the ProducerContext trait

We need to make sure that we plug in our ProducerContext implementation. We do this by using the create_with_context method (instead of create), and by providing the matching type parameter for BaseProducer as well.

let producer: BaseProducer<ProducerCallbackLogger> = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    // ... other settings as before ...
    .create_with_context(ProducerCallbackLogger {})
    .expect("invalid producer config");

How does the callback get called?

Ok, we have the implementation, but we need a way to trigger it! One of the ways is to call flush on the producer. So, we could write our producer as such:

  • add producer.flush(Duration::from_secs(3));, and
  • comment the sleep (just for now)
        producer
            .send(
                BaseRecord::to("rust")
                    .key(&format!("key-{}", i))
                    .payload(&format!("value-{}", i)),
            )
            .expect("failed to send message");

        producer.flush(Duration::from_secs(3));
        println!("flushed message");
        //thread::sleep(Duration::from_secs(3));

Hold on, we can do better!

The send method is non-blocking (by default), but by calling flush after each send we have now converted this into a synchronous invocation - not recommended from a performance perspective.

We can improve the situation by using a ThreadedProducer. It takes care of invoking the poll method in a background thread to ensure that the delivery callback notifications are delivered. Doing this is very simple - just change the type from BaseProducer to ThreadedProducer!

// before: BaseProducer<ProducerCallbackLogger>
// after:  ThreadedProducer<ProducerCallbackLogger>
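Putting it together, producer creation now looks like this (a sketch - other config settings same as before):

use rdkafka::producer::ThreadedProducer;

let producer: ThreadedProducer<ProducerCallbackLogger> = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .create_with_context(ProducerCallbackLogger {})
    .expect("invalid producer config");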

Also, we don't need the call to flush anymore.

...
//producer.flush(Duration::from_secs(3));
//println!("flushed message");
thread::sleep(Duration::from_secs(3));
...

The code is available in src/2_threaded_producer.rs

Run the program again

  • Rename the file src/2_threaded_producer.rs to main.rs and
  • execute cargo run

Output:

sending message
sending message
produced message with key key-1 in offset 6 of partition 2
produced message with key key-2 in offset 3 of partition 0
sending message
produced message with key key-3 in offset 7 of partition 2

As expected, you should be able to see the producer event callback, denoting that the messages were indeed sent to the Kafka topic. Of course, you can connect to the topic directly and double-check, just like before:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rust --from-beginning

To try a failure scenario, try using an incorrect topic name and notice how the Err variant of the delivery implementation gets invoked.

Sending JSON messages

So far, we have just been sending strings as keys and values. JSON is a commonly used message format - let's see how to use that.

Assume we want to send User info which will be represented using this struct:

struct User {
    id: i32,
    email: String,
}

We can then use the serde_json library to serialize this as JSON. All we need are the custom derives from serde - Deserialize and Serialize:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct User {
    id: i32,
    email: String,
}
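This assumes serde (with the derive feature) and serde_json are declared in Cargo.toml - indicative versions:

...
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
...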

Change the producer loop:

  • Create a User instance
  • Serialize it to a JSON string using to_string_pretty
  • Include that in the payload
...
        let user = User {
            id: i,
            email: format!("user-{}@example.com", i), // hypothetical email format - adjust as needed
        };

        let user_json = serde_json::to_string_pretty(&user).expect("json serialization failed");

        producer
            .send(
                BaseRecord::to("rust")
                    .key(&format!("user-{}", i))
                    .payload(&user_json),
            )
            .expect("failed to send message");
...

You can also use to_vec (instead of to_string_pretty) to serialize it into a Vec of bytes (Vec<u8>).
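For example:

    let user_bytes: Vec<u8> = serde_json::to_vec(&user).expect("json serialization failed");
    // Vec<u8> is also accepted by .payload(), so it can be passed along directly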

To run the program...

  • Rename the file src/3_JSON_payload.rs to main.rs, and
  • execute cargo run

Consume from the topic:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rust --from-beginning

You should see messages with a String key (e.g. user-34) and a JSON value like this (the email reflects the format used in the loop above):

{
  "id": 34,
  "email": "[email protected]"
}

Is there a better way?

Yes! If you are used to the declarative serialization/de-serialization approach in the Kafka Java client (and probably others as well), you may not like this "explicit" approach. Just to put things in perspective, this is how you'd do it in Java:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");

....

ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, key, user);
producer.send(record);

Notice that you simply configure the producer to use KafkaJsonSchemaSerializer, and the User class is serialized to JSON automatically.

rust-rdkafka provides something similar with the ToBytes trait. Here is what it looks like:

pub trait ToBytes {
    /// Converts the provided data to bytes.
    fn to_bytes(&self) -> &[u8];
}

Self-explanatory, right? There are existing implementations for String, Vec<u8>, etc., so you can use those types as keys or values without any additional work - which is exactly what we just did. The catch is that our approach was "explicit": we converted the User struct into a JSON String ourselves and passed it along.

What if we could implement ToBytes for User?

impl ToBytes for User {
    fn to_bytes(&self) -> &[u8] {
        // serializes into a Vec owned by this function - the returned
        // reference cannot outlive it, so this will not compile
        let b = serde_json::to_vec_pretty(&self).expect("json serialization failed");
        b.as_slice()
    }
}

You will see a compiler error:

cannot return value referencing local variable `b`
returns a value referencing data owned by the current function

For additional background, please refer to this GitHub issue. I would be happy to see a working example with ToBytes - please drop a note if you have inputs on this!

TL;DR is that it's best to stick to the "explicit" way of doing things unless you have a ToBytes implementation that "does not involve an allocation and cannot fail".
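That said, one pattern that does satisfy those constraints is to serialize up front and wrap the owned bytes in a newtype, so that to_bytes itself neither allocates nor fails. A sketch of the idea (my own illustration, not from the original repo):

use rdkafka::message::ToBytes;

// owns the already-serialized bytes
struct JsonPayload(Vec<u8>);

impl ToBytes for JsonPayload {
    fn to_bytes(&self) -> &[u8] {
        &self.0 // no allocation, cannot fail
    }
}

// usage: serialize once, before sending - the fallible, allocating part
// happens here, not inside to_bytes
// let payload = JsonPayload(serde_json::to_vec_pretty(&user).expect("json serialization failed"));
// producer.send(BaseRecord::to("rust").key("user-1").payload(&payload)).expect("failed to send message");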

Wrap up

That's it for the first part. Part 2 will cover the Kafka consumer API.

