Wednesday, March 19, 2014

Step-by-step guide to set up Multi-node Hadoop Cluster on Ubuntu Virtual Machines

If you've landed on this page, I know your feelings. Wanna know how it feels when it's done? Ride on, it's like Roller Coaster...

You have successfully configured a single-node cluster in 7 easy steps. Good! But you are yet to taste the real essence of Hadoop. Recall that the primary purpose of Hadoop is to distribute a very lengthy task across more than one machine. This is exactly what we are going to do; the only difference is that we will be doing it on virtual machines.

Step 1: Networking

We have several things to do first with the existing VM, beginning with disabling IPv6. This is recommended because Hadoop does not currently support IPv6, according to the official wiki. To do so, you will have to modify a file named /etc/sysctl.conf:
- Launch your Virtual Machine from Virtualbox
- Open your terminal and run:
$ sudo nano /etc/sysctl.conf

- Add the following lines at the end of the file:
# Disable ipv6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
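If you don't want to wait for the reboot in the next step to pick up these settings, you can also reload the file right away (rebooting afterwards is still the surest way to apply everything):
$ sudo sysctl -p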



- Now you'll have to reboot the machine. Sorry :(
- Run the following command to check if IPv6 is really disabled:
$ cat /proc/sys/net/ipv6/conf/all/disable_ipv6

You should get 1 as output, meaning that IPv6 is disabled.



Next, we change the network settings of the Virtual Machine.
- Go to the Virtual Machine's menu > Devices > Network Settings...
- Change "Attached to" from NAT to Bridged Adapter and choose your network adapter from the drop-down below it. Mine is as shown in the image.



Now we change our Ubuntu machine's IP addressing from dynamic to static, because we will be using fixed IPs (a command-line equivalent is sketched after this list):
- On your Ubuntu desktop, open the Network Connections window from the top-right menu by clicking Edit Connections
- From the same menu, also open Connection Information; it is handy for entering the correct settings
- Edit the only connection (if you have multiple, you may want to remove the ones not in use)
- Go to IPv4 Settings and change the Method to Manual
- Add a new address based on the Connection Information, but change the last number to something like 10, 50, or 100
- The Gateway and Netmask should match those shown in Connection Information
- Save the settings and close; the network should refresh itself
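This guide uses the NetworkManager GUI; as a rough command-line equivalent on systems configured through /etc/network/interfaces instead (for example a server install), the static entry would look something like the sketch below. The interface name eth0, the gateway and the DNS server are assumptions here, so copy whatever your Connection Information window shows:

auto eth0
iface eth0 inet static
    address 192.168.0.50
    netmask 255.255.255.0
    gateway 192.168.0.1
    dns-nameservers 192.168.0.1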

We are not done yet. We now need to define what our master and slave machines will be called on the network (i.e., define aliases for them):
- On the terminal, run:
$ sudo nano /etc/hosts

- Replace the first line, 127.0.0.1 localhost, with your machine's IP address and call it master. The first line should now read:
192.168.0.50 master
- Comment out all the lines below it, which are related to IPv6
- Add another line for the slave:
192.168.0.51 slave
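With these edits in place, /etc/hosts on both machines should look roughly like this (the addresses are the ones used throughout this guide; substitute your own):

192.168.0.50 master
192.168.0.51 slave
# remaining (IPv6-related) lines commented out, e.g.:
# ::1 ip6-localhost ip6-loopback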



Recall that in our single-node setup, we configured some XML files and defined the machine's name as localhost. Well! We'll have to change localhost to master now (the resulting properties are sketched after these steps):

- Edit the core-site.xml
$ sudo nano /usr/local/hadoop/conf/core-site.xml

- Change localhost to master



- Save and exit
- Now edit the mapred-site.xml in similar way
$ sudo nano /usr/local/hadoop/conf/mapred-site.xml

- Change localhost to master
- Save and exit
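For reference, the two properties you just edited should now point at master instead of localhost, roughly as below. The port numbers shown here (54310 and 54311) are only placeholders; keep whichever ports you used in your single-node setup.

In core-site.xml:
<property>
  <name>fs.default.name</name>
  <value>hdfs://master:54310</value>
</property>

In mapred-site.xml:
<property>
  <name>mapred.job.tracker</name>
  <value>master:54311</value>
</property>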

Finally, shut down the machine. We have to clone it.

Step 2: Cloning Virtual Machine

Thanks to VirtualBox's built-in cloning feature, we won't have to create another machine from scratch:
- Right click on Hadoop VM > Clone
- Name the new machine Hadoop Clone
- Next, choose Full clone
- Next, choose Current machine state and Clone



You may want to check this if you encounter problems.

After completion, launch both Virtual Machines. But please watch your memory resources: if you have a total of 4 GB of RAM, reduce the memory of both VMs to 1 GB each in their settings.

Log into both machines and, following the steps above, change the IP address of the newly created clone from 192.168.0.50 to 192.168.0.51 (or whatever you decided to use). Wait until the network refreshes.

This clone is your original machine's slave. Now we need to define hostnames for both machines by editing the file /etc/hostname. Run:

$ sudo nano /etc/hostname
- Change localhost to master on your master machine and to slave on your slave machine


- Save and Exit
- Restart both machines to apply the changes (a quick sanity check follows below)
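Once both machines are back up, a quick check is to confirm that each machine reports its new name and can reach the other one by the alias we defined in /etc/hosts:

On master
$ hostname
$ ping -c 3 slave
On slave
$ hostname
$ ping -c 3 master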

Step 3: Connectivity

We need to ensure that SSH access is available from master to slave. On the master, we already have SSH configured, but we will need to regenerate the SSH key on the slave machine because it is a clone of the master, and every machine should have its own unique key.

- On both the master and slave machines, clear the existing files from the /home/hadoop/.ssh directory:
$ sudo rm /home/hadoop/.ssh/*

- Now generate a new SSH key pair. For details, check step 3 here:
$ ssh-keygen -t rsa -P ""


- Copy the public key to the authorized_keys list:
$ cat /home/hadoop/.ssh/id_rsa.pub >> /home/hadoop/.ssh/authorized_keys

- Try SSH to master from the master itself, and to slave from the slave itself:
On master
$ ssh master
On slave
$ ssh slave

If you are asked for a password because of an "Agent admitted failure to sign using the key" error on either machine, run the following command to fix it:
$ ssh-add

We will also have to copy the master's public key to the slave machine. On the master, run:
$ ssh-copy-id -i /home/hadoop/.ssh/id_rsa.pub hadoop@slave

You will be asked for the slave's password to continue (this should be the same as the master's if you haven't changed it explicitly).



We should now be able to connect to the slave from the master. Why not try it out? Run:
$ ssh slave

You should see a successful connection if nothing went wrong.



- Exit the SSH connection to slave:
$ exit


Step 4: Configuring Hadoop Resources

We will run all services on the master node and only the TaskTracker and DataNode on the slave machine. On large-scale deployments you might run the Secondary NameNode elsewhere, and you might not run a TaskTracker and DataNode on the master at all (when the master is dedicated to coordinating tasks rather than executing them). We now need to define which machine will be the actual master for Hadoop and which ones will be the slaves (or workers).

- In your master machine, edit the /usr/local/hadoop/conf/masters file:
$ sudo nano /usr/local/hadoop/conf/masters

- Write master



- Save and exit
- On the same machine, edit the /usr/local/hadoop/conf/slaves file:
$ sudo nano /usr/local/hadoop/conf/slaves

- Write master on the first line and slave on the second (both files are sketched after this list)



- Save and exit
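If you open the two files now, they should contain exactly this:

/usr/local/hadoop/conf/masters:
master

/usr/local/hadoop/conf/slaves:
master
slave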

Since we have 2 machines now, we can set the replication factor to 2 in hdfs-site.xml:
$ sudo nano /usr/local/hadoop/conf/hdfs-site.xml

- Change the value of dfs.replication to 2 (see the sketch after this list)



- Save and exit
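The property in hdfs-site.xml should end up looking like this (assuming your single-node setup used the standard dfs.replication property with a value of 1; only the value changes):

<property>
  <name>dfs.replication</name>
  <value>2</value>
</property>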


Step 5: Invoking Daemons

Format the HDFS file system first:
$ hadoop namenode -format



- Start the NameNode, Secondary NameNode and JobTracker on the master, and the DataNode and TaskTracker on both master and slave:
$ start-all.sh



- Run the jps utility on the master to check the running services:
$ jps



- Similarly, run the jps utility on the slave to check the running services:
$ jps



Important! If you see the DataNode service not running on either of the machines, retry this whole step after rebooting the machine that is missing it. If the problem still persists, you'll have to look deeper. Open the DataNode log on that machine:
$ sudo nano /usr/local/hadoop/logs/hadoop-hadoop-datanode-slave.log (or master.log)

If you find a java.io.IOException saying "Incompatible namespaceIDs", you'll have to manually correct the DataNode's namespace ID so that it matches the NameNode's. Look up the NameNode's namespace ID and copy it into the VERSION file in your dfs data directory (the directory is defined as a property in hdfs-site.xml; in our case it is /home/hadoop/tmp/dfs/data/current/VERSION), as sketched below.
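A minimal sketch of the fix, assuming hadoop.tmp.dir is /home/hadoop/tmp as in this guide (the exact paths on your machine may differ): first read the namespaceID line from the NameNode's VERSION file on the master, then edit the DataNode's VERSION file on the affected machine so that its namespaceID carries the same value.

On master
$ cat /home/hadoop/tmp/dfs/name/current/VERSION
On the affected machine
$ sudo nano /home/hadoop/tmp/dfs/data/current/VERSION

Change the namespaceID in the second file to match the first, save, and start the services again.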




- Now stop and start all services on the master
$ stop-all.sh
$ start-all.sh

Step 6: Executing MapReduce Application

We will try the same word count example. On the master:
- Copy some more text files into /home/hadoop/Documents/books (assuming you have already run this example previously)
- Copy the files into HDFS. Run:
$ hadoop dfs -copyFromLocal /home/hadoop/Documents/books /HDFS/books

Important! If you see a SafeModeException that keeps the files from being copied, turn off Hadoop's safe mode by running:
$ hadoop dfsadmin -safemode leave

You may need this because Hadoop initially runs in safe mode and does not allow certain operations. It then transitions into normal mode on its own, but sometimes this transition does not happen. (You can also simply check the current state, as shown below.)
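If you prefer to check whether Hadoop is still in safe mode before forcing it off, dfsadmin can report the current state:

$ hadoop dfsadmin -safemode get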


- Run the wordcount example
$ hadoop jar $HADOOP_HOME/hadoop-examples-1.2.1.jar wordcount /HDFS/books /HDFS/books/output

- Collect the output:
$ hadoop dfs -getmerge /HDFS/books/output $HOME/Documents/books/
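You can also peek at the result directly in HDFS before merging; the part file name below is the usual default for this example, so adjust it if yours differs:

$ hadoop dfs -ls /HDFS/books/output
$ hadoop dfs -cat /HDFS/books/output/part-r-00000 | head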

Step 7: Tracking the Progress

You can track the status of the NameNode in your browser at master:50070.
The MapReduce tasks running on the different nodes can be seen at master:50030.

At this point, if you don't feel like praying, or giving a hug to the person sitting next to you, or dancing on the table, then you have no emotion.. :-|

End Note:
If you have arrived here seamlessly, or with some glitches that you were able to fix by yourself, then you can certainly deploy a multi-node cluster with more than 2 nodes in the real world with little effort. I would still recommend extending the existing setup to 3 nodes, perhaps with the Virtual Machines spread across different physical machines. You may also try running your own applications via Eclipse on larger data sets to enjoy the real Hadoop experience.
Happy Hadooping...

* All the experiments were performed on my Lenovo Ideapad U510 laptop with moderate specifications.
* If you find some images not being displayed (I have absolutely no idea why this happens), I have put them here for download.

Wednesday, March 12, 2014

Executing MapReduce Applications on Hadoop (Single-node Cluster) - Part 3

In our previous experiment, we built and ran the source code of the Word Count MapReduce application in Eclipse. This time, we are going to write our own piece of code.

Remember the Permutations and Combinations you studied in college? We will write a fresh approach to compute the combinations of all strings in a file. You'll only have to make a very few changes to the existing code.

First, you need to create a text file with some words separated by spaces:
- Create a new text file named words.txt in /home/hadoop/Documents/combinations/
- Enter some text like:
Astronomy
star sun earth moon milkyway asteroid pulsar nebula mars venus jupiter neptune saturn blackhole galaxy cygnus cosmic comet solar eclipse globular panorama apollo discovery seti aurora dwarf halebopp plasmasphere supernova cluster europa juno keplar helios indego genamede neutrinos callisto messier nashville sagittarius corona circinus hydra whirlpool rosette tucanae
Android
cupcake donut eclair froyo gingerbread honeycomb icecreamsandwich jellybean kitkat lemonade
Ubuntu
warty warthog haory hedgehog breezy badger dapper drake edgy eft feisty fawn gutsy gibbon herdy heron intrepid ibex jaunty jackalope karmic koala lucid lynx meverick meerkat natty narwhal oneiric ocelot raring ringtail

- Save and exit

Now open your Eclipse. In your existing Combinatorics Project, add a new Class:
- Right click on src > New > Class
- Name it Combinations
- Replace the code with the following:

import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.SortedSet;
import java.util.StringTokenizer;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * MapReduce Application to discover combinations of all words in a text file
 *
 * @author hadoop
 */
public class Combinations {

   // Inherit Mapper class; our input and output will all be text
   public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
      // Reusable Text object holding the term emitted as both key and value
      private Text term = new Text();

      // Mapping function to map all words to key-value pairs
      protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
         // Convert the line of text to a string
         String str = value.toString();
         // Tokenizer to break text into words, separated by space
         StringTokenizer tokenizer = new StringTokenizer(str, " ");
         // Write a key-value pair for each term to the context (job)
         while (tokenizer.hasMoreTokens()) {
            term.set(tokenizer.nextToken());
            // If the length of the term exceeds 20 characters, skip it, because
            // processing strings of greater lengths may not be feasible
            if (term.getLength() > 20)
               continue;
            // Initially, pass the term as both key and value
            context.write(term, term);
         }
      }
   }

   // Inherit Reducer class; this will be executed on multiple nodes and
   // write its output to a text file
   public static class MyReduce extends Reducer<Text, Text, Text, Text> {
      protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
         // Sorted collection to store the set of combinations
         SortedSet<String> list = new TreeSet<String>();
         // Iterate over the values grouped under this key
         for (Text text : values) {
            // Find all combinations of the string
            String str = text.toString();
            int length = str.length();
            // The number of non-empty combinations is 2^n - 1
            int total = ((Double) Math.pow(2, length)).intValue() - 1;
            for (int i = 0; i < total; i++) {
               String tmp = "";
               char[] charArray = new StringBuilder(
                     Integer.toBinaryString(i)).reverse().toString()
                     .toCharArray();
               for (int j = 0; j < charArray.length; j++) {
                  if (charArray[j] == '1') {
                     tmp += str.charAt(j);
                  }
               }
               list.add(tmp);
            }
            list.add(str);
         }
         // Write the term as key and its combinations as value to the output
         context.write(key, new Text(Arrays.toString(list.toArray())));
      }
   }

   public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Initiate a MapReduce job named "combinations"
      Job job = new Job(conf, "combinations");
      // Our keys in the output will be text
      job.setOutputKeyClass(Text.class);
      // Our values in the output will be text
      job.setOutputValueClass(Text.class);
      job.setMapperClass(MyMap.class);
      job.setReducerClass(MyReduce.class);
      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      // Fetch the input path from the arguments
      FileInputFormat.addInputPath(job, new Path(args[0]));
      // Create a time-stamped directory inside the input directory for the output
      FileOutputFormat.setOutputPath(job,
            new Path(args[0] + String.valueOf(new Date().getTime())));
      job.waitForCompletion(true);
   }
}

Execute the code:
- Right click > Run As > Run Configurations
- Right click on Java Application (from left pane) > New
- On top, name it Combinations
- In Arguments tab, write /home/hadoop/Documents/combinations/
- Apply and Run

The MapReduce application should run, and you should be able to collect the output from a time-stamped folder inside the /home/hadoop/Documents/combinations/ directory.

This algorithm is explained in detail here. You should also read the comments; the rest of the code is self-explanatory.

With this experiment, we wrap up our experiments on single-node Hadoop clusters. I encourage you to do some more experiments and make yourself comfortable before you try out something Big.

Monday, March 10, 2014

A faster, Non-recursive Algorithm to compute all Combinations of a String

Imagine you're me: you studied Permutations and Combinations in your high school maths, and after so many years, you find that to solve a certain problem, you need to apply Combinations.

You do your revision and confidently open your favourite IDE to code; after typing some usual lines, you pause and think, then you do the next best thing - search the Internet. You find a nice recursive solution which does the job well, like the following:

import java.util.ArrayList;
import java.util.Date;

public class Combination {
   public ArrayList<ArrayList<String>> compute (ArrayList<String> restOfVals) {
      if (restOfVals.size () < 2) {
         ArrayList<ArrayList<String>> c = new ArrayList<ArrayList<String>> ();
         c.add (restOfVals);
         return c;
      }
      else {
         ArrayList<ArrayList<String>> newList = new ArrayList<ArrayList<String>> ();
         for (String o : restOfVals) {
            ArrayList<String> rest = new ArrayList<String> (restOfVals);
            rest.remove (o);
            newList.addAll (prependToEach (o, compute (rest)));
         }
         return newList;
      }
   }

   private ArrayList<ArrayList<String>> prependToEach (String v, ArrayList<ArrayList<String>> vals) {
      for (ArrayList<String> o : vals)
         o.add (0, v);
      return vals;
   }

   public static void main (String args[]) {
      ArrayList<String> i = new ArrayList<String> ();
      i.add ("a");
      i.add ("b");
      i.add ("c");
      long start = new Date ().getTime ();
      Combination c = new Combination ();
      c.compute (i);
      System.out.println ("Elapsed Time: " + (new Date ().getTime () - start));
   }
}

So, if the above does what we need, what's the problem we are addressing? Well! Try passing "acknowledgement" to this function and enjoy your cup of coffee, because there is no way your program will finish execution in a realistic time; in fact, it may even crash from running out of memory. The reason is that the number of combinations grows exponentially with the length of the string, so the running time grows exponentially too. The graph below illustrates this very well (input size is on the x-axis and time on the y-axis).


Image Ref: http://www.regentsprep.org

What's wrong with the current approach is recursion. As your program starts branching, the tree becomes gigantic and your memory requirement grows exponentially too. While you cannot reduce the time it takes to compute all combinations, you can certainly do some tinkering to reduce the memory consumption, thus reducing the additional overhead.

In order to mitigate this issue, we look for a non-recursive solution. Now, in my case, I couldn't really find any (you might be luckier). So here is what I did:

Recall the table you once wrote in college that maps all hexadecimal digits to their respective 4-digit binary values?
0 = 0000
1 = 0001
2 = 0010
. . .
. . .
F = 1111

What's so interesting about these binary numbers? Choose a length of 4, set all digits to 0 and keep adding 1, and you'll get ALL the combinations that can come out of a 4-digit string of binary digits. Now think of a string stored in a character array: if you print only the characters at the indices where the binary digit is 1, you can get any combination of that string. This is exactly what we do here. We count a binary number up to the maximum number of combinations and, bang! You get a non-recursive method to discover all possible combinations of a string. A small worked example follows, and then the code in Java:
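To make the idea concrete, here is the mapping for the 3-character string "abc", with the least significant (right-most) bit standing for the first character:

001 = a
010 = b
011 = ab
100 = c
101 = ac
110 = bc
111 = abc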

import java.util.Date;
import java.util.SortedSet;
import java.util.TreeSet;

public class Combinations {
   public static void main (String[] args) {
      long start = new Date ().getTime ();
      combination ("teststring");
   }

   public static String[] combination (String str) {
      SortedSet<String> list = new TreeSet<String> ();
      int length = str.length ();
      int total = ((Double) Math.pow (2, length)).intValue () - 1;
      for (int i = 0; i < total; i++) {
         String tmp = "";
         char[] charArray = new StringBuilder (Integer.toBinaryString (i)).reverse ().toString ().toCharArray ();
         for (int j = 0; j < charArray.length; j++)
            if (charArray[j] == '1')
               tmp += str.charAt (j);
         list.add (tmp);
      }
      list.add (str);
      return list.toArray (new String[] {});
   }
}

And here's the comparison of both the algorithms:

On the x-axis, we have the length of the string, ranging between 5 and 21, and on the y-axis, the time in milliseconds. The recursive algorithm refused to proceed beyond length 10, throwing an OutOfMemory error.

Note: to fit both curves on the same plot, I plotted the log of the time taken by both algorithms.

Another plus of this algorithm: recursive functions keep reducing the problem into smaller subproblems and don't start producing results until they hit the bottom (i.e. the input cannot be reduced any further), so you cannot interrupt them in the middle and ask for the values computed so far. Here, on the other hand, you can stop the program at any stage and fetch the combinations it has already computed.

From a purely algorithmic standpoint, the complexity of the former approach is exponential, i.e. O(k^n). Our solution still enumerates an exponential number of combinations, but the work done inside each loop iteration is only quadratic, O(n^2), in the length of the string. Please note that this does not mean we have reduced an intractable problem to a tractable one: the number of times the loop executes still grows exponentially; it is the execution time within the loop that we have reduced.

Realistically speaking, a common word in English will be under 20 characters. I mean, how often do you use Internationalization, really? But then this isn't only about English, right?!

Ending note: I'm new to theoretical computing and might have made mistakes here; please correct me if I have misinterpreted the results...

Saturday, March 8, 2014

Executing MapReduce Applications on Hadoop (Single-node Cluster) - Part 2

Previously, we saw how to execute the built-in Word Count example on Hadoop. In this part, we will build the same application in Eclipse from the Word Count source code and run it.

First, you need to install Eclipse on your Hadoop-ready Virtual Machine (assuming that the JDK was already installed when you set up Hadoop). This can be done from the Ubuntu Software Center, but my recommendation is to download it and extract it to your home directory. Any version of Eclipse should work; I have done these experiments on version 4.3 (Kepler).

After installation, launch Eclipse and the first thing to do is to make Oracle JDK your default Java Runtime:
- Go to Window > Preferences > Java > Installed JREs
- If the default JRE does not point to the Oracle JRE, edit it and set the directory to /usr/lib/jvm/java-7-oracle/
- Press OK to finish




Now we will create a Java Application Project:
- Go to New > Java Project



- Name the project Combinatorics, since we will be doing some counting problems in this project



- No need to change anything else. Press Finish
- A Java project named Combinatorics should appear in your Package Explorer window on the Left

We will need some external libraries in order to build Hadoop's code. Download these libraries:

I have put these libraries in a zipped file here as well.


After you have collected all the libraries:
- Right click on the project > New > Folder
- Name the folder lib and finish



- Copy all the jar files into the newly created folder (you can do so in Nautilus as well as in Eclipse)
- Right click on lib folder and click Refresh



- The jars you have added should now appear here
- Go to Project > Properties > Java Build Path > Add JARs > Combinatorics > lib and select all the jar files



- Go to Project and check Build Automatically

Next, we need to create a source file in the src folder. Right click on the src folder > New > Class. Name it WordCount and click Finish.



Replace the contents of the newly created class with the following:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
// The wildcard above does not cover the lib.input/lib.output subpackages,
// so the input/output format classes used below need explicit imports
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
   public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable (1);
      private Text word = new Text ();

      public void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         String line = value.toString ();
         StringTokenizer tokenizer = new StringTokenizer (line);
         while (tokenizer.hasMoreTokens ()) {
            word.set (tokenizer.nextToken ());
            context.write (word, one);
         }
      }
   }

   public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce (Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
         int sum = 0;
         for (IntWritable val : values) {
            sum += val.get ();
         }
         context.write (key, new IntWritable (sum));
      }
   }

   public static void main (String[] args) throws Exception {
      Configuration conf = new Configuration ();
      Job job = new Job (conf, "wordcount");
      job.setOutputKeyClass (Text.class);
      job.setOutputValueClass (IntWritable.class);
      job.setMapperClass (Map.class);
      job.setReducerClass (Reduce.class);
      job.setInputFormatClass (TextInputFormat.class);
      job.setOutputFormatClass (TextOutputFormat.class);
      FileInputFormat.addInputPath (job, new Path (args[0]));
      FileOutputFormat.setOutputPath (job, new Path (args[1]));
      job.waitForCompletion (true);
   }
}

This is the simplest form of a MapReduce program. We will take an in-depth look at the code later; first, we need to run it.

- Go to Run > Run to execute the program
- The program will, at first, fail with an ArrayIndexOutOfBoundsException, because it has not been given any arguments
- Go to Run > Run Configurations > Arguments and add the following arguments:
/home/hadoop/Documents/books/ /home/hadoop/Documents/books/output (assuming that you followed part 1 and the text files are still in this path)



- Before you press Run, are all the Hadoop services running? You have to start them. Remember! Here is the command:
$ start-all.sh
- Now press Run

Watch the same progress log in Eclipse's Console window that you previously saw on the terminal. Your output should be in the /home/hadoop/Documents/books/output directory (a quick check is shown below).
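To see the result, open the part file the job writes into the output directory; part-r-00000 is the usual name when a single reducer runs, so adjust if yours differs:

$ head /home/hadoop/Documents/books/output/part-r-00000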



Next, we will try to understand the code and maybe change it to try something else.

Please feel free to comment with corrections, criticism, help, etc.