4

Spring Boot与运行在Kubernetes上的ksqlDB集成教程 - Piotr

 1 year ago
source link: https://www.jdon.com/61205
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Spring Boot与运行在Kubernetes上的ksqlDB集成教程
在本文中,您将学习如何在 Kubernetes 上运行ksqlDB并将其与 Spring Boot 一起使用。您还将了解如何基于Strimzi运算符在 Kubernetes 上运行 Kafka。
为了将 Spring Boot 与 ksqlDB 服务器集成,我们将利用 ksqlDB 提供的轻量级 Java 客户端。此客户端支持拉取和推送查询。它还提供了用于插入行和创建表或流的 API。您可以在此处的 ksqlDB 文档中阅读有关它的更多信息。

Screenshot-2022-06-22-at-15.48.09.png?resize=696%2C343&ssl=1

我们的示例 Spring Boot 应用程序非常简单。我们将使用 Spring Cloud Stream Supplierbean 生成事件并将其发送到 Kafka 主题。有关使用 Spring Cloud Stream 的 Kafka 的更多信息,请参阅以下文章
另一方面,我们的应用程序使用 kSQL 查询从 Kafka 主题获取数据。它还KTable在启动时创建。

源代码
如果您想自己尝试一下,可以随时查看我的源代码。为此,您需要克隆我的 GitHub 存储库。然后进入transactions-service目录。之后,您应该按照我的指示进行操作。让我们开始。

先决条件
我们将使用几种工具。你需要有:

  • Kubernetes 集群——它可能是一个单节点的本地集群,例如 Minikube 或 Kind。就个人而言,我在 Docker 桌面上使用 Kubernetes
  • kubectlCLI – 与集群交互
  • Helm——我们将使用它在 Kubernetes 上安装 ksqlDB 服务器。如果您没有 Helm,则必须安装它

使用 Strimzi 在 Kubernetes 上运行 Kafka
当然,我们需要一个 Kafka 实例来执行我​​们的练习。有几种方法可以在 Kubernetes 上运行 Kafka。我将向您展示如何使用基于运算符的方法来实现。第一步,您需要在集群上安装 OLM(Operator Lifecycle Manager)。为此,您只需在 Kubernetes 上下文中执行以下命令:

$ curl -L https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.21.2/install.sh -o install.sh
$ chmod +x install.sh
$ ./install.sh v0.21.2

然后,您可以继续安装 Strimzi 操作员。这只是一个命令。

$ kubectl create -f https://operatorhub.io/install/stable/strimzi-kafka-operator.yaml

现在,我们可以在 Kubernetes 上创建一个 Kafka 集群。让我们从练习的专用命名空间开始:

$ kubectl create ns kafka

我假设你有一个单节点 Kubernetes 集群,所以我们还创建了一个单节点 Kafka。Kafka这是带有CRD的 YAML 清单。您可以在路径下的存储库中找到它k8s/cluster.yaml。

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 1
      inter.broker.protocol.version: "3.2"
      min.insync.replicas: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
    replicas: 1
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 30Gi
          deleteClaim: true
    version: 3.2.0
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true

让我们将它应用到命名空间中的 Kubernetes kafka:

$ kubectl apply -f k8s/cluster.yaml -n kafka

您应该会看到一个 Kafka 实例和一个 Zookeeper 实例。如果 pod 正在运行,则意味着您在 Kubernetes 上安装了 Kafka。

$ kubectl get pod -n kafka
NAME                                          READY   STATUS    RESTARTS  AGE
my-cluster-entity-operator-68cc6bc4d9-qs88p   3/3     Running   0         46m
my-cluster-kafka-0                            1/1     Running   0         48m
my-cluster-zookeeper-0                        1/1     Running   0         48m

Kafka 在集群内以 namemy-cluster-kafka-bootstrap和 port可用9092。

kubectl get svc -n kafka
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
my-cluster-kafka-bootstrap    ClusterIP   10.108.109.255   <none>        9091/TCP,9092/TCP,9093/TCP            47m
my-cluster-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   47m
my-cluster-zookeeper-client   ClusterIP   10.102.10.251    <none>        2181/TCP                              47m
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            47m

在 Kubernetes 上运行 KsqlDB 服务器
KsqlDB 服务器是 Confluent 平台的一部分。由于我们不是在 Kubernetes 上安装整个 Confluent Platform,而只是一个开源的 Kafka 集群,我们需要单独安装 KsqlDB Server。让我们用 Helm 来做。KSQL 服务器没有“官方”Helm 图表。因此,我们应该直接去 GitHub 上的 Confluent Helm 仓库:

$ git clone https://github.com/confluentinc/cp-helm-charts.git
$ cd cp-helm-charts

在这个存储库中,您可以为每个单独的 Confluent 组件找到单独的 Helm 图表,包括控制中心或 KSQL Server。我们的图表在存储库中的位置是charts/cp-ksql-server. 我们需要在安装过程中覆盖一些默认设置。首先,我们必须禁用无头模式。在无头模式下,KSQL Server 不公开 HTTP 端点并从输入脚本加载查询。我们的 Spring Boot 应用程序将通过 HTTP 连接到服务器。在下一步中,我们应该覆盖 Kafka 集群的默认地址和仍然6.1.0存在的 KSQL Server 的默认版本。我们将使用最新版本7.1.1。这是helm您应该在 Kubernetes 集群上运行的命令:

$ helm install cp-ksql-server \
    --set ksql.headless=false \
    --set kafka.bootstrapServers=my-cluster-kafka-bootstrap:9092 \
    --set imageTag=7.1.1 \
  charts/cp-ksql-server -n kafka

让我们验证 KSQL 是否在集群上运行:

$ kubectl get pod -n kafka | grep ksql
cp-ksql-server-679fc98889-hldfv               2/2     Running   0               2m11s

HTTP 端点可用于 namecp-ksql-server和 port下的其他应用程序8088:

$ kubectl get svc -n kafka | grep ksql
cp-ksql-server                ClusterIP   10.109.189.36    <none>        8088/TCP,5556/TCP                     3m25s

现在,我们的 Kubernetes 集群上运行着所需的全部人员。因此,我们可以继续进行 Spring Boot 应用程序的实现。

将 Spring Boot 与 ksqlDB 集成
我没有发现 Spring Boot 和 ksqlDB 之间有任何开箱即用的集成。因此,我们将ksqldb-api-client直接使用。首先,我们需要包含 ksqlDB Maven 存储库和一些依赖项:

<dependencies>
        ...

  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-udf</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-common</artifactId>
    <version>0.26.0</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>ksqlDB</id>
    <name>ksqlDB</name>
    <url>https://ksqldb-maven.s3.amazonaws.com/maven/</url>
  </repository>
</repositories>

之后,我们可以定义一个@Bean返回 ksqlDBClient实现的 Spring。由于我们将在与 KSQL Server 相同的命名空间中运行我们的应用程序,因此我们需要提供 Kubernetes 服务名称作为主机名。

@Configuration
public class KSQLClientProducer {

    @Bean
    Client ksqlClient() {
        ClientOptions options = ClientOptions.create()
                .setHost("cp-ksql-server")
                .setPort(8088);
        return Client.create(options);
    }
}

我们的应用程序通过 HTTP 端点与 KSQL Server 交互。KTable它在启动时创建一个单曲。为此,我们需要调用executeStatementKSQL Clientbean 实例上的方法。我们正在创建 SOURCE 表以启用对其运行 拉取查询 。该表从transactions主题中获取数据。它期望传入事件中的 JSON 格式。

public class KTableCreateListener implements ApplicationListener<ContextRefreshedEvent> {

   private static final Logger LOG = LoggerFactory.getLogger(KTableCreateListener.class);
   private Client ksqlClient;

   public KTableCreateListener(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @Override
   public void onApplicationEvent(ContextRefreshedEvent event) {
      try {
         String sql = """
                 CREATE SOURCE TABLE IF NOT EXISTS transactions_view (
                   id BIGINT PRIMARY KEY,
                   sourceAccountId BIGINT,
                   targetAccountId BIGINT,
                   amount INT
                 ) WITH (
                   kafka_topic='transactions',
                   value_format='JSON'
                 );
                 """;
         ExecuteStatementResult result = ksqlClient.executeStatement(sql).get();
         LOG.info("Result: {}", result.queryId().orElse(null));
      } catch (ExecutionException | InterruptedException e) {
         LOG.error("Error: ", e);
      }
   }
}

创建表后,我们可以对其运行一些查询。有非常简单的查询。我们正在尝试查找与特定帐户相关的所有交易和所有交易。

@RestController
@RequestMapping("/transactions")
public class TransactionResource {

   private static final Logger LOG = LoggerFactory.getLogger(TransactionResource.class);
   Client ksqlClient;

   public TransactionResource(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @GetMapping
   public List<Transaction> getTransactions() throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view;")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   @GetMapping("/target/{accountId}")
   public List<Transaction> getTransactionsByTargetAccountId(@PathVariable("accountId") Long accountId)
            throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view WHERE sourceAccountId=" + accountId + ";")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   private Transaction mapRowToTransaction(Row row) {
      Transaction t = new Transaction();
      t.setId(row.getLong("ID"));
      t.setSourceAccountId(row.getLong("SOURCEACCOUNTID"));
      t.setTargetAccountId(row.getLong("TARGETACCOUNTID"));
      t.setAmount(row.getInteger("AMOUNT"));
      return t;
   }

}

使用 Spring Cloud Stream 向主题发送事件
最后,我们可以进行练习的最后一部分。我们需要生成测试数据并将其发送到 Kafkatransactions主题。实现它的最简单方法是使用 Spring Cloud Stream Kafka 模块。首先,让我们添加以下 Maven 依赖项:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

然后,我们可以创建一个基于 Spring Supplierbean 的生产者。Supplierbean 不断生成新事件并将其发送到目标通道。默认情况下,它每秒重复一次该操作。

@Configuration
public class KafkaEventProducer {

   private static long transactionId = 0;
   private static final Random r = new Random();

   @Bean
   public Supplier<Message<Transaction>> transactionsSupplier() {
      return () -> {
          Transaction t = new Transaction();
          t.setId(++transactionId);
          t.setSourceAccountId(r.nextLong(1, 100));
          t.setTargetAccountId(r.nextLong(1, 100));
          t.setAmount(r.nextInt(1, 10000));
          Message<Transaction> o = MessageBuilder
                .withPayload(t)
                .setHeader(KafkaHeaders.MESSAGE_KEY, new TransactionKey(t.getId()))
                .build();
          return o;
      };
   }
}

当然,我们还需要提供我们的 Kafka 集群的地址和频道的目标主题名称。Kafka的地址是在部署阶段注入的。

spring.kafka.bootstrap-servers = ${KAFKA_URL}
spring.cloud.stream.bindings.transactionsSupplier-out-0.destination = transactions

最后,让我们在 Kubernetes 上部署我们的 Spring Boot。这是包含 KubernetesDeployment和Service定义的 YAML 清单:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: transactions
spec:
  selector:
    matchLabels:
      app: transactions
  template:
    metadata:
      labels:
        app: transactions
    spec:
      containers:
      - name: transactions
        image: piomin/transactions-service
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap:9092
        ports:
        - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: transactions
spec:
  type: ClusterIP
  selector:
    app: transactions
  ports:
    - port: 8080

让我们在命名空间中部署应用程序kafka:

$ kubectl apply -f k8s/deployment.yaml -n kafka

在 Kubernetes 上测试 ksqlDB
在 Kubernetes 上部署应用程序后,让我们启用port-forward在本地端口上对其进行测试:

$ kubectl port-forward service/transactions 8080:8080
现在,我们可以测试我们的两个 HTTP 端点。让我们从搜索所有事务的端点开始:

$ curl http://localhost:8080/transactions
然后,您可以调用端点来搜索与 相关的所有事务targetAccountId,例如:

$ curl http://localhost:8080/transactions/target/10

最后的想法
在本文中,我想展示如何在 Kubernetes 上使用 ksqlDB。我们使用 Spring Boot 和 Spring Cloud Stream 等框架与 Kafka 和 ksqlDB 进行交互。您可以了解如何使用 Strimzi 运算符在 Kubernetes 上运行 Kafka 集群,或者如何直接从 Helm 存储库部署 KSQL Server。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK