26

使用Spark Streaming SQL进行PV/UV统计

 4 years ago
source link: https://www.tuicool.com/articles/VnayIny
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1.背景介绍

PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。

使用Spark Streaming SQL,并结合Redis可以很方便进行PV/UV的统计。本文将介绍通过Streaming SQL消费Loghub中存储的用户访问信息,对过去1分钟内的数据进行PV/UV统计,将结果存入Redis中。

2.准备工作

  • 创建E-MapReduce 3.23.0以上版本的Hadoop集群。
  • 下载并编译E-MapReduce-SDK包
git clone [email protected]:aliyun/aliyun-emapreduce-sdk.git 
cd aliyun-emapreduce-sdk 
git checkout -b master-2.x origin/master-2.x 
mvn clean package -DskipTests 

编译完后, assembly/target目录下会生成emr-datasources_shaded_${version}.jar,其中${version}为sdk的版本。

数据源

本文采用Loghub作为数据源,有关日志采集、日志解析请参考日志服务。

3.统计PV/UV

一般场景下需要将统计出的PV/UV以及相应的统计时间存入Redis。其他一些业务场景中,也会只保存最新结果,用新的结果不断覆盖更新旧的数据。以下首先介绍第一种情况的操作流程。

3.1启动客户端

命令行启动streaming-sql客户端

streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar 

也可以创建SQL语句文件,通过streaming-sql -f的方式运行。

3.1定义数据表

数据源表定义如下

CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)  
USING loghub  
OPTIONS( 
sls.project=${sls.project}, 
sls.store=${sls.store}, 
access.key.id=${access.key.id}, 
access.key.secret=${access.key.secret}, 
endpoint=${endpoint}); 

其中,数据源表包含user_ip和__time__两个字段,分别代表用户的IP地址和loghub上的时间列。OPTIONS中配置项的值根据实际配置。

结果表定义如下

CREATE TABLE redis_sink  
USING redis  
OPTIONS( 
table='statistic_info', 
host=${redis_host}, 
key.column='interval'); 

其中,statistic_info为Redis存储结果的表名,interval对应统计结果中的interval字段;配置项${redis_host}的值根据实际配置。

3.2创建流作业

CREATE SCAN loghub_scan 
ON loghub_source 
USING STREAM 
OPTIONS( 
watermark.column='__time__', 
watermark.delayThreshold='10 second'); 
 
CREATE STREAM job 
OPTIONS( 
checkpointLocation=${checkpoint_location}) 
INSERT INTO redis_sink 
SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval 
FROM loghub_scan 
GROUP BY TUMBLING(__time__, interval 1 minute), window; 

4.3查看统计结果

最终的统计结果如下图所示

ZJv2AbY.png!web

可以看到,每隔一分钟都会生成一条数据,key的形式为表名:interval,value为pv和uv的值。

3.4实现覆盖更新

将结果表的配置项key.column修改为一个固定的值,例如定义如下

CREATE TABLE redis_sink 
USING redis  
OPTIONS( 
table='statistic_info', 
host=${redis_host}, 
key.column='statistic_type'); 

创建流作业的SQL改为

CREATE STREAM job 
OPTIONS( 
checkpointLocation='/tmp/spark-test/checkpoint') 
INSERT INTO redis_sink 
SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval 
FROM loghub_scan 
GROUP BY TUMBLING(__time__, interval 1 minute), window; 

最终的统计结果如下图所示

mQFfQn7.png!web

可以看到,Redis中值保留了一个值,这个值每分钟都被更新,value包含pv、uv和interval的值。

4.总结

本文简要介绍了使用Streaming SQL结合Redis实现流式处理中统计PV/UV的需求。后续文章,我将介绍Spark Streaming SQL的更多内容。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK