Warning: Old Python Version
This sub-lesson on PySpark was written with Python 2.7 in mind. However, that version of Python is now considered obsolete, and new code should use Python 3.6 or later. The pyspark-interactive script has been updated to use Python 3.6, but some transcripts in this extra lesson may still refer to Python 2 or use Python 2 syntax. Please be aware of the key differences between these versions, which can lead to incompatible code.
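For example, one of the most visible changes is that Python 2's print statement became a built-in function in Python 3, so the older form no longer parses (this small snippet is an illustration, not part of the original lesson):
# Python 2 syntax -- a SyntaxError under Python 3:
# print "hello"
# Python 3 syntax (also accepted by Python 2.7 for a single argument):
print("hello")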
Starting Interactive Spark Environment
On a typical Spark installation, one would invoke the pyspark command on the terminal (shell) to use Spark via the Python programming language. This will launch an interactive Python session, where the terminal prompt changes to >>>.
Here is an example:
$ pyspark
Python 2.7.15 (default, Jun 21 2018, 11:58:20)
[GCC 4.2.1 Compatible Clang 5.0.2 ] on linux2
Type "help", "copyright", "credits" or "license" for more information.
2018-11-22 23:01:18 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Python version 2.7.15 (default, Jun 21 2018 11:58:20)
SparkSession available as 'spark'.
>>>
The first “$” prompt above denotes the UNIX shell prompt, where we invoke the pyspark command. The command prints out some messages, including the Python and Spark version information, then waits for user commands at the >>> Python prompt.
Using IPython with Spark
In this exercise we will be using IPython (Interactive Python) as the interface to Spark. IPython includes powerful Tab completion as well as input and output logging; it can also function as a UNIX shell replacement to a great extent. It makes our lives much easier on the command line.
To use IPython as the interface for PySpark, on your bash shell, do the following:
$ export PYSPARK_DRIVER_PYTHON=ipython
$ pyspark
After some messages are printed, the IPython’s prompt will appear, which looks like this:
In [1]:
The number increases every time a new command is invoked.
This is a very useful feature in IPython: every input and output of this session is recorded in the In and Out global variables (each can be indexed by the prompt number). You can recall a previous input or output by providing the appropriate index number. This feature will be demonstrated below.
Interactive pyspark on Turing
On the Turing cluster, we provide a command named “pyspark-interactive” to facilitate running an interactive PySpark session on a compute node. This script does the following:
- Loads the requisite modules such as python, ipython, and spark;
- Sets the environment variable PYSPARK_DRIVER_PYTHON to ipython so that IPython is used as the front-end interface instead of the vanilla Python interpreter;
- Requests computing resources (by default, a single core on a compute node);
- Runs pyspark on the acquired computing resources.
Enabling the pyspark-interactive command
At this moment, pyspark-interactive is available through the DeapSECURE custom module, stored outside the standard repository. Before running pyspark-interactive, please invoke the following sequence of commands in your shell on Turing:
module use /scratch-lustre/DeapSECURE/lmod
module load DeapSECURE
Tip: If you want to always make the DeapSECURE module available for you on Turing, insert the first line above (module use ...) at the end of your ~/.turing_tcshrc configuration file. It is best to load the DeapSECURE module on demand (as with other modules), because DeapSECURE provides additional Python modules specific to this training program. While generally harmless, they could potentially cause undesirable side effects in your other Python programs.
Default Spark behavior on Turing
By default, pyspark-interactive only allocates a single CPU core on a compute node, which is sufficient for most of this training. At a later stage, you are invited to run PySpark using multiple cores and/or compute nodes to speed up the processing of large amounts of data. We will cover this later.
On Turing, Spark is configured to run in standalone mode.
Now get your PySpark session up and running!
From your UNIX shell session on Turing, please type pyspark-interactive to start your interactive PySpark session. Please wait until the IPython prompt In [1]: appears.
Try several simple Python commands; for example:
a = 37
a
a * 2
print(a + 3)
What are the outputs of these commands?
Now try some more commands:
In[1]
Out[1]
Out[2]
Out[3]
Out[4]
Can you make sense of these outputs?
Next, try these commands:
sc
spark
What is printed? We did not define these variables before. Where did they come from?
Try several UNIX commands (cd, pwd, ls, cat, or less). I thought we were using Python. Why did these commands work?
Try to invoke nano. Did it work?
Try to invoke !nano (that is, add an exclamation point before the word nano). Did it work now?
Solution
The outputs are:
(no output)
37
74
(40 is printed on the terminal, but no Out[] prefix)
The outputs are:
u"a = 37"
(no output)
37
74
(no output)
When there is no output printed on the terminal, it means that that particular output (e.g. Out[1]) is actually None. Python prints nothing on the screen for a None value.
The outputs are:
<SparkContext master=local[*] appName=PySparkShell>
<pyspark.sql.session.SparkSession at 0x7f021dbc89d0>
(The memory address printed on the second line will differ in your case.)
A few commonly used UNIX commands are made available in IPython, thanks to the “magic” functions. You can list the names of the magic functions using the lsmagic command and learn more about them using the magic command.
nano is not part of the magic functions in IPython; therefore an attempt to invoke that command in IPython will result in an error (a NameError exception).
Yes, it works now by prefixing the command name with an exclamation point. The ! prefix is a special sign to IPython that the following command is to be executed in the regular shell instead of in IPython itself. The shell will look for the program in the usual manner (i.e. using the PATH environment variable).
Spark Context and Spark Session
There are two high-level objects that are automatically defined when we launch an interactive PySpark session:
- sc: This is a SparkContext object, which provides access to the fundamental RDD functionalities of Spark.
- spark: Starting with Spark version 2.0, PySpark also defines another object simply called spark, which is a SparkSession object to access the high-level APIs of Spark for (semi-)structured data. Use spark to work with DataFrame, Dataset, Spark SQL, etc.
These two variables (sc and spark) are available from the beginning of the PySpark session; we do not need to define them.
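As a quick illustration (a sketch, not part of the original lesson; the sample data is made up), here is how the two entry points can be exercised from the interactive session:
# Using sc (SparkContext): create an RDD and run a simple computation on it
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.map(lambda x: x * x).collect()      # -> [1, 4, 9, 16, 25]
# Using spark (SparkSession): create a small DataFrame and display it
df = spark.createDataFrame([(1, "alpha"), (2, "beta")], ["id", "name"])
df.show()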
Spark Driver and Workers
Spark adopts a client-server programming model, which means that there are several processes that are involved in a Spark environment:
- A Driver process, which directs the entire computation;
- One or more Executors, which actually perform (part of) the computation. The executors may lie on a different compute node than the driver, or may be distributed across many compute nodes. It is these executors that give Spark its parallel computing power.
- A cluster manager, which schedules, manages, and monitors the executor processes.
We as the end-user interact with Spark’s driver process through the PySpark interface. Under the hood, this driver interacts with all the executors. Here is an image showing the relationship between all these components:
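In code, this division of labor can be sketched as follows (a sketch, not from the original lesson; the numbers are arbitrary):
# The driver records the transformation; no computation happens yet.
rdd = sc.parallelize(range(100000), 8)   # data split into 8 partitions
squares = rdd.map(lambda x: x * x)
# An action triggers the work: the cluster manager schedules one task per
# partition on the executor(s), and only the final result returns to the driver.
print(squares.sum())
print(rdd.getNumPartitions())            # -> 8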
Noninteractive Spark Computing
PySpark can also be used for noninteractive (batch) computing.
In this case, one will need to create an instance of SparkContext or SparkSession, depending on the need (whether to work with RDDs or with DataFrames/Datasets).
PySpark features and capabilities are accessible from the Python language through a module aptly named pyspark. (Do not confuse the pyspark computer program above with the pyspark Python module that defines the API for interacting with Spark from Python.)
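As a sketch of such a batch program (the file name and data below are hypothetical, not from the original lesson), a minimal script could look like this:
# batch_example.py -- a hypothetical minimal noninteractive PySpark program
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BatchExample").getOrCreate()
sc = spark.sparkContext                  # the SparkContext is available from the session

df = spark.createDataFrame([(1, "alpha"), (2, "beta")], ["id", "name"])
df.show()

spark.stop()
Such a script is typically run with spark-submit (for example, spark-submit batch_example.py) rather than typed at the interactive prompt.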
Parallel Interactive PySpark on Turing (Advanced)
The pyspark-interactive script has an option to run PySpark in parallel mode. It must be launched with a special command-line flag of the form --NxC, where N is the number of nodes (1, 2, 3, …) and C is the number of cores required per node (1, 2, 3, …, 32).
An example of invocation:
$ pyspark-interactive --4x1
This will launch four Spark worker processes, each with one thread. The script will take a little while to start. For very large data (way beyond 1 GB), it may make sense to run Spark in parallel mode to help speed up computation.