Congregating Spark files on S3

 3 years ago
source link: https://blog.knoldus.com/congregating-spark-files-on-s3/

We all know that Apache Spark is a fast and general engine for large-scale data processing, and it is because of its speed that Spark has become one of the most popular frameworks in the world of big data.

Working with Spark is a pleasant experience, as it has a simple API for Scala, Java, Python and R. But some tasks in Spark are still tough rows to hoe. For example, we had a situation where we needed to upload the files written by a Spark cluster to one location on Amazon S3. In local mode, this task is easy to handle because all the files, or partitions as we say in Spark, are written on one node, i.e., the local node.

But when Spark is running on a cluster, the files are written or saved on the worker nodes. The master node contains only a reference or an empty folder. This makes uploading all the files to one location on S3 a tough row to hoe.

After a lot of research and study, we were finally able to congregate all the files from the Spark cluster at one location on S3. Let's see what the solution looks like in terms of code.

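import java.io.File
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.PutObjectRequest

dataFrame.foreachPartition { partition =>
  // This block runs on the worker nodes, so the paths below refer to each worker's local disk
  // listFiles() returns null for anything that is not a directory, so guard against it
  def children(dir: File): Seq[File] = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
  val temporaryFolder = new File(url + "/_temporary")
  val s3Client = new AmazonS3Client(new BasicAWSCredentials("aws_access_key", "aws_secret_key"))
  // Walk _temporary -> partition folders -> task folders -> part* files
  children(temporaryFolder).foreach { partitionFolder =>
    children(partitionFolder).foreach { taskFolder =>
      children(taskFolder).foreach { partitionedFile =>
        // Upload each file under the key url/<file name>
        s3Client.putObject(new PutObjectRequest("aws_s3_bucket", url + "/" + partitionedFile.getName, partitionedFile))
      }
    }
  }
}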

I know it is a little complex to understand in one go. So, let's take it step by step.

Here, dataFrame is the data that we have already saved in the form of files on the Spark cluster at url. Now, we need to upload its partitioned files, which are saved on the worker nodes, to S3.

The partitioned files are always saved in the _temporary folder at the url mentioned in the code. So, we need to traverse the _temporary folder to access those files.

Now, the _temporary folder contains partition folders, which in turn contain task folders. Every task folder contains part* files. These are the actual files containing the data that needs to be uploaded to S3.
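The traversal described above can be tried out on a local disk, without Spark or S3. The following is only a minimal sketch: the object name PartFileWalk, the helper names, and the folder names "0" and "task_0001" are made up for illustration, and the key scheme simply mirrors the url + "/" + file name used in the code above.

```scala
import java.io.File
import java.nio.file.Files

object PartFileWalk {
  // listFiles() returns null for non-directories, so wrap it in Option.
  def children(dir: File): Seq[File] =
    Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)

  // Descend three levels below _temporary (partition folder -> task folder
  // -> file) and keep only regular files whose name starts with "part".
  def partFiles(outputDir: File): Seq[File] =
    for {
      partitionFolder <- children(new File(outputDir, "_temporary"))
      taskFolder      <- children(partitionFolder)
      file            <- children(taskFolder)
      if file.isFile && file.getName.startsWith("part")
    } yield file

  // Assumed key scheme: same as in the article, url + "/" + file name.
  def s3Key(url: String, file: File): String = url + "/" + file.getName

  def main(args: Array[String]): Unit = {
    // Build a throwaway tree that imitates the layout described above.
    // The folder names "0" and "task_0001" are hypothetical.
    val root = Files.createTempDirectory("spark-output").toFile
    val taskFolder = new File(root, "_temporary/0/task_0001")
    taskFolder.mkdirs()
    new File(taskFolder, "part-00000").createNewFile()
    new File(taskFolder, "_SUCCESS").createNewFile()

    // Prints the key each part file would be uploaded under: output/part-00000
    partFiles(root).foreach(f => println(s3Key("output", f)))
  }
}
```

Keeping the walk separate from the upload makes it easy to check which files would be picked up before any of them is sent to S3.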

We tested this code on a two-node cluster created on Amazon EC2, containing one master node and one worker node, and it works!

Of course, there can be other solutions that are more robust than the one mentioned above, and we would love to know them. So, if you have such a solution, please leave us a comment.

