Spring Boot集群管理工具KafkaAdminClient
source link: http://www.cnblogs.com/dgwblog/p/12350164.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
能与原理介绍
在Kafka官网中这么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准):
- 创建Topic:createTopics(Collection<NewTopic> newTopics)
- 删除Topic:deleteTopics(Collection<String> topics)
- 罗列所有Topic:listTopics()
- 查询Topic:describeTopics(Collection<String> topicNames)
- 查询集群信息:describeCluster()
- 查询ACL信息:describeAcls(AclBindingFilter filter)
- 创建ACL信息:createAcls(Collection<AclBinding> acls)
- 删除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
- 查询配置信息:describeConfigs(Collection<ConfigResource> resources)
- 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
- 修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
- 查询节点的日志目录信息:describeLogDirs(Collection<Integer> brokers)
- 查询副本的日志目录信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
- 增加分区:createPartitions(Map<String, NewPartitions> newPartitions)
- 其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。主要实现步骤:
客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
客户端发送请求至Kafka Broker。
Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
客户端接收相应的回执并进行解析处理。
和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和回执类的两个基本父类。
@Component public class KafkaConfig{ // 配置Kafka public Properties getProps(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); /* props.put("retries", 2); // 重试次数 props.put("batch.size", 16384); // 批量发送大小 props.put("buffer.memory", 33554432); // 缓存大小,根据本机内存大小配置 props.put("linger.ms", 1000); // 发送频率,满足任务一个条件发送*/ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } }
@RestController public class KafkaTopicManager { @Autowired private KafkaConfig kafkaConfig; @GetMapping("createTopic") public void createTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); NewTopic newTopic = new NewTopic("test1",4, (short) 1); Collection<NewTopic> newTopicList = new ArrayList<>(); newTopicList.add(newTopic); adminClient.createTopics(newTopicList); adminClient.close(); } @GetMapping("deleteTopic") public void deleteTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); adminClient.deleteTopics(Arrays.asList("test1")); adminClient.close(); } @GetMapping("listAllTopic") public void listAllTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); ListTopicsResult result = adminClient.listTopics(); KafkaFuture<Set<String>> names = result.names(); try { names.get().forEach((k)->{ System.out.println(k); }); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } adminClient.close(); } @GetMapping("getTopic") public void getTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test")); Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values(); if(values.isEmpty()){ System.out.println("找不到描述信息"); }else{ for (KafkaFuture<TopicDescription> value : values) { System.out.println(value); } } adminClient.close(); } }
Recommend
-
217
Spring Boot
-
138
In a previous article we covered the basics of Kotlin. This lesson will introduce you to the world of using
-
162
Spring boot 快速入门
-
118
Phillip Webb and Madhura Bhave discussSprin Boot 2, improvements, how to migrate to it, tweaks and utilities, and internals.
-
104
<p>On behalf of the team, it is my great pleasure to announce that Spring Boot 2.0.0.RC1 has been released and is now available from <a href="http://repo.spring.io/milestone/">our milestone repository</a>.</p><p>Thi...
-
94
之前介绍了Docker集群管理工具-Kubernetes部署记录,下面介绍另一个管理工具Swarm的用法,Swarm是Docker原生的集群管理软件,与Kubernetes比起来比较简单。Swarm介绍Swarm是Docker公司在2014年12月初发布的一套较为简单的工具,用来管理Docker集群,它将一群Docker...
-
90
The release of Spring Boot version 2.0 general availability came a step closer with release candidate 1 (RC1) being announced on January 31st. Even at this late stage some noteworthy additions are still being released, alongside a huge number of...
-
81
-
48
前言 因为公司技术方面的需求,前一段时间折腾了一下elasticsearch,总体技术水平能够达到简历上的 “熟悉” 的程度。 有管理ES集群经验的朋友肯定都知道elasticsearch的head插件能够实现对es集群的...
-
4
一文详解云上自动化部署集群管理工具 Nebula Operator 用代码保护地球, AI 和 IoT 世界的工程师们准备好了吗...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK