24

Spring Boot集群管理工具KafkaAdminClient

 4 years ago
source link: http://www.cnblogs.com/dgwblog/p/12350164.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

能与原理介绍

在Kafka官网中这么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准):

  1. 创建Topic:createTopics(Collection<NewTopic> newTopics)
  2. 删除Topic:deleteTopics(Collection<String> topics)
  3. 罗列所有Topic:listTopics()
  4. 查询Topic:describeTopics(Collection<String> topicNames)
  5. 查询集群信息:describeCluster()
  6. 查询ACL信息:describeAcls(AclBindingFilter filter)
  7. 创建ACL信息:createAcls(Collection<AclBinding> acls)
  8. 删除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
  9. 查询配置信息:describeConfigs(Collection<ConfigResource> resources)
  10. 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
  11. 修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
  12. 查询节点的日志目录信息:describeLogDirs(Collection<Integer> brokers)
  13. 查询副本的日志目录信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
  14. 增加分区:createPartitions(Map<String, NewPartitions> newPartitions)
  15. 其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。主要实现步骤:

客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。

客户端发送请求至Kafka Broker。

Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。

客户端接收相应的回执并进行解析处理。

和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和回执类的两个基本父类。

@Component
public class KafkaConfig{

     // 配置Kafka
    public Properties getProps(){
        Properties props =  new Properties();
        props.put("bootstrap.servers", "localhost:9092");
/*        props.put("retries", 2); // 重试次数
        props.put("batch.size", 16384); // 批量发送大小
        props.put("buffer.memory", 33554432); // 缓存大小,根据本机内存大小配置
        props.put("linger.ms", 1000); // 发送频率,满足任务一个条件发送*/
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

}
@RestController
public class KafkaTopicManager {

    @Autowired
    private KafkaConfig kafkaConfig;

    @GetMapping("createTopic")
    public void createTopic(){
        AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

        NewTopic newTopic = new NewTopic("test1",4, (short) 1);
        Collection<NewTopic> newTopicList = new ArrayList<>();
        newTopicList.add(newTopic);
        adminClient.createTopics(newTopicList);

        adminClient.close();
    }
    @GetMapping("deleteTopic")
    public void deleteTopic(){
        AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
        adminClient.deleteTopics(Arrays.asList("test1"));
        adminClient.close();
    }
    @GetMapping("listAllTopic")
    public void listAllTopic(){
        AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
        ListTopicsResult result = adminClient.listTopics();
        KafkaFuture<Set<String>> names = result.names();
        try {
            names.get().forEach((k)->{
                System.out.println(k);
            });
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        adminClient.close();
    }
    @GetMapping("getTopic")
    public void getTopic(){
        AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

        DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test"));

        Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values();

        if(values.isEmpty()){
            System.out.println("找不到描述信息");
        }else{
            for (KafkaFuture<TopicDescription> value : values) {
                System.out.println(value);
            }
        }
        adminClient.close();
    }
}
IBZfymF.png!web

Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK