2

Loading Thousands of Tables in Parallel With Ray Into CockroachDB Because Why No...

 2 years ago
source link: https://dzone.com/articles/loading-thousands-of-tables-in-parallel-with-ray-i
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Loading Thousands of Tables in Parallel With Ray Into CockroachDB Because Why Not?

This is brief tutorial on parallelizing imports into CockroachDB using a Python framework called Ray.

I came across an interesting scenario working with one of our customers. They are using a common data integration tool to load hundreds of tables into CockroachDB simultaneously. They reported an issue that their loads fail intermittently due to an unrecognized error. As a debug exercise I set out to write a script to import data from an http endpoint into CRDB in parallel. 

Disclosure: I do not claim to be an expert in CRDB, Python, or anything else for that matter. This is an exercise in answering a why not? question more so than anything educational. I wrote a Python script to execute an import job and need to make sure it executes in parallel to achieve the concurrency scenario I've originally set out to do. 

I'm new to Python multiprocessing and a short Google search returned a couple of options. Using built-in multiprocess, asyncio module, and using Ray. The advantage to using multiprocess and asyncio is that they're built-in Python modules. Since I was rushing through my task, I could not get multiprocess to work on my tight schedule and checked out Ray. Following a quick start guide, I was able to make it work with little to no fuss.

Start CockroachDB

Start CRDB with --max-sql-memory appropriate for your machine's RAM. Default 128mb is not enough for this workload.

cockroach start \
--insecure \
--store=node1 \
--listen-addr=localhost:26257 \
--http-addr=localhost:8080 \
--join=localhost:26257,localhost:26258,localhost:26259 \
--background \
--max-sql-memory=.25

Install Ray

pip3 install ray

I wrote the following code to execute an IMPORT job into CRDB, implementing Ray to add multiprocessing was a matter of following their README.

#! /usr/bin/python3

import subprocess as sb
import sys, ray, os, uuid

init = """
cockroach sql --insecure --host=localhost:26257 \
  --execute=\"CREATE DATABASE IF NOT EXISTS parallelload;\"
"""

def normalize_statement():
  id = str(uuid.uuid1()).replace("-", "", 4)
  return """cockroach sql --insecure --host=localhost:26257 \
   --database=\"parallelload\" \
   --execute=\"IMPORT TABLE orders{} (
   o_orderkey           INTEGER NOT NULL PRIMARY KEY,
   o_custkey            INTEGER NOT NULL,
   o_orderstatus        CHAR(1) NOT NULL,
   o_totalprice         FLOAT NOT NULL,
   o_orderdate          DATE NOT NULL,
   o_orderpriority      CHAR(15) NOT NULL,
   o_clerk              CHAR(15) NOT NULL,
   o_shippriority       INTEGER NOT NULL,
   o_comment            VARCHAR(79) NOT NULL,
   INDEX o_ck           (o_custkey ASC),
   INDEX o_od           (o_orderdate ASC)
 ) CSV DATA ('https://storage.googleapis.com/cockroach-fixtures/tpch-csv/sf-1/orders.tbl.1')
 WITH delimiter = '|';\"""".format(id)

def initialization():
   print("Initial Step: Creating Database")
   sb.run(init, shell=True, check=True)

@ray.remote
def execute_import():
   statement = normalize_statement()
   print("Running process: {}".format(os.getpid()))   
   try:
      sb.run(statement, shell=True, check=True)
   except sb.CalledProcessError as e:
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s, error code = %s\n" 
            % (e.output, e.returncode))

if __name__ == "__main__":
   initialization()
   # run ray uncapped without arguments [ray.init()]
   ray.init(memory=52428800, object_store_memory=78643200)
   futures = [execute_import.remote() for i in range(1000)]
   print(ray.get(futures))

Ray-specific parts:

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))

Cockroach-specific parts:

IMPORT TABLE orders{} (
   o_orderkey           INTEGER NOT NULL PRIMARY KEY,
   o_custkey            INTEGER NOT NULL,
   o_orderstatus        CHAR(1) NOT NULL,
   o_totalprice         FLOAT NOT NULL,
   o_orderdate          DATE NOT NULL,
   o_orderpriority      CHAR(15) NOT NULL,
   o_clerk              CHAR(15) NOT NULL,
   o_shippriority       INTEGER NOT NULL,
   o_comment            VARCHAR(79) NOT NULL,
   INDEX o_ck           (o_custkey ASC),
   INDEX o_od           (o_orderdate ASC)
 ) CSV DATA ('https://storage.googleapis.com/cockroach-fixtures/tpch-csv/sf-1/orders.tbl.1')
 WITH delimiter = '|';

Then execute the code and watch it go:

python3 import_with_ray.py
(pid=85485) job_id    status    fraction_completed    rows    index_entries    system_records    bytes
(pid=85485) 509795360310591489    succeeded    1    187500    375000    0    27768880
(pid=85485) Running process: 85485
(pid=85495) job_id    status    fraction_completed    rows    index_entries    system_records    bytes
(pid=85495) 509795420722757633    succeeded    1    187500    375000    0    27768880
(pid=85495) Running process: 85495

Finally, go to cockroach sql shell and verify your tables are loaded. Note, the output is reduced for brevity:

root@localhost:26257/parallelload> show tables;
...
  ordersfef573fc183c11eaa7b2acde48001122
  ordersff03fb32183e11ea9700acde48001122
  ordersff479678183c11ea9700acde48001122
  ordersff58aa96183f11eaa1e9acde48001122
  ordersff8e417c183c11ea99bbacde48001122
  ordersff931a5a183f11ea8265acde48001122
(1004 rows)

Time: 15.318ms

Lessons Learned

1. loading 1000 tables is not a big deal, even on a local 3 node cluster. It does require starting CRDB with --max-sql-memory=.25 per node but otherwise, it can chug along.
2. Ray is way cool, albeit requires a pip install and I will be looking at it further.
3. Cockroach is awesome as it's able to keep up with the sheer volume and velocity of the data even
on a single machine. Again, this is not a big data problem and just an exercise in whether it can work.

Additionally, CRDB DB Console comes in handy when bulk loading as you can monitor the status of your imports through the JOBS page.

Main Page

Job Details

Failed Jobs

Failed jobs filter


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK