This lesson is in the early stages of development (Alpha version)

DeapSECURE module 2: Dealing with Big Data: Analytics of Spam Emails with Spark

Introduction

In this episode we will perform analytics on the result of the IP address analysis done in the first workshop. In the first workshop, Annie Assistant (the graduate student of Professor Ian Investigator) launched computations that mapped the originating IP addresses and corresponding countries of the SPAM emails. The result is a dataset that has four columns, which we shall give names:

These results are not insightful yet, of course. Annie wants to further analyze these to obtain some insight. She came up with the following questions:

  1. For a given year Y (say, Y=1999), how many spam emails come from country X? Sort these by the number of emails per country.

  2. With respect to the major contributing countries, is there a trend observed across the years? Say, if U.S. turned out to be the top spam contributor in 1999, is it still number one in year 2009?

We have to take a deep drill into the data.

The datasets for some years are available in the subdirectory spam-ip in your hands-on directory. Other years are available on the shared location on Turing. For year YYYY, the file name will be:

/scratch-lustre/DeapSECURE/module01/spams/untroubled/YYYY/YYYY.ip_alg1

In this episode, we will cover three important operations that are frequently encountered in analytics:

Afterwards, we will cover grouping and aggregation, which will be used to obtain the results that Annie wants for her research.

Loading Table-Like Data

Annie uses the CSV reader to load the Spam-IP-country dataset.

>>> df_spam1 = spark.read.csv("spam-ip/1999.ip_alg1",
        schema="filename STRING, origin_ip STRING, CC2 STRING, country STRING",
        sep="|")

The sep argument is used to indicate that the field separator is a vertical bar (|), not a comma.

Explore the data

When faced with a new data, always performa an exploration first. For example:

  • What is the output of df_spam1.count()?
  • What is the output of df_spam1.take(5)?
  • What is the output of df_spam1.take(50)?
  • Did you see anything that’s out-of-place; that is, did you see any bad data?

Discuss your findings with your neighbors, as well as the following.

  • How does “bad data” look like in this dataset, if any?
  • Do you see the pro and con of doing the count() operation above?

Solution

Only some solutions are provided here:

  • df_spam1.count() = 1309

  • Output of df_spam1.take(5):

    [Row(filename=u'1999/01/915202605.14113.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
     Row(filename=u'1999/01/915202639.14137.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
     Row(filename=u'1999/01/915258711.14416.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
     Row(filename=u'1999/01/915338036.14886.txt', origin_ip=u'204.126.205.203', CC2=u'US', country=u'United States'),
     Row(filename=u'1999/01/915338371.14888.txt', origin_ip=u'12.74.105.130', CC2=u'US', country=u'United States')]
    
  • We will discuss bad data later on.

  • Know your enemy: The count() operation can take a very long time if the data size is enormous. An a priori knowledge on the magnitude of the data is going to be helpful.

Filtering

Filtering is an essential operation in analytics. For example, if you only want to see the spam records from the US, then you will do

>>> df_spam1_US = df_spam1.filter(df_spam1['CC2'] == 'US')
>>> df_spam1_US.take(5)
# find out what this gives you

>>> df_spam1_US.count()
256

There are 256 emails that were claimed to have originated from the US.

Using SQL-like expression

The DataFrame API has a rich support for SQL (Structured Query Language). There is an alternative syntax for filtering using SQL expression:

>>> df_spam1_US2 = df_spam1.filter("CC2 == 'US'")
>>> df_spam1_US2.take(5)
# find out what this gives you

>>> df_spam1_US2.count()
256

Are the output the same as before? This syntax allows us to save a lot of typing!

Selecting and Modifying Data

Data modification is another important operation in analytics. This includes selecting which piece of information to process. This is accomplished by the select() method of DataFrame. Consider the snippet of df_spam1 DataFrame contents below:

>>> df_spam1.take(5)
[Row(filename=u'1999/01/915202605.14113.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
 Row(filename=u'1999/01/915202639.14137.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
 Row(filename=u'1999/01/915258711.14416.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
 Row(filename=u'1999/01/915338036.14886.txt', origin_ip=u'204.126.205.203', CC2=u'US', country=u'United States'),
 Row(filename=u'1999/01/915338371.14888.txt', origin_ip=u'12.74.105.130', CC2=u'US', country=u'United States')]

We did not have year and month fields in this dataset, but the filename actually has the data. These pieces of information can be invaluable when we are evaluating a spam dataset that spans multiple years.

To create additional columns (fields), we need to create a new DataFrame that has the year column:

>>> df_spam1y = df_spam1.select("*", df_spam1['filename'].substr(1,4).alias("year"))

The first column name above, an asterisk (“*”) is special. It means “all the columns existing in the said DataFrame” (i.e. in df_spam1). A new column is added by taking a substring from the filename field (i.e.,the first four letters in each filename); and this column is explicitly named “year”). The operation above is equivalent to

>>> df_spam1y = df_spam1.select("filename", "origin_ip", "CC2", "country",
                                df_spam1['filename'].substr(1,4).alias("year"))

select can also be used to reduce the number of columns to only what’s necessary. For example, we may want to drop the filename and CC2 fields:

>>> df_spam1z = df_spam1.select("origin_ip","country",
                                df_spam1['filename'].substr(1,4).alias("year"))

DataFrame exercises

  1. Create a new DataFrame named df_spam1ym that has both year and month additional fields.

  2. Assuming you still have the df_sizes DataFrame from the last episode, create a new DataFrame with size_KB column, where the file sizes are divided by 1024. Hint: Divide the relevant quantity by 1024.0 (a float) instead of by 1024 (an integer).

DataFrame is a read-only entity

A keen reader may have this question: Why didn’t we modify the original df_spam1 DataFrame instead of creating a new one? The reason is that a Spark DataFrame is a read-only entity. We perform transformation by creating a new DataFrame with the modified (transformed) quantities. This operation is not computationally expensive in Spark. Furthermore, creating a new DataFrame does not double the memory usage of Spark because it is not computed and stored right away.

Sorting

The next important operation is sorting the data according to a given criteria. The DataFrame sort() method does this task. Building upon the previous example where you have defined the df_spam1y DataFrame–now you can sort the data based on the country name first, then year:

df_spam1_sorted = df_spam1y.sort("country", "year")

We will use this later.

Chaining Operations

The operations mentioned above–filtering, selecting, and sorting–can be chained together to compose a complex analytics pipeline. The order of the operations are important: They are performed according to the order they are specified, in the left-to-right manner.

The df_spam1_sorted is an example of chained operation: First, it appended an extra year field; then, the data is sorted by country and year. This is an equivalent expression that yields an identical DataFrame for df_spam1_sorted:

>>> df_spam1_sorted = df_spam1.select("*", df_spam1['filename'].substr(1,4).alias("year")) \
                              .sort("country", "year")

Data Manipulation and Comparison

The three key operations above all take some kind of expression (or expressions) as their argument. In this section we describe the common expressions that can be used in filtering, selecting/modifying, and sorting. These are by no means exhaustive–please refer to Spark documentation as the authoratitative source of information. Spark syntax is designed to be intuitive (both the readers and writers of the program).

Let us introduce some notations to help the discussion below.

Manipulation: Expressions

Both select() and sort() can take one or more COLUMN or COLNAME, or a combination thereof. Or, as we already saw in earlier examples, they can also take some expressions (formulas)—one or more of the following:

Description Commonly Used Syntax Example
Arithmetic operations COLUMN + EXPR df_person['age'] + 1
  COLUMN - EXPR df_person['age'] - 1
  COLUMN * EXPR df_spam['size_KB'] * 1024
  COLUMN / EXPR df_spam['size'] / 1024.0
Extract a substring COLUMN.substr(BEGINPOS, LENGTH) df_spam['filename'].substr(1, 4)

NOTE: The operations are applied elementwise (element-by-element), to every value in COLUMN. The result of each operation above is another Column, the same length as COLUMN, containing the result of the elementwise operations above.

Comparison and Condition: Logical Expressions

The following table lists common logical expression patterns that are frequently used inside the filter() argument. Each operation is evaluated elementwise against the data in COLUMN; the result is stored in a new Column that has logical values (only True or False).

Description Commonly Used Syntax Example
String match (exact comparison) COLUMN == 'SOME_STRING' df_spam['CC2'] == 'US'
String mismatch (exact comparison) COLUMN != 'SOME_STRING' df_spam['CC2'] != 'US'
Exact match at beginning of the string value COLUMN.startswith('SOME_STRING') df_spam['country'].startswith('United')
Exact match at end of the string value COLUMN.endswith('SOME_STRING') df_spam['country'].endswith('land')
Exact match located anywhere in the string value COLUMN.contains('SOME_STRING') df_spam['country'].contains('Republic')
SQL “LIKE” match of the string value COLUMN.like('LIKE_EXPRESSION') df_spam['country'].like('A%a')
Value is defined COLUMN.isNotNull() df_spam['origin_ip'].isNotNull()
Value is undefined COLUMN.isNull() df_spam['origin_ip'].isNull()
Value is within a specified range (inclusive endpoints) COLUMN.between(LOWERBOUND, UPPERBOUND) df_sizes['value'].between(1024, 2047)
Value comparison (numerical or lexical) COLUMN == EXPR df_sizes['value'] == 1024
  COLUMN != EXPR df_sizes['value'] != 1024
  COLUMN > EXPR df_sizes['value'] > 1024
  COLUMN >= EXPR df_sizes['value'] >= 1024
  COLUMN < EXPR df_sizes['value'] < 1024
  COLUMN <= EXPR df_sizes['value'] <= 1024

These expression can be combined using the bitwise AND (&), OR (|) or XOR (^) operator(s). For example:

(df_spam['year'] == '1999') & (df_spam['size'] < 1024)

will return a DataFrame that has True values on the rows that has the year value of 1999 and whose size is less than 1024. Note that the parentheses are important because of the way Python prioritize operators (see this reference).

Practice using filter!

Changing Column Attributes

There are a few operations which can change Column attributes, such as name and sort order. Applying these methods do not alter the values stored in the Column.

Description Commonly Used Syntax Example
Rename a column COLUMN.alias(NEW_COLNAME) df_spam['CC2'].alias('country code')
Set an ascending sort order COLUMN.asc() df_spam['CC2'].asc()
Set an descending sort order COLUMN.desc() df_spam['CC2'].desc()

Determining sort order are useful for sort() method.

Cleaning the Data

Data cleaning is an integral part of working with data analytics. There is a famous expression: “Garbage in, garbage out”. This notion is very applicable in big data analytics. Our goal is to obtain insight from very large amounts of data, but if our data is bad, we cannot expect to obtain valid insight.

Data cleaning is done by filtering and carefully thought data manipulation. In practice, this is often an iterative process that may take several iterations before we obtain an acceptable quality of data.

PROBLEM: Unknown IP address. In the spam analysis work of Annie Assistant, she found bad data (rows) that look like:

Row(filename=u'01/915202605.14113.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
Row(filename=u'01/915202639.14137.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
Row(filename=u'01/915258711.14416.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
...
Row(filename=u'01/915338373.14888.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
...

The origin_ip column value on these rows is None—this means that the origin IP is not defined. (In SQL, undefined data is called NULL.)

Removing unknown IP

Which operation is needed to remove rows with unknown IP: select() or filter()? Use the operation to create a new data frame df_spam1_known that has only known IP addresses (i.e. no rows with origin_ip=None). How many records remained after this cleaning process?

Solution

This is a filter operation. The following filtering should remove unknown IP address:

df_spam1_known = df_spam1.filter(df_spam1['origin_ip'].isNotNull())

Do verify by peeking into some values from df_spam1_known.

There are 422 records in the df_spam1_known DataFrame.

PROBLEM: Invalid IP address. There is another form of bad data that you may or may not have caught in your exploration: some IP addresses did not map to any country because they are reserved IP range that are not meant for public IP addresses. For example, IP address 127.0.0.1 is reserved for “local host”, i.e. “this machine”. Such IP addresses would map to - (a dash) in both CC2 and country fields.

Removing both unknown and invalid IP

Since we do not have a way to fix these bad data, we should remove them from our analytics. Based on the two kinds of bad data above, please create a cleaned DataFrame called df_spam1_clean.

HINT: Use the & (AND) operator.

Solution

The following filtering should remove both types of bad data:

df_spam1_clean = df_spam1.filter((df_spam1['origin_ip'].isNotNull())
                                 & (df_spam1['CC2'] != '-'))

Alternatively, one can chain two filters instead of using & operator and achieve the same result:

df_spam1_clean = df_spam1.filter(df_spam1['origin_ip'].isNotNull()) \
                         .filter(df_spam1['CC2'] != '-'))

There are 400 records in the df_spam1_clean DataFrame.

Aggregating

Now we are in a position to tackle this question:

QUESTION 1: For a given year Y (say, Y=1999), how many spam emails come from country X? Sort these by the number of emails per country.

To answer this question, we have to aggregate the records. If we are doing this manually, we first group the emails by the country, then count the emails belonging to the same country and report the sums along with the corresponding countries. In Spark, the groupBy() combined with count() will accomplish the same. Let’s start with the cleaned data:

# Group and aggregate (count) by the country
>>> df_email_count1 = df_spam1_clean.groupBy('country').count()

>>> df_email_count1
DataFrame[country: string, count: bigint]

>>> df_email_count1.take(5)
[Row(country=u'Sweden', count=3),
 Row(country=u'Germany', count=17),
 Row(country=u'France', count=7),
 Row(country=u'Argentina', count=3),
 Row(country=u'United States', count=256)]

# Sort the result in descending order (by the number of emails)
>>> df_email_count = df_email_count1.sort('count', ascending=False)

>>> countries_top10 = df_email_count.take(10)

>>> countries_top10
[Row(country=u'United States', count=256),
 Row(country=u'China', count=37),
 Row(country=u'Germany', count=17),
 Row(country=u'Canada', count=13),
 Row(country=u'Korea, Republic of', count=10),
 Row(country=u'Japan', count=9),
 Row(country=u'France', count=7),
 Row(country=u'Colombia', count=7),
 Row(country=u'Australia', count=6),
 Row(country=u'United Kingdom', count=5)]

IMPORTANT NOTES:

  1. The result of aggregation using count() method above is a DataFrame. This is in contrast to the DataFrame’s count() method. Why? Because groupBy() returns a GroupedData instead of a DataFrame object.

  2. The results shown by df_email_count1.take(5) may differ on your system because of the somewhat undeterministic way Spark may process the data. This is true if you process the data in parallel with different configurations.

More statistics

  1. How many records are found in the aggregated dataset, i.e. in df_email_count?

  2. There is a show() method that prints the table nicely on the terminal. Try: df_email_count.show(30).

Solutions

  1. There are 29 countries noted in the aggregated dataset.

  2. Tabular output:

    +--------------------+-----+
    |             country|count|
    +--------------------+-----+
    |       United States|  256|
    |               China|   37|
    |             Germany|   17|
    |              Canada|   13|
    |  Korea, Republic of|   10|
    |               Japan|    9|
    |            Colombia|    7|
    |              France|    7|
    |           Australia|    6|
    |      United Kingdom|    5|
    |         Netherlands|    4|
    |              Sweden|    3|
    |           Argentina|    3|
    |               Italy|    2|
    |  Dominican Republic|    2|
    |               Spain|    2|
    |           Hong Kong|    2|
    |         Switzerland|    2|
    |                Cuba|    2|
    |        South Africa|    2|
    |         Puerto Rico|    1|
    |               Chile|    1|
    |  Russian Federation|    1|
    |Taiwan, Province ...|    1|
    |              Poland|    1|
    |          Costa Rica|    1|
    |              Israel|    1|
    |              Brazil|    1|
    |              Mexico|    1|
    +--------------------+-----+
    

Making sense of the result

(Comment: Strictly speaking, this section is not part of the CI technique introduced in this workshop, but it is nevertheless an important matter to discuss.)

Once we obtain some numbers from our computation, we have to make sense of the results. While the use of computers allows us to perform computations that are otherwise not possible to do, we have to perform our due diligence to validate the results. Earlier we mentioned that data quality is crucial. We also have to check the correctness of the computational algorithms we use. We should not blindly trusting any computational result. Otherwise, we can arrive at a totally wrong conclusion. In this section, we are encouraging participants to critically think about the results we obtained so far.

In the previous workshop, Dr. Graham mentioned that Nigeria and some Eastern European countries are blamed for many of the phishing emails circulating today. Emails in our dataset here are not strictly phishing emails; most of them are just the plain old “spam”. It is interesting that the largest contributor of spam emails recorded in this dataset is actually the United States. The next five countries are also on the “developed country” category. Colombia was the first from the “developing country” category.

Open-ended discussion

  1. Can you think why US could be the largest contributor to the spam, instead of the countries that are supposed to be much poorer, where people are more inclined to steal wealth from rich countries?

  2. What might affect the reliability of this result?

Extracting Data

We have said previously that Spark stores the (input) data in an RDD or a DataFrame and performs computation only when triggered by one or more actions. Here are some actions for a DataFrame (where DF refers to a DataFrame object):

All actions must be used with care: the size of the output data can be too big for the computer where PySpark is running! This is particularly true for the collect and toPandas actions above.

Under the hood, a DataFrame is still an RDD. The DF.rdd attribute gives us access to this underlying RDD. Sometimes we will need to work with the RDD because some tools in Spark only work with RDD.

More Datasets

In the spam-ip/ subdirectory there are two more datasets that you can use for exploration:

You are encouraged to experiment with these datasets. For the 1999 dataset, see how the improved algorithm changed the conclusion regarding the top 10 countries that produce spam emails in 1999.

Big Data Challenge

(This is an advanced challenge.)

If you are ready to tackle a big dataset, you are invited to analyze the complete analysis result in the following text file:

/scratch-lustre/DeapSECURE/module01/spams/Results_seq_alg2

Warning: This is a very large file. It contains originating IP addresses from over 8.5 million emails; the total file size is over 500 MB.

You will repeat the same steps of analysis as before. The format of the dataset is identical to the smaller file. However, this IP dataset is of much better quality than the first dataset we have been using so far. I improved the algorithm to extract the IP address and was able to extract many more emails.

Timing the PySpark operation

IPython has a time magic function that can keep track of the time spent in a given Python statement. Suppose you have loaded the dataset above to df_bigspam DataFrame. To compute the number of records in this dataset and measure the time taken to count all the records in this dataset:

In [125]: time df_bigspam.count()
CPU times: user 2 ms, sys: 2 ms, total: 4 ms
Wall time: 16.2 s
Out[125]: 8514860

This timing tells us a lot of thing! Of all the time spent for count() operation, only 4 milliseconds are used for computation. Where did the rest go? This operation requires going through the entire file and counting the number of lines. So, much time is actually spent in the disk I/O process. But this does not sound 100% sensible either. A most likely reason is that the CPU times did not count the amount of time used by the child process (which in this case is the Java Spark worker process). In any case, the wall time is the most useful information of all.

Big Data exploratory questions

Here are a few questions that one can interrogate from the df_bigspam set:

  1. How many data points have missing (unreadable) IP address?

  2. How many data points have invalid (reserved) IP address?

  3. Add a year field, deduced from the first four characters in the email’s filename.

  4. For all the years, list the top 20 countries that produced the most spam.

  5. Repeat the analysis above, but also group the results by the year.

  6. Adding workers (i.e. running parallel) would help reduce the runtime. Try running one or more of these configurations: 2x1, 1x2, 4x1, 1x4; report the timing of the aggregating analysis that produce the top 20 countries..

Solutions (partial)

  1. 39425
  2. 889483