Apache Kafka - Kafka Connect - Debezium Connector MySQL


Refs:

  1. https://kifarunix.com/install-apache-kafka-on-debian/

  2. https://www.digitalocean.com/community/tutorials/introduction-to-kafka

  3. https://www.digitalocean.com/community/tutorials/how-to-integrate-existing-systems-with-kafka-connect

  4. https://huzzefakhan.medium.com/install-and-setup-apache-kafka-on-linux-b430d8796dae

Apache Kafka is a powerful platform for real-time data streaming, integration, and processing. In this guide, we'll walk through installing and configuring Kafka, Kafka Connect, and the Debezium connector for MySQL, then stream the captured change events into Elasticsearch.

Prerequisites

  • Linux Server: A Debian-based OS is recommended.

  • Java: Kafka requires Java to run.

  • Basic Knowledge: Familiarity with Linux commands and configuration files.

Step 1: Install Java

First, check if Java is installed:

java -version

If not, update the package index and install OpenJDK 17:

sudo apt update
sudo apt install -y openjdk-17-jdk

Step 2: Download Apache Kafka

Download Kafka (this guide uses version 3.7.1, built for Scala 2.13):

wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz

Step 3: Download Debezium Connector for MySQL

Fetch the Debezium MySQL Connector:

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.6.2.Final/debezium-connector-mysql-2.6.2.Final-plugin.tar.gz

Step 4: Prepare Kafka Directory

Create a directory for Kafka:

sudo mkdir -p /opt/kafka

Extract Kafka:

sudo tar -xzf kafka_2.13-3.7.1.tgz -C /opt/kafka --strip-components=1

Step 5: Configure Kafka as a Service

Create a systemd service file for Kafka:

sudo nano /etc/systemd/system/kafka.service

Add the following configuration. This guide runs Kafka with ZooKeeper (set up in Step 6), so the service points at config/server.properties (the ZooKeeper-mode config) rather than the KRaft config, and starts only after ZooKeeper is up:

[Unit]
Description=Apache Kafka
Requires=zookeeper.service
After=zookeeper.service network.target

[Service]
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

Step 6: Configure Zookeeper as a Service

Create a systemd service file for Zookeeper:

sudo nano /etc/systemd/system/zookeeper.service

Add the following:

[Unit]
Description=Apache Zookeeper server
Wants=network.target
After=network.target

[Service]
Type=simple
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
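
Reload systemd and start both services. ZooKeeper must be up before Kafka; the Requires=zookeeper.service line in the Kafka unit enforces that ordering:

sudo systemctl daemon-reload
sudo systemctl enable --now zookeeper
sudo systemctl enable --now kafka
sudo systemctl status kafka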

Step 7: Extract Debezium Connector

Extract the Debezium MySQL connector to Kafka’s library:

sudo tar -zxf debezium-connector-mysql-2.6.2.Final-plugin.tar.gz -C /opt/kafka/libs/
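
The plugin tarball should unpack into a debezium-connector-mysql directory containing the connector jars; confirm it landed in place with:

ls /opt/kafka/libs/debezium-connector-mysql/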

Step 8: Install Elasticsearch Sink Connector

Use the Confluent Hub client (a separate install from Confluent) to add the Elasticsearch sink connector:

sudo confluent-hub install confluentinc/kafka-connect-elasticsearch:latest \
  --component-dir /usr/local/share/java \
  --worker-configs /opt/kafka/config/connect-distributed.properties

Step 9: Start Kafka Connect

Edit the Kafka Connect configuration:

sudo nano /opt/kafka/config/connect-distributed.properties
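
At minimum, make sure the worker points at your broker and can find the connector plugins. The Debezium jars extracted into /opt/kafka/libs in Step 7 are already on the worker's classpath, and Step 8's --worker-configs flag should have appended the component directory to plugin.path; the relevant lines should look roughly like this (values assume a single-node setup):

bootstrap.servers=localhost:9092
group.id=connect-cluster
plugin.path=/usr/local/share/java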

Start Kafka Connect:

/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
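
Once the worker is up, the Connect REST API listens on port 8083; a quick sanity check that returns the worker's version info:

curl http://localhost:8083/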

Step 10: Create and Register Connectors

  1. Debezium MySQL Connector
    Create a file named debezium-mysql-connector.json. Note that Debezium 2.x renamed database.server.name to topic.prefix and the database.history.* properties to schema.history.internal.*, so the 2.6.2 connector expects the following form:
{
  "name": "debezium-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "your-mysql-host",
    "database.port": "3306",
    "database.user": "your-user",
    "database.password": "your-password",
    "database.server.id": "184054",
    "topic.prefix": "mysql-server",
    "database.include.list": "your-database",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mysql"
  }
}
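
Debezium reads the MySQL binlog, so the source server must have binary logging enabled with binlog_format=ROW (and a unique server_id). A quick check from the shell (host and credentials are placeholders):

mysql -h your-mysql-host -u your-user -p -e "SHOW VARIABLES LIKE 'binlog_format';"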

Register it:

curl -X POST -H "Content-Type: application/json" \
  --data @debezium-mysql-connector.json http://localhost:8083/connectors
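
To confirm the connector and its task are running, query the Connect REST API:

curl http://localhost:8083/connectors/debezium-mysql-connector/status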
  2. Elasticsearch Sink Connector
    Create a file named elasticsearch-sink-connector.json. The topics value must match a topic Debezium actually produces, which follows the pattern <topic.prefix>.<database>.<table>:
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "mysql-server.your-database.your_table",
    "connection.url": "http://elasticsearch-host:9200",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}

Register it:

curl -X POST -H "Content-Type: application/json" \
  --data @elasticsearch-sink-connector.json http://localhost:8083/connectors
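
By default the sink writes to an Elasticsearch index named after the topic. To watch documents arrive (host and index are placeholders matching the config above):

curl "http://elasticsearch-host:9200/mysql-server.your-database.your_table/_search?pretty&size=1"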

Step 11: Kafka Management Commands

  • List Topics:

      /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    
  • Delete Topic:

      /opt/kafka/bin/kafka-topics.sh --delete --topic your_topic --bootstrap-server localhost:9092
    
  • List Connectors:

      curl -X GET http://localhost:8083/connectors
    
  • Delete Connector:

      curl -X DELETE http://localhost:8083/connectors/your-connector
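    
  • Consume a Topic (handy for inspecting Debezium change events; the topic name is a placeholder):

      /opt/kafka/bin/kafka-console-consumer.sh --topic mysql-server.your-database.your_table --from-beginning --bootstrap-server localhost:9092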
    
