28

Flink 维表关联系列之自定义异步查询

 4 years ago
source link: https://www.tuicool.com/articles/bAviyaU
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

点击上方蓝

字关注~

维表关联系列目录:

一、维表服务与Flink异步IO

二、Mysql维表关联:全量加载

三、Hbase维表关联:LRU策略

四、Redis维表关联:实时查询

五、kafka维表关联:广播方式

六、自定义异步查询

在异步IO查询外部存储时,对于提供异步查询的客户端来说可以直接使用,但是对于没有提供异步查询的客户端应该怎么做呢?我们可以将查询请求丢到一个线程池中,将这个线程池看做是一个异步的客户端来帮助我们完成查询请求。

通过线程池方式来帮助我们完成异步请求关键在于线程池的core大小如何设置,如果设置过大,会到导致创建很多个线程,势必会造成CPU的压力比较大,由于大多数情况下集群是没有做CPU隔离策略的,就会影响到其他任务;如果设置过小,在处理的速度上根不上就会导致任务阻塞。可以做一个粗略的估算:假如任务中单个Task需要做维表关联查询的数据每秒会产生1000条,也就是1000的TPS,我们希望能够在1s以内处理完这1000条数据,如果外部单次查询耗时是10ms, 那我们就需要10个并发同时执行,也就是我们需要的coreSize 是10。

以查询mysql为例:

 
class ExecSideFunction extends RichAsyncFunction[String, String] {
 
var executors: Executor = _
var sqlTemplate: String = _
 
override def open(parameters: Configuration): Unit = {
executors = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue[Runnable](1000))
sqlTemplate = "select value from tbl1 where id=?"
}
 
override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
 
executors.execute(new Runnable {
override def run(): Unit = {
val con = ConnectionFactory.getConnection("sourceId").asInstanceOf[Connection]
val sql = sqlTemplate.replace("?", parseKey(input))
MysqlUtil.executeSelect(con, sql, rs => {
val res = new util.ArrayList[String]()
while (rs.next()) {
val v = rs.getString("value")
res.add(fillData(input, v))
}
resultFuture.complete(res)
})
con.close()
}
})
}
 
private def parseKey(input: String): String = {
""
}
 
private def fillData(input: String, v: String): String = {
""
}
}
yYBJjeV.png!web

end

关注回复 Flink 获取更多信息~

aQnAZf3.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK