Getting Started with Spark

My interest in hadoop had been sparked a year ago by the following headline: “up to 100x faster than Hadoop MapReduce”. Finally, it is time to explore Apache Spark.

Map reduce expects you to write two programs, a mapper and a reducer. The map-reduce system will try to run the mapper programs on nodes close to the data. The output of various mapper programs will be key, value pairs. The map-reduce system will forward the output to various reducer programs based on the key.

In the Spark environment, you write only one set of code containing both the mapper and reducer code. The framework will distribute and execute the code in a manner that will try to optimise the performance and minimise the movement of data over the network. Furthermore, the program could include additional mapppers and reducers to process the intermediate results.

Installing Spark

You will need to download the code from Apache Spark site (http://spark.apache.org/) based on the version of hadoop on your system. Installation is simply to unzip the downloaded file. The documentation will guide you to configuring it for a cluster, e.g. consisting of three nodes – h-mstr, h-slv1, h-slv2.

Getting a taste of Spark with wordcount

Suppose you have already created and copied text files in hdfs in a directory, /user/fedora/docs/.

Start the python spark shell:

$ bin/pyspark

Open the text files in hdfs and run the usual wordcount example.

>>> data = sc.textFile('hdfs://h_mstr/user/fedora/docs')

>>> result = data.flatMap(lambda line:re.sub('[^a-z0-9]+',' ',line.lower()).split())

.map(lambda x:(x,1))

.reduceByKey(lambda a,b:a+b)

>>> output=result.collect()

>>> output.sort()

>>> for w,c in output:

>>>     print('%s %d'%(w,c))

The shell creates a spark context, sc. Use it to open the files contained in the hdfs docs directory for the user fedora.

You use flatMap to split each line into words. In this case, you have converted each line into lower case and replaced all non alphanumeric character by spaces before splitting it into words. Next, map each word into a pair (word, 1).

Now mapping is complete and you reduce it by keyword, accumulating the count of each word. So far the response would have been very fast. Spark is lazy and evaluates only when needed, which will be done when you run the collect function. It will return a list of word,count pairs which you may sort and print.

Which files contained the word?

You have a large number of small text files. You would like to know how frequently is a word in various files.

Spark has the option to process the whole file rather than a record at a time. Each file is treated as a pair of values – the file name and the content.

from pyspark import SparkContext

import re

# input to wholeFile is a file name, file content pair

def wholeFile(x):

name=x[0]

words = re.sub('[^a-z0-9]+',' ',x[1].lower()).split()

return [(word,name) for word in set(words)]

sc = SparkContext(appName='WordsInFiles')

data = sc.wholeTextFiles('hdfs://h_mstr/user/fedora/docs')

word_index = data.flatMap(wholeFile).reduceByKey(lambda a,b:','.join((a,b)))

word_index.persist()

output=word_index.collect()

output.sort()

# print the first 10 values

for i in range(10):

print output[i]

explore(word_index)

sc.stop()

The function wholeFile returns a list of (word, filename) pairs for unique words. As above, the text has been converted to lower case and all special characters replaced by a space.

The reduceByKey function joins all the file names with a comma as the separator. For verification, the first 10 results of the sorted output are printed.

The persist method saves the result of word_index, which is useful if you want to use the same data set again. For example, in the explore method above, you may want to analyze a list of words and see how many of these words appear in a file.

class filter_keys():

def __init__(self,keys):

self.keys = keys

def filter(self,x):

return x[0] in self.keys

def explore_keys(rdd,keys):

example = filter_keys(keys)

res = rdd.filter(example.filter) \

.flatMap(lambda x:x[1].split(',')) \

.map(lambda x:(x,1)) \

.reduceByKey(lambda a,b:a+b)

out=res.collect()

out.sort(lambda a,b:-a[1].__cmp__(b[1]))

print keys,len(out)

print out

def explore(rdd):

explore_keys(rdd,['linux','fedora','ubuntu'])

explore_keys(rdd,['linux','fedora','ubuntu','python'])

The first step is to select only those records which pertain to the keywords. Filter operation on the data set will do just that. However, we need to pass a list of words as a parameter and this can be achieved by creating the class filter_keys. The class is initialised with a list of keywords and the filter method in the class can use this list.

The next step is like the word count example. Each line is a word followed by a string of comma separated file names. So, split the file names into a list and count the number of times each file is selected.

Sort the result in descending order of the frequency and print it. You may repeat the exercise for various keywords and the intermediate data set will be reused without the need to save it in a file.

This is one of the major performance advantages of Spark over map-reduce.

From a developers perspective, the ability to code the entire analysis, which may consist of a sequence of map-reduce operations, in a single file makes it much easier to think through the problem and write better code.

Comments