7

Using AWK and R to parse 25tb

 6 months ago
source link: https://livefreeordichotomize.com/posts/2019-06-04-using-awk-and-r-to-parse-25tb/index.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

How not to do this

To appropriate a cliched quote:

I didn’t fail a thousand times, I just discovered a thousand ways not to parse lots of data into an easily query-able format.

The first attempt

Lesson Learned: There’s no cheap way to parse 25tb of data at once.

Having taken a class at Vanderbilt titled ‘Advanced methods in Big Data’ I was sure I had this in the bag. ⊕Capital B capital D Big Data, so you know it’s serious.It would be maybe an hour or two of me setting up a Hive server to run over all our data and then calling it good. Since our data is stored on AWS S3 I used a service called Athena which allows you to run Hive SQL queries on your S3 data. Not only do you get to avoid setting/ spinning up a Hive cluster, you only pay for the data searched.

After pointing Athena to my data and its format I ran a few tests with queries like

select * from intensityData limit 10;

and got back results fast and well formed. I was set.

Until we tried to use the data in real life….

I was asked to grab all the data for a SNP so we could test a model on it. I ran the query:

select * from intensityData 
where snp = 'rs123456';

… and I waited. Eight minutes and 4+ terabytes of data queried later I had my results. Athena charges you by data searched at the reasonable rate of $5 per TB. So this single query cost $20 and eight minutes. ⊕If we ever wanted to run a model over all the data we better be ready to wait roughly 38 years and pay $50 million. Clearly this wasn’t going to work.

This should be a walk in the Parquet…

Lesson Learned: Be careful with your Parquet file sizes and organization.

My first attempt to remedy the situation was to convert all of the TSV’s to Parquet files. Parquet files are good for working with larger datasets because they store data in a ‘columnar’ fashion. Meaning each column is stored in its own section of memory/disk, unlike a text file with lines containing every column. This means to look for something you only have to read the necessary column. Also, they keep a record of the range of values by column for each file so if the value you’re looking for isn’t in the column range Spark doesn’t waste it’s time scanning through the file.

I ran a simple AWS Glue job to convert our TSVs to Parquet and hooked up the new Parquet files to Athena. This took only around five hours. However, when I ran a query it took just about the same amount of time and a tiny bit less money. This is because Spark in its attempt to optimize the job just unzipped a single TSV chunk and placed it in its own Parquet chunk. Because each chunk was big enough to contain multiple people’s full records, this meant that every file had every SNP in them and thus Spark had to open all of them to extract what we wanted.

⊕Interestingly the default (and recomended) Parquet compression type: ‘snappy’ is not splitable. So each executor was still stuck with the task of uncompressing and loading an entire 3.5gig dataset.

large_parquet_chunks.png

Sorting out the issue

Lesson Learned: Sorting is hard, especially when data is distributed.

I thought that I had the problem figured out now. All I needed to do was to sort the data on the SNP column instead of the individual. This would allow a given chunk of data to only have a few SNPs in it and Parquet’s smart only-open-if-values-in-range feature could shine. Unfortunately, sorting billions of rows of data distributed across a cluster is not a trivial task.

Me taking algorithms class in college: "Ugh, no one cares about computational complexity of all these sorting algorithms"

Me trying to sort on a column in a 20TB #Spark table: "Why is this taking so long?" #DataScience struggles.

— Nick Strayer (@NicholasStrayer) March 11, 2019

⊕AWS doesn’t exactly want to give refunds for the cause ‘I am an absent minded graduate student.’

After attempting to run this on Amazon’s glue it ran for 2 days and then crashed.

What about partitioning?

Lesson Learned: Partitions in Spark need to be balanced.

Another idea I had was to partition the data into chromosomes. There are 23 of these (plus a few extra to account for mitochondrial DNA or unmapped regions). This would provide a way of cutting down the data into much more manageable chunks. By adding just a single line to the Spark export function in the glue script: partition_by = "chr", the data should be put into those buckets.

Chromosome graphic

DNA is made up of multiple chunks called Chromosomes. Img via kintalk.org.

Unfortunately things didn’t work out well. This is because the chromosomes are different sizes and thus have different amounts of data within them. This meant that the tasks Spark sent out to its workers were unbalanced and ran slowly due to some of the nodes finishing early and sitting idle. The jobs did finish, however. But when querying for a single SNP the unbalance caused problems again. With SNPS in larger chromosomes (aka where we will actually want to get data) the cost was only improved ~10x. A lot but not enough.

What about even finer partitioning?

Lesson Learned: Never, ever, try and make 2.5 million partitions.

I decided to get crazy with my partitioning and partitioned on each SNP. This guaranteed that each partition would be equal in size. THIS WAS A BAD IDEA. I used Glue and added the innocent line of partition_by = 'snp'. The job started and ran. A day later I checked and noticed nothing had been written to S3 yet so I killed the job. Turns out Glue was writing intermediate files to hidden S3 locations, and a lot of them, like 2 billion. This mistake ended up costing more than a thousand dollars and didn’t make my advisor happy.

Partitioning + Sorting

Lesson Learned: Sorting is still hard and so is tuning Spark.

The last attempt in the partitioning era was to partition on chromosome and then sort each partition. In theory this would have made each query quicker because the desired SNP data would only reside in the ranges of a few of the Parquet chunks within a given region. Alas, it turns out sorting even the partitioned data was a lot of work. I ended up switching to EMR for a custom cluster, using 8 powerful instances (C5.4xl) and using Sparklyr to build a more flexible workflow…

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

…but no mater what the job never finished. I tried all the tuning tricks: bumped up the memory allocated to each executor of the queries, used high ram node types, broadcasting variables, but it would always get around half way done then executors would slowly start failing till everything eventually ground to a halt.

Update: so it begins. pic.twitter.com/agY4GU2ru5

— Nick Strayer (@NicholasStrayer) May 15, 2019

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK