Putting Data into HDFS

Once you have the basics of Hadoop functioning, the obvious question is how to get data into the system. The corollary is how the data gets distributed among the various data nodes. Since the system can grow to thousands of data nodes, it is pretty obvious that you should not have to tell HDFS anything more than the bare minimum.

A Simple Test

Your first test can be to take a large text file and load it into HDFS. So, start the Hadoop cluster with three nodes, as last time. For the first test, stop the datanode service on h-mstr, so that the large file is copied to the two slave nodes only. (To create a text file for testing, take a hexdump of a 1 GB tar file; you should get a text file of around 3 GB.)

$ ssh fedora@h-mstr

$ sudo systemctl stop hadoop-datanode

$ hdfs dfs -copyFromLocal large_file.txt

You can verify that the data has been copied and how much on which node as follows:

$ hadoop fs -ls -h large_file.txt

Found 1 items

-rw-r--r-- 1 fedora supergroup 3.3 G 2014-08-15 12:25 large_file.txt

$ sudo du -sh /var/cache/hadoop-hdfs/

13M /var/cache/hadoop-hdfs/

$ ssh -t fedora@h-slv1 sudo du -sh /var/cache/hadoop-hdfs/

1.7G /var/cache/hadoop-hdfs/

Connection to h-slv1 closed.

$ ssh -t fedora@h-slv2 sudo du -sh /var/cache/hadoop-hdfs/

1.7G /var/cache/hadoop-hdfs/

Connection to h-slv2 closed.

The data file is distributed between h-slv1 and h-slv2 only. Now, start the datanode on h-mstr as well, and run the above commands again:

$ sudo systemctl start hadoop-datanode

$ hdfs dfs -copyFromLocal large_file.txt second_large_file.txt

$ hadoop fs -ls -h

-rw-r--r-- 1 fedora supergroup 3.3 G 2014-08-15 12:25 large_file.txt

-rw-r--r-- 1 fedora supergroup 3.3 G 2014-08-15 12:33 second_large_file.txt

$ sudo du -sh /var/cache/hadoop-hdfs/

3.4G /var/cache/hadoop-hdfs/

$ ssh -t fedora@h-slv1 sudo du -sh /var/cache/hadoop-hdfs/

1.7G /var/cache/hadoop-hdfs/

Connection to h-slv1 closed.

$ ssh -t fedora@h-slv2 sudo du -sh /var/cache/hadoop-hdfs/

1.7G /var/cache/hadoop-hdfs/

Connection to h-slv2 closed.

The new data seems to be stored on h-mstr only. If you run the command from h-slv2, the data is likely to be stored on the h-slv2 datanode only. That is, HDFS tries to optimise by storing the data close to its origin. You can examine the status of HDFS, including the location of data blocks, by browsing http://h-mstr:50070/.
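The same NameNode HTTP port also serves the WebHDFS REST API, so the status of a file can be fetched programmatically. The following is a minimal sketch, assuming WebHDFS is enabled (the default) and the host and user names used earlier:

```python
#!/usr/bin/python
# Build a WebHDFS GETFILESTATUS request for a file under /user/<user>.
# h-mstr and fedora are the host and user from the cluster above;
# 50070 is the default NameNode HTTP port and also serves /webhdfs/v1.
def status_url(host, user, path):
    return ('http://%s:50070/webhdfs/v1/user/%s/%s'
            '?op=GETFILESTATUS&user.name=%s' % (host, user, path, user))

if __name__ == '__main__':
    # fetch this URL with curl or urllib; the JSON reply carries the
    # length, replication and block size of the file
    print(status_url('h-mstr', 'fedora', 'large_file.txt'))
```

Fetching the URL returns a JSON FileStatus object, so the sizes checked above with du can also be read off without logging in to each node.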

The file you want to put into HDFS may be on your desktop. You can, of course, access that file using NFS. However, if you have installed the HDFS binaries on your desktop, you can copy the file by running the following command on the desktop:

$ HADOOP_USER_NAME=fedora hdfs dfs -fs hdfs://h-mstr/ -put desktop_file.txt

The commands '-put' and '-copyFromLocal' are synonyms.

Replication

Stop the Hadoop services on all the nodes. In /etc/hadoop/hdfs-site.xml on each of the nodes, change the value of dfs.replication from 1 to 2, and the data blocks will be replicated on one additional datanode.

<property>

<name>dfs.replication</name>

<value>2</value>

</property>

Restart the hadoop services. Increase the replication for existing files and explore http://h-mstr:50070/.

$ hadoop fs -setrep -R 2 /user/fedora/

These experiments illustrate the resilience and flexibility of the Hadoop distributed file system, and its implicit optimisation to minimise the transfer of data across the network. You don't need to worry about the distribution of the data.

During your experiments, in case you come across an error saying that the namenode is in safe mode, run the command:

$ sudo runuser hdfs -s /bin/bash /bin/bash -c "hdfs dfsadmin -safemode leave"

Loading Data Programmatically

It is more likely that you will need to get data from existing sources, like files and databases, and put what you need into HDFS. As the next experiment, you can take the document files you may have collected over the years, convert each into text and load it. Hadoop works best with large files, not a collection of small ones. So, all the files will go into a single HDFS file, where each document will be a single record with the name of the file as the prefix. You don't need to create a local file; the output of the program can be piped to the 'hdfs dfs -put' command.
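The record format itself is worth a second look: each document becomes one line, with the file name as the first tab-separated field. A quick round-trip sketch of the idea (the helper names here are illustrative, not part of the program that follows):

```python
#!/usr/bin/python
# One document -> one record: the file name, then each line of the
# document, all joined by tabs.  Splitting the record on tabs recovers
# the name as the first field; the remaining fields are the lines.
DELIM = '\t'

def to_record(name, text):
    return DELIM.join([name] + text.splitlines())

def from_record(record):
    fields = record.split(DELIM)
    return fields[0], fields[1:]

if __name__ == '__main__':
    rec = to_record('notes.odt', 'first line\nsecond line')
    print(rec.count('\n'))          # prints 0: the document is one line
    name, lines = from_record(rec)
    print('%s: %s' % (name, lines[0]))
```

Since newlines are the record separator for HDFS text input, removing them from the document body is what makes each file a single record.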

The following Python program load_doc_files.py illustrates the idea.

  • It selects document files.

  • Each document file is converted to a text file.

  • The name of the file is output with a delimiter (a tab, in this case).

  • Each line from the file is read and output with the new line character replaced by a delimiter.

  • On reaching the end of file, a new line character is written to the standard output.

#!/usr/bin/python
from __future__ import print_function
import sys
import os
import subprocess

DELIM = '\t'
FILETYPES = ['odt', 'doc', 'sxw', 'abw']

# use soffice to convert a document to a text file
def convert_to_text(inpath, infile):
    subprocess.call(['soffice', '--headless', '--convert-to', 'txt:Text',
                     '--outdir', '/tmp', '/'.join([inpath, infile])])
    return '/tmp/' + infile.rsplit('.', 1)[0] + '.txt'

# Convert & print a file as a single line, replacing '\n' by '\t',
# with the file name at the start
def process_file(p, f):
    print("%s" % f, end=DELIM)
    textfile = convert_to_text(p, f)
    for line in open(textfile):
        print("%s" % line.strip(), end=DELIM)
    print()

# Generator for files of type odt, doc, etc.
def get_documents(path):
    for curr_path, dirs, files in os.walk(path):
        for f in files:
            try:
                if f.rsplit('.', 1)[1].lower() in FILETYPES:
                    yield curr_path, f
            except IndexError:
                # file name without an extension
                pass

# pass the root directory as a parameter
for path, f in get_documents(sys.argv[1]):
    process_file(path, f)



You may run the above program as follows from the desktop:

$ ./load_doc_files.py ~/Documents | HADOOP_USER_NAME=fedora \
hdfs dfs -fs hdfs://h-mstr/ -put - doc_files.txt

The above program can easily be extended to convert and load PDF files as well; or another program can be used to append to the existing HDFS file.
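As a sketch of the PDF extension, the converter selection could be factored out as below. The pdftotext tool (from poppler-utils) is the assumption here, and the function name is illustrative, not part of load_doc_files.py:

```python
#!/usr/bin/python
# Sketch: choose a converter per file type.  pdftotext (an assumption;
# it ships with poppler-utils) handles pdf, soffice handles the rest.
import os

FILETYPES = ['odt', 'doc', 'sxw', 'abw', 'pdf']

def conversion_command(inpath, infile):
    # return the command that converts the given file to text in /tmp
    src = '/'.join([inpath, infile])
    base = infile.rsplit('.', 1)
    if base[1].lower() == 'pdf':
        return ['pdftotext', src, '/tmp/' + base[0] + '.txt']
    return ['soffice', '--headless', '--convert-to', 'txt:Text',
            '--outdir', '/tmp', src]

if __name__ == '__main__':
    import subprocess
    import sys
    for f in sys.argv[1:]:
        subprocess.call(conversion_command(os.path.dirname(f) or '.',
                                           os.path.basename(f)))
```

With this in place, convert_to_text only needs to call conversion_command and return the /tmp path, leaving the rest of the program unchanged.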

The obvious next question: once the data is in Hadoop, how do you use it, and what can you do with it?

A Modified Wordcount Example

Hadoop map-reduce offers a very convenient streaming framework. It takes input lines from an HDFS file and passes them on standard input to one or more mapper programs. The standard output of the mapper programs is routed to the standard input of the reducer programs. The mapper and reducer programs can be in any language.

Each line of mapper output is expected to be a key-value pair, separated by a tab. All lines with the same key are routed to the same reducer.
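This routing works because the framework partitions mapper output by key; by default, a hash of the key modulo the number of reducers decides the destination. The following sketch illustrates the idea (it is an illustration, not Hadoop's exact hash function):

```python
#!/usr/bin/python
# Sketch of how streaming routes lines to reducers: hash the key (the
# text before the first tab) modulo the reducer count.  The same key
# always hashes to the same value, so it always reaches the same reducer.
def partition(line, num_reducers, sep='\t'):
    key = line.split(sep, 1)[0]
    h = 0
    for c in key:
        # a simple stable string hash, masked to stay non-negative
        h = (h * 31 + ord(c)) & 0x7fffffff
    return h % num_reducers

if __name__ == '__main__':
    for line in ['hadoop\t1', 'hdfs\t2', 'hadoop\t3']:
        print('%s -> reducer %d' % (line.split('\t')[0],
                                    partition(line, 4)))
```

The two 'hadoop' lines land on the same reducer regardless of their values, which is exactly the property the word count below relies on.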

A common first example is a word count in a text file. Each line of a text file is split into words by a mapper. It writes the word as the key with the value 1. The reducer counts the number of times a particular word is received. Since the word is the key, all occurrences of the same word will be routed to the same reducer.

In doc_files.txt, each line is actually a file. It is very likely that words will be repeated within a line. So, it is better if the mapper counts the words in a line before writing them to stdout, as in the following mapper.py:

#!/usr/bin/python
import sys
import re

# Generator for words in a line
def read_input(file):
    for line in file:
        yield re.compile(r'\W+').split(line)

def main(sep='\t'):
    for words in read_input(sys.stdin):
        wd_dict = {}
        # count the number of occurrences of a case-insensitive word
        for word in words:
            if word != '':
                wd = word.lower()
                wd_dict[wd] = wd_dict.get(wd, 0) + 1
        # print each word and its count
        for word in wd_dict:
            print('%s%s%d' % (word, sep, wd_dict[word]))

if __name__ == "__main__":
    main()

There is no need to change the reducer, as long as it has not assumed that the count of each word is 1. The corresponding reducer.py:

#!/usr/bin/python
import sys

# convert each line into a word and its count
def read_mapper_output(file, sep):
    for line in file:
        yield line.strip().split(sep)

# add the count for each occurrence of the word
def main(sep='\t'):
    wc = {}
    for word, count in read_mapper_output(sys.stdin, sep):
        wc[word] = wc.get(word, 0) + int(count)
    # print the sorted list
    for word in sorted(wc):
        print("%s%s%d" % (word, sep, wc[word]))

if __name__ == "__main__":
    main()
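Before submitting a job, you can check the logic of such a mapper/reducer pair by simulating the streaming data flow locally: map, sort by key, then reduce. The sketch below restates the logic of mapper.py and reducer.py in self-contained functions for that purpose:

```python
#!/usr/bin/python
# Simulate the streaming pipeline locally: run sample records through
# the mapper logic, sort the output by key (as streaming does between
# the map and reduce stages), then reduce.  The functions restate the
# logic of mapper.py and reducer.py so the block is self-contained.
import re

def map_line(line):
    counts = {}
    for word in re.compile(r'\W+').split(line):
        if word:
            counts[word.lower()] = counts.get(word.lower(), 0) + 1
    return ['%s\t%d' % (w, c) for w, c in counts.items()]

def reduce_pairs(pairs):
    wc = {}
    for word, count in pairs:
        wc[word] = wc.get(word, 0) + int(count)
    return wc

if __name__ == '__main__':
    # sample records in the doc_files.txt format: file name, tab, text
    records = ['one.odt\tHadoop counts words', 'two.odt\tHadoop scales']
    mapped = []
    for r in records:
        mapped.extend(map_line(r))
    result = reduce_pairs(line.split('\t') for line in sorted(mapped))
    print(result['hadoop'])   # prints 2: one occurrence per record
```

This mirrors the classic shell check for streaming jobs, cat input | ./mapper.py | sort | ./reducer.py, without needing a cluster.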

So, having signed in to fedora@h-mstr, you may run the following commands to count the words and examine the result:

$ hadoop jar /usr/share/java/hadoop/hadoop-streaming.jar \
-files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py \
-input doc_files.txt -output wordcount.out

$ hadoop fs -cat wordcount.out/part-00000 | less

As you have seen in these experiments, you can store data in an HDFS file using any programming language. Then, you can write fairly simple map and reduce programs, again in any programming language, without worrying about any of the issues related to distributed processing. Hadoop will try to optimise the distribution of the data across the nodes and feed the data to the appropriate mapper programs.
