
Pyspark & Pandas

Source: https://xfliu1998.github.io/2022/03/18/8-pyspark&pandas/

Created 2022-03-18 | Updated 2022-03-22 | Data Analysis and Processing

Common PySpark operations

Connecting to Spark

from pyspark.sql import SparkSession
from pyspark.shell import sc
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf

class SparkUtils:
    def __init__(self):
        self.spark = None

    def get_spark(self):
        if self.spark is None:
            self.spark = SparkSession.builder.appName("username") \
                .enableHiveSupport() \
                .config("spark.sql.shuffle.partitions", "500") \
                .config("spark.sql.broadcastTimeout", "3600") \
                .config("spark.driver.memory", "200g") \
                .config("spark.executor.memory", "40g") \
                .config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \
                .config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \
                .config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name",
                        "bdp-docker.jd.com:5000/wise_mart_bag:latest") \
                .config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name",
                        "bdp-docker.jd.com:5000/wise_mart_bag:latest") \
                .getOrCreate()
        return self.spark

spark = SparkUtils().get_spark()

# Build a DataFrame from a Hive query (date is a partition string defined elsewhere, e.g. '2022-03-18')
spark_data = spark.sql("""
    select
        id,
        username,
        num
    from
        table1
    where
        status in (1, 2, 3)
        and dt = '{}'
""".format(date))

# Register a DataFrame as a temporary SQL view
sp_test.createOrReplaceTempView('data')
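
Once a view is registered, it can be queried with plain SQL through spark.sql. A minimal sketch, assuming the data view above and the id/name/age columns of sp_test from the next section:

# Query the temporary view registered above
adults = spark.sql("select id, name, age from data where age >= 18")
adults.show()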

Common commands
Reference:

# Create a first DataFrame from an RDD with an explicit schema
rdd = sc.parallelize([(1, 'Alice', 18), (2, 'Andy', 19), (3, 'Bob', 17)])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
sp_test = spark.createDataFrame(rdd, schema)
sp_test.show()

# Cache the DataFrame
sp_test.cache()
sp_test.persist()
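
cache() is shorthand for persist() with the default storage level; a small sketch of releasing the cache and persisting again with an explicit storage level:

# Release the cache, then persist with an explicit storage level
from pyspark import StorageLevel
sp_test.unpersist()
sp_test.persist(StorageLevel.MEMORY_AND_DISK)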

# Add a new column (three equivalent ways to reference the input columns);
# withColumn returns a new DataFrame, so assign the result if you want to keep it
def func(a, b):
    return a + b

sp_test.withColumn("price_detail", F.udf(func, IntegerType())(sp_test.a, sp_test.b))
sp_test.withColumn("price_detail", F.udf(func, IntegerType())(sp_test['a'], sp_test['b']))
sp_test.withColumn("price_detail", F.udf(func, IntegerType())(F.col("a"), F.col("b")))
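
For simple arithmetic like this, a built-in column expression does the same job without Python UDF overhead; a sketch using the same hypothetical a and b columns:

# Equivalent using a native column expression (no UDF)
sp_test.withColumn("price_detail", F.col("a") + F.col("b"))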

# Rename a column
sp_test.withColumnRenamed("old_name", "new_name")

# Join two DataFrames on multiple conditions
sp_data_join = sp_data_new.join(
    sp_data_old,
    (sp_data_new.begin_org_name_new == sp_data_old.begin_org_name_old) &
    (sp_data_new.real_vehicle_type_new == sp_data_old.vehicle_type_old),
    how="left")  # how defaults to inner
# Join on a single column
sp_data_join = sp_data_new.join(sp_data_old, ['id'], 'left')

# Filter rows with a UDF
def filter_milage(milage_old, milage_new):
    # print(type(milage_old), type(milage_new))
    return abs(milage_old - milage_new) <= 5

sp_data_join = sp_data_new.filter(
    F.udf(filter_milage, BooleanType())(sp_data_new["milage_old"], sp_data_new["milage_new"]))

# Select two columns
sp_test_filter = sp_test.select('code', 'name')

# Drop columns
sp_test = sp_test.drop('name', "code")

# Set a column to a constant value
sp_test = sp_test.withColumn('name', F.lit(''))

# Keep only rows where column d is not null
sp_test = sp_test.filter(~(F.isnull(sp_test.d)))

# Keep rows where a string column is neither null nor empty
sp_test = sp_test.filter(~((sp_test.code.isNull()) | (sp_test.code == "")))

# Deduplicate
sp_test.select('code').distinct()
sp_test_filter = sp_test.drop_duplicates(["code"])

# Replace null values in code with 0 (or an empty string)
sp_test = sp_test.withColumn('code', F.when(F.isnull(sp_test.code), 0).otherwise(sp_test.code))
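
fillna offers a shorthand for the same null replacement; a sketch, assuming code is a numeric column (values whose type does not match the column are ignored):

# Replace nulls in the code column with 0
sp_test = sp_test.fillna({'code': 0})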

# Group-by aggregation
sp_test_collect = sp_test.groupBy('number').agg(
    F.collect_set('province').alias('set_province'),
    F.first('city').alias('set_city'),
    F.collect_list('district').alias('set_district'),
    F.max('report_user').alias('set_report_user'),
    F.min('first_type').alias('set_first_type'))
sp_test_collect = sp_test.groupby().agg({'code': 'sum'}).collect()

# Count the occurrences of each value in a column
sp_test.groupBy(sp_test.code).count().show()
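
To turn those counts into proportions, divide by the total row count; a small sketch:

# Share of each code value in the whole table
total = sp_test.count()
sp_test.groupBy('code').count().withColumn('ratio', F.col('count') / total).show()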

Common pandas operations

import pandas as pd
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', 1000)

# Read and write data
df = pd.read_excel("file_name")
df = pd.read_csv("file_name", sep="\t")
df.to_csv("data.csv", index=False)

# Create an empty DataFrame; append returns a new DataFrame (removed in pandas 2.0, use pd.concat instead)
df_empty = pd.DataFrame()
df_empty = df_empty.append([3, 4, 5, "哈哈"])

# Filter rows
df_new = df[df["code"].apply(lambda x: len(x) == 11)]
df_new2 = df[df["code"] == 1]

# Drop columns and rename columns
df.drop(['column1', "column2"], axis=1, inplace=True)
df.rename(columns={"column1": "new_name1", "column2": "new_name2"}, inplace=True)

# Modify a column with apply
df["电话"] = df["电话"].apply(lambda x: x == "")

# Modify a column row by row with apply(axis=1)
def func(x):
    if x["所属市"] == "赣州市":
        return "宁都县"
    return x["所属县"]

df["所属县"] = df.apply(func, axis=1)

# Concatenate tables vertically
df_rule = pd.concat([df_rule_1, df_rule_2], axis=0)

# Merge tables horizontally; how can be "left", "right", "inner" or "outer"
pd.merge(df_left, df_right, left_on="a", right_on="b", how="left")


# Group-by aggregation with a custom function
def func1(gg):
    return pd.DataFrame({
        "上游客户电话号码": gg["上游电话号码修改"].tolist()[0],
        "上游商家单量": sum(gg["上游商家单量"])}, index=[0])

df_result = df_new.groupby(["上游电话号码修改"]).apply(func1)
df_result2 = df.groupby("tag").apply(func1)

# Turn the index back into columns
df_result.reset_index()

# Iterate over rows with iterrows
for i, row in df.iterrows():
    print(row["c1"], row["c2"])
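
itertuples is usually much faster than iterrows when plain row access is enough; a sketch with the same hypothetical c1 and c2 columns:

# Faster row iteration with itertuples
for row in df.itertuples(index=False):
    print(row.c1, row.c2)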

# Zero-pad a number
num = 233
str_num = str(num).zfill(4)
print(str_num)

# Drop duplicate rows by column
df.drop_duplicates(["column_name"], keep='first', inplace=True)

# Sort by multiple columns
df.sort_values(by=["A", "D"], axis=0, ascending=[True, False], inplace=True)

# Drop rows containing null values
df.dropna(subset=['trader_province_name', "trader_county_name"], how="any", inplace=True)

# Convert a column's data type
df["Customer"] = df['Customer'].astype("int")

# Create a Spark SQL temp table from a pandas DataFrame
# (registerTempTable is deprecated in favor of createOrReplaceTempView)
df_data = spark.createDataFrame(df[["Customer"]])
df_data.registerTempTable('df_data')
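
The registered table can then be queried from Spark SQL; a minimal sketch against the df_data table created above:

# Query the temp table registered from the pandas DataFrame
spark.sql("select Customer from df_data").show()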
