
Pyspark – Import any data


A brief guide to importing data with Spark


With this article, I start a series of short tutorials on Pyspark, from data pre-processing to modeling. This first one deals with the import and export of any type of data: CSV, text file, Avro, JSON, etc. I work on a virtual machine on Google Cloud Platform, and the data comes from a bucket on Cloud Storage. Let's import it.
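The snippets below assume an existing SparkSession called spark (and its SparkContext sc for the RDD example further down), which is already the case in a pyspark shell or a Dataproc notebook. Otherwise, a minimal sketch to create them:

# minimal sketch: create the SparkSession used as `spark` in this article
# (reading from gs:// also requires the GCS connector, already set up on Dataproc)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("import-any-data") \
    .getOrCreate()

# SparkContext, used later for the RDD text file example
sc = spark.sparkContext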

Import a CSV

Spark has a built-in function to read CSV files; it is as simple as:

csv_2_df = spark.read.csv("gs://my_buckets/poland_ks")
# print it
csv_2_df.show()


The data is loaded with the right number of columns and there does not seem to be any problem with it; however, the header row is not recognized and is read as data. We need to set the header parameter to true.

csv_2_df = spark.read.csv("gs://my_buckets/poland_ks", header = "true")
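A quick way to check that the first row was taken as the header is to look at the column names; a minimal check:

# the header row should now appear as column names instead of _c0, _c1, ...
print(csv_2_df.columns)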


There are other possible syntaxes, like:

csv_2_df= spark.read.load("gs://my_buckets/poland_ks", format="csv", header="true")

and parameters such as sep to specify the separator, or inferSchema to infer the types of the data.
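For example, a semicolon-separated file could be read like this (the file name and separator here are just an illustration, not the dataset used in this article):

# hypothetical: read a semicolon-separated file
other_df = spark.read.csv("gs://my_buckets/some_other_file", header=True, sep=";")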

Let's look at the schema, by the way:

csv_2_df.printSchema()

Our dataframe has all of its columns typed as string; let's try to infer the schema:

csv_2_df = spark.read.csv("gs://my_buckets/poland_ks", header=True, inferSchema=True)
csv_2_df.printSchema()

We can also specify our schema manually:

from pyspark.sql.types import *

schema = StructType([
    StructField("ID_DAY", DateType()),
    StructField("SID_STORE", IntegerType()),
    StructField("NB_TICKET", IntegerType()),
    StructField("F_TOTAL_QTY", IntegerType()),
    StructField("F_VAL_SALES_AMT_TCK", DoubleType()),
    StructField("SITE_FORMAT", StringType())])

csv_2_df = spark.read.csv("gs://alex_precopro/poland_ks", header='true', schema=schema)

Import a JSON

json_to_df = spark.read.json("gs://my_bucket/poland_ks_json")
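By default Spark expects JSON Lines, i.e. one JSON object per line. If the records span several lines, the multiLine option handles it (a sketch, assuming such a file):

# hypothetical: read a JSON file whose records span several lines
multiline_json_df = spark.read.json("gs://my_bucket/poland_ks_json", multiLine=True)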

Import a PARQUET

parquet_to_df = spark.read.parquet("gs://my_bucket/poland_ks_parquet")

Import an AVRO

In the case of Avro files, we need to call an external Databricks package to read them:

df = spark.read.format("com.databricks.spark.avro").load("gs://alex_precopro/poland_ks_avro")
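Note that since Spark 2.4, Avro support ships with Spark itself as the spark-avro module; if that module is on the classpath (for example added through the --packages option when launching the job), the shorter format name works as well:

# with the bundled spark-avro module (Spark 2.4+)
df = spark.read.format("avro").load("gs://alex_precopro/poland_ks_avro")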

Import a Text File

In the same way, Spark has a built-in function:

textFile = spark.read.text('path/file.txt')
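spark.read.text returns a DataFrame with a single string column named value, one row per line of the file; a quick look:

textFile.printSchema()  # a single column: value (string)
textFile.show(5, truncate=False)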

You can also read a text file as an RDD:

# read input text file to RDD
lines = sc.textFile('path/file.txt')
# collect the RDD to a Python list (avoid shadowing the built-in name `list`)
lines_list = lines.collect()

Export anything

To export data, you have to adapt to what you want to output. If you write Parquet, Avro, or any other partitioned format, there is no problem. If we want to write a single CSV file, we must first group the partitions scattered across the different workers:

# partitioned file
output.write.parquet("gs://my_bucket/my_output")

# csv
output.coalesce(1).write.mode("overwrite")\
.format("com.databricks.spark.csv")\
.option("header", "true")\
.option("sep", "|")\
.save('gs://my_bucket/my_output_csv')

By using coalesce(1) or repartition(1), all the partitions of the dataframe are combined into a single one, so the output is written as a single file.
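A quick way to see the effect is to compare the number of underlying partitions before and after; a minimal sketch:

# number of partitions before and after coalescing
print(output.rdd.getNumPartitions())
print(output.coalesce(1).rdd.getNumPartitions())  # 1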

To conclude

We saw how to import our files and how to write them. Head to my next article to learn how to filter our dataframe. Thanks. You can find the code here: https://github.com/AlexWarembourg/Medium

