Pyspark – Import any data
source link: https://www.tuicool.com/articles/reiAbu7
A brief guide to import data with Spark
Oct 10 · 3 min read
With this article, I will start a series of short tutorials on PySpark, from data pre-processing to modeling. The first deals with importing and exporting any type of data: CSV, text file, Avro, JSON, etc. I work on a virtual machine on Google Cloud Platform, and the data comes from a bucket on Cloud Storage. Let's import it.
Import a CSV
Spark has a built-in function to read CSV files; it is as simple as:
csv_2_df = spark.read.csv("gs://my_buckets/poland_ks")
# print it
csv_2_df.show()
The data is loaded with the right number of columns and there does not seem to be any problem in the data; however, the header row is not recognized and is read as a regular data row. We need to set the header = True parameter.
csv_2_df = spark.read.csv("gs://my_buckets/poland_ks", header = "true")
Other syntaxes are possible, such as:
csv_2_df= spark.read.load("gs://my_buckets/poland_ks", format="csv", header="true")
and parameters like sep to specify a separator or inferSchema to infer the types of the data. Let's look at the schema, by the way.
All columns of our dataframe are typed as string; let's try to infer the schema.
csv_2_df = spark.read.csv("gs://my_buckets/poland_ks", header=True, inferSchema=True)
csv_2_df.printSchema()
We can also specify our schema manually:
from pyspark.sql.types import *

schema = StructType([
    StructField("ID_DAY", DateType()),
    StructField("SID_STORE", IntegerType()),
    StructField("NB_TICKET", IntegerType()),
    StructField("F_TOTAL_QTY", IntegerType()),
    StructField("F_VAL_SALES_AMT_TCK", DoubleType()),
    StructField("SITE_FORMAT", StringType())
])

csv_2_df = spark.read.csv("gs://alex_precopro/poland_ks", header='true', schema=schema)
Import a JSON
json_to_df = spark.read.json("gs://my_bucket/poland_ks_json")
Import a PARQUET
parquet_to_df = spark.read.parquet("gs://my_bucket/poland_ks_parquet")
Import an AVRO
In the case of Avro files, we need to call an external Databricks package to read them.
df = spark.read.format("com.databricks.spark.avro").load("gs://alex_precopro/poland_ks_avro", header = 'true')
Import a Text File
In the same way, Spark has a built-in function:
textFile = spark.read.text('path/file.txt')
You can also read a text file as an RDD:
# read input text file to RDD
lines = sc.textFile('path/file.txt')
# collect the RDD to a Python list (avoid shadowing the built-in name "list")
lines_list = lines.collect()
To export data, you have to adapt to what you want to output. If you write Parquet, Avro, or any other partitioned files, there is no problem. If we want to write a single CSV file, we must gather the partitions scattered across the different workers before writing it:
# partitioned file
output.write.parquet("gs://my_bucket/my_output")

# csv
partitioned_output.coalesce(1).write.mode("overwrite")\
    .format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("sep", "|")\
    .save('gs://my_bucket/my_output_csv')
By using coalesce(1) or repartition(1), all the partitions of the dataframe are combined into a single block.
We saw how to import our file and how to write it. In my next article, we will learn how to filter our dataframe. Thanks. You can find the code here: https://github.com/AlexWarembourg/Medium