PySpark

Python Pandas & Apache Spark DataFrames

In this tutorial, we will do a quick and easy walkthrough of the kinds of data operations we can do in both libraries. If you are already familiar with Python Pandas, this comparison should be useful for you.

Note:
* this is not an introductory session for Spark or pandas
* prior understanding of what Spark and pandas are would be great

We assume you already know what preprocessing is and why it is required, how and why to do machine learning, and the other basics needed to follow along.

For PySpark, basic requirements like the Spark context (sc) are loaded by default when you create a notebook. For beginners, you may use the following configuration to start PySpark with IPython.

# Add the following to your shell profile (e.g. ~/.bash_profile)
# PySpark + IPython notebook setup, adapted from:
# https://medium.com/@GalarnykMichael/install-spark-on-mac-pyspark-453f395f240b#.ofy81uj89

export SCALA_HOME=/usr/local/bin/scala
export SPARK_HOME=/Users/sampathm/spark2
export SPARK_PATH=$SPARK_HOME

# tell PySpark to launch inside a Jupyter notebook
export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

# start PySpark with a local master using 2 cores
alias snotebook='$SPARK_PATH/bin/pyspark --master local[2]'

# helper to launch an IPython notebook backed by PySpark
sipy()
{
    clear
    echo 'Starting IPython Notebook (PySpark)'
    snotebook
}

When you want to start PySpark, just type sipy (short for "Spark IPython") at the prompt.

Loading the pandas library

import pandas as pd
import numpy as np

Checking Spark

# the Spark context (sc) is loaded by default when the PySpark IPython session starts
sc
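To quickly confirm the session is wired up, we can also check a couple of attributes on the Spark context; a minimal sketch assuming the default sc provided by PySpark:

# Spark version the context is running on
print(sc.version)

# the master this context is connected to, e.g. local[2]
print(sc.master)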

Check environment & Spark versions

Inputs:

%%sh

# python version
python -V

# pyspark version
pyspark --version

Output:

Python 3.5.2 :: Continuum Analytics, Inc.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_111
Branch
Compiled by user jenkins on 2016-12-16T02:04:48Z
Revision
Url
Type --help for more information.

Load Data

# spark (columns are read as strings unless a schema is inferred or supplied)
spark_dataframe = spark.read.csv("data.csv", header=True)

# pandas (column types are inferred automatically)
pd_dataframe = pd.read_csv("data.csv")
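If you want Spark to infer column types on load (as pandas does), you can pass inferSchema=True; this sketch assumes the same data.csv file as above:

# ask Spark to scan the file and infer numeric/string column types
spark_dataframe = spark.read.csv("data.csv", header=True, inferSchema=True)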

Read a sample

# spark - first() returns the first row as a Row object
print(spark_dataframe.first())

# pandas - head() returns the first 5 rows
print(pd_dataframe.head())
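For a closer equivalent of pandas head(), Spark DataFrames also offer show() and take(); a minimal sketch:

# spark - pretty-print the first 5 rows
spark_dataframe.show(5)

# spark - return the first 5 rows as a list of Row objects
print(spark_dataframe.take(5))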

Check columns & data types

# spark
spark_dataframe.printSchema()

# pandas
pd_dataframe.dtypes
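Spark also has a dtypes attribute that mirrors pandas, returning the column names and types as a list of tuples:

# spark - list of (column_name, type) tuples
print(spark_dataframe.dtypes)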

Selecting particular columns

# spark
spark_dataframe = spark_dataframe.select(['adm', 'ip'])

# pandas
pd_dataframe = pd_dataframe[['ip', 'adm']]
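Selecting a single column works similarly in both; note that adm and ip are just column names from this example dataset, so substitute your own:

# spark - returns a one-column DataFrame
spark_dataframe.select('adm')

# pandas - returns a Series
pd_dataframe['adm']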

Describe the data

# spark
spark_dataframe.describe().show()

# pandas
pd_dataframe.describe()
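To restrict the summary statistics to a single column (again assuming the adm column from this example data):

# spark - summary statistics for one column
spark_dataframe.describe('adm').show()

# pandas - summary statistics for one column
pd_dataframe['adm'].describe()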

Taking a portion of the data

# spark - random sample of ~10% of the rows, without replacement
spark_dataframe = spark_dataframe.sample(withReplacement=False, fraction=0.10)

# pandas - slice off the first 15 rows
pd_dataframe = pd_dataframe[:15]
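The two snippets above are not exact equivalents (a random fraction versus the first 15 rows); closer counterparts look like this, assuming the same dataframes:

# spark - first 15 rows, analogous to the pandas slice
spark_dataframe.limit(15)

# pandas - random ~10% sample, analogous to the spark sample
pd_dataframe.sample(frac=0.10)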

Check just the column names

# pandas
pd_dataframe.columns

# spark
spark_dataframe.columns

Column operation

# pandas - replace each value in the adm column with its string length
pd_dataframe.adm = pd_dataframe.adm.apply(lambda x: len(str(x)))

# spark
from pyspark.sql.functions import udf  # user defined function
adm_function = udf(lambda x: len(str(x)))
# withColumn returns a new DataFrame, so reassign it to keep the result
spark_dataframe = spark_dataframe.withColumn('adm', adm_function(spark_dataframe.adm))
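Python UDFs carry extra overhead in Spark, and the UDF above uses the default string return type. For this particular operation, Spark's built-in length function computes the same string lengths natively (as integers); a sketch assuming the adm column from the example data:

from pyspark.sql.functions import length, col

# same transformation as the UDF above, using Spark's built-in string length
spark_dataframe = spark_dataframe.withColumn('adm', length(col('adm').cast('string')))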

Converting between pandas and Spark DataFrames

# spark -> pandas (collects all data to the driver, so keep the data small)
pd_spark_dataframe = spark_dataframe.toPandas()

# pandas -> spark
spark_pd_dataframe = spark.createDataFrame(pd_dataframe)
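A quick way to verify both conversions, as a minimal sketch:

# both conversions should yield the expected types
print(type(pd_spark_dataframe))   # pandas.core.frame.DataFrame
print(type(spark_pd_dataframe))   # pyspark.sql.dataframe.DataFrame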