
JD Finance Data Analysis Case Study (Part 1)


Copyright notice: this is an original post by the blogger and may not be reproduced without permission. https://blog.csdn.net/mingyunxiaohai/article/details/82428664

Data description:

The data is business-scenario data. All of it has been sampled and anonymized, so field values and distributions differ from the real business data. It covers the period 2016-08-03 to 2016-11-30 and contains users' mobile-side behavior data, purchase records, and historical loan information, plus the total loan amount for November. Dataset download link: https://pan.baidu.com/s/1hk8hARHxkQcMS8SgABmcHQ password: fc7z

The files are user.csv, order.csv, click.csv, loan.csv, and loan_sum.csv.

Foreword: big data projects generally fall into two categories, batch processing and stream processing. In this exercise, the batch processing is done with Hive and Presto, and the stream processing with Spark Streaming + Kafka.

Task 1

Normally our user data lives in our own relational database, so here we first load the t_user user information into MySQL and then import it from MySQL into HDFS; the remaining files (t_order, t_click, t_loan and t_loan_sum) are uploaded to HDFS directly.

MySQL can import CSV files natively, so we use that.

First, create the database and the user table:

create database jd;
use jd;
create table t_user (uid INT NOT NULL,
                  age INT,
                  sex INT,
                  active_date varchar(40),
                  initial varchar(40));

Import the data:

LOAD DATA LOCAL INFILE '/home/chs/Documents/t_user.csv'
INTO TABLE t_user
CHARACTER SET utf8
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS;
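
A quick sanity check after the load (a minimal sketch; the exact row count depends on the sampled dataset):

select count(*) from t_user;
select * from t_user limit 5;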

Task 2

Use Sqoop to import the t_user table from MySQL into HDFS.

Show the available databases:

sqoop list-databases --connect jdbc:mysql://master:3306 --username root --password ''
# The following databases are listed
information_schema
jd
mysql
performance_schema

Show the available tables:

sqoop list-tables --connect jdbc:mysql://master:3306/jd --username root --password ''
# There is only one table here
t_user

Use Sqoop to import the t_user table from MySQL into the /data/sq directory on HDFS:

sqoop import --connect jdbc:mysql://master:3306/jd --username root --password '' --table t_user --target-dir /data/sq

An error occurred:

18/08/21 13:44:26 ERROR tool.ImportTool: Import failed: No primary key could be found for table t_user. Please specify one with --split-by or perform a sequential import with '-m 1'.

The table has no primary key. We can either define a primary key when creating the table, or use --split-by to specify which column to split on:

sqoop import --connect jdbc:mysql://master:3306/jd --username root --password '' --table t_user --target-dir /data/sq --split-by 'uid'

Another error:

Host 'slave' is not allowed to connect to this MySQL server
 Host 'slave2' is not allowed to connect to this MySQL server

Cause of the error: my Hadoop cluster runs on three virtual machines, and slave and slave2 are not allowed to connect to MySQL as the root user.

Enter the MySQL console:

use mysql

select host,user,password from user;

+-----------+------+----------+
| host      | user | password |
+-----------+------+----------+
| localhost | root |          |
| master    | root |          |
| 127.0.0.1 | root |          |
| ::1       | root |          |
| localhost |      |          |
| master    |      |          |
+-----------+------+----------+

Only master currently has access; grant access to slave and slave2 as well:

grant all PRIVILEGES on jd.* to root@'slave' identified by '';
grant all PRIVILEGES on jd.* to root@'slave2' identified by '';

Now the import runs successfully.

Check the target directory on HDFS after the import:

hdfs dfs -ls /data/sq

-rw-r--r--   1 chs supergroup          0 2018-08-21 14:06 /data/sq/_SUCCESS
-rw-r--r--   1 chs supergroup     807822 2018-08-21 14:06 /data/sq/part-m-00000
-rw-r--r--   1 chs supergroup     818928 2018-08-21 14:06 /data/sq/part-m-00001
-rw-r--r--   1 chs supergroup     818928 2018-08-21 14:06 /data/sq/part-m-00002
-rw-r--r--   1 chs supergroup     818964 2018-08-21 14:06 /data/sq/part-m-00003

View the data in each part file:

hdfs dfs -cat /data/sq/part-m-*

17107,30,1,2016-02-13,5.9746772897
11272,25,1,2016-02-17,5.9746772897
14712,25,1,2016-01-10,6.1534138563
16152,30,1,2016-02-10,5.9746772897
10005,30,1,2015-12-17,5.7227683627
......

OK, the import is done. The remaining CSV files can be uploaded to the corresponding HDFS directories directly with Hadoop's put command, for example:
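
(A sketch; the local paths under /home/chs/Documents are assumptions, while the HDFS target paths match the ones used in the Hive load data statements below.)

hdfs dfs -mkdir -p /data
hdfs dfs -put /home/chs/Documents/t_order.csv /data/
hdfs dfs -put /home/chs/Documents/t_click.csv /data/
hdfs dfs -put /home/chs/Documents/t_loan.csv /data/
hdfs dfs -put /home/chs/Documents/t_loan_sum.csv /data/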

Task 3

Use Presto to produce the following results and visualize them in a web page:

• Total value of goods purchased per day by consumers in each age group

• Daily loan amount of male and female consumers

When doing data analysis with Presto, it is usually paired with Hive: first create the tables in Hive, then analyze those Hive tables with Presto.

Start Hive

# Start the Hive metastore
nohup hive --service metastore >> /home/chs/apache-hive-2.1.1-bin/metastore.log 2>&1  &
# Start HiveServer2
nohup hive --service hiveserver2 >> /home/chs/apache-hive-2.1.1-bin/hiveserver.log 2>&1 &
# Start the beeline client and connect
beeline
beeline> !connect jdbc:hive2://master:10000/default  hadoop hadoop

Create the user table

create table if not exists t_user (
uid STRING,
age INT,
sex INT,
active_date STRING,
`limit` STRING
)ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

Load the data from HDFS:

load data inpath '/data/sq' overwrite into table t_user;
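
Note that load data inpath moves the files out of /data/sq into the Hive warehouse directory. A quick count verifies the load (a minimal sketch; the exact number depends on the sampled dataset):

select count(*) from t_user;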

Create the user order table

create table if not exists t_order (
uid STRING,
buy_time STRING,
price DOUBLE,
qty INT,
cate_id INT,
discount DOUBLE
)ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

Load the data from HDFS:

load data inpath '/data/t_order.csv' overwrite into table t_order;

Create the user click table

create table if not exists t_click (
uid STRING,
click_time STRING,
pid INT,
param INT
)ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

Load the data from HDFS:

load data inpath '/data/t_click.csv' overwrite into table t_click;

Create the loan information table t_loan

create table if not exists t_loan (
uid STRING,
loan_time STRING,
loan_amount STRING,
plannum INT
)ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

Load the data from HDFS:

load data inpath '/data/t_loan.csv' overwrite into table t_loan;

Create the monthly loan total table t_loan_sum

create table if not exists t_loan_sum (
uid STRING,
month STRING,
loan_sum STRING
)ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

Load the data from HDFS:

load data inpath '/data/t_loan_sum.csv' overwrite into table t_loan_sum;

Start Presto

In the installation directory, run bin/launcher start

Start the client: bin/presto --server master:8080 --catalog hive --schema default

To connect to Hive via beeline instead: !connect jdbc:hive2://master:10000/default hadoop hadoop
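
Once the Presto CLI is connected to the hive catalog, a quick check that the tables created above are visible (a minimal sketch):

show tables;
-- should list t_click, t_loan, t_loan_sum, t_order, t_user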

Run the analysis queries

Query 1

select t_user.age,t_order.buy_time,sum(t_order.price*t_order.qty-t_order.discount) as sum  from t_user join t_order on t_user.uid=t_order.uid group  by t_user.age,t_order.buy_time;

Partial results:

+-------------+-------------------+----------------------+--+
| t_user.age  | t_order.buy_time  |         sum          |
+-------------+-------------------+----------------------+--+
| 20          | 2016-11-17        | 1.7227062320000002   |
| 25          | 2016-10-15        | 5.386111459          |
| 25          | 2016-10-19        | 0.45088435299999996  |
| 25          | 2016-10-20        | 2.8137519620000004   |
| 25          | 2016-10-21        | 3.548087797          |
| 25          | 2016-10-22        | 2.788946585          |
| 25          | 2016-10-26        | 2.469814958          |
| 25          | 2016-10-27        | 0.4795708140000001   |
| 25          | 2016-10-30        | 2.8022007390000003   |
| 25          | 2016-10-31        | 6.995954644          |
......

Query 2

select t_user.sex,SUBSTRING(t_loan.loan_time,0,10) as time,sum(t_loan.loan_amount) as sum from t_user join t_loan on t_user.uid=t_loan.uid group by t_user.sex ,SUBSTRING(t_loan.loan_time,0,10);

Partial results:

+-------------+-------------+---------------------+--+
| t_user.sex  |    time     |         sum         |
+-------------+-------------+---------------------+--+
| 1           | 2016-08-03  | 7919.6380018219     |
| 1           | 2016-08-04  | 6786.673292777713   |
| 1           | 2016-08-05  | 7238.370847563002   |
| 1           | 2016-08-06  | 7074.863470141198   |
| 1           | 2016-08-07  | 6235.208871191806   |
| 1           | 2016-08-08  | 5866.736957390908   |
| 1           | 2016-08-09  | 7683.339201321814   |
| 1           | 2016-08-10  | 7154.676993165003   |
| 1           | 2016-08-11  | 7836.102713179016   |
| 1           | 2016-08-12  | 8380.352202798527   |
| 1           | 2016-08-13  | 7324.325793652918   |
| 1           | 2016-08-14  | 5402.735435714206   |
| 1           | 2016-08-15  | 5354.373083991806   |
| 1           | 2016-08-16  | 6928.694087775619   |
| 1           | 2016-08-17  | 6639.536366437292   |
| 2           | 2016-08-04  | 991.838448799499    |
| 2           | 2016-08-05  | 1038.7726705849989  |
| 2           | 2016-08-06  | 1331.7359480245996  |
......
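
For reference, the same aggregation written in Presto's SQL dialect would look roughly like the following (a sketch: loan_amount was declared as STRING above, so it is cast explicitly, and Presto's substr is 1-indexed):

select u.sex,
       substr(l.loan_time, 1, 10) as loan_day,
       sum(cast(l.loan_amount as double)) as loan_sum
from t_user u
join t_loan l on u.uid = l.uid
group by u.sex, substr(l.loan_time, 1, 10);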

Task 4

Use Spark RDD or Spark DataFrame to produce the following results:

• User IDs whose loan amount exceeds 2000 (the data has been transformed, so greater than 5 is used here) and whose total purchase value exceeds their total loan amount

• User IDs of users who never buy discounted products and never take out loans

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

/**
 * Created by chs on 8/23/18.
 */
public class JDAnalysis {

    public static void main(String[] args){
      String path = "hdfs://master:9000/warehouse";

        SparkConf conf = new SparkConf().setAppName("JDAnalysis");

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        // Read the data from HDFS
        Dataset<Row> orderDs = spark.read().csv(path+"/t_order");
        Dataset<Row> loanDs = spark.read().csv(path+"/t_loan");
        // Build the schema for t_order
        StructType orderSchema = new StructType()
                .add("uid", "string", false)
                .add("buy_time", "string", false)
                .add("price", "string", true)
                .add("qty", "string", false)
                .add("cate_id", "string", false)
                .add("discount", "string", false);
        // Build the schema for t_loan
        StructType loanSchema = new StructType()
                .add("uid", "string", false)
                .add("loan_time", "string", false)
                .add("loan_amount", "string", false)
                .add("plannum", "string", true);
        // Create DataFrames with the schemas applied
        Dataset<Row> orderDf = spark.createDataFrame(orderDs.toJavaRDD(), orderSchema);
        Dataset<Row> loanDf = spark.createDataFrame(loanDs.toJavaRDD(), loanSchema);
        // Question 1: loans over 5 where the order value (price*qty-discount) exceeds the loan amount
        loanDf.filter("loan_amount>5")
                .join(orderDf,"uid")
                .select("uid","loan_amount","price","qty","discount")
                .where("(price*qty-discount)>loan_amount")
                .show();

          // Alternatively, register a temp view and query it with SQL
//        loanDf.createOrReplaceTempView("loan");
//        spark.sql("select uid,loan_amount from loan where loan_amount>5").show();
        // Question 2: orders with zero discount, left-joined against loans; keep users with no loan record
        orderDf.filter("discount=0")
                .select("uid")
                .join(loanDf,orderDf.col("uid").equalTo(loanDf.col("uid")),"left")
                .where(loanDf.col("uid").isNull())
                .show();

    }


}
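
To run the job on a cluster, package the class into a jar and submit it with spark-submit, roughly as follows (a sketch; the jar name and the YARN master are assumptions):

spark-submit \
  --class JDAnalysis \
  --master yarn \
  jd-analysis.jar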

Task 5, generating real-time data and analyzing it in real time with Spark Streaming, will be covered in the next post.

