June 15, 2014

Introduction to Hadoop

My effort to learn Hadoop and Map Reduce resulted in this presentation. If you like it and find it useful, do leave a comment.

June 11, 2014

Forecasting Retail Sales - Linear Regression with R and Hadoop

image from Responsemagazine.com
A retail store tracks the volume of sales for each stock-keeping unit (SKU) that it deals in. Given the sales for days 1 through 5, is it possible to predict the sales on days 6 through 10? A common-sense assumption is that sales will remain constant, so that the average sales per day over the first 5 days will be the same as the average sales per day over the next 5 days. However, this may not be the case if there is a rising or falling trend. If there is a strongly rising trend, caused by some strong promotional activity, then the assumption of constant sales will lead to a stock-out and a loss of potential business. Similarly, if there is a strongly falling trend, the same assumption will lead to an accumulation of dead stock and hence a loss related to excessive inventory. Instead of days, the same analysis can be done on the basis of weeks, fortnights or even months. Net-net, given the sales over 5 periods of time, it is useful to be able to predict the sales for the next 5 periods. How can we do this without resorting to the simplistic "average daily sale" strategy?

A simple solution is to use linear regression, a well-known and widely used statistical tool. If you have the sales data for a particular SKU for the past 5 days, you can "fit" a regression line, determine the slope and the intercept of this line, and use the resulting linear regression "model" to predict the expected sales for the next 5 days. Based on these predictions, you can place orders for these SKUs so that the gap between expected and actual sales is minimised. A software tool like R can be used to solve this problem very easily.
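To make the idea concrete, here is a minimal sketch in Python rather than R (the post itself relies on R's lm()). It fits an ordinary least-squares line to five daily sales figures and extrapolates it over the next five periods. The helper names fit_line and forecast are hypothetical; the sample figures are the day 08 to day 12 sales of salt that appear in the mapper output later in this post.

```python
def fit_line(xs, ys):
    """Ordinary least-squares fit of y = intercept + slope * x."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return intercept, slope


def forecast(sales, periods_ahead):
    """Fit a line to sales for periods 1..n and predict periods n+1..n+periods_ahead."""
    n = len(sales)
    intercept, slope = fit_line(list(range(1, n + 1)), sales)
    return [intercept + slope * x for x in range(n + 1, n + periods_ahead + 1)]


if __name__ == "__main__":
    salt = [90, 80, 120, 50, 60]          # sales on days 08..12
    predicted = forecast(salt, 5)         # estimates for days 13..17
    print(predicted)                      # [53.0, 44.0, 35.0, 26.0, 17.0]
    print(sum(salt), sum(predicted))      # 400 175.0
```

Because the fitted slope is -9, each forecast day is 9 units below the previous one, which is why 400 units of actual sales are followed by only 175 units of predicted sales.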

All this is well known. But when the number of SKUs is of the order of 50,000 to 70,000, the time required to build so many regression models, even with R, and then to use each of them to estimate the sales quantity for its SKU becomes enormous! In fact, if this has to be done on a rolling basis every day to predict the sales over the next 5 days, it becomes impractical on a single machine: even before today's prediction is ready, the next set of data is waiting and getting stale!

This is where Hadoop steps in. By splitting the regression problem for 50,000 - 70,000 SKUs across multiple computers, it is quite possible to solve the entire problem in a reasonable amount of time. This means that the person responsible for placing orders for the replenishment of inventory would know which of the SKUs would need to be ordered in a higher quantity and which to be ordered in a lower quantity. This is the Linear Regression problem that we will solve with R and Hadoop.

R is not necessary for regression. Any programming language, such as Java, can be used, but using R (or, for that matter, a similar tool like Python) allows a ready-made function, lm() for linear regression, to be used without re-inventing the wheel. R is also a free and open-source statistical tool that is very widely used across the data analytics community. There are two ways to use R with Hadoop: we can either use the streaming feature of Hadoop with R scripts, or use the RHadoop set of packages from Revolution Analytics (which include rhdfs, rmr2 and rhbase). The RHadoop path initially looks easier because it allows one to operate from within the familiar R environment, but configuring RHadoop is difficult (or at least, the author was unsuccessful despite a lot of effort). Moreover, RHadoop in reality uses the same streaming feature of Hadoop to get the job done. So nothing is lost if one ignores RHadoop and uses the native streaming feature of Hadoop directly.

So now we will see how to solve the Sales Forecasting Problem.

We have miniaturized the problem by assuming that the retail store stocks and sells only 3 products, namely salt, soap and soda. On a particular day, arbitrarily designated as Day 08, the sales of these three SKUs were as follows, and this data was stored in a file called DailySales08.txt:
8 soap 90
8 salt 90
8 soda 120
where the first column represents the day, the second the SKU name (or code) and the third the sales on Day 08. There are 4 other files, namely DailySales09.txt, DailySales10.txt, DailySales11.txt and DailySales12.txt. In reality, each of these files will have a very large number of records, with one record for each SKU.

Based on the data for the 5 days from day 08 to day 12, we need to estimate the sales for the 6th to 10th days, that is, for day 13 to day 17. When the R program is run in Hadoop, the following output is generated:
salt dates [ 8 - 12 ] : 400  next [ 6 - 10 ] 175 : 17  -- 575  
soap dates [ 8 - 12 ] : 600  next [ 6 - 10 ] 1025 : 239  -- 1625  
soda dates [ 8 - 12 ] : 620  next [ 6 - 10 ] 845 : 187  -- 1465 

where each row summarises one SKU. In row 1, for example:
  • SKU is "salt"
  • cumulative actual sales on days 8 - 12 ( the first 5 days of the analysis ) is 400
  • cumulative expected sales from 6th to 10th day is 175
  • the estimated sale on the last (10th) day, that is, day 17, is 17 (just a coincidence!)
  • the total estimated sales over the 10 day period is 575 ( 400 actual, 175 estimated)
Why are the actual sales in the first 5 days 400, but the predicted sales in the next 5 days only 175? See what the regression data reveals:

The black dots represent the actual sales on the first 5 days [ day 8 to day 12 ]. Based on these, the model has created the regression line sales = 170 - 9*day, and with this the estimated values for all the days can be calculated and shown as red dots on the graph. Because of the falling trend, the expected sales in the next 5 days are significantly lower than in the first 5 days. Or so says the regression!
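The regression line quoted above can be checked by hand with the ordinary least-squares formulas, using the salt figures from the mapper output further down (days 8 to 12, sales 90, 80, 120, 50 and 60):

```latex
\begin{aligned}
\bar{d} &= \tfrac{8+9+10+11+12}{5} = 10, \qquad \bar{s} = \tfrac{90+80+120+50+60}{5} = 80 \\
b &= \frac{\sum_i (d_i-\bar{d})(s_i-\bar{s})}{\sum_i (d_i-\bar{d})^2}
   = \frac{(-2)(10)+(-1)(0)+(0)(40)+(1)(-30)+(2)(-20)}{4+1+0+1+4} = \frac{-90}{10} = -9 \\
a &= \bar{s} - b\,\bar{d} = 80 - (-9)(10) = 170 \quad\Rightarrow\quad \hat{s}(d) = 170 - 9d \\
\textstyle\sum_{d=13}^{17} \hat{s}(d) &= 53 + 44 + 35 + 26 + 17 = 175
\end{aligned}
```

These are the estimates for days 13 to 17, and they add up to the 175 reported for salt in the output above.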

To run this program, a development environment was created on an Ubuntu 14.04 laptop running R 3.0.2 and Hadoop 2.2.0, installed in single-machine cluster mode as described in my earlier post Demystifying Hadoop and MR with this DIY tutorial.

Section 4 of that tutorial showed how the Hadoop streaming utility was used to run a WordCount program in Python. The same strategy is used in this case, except that the Python programs have been replaced by two R scripts, LinReg-map.R and LinReg-red.R, and a shell script, runRetail.sh, is used to execute the map-reduce job. The source code of all three scripts, along with the 5 data files, is available in the Git repository prithwis/Retail.

Once the Mapper (LinReg-map.R) has run and its output has been sorted by key, the data looks like this, though in reality this output is not stored but is instead "streamed" to the Reducer:

salt 10$120 
salt 11$50 
salt 12$60 
salt 8$90 
salt 9$80 
soap 10$100 
soap 11$140 
soap 12$160 
soap 8$90 
soap 9$110 
soda 10$150 
soda 11$130 
soda 12$140 
soda 8$120 
soda 9$80 

Here the key is the SKU name, and the value is a string formed by concatenating the day and the quantity sold, separated by the $ character.
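For readers who want to see the streaming contract in code, here is a minimal sketch of a mapper equivalent in spirit to LinReg-map.R, written in Python rather than R. It assumes whitespace-separated "day sku quantity" records on stdin, as in DailySales08.txt, and emits one tab-separated "sku<TAB>day$quantity" line per record, since Hadoop streaming by default treats everything before the first tab as the key. The function names are hypothetical, not taken from the author's scripts.

```python
import sys


def map_line(line):
    """Turn one 'day sku quantity' record into a 'sku<TAB>day$quantity' pair.

    Returns None for blank or malformed lines so that they are skipped.
    """
    fields = line.split()
    if len(fields) != 3:
        return None
    day, sku, qty = fields
    return f"{sku}\t{day}${qty}"


def map_records(lines):
    """Yield one key/value line per well-formed input record."""
    for line in lines:
        pair = map_line(line)
        if pair is not None:
            yield pair


def main():
    # Hadoop streaming feeds the input split on stdin and collects stdout.
    for pair in map_records(sys.stdin):
        print(pair)
```

To act as a real streaming mapper, the file would also need a #!/usr/bin/env python3 first line, a final call to main(), and execute permission, just like the R scripts invoked as ./LinReg-map.R in this post.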

In this case there were only 15 records (3 SKUs x 5 days), but even if the number of SKUs is very high, the task of creating this sorted list of <key, value> pairs can be distributed across multiple servers in the Hadoop cluster. This sorted list of records can then be distributed again to multiple servers, where the second (reducer) program, LinReg-red.R, executes. Hadoop ensures that all records pertaining to any one key (or SKU) are sent to the same reducer, where the linear regression function is executed.
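The guarantee that every record for one SKU reaches the same reducer comes from the partitioning step. Hadoop's default HashPartitioner sends a record to reducer (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The sketch below imitates that idea in Python, using zlib.crc32 as a stand-in hash (an assumption, chosen only because Python's built-in string hash changes between runs), and then sorts each reducer's share, much as the Unix sort does in the local test pipeline. The function names are hypothetical.

```python
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    """Choose a reducer for a key; identical keys always get the same reducer.

    Stand-in for Hadoop's HashPartitioner, which uses
    (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; crc32 is used
    here only because it gives the same value on every Python run.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(lines, num_reducers):
    """Route 'key<TAB>value' lines to reducers and sort each reducer's input."""
    buckets = defaultdict(list)
    for line in lines:
        key = line.split("\t", 1)[0]
        buckets[partition(key, num_reducers)].append(line)
    return {reducer: sorted(batch) for reducer, batch in buckets.items()}
```

Note that Hadoop itself sorts only by key; unlike this sketch, it does not by default guarantee any order of the values within one key, so a reducer should not rely on that order.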

The reducer program reads through all the <key, value> pairs for each key (or SKU), splits the value at the $ character to isolate the day and the sales for that day, and creates two lists, one of days and the other of the corresponding sales values. These two lists are passed, along with the key (SKU), to the user-defined EstValue() function. The fourth parameter, N, which is 9 in our case, is the number of days from the first day for which data is available to the last day of the forecast period. Here the first day is day 8 and N is 9, so the last day is the 10th day, or day 17.
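The reducer logic just described can be sketched in Python as follows. This is not a translation of LinReg-red.R: est_value is a hypothetical stand-in for EstValue() that computes the least-squares line directly instead of calling lm(), re-indexes the days to 1, 2, 3, ... as the post describes, and assumes a fixed 5-day horizon instead of the N parameter. The input must already be sorted by key, as Hadoop (or the Unix sort) guarantees, and because the values for a key can arrive in string order (10, 11, 12, 8, 9), the sketch sorts them numerically by day before fitting.

```python
import sys
from itertools import groupby


def est_value(sku, days, sales, horizon=5):
    """Fit sales = a + b * t (t = 1 on the first day) and forecast `horizon` more days.

    `days` must be in ascending order. Returns
    (sku, first_day, last_day, actual_total, forecast_total, last_forecast).
    """
    if len(days) < 2:
        raise ValueError("need at least two days of data")
    first = days[0]
    t = [d - first + 1 for d in days]          # e.g. days 8..12 become 1..5
    n = len(t)
    mean_t, mean_s = sum(t) / n, sum(sales) / n
    b = (sum((ti - mean_t) * (si - mean_s) for ti, si in zip(t, sales))
         / sum((ti - mean_t) ** 2 for ti in t))
    a = mean_s - b * mean_t
    future = [a + b * x for x in range(t[-1] + 1, t[-1] + horizon + 1)]
    return sku, days[0], days[-1], sum(sales), sum(future), future[-1]


def parse_pair(line):
    """Split 'sku<TAB>day$qty' into (sku, day, qty)."""
    key, value = line.rstrip("\n").split("\t", 1)
    day, qty = value.split("$", 1)
    return key, int(day), float(qty)


def reduce_pairs(lines):
    """Group key-sorted lines by SKU and run the regression once per SKU."""
    records = (parse_pair(line) for line in lines if line.strip())
    for sku, group in groupby(records, key=lambda rec: rec[0]):
        # Values may arrive in string order (10, 11, 12, 8, 9): sort by day first.
        points = sorted((day, qty) for _, day, qty in group)
        yield est_value(sku, [d for d, _ in points], [q for _, q in points])


def main():
    # In a streaming job, stdin holds the key-sorted mapper output.
    for sku, first, last, actual, future, last_value in reduce_pairs(sys.stdin):
        print(f"{sku} dates [ {first} - {last} ] : {actual:.0f}  "
              f"next {future:.0f} : {last_value:.0f}  -- {actual + future:.0f}")
```

Fed the 15 sorted mapper lines shown earlier, reduce_pairs yields 5-day forecasts of 175, 1025 and 845 for salt, soap and soda, with last-day estimates of 17, 239 and 187, the same figures as in the output above.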

The EstValue() function is where the linear regression function lm() is finally called, with the two lists of days and sales as input. For a quick recap of how linear regression is done in R, read this tutorial. A little data manipulation is done first: the days (8, 9, 10, 11, 12) are replaced by the more generic (1, 2, 3, 4, 5), so the estimates are made for days (6, 7, 8, 9, 10) instead of (13, 14, 15, 16, 17). This transformation does not change the predictions, because shifting the day numbers only changes the intercept of the fitted line, not its slope.
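The claim that re-indexing has no effect can be checked algebraically. Write t = d - 7, so that days 8 to 12 become 1 to 5, and let a + b d be the line fitted on the original days:

```latex
\begin{aligned}
t_i &= d_i - 7 \;\Rightarrow\; t_i - \bar{t} = d_i - \bar{d}
  \;\Rightarrow\; b' = \frac{\sum_i (t_i-\bar{t})(s_i-\bar{s})}{\sum_i (t_i-\bar{t})^2} = b \\
a' &= \bar{s} - b\,\bar{t} = \bar{s} - b(\bar{d} - 7) = a + 7b \\
a' + b\,t &= a + 7b + b(d - 7) = a + b\,d
\end{aligned}
```

For salt, a = 170 and b = -9 give a' = 107, and the estimate for t = 10 (day 17) is 107 - 90 = 17 either way.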

There are 3 ways of testing or running this set of programs, of which the first two can be done on a laptop:

  • To test the R scripts without calling Hadoop, one can simply pipe the commands as follows: cat DailySales*.txt | ./LinReg-map.R | sort | ./LinReg-red.R > output.txt . This simulates the entire streaming process: the data from the 5 data files is sent to the "stdin" of the mapper script, whose output is streamed to the Unix sort utility, which streams the sorted key-value pairs to the reducer script, which in turn writes its "stdout" output to a file called output.txt. This is the output that you can see earlier in this post.
  • To run the same scripts on the Hadoop single-machine cluster installed on a laptop, we use the following shell script, runRetail.sh:

#hdfs dfs -ls 
#hdfs dfs -mkdir /user/hduser/Retail-in
#hdfs dfs -copyFromLocal /home/hduser/RetailSales/DailySales*.txt /user/hduser/Retail-in
hdfs dfs -ls /user/hduser/Retail-in
hdfs dfs -rm -r /user/hduser/Retail-out
hadoop jar /usr/local/hadoop220/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -D mapred.job.name='RetailR' -mapper /home/hduser/RetailSales/LinReg-map.R -reducer /home/hduser/RetailSales/LinReg-red.R -input /user/hduser/Retail-in/* -output /user/hduser/Retail-out 
hdfs dfs -ls /user/hduser/Retail-out

The commented-out lines (which need to be run only once) create the Retail-in directory in HDFS and load the DailySales files from the local directory into the HDFS file system. The script then deletes the output directory, if it exists, and calls the Hadoop streaming program with the 4 mandatory parameters: mapper script, reducer script, input directory and output directory (all with fully qualified names, to avoid any ambiguity). The only additional parameter is the job name (RetailR), which helps track the job on http://localhost:8088 and http://localhost:50070.

In both these cases, the output is the same.

Now that we know that the program works fine, how do we scale up? When we have thousands of SKUs and want to use data from, say, 15 or 20 days to build the regression model, the number of records will go up dramatically. One can, of course, procure multiple servers and configure all of them with Ubuntu, R and Hadoop, but this is a very big, complicated and error-prone task. The simple solution is the third option:
  • Amazon Web Services Elastic MapReduce (AWS/EMR), where the mapper and reducer programs can be run without any change on the same (or, if necessary, much, much larger) data, to get results identical to those obtained with the first two methods.
To try out AWS/EMR, you need to visit the AWS website with a credit card and sign up for a login ID. Then follow the steps given in this tutorial by Raffael Vogler to run the LinReg map and reduce scripts, using the programs described in this post instead of Vogler's. You should also ignore the bootstrapping step, as well as the two lines -jobconf stream.num.map.output.key.fields=2 and -jobconf map.output.key.field.separator=\t that were meant to be placed in the Arguments box, since these are not required for the linear regression programs. Running the programs with the test data given in this post will take around 10 minutes: 7 to provision and configure the machines and 3 to run the job. This will result in a charge of around US$ 2 or US$ 3, billed to the credit card used to create the login ID. Should you use AWS/EMR, do remember to terminate the cluster at the end of the exercise, as otherwise the billing will continue.

AWS/EMR really removes the hassles of configuring Hadoop and makes running Map Reduce jobs almost as easy as sending a Gmail message! Everything is GUI oriented. You choose the number and type of machines and enter the locations of the data files and of the map and reduce scripts. So after building and testing your R scripts on a laptop, you can scale up to hundreds of servers in a few minutes, and use them for only a few minutes! Who could ask for anything more?

In this post, we have defined a simple sales prediction problem that could be faced by any retail store and have shown how it can be solved with Hadoop and R. The approach has been adapted from a YouTube video created and uploaded by Fady El-Rukby; even though he solves a completely different problem and uses native Java rather than streaming R, we used the same data and compared results to make sure that the linear regression function of R is working correctly. To learn more about R and Data Science in general, please read this post on Data Science - A DIY approach, and for a business perspective, join the Business Analytics Program at Praxis Business School, Calcutta.

June 03, 2014

HIVE and PIG to simplify Hadoop

When I was doing engineering at IIT Kharagpur, the computers that we had were not even as powerful as a low-cost non-smartphone today, and other than the basic concept of programming, nothing that we learnt is of any relevance today. So when we start teaching a course on Business Analytics, which lies at the bleeding edge of current technology and business practices, there is simply no option but to take the Do-It-Yourself approach of first learning a subject and then teaching it to students. Fortunately, there are many kind and knowledgeable souls on this planet who have taken the pains to explain new and difficult concepts to ancients like us, and thanks to Google, it is not too difficult to locate them.

Using this route, I first learnt what Data Science is and then created this compilation of tutorials and training materials that anyone can use to learn about this new subject in greater depth. The next big challenge was to demystify Hadoop and Map Reduce, as these two key concepts play a very significant role in this area. Writing Map Reduce programs in Java, as is the standard practice, is a non-trivial task, and many people have sought to simplify matters by adopting other approaches. One is to use the Hadoop streaming API with a program written in any executable language, like Python or R. HIVE and PIG are two other products that have evolved to ease and facilitate the use of MR techniques with Hadoop systems.

HIVE provides an SQL-like query engine that sits on top of data stored in the HDFS file system on Hadoop and translates queries into Map Reduce jobs. Anyone familiar with SQL will immediately feel at home with its DDL, DML (load, insert) and SELECT commands.

PIG (and its humourously named command prompt, GRUNT > ) is a scripting language that allows one to run queries on data stored on HDFS without writing complex MR programs in Java.

In this post we will

  1. Install HIVE and use SQL commands to load and retrieve data from an HDFS file system.
  2. Install PIG and use it to retrieve the same data 
  3. Do the same task with the usual Java program ( already shown in an earlier blog post.)
We assume that you have followed the instructions in the earlier blog post and have a single-machine Hadoop cluster installed on an Ubuntu (preferably 14.04) machine.

Varad Meru of Orzota has created a set of four excellent tutorials that we will use to get a grip on PIG and HIVE.

The first one talks about installing Hadoop 1.0.3, but we will ignore that because we have already learnt to install Hadoop 2.2.0.

The data that is used in the three other tutorials is called the Book Crossing Dataset that you can download as a zip file and then extract ONLY the file called BX-Books.csv for the purpose of the next three tutorials.

From this file we will answer the question of how many books were published in each calendar year. Not really rocket science, but enough to show how HIVE and PIG work.
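As a point of reference for the HIVE and PIG versions, the same question can be answered for a small sample in plain Python. This is a minimal sketch, assuming the cleaned file's layout: ';'-separated, '"'-quoted fields with the header removed and the year of publication in the fourth column. Like COUNT(booktitle) in the HIVE query, it counts titled rows per year; the function names and sample rows are made up for illustration.

```python
import csv
from collections import Counter


def books_per_year(lines):
    """Count titled rows per year of publication, in the spirit of
    SELECT yearofpublication, COUNT(booktitle) ... GROUP BY yearofpublication.
    """
    counts = Counter()
    for row in csv.reader(lines, delimiter=";", quotechar='"'):
        if len(row) >= 4 and row[1]:   # skip short rows and rows without a title
            counts[row[3]] += 1
    return counts


def as_year_count_lines(counts):
    """Format counts as 'year:count' strings, the shape produced by the PIG script."""
    return [f"{year}:{n}" for year, n in sorted(counts.items())]


if __name__ == "__main__":
    # Made-up rows, for illustration only.
    sample = ['"0001";"Title A";"Author A";"2001";"Pub X"',
              '"0002";"Title B";"Author B";"1999";"Pub Y"',
              '"0003";"Title C";"Author C";"2001";"Pub X"']
    print(as_year_count_lines(books_per_year(sample)))  # ['1999:1', '2001:2']
```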

The second tutorial, Hive for Beginners, gives clear, step-by-step instructions to carry out the task, and almost every instruction works perfectly. The following listing shows the shell script used for all three tutorials (HIVE, PIG, Java).


# --- hive and common data cleaning and loading

#hdfs dfs -mkdir /user/hive
#hdfs dfs -mkdir /user/hive/warehouse
#hdfs dfs -chmod g+w /tmp
#hdfs dfs -chmod g+w /user/hive/warehouse
#hdfs dfs -mkdir /user/hduser/BXData-in
#sed 's/&amp;/&/g' BX-Books.csv | sed -e '1d' |sed 's/;/$$$/g' | sed 's/"$$$"/";"/g' > BX-BooksCorrected.txt
#hdfs dfs -copyFromLocal /home/hduser/BXData/BX-BooksCorrected.txt /user/hduser/BXData-in

#hive -f goBX2.sql > goBX2.output

# ---- pig
#pig goBX3.pig

# --- java

#rm -rf LocalClasses
#mkdir LocalClasses
# ....
#javac -classpath $HADOOP_INSTALL/share/hadoop/common/hadoop-common-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/common/lib/commons-cli-1.2.jar -d LocalClasses BookXReducer.java
# .....
#javac -classpath $HADOOP_INSTALL/share/hadoop/common/hadoop-common-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/common/lib/commons-cli-1.2.jar -d LocalClasses BookXMapper.java
# .....
#javac -classpath $HADOOP_INSTALL/share/hadoop/common/hadoop-common-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/common/lib/commons-cli-1.2.jar:LocalClasses -d LocalClasses BookXDriver.java && jar -cvf BookXDriver.jar -C LocalClasses/ .

#hadoop jar BookXDriver.jar BookXDriver /user/hduser/BXData-in /user/hduser/BXData-MR-out
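The sed pipeline in the script above is compact but easy to misread, so the sketch below spells out the same four steps in Python (the function name is hypothetical). The net effect is that only the semicolons separating quoted fields survive, while any semicolon inside a title or publisher name becomes $$$, so that a ';' field delimiter cannot split a value. The &amp; replacement must come first, because the entity itself contains a semicolon.

```python
def clean_bx_lines(lines):
    """Yield BX-Books lines cleaned the way the sed pipeline above cleans them."""
    for number, line in enumerate(lines):
        if number == 0:
            continue                                # sed -e '1d': drop the header row
        line = line.replace("&amp;", "&")           # must run before the ';' step
        line = line.replace(";", "$$$")             # hide every semicolon...
        yield line.replace('"$$$"', '";"')          # ...then restore field separators
```

For example, the data row "1";"Salt; Soap &amp; Soda";"2001" comes out as "1";"Salt$$$ Soap & Soda";"2001".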


Instead of typing long HIVE commands by hand, we have created a file called goBX2.sql to store the various HIVE commands, and we execute the different commands by selectively un-commenting lines.


--use default;
--show databases;
--show tables;
--LOAD DATA INPATH '/user/hduser/BXData-in/BX-BooksCorrected.txt' OVERWRITE INTO TABLE BXDataSet;
select yearofpublication, count(booktitle) from bxdataset group by yearofpublication;


The only deviation from the instructions was
  1. An error in the CREATE TABLE command. Since ";" terminates statements in HIVE script files, the first CREATE TABLE statement failed because it contained a ";" symbol. The problem was solved by changing it to "\;", after which execution could proceed.
Also note that the output is stored in the file goBX2.output.

After using HIVE, the same task is performed using PIG by following instructions given in the tutorial PIG for Beginners.

There were three deviations from the instructions:
  1. PIG was throwing a fearful error, ERROR org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl - Error whiletrying to run jobs.java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected., which aborted the job. The error arises because the PIG jar was compiled against Hadoop 1.x, where JobContext is a class, whereas in Hadoop 2.x it is an interface. This was tracked down to this StackOverflow thread, and the following command, issued from the $PIG_HOME directory, solved the problem by rebuilding PIG against the newer API: ant clean jar-all -Dhadoopversion=23 . However, please note that the command takes nearly 25 minutes to execute, as it rebuilds many Hadoop, PIG and related jars.
  2. PIG_CLASSPATH must be set to the Hadoop configuration directory, which for Hadoop 2.2.0 is $HADOOP_INSTALL/etc/hadoop
  3. Also note that when HIVE loads data into a table with LOAD DATA INPATH, it moves the file from its HDFS location into the HIVE warehouse directory, so the file is no longer in /user/hduser/BXData-in. So before PIG can start, the data has to be copied from the local file system to HDFS once again! Simply uncomment the relevant line in the shell script and run it once again.
The PIG commands were stored in a file, goBX3.pig, and executed from the shell script goBX1.sh:

BookXRecords = LOAD '/user/hduser/BXData-in/BX-BooksCorrected.txt' USING PigStorage(';') AS (ISBN:chararray, BookTitle:chararray, BookAuthor:chararray, YearOfPublication:chararray, Publisher:chararray, ImageURLS:chararray, ImageURLM:chararray, ImageURLL:chararray);
GroupByYear = GROUP BookXRecords BY YearOfPublication;
CountByYear = FOREACH GroupByYear GENERATE CONCAT((chararray)$0,CONCAT(':',(chararray)COUNT($1)));
STORE CountByYear INTO '/user/hduser/BXData-out-pig/BXDataQueryResult' USING PigStorage('\t');


In this case, the output is stored in the HDFS file system, from where it can be accessed through the browser at localhost:50075 and downloaded.

Finally, after using HIVE and then PIG to generate the data, one can use the standard Java route as explained in the fourth and final tutorial. There is really no need to configure Eclipse with the Hadoop plug-in (the version for Hadoop 2.2.0 is not yet ready or stable, as of now). You can simply download the three Java files, BookXDriver, BookXMapper and BookXReducer, and then use the javac command from the Ubuntu prompt as given in the shell script above. Once again the output will be stored in the HDFS directory /user/hduser/BXData-MR-out (as shown in the diagram above) and can be downloaded for comparison with the two other results.

Ok, here is the final screenshot of the applications console available in the browser at localhost:8088 that shows all the three jobs to have executed successfully.

If you find any errors in this post, please leave a comment. If you find it useful, do share it with your friends ... and also check out the Business Analytics program at Praxis Business School.

June 01, 2014

Consultant for Criminals or Criminal Consultants

In an earlier post on Extreme Konsulting, I had explored the characteristics and challenges of delivering management consulting services to reluctant and criminally corrupt government offices, but Sameer Kamat, in his book Business Doctors, goes far, far deeper. In this new novel, set on the US West Coast, Sameer explores what it would, or could, mean to offer professional management consulting services to actual, law-breaking criminals!

As one who has been in the consulting profession for a long time, Sameer knows only too well that the business is not about rocket-science, earth-shattering ideas, but about the ability to locate an action template that has worked in the past and apply it, with some creative tweaks, to the situation at hand.

This simple point-of-view has been expanded on and converted into a novel that explores the mutually awkward relationship that develops between an Ivy League educated, laptop-carrying management consultant and a mafia mob leader and his henchmen who have never seen anything like a Powerpoint Presentation !

But behind the initial sequence of hilarious situations, there is a deeper and then darker truth that Sameer's novel has pointed to. First, a criminal organisation is really no different from any run-of-the-mill corporate in terms of underlying business processes, like recruitment, money management, public relations and so on. It is only that the rules they break are from a different section of the statutes and some of them have more undesirable outcomes !

Which brings us to the second and deeper issue. Where is the dividing line between the consultant and the criminal? Where does consulting to criminals end and the consultant become a criminal himself? There are many instances, primarily in the US, of famous management consultants who are now sitting in jail. Is the trajectory described in Business Doctors the one that took them there? Read the novel and you may find an answer.

Those who are in the management consulting profession will love the way Sameer has mapped the well-known tools of their trade to the world of criminals. There are a few small plot holes, but given the challenge of trying to fit a square peg into a round hole, we should be more than happy to give the author the licence to stretch facts to make ends meet. Net-net, quite a page-turner that will keep you amused and intrigued till the very end.

May 31, 2014

Demystify Map Reduce and Hadoop with this DIY tutorial

In an earlier post on Data Science - A DIY approach, I had explained how one can start a career in data science, or data analytics, by using free resources available on the web. Since Hadoop and Map Reduce are a tool and a technique that are very popular in data science, this post will get you started and help you
  1. Install Hadoop 2.2, in a single machine cluster mode on a machine running Ubuntu
  2. Compile and run the standard WordCount example in Java
  3. Compile and run another, non WordCount, program in Java
  4. Use the Hadoop streaming utility to run a WordCount program written in Python, as an example of a non-Java application
  5. Compile and run a java program that actually solves a small but representative Predictive Analytics problem
All the information presented here has been gathered from various sites on the web and has been tested on a dual-boot laptop running Ubuntu 14.04. Believe me, it works. Should you still get stuck because of variations in the operating environment, you will need to Google the appropriate error messages and locate your own solutions.

Hadoop is a piece of software, a Java framework, and MapReduce is a technique, an algorithm, that was developed to support "internet-scale" data processing requirements at Yahoo, Google and other internet giants. The primary requirement was to sort through and count vast amounts of data. A quick search on Google will reveal a vast number of tutorials on both Hadoop and MapReduce, like this one on Introduction to the Hadoop Ecosystem by Uwe Seiler, or Programming Hadoop MapReduce by Arun C Murthy of Yahoo. You can also download Tom White's Hadoop - The Definitive Guide, 3rd Edition, or read Hadoop Illuminated online.

Slide 5 of Murthy's deck goes to the heart of the MapReduce technique and explains it with a very simple Unix shell script analogy. A Map process takes data and generates a long list of [key, value] pairs, where a key is an alphanumeric string, e.g. a word, a URL or a country, and the value is usually a number. Once the long list of [key, value] pairs has been generated, it is sorted (or shuffled) so that all the pairs with the same key are located one after the other in the list. Finally, in the Reduce phase, the multiple values associated with each key are processed (for example, added) to generate a list where each key appears once, along with its single, reduced value.
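The map, sort and reduce sequence just described can be imitated in a few lines of single-process Python. This is only a sketch of the idea, with no distribution involved (the function names are hypothetical), but the sorting step plays the role described above: it makes equal keys adjacent so that the reducer can add up each run.

```python
from itertools import groupby
from operator import itemgetter


def map_phase(documents):
    """Emit a (word, 1) pair for every word in every document."""
    for text in documents:
        for word in text.lower().split():
            yield word, 1


def reduce_phase(sorted_pairs):
    """Collapse each run of identical keys into a single (word, total) pair."""
    for word, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield word, sum(count for _, count in group)


def word_count(documents):
    # Map, then sort so that equal keys are adjacent, then reduce.
    return list(reduce_phase(sorted(map_phase(documents))))
```

For instance, word_count(["the cat sat", "the cat ran"]) returns [("cat", 2), ("ran", 1), ("sat", 1), ("the", 2)].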

What Hadoop does is distribute the Map and Reduce processes among multiple computers in a way that is essentially invisible (or, as they say, transparent) to the person executing the MapReduce program. Hence the same MapReduce program can be executed either on a standalone "cluster" of a single machine (as will be the case in our exercise) or on a genuine cluster of appropriately configured machines. However, if the size of the data is large, or "internet-scale", a single-machine cluster will either take a very long time or may simply crash.

My good friend Rajiv Pratap, who runs an excellent data analytics company, Abzooba, has a brilliant analogy for Hadoop. Let us assume that a field is covered with thousands of red and green apples and I am asked to determine how many of them are red and how many are green. I might slowly and painstakingly go through the whole field myself, but better still, I can hire an army of street urchins who can barely count up to 20 ("low cost commodity machines"). I ask each urchin to pick up an armload of apples, count the red and green ones, and report back to me with two numbers, say, ([red, 3], [green, 8]). These are my [key, value] pairs, two pairs reported by each urchin. Then I can simply add the values corresponding to the red key to get the total number of red apples in the field. The same goes for the green apples, and I have my answer. In the process, if one of the urchins throws down his apples and runs away ["a commodity machine has a failure"], the process is not impacted because some other urchin picks up the apples and reports the data. Hadoop is like a manager who hires urchins, tells them what to do, shows them where the data is located, sorts, shuffles and collates their results, and replaces them if one or two run away. The urchins simply have to know how to count [ Map ] and add [ Reduce ].
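The apple analogy maps onto code quite directly. In this minimal sketch, each armload is counted independently (the map side), the manager adds up the per-colour reports (the reduce side), and an armload whose urchin "runs away" is simply handed out again. The fails hook and the retry limit are hypothetical devices for simulating failures; Hadoop detects and re-runs failed tasks on its own.

```python
from collections import Counter


def count_armload(apples):
    """One 'urchin' (a map task): count the colours in its own armload."""
    return Counter(apples)


def count_field(armloads, fails=lambda index, attempt: False, max_attempts=3):
    """The 'manager': hand out armloads, retry failed ones, add up the reports.

    `fails(index, attempt)` is a hypothetical hook that simulates an urchin
    running away with armload `index` on a given attempt.
    """
    total = Counter()
    for index, armload in enumerate(armloads):
        for attempt in range(max_attempts):
            if not fails(index, attempt):
                total.update(count_armload(armload))  # reduce: add values per key
                break
        else:
            raise RuntimeError(f"armload {index} failed {max_attempts} times")
    return total
```

With a single failure on the first attempt at one armload, the totals are unchanged, because that armload is simply counted on the next attempt.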

Anyway, enough of theory ... let us

1. Install Hadoop 2.2, in a single machine cluster mode on a machine running Ubuntu

I have a Dell laptop with a dual-boot feature that allows me to use either Windows 7 or Ubuntu 14.04. Running Hadoop on Windows 7 is possible, but then you will be seen to be an amateur. As a budding Hadoop professional, it is imperative that you get access to a Linux box. If you are using Ubuntu, you can use the directions given in this blog post to setup newest Hadoop 2.x (2.2.0) on Ubuntu.

I followed these instructions except for the following deviations
  • the HADOOP_INSTALL directory was /usr/local/hadoop220 and not /usr/local/hadoop to distinguish it from an earlier Hadoop 1.8 install that I had abandoned
  • the HDFS file system was located at /user/hduser/HDFS and not at /mydata/hdfs. Hence the directories for the namenode and datanode were located at 
    • /user/hduser/HDFS/namenode310514  [ not /mydata/hdfs/namenode ]
    • /user/hduser/HDFS/datanode310514 [ not /mydata/hdfs/datanode ]
  • four XML files, located in $HADOOP_INSTALL/etc/hadoop, have to be updated. The exact syntax is not very clearly given in the instructions, but you can see it here

<!-- These files located in $HADOOP_INSTALL/etc/hadoop -->

<!-- Contents of file : core-site.xml -->


<!-- Contents of file : yarn-site.xml -->


<!-- Contents of file : mapred-site.xml -->


<!-- Contents of file : hdfs-site.xml -->




  • After the namenode is formatted with the command hdfs namenode -format, the two commands that start Hadoop, namely start-dfs.sh and start-yarn.sh, throw a lot of fearful errors or warnings, none of which actually cause any harm. To avoid this ugliness on the console screen, I have created two simple shell scripts to start and stop Hadoop.

#!/bin/bash
# -- written by prithwis mukerjee
# -- file : $HADOOP_INSTALL/sbin/pm-start-hadoop.sh
# --

echo 'Using Java at        ' $JAVA_HOME
echo 'Starting Hadoop from ' $HADOOP_INSTALL
# --
# formatting HDFS if necessary, uncomment following
#hdfs namenode -format
# --
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
mr-jobhistory-daemon.sh start historyserver
cd ~

#!/bin/bash
# -- file : $HADOOP_INSTALL/sbin/pm-stop-hadoop.sh
# --

hadoop-daemon.sh stop namenode
hadoop-daemon.sh stop datanode
yarn-daemon.sh stop resourcemanager
yarn-daemon.sh stop nodemanager
mr-jobhistory-daemon.sh stop historyserver


  • Create these two shell scripts, make them executable (chmod +x), and place them in $HADOOP_INSTALL/sbin, where the usual Hadoop daemon scripts are stored. You can then simply execute pm-start-hadoop.sh or pm-stop-hadoop.sh from any directory (assuming $HADOOP_INSTALL/sbin is on your PATH) to start and stop the Hadoop services. After starting Hadoop, make sure that all 6 processes reported by the jps command (the five daemons plus jps itself) are operational.
  • If there is a problem with the datanode not starting, delete the HDFS directories, recreate them, redefine them in hdfs-site.xml, reformat the namenode and restart Hadoop.
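Checking the jps listing by eye is easy enough, but the check can also be scripted. This sketch assumes the usual jps output format of one "<pid> <MainClassName>" pair per line and expects the five daemons started by pm-start-hadoop.sh plus Jps itself; the function and constant names are hypothetical.

```python
EXPECTED_PROCESSES = {"NameNode", "DataNode", "ResourceManager",
                      "NodeManager", "JobHistoryServer", "Jps"}


def missing_daemons(jps_output):
    """Return, sorted, the expected process names absent from `jps` output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        if len(parts) >= 2:          # '<pid> <MainClassName>'
            running.add(parts[1])
    return sorted(EXPECTED_PROCESSES - running)
```

On the laptop it could be called as missing_daemons(subprocess.run(["jps"], capture_output=True, text=True).stdout) after importing subprocess; an empty list means all six processes are up.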
The installation exercise ends with the execution of a trivial Map Reduce job that calculates the value of Pi. If this executes without errors (though the value of Pi it calculates is very approximate), then Hadoop has been installed correctly.
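Why is the value of Pi so approximate? The bundled pi example (the QuasiMonteCarlo class in the Hadoop examples jar, as far as I know) scatters points over a unit square using a low-discrepancy Halton sequence, counts in each map task how many fall inside the inscribed circle, and adds the counts in a single reduce. Pi is then estimated as 4 * inside / total, so with only a handful of samples per map the estimate is coarse. The following Python sketch mirrors that idea; it is not Hadoop's code, and the function names are hypothetical.

```python
import math


def halton(index, base):
    """The index-th element of the van der Corput sequence in the given base."""
    result, fraction = 0.0, 1.0
    while index > 0:
        fraction /= base
        result += fraction * (index % base)
        index //= base
    return result


def map_task(offset, samples):
    """Count points of the unit square inside / outside the circle of radius 0.5."""
    inside = 0
    for i in range(offset + 1, offset + samples + 1):
        x, y = halton(i, 2) - 0.5, halton(i, 3) - 0.5
        if x * x + y * y <= 0.25:
            inside += 1
    return inside, samples - inside


def estimate_pi(num_maps, samples_per_map):
    results = [map_task(m * samples_per_map, samples_per_map) for m in range(num_maps)]
    inside = sum(r[0] for r in results)     # the single reducer just adds the counts
    return 4.0 * inside / (num_maps * samples_per_map)


if __name__ == "__main__":
    for maps, samples in [(2, 5), (10, 1000)]:
        estimate = estimate_pi(maps, samples)
        print(maps, samples, estimate, abs(estimate - math.pi))
```

Running it shows the error shrinking sharply as the number of samples per map grows, which is why a quick installation check with few samples gives only a rough value.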

Next we 

2. Compile and run the standard WordCount example in Java

In this exercise we will scan through a number of text files and create a list of words, along with the number of times each word is found across all three files. For our exercise we will download the text files of these copyright-free books: (a) Outline of Science, Arthur Thomson, (b) Notebooks of Leonardo Da Vinci and (c) Ulysses by James Joyce.

For each book, the .txt version is downloaded and kept in the directory /home/hduser/BookText, giving three *.txt files.
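Before running it on Hadoop, it may help to see what WordCount computes. The sketch below simulates the three stages of a MapReduce job in plain Python on one machine: the map stage emits (word, 1) pairs, the shuffle stage groups the pairs by word, and the reduce stage sums each group. It is only an illustration; the tokenization (lower-case letters and apostrophes) is an assumption, so its counts will not match the Java WordMapper exactly, and read_lines and word_count are hypothetical names.

```python
import re
from collections import defaultdict
from pathlib import Path


def map_phase(line):
    """Mapper: emit (word, 1) for every word on a line of text."""
    for word in re.findall(r"[a-z']+", line.lower()):
        yield word, 1


def shuffle_phase(pairs):
    """Shuffle/sort: group all values emitted for the same key, sorted by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(key, values):
    """Reducer: add up the counts for one word."""
    return key, sum(values)


def word_count(lines):
    pairs = (pair for line in lines for pair in map_phase(line))
    return dict(reduce_phase(k, vs) for k, vs in shuffle_phase(pairs))


def read_lines(directory):
    """Yield every line of every *.txt file in a local directory."""
    for path in sorted(Path(directory).glob("*.txt")):
        with path.open(encoding="utf-8", errors="ignore") as handle:
            yield from handle


if __name__ == "__main__":
    counts = word_count(read_lines("/home/hduser/BookText"))
    print(len(counts), "distinct words")
```

On Hadoop the same three stages run in parallel across many machines, and the shuffle is done by the framework rather than by a dictionary in memory.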

Many sample Java programs for WordCount with Hadoop are available, but you need to find one that works with the Hadoop 2.2.0 APIs. One such program is available on the CodeFusion blog. Copy, paste and save these three Java programs as WordMapper.java, SumReducer.java and WordCount.java on your machine. These three files must be compiled and packaged into a jar file for Hadoop to execute. The commands to do so are given in this shell script, but they can also be executed from the command line in the directory where the Java programs are stored:

# -- Hadoop 2.2.0 jars needed to compile MapReduce programs
HCP=$HADOOP_INSTALL/share/hadoop/common/hadoop-common-2.2.0.jar
HCP=$HCP:$HADOOP_INSTALL/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar
HCP=$HCP:$HADOOP_INSTALL/share/hadoop/common/lib/commons-cli-1.2.jar
# ....
rm -rf WC-classes
mkdir WC-classes
# ....
javac -classpath $HCP -d WC-classes WordMapper.java
# .....
javac -classpath $HCP -d WC-classes SumReducer.java
# .....
javac -classpath $HCP:WC-classes -d WC-classes WordCount.java && jar -cvf WordCount.jar -C WC-classes/ .

Note that a directory called WC-classes is created, and that the last command ends with a '.', which tells jar to package everything inside WC-classes.

Once this has executed, a jar file called WordCount.jar is created, and it is used when we invoke Hadoop through the following set of commands, stored in a shell script:

hdfs dfs -rm -r /user/hduser/WC-input
hdfs dfs -rm -r /user/hduser/WC-output
hdfs dfs -ls /user/hduser
hdfs dfs -mkdir /user/hduser/WC-input
hdfs dfs -copyFromLocal /home/hduser/BookText/* /user/hduser/WC-input
hdfs dfs -ls /user/hduser
hdfs dfs -ls /user/hduser/WC-input 
hadoop jar WordCount.jar WordCount /user/hduser/WC-input /user/hduser/WC-output

  • The first two commands delete two directories (if they exist) from the HDFS file system.
  • The fourth command creates a directory called WC-input in /user/hduser. Note that in the HDFS file system, which is separate from the normal Ubuntu file system, user hduser has his files stored in /user/hduser and NOT in the usual /home/hduser.
  • The fifth command copies the three *.txt files from the Ubuntu file system (/home/hduser/BookText/) to the HDFS file system (/user/hduser/WC-input).
  • The last command runs Hadoop with the WordCount.jar created in the previous step; it reads the data from the HDFS directory WC-input and sends the output to the HDFS directory WC-output, which MUST NOT EXIST before the job is started.
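The same sequence can also be driven from a small script that makes the "output directory must not exist" rule explicit. The sketch below is a hypothetical wrapper, not part of the original exercise: wordcount_commands() builds the steps as argument lists (skipping the informational -ls calls), and run() executes them, tolerating failures only in the two clean-up deletions, which fail harmlessly on a first run. By default it only prints the commands; it executes them only when given --run.

```python
import glob
import subprocess
import sys

HDFS_HOME = "/user/hduser"
CLEANUP_STEPS = 2  # the two "-rm -r" steps may fail harmlessly on a first run


def wordcount_commands(local_files, hdfs_home=HDFS_HOME):
    """Return the steps of the shell script as argument lists (nothing runs here)."""
    in_dir = f"{hdfs_home}/WC-input"
    out_dir = f"{hdfs_home}/WC-output"
    return [
        ["hdfs", "dfs", "-rm", "-r", in_dir],
        ["hdfs", "dfs", "-rm", "-r", out_dir],  # WC-output must not exist
        ["hdfs", "dfs", "-mkdir", in_dir],
        ["hdfs", "dfs", "-copyFromLocal", *local_files, in_dir],
        ["hadoop", "jar", "WordCount.jar", "WordCount", in_dir, out_dir],
    ]


def run(commands):
    """Execute each step, stopping at the first failure after the clean-up."""
    for step, cmd in enumerate(commands):
        print("+", " ".join(cmd))
        if subprocess.run(cmd).returncode != 0 and step >= CLEANUP_STEPS:
            raise SystemExit("step failed: " + " ".join(cmd))


if __name__ == "__main__":
    # expand the local glob here, since no shell is involved
    books = sorted(glob.glob("/home/hduser/BookText/*.txt"))
    commands = wordcount_commands(books)
    if "--run" in sys.argv:
        run(commands)
    else:  # dry run: only show what would be executed
        for cmd in commands:
            print(" ".join(cmd))
```

Passing argument lists to subprocess.run avoids shell quoting issues, which is why the *.txt glob is expanded in Python before the copy step.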

This job will take quite a few minutes to run, and the machine will slow down and may even freeze for a while. Lots of messages will be dumped on the console. Don't panic unless you see a lot of error messages; if you have done everything correctly, this should not happen. You can follow the progress of the job by pointing your browser to http://localhost:8088, where you will see an image like this

that shows a history of the current and past jobs that have run since Hadoop was last started.
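The same job list can also be read programmatically. The sketch below assumes the YARN ResourceManager REST endpoint /ws/v1/cluster/apps on the same port 8088, which returns JSON of the form {"apps": {"app": [...]}} (or {"apps": null} when no job has run yet); it pulls out each application's id, name, state, final status and progress. The helper names are hypothetical, and the field names should be checked against the REST API documentation for your Hadoop version.

```python
import json
from urllib.request import urlopen

RM_APPS_URL = "http://localhost:8088/ws/v1/cluster/apps"


def summarize_apps(payload):
    """Turn the apps JSON into (id, name, state, finalStatus, progress) rows."""
    apps = (payload.get("apps") or {}).get("app") or []
    return [(a.get("id"), a.get("name"), a.get("state"),
             a.get("finalStatus"), a.get("progress")) for a in apps]


def fetch_apps(url=RM_APPS_URL):
    """Download and decode the ResourceManager's list of applications."""
    with urlopen(url, timeout=10) as response:
        return json.load(response)


if __name__ == "__main__":
    try:
        rows = summarize_apps(fetch_apps())
    except OSError as err:  # e.g. the ResourceManager is not running
        print("could not reach the ResourceManager:", err)
    else:
        for row in rows:
            print(*row, sep="\t")
```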

To see the files in the HDFS file system, point your browser to http://localhost:50075 and you will see a screen like this

Clicking on the link "Browse the filesystem" will lead you to

If you go inside the WC-output directory, you will see the results generated by the WordCount program, and you can download them into your normal Ubuntu file system.
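Once the result file is on the local disk (for example with hdfs dfs -get, the reducer output is typically named part-r-00000), a few lines of Python can list the most frequent words. The sketch below assumes the default text output format, one "word<TAB>count" pair per line; parse_wordcount_output is a hypothetical helper name.

```python
import sys
from collections import Counter


def parse_wordcount_output(lines):
    """Parse reducer output lines of the form "word<TAB>count" into a Counter."""
    counts = Counter()
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        word, _, count = line.rpartition("\t")
        counts[word] += int(count)
    return counts


def main(argv):
    if len(argv) < 2:
        print("usage: top_words.py part-r-00000")
        return
    with open(argv[1], encoding="utf-8") as handle:
        for word, count in parse_wordcount_output(handle).most_common(20):
            print(f"{count:8d}  {word}")


if __name__ == "__main__":
    main(sys.argv)
```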

WordCount is to Hadoop what HelloWorld is to C, and now that we have copied, compiled, jarred and executed it, let us move on and try to

3. Compile and run another, non WordCount, program in Java

According to Jesper Andersen, a Hadoop expert and also a noted wit, "45% of all Hadoop tutorials count words, 25% count sentences, 20% are about paragraphs, 10% are log parsers. The remainder are helpful."

One such helpful tutorial is a YouTube video by Helen Zeng, in which she explains the problem, demonstrates the solution and gives explicit instructions on how to execute the program. The code for the demo, MarketRatings.java, is available on GitHub, and the data is available in farm-market-data.csv, which you can download in text format.

Once the code and the data are on your machine, they can be compiled and executed using the following commands:

# -- compile and create the jar file
HCP=$HADOOP_INSTALL/share/hadoop/common/hadoop-common-2.2.0.jar
HCP=$HCP:$HADOOP_INSTALL/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar
HCP=$HCP:$HADOOP_INSTALL/share/hadoop/common/lib/commons-cli-1.2.jar
# javac needs the output directory to exist
mkdir -p classes
javac -classpath $HCP -d classes MarketRatings.java && jar -cvf MarketRatings.jar -C classes/ .

# -- copy the data into HDFS and run the job (MR-output must not exist)
hdfs dfs -mkdir /user/hduser/MR-input
hdfs dfs -copyFromLocal FarmersMarket.txt /user/hduser/MR-input
hdfs dfs -ls /user/hduser/MR-input
hadoop jar MarketRatings.jar MarketRatings /user/hduser/MR-input /user/hduser/MR-output

Do note that the csv file, saved in text format as "FarmersMarket.txt", was copied from the local directory to the HDFS directory and then used by the MarketRatings program in MarketRatings.jar, invoked through Hadoop.

Once again, the progress of the job can be viewed in a browser at http://localhost:8088 and the contents of the HDFS filesystem can be examined at http://localhost:50075

Since Hadoop is a Java framework, Java is the most natural language in which to write MapReduce programs for Hadoop. Unfortunately, Java is neither the easiest language to master (especially if you are from the pre-Java age, or are perplexed by things like Eclipse and Maven that all Java programmers seem to be at ease with) nor the simplest language in which to articulate your requirements. Fortunately, the Gods of Hadoop have realized the predicament of the Java-challenged community and have provided a way, the "streaming API", that allows programs written in any language (Python, shell, R, C, C++ or whatever) to use the MapReduce technique and run on Hadoop. With streaming, the mapper and the reducer are ordinary programs that read lines from standard input and write tab-separated key/value lines to standard output.

So now we shall see how to

4. Use Hadoop streaming and run a Python application

The process is explained in Michael Noll's tutorial "Writing an Hadoop MapReduce Program in Python", but once again the example is a WordCount application.

Assuming that you have Python installed on your machine, you can copy and save the two programs, mapper.py and reducer.py, given in the instructions, and make them executable with chmod +x.
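To see what a streaming mapper and reducer have to do, here is a minimal sketch in the same spirit; it is not Noll's code. The mapper turns each input line into "word<TAB>1" lines; Hadoop then sorts these lines by key, so the reducer sees all lines for one word next to each other and can sum them with itertools.groupby. Both stages are kept in one hypothetical file, wc.py, selected by an argument, so that local_pipeline() can reproduce the usual local test "cat file | mapper | sort | reducer" without Hadoop. To use it with the streaming command, split the two functions into mapper.py and reducer.py, each starting with a #!/usr/bin/env python3 line.

```python
#!/usr/bin/env python3
import sys
from itertools import groupby


def mapper(lines):
    """Streaming mapper: emit "word<TAB>1" for every whitespace-separated word."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reducer(lines):
    """Streaming reducer: input is sorted by key, so equal words are adjacent."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines if "\t" in line)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


def local_pipeline(lines):
    """Mimic `cat | mapper | sort | reducer` on one machine."""
    return list(reducer(sorted(mapper(lines))))


def main(argv):
    stages = {"map": mapper, "reduce": reducer}
    if len(argv) != 2 or argv[1] not in stages:
        print("usage: wc.py map|reduce  (reads stdin, writes stdout)")
        return
    for out in stages[argv[1]](sys.stdin):
        print(out)


if __name__ == "__main__":
    main(sys.argv)
```

A quick check on the local file system before submitting to Hadoop would be: cat /home/hduser/BookText/*.txt | python3 wc.py map | sort | python3 wc.py reduce.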

We will use the book text data that we already loaded into HDFS in the Java WordCount exercise. It is available at /user/hduser/WC-input, so we do not need to load it again.

We simply make sure that the output directory does not exist and then call the streaming API as follows:

hdfs dfs -ls
hdfs dfs -rm -r /user/hduser/WCpy-output
hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
    -mapper /home/hduser/python/wc-python/mapper.py \
    -reducer /home/hduser/python/wc-python/reducer.py \
    -input /user/hduser/WC-input/* \
    -output /user/hduser/WCpy-output
hdfs dfs -ls


As with native Java programs, the progress of the job can be viewed in a browser at http://localhost:8088 and the contents of the HDFS filesystem can be examined at http://localhost:50075

The jury is still out on whether the alleged, or perceived, high performance of a native Java MapReduce program outweighs the simplicity and ease of use of a MapReduce program written in a scripting language. This is similar to the debate on whether programs written in Assembler are "better" than similar programs written in convenient languages like C or VB. Rapid progress in hardware technology has made such debates irrelevant, and developers have preferred convenience over performance because better hardware makes up for the lost performance.

The same debate between native Java and the streaming API in Hadoop / MR is yet to conclude. However, the Amazon Elastic MapReduce (EMR) service has opened a new vista in using Hadoop. Using the Amazon EMR console (a GUI front end!), one can configure the number and type of machines in the Hadoop cluster, upload the map and reduce programs in any language generally supported on the EMR machines, specify the input and output directories, invoke the Hadoop streaming program and wait for the results.

This eliminates the need for the entire complex process of installing and configuring Hadoop on multiple machines and reduces the MapReduce exercise (almost) to the status of an idiot-proof, online banking operation !

But the real challenge that still remains is how to convert a standard predictive statistics task, such as regression, classification or clustering, into the simplistic format of a map-reduce "counter" and then execute it on Hadoop. This is demonstrated in the last exercise, which

5. Solves an actual Predictive Analytics problem with Map Reduce and Hadoop.

Regression is the first step in predictive analytics, and the video "MapReduce and R: A short example on Regression and Forecasting" is an excellent introduction both to regression and to how it can be done, first in Excel, then with R and finally with a Java program that uses MapReduce and Hadoop.

The concept is simple. There is a set of 5 y values (the dependent variable), one for each of 5 days (each day being an x value). We need to create a regression equation that shows how y is related to x, and then predict the value of y on day 10. From the perspective of regression, this is a trivial problem that can be solved by hand, let alone in Excel or R. The challenge arises when this has to be done a million times, say once for each SKU in a retail store or for each customer of a telecom company. The challenge becomes even bigger when these million regressions need to be done every day to predict the value of y 5 days ahead on the basis of the trailing 5 days of data! That is when you need to call in MapReduce and Hadoop.
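For a single series, the regression is just ordinary least squares. With n points, the slope is b = (nΣxy − ΣxΣy) / (nΣx² − (Σx)²) and the intercept is a = (Σy − bΣx) / n, so the prediction for day 10 is a + 10b. The sketch below computes this directly; the sales figures in the usage block are made up for illustration only, and fit_line and predict are hypothetical names.

```python
def fit_line(xs, ys):
    """Ordinary least squares fit of y = a + b*x; returns (a, b)."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need at least two (x, y) pairs")
    sx, sy = sum(xs), sum(ys)
    sxx = sum(x * x for x in xs)
    sxy = sum(x * y for x, y in zip(xs, ys))
    denom = n * sxx - sx * sx
    if denom == 0:
        raise ValueError("all x values are identical")
    b = (n * sxy - sx * sy) / denom   # slope
    a = (sy - b * sx) / n             # intercept
    return a, b


def predict(a, b, x):
    return a + b * x


if __name__ == "__main__":
    days = [1, 2, 3, 4, 5]
    sales = [10, 12, 15, 15, 18]   # made-up figures for illustration only
    a, b = fit_line(days, sales)
    print(f"y = {a:.2f} + {b:.2f} x; day 10 -> {predict(a, b, 10):.2f}")
```

For these numbers Σx = 15, Σy = 70, Σx² = 55 and Σxy = 229, which gives b = 1.9, a = 8.3 and a day-10 prediction of 27.3, well above the simple 5-day average of 14.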

The exercise consists of these four Java programs and sample data that you can download. You can then follow the same set of commands given in section 2 above to compile and run the programs, as shown below. The same application, ported to R and tailored to a retail scenario, is available in another blog post, Forecasting Retail Sales - Linear Regression with R and Hadoop.

# -- Hadoop 2.2.0 jars needed to compile MapReduce programs
HCP=$HADOOP_INSTALL/share/hadoop/common/hadoop-common-2.2.0.jar
HCP=$HCP:$HADOOP_INSTALL/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar
HCP=$HCP:$HADOOP_INSTALL/share/hadoop/common/lib/commons-cli-1.2.jar
# ....
rm -rf REG-classes
mkdir REG-classes
# ....
javac -classpath $HCP -d REG-classes Participant.java
# .....
javac -classpath $HCP:REG-classes -d REG-classes ProjectionMapper.java
# .....
javac -classpath $HCP:REG-classes -d REG-classes ProjectionReducer.java
# .....
javac -classpath $HCP:REG-classes -d REG-classes Projection.java && jar -cvf Projection.jar -C REG-classes/ .
# ............
hdfs dfs -rm -r /user/hduser/REG-input
hdfs dfs -rm -r /user/hduser/REG-output
hdfs dfs -ls /user/hduser
hdfs dfs -mkdir /user/hduser/REG-input
hdfs dfs -copyFromLocal /home/hduser/JavaHadoop/RegScore.txt /user/hduser/REG-input
hdfs dfs -ls /user/hduser
hdfs dfs -ls /user/hduser/REG-input
hadoop jar Projection.jar Projection /user/hduser/REG-input /user/hduser/REG-output
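How does a regression fit into the map-reduce pattern? One way, sketched below in plain Python, is to make the SKU (or customer) the key: the mapper emits (sku, (day, y)) pairs, the shuffle brings all points for one SKU to the same reducer call, and the reducer fits one least-squares line and projects day 10. This is an illustration under stated assumptions, not the logic of ProjectionMapper and ProjectionReducer: the input format "sku,y1,y2,y3,y4,y5" is hypothetical (the layout of RegScore.txt is not described here), and the sample rows in the usage block are made up.

```python
from collections import defaultdict

PROJECTION_DAY = 10  # the day whose value we want to project


def mapper(line):
    """Hypothetical input format: "sku,y1,y2,y3,y4,y5" (5 daily values).

    Emits (sku, (day, y)) so that all points for one SKU meet in one reducer.
    """
    sku, *values = line.strip().split(",")
    for day, value in enumerate(values, start=1):
        yield sku, (day, float(value))


def fit_line(points):
    """Least-squares intercept and slope for a list of (x, y) points."""
    n = len(points)
    sx = sum(x for x, _ in points)
    sy = sum(y for _, y in points)
    sxx = sum(x * x for x, _ in points)
    sxy = sum(x * y for x, y in points)
    b = (n * sxy - sx * sy) / (n * sxx - sx * sx)
    return (sy - b * sx) / n, b


def reducer(sku, points):
    """Fit one regression per SKU and project its value on PROJECTION_DAY."""
    a, b = fit_line(sorted(points))
    return sku, round(a + b * PROJECTION_DAY, 2)


def run_job(lines):
    groups = defaultdict(list)  # stands in for Hadoop's shuffle and sort
    for line in lines:
        if line.strip():
            for key, value in mapper(line):
                groups[key].append(value)
    return dict(reducer(k, v) for k, v in sorted(groups.items()))


if __name__ == "__main__":
    sample = ["SKU001,10,12,15,15,18", "SKU002,20,18,17,15,12"]  # made-up rows
    print(run_job(sample))
```

Because each reducer call touches only one SKU's five points, the work splits naturally across machines, which is what makes a million small regressions a day tractable on Hadoop.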

These five exercises are not meant to replace the full-time course on MapReduce and Hadoop that is taught at the Praxis Business School, Calcutta, but they will serve as a simple introduction to this very important technology.

If you find any errors, please leave a comment. Otherwise, if you like this post, please share it with your friends.

May 21, 2014

Tawang @ Arunachal Pradesh

Arunachal Pradesh is said to be one of the most beautiful parts of India, but because it is so very remote I had never had the opportunity to visit it. This summer, when Calcutta was sweating at 43 degrees Celsius, we decided to pay a visit. Arunachal is a vast state with an extreme diversity of cultures. The western end, around Tawang, shares the Buddhist culture of Tibet / Bhutan, while the eastern end is primarily tribal and animistic. Unless you have a lot of time and energy, it is better to stick to one end. We chose the Tawang end, the scene of the great 1962 debacle, where an under-prepared and ill-equipped Indian army was badly bruised by China despite many instances of personal bravery by our Jawans.

We started off from Guwahati / Tezpur and this was the route that we took

We followed the Kameng River

and spent the night at Prashanti Cottage, Bhalukpong, a beautiful resort on the river bank

where from the watch tower attached to the restaurant, you just might get a glimpse of elephants and other wild animals in the reserve forest located just across the river

The next day we entered the land of the clouds

and every tree was dripping wet

but we saw a very beautiful waterfall on the roadside.

and then we entered Bomdila district

but we did not stay in the town of Bomdila and instead pressed on to the lovely little town of Dirang

where we stayed in this lovely hotel

and explored the river that flowed nearby

It was a bright and sunny day and the views were breathtaking

even though dusk was fast approaching

next to the hotel was a local Buddhist shrine

with an excellent mural on the wall

and Buddhist religious texts on stone tablets

After spending a pleasant night in Dirang, we finally entered the high country and passed through some of the most rugged roads on our journey

that went on ...

and on ... till we spotted Sela Pass

and finally arrived at Sela Pass, the gateway to the fabled district of Tawang

the Altimeter app on my Android phone showed the height to be a little lower, but not much

Sela Pass and Sela Lake are a spectacular place. Until March the area is totally covered in snow, though the engineers of the Border Roads Organisation keep the road open throughout the year, and even in May the place is very, very cloudy. However, we were lucky: for us the day was bright and sunny and we could enjoy the spectacular scenery


some of the peaks around were still covered in snow

but this was the peak that caught our attention

because it is associated with the legend of Sela, a local tribal girl who, with her sister Nura (more about her later), loved the heroic and valiant Jaswant Singh Rawat of the Garhwal Regiment, who fought the Chinese with amazing bravery and was martyred here. On learning of Jaswant's death, Sela killed herself by jumping off this very cliff.

Just after Sela Pass is the place where Jaswant made his last heroic stand, where the battle of Nuranang was fought to the last man and the last bullet. The Indian Army has renamed this nondescript bunker Jaswant Garh, or Fort Jaswant

and this is how the place looks now

but you can still see the lonely bunker where Jaswant and his comrades fought so bravely

here is a statue of Jaswant Singh Rawat

(btw, the message beneath says NO DONATION PLEASE)

Jaswant's memory is lovingly preserved by the Indian Army along with his personal effects, his kit and his shoes that are still polished every day

we read about the exploits of Jaswant and his comrades

and paid our respects to the heroes of the war by signing our names in the visitors' book kept at the memorial

and then we moved on past yaks grazing

on wide meadows covered in yellow flowers

and reached a gigantic waterfall named after Nura (the second of Jaswant's lovers and Sela's sister)

this is a HUGE waterfall whose size can only be appreciated if you walk closer

and the mist that is thrown up is stupendous

and really drenches you if you venture too close

the rains had not yet started but even then the volume of water was very high

as you can see from these photographs

here is another one.

The song Tanhai Tanhai in the movie Koyla, featuring Madhuri Dixit and Shahrukh Khan, was shot at the Nuranang Falls, and this video clip will give you a better view of this thundering mass of water.

Leaving Sela (Pass), Jaswant (Garh) and Nura (Falls) with their memories, we pressed on to our destination, Tawang, the home of the second largest Buddhist monastery in the world (after the Potala Palace at Lhasa), and this is our first view of it

Inside the monastery

there is a huge statue of the Buddha, among other historical and religious artifacts

and a giant prayer wheel

Here is a general view of Tawang, with our hotel visible as the big building at the extreme left

But the really exciting part of the trip was the journey to the PTSoy lake, which took us along some of the roughest roads, totally obscured by clouds. It was so dark that, let alone take pictures, the driver could barely see 10 feet in front of the car, and we repeatedly ran into army vehicles and earth-moving equipment. But just as we reached the lake, the clouds lifted, the fog cleared and a weak sun showed its face

and we could take a few pictures

in a great hurry

before the clouds rolled in again

and blanked out the lake.

On the way back to Tawang town we passed some 1962 era bunkers now abandoned by the Indian Army

and I stepped into one of them to get a feel of what it would have been like to see the Chinese Army coming down this road

we ended our stay in Tawang with a visit to the Martyrs Memorial

dedicated to the

and to the most heroic of them all, Joginder Singh, who was posthumously awarded India's highest military decoration, the Param Vir Chakra,

for fighting until his last bullet and then charging at the enemy with only the bayonet fixed to his rifle and the Sikh war cry "Jo Boley So Nihal, Sat Sri Akal" on his lips.

Here is a map of the places that we visited on this trip
