
Python - Hadoop interaction tutorial (PySpark, PyArrow, impyla, etc.)


Following this guide you will learn, among other things, how to load files from the Hadoop Distributed File System (HDFS) directly into memory, how to move files from your local machine to HDFS, and how to set up Spark.

In [16]:

from pathlib import Path
import pandas as pd
import numpy as np

First of all, install findspark, and also pyspark if you are working on a local computer. If you are following this tutorial on a Hadoop cluster, you can skip the pyspark install. For simplicity I will use the conda virtual environment manager (pro tip: create a virtual environment before starting and do not break your system Python install!).
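
If you want to follow that tip, a minimal sketch could look like the cell below (the environment name and Python version are just examples, pick your own):

In [ ]:

# Create a dedicated environment, then activate it from a terminal with
# `conda activate jupyter` before launching Jupyter.
!conda create -n jupyter python=3.6 -y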

In [ ]:

!conda install -c conda-forge findspark -y

In [ ]:

!conda install -c conda-forge pyspark -y

Spark setup with findspark

In [17]:

import findspark

# Local Spark
# findspark.init('/home/cloudera/miniconda3/envs/jupyter/lib/python3.7/site-packages/pyspark/')

# Cloudera cluster Spark
findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')

Creating a SparkSession

To create a SparkSession, the entry point of the PySpark API:

In [18]:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example_app').master('local[*]').getOrCreate()
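
As a side note, if you want the session to run against the cluster instead of in local mode, a variant (mirroring the commented-out alternatives in the findspark cell above; shown here only as a sketch) would use the YARN master and enable Hive support explicitly:

In [ ]:

# Alternative for running on the cluster (do not run it together with the
# local session above; getOrCreate would simply return the existing one):
# spark = (SparkSession.builder
#          .appName('example_app')
#          .master('yarn')
#          .enableHiveSupport()
#          .getOrCreate())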

Let’s list the existing databases. I assume you are familiar with the Spark DataFrame API and its methods:

In [19]:

spark.sql("show databases").show()
+------------+
|databaseName|
+------------+
|  __ibis_tmp|
|   analytics|
|         db1|
|     default|
|     fhadoop|
|        juan|
+------------+

The first integration is about how to move data from pandas, the de facto standard Python library for in-memory data manipulation, to Spark. First, let’s load a pandas DataFrame. This one is about air quality in Madrid (just to satisfy your curiosity, but not important for moving data from one place to another). You can download it here. Make sure you install pytables to read hdf5 data.

We read it with pandas, for example the Madrid air quality dataset:

In [20]:

air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', key='28079008')
air_quality_df.head()

Out[20]:

                       BEN  CH4    CO        EBE  NMHC   NO        NO_2          NOx        O_3        PM10  PM25       SO_2  TCH        TOL
date
2001-07-01 01:00:00  30.65  NaN  6.91  42.639999   NaN  NaN  381.299988  1017.000000   9.010000  158.899994   NaN  47.509998  NaN  76.050003
2001-07-01 02:00:00  29.59  NaN  2.59  50.360001   NaN  NaN  209.500000   409.200012  23.820000  104.800003   NaN  20.950001  NaN  84.900002
2001-07-01 03:00:00   4.69  NaN  0.76  25.570000   NaN  NaN  116.400002   143.399994  31.059999   48.470001   NaN  11.270000  NaN  20.980000
2001-07-01 04:00:00   4.46  NaN  0.74  22.629999   NaN  NaN  116.199997   149.300003  23.780001   47.500000   NaN  10.100000  NaN  14.770000
2001-07-01 05:00:00   2.18  NaN  0.57  11.920000   NaN  NaN  100.900002   124.800003  29.530001   49.689999   NaN   7.680000  NaN   8.970000

Let’s make some changes to this DataFrame, like resetting the datetime index so we do not lose information when loading into Spark. The datetime column will also be transformed to string, as Spark has some issues working with dates (related to system locale, timezones, and so on).

In [21]:

air_quality_df.reset_index(inplace=True)
air_quality_df['date'] = air_quality_df['date'].dt.strftime('%Y-%m-%d %H:%M:%S')

We can simply load from pandas to Spark with createDataFrame:

In [ ]:

air_quality_sdf = spark.createDataFrame(air_quality_df)
air_quality_sdf.dtypes

Once the DataFrame is loaded into Spark (as air_quality_sdf here), it can be manipulated easily using PySpark methods:

In [23]:

air_quality_sdf.select('date', 'NOx').show(5)
+-------------------+------------------+
|               date|               NOx|
+-------------------+------------------+
|2001-07-01 01:00:00|            1017.0|
|2001-07-01 02:00:00|409.20001220703125|
|2001-07-01 03:00:00|143.39999389648438|
|2001-07-01 04:00:00| 149.3000030517578|
|2001-07-01 05:00:00|124.80000305175781|
+-------------------+------------------+
only showing top 5 rows
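
Since this is Spark 2.3, the pandas-to-Spark conversion above can optionally be accelerated with Arrow. This is a standard Spark configuration flag, shown here only as a sketch (it requires pyarrow to be installed on the driver, which we install later anyway):

In [ ]:

# Optional: let Spark use Apache Arrow for pandas <-> Spark conversions
# (available since Spark 2.3).
spark.conf.set('spark.sql.execution.arrow.enabled', 'true')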

pandas -> spark -> hive

To persist a Spark DataFrame into HDFS, where it can be queried using the default Hadoop SQL engine (Hive), one straightforward strategy (not the only one) is to create a temporary view from that DataFrame:

In [24]:

air_quality_sdf.createOrReplaceTempView("air_quality_sdf")

Once the temporary view is created, it can be used from the Spark SQL engine to create a real table using create table as select. Before creating this table, I will create a new database called analytics to store it:

In [25]:

sql_drop_table = """
drop table if exists analytics.pandas_spark_hive
"""

sql_drop_database = """
drop database if exists analytics cascade
"""

sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""

sql_create_table = """
create table if not exists analytics.pandas_spark_hive
using parquet
as select to_timestamp(date) as date_parsed, *
from air_quality_sdf
"""

print("dropping database...")
result_drop_db = spark.sql(sql_drop_database)

print("creating database...")
result_create_db = spark.sql(sql_create_database)

print("dropping table...")
result_droptable = spark.sql(sql_drop_table)

print("creating table...")
result_create_table = spark.sql(sql_create_table)
dropping database...
creating database...
dropping table...
creating table...

We can check the results using the Spark SQL engine, for example selecting the ozone pollutant concentration over time:

In [26]:

spark.sql("select * from analytics.pandas_spark_hive").select("date_parsed", "O_3").show(5)
+-------------------+------------------+
|        date_parsed|               O_3|
+-------------------+------------------+
|2001-07-01 01:00:00| 9.010000228881836|
|2001-07-01 02:00:00| 23.81999969482422|
|2001-07-01 03:00:00|31.059999465942383|
|2001-07-01 04:00:00|23.780000686645508|
|2001-07-01 05:00:00|29.530000686645508|
+-------------------+------------------+
only showing top 5 rows
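
Any other Spark SQL query works the same way over the persisted table; for example, an illustrative daily aggregation of ozone (a sketch, not from the original notebook):

In [ ]:

# Daily average ozone concentration, computed by the Spark SQL engine.
spark.sql("""
    select to_date(date_parsed) as day, avg(O_3) as avg_o3
    from analytics.pandas_spark_hive
    group by to_date(date_parsed)
    order by day
""").show(5)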

Apache Arrow is an in-memory columnar data format created to support high-performance operations in Big Data environments (it can be seen as the in-memory equivalent of the parquet format). It is developed in C++, but its Python API is amazing, as you will now be able to see. First of all, please install it:

In [ ]:

!conda install pyarrow -y

In order to establish native communication with HDFS I will use the interface included in pyarrow. The only requirement is setting an environment variable pointing to the location of libhdfs. Remember we are in a Cloudera environment; in case you are using Hortonworks you will have to find the proper location (believe me, it exists).

Establish connection

In [27]:

import pyarrow as pa
import os
os.environ['ARROW_LIBHDFS_DIR'] = '/opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/lib64/'
hdfs_interface = pa.hdfs.connect(host='localhost', port=8020, user='cloudera')

List files in HDFS

Let’s list the files persisted by Spark before. Remember that those files have previously been loaded into a pandas DataFrame from a local file and then loaded into a Spark DataFrame. By default Spark writes tables partitioned into many snappy-compressed files. In the HDFS path you can identify the database name (analytics) and the table name (pandas_spark_hive):

In [28]:

hdfs_interface.ls('/user/cloudera/analytics/pandas_spark_hive/')

Out[28]:

['/user/cloudera/analytics/pandas_spark_hive/_SUCCESS',
 '/user/cloudera/analytics/pandas_spark_hive/part-00000-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00001-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00002-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00003-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00004-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00005-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00006-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00007-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet']

Reading parquet files directly from HDFS

To read parquet files (or a folder full of files representing a table) directly from HDFS, I will use the PyArrow HDFS interface created before:

In [29]:

table = hdfs_interface.read_parquet('/user/cloudera/analytics/pandas_spark_hive/')

HDFS -> pandas

Once the parquet files are read by the PyArrow HDFS interface, a Table object is created. We can easily go back to pandas with the to_pandas method:

In [30]:

table_df = table.to_pandas()
table_df.head()
/home/cloudera/miniconda3/envs/jupyter/lib/python3.6/site-packages/pyarrow/pandas_compat.py:752: FutureWarning: .labels was deprecated in version 0.24.0. Use .codes instead.
  labels, = index.labels

Out[30]:

           date_parsed                 date    BEN  CH4    CO        EBE  NMHC   NO        NO_2          NOx        O_3        PM10  PM25       SO_2  TCH        TOL
0  2001-06-30 23:00:00  2001-07-01 01:00:00  30.65  NaN  6.91  42.639999   NaN  NaN  381.299988  1017.000000   9.010000  158.899994   NaN  47.509998  NaN  76.050003
1  2001-07-01 00:00:00  2001-07-01 02:00:00  29.59  NaN  2.59  50.360001   NaN  NaN  209.500000   409.200012  23.820000  104.800003   NaN  20.950001  NaN  84.900002
2  2001-07-01 01:00:00  2001-07-01 03:00:00   4.69  NaN  0.76  25.570000   NaN  NaN  116.400002   143.399994  31.059999   48.470001   NaN  11.270000  NaN  20.980000
3  2001-07-01 02:00:00  2001-07-01 04:00:00   4.46  NaN  0.74  22.629999   NaN  NaN  116.199997   149.300003  23.780001   47.500000   NaN  10.100000  NaN  14.770000
4  2001-07-01 03:00:00  2001-07-01 05:00:00   2.18  NaN  0.57  11.920000   NaN  NaN  100.900002   124.800003  29.530001   49.689999   NaN   7.680000  NaN   8.970000

And that is basically where we started, closing the cycle Python -> Hadoop -> Python.
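
If you only need a subset of columns, a commonly used alternative (sketched here, assuming the ParquetDataset interface accepts the HDFS filesystem object created above) is to build a ParquetDataset and select columns while reading:

In [ ]:

from pyarrow import parquet as pq

# Read only two columns of the partitioned table through the same HDFS
# interface; column names follow the schema shown above.
dataset = pq.ParquetDataset('/user/cloudera/analytics/pandas_spark_hive/',
                            filesystem=hdfs_interface)
o3_df = dataset.read(columns=['date_parsed', 'O_3']).to_pandas()
o3_df.head()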

Uploading local files to HDFS

All kinds of HDFS operations are supported by the PyArrow HDFS interface, for example uploading a bunch of local files to HDFS:

In [31]:

cwd = Path('./data/')
destination_path = '/user/cloudera/analytics/data/'

for f in cwd.rglob('*.*'):
    print(f'uploading {f.name}')
    with open(str(f), 'rb') as f_upl:
        hdfs_interface.upload(destination_path + f.name, f_upl)
uploading sandp500.zip
uploading stations.csv
uploading madrid.h5
uploading diamonds_train.csv
uploading diamonds_test.csv

Let’s check that the files have been uploaded properly by listing the files in the destination path:

In [32]:

hdfs_interface.ls(destination_path)

Out[32]:

['/user/cloudera/analytics/data/diamonds_test.csv',
 '/user/cloudera/analytics/data/diamonds_train.csv',
 '/user/cloudera/analytics/data/madrid.h5',
 '/user/cloudera/analytics/data/sandp500.zip',
 '/user/cloudera/analytics/data/stations.csv']
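
Downloading works the other way around; a small sketch using the interface's download method (the local path is just an example):

In [ ]:

# Copy one of the uploaded files back from HDFS to the local filesystem.
with open('/tmp/stations_copy.csv', 'wb') as f_local:
    hdfs_interface.download('/user/cloudera/analytics/data/stations.csv', f_local)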

Reading arbitrary files (not parquet) from HDFS ( HDFS -> pandas example)

For example, a .csv file can be loaded directly from HDFS into a pandas DataFrame using the open method and the standard pandas read_csv function, which accepts a file-like buffer as input:

In [33]:

diamonds_train = pd.read_csv(hdfs_interface.open('/user/cloudera/analytics/data/diamonds_train.csv'))

In [34]:

diamonds_train.head()

Out[34]:

   carat        cut color clarity  depth  table  price     x     y     z
0   1.21    Premium     J     VS2   62.4   58.0   4268  6.83  6.79  4.25
1   0.32  Very Good     H     VS2   63.0   57.0    505  4.35  4.38  2.75
2   0.71       Fair     G     VS1   65.5   55.0   2686  5.62  5.53  3.65
3   0.41       Good     D     SI1   63.8   56.0    738  4.68  4.72  3.00
4   1.02      Ideal     G     SI1   60.5   59.0   4882  6.55  6.51  3.95

In case you are interested in all the methods and possibilities this library offers, please visit: https://arrow.apache.org/docs/python/filesystems.html#hdfs-api

Sometimes it is not possible to access the native libhdfs library (for example, when performing analytics from a computer that is not part of the cluster). In that case we can rely on WebHDFS (the HDFS REST API). It is slower and not suitable for heavy Big Data loads, but an interesting option for light workloads. Let’s install a WebHDFS Python API:

In [36]:

!conda install -c conda-forge python-hdfs -y
Collecting package metadata: done
Solving environment: done

## Package Plan ##

  environment location: /home/cloudera/miniconda3/envs/jupyter

  added / updated specs:
    - python-hdfs


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    certifi-2019.3.9           |           py36_0         149 KB  conda-forge
    ------------------------------------------------------------
                                           Total:         149 KB

The following packages will be UPDATED:

  ca-certificates    pkgs/main::ca-certificates-2019.1.23-0 --> conda-forge::ca-certificates-2019.3.9-hecc5488_0

The following packages will be SUPERSEDED by a higher-priority channel:

  certifi                                         pkgs/main --> conda-forge
  openssl              pkgs/main::openssl-1.1.1b-h7b6447c_1 --> conda-forge::openssl-1.1.1b-h14c3975_1



Downloading and Extracting Packages
certifi-2019.3.9     | 149 KB    | ##################################### | 100% 
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Establish WebHDFS connection

To establish the connection:

In [37]:

from hdfs import InsecureClient

web_hdfs_interface = InsecureClient('http://localhost:50070', user='cloudera')
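
If your cluster is Kerberized, the same library ships a Kerberos client (it needs the requests-kerberos extra installed); a hedged sketch, mirroring the commented-out style used earlier:

In [ ]:

# Hypothetical alternative for secured clusters; requires the
# `requests-kerberos` dependency of python-hdfs.
# from hdfs.ext.kerberos import KerberosClient
# web_hdfs_interface = KerberosClient('http://localhost:50070')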

List files in HDFS

Listing files is similar to using the PyArrow interface; just use the list method and an HDFS path:

In [38]:

web_hdfs_interface.list('/user/cloudera/analytics/data')

Out[38]:

['diamonds_test.csv',
 'diamonds_train.csv',
 'madrid.h5',
 'sandp500.zip',
 'stations.csv']

Uploading local files to HDFS using WebHDFS

More of the same:

In [39]:

cwd = Path('./data/')
destination_path = '/user/cloudera/analytics/data_web_hdfs/'

for f in cwd.rglob('*.*'):
    print(f'uploading {f.name}')
    web_hdfs_interface.upload(destination_path + f.name, 
                              str(f),
                              overwrite=True)
uploading sandp500.zip
uploading stations.csv
uploading madrid.h5
uploading diamonds_train.csv
uploading diamonds_test.csv

Let’s check the upload is correct:

In [40]:

web_hdfs_interface.list(destination_path)

Out[40]:

['diamonds_test.csv',
 'diamonds_train.csv',
 'madrid.h5',
 'sandp500.zip',
 'stations.csv']

Bigger files can also be handled by HDFS (with some limitations). Those files are from the Kaggle Microsoft Malware Competition and weigh a couple of GB each:

In [41]:

web_hdfs_interface.upload(destination_path + 'train.parquet', '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/train.pq', overwrite=True);
web_hdfs_interface.upload(destination_path + 'test.parquet', '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/test.pq', overwrite=True);
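
For files this size it can be handy to track progress; the upload method accepts a progress callback, called every chunk_size bytes. A sketch reusing the same paths as above:

In [ ]:

# Simple progress callback: the hdfs client calls it with the local path and
# the number of bytes transferred so far, and once with -1 on completion.
def print_progress(local_path, nbytes):
    if nbytes == -1:
        print(f'{local_path}: done')

web_hdfs_interface.upload(destination_path + 'train.parquet',
                          '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/train.pq',
                          overwrite=True,
                          progress=print_progress)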

Reading files from HDFS using WebHDFS ( HDFS -> pandas example)

In this case, it is useful to use the PyArrow parquet module and pass a buffer to create a Table object. Afterwards, a pandas DataFrame can easily be created from the Table object using the to_pandas method:

In [42]:

from pyarrow import parquet as pq
from io import BytesIO

In [43]:

with web_hdfs_interface.read(destination_path + 'train.parquet') as reader:
    microsoft_train = pq.read_table(BytesIO(reader.read())).to_pandas()
/home/cloudera/miniconda3/envs/jupyter/lib/python3.6/site-packages/pyarrow/pandas_compat.py:708: FutureWarning: .labels was deprecated in version 0.24.0. Use .codes instead.
  labels = getattr(columns, 'labels', None) or [
/home/cloudera/miniconda3/envs/jupyter/lib/python3.6/site-packages/pyarrow/pandas_compat.py:735: FutureWarning: the 'labels' keyword is deprecated, use 'codes' instead
  return pd.MultiIndex(levels=new_levels, labels=labels, names=columns.names)

In [44]:

microsoft_train.head()

Out[44]:

   MachineIdentifier                 ProductName   EngineVersion  AppVersion       AvSigVersion  IsBeta  RtpStateBitfield  IsSxsPassiveMode  DefaultBrowsersIdentifier  AVProductStatesIdentifier  …  Census_FirmwareVersionIdentifier  Census_IsSecureBootEnabled  Census_IsWIMBootEnabled  Census_IsVirtualDevice  Census_IsTouchEnabled  Census_IsPenCapable  Census_IsAlwaysOnAlwaysConnectedCapable  Wdft_IsGamer  Wdft_RegionIdentifier  HasDetections
0  0000028988387b115f69f31a3bf04f09  win8defender  1.1.15100.1  4.18.1807.18075  1.273.1735.0  0  7.0  0  NaN  53447.0  …  36144.0  0  NaN  0.0  0  0  0.0  0.0  10.0  0
1  000007535c3f730efa9ea0b7ef1bd645  win8defender  1.1.14600.4  4.13.17134.1     1.263.48.0    0  7.0  0  NaN  53447.0  …  57858.0  0  NaN  0.0  0  0  0.0  0.0   8.0  0
2  000007905a28d863f6d0d597892cd692  win8defender  1.1.15100.1  4.18.1807.18075  1.273.1341.0  0  7.0  0  NaN  53447.0  …  52682.0  0  NaN  0.0  0  0  0.0  0.0   3.0  0
3  00000b11598a75ea8ba1beea8459149f  win8defender  1.1.15100.1  4.18.1807.18075  1.273.1527.0  0  7.0  0  NaN  53447.0  …  20050.0  0  NaN  0.0  0  0  0.0  0.0   3.0  1
4  000014a5f00daa18e76b81417eeb99fc  win8defender  1.1.15100.1  4.18.1807.18075  1.273.1379.0  0  7.0  0  NaN  53447.0  …  19844.0  0  0.0  0.0  0  0  0.0  0.0   1.0  1

5 rows × 83 columns
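
CSV files can be read through WebHDFS in the same spirit, without the intermediate BytesIO buffer, since pandas accepts the reader directly. A sketch reusing one of the files uploaded earlier:

In [ ]:

# Stream a CSV straight from WebHDFS into pandas; read() yields a file-like
# object that read_csv can consume.
with web_hdfs_interface.read(destination_path + 'diamonds_train.csv') as reader:
    diamonds_web = pd.read_csv(reader)
diamonds_web.head()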

Hive and Impala are two SQL engines for Hadoop. Hive is MapReduce based, while Impala is a more modern and faster in-memory implementation created and open-sourced by Cloudera. Both engines can be fully leveraged from Python using one of their multiple APIs. In this case I am going to show you impyla, which supports both engines. Let’s install it using conda, and do not forget to install thrift_sasl version 0.2.1 (yes, it must be this specific version, otherwise it will not work):

In [45]:

!conda install impyla thrift_sasl=0.2.1 -y
Collecting package metadata: done
Solving environment: done

## Package Plan ##

  environment location: /home/cloudera/miniconda3/envs/jupyter

  added / updated specs:
    - impyla
    - thrift_sasl=0.2.1


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    certifi-2019.3.9           |           py36_0         155 KB
    ------------------------------------------------------------
                                           Total:         155 KB

The following packages will be SUPERSEDED by a higher-priority channel:

  ca-certificates    conda-forge::ca-certificates-2019.3.9~ --> pkgs/main::ca-certificates-2019.1.23-0
  certifi                                       conda-forge --> pkgs/main
  openssl            conda-forge::openssl-1.1.1b-h14c3975_1 --> pkgs/main::openssl-1.1.1b-h7b6447c_1



Downloading and Extracting Packages
certifi-2019.3.9     | 155 KB    | ##################################### | 100% 
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Establishing connection

In [46]:

from impala.dbapi import connect
from impala.util import as_pandas

Hive (Hive -> pandas example)

The API follows the classic ODBC standard, which will probably be familiar to you. impyla includes a utility function called as_pandas that easily parses results (a list of tuples) into a pandas DataFrame. Use it with caution: it has issues with certain types of data and is not very efficient with Big Data workloads. Fetching results both ways:

In [47]:

hive_conn = connect(host='localhost', port=10000, database='analytics', auth_mechanism='PLAIN')

with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results = c.fetchall()
    
with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results_df = as_pandas(c)

Raw results are pretty similar to those you would expect from, for example, the standard Python sqlite3 library:

In [48]:

results[:2]

Out[48]:

[(datetime.datetime(2001, 7, 1, 1, 0),
  '2001-07-01 01:00:00',
  30.649999618530273,
  nan,
  6.909999847412109,
  42.63999938964844,
  nan,
  nan,
  381.29998779296875,
  1017.0,
  9.010000228881836,
  158.89999389648438,
  nan,
  47.5099983215332,
  nan,
  76.05000305175781),
 (datetime.datetime(2001, 7, 1, 2, 0),
  '2001-07-01 02:00:00',
  29.59000015258789,
  nan,
  2.5899999141693115,
  50.36000061035156,
  nan,
  nan,
  209.5,
  409.20001220703125,
  23.81999969482422,
  104.80000305175781,
  nan,
  20.950000762939453,
  nan,
  84.9000015258789)]

And its pandas DataFrame version:

In [49]:

results_df.head()

Out[49]:

   pandas_spark_hive.date_parsed  pandas_spark_hive.date  pandas_spark_hive.ben  pandas_spark_hive.ch4  pandas_spark_hive.co  pandas_spark_hive.ebe  pandas_spark_hive.nmhc  pandas_spark_hive.no  pandas_spark_hive.no_2  pandas_spark_hive.nox  pandas_spark_hive.o_3  pandas_spark_hive.pm10  pandas_spark_hive.pm25  pandas_spark_hive.so_2  pandas_spark_hive.tch  pandas_spark_hive.tol
0  2001-07-01 01:00:00  2001-07-01 01:00:00  30.65  NaN  6.91  42.639999  NaN  NaN  381.299988  1017.000000   9.010000  158.899994  NaN  47.509998  NaN  76.050003
1  2001-07-01 02:00:00  2001-07-01 02:00:00  29.59  NaN  2.59  50.360001  NaN  NaN  209.500000   409.200012  23.820000  104.800003  NaN  20.950001  NaN  84.900002
2  2001-07-01 03:00:00  2001-07-01 03:00:00   4.69  NaN  0.76  25.570000  NaN  NaN  116.400002   143.399994  31.059999   48.470001  NaN  11.270000  NaN  20.980000
3  2001-07-01 04:00:00  2001-07-01 04:00:00   4.46  NaN  0.74  22.629999  NaN  NaN  116.199997   149.300003  23.780001   47.500000  NaN  10.100000  NaN  14.770000
4  2001-07-01 05:00:00  2001-07-01 05:00:00   2.18  NaN  0.57  11.920000  NaN  NaN  100.900002   124.800003  29.530001   49.689999  NaN   7.680000  NaN   8.970000

Impala (Impala -> pandas example)

Working with Impala follows the same pattern as Hive; just make sure you connect to the correct port, which defaults to 21050 in this case:

In [50]:

impala_conn = connect(host='localhost', port=21050)

with impala_conn.cursor() as c:
    c.execute('show databases')
    result_df = as_pandas(c)

In [51]:

result_df

Out[51]:

               name                                        comment
0        __ibis_tmp
1  _impala_builtins  System database for Impala builtin functions
2         analytics
3               db1
4           default                          Default Hive database
5           fhadoop
6              juan
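
Arbitrary Impala SQL works the same way. For example, a hedged sketch of a small aggregation over the table created earlier (Impala may need its metadata refreshed to see a table created from Spark):

In [ ]:

# Refresh Impala's view of the metastore, then aggregate and fetch as pandas.
with impala_conn.cursor() as c:
    c.execute('invalidate metadata analytics.pandas_spark_hive')
    c.execute("""select to_date(date_parsed) as day, avg(no_2) as avg_no2
                 from analytics.pandas_spark_hive
                 group by to_date(date_parsed)
                 order by day
                 limit 5""")
    daily_no2 = as_pandas(c)
daily_no2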

Another alternative is the Ibis Framework, a high-level API for a relatively vast collection of data sources, including HDFS and Impala. It is built around the idea of using Python objects and methods to perform actions over those sources. Let’s install it the same way as the rest of the libraries:

In [52]:

!conda install ibis-framework -y
Collecting package metadata: done
Solving environment: done

# All requested packages already installed.

Let’s create both an HDFS and an Impala interface (Impala needs an HDFS interface object in Ibis):

In [53]:

import ibis

hdfs_ibis = ibis.hdfs_connect(host='localhost', port=50070)
impala_ibis = ibis.impala.connect(host='localhost', port=21050, hdfs_client=hdfs_ibis, user='cloudera')

Once the interfaces are created, actions can be performed by calling methods, with no need to write more SQL. If you are familiar with ORMs (Object Relational Mappers), this is not exactly the same, but the underlying idea is pretty similar.

In [55]:

impala_ibis.invalidate_metadata()
impala_ibis.list_databases()

Out[55]:

['__ibis_tmp',
 '_impala_builtins',
 'analytics',
 'db1',
 'default',
 'fhadoop',
 'juan']

Ibis natively works with pandas, so there is no need to perform a conversion. Reading a table returns a pandas DataFrame object:

In [56]:

table = impala_ibis.table('pandas_spark_hive', database='analytics')
table_df = table.execute()

In [ ]:

table_df.head()
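
As a taste of the expression API, here is a small hedged sketch that filters and projects without writing SQL (column names follow the lowercased Hive schema seen in the impyla results):

In [ ]:

# Filter rows with high NOx, keep two columns, and execute to pandas.
expr = table[table.nox > 500][['date_parsed', 'nox']].limit(5)
expr.execute()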

Going from pandas to Impala can be done with Ibis by selecting the database using the Impala interface, setting up permissions (depending on your cluster setup) and using the create_table method, passing a pandas DataFrame object as an argument:

In [60]:

analytics_db = impala_ibis.database('analytics')
hdfs_ibis.chmod('/user/cloudera/analytics', '777')
analytics_db.create_table(table_name='diamonds', 
                          obj=pd.read_csv('data/diamonds/diamonds_train.csv'),
                          force=True)
/home/cloudera/miniconda3/envs/jupyter/lib/python3.6/site-packages/multipledispatch/dispatcher.py:278: FutureWarning: A future version of pandas will default to `skipna=True`. To silence this warning, pass `skipna=True|False` explicitly.
  return func(*args, **kwargs)

Reading the newly created table back:

In [61]:

analytics_db.table('diamonds').execute().head(5)

Out[61]:

   carat        cut color clarity  depth  table  price     x     y     z
0   1.21    Premium     J     VS2   62.4   58.0   4268  6.83  6.79  4.25
1   0.32  Very Good     H     VS2   63.0   57.0    505  4.35  4.38  2.75
2   0.71       Fair     G     VS1   65.5   55.0   2686  5.62  5.53  3.65
3   0.41       Good     D     SI1   63.8   56.0    738  4.68  4.72  3.00
4   1.02      Ideal     G     SI1   60.5   59.0   4882  6.55  6.51  3.95

Hope you liked this tutorial. Using these methods you can break down the wall between local computing with Python and the Hadoop distributed computing framework. In case you have any questions about the concepts explained here, please write a comment below.

