45

干货分享:利用Java多线程技术导入数据到Elasticsearch

 4 years ago
source link: http://news.51cto.com/art/201907/599607.htm
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

干货分享:利用Java多线程技术导入数据到Elasticsearch

作者花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高)。

作者:Wooola来源:头条科技|2019-07-15 16:10
干货分享:利用java多线程技术导入数据到Elasticsearch

近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低。所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高)。

关键技术栈

  • Elasticsearch
  • ExecutorService\Thread

工具说明

maven依赖

  1. <dependency> 
  2.  <groupId>mysql</groupId> 
  3.  <artifactId>mysql-connector-java</artifactId> 
  4.  <version>${mysql.version}</version> 
  5. </dependency> 
  6. <dependency> 
  7.  <groupId>org.elasticsearch</groupId> 
  8.  <artifactId>elasticsearch</artifactId> 
  9.  <version>${elasticsearch.version}</version> 
  10. </dependency> 
  11. <dependency> 
  12.  <groupId>org.elasticsearch.client</groupId> 
  13.  <artifactId>transport</artifactId> 
  14.  <version>${elasticsearch.version}</version> 
  15. </dependency> 
  16. <dependency> 
  17.  <groupId>org.projectlombok</groupId> 
  18.  <artifactId>lombok</artifactId> 
  19.  <version>${lombok.version}</version> 
  20. </dependency> 
  21. <dependency> 
  22.  <groupId>com.alibaba</groupId> 
  23.  <artifactId>fastjson</artifactId> 
  24.  <version>${fastjson.version}</version> 
  25. </dependency> 

java线程池设置

默认线程池大小为21个,可调整。其中POR为处理流程已办数据线程池,ROR为处理流程已阅数据线程池。

  1. private static int THREADS = 21; 
  2. public static ExecutorService POR = Executors.newFixedThreadPool(THREADS); 
  3. public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS); 

定义已办生产者线程/已阅生产者线程:ZlPendProducer/ZlReadProducer

  1. public class ZlPendProducer implements Runnable { 
  2.  @Override 
  3.  public void run() { 
  4.  System.out.println(threadName + "::启动..."); 
  5.  for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) 
  6.  int size = 1000; 
  7.  for (int i = 0; i < count; i += size) { 
  8.  if (i + size > count) { 
  9.  //作用为size***没有100条数据则剩余几条newList中就装几条 
  10.  size = count - i; 
  11.  String sql = "select * from " + tableName + " limit " + i + ", " + size; 
  12.  System.out.println(tableName + "::sql::" + sql); 
  13.  rs = statement.executeQuery(sql); 
  14.  List<HistPendingEntity> lst = new ArrayList<>(); 
  15.  while (rs.next()) { 
  16.  HistPendingEntity p = PendUtils.getHistPendingEntity(rs); 
  17.  lst.add(p); 
  18.  MteExecutor.POR.submit(new ZlPendConsumer(lst)); 
  19.  Thread.sleep(2000); 
  20.  } catch (Exception e) { 
  21.  e.printStackTrace(); 
  22. public class ZlReadProducer implements Runnable { 
  23.  ...已阅生产者处理逻辑同已办生产者 

定义已办消费者线程/已阅生产者线程:ZlPendConsumer/ZlReadConsumer

  1. public class ZlPendConsumer implements Runnable { 
  2.  private String threadName; 
  3.  private List<HistPendingEntity> lst; 
  4.  public ZlPendConsumer(List<HistPendingEntity> lst) { 
  5.  this.lst = lst; 
  6.  @Override 
  7.  public void run() { 
  8.  lst.forEach(v -> { 
  9.  String json = new Gson().toJson(v); 
  10.  EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null); 
  11.  Const.COUNTER.LD_P.incrementAndGet(); 
  12.  } catch (Exception e) { 
  13.  e.printStackTrace(); 
  14.  System.out.println("err::PendingId::" + v.getPendingId()); 
  15. public class ZlReadConsumer implements Runnable { 
  16.  //已阅消费者处理逻辑同已办消费者 

定义导入Elasticsearch数据监控线程:Monitor

监控线程-Monitor为了计算每分钟导入Elasticsearch的数据总条数,利用监控线程,可以调整线程池的线程数的大小,以便利用多线程更快速的导入数据。

  1. public void monitorToES() { 
  2.  new Thread(() -> { 
  3.  while (true) { 
  4.  StringBuilder sb = new StringBuilder(); 
  5.  sb.append("已办表数::").append(Const.TBL.TBL_PEND_COUNT) 
  6.  .append("::已办总数::").append(Const.COUNTER.LD_P_TOTAL) 
  7.  .append("::已办入库总数::").append(Const.COUNTER.LD_P); 
  8.  sb.append("~~~~已阅表数::").append(Const.TBL.TBL_READ_COUNT); 
  9.  sb.append("::已阅总数::").append(Const.COUNTER.LD_R_TOTAL) 
  10.  .append("::已阅入库总数::").append(Const.COUNTER.LD_R); 
  11.  if (ldPrevPendCount == 0 && ldPrevReadCount == 0) { 
  12.  ldPrevPendCount = Const.COUNTER.LD_P.get(); 
  13.  ldPrevReadCount = Const.COUNTER.LD_R.get(); 
  14.  start = System.currentTimeMillis(); 
  15.  } else { 
  16.  long end = System.currentTimeMillis(); 
  17.  if ((end - start) / 1000 >= 60) { 
  18.  start = end; 
  19.  sb.append("\n#########################################\n"); 
  20.  sb.append("已办每分钟TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "条"); 
  21.  sb.append("::已阅每分钟TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "条"); 
  22.  ldPrevPendCount = Const.COUNTER.LD_P.get(); 
  23.  ldPrevReadCount = Const.COUNTER.LD_R.get(); 
  24.  System.out.println(sb.toString()); 
  25.  Thread.sleep(3000); 
  26.  } catch (InterruptedException e) { 
  27.  e.printStackTrace(); 
  28.  }).start(); 

初始化Elasticsearch:EsClient

  1. String cName = meta.get("cName");//es集群名字 
  2. String esNodes = meta.get("esNodes");//es集群ip节点 
  3. Settings esSetting = Settings.builder() 
  4.  .put("cluster.name", cName) 
  5.  .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群 
  6.  .put("thread_pool.search.size", 5)//增加线程池个数,暂时设为5 
  7.  .build(); 
  8. String[] nodes = esNodes.split(","); 
  9. client = new PreBuiltTransportClient(esSetting); 
  10. for (String node : nodes) { 
  11.  if (node.length() > 0) { 
  12.  String[] hostPort = node.split(":"); 
  13.  client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); 

初始化数据库连接

  1. conn = DriverManager.getConnection(url, user, password); 

启动参数

  1. nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 & 

参数说明

ES-Cluster2019 为Elasticsearch集群名字

node1:9300,node2:9300,node3:9300为es的节点IP

130 130为已办已阅分表的数据

程序入口:MteMain

干货分享:利用java多线程技术导入数据到Elasticsearch
  1. // 监控线程 
  2. Monitor monitorService = new Monitor(); 
  3. monitorService.monitorToES(); 
  4. // 已办生产者线程 
  5. Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer")); 
  6. pendProducerThread.start(); 
  7. // 已阅生产者线程 
  8. Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer")); 
  9. readProducerThread.start(); 

【编辑推荐】

【责任编辑:张燕妮 TEL:(010)68476606】
点赞 0

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK