This lesson is in the early stages of development (Alpha version)

DeapSECURE module 2: Dealing with Big Data: Getting Started

Warning: Old Python Version

This sub-lesson on PySpark was originally written with Python 2.7 in mind. That version of Python is now obsolete, and new code should use Python 3.6 or later. The pyspark-interactive script has been updated to use Python 3.6, but some traces in this extra lesson may still refer to Python 2 or use Python 2 syntax. Please be aware of the key changes between the two versions, which can make code written for one incompatible with the other.
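For reference, two of the differences most likely to affect code in this lesson are the print statement (now a function) and integer division; the short snippet below is an illustrative reminder, not part of the original lesson material:

# Python 2:   print "hello"     (print used to be a statement)
# Python 3:
print("hello")                  # print is now a function and requires parentheses

# Python 2:   7 / 2  gave 3     ('/' between integers meant floor division)
# Python 3:
print(7 / 2)                    # 3.5  ('/' is now true division)
print(7 // 2)                   # 3    (use '//' for floor division)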

Starting Interactive Spark Environment

On a typical Spark installation, one would invoke the pyspark command in a terminal (shell) to use Spark from the Python programming language. This launches an interactive Python session, where the terminal prompt changes to >>>. Here is an example:

$ pyspark
Python 2.7.15 (default, Jun 21 2018, 11:58:20)
[GCC 4.2.1 Compatible Clang 5.0.2 ] on linux2
Type "help", "copyright", "credits" or "license" for more information.
2018-11-22 23:01:18 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 2.7.15 (default, Jun 21 2018 11:58:20)
SparkSession available as 'spark'.
>>> 

The first “$” prompt above denotes the UNIX shell prompt, where we invoke the pyspark command. The command prints some messages, including the Python and Spark version information, then waits for user input at the >>> Python prompt.

Using IPython with Spark

In this exercise we will use IPython (Interactive Python) as our interface to Spark. IPython provides powerful tab completion as well as input and output logging, and it can even function as a UNIX shell replacement to a great extent. It makes our lives much easier on the command line.

To use IPython as the interface for PySpark, run the following in your bash shell:

$ export PYSPARK_DRIVER_PYTHON=ipython
$ pyspark

After some messages are printed, the IPython prompt will appear, which looks like this:

In [1]:

The number increases each time a new command is invoked. This is a very useful feature of IPython: every input and output of the session is recorded in the In and Out global variables (In is a Python list containing the inputs as strings; Out holds the outputs, indexed by the same prompt numbers). You can recall a previous input or output by providing the appropriate index number. This feature will be demonstrated below.

Interactive pyspark on Turing

On the Turing cluster, we provide a command named “pyspark-interactive” to facilitate running an interactive PySpark session on a compute node. This script does the following:

  - allocates CPU core(s) on a compute node (a single core by default);
  - sets up the Spark environment on the allocated node;
  - launches an interactive PySpark session there, using IPython as the front-end.

Enabling pyspark-interactive command

At the moment, pyspark-interactive is available through the DeapSECURE custom module, which is stored outside the standard module repository. Before running pyspark-interactive, please invoke the following sequence of commands in your shell on Turing:

module use /scratch-lustre/DeapSECURE/lmod
module load DeapSECURE

Tip: If you want the DeapSECURE module to always be available to you on Turing, insert the first line above (module use ...) at the end of your ~/.turing_tcshrc configuration file. It is still best to load the DeapSECURE module itself on demand (as with other modules), because DeapSECURE provides additional Python modules specific to this training program; while generally harmless, these could cause undesirable side effects in your other Python programs.

Default Spark behavior on Turing

By default, pyspark-interactive allocates only a single CPU core on a compute node, which is sufficient for most of this training. Later on, you will be invited to run PySpark with multiple cores and/or compute nodes to speed up the processing of large amounts of data; this is covered in the “Parallel Interactive PySpark on Turing” section below.

On Turing, Spark is configured to run in standalone mode.

Now get your PySpark session up and running!

From your UNIX shell session on Turing, please type pyspark-interactive to start your interactive PySpark session. Please wait until the IPython prompt In [1]: appears.

  1. Try several simple Python commands; for example:

    a = 37
    a
    a * 2
    print(a + 3)
    

    What are the outputs of these commands?

  2. Now try some more commands:

    In[1]
    Out[1]
    Out[2]
    Out[3]
    Out[4]
    

    Can you make sense of these outputs?

  3. Next, try these commands:

    sc
    spark
    

    What is printed? We did not define these variables beforehand; where did they come from?

  4. Try several UNIX commands (cd, pwd, ls, cat or less). We thought we were using Python; why did these commands work?

  5. Try to invoke nano. Did it work?

  6. Try to invoke !nano (that is, add an exclamation point before the word nano). Did it work now?

Solution

  1. The outputs are:

    (no output)
    37
    74
    (40 is printed on the terminal, but without an Out[] prefix)
    
  2. The outputs are:

    u"a = 37"
    (no output)
    37
    74
    (no output)
    

    When no output is printed on the terminal, it means that the corresponding output value (e.g. Out[1]) is actually None; Python prints nothing on the screen for a None value.

  3. The outputs are:

    <SparkContext master=local[*] appName=PySparkShell>
    <pyspark.sql.session.SparkSession at 0x7f021dbc89d0>
    

    (The memory address printed on the second line will differ in your case.)

  4. A few commonly used UNIX commands are made available in IPython thanks to its “magic” functions. You can list the names of all magic functions with the %lsmagic command and learn more about them with the %magic command.

  5. nano is not one of IPython’s magic functions, so attempting to invoke that command in IPython results in an error (a NameError exception).

  6. Yes, it works once the command name is prefixed with an exclamation point. The ! prefix is a special sign to IPython that the following command is to be executed in the regular shell instead of in IPython itself. The shell then looks for the program in the usual manner (i.e. using the PATH environment variable). A short transcript illustrating these shell-related features follows this solution.
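For reference, here is an illustrative IPython transcript (a sketch, not captured from Turing) showing a magic function, an “automagic” shell-like command, and a shell escape side by side; the prompt numbers and the file name notes.txt are arbitrary:

In [7]: %lsmagic            # lists all available magic functions
In [8]: pwd                 # works because pwd is also a magic function (%pwd)
In [9]: !nano notes.txt     # '!' hands the command over to the regular UNIX shell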

Spark Context and Spark Session

There are two high-level objects that are automatically defined when we launch an interactive PySpark session:

  - sc, a SparkContext object: the entry point to Spark’s low-level API, which works with RDDs (resilient distributed datasets);
  - spark, a SparkSession object: the entry point to Spark’s high-level API, which works with DataFrames and Datasets.

These two variables (sc and spark) are available from the beginning of the PySpark session; we do not need to define them ourselves.
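As a quick illustration, here is a short sketch of commands one could type at the In [ ]: prompt to exercise each object (the sample numbers are arbitrary; expected results are shown as comments):

rdd = sc.parallelize([1, 2, 3, 4])    # low-level RDD API via the SparkContext
rdd.map(lambda x: x * x).collect()    # -> [1, 4, 9, 16]

df = spark.range(4)                   # high-level DataFrame API via the SparkSession
df.count()                            # -> 4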

Spark Driver and Workers

Spark adopts a client-server programming model, which means that several processes are involved in a Spark environment:

  - the driver process, which runs our program (here, the interactive PySpark shell) and coordinates the work;
  - the executor (worker) processes, which perform the actual computations on the data;
  - a cluster manager, which allocates the resources (compute nodes, CPU cores, memory) on which the executors run.

We, as end users, interact with Spark’s driver process through the PySpark interface. Under the hood, this driver interacts with all the executors. Here is an image showing the relationship between all these components:

Overview of a Spark cluster

Noninteractive Spark Computing

PySpark can also be used for noninteractive (batch) computing. In this case, one needs to create an instance of SparkContext or SparkSession explicitly, depending on the need (i.e. whether to work with RDDs or with DataFrames/Datasets).

PySpark’s features and capabilities are accessible from the Python language through a module aptly named pyspark. (Do not confuse the pyspark program described above with the pyspark Python module, which defines the API for interacting with Spark from Python.)
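As an illustration, here is a minimal sketch of such a batch script (the file name batch_example.py and the application name are made up for this example):

# batch_example.py -- a minimal noninteractive PySpark script (illustrative sketch)
from pyspark.sql import SparkSession

# In batch mode the session is not predefined for us, so create it explicitly
spark = SparkSession.builder.appName("BatchExample").getOrCreate()
sc = spark.sparkContext               # the SparkContext is obtained from the session

squares = sc.parallelize(range(10)).map(lambda x: x * x).collect()
print(squares)

spark.stop()                          # shut down the Spark session cleanly

Such a script is typically run with Spark’s spark-submit command rather than with pyspark.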

Parallel Interactive PySpark on Turing (Advanced)

The pyspark-interactive script has an option to run PySpark in parallel mode. It must be launched with a special command-line flag of the form --NxC, where N is the number of nodes (1, 2, 3, …) and C is the number of cores required per node (1, 2, 3, …, 32). An example invocation:

$ pyspark-interactive --4x1

This will launch four Spark worker processes, each with one thread. The script will take a little while to start. For very large data sets (well beyond 1 GB), it may make sense to run Spark in parallel mode to help speed up the computation.
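Once the parallel session is up, one way to check how much parallelism is actually available is to query the SparkContext from the IPython prompt (a sketch; the exact values depend on how the workers were configured):

In [1]: sc.defaultParallelism    # total cores available to the executors (typically 4 for --4x1)
In [2]: sc.master                # URL of the Spark master coordinating the workers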