

【spark】sql.functions详解
source link: https://www.guofei.site/2019/01/26/sparkfunctions.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

【spark】sql.functions详解
2019年01月26日Author: Guofei
文章归类: 1-1-算法平台 ,文章编号: 155
版权声明:本文作者是郭飞。转载随意,但需要标明原文链接,并通知本人
原文链接:https://www.guofei.site/2019/01/26/sparkfunctions.html
这篇文章是我总结的spark官方文档中关于pyspark.sql.functions中的有关内容,源文档是按照函数的字母表顺序排列的,这里我按照功能排列。并且删除了其中的一些
用法
import pyspark.sql.functions as F
df.select(F.expr('*'),F.expr('length(name)').alias('len'),F.rand(seed=42))
df.withColumn('rand_num',F.sin(df.col1)*3)
df.selectExpr('*','power(cid2,5)')
# 针对 agg func:
df.groupBy('col1').agg(F.count('col2'))
df.groupBy('col1').agg(F.count(df.col2))
特殊常用
F.lit(3) # 增列都是一个值
F.col('col1') # 选取一列
F.when().else()
F.expr: 用python风格写sql
# 最简单的用法
df.withColumn('row_num',F.expr('row_number() over(partition by col1 order by col2)'))
# 分位数
from pyspark.sql import Window
df.withColumn('percentile_25',F.expr('percentile_approx(col2, 0.25)').over(Window.partitionBy('col1'))) # 每个partition的分位数,但不聚合(所以逻辑上有点怪)
# 分位数
df.groupBy('col1').agg(F.expr('''
concat(
round(percentile_approx(col2,0),3),'_',
round(percentile_approx(col2,0.25),3),'_',
round(percentile_approx(col2,0.5),3),'_',
round(percentile_approx(col2,0.75),3),'_',
round(percentile_approx(col2,1),3),'_'
)
''').alias('percentile_col2'))
数学
- 符号,取整等
abs(col) signum(col) # 符号函数,返回-1,0,1 ceil 向上取整 floor 向下取整 round(col, scale=0) # 小数形式的四舍五入,到scale位
- 角度类
sin(col),cos(col),tan(col) asin(col),acos(col),atan(col) sinh(col),cosh(col),tanh(col) radians(col) # 度数转弧度 degrees(col) # 弧度转角度
- 指数类
exp(col) pow(col1,col2) # 指数,col1和col2都可以换成数字 log(col1,col2) # 对数,col1和col2都可以换成数字 log(col1) # 自然对数 factorial(col) # 阶乘,sql也有这个命令 sqrt(col) cbrt(col) # 三次方根 # 其实还有更为python的混合用法: df1.withColumn('s1',F.sqrt(df1.col1**df1.col2+df1.col1))
- 统计类
max(col) # aggregate function min(col) # aggregate function avg(col) # aggregate function mean # alias for avg() count(col) # aggregate function countDistinct(col) # aggregate function corr(col1, col2) # 皮尔逊相关系数 kurtosis(col) # 峰度 skewness(col) # 偏度 stddev(col) # Aggregate function: returns the unbiased sample standard deviation of the expression in a group stddev_pop(col) # Aggregate function: returns population standard deviation of the expression in a group. stddev_samp(col) # Aggregate function: returns the unbiased sample standard deviation of the expression in a group. var_pop(col) # Aggregate function: returns the population variance of the values in a group. var_samp(col) # Aggregate function: returns the unbiased variance of the values in a group. variance(col) # Aggregate function: returns the population variance of the values in a group.
- 随机函数
rand(seed=None) # 随机数 df.withColumn('rand', F.rand(seed=42) * 3) # sql 中也有这个函数,但不能传入 seed 参数 randn(seed=None) # 正态分布
字符串
- 大小写
initcap(col) 每个单词的首字母大写(一个元素的字符串中可以有多个单词)(sql也有这个命令) lower(col) 转为小写 upper(col) # 转为大写
- 填充
lpad(col, len, pad) # 用pad把col填充到len长度,pad放左边 rpad(col, len, pad) # 用pad把col填充到len长度,pad放右边 rtrim(col) # 删掉右边的空格 ltrim(col) # 删掉左边的空格 trim(col) # 两边都删
- 反转、截取
repeat(col, n) # 字符串重复n次 reverse(col) # 把 string 或 array 反转 substring(str, pos, len) # 截取从pos开始,len长度
- 匹配
instr(str, substr) # str中,substr第一次出现的位置,可以用sql locate(substr, str, pos=1) # str中,pos后,substr第一次出现的位置号,位置号从1开始,如果没找到就返回0
- 正则
split(str, pattern) # 正则表达式分割 regexp_replace(str, pattern, replacement)
时间
- 时间生成
current_date() # 当前日期 current_timestamp() # 当前时间 date_format(date, format) # 把时间转化成指定格式 date_trunc(format, timestamp) # 把时间四舍五入到某一个精度 # format – ‘year’, ‘yyyy’, ‘yy’, ‘month’, ‘mon’, ‘mm’, ‘day’, ‘dd’, ‘hour’, ‘minute’, ‘second’, ‘week’, ‘quarter’
- 时间加减
add_months(start, months) # 增加一个月,mysql可以用 `select DATE_ADD('2018-02-05',INTERVAL 1 month) as dt` 但spark中不行 # start 可以是字符串格式 date_sub(start, days) datediff(end_date, start_date) months_between(end_date, start_date, roundOff=True) # 1. 返回月份差 # 2. date1大于date2,返回正值 # 3. 默认返回整数,但roundOff=False 时返回8位小数 next_day(date, dayOfWeek) # 返回下一个“周n”,dayOfWeek=“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”
- 取位(都返回int格式)
year(col) # 取年 quarter(col) # 取季度 month(col) # 取月 hour(col) # 取小时 minute(col) # 取分钟 second(col) # 秒
- 取dayof
dayofmonth(col) dayofweek(col) dayofyear(col) weekofyear(col) # 周一到周日算是一周() last_day(date) # 所在月的最后一天
array
- 生成
array(*cols) # 把多列粘合成一个类似 array<double> 的格式 sequence(start, stop, step=None) # 等差数列组成的array,入参可以是列名
- 集合运算
array_contains(array_col, value) # 包含 # 如果 array_col 为null,返回null # 如果 array_col 包含value,返回value # 如果 array_col 不包含value,返回false array_distinct(array_col) # 移除 array_col 中的重复 array_except(array_col1, array_col2) # 生成一个array,其元素在col1中,但不在col2中 array_intersect(col1, col2) # 差集(剔除重复) array_union(col1, col2) #交集(剔除重复) arrays_overlap(a1, a2) # 是否存在交集: # 如果有共同的元素,返回 true # 如果没有共同元素,如果任一有空元素,返回 null # 否则返回false
- 修改
array_position(col, value) # 返回 array 从的第一个value 的序号,如果没有则返回null,序号是从1开始的 array_remove(col, element) # 从array中删除所有的element array_repeat(col, count) # array中的元素重复3次,生成一个新array array_sort(col) # 返回升序array,null放到最后 sort_array(col, asc=True) # 也是排序 arrays_zip(*cols) # 效果相当于 Python 的 list(zip(a,b)) http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.arrays_zip slice(x, start, length) # 切片取数,例如 slice(df.x, 2, 2) shuffle(col) # 打乱顺序
- 统计
size(col) # array的长度 covar_pop(col1, col2) # 两个array的相关系数 covar_samp(col1, col2) #相关系数 array_max(col) # array中的最大值 array_min(col) # array中的最小值
- 拼接和展开
array_join(col, delimiter, null_replacement=None) # 将array拼接成一个字符串,其中分割符为 delimiter # 如果指定了null_replacement,用null_replacement 替换空值,否则视空值不存在 concat(*cols) #也是拼接(可用于 string,binary,array)如果array有一个为空,返回空。这个函数与sql的concat不太一样 concat_ws(sep, *cols) # 用sep链接 explode_outer(col) # 如果 col是一个 <array> 那么生成一个新列,col,把 <array>中的元素竖着变成列 # 如果 col 是一个 <map> 那么生成两个新列,key, value, 把map中的元素竖着变成列 df = spark.createDataFrame([(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)],("id", "an_array", "a_map")) df.select("id", "an_array", F.explode_outer("a_map")).show() df.select("id", "a_map", F.explode_outer("an_array")).show()
其它
F.greatest('col1','col2',...) # 多列中最大的那个,sql也有这个命令
F.least('col1','col2',...) # 最小的那个
F.coalesce # 取第一个不为空的元素(sql也有这个命令)
when(condition, value)
df.select(when(df['age'] == 2, df.age + 1).otherwise(4).alias("age"))
aggfunction
collect_list
collect_set
sum
sumDistinct
窗口函数
lag(col, count=1, default=None) # 窗口函数,返回往前数第count行的内容( sql也有同样的命令)( 机器上好像报错)
lead(col, count=1, default=None) # 窗口函数,返回往后数第count行的内容( sql也有同样的命令)( 机器上好像报错)
first(col, ignorenulls=False) # aggfunction,返回第一个
last(col, ignorenulls=False) # aggfunction,返回最后一个
row_number()
cume_dist()
dense_rank()
lead(col, count=1, default=None) # returns the ntile group id (from 1 to n inclusive)
percent_rank()
rank()
其它
下面这些函数都可以方便地用其它函数代替(虽然运行效率会慢点儿),个人不建议多用下面这些函数,如果你的队友不是很熟悉spark.sql.functions,可能会对他们来说可读性太差,影响合作。
expm1:$e^{x-1}$
hypot(col1, col2) # 等价于 sqrt{(a^2 + b^2)}
atan2(col1, col2) # 输入x,y坐标,输出theta
log10(col)
log2(col)
log1p(col) # $\log(x+1)$
bround(col, scale=0) # 小数形式的四舍五入,与round(col, scale=0)重复
## 不常用
base64
basestring
bin
approx_count_distinct
参考文献
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
https://blog.csdn.net/liam08/article/details/79663018
您的支持将鼓励我继续创作!
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK