This lesson is in the early stages of development (Alpha version)

DeapSECURE module 2: Dealing with Big Data: First Encounter with Spark

In this episode we will create simple DataFrames in Spark, then do some basic operations on them. We will continue using the spam dataset used in the previous workshop for this first exercise.

STORY: Annie Assistant was asked by Professor Ian Investigator to gather some statistics on the spam emails from year to year. As you have seen in the previous workshop, there are over 8,500,000 emails to analyze, so this is a daunting task! To get her feet wet, she wants to find the minimum, maximum, and average file sizes of the emails. Annie used a simple shell script to get the file sizes (see the script statistics/Collect-email-sizes.sh if you’re interested). She saved these numbers, one per line, to text files named email-sizes-YYYY.txt, where YYYY is a four-digit year (from 1999 through 2018). The training subdirectory statistics contains several of these text files; the rest can be found in the shared DeapSECURE dataset location on Turing:

/scratch-lustre/DeapSECURE/module01/spams/untroubled/YYYY/email-sizes-YYYY.txt
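
The naming pattern above can be expanded programmatically. The following is a minimal sketch in plain Python; the variable names are illustrative, and the code only builds the path strings without checking that the files exist.

```python
# Base directory of the shared dataset, as given in the lesson text
BASE_DIR = "/scratch-lustre/DeapSECURE/module01/spams/untroubled"

# One path per year, 1999 through 2018 inclusive
years = range(1999, 2019)
size_files = [f"{BASE_DIR}/{year}/email-sizes-{year}.txt" for year in years]

print(len(size_files))   # number of yearly files
print(size_files[0])     # path of the 1999 file
```

A list like this could later be handed to a Spark reader one path at a time, for example in a loop over the years.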

Exercise

Please use less or cat to examine the contents of the file email-sizes-1999.txt. Remember that you can call these commands directly from IPython.

At this point, Annie had heard many good things about Spark, especially for handling very large amounts of data. So she decided to try Spark for the rest of the analysis.

Creating a DataFrame from a Text File

In the last episode we learned that PySpark version 2.0 and later already defines a SparkSession object simply called spark. We will access the capabilities of PySpark through this object.

First, Annie has to load the data into Spark. The function spark.read.text reads a text file and returns the text lines as records in a DataFrame. Here is her first Spark session (we use >>> for the prompt, but remember that IPython has a different prompt, so please don’t be confused).

>>> df_sizes = spark.read.text("statistics/email-sizes-1999.txt")

# get the number of records (i.e. text lines)
>>> df_sizes.count()
1309

# obtain the first five records (and show them)
>>> df_sizes.take(5)
[Row(value='2304'),
 Row(value='2175'),
 Row(value='3318'),
 Row(value='4083'),
 Row(value='1677')]

In the example above, after loading the data, Annie counted how many lines are in the text file (using the count method) and printed the first few lines of the file (using the take method). Remember that there is one integer per text line, so there are 1309 numbers in the whole file; this corresponds to the number of spam emails captured in 1999. Both count() and take() are actions in Spark, so they produced their output immediately after the commands were executed.

Specifying path for Spark input

Spark is a little bit quirky when handling input file paths. As a rule, PySpark does not follow the “current working directory” that you can change in the Python front-end interface using Python’s os.chdir command.

File path in standalone mode (no HDFS)

In the “standalone mode” (which is how we use Spark on Turing and many other HPC clusters), the directory where you started the PySpark process (let’s call this D) is the directory assumed by Spark when you do not specify an absolute path. Therefore, all input file names must be relative to this D directory, or you have to specify an absolute path to load the correct file.

Example: Suppose you started pyspark from the directory /home/user/training. Then all input file names are taken relative to this directory for the duration of that pyspark program, unless an absolute path is used:

  • The command spark.read.text("records.txt") will read the file /home/user/training/records.txt.
  • The command spark.read.text("/shared/data/bigdump.txt") will read the file /shared/data/bigdump.txt.
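
The resolution rule described here can be mimicked in a few lines of plain Python. This is only an illustration of the rule, not Spark's own code; the helper name resolve_input_path is hypothetical.

```python
import posixpath

def resolve_input_path(path, start_dir):
    """Mimic the rule: relative paths are joined to start_dir, absolute paths are kept."""
    # posixpath.join discards start_dir when `path` is already absolute
    return posixpath.normpath(posixpath.join(start_dir, path))

start_dir = "/home/user/training"   # directory where pyspark was started (D)
print(resolve_input_path("records.txt", start_dir))
print(resolve_input_path("/shared/data/bigdump.txt", start_dir))
```

When in doubt, building an absolute path before calling the reader removes any dependence on the directory where pyspark was started.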

File path in HDFS

If Spark input files are served from HDFS, it is recommended to always use absolute paths to avoid confusion. (This does not apply to the Turing cluster.)

DataFrame as a table with rows and columns

Before doing further analysis, let’s take a closer look at our data representation given by PySpark, i.e. df_sizes.

>>> df_sizes
DataFrame[value: string]

This statement prints what df_sizes is: It is a DataFrame with one column, named value; and value has a string data type. The take(N) method yields N rows; and each row, logically, has only one column named value.

A particular column in a DataFrame can be accessed conveniently using standard Python indexing syntax, as shown in the following example:

>>> df_sizes['value']
Column<value>

This is a very frequent operation when working with DataFrames. Its utility will become apparent when we have a DataFrame with many columns.

Illustrations of DataFrame

Here is a pictorial representation of the df_sizes DataFrame above in a tabular fashion:

+-------+
| value |
+-------+
|  2304 |
|  2175 |
|  3318 |
|  4083 |
|  1677 |
|   ... |
+-------+

A more complete dataset will contain more information than just the file size; for example, it can contain the file size, the file name, and the originating IP address. In this example, we have named the columns appropriately (as we will do later).

+-------+-----------------------------+-----------------+
|  size |                    filename |       origin_ip |
+-------+-----------------------------+-----------------+
|  2304 | 1999/01/915202605.14113.txt |    198.81.17.10 |
|  2175 | 1999/01/915202639.14137.txt |    193.68.153.2 |
|  3318 | 1999/01/915258711.14416.txt | 139.175.250.157 |
|  4083 | 1999/01/915338036.14886.txt | 204.126.205.203 |
|  1677 | 1999/01/915338371.14888.txt |   12.74.105.130 |
|   ... |                         ... |             ... |
+-------+-----------------------------+-----------------+

The importance of correct data type

Consider the output of the take method above: [Row(value='2304'), Row(value='2175'), ... ]. The quotation marks indicate that value has the string data type. There is a problem here: the text file contains integers, but we read them as strings of characters (frequently just called “strings”).

What is Unicode?

Starting with version 3, Python supports the world’s many writing systems by using Unicode as the underlying representation of strings. Unicode is a modern universal character representation on computer systems that accommodates not only Latin letters (a-z) but also the characters of other languages (such as Greek, Chinese, Japanese, Russian, Hindi, etc.).

The Spark DataFrame API makes many common operations quite convenient. Let’s attempt to compute the minimum, maximum, and average sizes of the emails. These are aggregate quantities based on the data.

# Try to compute the average
>>> df_sizes.agg({'value': 'avg'})
DataFrame[avg(value): double]

Why did it not return a value? In contrast to the take or count operations, agg (short for aggregate) is a transformation, so it does not execute right away. Instead, it returns a new DataFrame. To compute the result, we have to call an action such as the collect method:

>>> df_sizes.agg({'value': 'avg'}).collect()
[Row(avg(value)=4550.0)]

Indeed, the average size of the emails in year 1999 is precisely 4550 bytes. Now compute the minimum and the maximum sizes:

>>> df_sizes.agg({'value': 'min'}).collect()
[Row(min(value)='10036')]

>>> df_sizes.agg({'value': 'max'}).collect()
[Row(max(value)='9965')]

Wait a minute! The minimum size was 10036 bytes, and the maximum was less than that. Yet the output of take above shows an email that was only 1677 bytes long.

Where’s the mistake?

Where do you think the error lies? (Hint: Look at the result’s data type.)

Solution

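The min and max operations returned strings, not numbers, because the value column holds strings. In lexicographic (character-by-character) order, the string 10036 is the “smallest” and 9965 is the “largest”. (The average was not affected: as the output DataFrame[avg(value): double] shows, avg produced a numeric result.) So one has to correct the data type before computing the numerical minimum and maximum.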
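
The same effect can be reproduced in plain Python, whose built-in min and max order these digit-only strings character by character as well. The sample values are taken from the outputs shown earlier in this episode.

```python
# Sample sizes from the outputs above, first as text and then as integers
as_text = ["2304", "2175", "3318", "4083", "1677", "10036", "9965"]
as_numbers = [int(s) for s in as_text]

# Strings are compared one character at a time, so "1..." sorts before "9..."
print(min(as_text), max(as_text))        # 10036 9965
print(min(as_numbers), max(as_numbers))  # 1677 10036
```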

Big Data Warning

When you use the collect() method or other actions, it is wise to know ahead of time how much data to expect. Spark will happily dump ten billion records if the requested operation yields that many, but your computer (where PySpark runs) may not be able to handle such a flood of data. If you do not know, it is recommended that you either:

  1. use the count() method to find out the amount of data, or
  2. use the take() method to fetch only a limited number of output items.
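
The two recommendations can be combined into a small helper. This is a minimal sketch: the name fetch_safely and the default limit of 1000 rows are assumptions made for illustration, and the function relies only on the count, collect, and take methods of the DataFrame passed in.

```python
def fetch_safely(df, max_rows=1000):
    """Collect everything only if the result is small; otherwise fetch a preview."""
    n = df.count()                 # action: returns the number of rows only
    if n <= max_rows:
        return df.collect()        # small enough to bring back in full
    return df.take(max_rows)       # otherwise fetch just the first max_rows rows
```

For example, fetch_safely(df_sizes, max_rows=100) would return at most 100 rows. Note that count() itself runs a job over the data, so for very large inputs calling take() directly may be the cheaper choice.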

So how do we correct this mess? We have to convert the column values into the appropriate data type. Here’s how we do it in the case above:

>>> df_sizes2 = df_sizes.withColumn('value', df_sizes['value'].cast("integer"))

Calculating the correct numerical statistics

Using the new df_sizes2, compute the minimum, maximum, and average file sizes from the 1999 spam emails.

Solution

Minimum = 709

Maximum = 61002

Average = 4550.0

Creating a DataFrame from a CSV File

A CSV (Comma-Separated Values) file is actually just a plain text file with one or more data items per line, separated by a predefined delimiter. As the name suggests, the delimiter is a comma by default. Other characters that are frequently used include whitespace and tab characters.

One advantage of the CSV reader is that you can specify the data types right away. Here is an example that re-reads the same data as before, using the CSV reader:

>>> df_sizes3 = spark.read.csv("statistics/email-sizes-1999.txt", schema="size INT")

>>> df_sizes3
DataFrame[size: int]

EXERCISE: You can verify that the minimum, maximum, and average sizes are still the same as the correct numerical answers you obtained earlier with df_sizes2.

An important argument for the CSV reader is the schema above: it defines both the column name (size) and the data type (INT for integer). It is safest to treat column names as case-sensitive, as names are in Python; the data type keyword is not case-sensitive. A few frequently used data types are INT (integer), DOUBLE (floating-point number), and STRING (text).

There are some other possible types which we will not discuss here.

Text reader or CSV reader: When to use what?

The CSV format assumes a certain structure in the data: it generally assumes that the data items are stored in rows and columns, much like how cells are arranged in a spreadsheet program. There is actually liberty to put an arbitrary type of data in each cell; but for the most part, data is arranged regularly with a specific data type for each column.

A general text format, on the other hand, does not assume any structure whatsoever other than that the data is organized into text lines. Consider the following example (the first few verses from the Book of Ecclesiastes by King Solomon, King James Version Bible):

1 The words of the Preacher, the son of David, king in Jerusalem.
2 Vanity of vanities, saith the Preacher, vanity of vanities;
    all is vanity.
3 What profit hath a man of all his labour
    which he taketh under the sun?
4 One generation passeth away, and another generation cometh:
    but the earth abideth for ever.
5 The sun also ariseth, and the sun goeth down,
    and hasteth to his place where he arose.
6 The wind goeth toward the south, and turneth about unto the north;
    it whirleth about continually,
    and the wind returneth again according to his circuits.

The number of characters or words per line may differ significantly; the number of lines per verse is not fixed. You will be hard pressed to force this free-form text into a tabular format.

As a general rule of thumb, whenever the data has a table-like structure, use the CSV reader for convenience. Only use the general text reader when the text data is totally unstructured.

DataFrame from Other File Formats

There are many input formats that can be read into a Spark DataFrame. Detailed explanation and examples are outside the scope of this workshop, but here are some pointers:

Detailed information about data sources and how to read them can be found in the Spark SQL programming guide.