82

GitHub - sohutv/sohu-tv-mq: RocketMQ企业级运维平台

 5 years ago
source link: https://github.com/sohutv/sohu-tv-mq
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

README.md

MQCloud - RocketMQ的企业级运维平台

它具备以下特性:

  • 跨集群:可以同时管理多个集群,对使用者透明。

  • 预警功能:针对生产或消费堆积,失败,异常等情况预警,处理。

  • 简单明了:用户视图-拓扑、流量、消费状况等指标直接展示;管理员视图-集群运维、监控、流程审批等。

  • 安全:用户隔离,操作审批,数据安全。

  • 更多特性正在开发中。

  • 下图简单描述了MQCloud大概的功能:

    mqcloud


特性概览

  • 用户topic列表-不同用户看到不同的topic,管理员可以管理所有topic

    用户topic列表

  • topic详情-分三块 基本信息,今日流程,拓扑

    topic详情

  • 生产详情

    生产详情

  • 消费详情

    消费详情

  • 消息

    消息

  • 消息消费情况

    msgconsume

  • 运维后台

    admin

  • 创建broker

    addBroker


目前运维的规模

  1. 服务器:20台+
  2. 集群:4个+
  3. topic:250个+
  4. 生产消费消息量/日:10亿条+
  5. 生产消费消息大小/日:1T+

模块介绍及编译

  1. 模块介绍

    1. mq-client-common-open与mq-client-open为客户端模块,它封装了rocketmq-client,客户端需要依赖它才能和MQCloud进行交互。
    2. mq-cloud-common与mq-cloud为MQCloud的web端,实现管理监控等一系列功能。
  2. 编译

    1. mq-client-common-open与mq-client-open最低依赖jdk1.7。
    2. mq-cloud依赖jdk1.8,其采用spring-boot实现。
    3. 编译:
      1. 在sohu-tv-mq/下,执行mvn clean install -pl "!mq-cloud"(linux系统请将"!mq-cloud"改成'!mq-cloud'执行),将编译并install mq-client-common-open,mq-client-open,mq-cloud-common模块
      2. 在sohu-tv-mq/mq-cloud/下,执行mvn clean package,将编译并打包mq-cloud.war

配置数据库及运行

  1. 数据库配置

    数据库使用mysql,不同的环境需要修改不同的配置文件,默认为local环境,默认配置参见application-local.yml

    url: jdbc:mysql://127.0.0.1:3306/mq-cloud?useUnicode=true&characterEncoding=UTF-8
    username: mqcloud
    password: mqcloud
    

    线上数据库配置在application-online.yml 中。

  2. 初始化数据

    初始化sql脚本,参见init.sql

  3. 运行

    1. 开发工具内运行:

      经过编译后,模块的依赖会解析成功,可以直接执行com.sohu.tv.mq.cloud.Application 中的main函数即可。

    2. 外部运行:

      使用下载或编译的war包,执行

      java -Dspring.profiles.active=online -DPROJECT_DIR=日志路径 -jar mq-cloud.war
      
  4. 访问

    直接访问mqcloud_ip:8080即可,默认管理员用户名:[email protected] 密码为:admin


集群创建

  1. 添加机器

    机器管理模块,使用+机器功能将要部署name server和broker的linux机器都添加进来。

  2. 修改通用配置

    1. 通用配置模块,将domain选项修改为mqcloud部署的服务器地址(或域名)+端口,一定要带端口(原因请参考admin的快速指南->集群管理->集群发现中的介绍),即使端口是80。

      如果部署在本机,请勿使用127.0.0.1或localhost,因为部署RocketMQ broker实例时,它需要访问domain来获取name server地址列表。

  3. 集群发现

    1. 集群发现模块,使用+集群记录 功能添加集群记录。
    2. 各项含义请参考提示,创建成功会在cluster表多一条记录。
    3. 使用+NameServer 功能来部署归属于该集群的name server实例(或者使用关联NameServer功能来关联已有的name server实例)。
  4. 创建broker实例

    1. 集群管理模块,使用+Master 功能创建broker实例。
  5. 已有集群如何使用MQCloud管理?

    1. 执行上面步骤中的1. 添加机器
    2. 执行上面步骤中的2. 修改通用配置
    3. 执行上面步骤中的3. 集群发现
    4. 导入topic:使用集群管理模块的topic初始化完成。
    5. 导入consumer:使用集群管理模块的consumer初始化完成。
    6. 用户使用mq-client-open客户端包(也可以使用原生RocketMQ客户端),并通过我是老用户完成绑定。
  6. 如果添加了新的集群,最好将MQCloud重启一下,这样可以将新的集群加入到监控采集任务中。


客户端的使用

  1. 【可选】客户端增加如下依赖:

    <dependency>
        <groupId>com.sohu.tv</groupId>
        <artifactId>mq-client-open</artifactId>
    </dependency>
    

    当然可以使用RocketMQ原生客户端,但是客户端统计,版本等将无法上报。

  2. 初始化后设置

    RocketMQConsumerRocketMQProducer在调用start()方法前需要进行如下配置:

    setMqCloudDomain("通用配置模块的domain");
    

    如果消息希望序列化,还需要如下配置(采用protostuff):

    setMessageSerializer(new DefaultMessageSerializer<Object>());
    
  3. 消息序列化相关

    如果发送消息时使用参数为Message对象的方法,那么消息不会经过序列化,方法类似如下:

    /**
     * 发送消息
     *
     * @param message 消息
     * @return 发送结果
     */
    public Result<SendResult> publish(Message message) {
        try {
            SendResult sendResult = producer.send(message);
            return new Result<SendResult>(true, sendResult);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new Result<SendResult>(false, e);
        }
    }
    

    如果发送消息时使用参数为Object对象的方法,那么消息将会进行序列化,方法类似如下:

    /**
     * 发送消息
     * 
     * @param messageObject 消息数据
     * @param tags tags
     * @param keys key
     * @param delayLevel 延时级别
     * @return 发送结果
     */
    public Result<SendResult> publish(Object messageObject, String tags, String keys, MessageDelayLevel delayLevel) {
        Message message = null;
        try {
            message = buildMessage(messageObject, tags, keys, delayLevel);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new Result<SendResult>(false, e);
        }
        return publish(message);
    }
    

    另外,如果使用序列化消息的发送方法,需要在MQCloud的common_config表中加入如下配置(用于MQCloud消息查询时反序列化使用):

    INSERT INTO `common_config` (`key`, `value`, `comment`) VALUES ( 'messageSerializerClass', 'com.sohu.tv.mq.serializable.DefaultMessageSerializer', '消息序列化工具,如果发送和消费使用了该类,MQCloud也需要配置该类');
    
  4. 其余的使用参照 用户端 -> 快速指南 -> 客户端接入模块。


其他说明

  1. 关于common_config

    配置项大多用于剥离开源和公司内部的配置、实现等。

  2. 关于rocketmq.zip

    版本为4.2.0,建议使用4.x版本。下载来源https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip

    其中做的修改如下:

    1. rocketmq/conf下的四个日志配置文件:logback_*.xml

      将${user.home}修改为了${ROCKETMQ_HOME},原因:默认RocketMQ往用户目录下打日志,而用户目录一般在根目录,往往分区比较小,所以改成了rocketmq所在目录。
      
    2. rocketmq/bin下的runserver.sh,runbroker.sh

      gc日志目录由/dev/shm改为了${BASE_DIR}/logs
      
    3. os.sh

      echo 'deadline' > /sys/block/${DISK}/queue/scheduler
      echo "---------------------------------------------------------------"
      sysctl vm.extra_free_kbytes
      sysctl vm.min_free_kbytes
      sysctl vm.overcommit_memory
      sysctl vm.drop_caches
      sysctl vm.zone_reclaim_mode
      sysctl vm.max_map_count
      sysctl vm.dirty_background_ratio
      sysctl vm.dirty_ratio
      sysctl vm.dirty_writeback_centisecs
      sysctl vm.page-cluster
      sysctl vm.swappiness
      su - admin -c 'ulimit -n'
      cat /sys/block/$DISK/queue/scheduler
      修改为:
      if [ ! $DISK ]; then
          echo 'deadline' > /sys/block/${DISK}/queue/scheduler
      fi
      if [ ! $DISK ]; then
          cat /sys/block/$DISK/queue/scheduler
      fi
      echo "`date +%Y%m%d-%H%M%S`" >> /opt/mqcloud/.mq_cloud_inited
      
  3. 关于nmon.zip

    用于收集和监控服务器状况,来自于http://nmon.sourceforge.net

  4. 邮件预警实现

    默认的空实现类是:com.sohu.tv.mq.cloud.service.impl.DefaultAlertMessageSender。可以自己实现邮件发送方法。配置到通用配置里修改alertClass配置项。

  5. 登录实现

    默认的登录采用用户名密码:com.sohu.tv.mq.cloud.service.impl.DefaultLoginService。当然注册和登录都没有进行太多安全性考虑,主要是基于MQCloud要部署到内网使用。

    另外还支持sso登录,可以自行实现类似如下类:

    public class SSOLoginService extends AbstractLoginService {
    
        @Override
        protected void auth(HttpServletRequest request, HttpServletResponse response) {
            // 跳到sso去登录认证
        }
    
        @Override
        protected String getEmail(String ticketKey) {
            // sso回调并携带ticket,这里调用sso进行校验ticket,获取登录id(email)
            return null;
        }
    
        @Override
        public void init() {
            
        }
    }
    

    如果采用sso认证登录,auth和getEmail方法的作用参考如下:

    sso

  6. 版本管理

    因为使用者会依赖mq-client-*相关模块,所以如果涉及到mq-client-*中的代码修改,需要在父pom中修改version,然后执行mvn -N versions:update-child-modules ,这样所有的子模块的版本均会更新(com.sohu.tv.mq.util.Version中的版本号也会自动修改)。


联系方式

MQCloud QQ交流群:474960759


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK