Introduction
In this episode we will perform analytics on the result of the IP address analysis done in the first workshop. In the first workshop, Annie Assistant (the graduate student of Professor Ian Investigator) launched computations that mapped the originating IP addresses and corresponding countries of the SPAM emails. The result is a dataset that has four columns, which we shall give names:
- filename – the email's file name
- origin_ip – originating IP address
- CC2 – two-letter country code
- country – full country name
These results are not insightful yet, of course. Annie wants to further analyze these to obtain some insight. She came up with the following questions:
- For a given year Y (say, Y=1999), how many spam emails come from country X? Sort these by the number of emails per country.
- With respect to the major contributing countries, is there a trend observed across the years? For example, if the U.S. turned out to be the top spam contributor in 1999, is it still number one in 2009?
We will have to drill deeper into the data.
The datasets for some years are available in the spam-ip subdirectory of your hands-on directory.
Other years are available in the shared location on Turing.
For year YYYY, the file name will be:
/scratch-lustre/DeapSECURE/module01/spams/untroubled/YYYY/YYYY.ip_alg1
In this episode, we will cover three important operations that are frequently encountered in analytics:
- Filtering
- Selecting & modifying
- Sorting
Afterwards, we will cover grouping and aggregation, which will be used to obtain the results that Annie wants for her research.
Loading Table-Like Data
Annie uses the CSV reader to load the Spam-IP-country dataset.
>>> df_spam1 = spark.read.csv("spam-ip/1999.ip_alg1",
schema="filename STRING, origin_ip STRING, CC2 STRING, country STRING",
sep="|")
The sep argument indicates that the field separator is a vertical bar (|), not a comma.
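As a quick sanity check, we can ask Spark to display the schema it is using. The following is a sketch, assuming the df_spam1 DataFrame loaded above; the output should look roughly like this:
>>> df_spam1.printSchema()
root
 |-- filename: string (nullable = true)
 |-- origin_ip: string (nullable = true)
 |-- CC2: string (nullable = true)
 |-- country: string (nullable = true)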
Explore the data
When faced with new data, always perform an exploration first. For example:
- What is the output of df_spam1.count()?
- What is the output of df_spam1.take(5)?
- What is the output of df_spam1.take(50)?
- Did you see anything that's out of place; that is, did you see any bad data?
Discuss your findings with your neighbors, as well as the following.
- What does "bad data" look like in this dataset, if any?
- Do you see the pros and cons of doing the count() operation above?
Solution
Only some solutions are provided here:
df_spam1.count() = 1309
Output of df_spam1.take(5):
[Row(filename=u'1999/01/915202605.14113.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
 Row(filename=u'1999/01/915202639.14137.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
 Row(filename=u'1999/01/915258711.14416.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
 Row(filename=u'1999/01/915338036.14886.txt', origin_ip=u'204.126.205.203', CC2=u'US', country=u'United States'),
 Row(filename=u'1999/01/915338371.14888.txt', origin_ip=u'12.74.105.130', CC2=u'US', country=u'United States')]
We will discuss bad data later on.
Know your enemy: the count() operation can take a very long time if the data size is enormous. A priori knowledge of the magnitude of the data will be helpful.
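If you only need a quick feel for the data rather than an exact record count, a cheaper sketch (assuming df_spam1 from above) is to pull back just a handful of rows; the column names are already known from the schema, so no scan is needed for those:
>>> df_spam1.take(5)       # fetches only five rows; cheap even on a huge dataset
>>> df_spam1.columns       # column names come from the schema; no data scan needed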
Filtering
Filtering is an essential operation in analytics. For example, if you only want to see the spam records from the US, then you will do
>>> df_spam1_US = df_spam1.filter(df_spam1['CC2'] == 'US')
>>> df_spam1_US.take(5)
# find out what this gives you
>>> df_spam1_US.count()
256
There are 256 emails that were claimed to have originated from the US.
Using SQL-like expression
The DataFrame API has rich support for SQL (Structured Query Language). There is an alternative syntax for filtering using an SQL expression:
>>> df_spam1_US2 = df_spam1.filter("CC2 == 'US'")
>>> df_spam1_US2.take(5)
# find out what this gives you
>>> df_spam1_US2.count()
256
Is the output the same as before? This syntax allows us to save a lot of typing!
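The SQL string can also carry more than one condition at once. The sketch below is only illustrative; the df_spam1_US3 name is ours:
>>> df_spam1_US3 = df_spam1.filter("CC2 == 'US' AND origin_ip IS NOT NULL")
>>> df_spam1_US3.count()    # both conditions are evaluated in a single SQL expression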
Selecting and Modifying Data
Data modification is another important operation in analytics.
This includes selecting which piece of information to process.
This is accomplished by the select()
method of DataFrame.
Consider the snippet of df_spam1
DataFrame contents below:
>>> df_spam1.take(5)
[Row(filename=u'1999/01/915202605.14113.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
Row(filename=u'1999/01/915202639.14137.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
Row(filename=u'1999/01/915258711.14416.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
Row(filename=u'1999/01/915338036.14886.txt', origin_ip=u'204.126.205.203', CC2=u'US', country=u'United States'),
Row(filename=u'1999/01/915338371.14888.txt', origin_ip=u'12.74.105.130', CC2=u'US', country=u'United States')]
We do not have year and month fields in this dataset, but the filename actually contains that information.
These pieces of information can be invaluable when we are evaluating
a spam dataset that spans multiple years.
To create additional columns (fields),
we need to create a new DataFrame that has the year
column:
>>> df_spam1y = df_spam1.select("*", df_spam1['filename'].substr(1,4).alias("year"))
The first column name above, an asterisk ("*"), is special.
It means "all the columns existing in that DataFrame" (i.e. in df_spam1).
A new column is added by taking a substring of the filename field
(i.e., the first four characters of each filename); this column is explicitly
named "year".
The operation above is equivalent to
>>> df_spam1y = df_spam1.select("filename", "origin_ip", "CC2", "country",
df_spam1['filename'].substr(1,4).alias("year"))
select
can also be used to reduce the number of columns to only
what’s necessary.
For example, we may want to drop the filename
and CC2
fields:
>>> df_spam1z = df_spam1.select("origin_ip","country",
df_spam1['filename'].substr(1,4).alias("year"))
DataFrame exercises
- Create a new DataFrame named df_spam1ym that has both year and month as additional fields.
- Assuming you still have the df_sizes DataFrame from the last episode, create a new DataFrame with a size_KB column, where the file sizes are divided by 1024. Hint: Divide the relevant quantity by 1024.0 (a float) instead of by 1024 (an integer).
DataFrame is a read-only entity
A keen reader may have this question: Why didn’t we modify the original
df_spam1
DataFrame instead of creating a new one? The reason is that a Spark DataFrame is a read-only entity. We perform transformations by creating a new DataFrame with the modified (transformed) quantities. This operation is not computationally expensive in Spark. Furthermore, creating a new DataFrame does not double Spark's memory usage, because the new DataFrame is not computed and stored right away.
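A small sketch to illustrate this: after creating df_spam1y as above, the original DataFrame is left untouched, and only the new one carries the extra column.
>>> len(df_spam1.columns)      # the original DataFrame still has only its four columns
4
>>> len(df_spam1y.columns)     # the new DataFrame carries the extra "year" column
5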
Sorting
The next important operation is sorting the data according to a given criterion.
The DataFrame sort()
method does this task.
Building upon the previous example, where you defined the df_spam1y DataFrame, you can now sort the data by country name first, then by year:
df_spam1_sorted = df_spam1y.sort("country", "year")
We will use this later.
Chaining Operations
The operations mentioned above (filtering, selecting, and sorting) can be chained together to compose a complex analytics pipeline. The order of the operations is important: they are performed in the order they are specified, from left to right.
The df_spam1_sorted DataFrame is an example of a chained operation: first, an extra year field is appended; then the data is sorted by country and year.
This is an equivalent expression that yields an identical DataFrame
for df_spam1_sorted
:
>>> df_spam1_sorted = df_spam1.select("*", df_spam1['filename'].substr(1,4).alias("year")) \
.sort("country", "year")
Data Manipulation and Comparison
The three key operations above all take some kind of expression (or expressions) as their argument. In this section we describe common expressions that can be used in filtering, selecting/modifying, and sorting. These are by no means exhaustive; please refer to the Spark documentation as the authoritative source of information. Spark syntax is designed to be intuitive for both readers and writers of the program.
Let us introduce some notation to help the discussion below.
- COLUMN is a Column expression, for example, df_spam['CC2'].
- A Python string is delimited by single quotes in the examples below, like this: 'SOME_STRING'.
- General Python expressions can also be involved; they are denoted by EXPR below. These include numbers, but some operations can also take string expressions. Examples: 27.5, 1000, 'United States'.
- COLNAME refers to a DataFrame column name, such as filename or country. Column names have to be quoted as strings when used in a Python program, unless they reside inside an SQL string.
- FUNC refers to a function that can be applied to a Column. Several such functions are available in the pyspark.sql.functions module, for example abs (absolute value), concat (string concatenation), exp (exponential), log (logarithm), isnull (determining whether a column value is undefined), and sqrt (square root). Refer to the PySpark SQL documentation for a complete list of functions.
Manipulation: Expressions
Both select() and sort() can take one or more COLUMN or COLNAME arguments, or a combination thereof. As we already saw in earlier examples, they can also take expressions (formulas), one or more of the following:
| Description | Commonly Used Syntax | Example |
|---|---|---|
| Arithmetic operations | COLUMN + EXPR | df_person['age'] + 1 |
| | COLUMN - EXPR | df_person['age'] - 1 |
| | COLUMN * EXPR | df_spam['size_KB'] * 1024 |
| | COLUMN / EXPR | df_spam['size'] / 1024.0 |
| Extract a substring | COLUMN.substr(BEGINPOS, LENGTH) | df_spam['filename'].substr(1, 4) |
NOTE: The operations are applied elementwise (element by element) to every value in COLUMN. The result of each operation is another Column of the same length as COLUMN, containing the results of the elementwise operation.
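For instance, an arithmetic expression can be placed directly inside select(). The sketch below uses the hypothetical df_person DataFrame (with an age column) that appears only in the table's examples:
>>> # df_person is hypothetical; it only illustrates arithmetic applied to a Column
>>> df_person_next = df_person.select("*", (df_person['age'] + 1).alias("age_next_year"))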
Comparison and Condition: Logical Expressions
The following table lists common logical expression patterns that are
frequently used inside the filter()
argument.
Each operation is evaluated elementwise against the data in COLUMN
;
the result is stored in a new Column that has logical values
(only True
or False
).
| Description | Commonly Used Syntax | Example |
|---|---|---|
| String match (exact comparison) | COLUMN == 'SOME_STRING' | df_spam['CC2'] == 'US' |
| String mismatch (exact comparison) | COLUMN != 'SOME_STRING' | df_spam['CC2'] != 'US' |
| Exact match at the beginning of the string value | COLUMN.startswith('SOME_STRING') | df_spam['country'].startswith('United') |
| Exact match at the end of the string value | COLUMN.endswith('SOME_STRING') | df_spam['country'].endswith('land') |
| Exact match located anywhere in the string value | COLUMN.contains('SOME_STRING') | df_spam['country'].contains('Republic') |
| SQL "LIKE" match of the string value | COLUMN.like('LIKE_EXPRESSION') | df_spam['country'].like('A%a') |
| Value is defined | COLUMN.isNotNull() | df_spam['origin_ip'].isNotNull() |
| Value is undefined | COLUMN.isNull() | df_spam['origin_ip'].isNull() |
| Value is within a specified range (inclusive endpoints) | COLUMN.between(LOWERBOUND, UPPERBOUND) | df_sizes['value'].between(1024, 2047) |
| Value comparison (numerical or lexical) | COLUMN == EXPR | df_sizes['value'] == 1024 |
| | COLUMN != EXPR | df_sizes['value'] != 1024 |
| | COLUMN > EXPR | df_sizes['value'] > 1024 |
| | COLUMN >= EXPR | df_sizes['value'] >= 1024 |
| | COLUMN < EXPR | df_sizes['value'] < 1024 |
| | COLUMN <= EXPR | df_sizes['value'] <= 1024 |
These expressions can be combined using the bitwise AND (&), OR (|), or XOR (^) operators.
For example:
(df_spam['year'] == '1999') & (df_spam['size'] < 1024)
will yield a logical Column whose values are True on the rows that have a year value of 1999 and a size less than 1024.
Note that the parentheses are important because of the way Python prioritizes operators (see this reference).
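Putting this together, a combined condition might be used inside filter() as in the sketch below, which keeps rows whose claimed origin is either the US or Canada (the DataFrame name is ours):
>>> df_spam1_na = df_spam1.filter((df_spam1['CC2'] == 'US') | (df_spam1['CC2'] == 'CA'))
>>> df_spam1_na.count()    # number of spams claiming a US or Canadian origin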
Practice using filter!
Changing Column Attributes
There are a few operations that can change Column attributes, such as the name and the sort order. Applying these methods does not alter the values stored in the Column.
| Description | Commonly Used Syntax | Example |
|---|---|---|
| Rename a column | COLUMN.alias(NEW_COLNAME) | df_spam['CC2'].alias('country code') |
| Set an ascending sort order | COLUMN.asc() | df_spam['CC2'].asc() |
| Set a descending sort order | COLUMN.desc() | df_spam['CC2'].desc() |
Setting the sort order is useful for the sort() method.
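For example, to list the df_spam1y rows with the most recent year first and countries alphabetically within each year (a sketch using the DataFrame defined earlier):
>>> df_spam1y_desc = df_spam1y.sort(df_spam1y['year'].desc(), df_spam1y['country'].asc())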
Cleaning the Data
Data cleaning is an integral part of working with data analytics. There is a famous expression: “Garbage in, garbage out”. This notion is very applicable in big data analytics. Our goal is to obtain insight from very large amounts of data, but if our data is bad, we cannot expect to obtain valid insight.
Data cleaning is done by filtering and carefully thought-out data manipulation. In practice, this is an iterative process: it may take several rounds before we obtain data of acceptable quality.
PROBLEM: Unknown IP address. In the spam analysis work of Annie Assistant, she found bad data (rows) that look like:
Row(filename=u'01/915202605.14113.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
Row(filename=u'01/915202639.14137.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
Row(filename=u'01/915258711.14416.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
...
Row(filename=u'01/915338373.14888.txt', origin_ip=None, CC2=u'Fail to get source IP', country=None),
...
The origin_ip
column value on these rows is None
—this means that
the origin IP is not defined.
(In SQL, undefined data is called NULL
.)
Removing unknown IP
Which operation is needed to remove rows with unknown IP: select() or filter()? Use that operation to create a new DataFrame, df_spam1_known, that has only known IP addresses (i.e. no rows with origin_ip=None). How many records remain after this cleaning process?
Solution
This is a filter operation. The following filter should remove rows with unknown IP addresses:
df_spam1_known = df_spam1.filter(df_spam1['origin_ip'].isNotNull())
Do verify by peeking at some values from df_spam1_known.
There are 422 records in the df_spam1_known DataFrame.
PROBLEM: Invalid IP address.
There is another form of bad data that you may or may not have caught
in your exploration:
some IP addresses did not map to any country because they fall within reserved IP ranges
that are not meant for public IP addresses.
For example, the IP address 127.0.0.1
is reserved for "localhost", i.e. "this machine".
Such IP addresses would map to a dash ("-") in both the CC2 and country fields.
Removing both unknown and invalid IP
Since we do not have a way to fix these bad data, we should remove them from our analysis. Based on the two kinds of bad data above, please create a cleaned DataFrame called df_spam1_clean.
HINT: Use the & (AND) operator.
Solution
The following filtering should remove both types of bad data:
df_spam1_clean = df_spam1.filter((df_spam1['origin_ip'].isNotNull()) & (df_spam1['CC2'] != '-'))
Alternatively, one can chain two filters instead of using the & operator and achieve the same result:
df_spam1_clean = df_spam1.filter(df_spam1['origin_ip'].isNotNull()) \
                         .filter(df_spam1['CC2'] != '-')
There are 400 records in the
df_spam1_clean
DataFrame.
Aggregating
Now we are in a position to tackle this question:
QUESTION 1: For a given year Y (say, Y=1999), how many spam emails come from country X? Sort these by the number of emails per country.
To answer this question, we have to aggregate the records.
If we are doing this manually, we first group the emails by the country,
then count the emails belonging to the same country and report the sums
along with the corresponding countries.
In Spark, the groupBy()
combined with count()
will accomplish the same.
Let’s start with the cleaned data:
# Group and aggregate (count) by the country
>>> df_email_count1 = df_spam1_clean.groupBy('country').count()
>>> df_email_count1
DataFrame[country: string, count: bigint]
>>> df_email_count1.take(5)
[Row(country=u'Sweden', count=3),
Row(country=u'Germany', count=17),
Row(country=u'France', count=7),
Row(country=u'Argentina', count=3),
Row(country=u'United States', count=256)]
# Sort the result in descending order (by the number of emails)
>>> df_email_count = df_email_count1.sort('count', ascending=False)
>>> countries_top10 = df_email_count.take(10)
>>> countries_top10
[Row(country=u'United States', count=256),
Row(country=u'China', count=37),
Row(country=u'Germany', count=17),
Row(country=u'Canada', count=13),
Row(country=u'Korea, Republic of', count=10),
Row(country=u'Japan', count=9),
Row(country=u'France', count=7),
Row(country=u'Colombia', count=7),
Row(country=u'Australia', count=6),
Row(country=u'United Kingdom', count=5)]
IMPORTANT NOTES:
- The result of the aggregation using the count() method above is a DataFrame. This is in contrast to the DataFrame's count() method, which returns a number. Why? Because groupBy() returns a GroupedData object instead of a DataFrame.
- The results shown by df_email_count1.take(5) may differ on your system because of the somewhat nondeterministic way Spark may process the data. This is especially true if you process the data in parallel with different configurations.
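The distinction mentioned in the first note can be seen directly at the interpreter prompt; the exact printed representation below is a sketch and may differ slightly between Spark versions:
>>> grouped = df_spam1_clean.groupBy('country')
>>> grouped
<pyspark.sql.group.GroupedData object at 0x7f...>
>>> grouped.count()
DataFrame[country: string, count: bigint]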
More statistics
- How many records are found in the aggregated dataset, i.e. in df_email_count?
- There is a show() method that prints the table nicely on the terminal. Try: df_email_count.show(30).
Solutions
There are 29 countries noted in the aggregated dataset.
Tabular output:
+--------------------+-----+
|             country|count|
+--------------------+-----+
|       United States|  256|
|               China|   37|
|             Germany|   17|
|              Canada|   13|
|  Korea, Republic of|   10|
|               Japan|    9|
|            Colombia|    7|
|              France|    7|
|           Australia|    6|
|      United Kingdom|    5|
|         Netherlands|    4|
|              Sweden|    3|
|           Argentina|    3|
|               Italy|    2|
|  Dominican Republic|    2|
|               Spain|    2|
|           Hong Kong|    2|
|         Switzerland|    2|
|                Cuba|    2|
|        South Africa|    2|
|         Puerto Rico|    1|
|               Chile|    1|
|  Russian Federation|    1|
|Taiwan, Province ...|    1|
|              Poland|    1|
|          Costa Rica|    1|
|              Israel|    1|
|              Brazil|    1|
|              Mexico|    1|
+--------------------+-----+
Making sense of the result
(Comment: Strictly speaking, this section is not part of the CI technique introduced in this workshop, but it is nevertheless an important matter to discuss.)
Once we obtain some numbers from our computation, we have to make sense of the results. While computers allow us to perform computations that would otherwise be impossible, we have to do our due diligence to validate the results. Earlier we mentioned that data quality is crucial. We also have to check the correctness of the computational algorithms we use. We should not blindly trust any computational result; otherwise, we can arrive at a totally wrong conclusion. In this section, we encourage participants to think critically about the results obtained so far.
In the previous workshop, Dr. Graham mentioned that Nigeria and some Eastern European countries are blamed for many of the phishing emails circulating today. The emails in our dataset are not strictly phishing emails; most of them are just plain old "spam". It is interesting that the largest contributor of spam emails recorded in this dataset is actually the United States. The next five countries are also in the "developed country" category; Colombia is the first from the "developing country" category.
Open-ended discussion
Can you think of reasons why the US could be the largest contributor of spam, rather than countries that are supposed to be much poorer, where people might be more inclined to steal wealth from richer countries?
What might affect the reliability of this result?
Extracting Data
We have said previously that Spark stores the (input) data in an RDD
or a DataFrame
and performs computation only when triggered by one or more actions.
Here are some actions for a DataFrame (where DF
refers to a DataFrame
object):
- DF.first(): Returns the first row of DF.
- DF.collect(): Dumps the entire contents of DF into a Python list of the same length as the number of elements in DF. Each row in the DF DataFrame turns into a Row object in the list.
- DF.take(n): Returns the first n rows of DF.
- DF.sample(withReplacement=None, fraction=None, seed=None): Performs random sampling with or without replacement. The fraction argument is mandatory and refers to the fraction of rows to be picked randomly from DF.
- DF.toPandas(): Dumps the entire contents of DF into a new Pandas dataframe. Useful for further processing with Pandas.
All actions must be used with care: the size of the output data can be too
big for the computer where PySpark is running!
This is particularly true for the collect
and toPandas
actions above.
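A safer pattern on a large DataFrame is to bring back only a small piece of it. The sketch below is illustrative, and the fraction value is arbitrary:
>>> few_rows = DF.take(10)                                                   # only ten Row objects come back
>>> tiny_pdf = DF.sample(withReplacement=False, fraction=0.001).toPandas()   # roughly 0.1% of the rows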
Under the hood, a DataFrame is still an RDD.
The DF.rdd
attribute gives us access to this underlying RDD.
Sometimes we will need to work with the RDD because some tools in Spark only work with RDDs.
More Datasets
In the spam-ip/
subdirectory there are two more datasets that you
can use for exploration:
- spam-ip/1999.ip_alg2: IP addresses and countries extracted from the 1999 spam dataset, using an improved algorithm.
- spam-ip/2003.ip_alg2: IP addresses and countries extracted from the 2003 spam dataset, using an improved algorithm. The 2003 dataset is significantly larger than the 1999 dataset (more than 57000 emails analyzed).
You are encouraged to experiment with these datasets. For the 1999 dataset, see how the improved algorithm changed the conclusion regarding the top 10 countries that produced spam emails in 1999.
Big Data Challenge
(This is an advanced challenge.)
If you are ready to tackle a big dataset, you are invited to analyze the complete analysis result in the following text file:
/scratch-lustre/DeapSECURE/module01/spams/Results_seq_alg2
Warning: This is a very large file. It contains originating IP addresses from over 8.5 million emails; the total file size is over 500 MB.
You will repeat the same analysis steps as before. The format of the dataset is identical to the smaller files. However, this IP dataset is of much better quality than the first dataset we have been using so far: I improved the algorithm to extract the IP address and was able to extract IP addresses from many more emails.
Timing the PySpark operation
IPython has a time
magic function that can keep track of the time
spent in a given Python statement.
Suppose you have loaded the dataset above to df_bigspam
DataFrame.
To compute the number of records in this dataset and measure the time
taken to count all the records in this dataset:
In [125]: time df_bigspam.count()
CPU times: user 2 ms, sys: 2 ms, total: 4 ms
Wall time: 16.2 s
Out[125]: 8514860
This timing tells us a lot of things!
Of all the time spent on the count() operation, only 4 milliseconds are reported as computation.
Where did the rest go?
This operation requires going through the entire file and counting the number of lines,
so much of the time is actually spent on disk I/O.
But this does not sound 100% sensible either.
The most likely reason is that the CPU times do not include the time used by the child process
(which in this case is the Java Spark worker process).
In any case, the wall time is the most useful piece of information of all.
Big Data exploratory questions
Here are a few questions that one can investigate using the df_bigspam dataset:
- How many data points have a missing (unreadable) IP address?
- How many data points have an invalid (reserved) IP address?
- Add a year field, deduced from the first four characters of the email's filename.
- For all the years, list the top 20 countries that produced the most spam.
- Repeat the analysis above, but also group the results by year.
- Adding workers (i.e. running in parallel) would help reduce the runtime. Try running one or more of these configurations: 2x1, 1x2, 4x1, 1x4; report the timing of the aggregating analysis that produces the top 20 countries.
Solutions (partial)
- 39425
- 889483