1

keycloak~扩展事件机制,集成kafka中间件

 2 years ago
source link: https://www.cnblogs.com/lori/p/15235724.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

对于KC的后台或者接口的操作,当用户,组,角色这些实体状态发生改变时,KC会对外发布事件,而这些事件处理程序我们是可以在后台配置的,默认继承了jboss-logging日志事件,而我们可以在事件管理中去配置自己的事件处理程序。

事件处理程序SPI

实现EventListenerProviderFactoryEventListenerProvider这两个SPI即可,我们在这里可以订阅由KC发出现的事件,针对我们感兴趣的事件,去添加处理代码。

/**
     * 后端管理平台事件
     *
     * @param adminEvent
     * @param b
     */
    @Override
    public void onEvent(AdminEvent adminEvent, boolean b) {
      logger.info(adminEvent.getResourceTypeAsString()))
    }

总结常用的消息类型

  • 操作类型:operationType
    • 后端类型
      • CREATE 新建
      • UPDATE 编辑
      • DELETE 删除
    • 后台资源类型:resourceType
      • REALM_ROLE 域的角色
      • REALM_ROLE_MAPPING 域的角色绑定
      • USER 用户
      • GROUP 组
      • GROUP_MEMBERSHIP 组绑定
      • CLIENT_ROLE 客户端角色
      • CLIENT_ROLE_MAPPING 客户端角色绑定
    • 前端类型
      • LOGIN 登录
      • LOGIN_ERROR 登录失败
      • REGISTER 注册
      • REGISTER_ERROR 注册失败
      • LOGOUT 登出
      • LOGOUT_ERROR 登出失败

扩展一个KAFKA

  • 需要扩展相关组件
<dependencies>
     <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
  • 需要配置你的kafka的生产者
    kc这边是一个kafka的生产者,当它收到由KC发出来的事件之后,把消息发到kafka,其它服务方可以消费这个kafka消息
/**
 * kafka生产者.
 */
public class Producer {

    private final static String BOOTSTRAP_SERVER = ConfigFactory.getInstance().getStrPropertyValue("kafka.host");
    private final static Logger logger = Logger.getLogger(Producer.class);
    private static KafkaProducer<String, String> producer;

    private static KafkaProducer<String, String> getProducer() {
        if (producer == null) {
            //reset thread context
            resetThreadContext();
            // create the producer
            producer = new KafkaProducer<String, String>(getProperties());
        }
        return producer;
    }

    public static void publishEvent(String topic, String value) {

        // create a producer record
        ProducerRecord<String, String> eventRecord =
                new ProducerRecord<String, String>(topic, value);

        // send data - asynchronous
        getProducer().send(eventRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    logger.info(String.format("The offset of the record we just sent is:%s", recordMetadata.offset()));
                }
            }
        });

    }

    private static void resetThreadContext() {
        Thread.currentThread().setContextClassLoader(null);
    }

    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        return properties;
    }


}

最后,在你的kc事件处理SPI里,调用咱们的KAFKA生产者去生产消息就行了


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK