Wednesday, March 27, 2013


A data flow language and execution environment for exploring very large datasets.Pig runs on HDFS and MapReduce clusters.
Pig has two execution types or modes: local mode and MapReduce mode.

Local Mode: To run the scripts in local mode, no Hadoop or HDFS installation is required. All files are installed and run from your local host and file system.
Mapreduce Mode: To run the scripts in mapreduce mode, you need access to a Hadoop cluster and HDFS installation.

Local mode
In local mode, Pig runs in a single JVM and accesses the local filesystem. This mode is suitable only for small datasets and when trying out Pig. The execution type is set using the -x or -exectype option. To run in local mode, set the option to local: % pig -x local

MapReduce mode
To run the scripts in MapReduce mode

Running the Pig Scripts in Local Mode
To run the Pig scripts in local mode, do the following:
1. Move to the pigtmp directory.
2. Execute the following command using script1-local.pig (or script2-local.pig).
$ pig -x local script1-local.pig
The output may contain a few Hadoop warnings which can be ignored:
3. A directory named script1-local-results.txt (or script2-local-results.txt) is created. This directory contains the results file, part-r-0000.

Running the Pig Scripts in Mapreduce Mode
To run the Pig scripts in mapreduce mode, do the following:
1. Move to the pigtmp directory.
2. Copy the excite.log.bz2 file from the pigtmp directory to the HDFS directory.
$ hadoop fs –copyFromLocal excite.log.bz2 .

Tuesday, March 26, 2013

What is Flume

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data soon after the data is produced.. It has a simple and flexible architecture based on streaming data flows.It uses a simple extensible data model that allows for online analytic applications.
The primary use case for Flume is as a logging system that gathers a set of log files on every machine in a cluster and aggregates them to a centralized persistent store such as the Hadoop Distributed File System (HDFS).

Flume can only write data through a flow at the rate that the final destinations can accept. Although Flume is able to buffer data inside a flow to smooth out high-volume bursts, the output rate needs to be equal on average to the input rate to avoid log jams. Thus, writing to a scalable storage tier is advisable. For example, HDFS has been shown to scale to thousands of machines and can handle many petabytes of data.
HDFS is the primary output destination, events can be sent to local files, or to monitoring and alerting applications such as Ganglia or communication channels such as IRC.HDFS, MapReduce, and Hive, Flume provides simple mechanisms for output file management and output format management. Data gathered by Flume can be processed easily with Hadoop and Hive.

Flume Architecture

Flume’s architecture is simple, robust, and flexible. The main abstraction in Flume is a stream-oriented data flow. A data flow describes the way a single stream of data is transferred and processed from its point of generation to its eventual destination.Data flows are composed of logical nodes that can transform or aggregate the events they receive.Controlling all this is the Flume Master.
Flume that collects log data from a set of application servers. The deployment consists of a number of logical nodes, arranged into three tiers. The first tier is the agent tier. Agent nodes are typically installed on the machines that generate the logs and are your data’s initial point of contact with Flume. They forward data to the next tier of collector nodes, which aggregate the separate data flows and forward them to the final storage tier.

For example, the agents could be machines listening for syslog data or monitoring the logs of a service such as a web server or the Hadoop JobTracker. The agents produce streams of data that are sent to the collectors; the collectors then aggregate the streams into larger streams which can be written efficiently to a storage tier such as HDFS.
Logical nodes are a very flexible abstraction. Every logical node has just two components - a source and a sink. The source tells a logical node where to collect data, and the sink tells it where to send the data. The only difference between two logical nodes is how the source and sink are configured.

Logical and Physical Nodes
One physical node per machine. Physical nodes act as containers for logical nodes, which are wired together to form data flows. Each physical node can play host to many logical nodes, and takes care of arbitrating the assignment of machine resources between them. So, although the agents and the collectors in the preceding example are logically separate processes, they could be running on the same physical node. Flume gives users the flexibility to specify where the computation and data transfer are done.
Flume is designed with four key goals
Reliability, the ability to continue delivering events in the face of failures without losing data.Flume can guarantee that all data received by an agent node will eventually make it to the collector at the end of its flow as long as the agent node keeps running. That is, data can be reliably delivered to its eventual destination.There are three supported reliability levels:
1)End-to-end :-reliability level guarantees that once Flume accepts an event.The first thing the agent does in this setting is write the event to disk in a 'write-ahead log' (WAL) so that, if the agent crashes and restarts, knowledge of the event is not lost.After the event has successfully made its way to the end of its flow, an acknowledgment is sent back to the originating agent so that it knows it no longer needs to store the event on disk.
2)Store on failure  :-reliability level causes nodes to only require an acknowledgement from the node one hop downstream. If the sending node detects a failure, it will store data on its local disk until the downstream node is repaired, or an alternate downstream destination can be selected. While this is effective, data can be lost if a compound or silent failure occurs.
3)Best effort :-reliability level sends data to the next hop with no attempts to confirm or retry delivery. If nodes fail, any data that they were in the process of transmitting or receiving can be lost. This is the weakest reliability level, but also the most lightweight.

Scalability :-Scalability is the ability to increase system performance linearly - or better - by adding more resources to the system.When load increases, it is simple to add more resources to the system in the form of more machines to handle the increased load.

Manageability :-Manageability is the ability to control data flows, monitor nodes, modify settings, and control outputs of a large system.The Flume Master is the point where global state such as the data flows can be managed. Via the Flume Master, users can monitor flows and reconfigure them on the fly. The Flume Master has the information required to automatically respond to system changes such as load imbalances, partial failures, or newly provisioned hardware.

Extensibility :-Extensibility is the ability to add new functionality to a system. For example, you can extend Flume by adding connectors to existing storage layers or data platforms. Some general sources include files from the file system, syslog and syslog-ng emulation, or the standard output of a process.

Flume Daemons

Flume Daemons are Flume Master and Flume Node

The Flume Master is the central management point and controls the data flows of the nodes. It is the single logical entity that holds global state data and controls the Flume node data flows and monitors Flume nodes.The nodes periodically contact the Master to transmit a heartbeat and to get their data flow configuration.Flume system is working, you must start a single Flume Master and some Flume nodes that interact with the Master.

The Master can be manually started by executing the following command:
$ flume master
After the Master is started, you can access it by pointing a web browser to http://localhost:35871/. This web page displays the status of all Flume nodes that have contacted the Master, and shows each node’s currently assigned configuration. When you start this up without Flume nodes running, the status and configuration tables will be empty.
The web page contains four tables the Node status table, the Node configuration table, the Physical/Logical Node mapping table, and a Command history table.

The Flume NodeTo start a Flume node, invoke the following command
$ flume node_nowatch

On the master, a node can be in one of several states

HELLO : A new node instance initially contacted the master.
IDLE : A node has completed its configuration or has no configuration.
CONFIGURING: A node has received a configuration and is activating the configuration.
ACTIVE: A node is actively pulling data from the source and pushing data into the sink.
LOST: A node has not contacted the master for an extended period of time (default is after 10x the expected heartbeat period 50s by default)
DECOMMISSIONED: A node has been purposely decommissioned from a master.
ERROR: A node has stopped in an error state.

Flume Commands

$ flume dump console

Reading from a text file, text
$ flume dump 'text("/etc/services")'

Tailing a file name
$ flume dump 'tail("testfile")'

create and write data to the file:
$ echo Hello world! >> testfile

delete the file
$ rm testfile

multiple files by file name
$ flume dump 'multitail("test1", "test2")'

send it data coming from the two different files
$ echo Hello world test1! >> test1
$ echo Hello world test2! >> test2

Newlines are excluded from the events.
tail("file", delim="\n\n+", delimMode="exclude")

appends the delimiter to the previous event
tail("file", delim="</a>", delimMode="prev")

dump in a log file that starts with four digits (like a year from a date stamp)
tail("file", delim="\\n\\d\\d\\d\\d", delimMode="next")

start a traditional syslog-like UDP server listening on port 5140
$ flume dump 'syslogUdp(5140)'
$ flume dump 'syslogTcp(5140)'

Flume Event
console ,text("filename") ,tail("filename"),multitail("file1"[, "file2"[, …]]),asciisynth(msg_count,msg_size),syslogUdp(port),syslogTcp(port)

Monday, March 25, 2013


Sqoop is a tool for efficiently moving data between relational databases and HDFS.
Sqoop provides a method to import or export data from tables in a relational database into HDFS and viseversa.Populate database tables from files in HDFS.It is an open source tool written at Cloudera.

Sqoop (SQL-to-Hadoop) is a straightforward command-line tool with the following capabilities:
  • Imports individual tables or entire databases to files in HDFS
  • Generates Java classes to allow you to interact with your imported data
  • Provides the ability to import from SQL databases straight into your Hive data warehouse
After setting up an import job in Sqoop, you can get started working with SQL database-backed data from your Hadoop MapReduce cluster in minutes.

Importing this table into HDFS could be done with the command:
you@db$ sqoop --connect jdbc:mysql:// --table USERS \--local --hive-import

This would connect to the MySQL database on this server and import the USERS table into HDFS. The –-local option instructs Sqoop to take advantage of a local MySQL connection which performs very well. The –-hive-import option means that after reading the data into HDFS, Sqoop will connect to the Hive metastore, create a table named USERS with the same columns and types (translated into their closest analogues in Hive), and load the data into the Hive warehouse directory on HDFS (instead of a subdirectory of your HDFS home directory).

store the data in SequenceFiles
you@db@ sqoop --connect jdbc:mysql:// --table USERS \--as-sequencefile

Imports tables from an RDBMS into HDFS
– Just one table
– All tables in a database
– Just portions of a table
– It supports a WHERE clause

Sqoop: Importing To Hive Tables
The Sqoop option --hive-import will automatically create a Hive table from the imported data
– Imports the data
– Generates the Hive CREATE TABLE statement
– Runs the statement
– Note: This will move the imported table into Hive’s warehouse directory

Imports data to HDFS as delimited text files or SequenceFiles, default is a comma-delimited text file.
Sqoop can take data from HDFS and insert it into an alreadyexisting table in an RDBMS with the command
sqoop export [options]

Sqoop Basic Syntax

sqoop tool-name [tool-options]
tool-names are import,import-all-tables,list-tables
tool-options are --connect, --username ,--password
Example: import a table called example from a database called sample in a MySQL RDBMS
sqoop import --username prince --password test \--connect jdbc:mysql:// example--where "id > 1000"
sqoop help command