
Confluent Kafka Installation and Demo

source link: https://dzone.com/articles/confluent-kafka-installation-and-demo

About Kafka and Confluent

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance streaming, data pipelines, and mission-critical applications. Kafka is developed by the Apache Software Foundation and is written in Scala and Java.

Confluent Open Source is a developer-optimized distribution of Apache Kafka. Confluent Platform is a full-scale data streaming platform that enables you to easily access, store, and manage data as continuous, real-time streams. In short, Confluent is a more complete distribution of Apache Kafka that streamlines administrative operations.

Confluent Kafka Installation And Demo

The goal is to insert rows into a SQLite database and see those rows appear on an auto-created Kafka topic via the JDBC source connector.

Environment: Red Hat Linux 7.x

  1. Download Tarball
    curl -O http://packages.confluent.io/archive/6.0/confluent-6.0.1.tar.gz
  2. Extract tar.gz
    tar -xvf tar/confluent-6.0.1.tar.gz
  3. Define Confluent variables
    export CONFLUENT_HOME=/mydata/myuser/confluent-6.0.1
    export PATH=$PATH:$CONFLUENT_HOME/bin
  4. Install the kafka-connect-datagen and kafka-connect-jdbc connectors via confluent-hub
    $CONFLUENT_HOME/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
    confluent-hub install confluentinc/kafka-connect-jdbc:10.0.1

    Once the services are started (step 6), you can open Confluent Control Center in your browser:

    http://localhost:9021/clusters
  5. Define the JDBC source properties file (source-quickstart-sqlite.properties) under /mydata/myuser/confluent-6.0.1/etc/kafka-connect-jdbc/:
    name=test-sqlite-jdbc-autoincrement
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false 
    tasks.max=1
    connection.url=jdbc:sqlite:test.db
    mode=incrementing
    incrementing.column.name=id
    topic.prefix=turkcell.sqlite-jdbc-
  6. Start the local Confluent services:
    confluent local services connect start

    The output looks like below:

    [screenshot: ZooKeeper/Kafka service startup output]

  7. Load the jdbc-source connector into Connect:
    confluent local services connect connector load jdbc-source -c /mydata/myuser/confluent-6.0.1/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
  8. Create the SQLite database, create a table, and insert rows:
    cd confluent-6.0.1/
    sqlite3 test.db
    sqlite> CREATE TABLE ttech(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255)); 
    sqlite> INSERT INTO ttech(name) VALUES('turkcell'); 

    You can see a row like this:

    [screenshot: inserted SQLite row]
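    As a side note, step 8 can also be run non-interactively. The sketch below (assuming the sqlite3 CLI is on your PATH) re-creates the table if needed, inserts a row, and prints the auto-assigned id:

    ```shell
    # Non-interactive version of step 8; sqlite3 CLI assumed on PATH.
    # AUTOINCREMENT assigns the id column the connector polls on.
    sqlite3 test.db <<'SQL'
    CREATE TABLE IF NOT EXISTS ttech(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
    INSERT INTO ttech(name) VALUES('turkcell');
    SELECT id, name FROM ttech ORDER BY id DESC LIMIT 1;
    SQL
    ```

    Each new row's id is greater than the last one the connector has seen, which is exactly what mode=incrementing relies on.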

  9. Look at the Connect log to see whether the JDBC source connector fails or works successfully:
    confluent local services connect log
  10. Finally, look at the Kafka topic to see your newly added record. I use Kafka Tool to check:

[screenshot: Kafka record shown in Kafka Tool]
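If you don't have Kafka Tool handy, the topic can also be inspected from the command line. The topic name is the topic.prefix from step 5 plus the table name, so for the ttech table it is turkcell.sqlite-jdbc-ttech. A sketch, assuming a running local cluster with the broker on localhost:9092:

```shell
# List topics to confirm the connector auto-created one per table
$CONFLUENT_HOME/bin/kafka-topics --bootstrap-server localhost:9092 --list

# Read the records the JDBC source connector produced
# (topic name = topic.prefix + table name)
$CONFLUENT_HOME/bin/kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic turkcell.sqlite-jdbc-ttech \
  --from-beginning
```

Both commands require the services from step 6 to be up; stop the consumer with Ctrl+C when done.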

Some Useful Commands and Screenshots

  1. See connector list: confluent local services connect connector --list
  2. See connector config: confluent local services connect connector config jdbc-source
  3. Unload connector: confluent local services connect connector unload jdbc-source
  4. See connect log: confluent local services connect log
  5. Change the converter from Avro to JSON in this file: /mydata/myuser/confluent-6.0.1/etc/kafka/connect-distributed.properties
  6. If you use schema-registry:
    • Register the key schema:
      • curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"your_table\",\"fields\":[{\"name\":\"ID\",\"type\":\"long\"}]}"}' http://localhost:8091/subjects/your_table-key/versions
    • Check that it is registered:
      • curl -X GET http://localhost:8091/subjects
    • Add a sink connector to sync the topic to an Oracle table:
      • curl -X POST --header "Content-Type: application/json" localhost:8083/connectors -d
        '{
            "name": "sink_my_table",
            "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
                "tasks.max": 3,
                "connection.url": "jdbc:oracle:thin:@my.db:1961:MYSERVICE",
                "connection.user": "ora_user",
                "connection.password": "XXXXXX",
                "table.name.format": "my_table",
                "topics": "my_table",
                "auto.create": "false",
                "delete.enabled": "true",
                "pk.mode": "record_key",
                "pk.fields": "ID",
                "insert.mode": "upsert",
                "transforms": "TimestampConverter1,TimestampConverter2,TimestampConverter3",
                "transforms.TimestampConverter1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
                "transforms.TimestampConverter1.field": "RECORDDATE",
                "transforms.TimestampConverter1.target.type": "Timestamp",
                "transforms.TimestampConverter2.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
                "transforms.TimestampConverter2.field": "STARTDATE",
                "transforms.TimestampConverter2.target.type": "Timestamp",
                "transforms.TimestampConverter3.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
                "transforms.TimestampConverter3.field": "ENDDATE",
                "transforms.TimestampConverter3.target.type": "Timestamp"
            }
        }'
    • Look at connector config information:
      • curl -X GET http://localhost:8083/connectors/sink_my_table
    • Check connector status:
      • curl -X GET http://localhost:8083/connectors/sink_my_table/status
    • Delete connector:
      • curl -X DELETE http://localhost:8083/connectors/sink_my_table
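
For item 5 above, the relevant converter lines in connect-distributed.properties would look roughly like this (a sketch; exact defaults vary by Confluent version):

```properties
# Switch Connect's default converters from Avro to JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Set to false to emit plain JSON without embedded schemas
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```

With schemas.enable=false, records arrive as bare JSON objects, which matches the value.converter settings used in the source connector file in step 5.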

Screenshots of Confluent Control Center:

[screenshot: Confluent overview page]

[screenshot: topics]

[screenshot: connect clusters]

[screenshot: JDBC source]

Hope it helps you!

