In this episode we will create simple DataFrames in Spark, then do some basic operations on them. We will continue using the spam dataset used in the previous workshop for this first exercise.
STORY: Annie Assistant was asked by Professor Ian Investigator to
gather some statistics of the spam emails from year to year.
As you have seen in the previous workshop, there are over 8,500,000
emails to analyze, so this is a daunting task!
To get her feet wet, she wants to
find the minimum, maximum, and average file sizes of the emails.
Annie used a simple shell script to get the file sizes
(see script statistics/Collect-email-sizes.sh
if you’re interested).
She saved these numbers (one number per line) to text files named
email-sizes-YYYY.txt, where YYYY is a four-digit year (from 1999 through 2018).
The training subdirectory statistics
contains several of these
text files; the rest can be found in the shared DeapSECURE dataset
location on Turing:
/scratch-lustre/DeapSECURE/module01/spams/untroubled/YYYY/email-sizes-YYYY.txt
Exercise
Please use less or cat to examine the content of the file email-sizes-1999.txt.
Remember that you can call these commands directly from IPython.
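For example, from the IPython prompt you can prefix a shell command with ! to run it directly (a minimal sketch; the file's contents will scroll by):
In [1]: !cat statistics/email-sizes-1999.txt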
At this point, Annie had heard many good things about Spark, especially for handling very large amounts of data. So she decided to try Spark for the rest of the analysis.
Creating a DataFrame from a Text File
In the last episode we learned that PySpark version 2.0 and later already defines
a SparkSession object simply called spark.
We will access the capabilities of PySpark through this object.
First, Annie has to load the data into Spark.
The function spark.read.text reads a text file and
returns the text lines as records of a DataFrame.
Here is her first Spark session (we use >>> for the prompt,
but remember that IPython has a different prompt,
so please don't be confused).
>>> df_sizes = spark.read.text("statistics/email-sizes-1999.txt")
# get the number of records (i.e. text lines)
>>> df_sizes.count()
1309
# obtain the first five records (and show them)
>>> df_sizes.take(5)
[Row(value='2304'),
Row(value='2175'),
Row(value='3318'),
Row(value='4083'),
Row(value='1677')]
In the examples above, after loading the data, Annie counted how many
lines are in the text file (using the count method) and
printed the first few lines of the file (using the take method).
Remember: there is one integer per text line, so there are 1309 numbers
in the whole file;
that corresponds to the number of spam emails captured in the year 1999.
Both the count() and take() methods are actions in Spark; therefore
they yielded their output immediately after the commands were executed.
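Another handy action (not used in Annie's session above) is show(), which prints the first rows in tabular form; a minimal sketch, whose output should look roughly like this:
>>> df_sizes.show(5)
+-----+
|value|
+-----+
| 2304|
| 2175|
| 3318|
| 4083|
| 1677|
+-----+
only showing top 5 rows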
Specifying path for Spark input
Spark is a little bit quirky when handling input file paths. As a rule, PySpark does not follow the "current working directory" that you can change in the Python front-end interface using Python's os.chdir command.

File path in standalone mode (no HDFS)
In "standalone mode" (which is how we use Spark on Turing and many other HPC clusters), the directory where you started the PySpark process (let's call this D) is the directory assumed by Spark when you do not specify an absolute path. Therefore, all input file names must be relative to this D directory, or you have to specify an absolute path to load the correct file.
Example: Suppose you started pyspark from the directory /home/user/training; then all input files will be relative to this directory for the duration of that pyspark program, unless an absolute path is used:
- The command spark.read.text("records.txt") will read the file /home/user/training/records.txt.
- The command spark.read.text("/shared/data/bigdump.txt") will read the file /shared/data/bigdump.txt.

File path in HDFS
If Spark input files are served from HDFS, then it is recommended to always use absolute paths to avoid confusion. (This is not applicable to the Turing cluster.)
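For illustration only (the name-node address below is hypothetical and not part of the Turing setup), a file in HDFS would be referenced by its full URI:
>>> df = spark.read.text("hdfs://namenode.example.org:9000/data/spams/email-sizes-1999.txt")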
DataFrame as a table with rows and columns
Before doing further analysis, let's take a closer look at
our data representation given by PySpark, i.e. df_sizes.
>>> df_sizes
DataFrame[value: string]
This statement prints what df_sizes is:
it is a DataFrame with one column, named value,
and value has a string data type.
The take(N) method yields N rows; and each row, logically,
has only one column, named value.
A particular column in a DataFrame can be accessed conveniently using standard Python indexing syntax, as shown in the following example:
>>> df_sizes['value']
Column<value>
This is a very frequent operation when working with DataFrames. Its utility will become apparent when we have a DataFrame with many columns.
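A Column object by itself holds no data; it is meant to be used inside other DataFrame operations. As a minimal sketch (the count simply matches the number of records we saw earlier):
>>> df_sizes.select(df_sizes['value']).count()
1309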
Illustrations of DataFrame
Here is a pictorial representation of the df_sizes DataFrame above in a tabular fashion:
+-------+
| value |
+-------+
|  2304 |
|  2175 |
|  3318 |
|  4083 |
|  1677 |
|   ... |
+-------+
A more complete dataset will contain additional information beyond just the file size; for example, it can contain the file size, the file name, and the originating IP address. In this example, we have appropriately named the columns (as we will do later):
+-------+-----------------------------+-----------------+
| size  | filename                    | origin_ip       |
+-------+-----------------------------+-----------------+
| 2304  | 1999/01/915202605.14113.txt | 198.81.17.10    |
| 2175  | 1999/01/915202639.14137.txt | 193.68.153.2    |
| 3318  | 1999/01/915258711.14416.txt | 139.175.250.157 |
| 4083  | 1999/01/915338036.14886.txt | 204.126.205.203 |
| 1677  | 1999/01/915338371.14888.txt | 12.74.105.130   |
| ...   | ...                         | ...             |
+-------+-----------------------------+-----------------+
The importance of correct data type
Consider the output of the take method above, which was:
[Row(value='2304'), Row(value='2175'), ... ]
The quotation marks indicate that the values have the string data type.
There is a problem here: the text file contains integers, but we read them
as strings of characters (frequently just called "strings").
What is Unicode?
Python 3 provides universal support for the world's many writing systems by using Unicode as the underlying string representation. A Unicode string is a modern, universal character representation on computer systems that accommodates not only Latin letters (a-z) but also characters from other scripts (such as Greek, Chinese, Japanese, Russian, Hindi, etc.).
With the Spark DataFrame API, it is actually quite convenient to perform many common operations. Let's attempt to compute the minimum, maximum, and average sizes of the emails. These are aggregate quantities based on the data.
# Try to compute the average
>>> df_sizes.agg({'value': 'avg'})
DataFrame[avg(value): double]
Why did it not return a value?
In contrast to the take or count operations, agg (short for aggregate)
is a transformation, therefore it does not execute right away.
Instead, it returns a new DataFrame.
To compute the result, we have to call the collect method:
>>> df_sizes.agg({'value': 'avg'}).collect()
[Row(avg(value)=4550.0)]
Indeed, the average size of the emails in year 1999 is precisely 4550 bytes. Now compute the minimum and the maximum sizes:
>>> df_sizes.agg({'value': 'min'}).collect()
[Row(min(value)=u'10036')]
>>> df_sizes.agg({'value': 'max'}).collect()
[Row(max(value)=u'9965')]
Wait a minute!
The minimum size was 10036 bytes, and the maximum was less than that.
The output of take above reveals that there was an email that was
only 1677 bytes long.
Where's the mistake?
Think about where the error lies. (Hint: look at the result's data type.)
Solution
The min and max operations return strings, not numbers. Lexicographically, the string 10036 was the "smallest" in the order of characters, and 9965 was the "largest". So one has to correct the data type before computing the numerical minimum and maximum.
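A quick way to convince yourself of this ordering in plain Python (a minimal illustration, independent of Spark):
>>> '10036' < '9965'    # compared character by character: '1' < '9'
True
>>> min(['2304', '1677', '10036', '9965'])
'10036'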
Big Data Warning
When you use the collect() method or other actions, it is wise to know ahead of time the amount of data you are expecting. Spark will happily dump the ten billion records it can find from the requested operation, but your computer (where PySpark runs) may not be able to handle such a flood of data. If you do not know the size in advance, it is recommended that either:
- you use the count() method to find out the amount of data, or
- you use the take() method to fetch only a limited number of output items.
So how do we correct this mess? We have to convert the column values into the appropriate data type. Here’s how we do it in the case above:
>>> df_sizes2 = df_sizes.withColumn('value', df_sizes['value'].cast("integer"))
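You can confirm that the cast took effect by inspecting the new DataFrame; assuming the command above ran as shown, the column should now be reported with an integer type:
>>> df_sizes2
DataFrame[value: int]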
Calculating the correct numerical statistics
Using the new df_sizes2, compute the minimum, maximum, and average file sizes from the 1999 spam emails.
Solution
Minimum = 709
Maximum = 61002
Average = 4550.0
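For reference, one way to obtain these numbers follows the same agg pattern used earlier (a sketch; the printed Row values should match the figures above):
>>> df_sizes2.agg({'value': 'min'}).collect()
[Row(min(value)=709)]
>>> df_sizes2.agg({'value': 'max'}).collect()
[Row(max(value)=61002)]
>>> df_sizes2.agg({'value': 'avg'}).collect()
[Row(avg(value)=4550.0)]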
Creating a DataFrame from a CSV File
A CSV (Comma-Separated Values) file is actually just a plain text file with one or more data items per line, separated by a predefined delimiter. As the name suggests, the delimiter is a comma by default. Other characters that are frequently used include whitespace and tab characters.
One advantage of the CSV reader is that you can specify the data types right away. Here is an example that re-reads the same data as before, using the CSV reader:
>>> df_sizes3 = spark.read.csv("statistics/email-sizes-1999.txt", schema="size INT")
>>> df_sizes3
DataFrame[size: int]
EXERCISE: Verify that the minimum, maximum, and average sizes are still
the same as the correct numerical answers you obtained earlier with df_sizes2.
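One possible way to check (a sketch; note that the column is now named size rather than value):
>>> df_sizes3.agg({'size': 'min'}).collect()
>>> df_sizes3.agg({'size': 'max'}).collect()
>>> df_sizes3.agg({'size': 'avg'}).collect()
These should yield 709, 61002, and 4550.0, respectively.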
An important argument for the CSV reader is the schema above:
it defines both the column name (size) and the data type (INT for integer).
As with the rest of Spark and Python, column names are case-sensitive,
but the data type is not case-sensitive.
Here are a few frequently used data types:
- INT – Integers (4-byte signed integers; range: -2,147,483,648 to 2,147,483,647)
- FLOAT – IEEE single-precision numbers (precision: about 7 decimal digits; range: approximately 10^-38 to 10^+38 in magnitude)
- DOUBLE – IEEE double-precision numbers (precision: about 15 decimal digits; range: approximately 10^-308 to 10^+308 in magnitude)
- STRING – Strings of characters (arbitrary length)
- BOOLEAN – True/False values
There are some other possible types which we will not discuss here.
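For instance, the richer table illustrated earlier could be described with a multi-column schema string (the file name below is hypothetical, shown only to illustrate the syntax):
>>> df_emails = spark.read.csv("email-metadata-1999.csv", schema="size INT, filename STRING, origin_ip STRING")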
Text reader or CSV reader: When to use what?
The CSV format assumes a certain structure to the data: it generally assumes the data items are stored in rows and columns, much like how cells are arranged in a spreadsheet program. There is actually liberty to put an arbitrary type of data in each cell, but for the most part, data is arranged regularly with a specific data type for each column.
A general text format, on the other hand, does not assume any structure whatsoever other than that the data is organized into text lines. Consider the following example (the first few verses from the Book of Ecclesiastes by King Solomon, King James Version Bible):
1 The words of the Preacher, the son of David, king in Jerusalem.
2 Vanity of vanities, saith the Preacher, vanity of vanities; all is vanity.
3 What profit hath a man of all his labour which he taketh under the sun?
4 One generation passeth away, and another generation cometh: but the earth abideth for ever.
5 The sun also ariseth, and the sun goeth down, and hasteth to his place where he arose.
6 The wind goeth toward the south, and turneth about unto the north; it whirleth about continually, and the wind returneth again according to his circuits.
The number of characters or words per line may differ significantly; the number of lines per verse is not fixed. You will be hard pressed to force this free-form text into a tabular format.
As a general rule of thumb, whenever the data has a table-like structure, use the CSV reader for convenience. Only use the general text reader when the text data is totally unstructured.
DataFrame from Other File Formats
There are many input formats that can be read into a Spark DataFrame. Detailed explanation and examples are outside the scope of this workshop, but here are some pointers:
- JSON (JavaScript Object Notation): a structured data interchange format which is still human-readable (it is based on plain text). To read data stored in this format, use the spark.read.json() function.
- Parquet: a columnar storage format developed for the Hadoop ecosystem. It is suitable for tables with extremely large numbers of rows and columns, where generally only a few columns are involved in a single operation. This format is usually used in conjunction with HDFS (the Hadoop file system), but newer developments seem to support the use of this format without HDFS. To read data stored in this format, use the spark.read.parquet() function.
- JDBC (Java Database Connectivity): a protocol to connect data from other database systems to a Java-based program (Spark, in this case). This is handy in case you already have a lot of data stored in other databases; all Spark needs to do is "pull" the data from them and use it for the analytics. To read data using the JDBC protocol, use the spark.read.jdbc() function.
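As a rough illustration (the file names below are hypothetical), reading these formats looks very similar to the text and CSV readers used earlier:
# Hypothetical input files, shown only to illustrate the call pattern
>>> df_json = spark.read.json("email-metadata.json")
>>> df_parq = spark.read.parquet("email-metadata.parquet")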
Detailed information about data sources and how to read them can be found in the Spark SQL programming guide.