
Thursday, November 22, 2012

Hadoop distributed file system (HDFS)

HDFS is a distributed file system, loosely based on the Google File System, and one of the core components of Hadoop. A Hadoop cluster consists of a master name node and data nodes. The name node stores metadata about the blocks, while the data nodes store the actual data. In the world of distributed computing, hardware and network failures can easily occur, so it is essential that the system has built-in fault tolerance. HDFS provides fault tolerance by replicating data across multiple nodes, so even if one of the nodes goes down, the data is not lost. As data is loaded into HDFS, it is split into blocks, typically 64MB or 128MB, and these blocks are replicated across multiple nodes (the default is three replicas). From a user perspective, HDFS abstracts the networking away from the end user: even though the data is spread across multiple nodes, users do not have to worry about networking or other low-level infrastructure code. The data is local to the nodes, and the nodes themselves do not talk to each other much. Essentially, it is a shared-nothing architecture.
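The arithmetic behind splitting and replication can be sketched as a toy calculation (this is an illustration in plain Java, not the HDFS API; the 64MB block size and three-way replication are the defaults mentioned above):

```java
// Toy sketch of HDFS block math, assuming the defaults described in the text:
// 64 MB blocks, replication factor 3. Not the real HDFS client API.
public class HdfsBlockMath {
    static final long BLOCK_SIZE = 64L * 1024 * 1024; // 64 MB
    static final int REPLICATION = 3;

    // Number of blocks a file of the given size is split into.
    static long numBlocks(long fileSizeBytes) {
        return (fileSizeBytes + BLOCK_SIZE - 1) / BLOCK_SIZE; // ceiling division
    }

    // Raw cluster storage consumed, counting all replicas.
    static long rawStorage(long fileSizeBytes) {
        return fileSizeBytes * REPLICATION;
    }

    public static void main(String[] args) {
        long oneGb = 1024L * 1024 * 1024;
        System.out.println(numBlocks(oneGb));  // a 1 GB file splits into 16 blocks
        System.out.println(rawStorage(oneGb)); // and occupies 3 GB across the cluster
    }
}
```

So a single large file costs three times its size in raw disk, but survives the loss of any one data node.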

Key characteristics of HDFS:

Write once.
Does not support random writes.
Optimized for streaming reads.
Works well for a small number of large files.

Here are some examples of HDFS commands:

hadoop fs -ls
hadoop fs -mkdir hdfsTest
hadoop fs -copyFromLocal SherlockHolmes.txt hdfsTest/SH.txt
hadoop fs -cat hdfsTest/SH.txt | tail -n 2
hadoop fs -rm hdfsTest/SH.txt
hadoop fs -rmr hdfsTest

Monday, November 12, 2012

Pig Latin

Pig is a data flow system for Hadoop. It provides a way to execute map reduce jobs without writing Java code. Pig comes with a scripting language called Pig Latin, and it provides an abstraction on top of map reduce: the Pig interpreter, which runs outside of the hadoop system, decomposes a Pig Latin script into map reduce jobs and submits them to the hadoop cluster. Pig Latin is suitable for people familiar with scripting languages. While both hive and pig provide an abstraction on top of hadoop map reduce, one big difference between the two is that pig does not have any concept of metadata. Pig loads datasets that can be modified using Pig Latin scripts. The scripts can express complex processing such as joins, group by, order by, etc., using simple constructs. Users can also create custom user defined functions and use them in Pig Latin scripts. There is an open source pig function library called Piggy Bank that can be downloaded freely.
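As a sketch of what such a script looks like, here is a Pig Latin example that loads a dataset, groups it and aggregates it (the file name and fields are hypothetical, chosen only for illustration):

```pig
-- Hypothetical log file with two fields; LOAD, GROUP, FOREACH and ORDER
-- are standard Pig Latin operators.
logs    = LOAD 'hdfsTest/access_log' AS (user:chararray, bytes:int);
by_user = GROUP logs BY user;
totals  = FOREACH by_user GENERATE group AS user, SUM(logs.bytes) AS total_bytes;
sorted  = ORDER totals BY total_bytes DESC;
DUMP sorted;
```

Each statement defines a relation; nothing runs until DUMP (or STORE) forces Pig to compile the whole flow into map reduce jobs.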

Map Reduce


What is map reduce?
Map reduce is a framework for distributed processing. The framework takes care of abstracting the distribution away from developers. Developers write standard Java programs, and the framework takes care of distributing them across the network, monitoring the processing, detecting failures and re-running tasks when failures occur. The framework can also run the same task on several nodes if one of the nodes is running very slowly and use the results from the faster node. This is known as speculative execution.
How does it work?
There are two phases to map reduce. In the map phase, the input key-value pairs are processed and a new set of key-value pairs is generated. The number of mappers depends on the volume of input data. At the end of the map phase, the intermediate values are combined into a list per key and shuffled to the reducers on a per-key basis. The keys arrive at each reducer in sorted order. The number of reducers can be configured by the developer. To achieve this distribution of processing, the framework employs two daemons: the job tracker and the task tracker. The job tracker runs on the name node, and it distributes individual map tasks and reduce tasks to nodes in the cluster. The task tracker manages these individual tasks and reports the results to the job tracker. When the task tracker runs a task, it accesses the local slice of data on its node.
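The shuffle-and-sort step between the two phases can be simulated in a few lines of plain Java (a toy in-memory sketch, not the Hadoop framework): the mappers' intermediate (key, value) pairs are grouped per key, and the reducer sees the keys in sorted order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy simulation of map reduce's shuffle-and-sort step: group the mappers'
// intermediate (key, value) pairs per key, in sorted key order.
public class ShuffleSketch {
    static SortedMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> mapOutput) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return grouped; // TreeMap keeps keys sorted, as the reducers see them
    }

    public static void main(String[] args) {
        // Intermediate output of a word-count map phase over "the cat the".
        List<Map.Entry<String, Integer>> out = List.of(
                Map.entry("the", 1), Map.entry("cat", 1), Map.entry("the", 1));
        System.out.println(shuffle(out)); // {cat=[1], the=[1, 1]}
    }
}
```

Each reducer then receives one key and its list of values, e.g. ("the", [1, 1]), which is exactly the reduce signature in the word count program below.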
How to write map reduce code?
At a minimum, there are three parts to a map reduce program: the client-side driver, the mapper and the reducer. The mapper and the reducer code run on the hadoop cluster, whereas the driver runs on the client system. The driver defines the job details such as the location of the data, the input format of the data, the mapper class, the reducer class, the output format, etc. The default input format is TextInputFormat, in which each line in a file is the value and its byte offset is the key. Both mappers and reducers deal with key-value pairs. The key is an object of type WritableComparable and the value is an object of type Writable, to support serialization across hadoop nodes. The Hadoop API comes with several WritableComparable classes such as IntWritable, LongWritable, Text, etc., that can be used directly in a map reduce program.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HelloWordCount {

    // Mapper: emits (word, 1) for every token in the input line.
    public static class HelloWordCountMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String sentence = value.toString();
            StringTokenizer tokens = new StringTokenizer(sentence);
            while (tokens.hasMoreTokens()) {
                String tkn = tokens.nextToken();
                context.write(new Text(tkn), one);
            }
        }
    }

    // Reducer: sums the counts emitted for each word.
    public static class HelloWordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count = value.get() + count;
            }
            result.set(count);
            context.write(key, result);
        }
    }

    // Client-side driver: configures and submits the job.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Arguments: <in> <out>");
            System.exit(2);
        }
        // Run locally against the local file system (remove these for a real cluster).
        conf.set("mapred.job.tracker", "local");
        conf.set("fs.default.name", "file:///");
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(HelloWordCount.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(HelloWordCountMapper.class);
        job.setCombinerClass(HelloWordCountReducer.class);
        job.setReducerClass(HelloWordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Sqoop

Sqoop is used to transfer data between relational databases and HDFS. A sqoop job is decomposed into a map reduce job. The sqoop map reduce job makes a JDBC connection to the database and retrieves data for import. To avoid problems such as a denial of service on the RDBMS, sqoop caps the number of mappers; the default is 4. Using sqoop, we can import/export a single table or multiple tables from/to the database.
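A typical import invocation looks like the following sketch (the host, database, user and paths are hypothetical; --connect, --table, --num-mappers and --target-dir are real sqoop flags):

```shell
# Hypothetical MySQL connection details, for illustration only.
sqoop import \
  --connect jdbc:mysql://dbhost/sales \
  --username reporter -P \
  --table customers \
  --num-mappers 4 \
  --target-dir /user/hadoop/customers
```

Each of the 4 mappers pulls a slice of the table over JDBC and writes its slice as a file under the target directory in HDFS.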

Oozie

There are three pieces to a map reduce program - the driver class, the mapper and the reducer. For a simple task, a single map reduce program is good. But for complex problems, a single map reduce job may not be sufficient. You may have to create a chain of map reduce jobs, and moreover, these may not be linear (that is, a job's input may not come simply from the output of the previous job). There could be a complex dependency chain, which could also involve running several jobs in parallel. Moreover, you may also want to run hive, pig and other streaming jobs. Oozie accomplishes these goals. Oozie is the workflow engine for managing hadoop jobs. It helps in managing complex workflows and frequency-based execution. When you use Oozie, you don't use a driver class; instead, you provide the details of the job to Oozie and Oozie executes it for you. Oozie workflows are written in XML. The three main components of Oozie are: the .properties file, the workflow XML and the libraries. The .properties file specifies the parameters needed for running the workflow. The workflow XML file specifies the execution control flow and actions. The libraries folder contains the libraries used in the workflow. Oozie uses the hadoop distributed cache to distribute these libraries to processing nodes.
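A minimal workflow XML might look like the sketch below: one map reduce action, with ok/error transitions forming the control flow. The action name, class names and the ${jobTracker}/${nameNode} parameters (which would come from the .properties file) are illustrative:

```xml
<workflow-app name="wordcount-wf" xmlns="uri:oozie:workflow:0.2">
  <start to="wordcount"/>
  <action name="wordcount">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapred.mapper.class</name>
          <value>HelloWordCountMapper</value>
        </property>
      </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Word count job failed</message>
  </kill>
  <end name="end"/>
</workflow-app>
```

Chaining jobs is then just a matter of pointing one action's ok transition at the next action instead of at end.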

Monday, November 5, 2012

Hive


Hive is the data warehousing system for Hadoop. It provides a way to issue SQL-like statements to Hadoop, and a mechanism to project structure onto data. In a core hadoop setup, the user has to write map reduce programs in Java, or use scripting languages and stream them using hadoop streaming. So, for users who are familiar with SQL, hive provides an abstraction on top of map reduce. The Hive query language, known as HQL, is very similar to SQL, though it does not support correlated subqueries and some other SQL constructs. Hive decomposes an HQL statement into map reduce jobs and executes them on the hadoop cluster. The hadoop cluster itself is agnostic to Hive; for hadoop, hive is like any other map reduce program using data from hdfs. Hive creates a directory named /user/hive/warehouse to store tables, so a table is just another folder with some data files in hdfs. Partitions are stored in subfolders under a table folder. Since hdfs is a write-once file system, there is no support for modifying or deleting individual records through hive.
Hive surfaces relational concepts such as tables and columns onto the data in hadoop. This data model is stored in the hive metastore. The metastore is a repository for hive metadata and is kept in a relational database; by default, hive uses derby. The data in a hive table can be partitioned, for example by date. In addition, hive also allows data to be further divided into buckets. Bucketing helps with sampling and predictive algorithms. Hive does not check the data against the structure of the table at load time.
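Partitioning and bucketing are both declared at table creation time. A sketch in HQL (the table and column names are hypothetical; PARTITIONED BY and CLUSTERED BY ... INTO n BUCKETS are the actual clauses):

```sql
-- Hypothetical page-views table, partitioned by date and bucketed by user.
CREATE TABLE page_views (user_id INT, url STRING)
PARTITIONED BY (view_date STRING)
CLUSTERED BY (user_id) INTO 32 BUCKETS;
```

Each partition becomes a subfolder under the table's warehouse folder, and each bucket a file within it, so a query filtered on view_date only has to scan the matching subfolder.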
 
Hive supports the following data types:
TINYINT - 1 byte integer
SMALLINT - 2 byte integer
INT - 4 byte integer
BIGINT - 8 byte integer
BOOLEAN - TRUE/FALSE
FLOAT - single precision
DOUBLE - Double precision
STRING

Flume

Flume is a useful tool for importing real-time data into hadoop. For example, most software systems and engines have a running log file recording their activity. Using flume, the log records can be gathered and inserted into hdfs as they are produced by the source systems.
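A flume agent is wired together in a properties file as a source, a channel and a sink. A sketch, assuming the Flume NG properties-file format (the agent name, log path and HDFS path are hypothetical):

```properties
# Hypothetical agent "agent1": tail a log file and land the records in HDFS.
agent1.sources = tail1
agent1.channels = mem1
agent1.sinks = hdfs1

agent1.sources.tail1.type = exec
agent1.sources.tail1.command = tail -F /var/log/app.log
agent1.sources.tail1.channels = mem1

agent1.channels.mem1.type = memory

agent1.sinks.hdfs1.type = hdfs
agent1.sinks.hdfs1.hdfs.path = /flume/logs
agent1.sinks.hdfs1.channel = mem1
```

The source pushes events into the channel and the sink drains them into hdfs, so the log-producing system never has to know about hadoop.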

HBase


HBase provides columnar storage. Essentially, it provides a key-value view of the data. The data itself can be organized into column families. Within each column family, the data is stored as byte arrays in zero or more columns. So, one can have rows with millions of columns and still have a low footprint, especially if the data is sparse: nulls are free. HBase provides random, real-time read/write access, and it has a very light schema.
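The light schema is visible from the HBase shell: only the table and its column families are declared up front, and individual columns appear as they are written. A sketch (the table, family and values are hypothetical; create, put and get are standard shell commands):

```
create 'users', 'info'
put 'users', 'row1', 'info:name', 'Sherlock'
put 'users', 'row1', 'info:zip', '21218'
get 'users', 'row1'
```

A row that never writes info:zip simply stores nothing for it, which is why sparse data stays cheap.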

Sunday, November 4, 2012

Pig Installation


Prerequisites: Hadoop should already be installed on your system.
Also, check that hadoop is in the path and the HADOOP_HOME environment variable is set.
Download Pig from the apache website.
Extract it to the /opt folder.
Add /opt/pig-0.9.2/bin to the path.
Once pig is added to the path, you can start the pig grunt command line by typing 'pig'.

Hive Installation


Prerequisites: Hadoop should already be installed on your system. Also, check that hadoop is in the path and the HADOOP_HOME environment variable is set.
Hive requires a metastore for storing metadata. This installation of hive will use the embedded derby database.
Download hive from the apache website. I have downloaded hive-0.8.0-bin.tar.gz.
Extract it to the /opt folder.
Create a tmp folder in hdfs and set permissions (if one does not already exist):
hadoop fs -mkdir  /tmp
hadoop fs -chmod g+w   /tmp
Create a /user/hive/warehouse folder and set permission as follows:
hadoop fs -mkdir /user/hive/warehouse
hadoop fs -chmod g+w /user/hive/warehouse
To start hive, go to /opt/hive-0.8.0-bin/bin directory and run ./hive. This will show a hive prompt.
Here are some simple hive operations to verify that your instance of Hive is working correctly:
create table customer(id INT, name STRING, address STRING);
-> This creates a table named customer with id, name and address fields.
show tables;
-> This lists all the tables that were created.
alter table customer add columns(zipcode INT);
-> This adds a zipcode column to the customer table.
describe customer;
-> This shows details about the customer table, including fields and their types.

Hadoop Installation


Install Java :

Download JDK 1.6 and extract it to /opt/java/jdk1.6.0_29 by running ./jdk-6u29-linux-i586.bin. To run java from any location, let's create a symbolic link to it in /usr/bin as follows:
cd /usr/bin
sudo ln -s /opt/java/jdk1.6.0_29/bin/java java
This will ensure that Java can be run from any folder. Next, we want to set JAVA_HOME as an environment variable. Login as root, gedit /etc/environment, and append: export JAVA_HOME="/opt/java/jdk1.6.0_29"
Logout and log back in for the values to take effect.

Setup SSH :

Check if sshd is running by typing pgrep sshd. If you get empty output, sshd is not running.
Login as root and start sshd by typing: /etc/init.d/sshd start
To automatically start sshd on the next reboot, type: chkconfig sshd on
To generate an ssh key, type: ssh-keygen -t rsa -P ""
Next, you need to copy the generated rsa public key to ssh's authorized keys file:
cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
(Note: If you have data nodes on different machines, then you will need to copy (using scp) id_rsa.pub into each data node's authorized_keys file.)
Next, type ssh localhost and say 'Yes' to 'Are you sure you want to continue connecting?'
The call "ssh localhost" should not prompt for a password. If it does, you may need to change the permissions of the ~/.ssh/authorized_keys file to 600. Otherwise, you will be prompted for a password when starting the hadoop scripts.

Hadoop Installation :

Download hadoop-0.20.2 from apache site and unzip it to /opt/hadoop-0.20.2. Edit the following configuration files in /opt/hadoop-0.20.2/conf folder :
core-site.xml :
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
hdfs-site.xml (I added dfs.name.dir and dfs.data.dir because the default tmp location's contents get erased after a reboot):
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
 <property> 
   <name>dfs.name.dir</name> 
   <value>/home/user/hadoopFiles/dfs/name</value> 
 </property> 
 <property> 
   <name>dfs.data.dir</name> 
   <value>/home/user/hadoopFiles/dfs/data</value> 
 </property>
mapred-site.xml :
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
Next, we are going to format the name node from /opt/hadoop-0.20.2 by running:
bin/hadoop namenode -format
To start hadoop, go to /opt/hadoop-0.20.2 and run bin/start-all.sh
To check if the hadoop cluster is running, you can go to a browser and check the namenode at http://localhost:50070/ and the job tracker at http://localhost:50030/

To shutdown the hadoop cluster, run: bin/stop-all.sh