Pig is a data flow language and execution environment for exploring very large datasets. Pig runs on HDFS and MapReduce clusters.
Pig has two execution types or modes: local mode and MapReduce mode.

Local Mode: To run the scripts in local mode, no Hadoop or HDFS installation is required. All files are installed and run from your local host and file system.
MapReduce Mode: To run the scripts in MapReduce mode, you need access to a Hadoop cluster and an HDFS installation.

Local mode
In local mode, Pig runs in a single JVM and accesses the local filesystem. This mode is suitable only for small datasets and when trying out Pig. The execution type is set using the -x or -exectype option. To run in local mode, set the option to local:
$ pig -x local

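MapReduce mode
In MapReduce mode, Pig translates its scripts into MapReduce jobs and runs them on a Hadoop cluster, reading from and writing to HDFS. This is the default mode, so the -x option can be omitted or set explicitly to mapreduce.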

Running the Pig Scripts in Local Mode
To run the Pig scripts in local mode, do the following:
1. Move to the pigtmp directory.
2. Execute the following command using script1-local.pig (or script2-local.pig).
$ pig -x local script1-local.pig
The output may contain a few Hadoop warnings, which can be ignored.
3. A directory named script1-local-results.txt (or script2-local-results.txt) is created. This directory contains the results file, part-r-00000.

Running the Pig Scripts in MapReduce Mode
To run the Pig scripts in MapReduce mode, do the following:
1. Move to the pigtmp directory.
2. Copy the excite.log.bz2 file from the pigtmp directory to the HDFS directory.
$ hadoop fs -copyFromLocal excite.log.bz2 .

What is Flume?

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data soon after the data is produced. It has a simple and flexible architecture based on streaming data flows. It uses a simple extensible data model that allows for online analytic applications.
The primary use case for Flume is as a logging system that gathers a set of log files on every machine in a cluster and aggregates them to a centralized persistent store such as the Hadoop Distributed File System (HDFS).

Flume can only write data through a flow at the rate that the final destinations can accept. Although Flume is able to buffer data inside a flow to smooth out high-volume bursts, the output rate needs to be equal on average to the input rate to avoid log jams. Thus, writing to a scalable storage tier is advisable. For example, HDFS has been shown to scale to thousands of machines and can handle many petabytes of data.
While HDFS is the primary output destination, events can also be sent to local files, to monitoring and alerting applications such as Ganglia, or to communication channels such as IRC. To integrate easily with HDFS, MapReduce, and Hive, Flume provides simple mechanisms for output file management and output format management. Data gathered by Flume can be processed easily with Hadoop and Hive.

Flume Architecture

Flume’s architecture is simple, robust, and flexible. The main abstraction in Flume is a stream-oriented data flow. A data flow describes the way a single stream of data is transferred and processed from its point of generation to its eventual destination. Data flows are composed of logical nodes that can transform or aggregate the events they receive. Controlling all this is the Flume Master.
Consider a typical Flume deployment that collects log data from a set of application servers. The deployment consists of a number of logical nodes, arranged into three tiers. The first tier is the agent tier. Agent nodes are typically installed on the machines that generate the logs and are your data’s initial point of contact with Flume. They forward data to the next tier of collector nodes, which aggregate the separate data flows and forward them to the final storage tier.

For example, the agents could be machines listening for syslog data or monitoring the logs of a service such as a web server or the Hadoop JobTracker. The agents produce streams of data that are sent to the collectors; the collectors then aggregate the streams into larger streams which can be written efficiently to a storage tier such as HDFS.
Logical nodes are a very flexible abstraction. Every logical node has just two components - a source and a sink. The source tells a logical node where to collect data, and the sink tells it where to send the data. The only difference between two logical nodes is how the source and sink are configured.
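
The source and sink pairing can be pictured with a small Python model. This is only an illustrative sketch and not Flume's API: LogicalNode and list_source are hypothetical names, and plain Python lists stand in for the network hop between the agent and the collector.

```python
from typing import Callable, Iterable, List


class LogicalNode:
    """Hypothetical model of a logical node: exactly one source and one sink."""

    def __init__(self, source: Callable[[], Iterable[str]], sink: Callable[[str], None]):
        self.source = source  # where to collect events from
        self.sink = sink      # where to send events to

    def run(self) -> int:
        """Pull every event from the source and push it into the sink."""
        count = 0
        for event in self.source():
            self.sink(event)
            count += 1
        return count


def list_source(events: List[str]) -> Callable[[], Iterable[str]]:
    """Build a source that replays a fixed list of events."""
    return lambda: iter(events)


# The "agent" forwards into a buffer; the "collector" reads that buffer and stores it.
forwarded: List[str] = []
stored: List[str] = []
agent = LogicalNode(list_source(["line 1", "line 2"]), forwarded.append)
collector = LogicalNode(lambda: iter(forwarded), stored.append)
agent.run()
collector.run()
```

The agent and the collector are built from the same class; only the configured source and sink differ, which is the point made above.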

Logical and Physical Nodes
There is one physical node per machine. Physical nodes act as containers for logical nodes, which are wired together to form data flows. Each physical node can play host to many logical nodes, and takes care of arbitrating the assignment of machine resources between them. So, although the agents and the collectors in the preceding example are logically separate processes, they could be running on the same physical node. Flume gives users the flexibility to specify where the computation and data transfer are done.
Flume is designed with four key goals:
Reliability:- Reliability is the ability to continue delivering events in the face of failures without losing data. Flume can guarantee that all data received by an agent node will eventually make it to the collector at the end of its flow as long as the agent node keeps running. That is, data can be reliably delivered to its eventual destination. There are three supported reliability levels:
1) End-to-end:- This reliability level guarantees that once Flume accepts an event, that event will make it to the end of its flow, as long as the agent that accepted it stays alive long enough. The first thing the agent does in this setting is write the event to disk in a 'write-ahead log' (WAL) so that, if the agent crashes and restarts, knowledge of the event is not lost. After the event has successfully made its way to the end of its flow, an acknowledgment is sent back to the originating agent so that it knows it no longer needs to store the event on disk.
2) Store on failure:- This reliability level causes nodes to only require an acknowledgement from the node one hop downstream. If the sending node detects a failure, it will store data on its local disk until the downstream node is repaired, or an alternate downstream destination can be selected. While this is effective, data can be lost if a compound or silent failure occurs.
3) Best effort:- This reliability level sends data to the next hop with no attempts to confirm or retry delivery. If nodes fail, any data that they were in the process of transmitting or receiving can be lost. This is the weakest reliability level, but also the most lightweight.
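
The end-to-end level can be sketched in a few lines of Python. This is a toy model under stated assumptions, not Flume code: EndToEndAgent is a hypothetical class, and a dictionary stands in for the on-disk write-ahead log.

```python
class EndToEndAgent:
    """Hypothetical agent that keeps each event in a write-ahead log until acked."""

    def __init__(self):
        self.wal = {}       # event id -> event body; stands in for the on-disk WAL
        self.next_id = 0

    def accept(self, body):
        """Log the event first, so that a crash after this point does not lose it."""
        event_id = self.next_id
        self.next_id += 1
        self.wal[event_id] = body
        return event_id

    def on_ack(self, event_id):
        """The end of the flow confirmed delivery; the event can leave the log."""
        self.wal.pop(event_id, None)

    def pending(self):
        """Events that would be resent after a restart, in the order accepted."""
        return [self.wal[i] for i in sorted(self.wal)]


agent = EndToEndAgent()
first = agent.accept("event A")
second = agent.accept("event B")
agent.on_ack(first)          # only "event A" has reached the end of the flow
unacked = agent.pending()    # "event B" is still held for redelivery
```

Store on failure would differ only in who sends the acknowledgment (the next hop rather than the end of the flow), and best effort would skip the log entirely.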

Scalability:- Scalability is the ability to increase system performance linearly, or better, by adding more resources to the system. When load increases, it is simple to add more resources to the system in the form of more machines to handle the increased load.

Manageability:- Manageability is the ability to control data flows, monitor nodes, modify settings, and control outputs of a large system. The Flume Master is the point where global state such as the data flows can be managed. Via the Flume Master, users can monitor flows and reconfigure them on the fly. The Flume Master has the information required to automatically respond to system changes such as load imbalances, partial failures, or newly provisioned hardware.

Extensibility:- Extensibility is the ability to add new functionality to a system. For example, you can extend Flume by adding connectors to existing storage layers or data platforms. Some general sources include files from the file system, syslog and syslog-ng emulation, or the standard output of a process.

Flume Daemons

The Flume daemons are the Flume Master and the Flume node.

The Flume Master is the central management point and controls the data flows of the nodes. It is the single logical entity that holds global state data, controls the Flume node data flows, and monitors Flume nodes. The nodes periodically contact the Master to transmit a heartbeat and to get their data flow configuration. For a Flume system to work, you must start a single Flume Master and some Flume nodes that interact with the Master.

The Master can be manually started by executing the following command:
$ flume master
After the Master is started, you can access it by pointing a web browser to http://localhost:35871/. This web page displays the status of all Flume nodes that have contacted the Master, and shows each node’s currently assigned configuration. When you start this up without Flume nodes running, the status and configuration tables will be empty.
The web page contains four tables: the Node status table, the Node configuration table, the Physical/Logical Node mapping table, and a Command history table.

The Flume Node
To start a Flume node, invoke the following command:
$ flume node_nowatch

On the master, a node can be in one of several states:

HELLO: A new node instance initially contacted the master.
IDLE: A node has completed its configuration or has no configuration.
CONFIGURING: A node has received a configuration and is activating the configuration.
ACTIVE: A node is actively pulling data from the source and pushing data into the sink.
LOST: A node has not contacted the master for an extended period of time (by default, 10x the expected heartbeat period, which comes to 50s).
DECOMMISSIONED: A node has been purposely decommissioned from a master.
ERROR: A node has stopped in an error state.
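
The LOST rule is simple arithmetic, sketched here in Python. The 5-second heartbeat period is an assumption inferred from the "10x" and "50s" figures above, and is_lost is a hypothetical helper, not part of Flume.

```python
HEARTBEAT_PERIOD_S = 5   # assumed: 50s divided by the 10x multiplier
LOST_MULTIPLIER = 10     # a node counts as LOST after 10 heartbeat periods of silence


def is_lost(last_heartbeat_s, now_s,
            period_s=HEARTBEAT_PERIOD_S, multiplier=LOST_MULTIPLIER):
    """Return True when the node has been silent longer than the LOST threshold."""
    return (now_s - last_heartbeat_s) > period_s * multiplier


threshold_s = HEARTBEAT_PERIOD_S * LOST_MULTIPLIER
```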

Flume Commands

Dump events typed at the console:
$ flume dump console

Read from a text file with the text source:
$ flume dump 'text("/etc/services")'

Tail a file by name:
$ flume dump 'tail("testfile")'

Create the file and write data to it:
$ echo Hello world! >> testfile

Delete the file:
$ rm testfile

Tail multiple files by file name:
$ flume dump 'multitail("test1", "test2")'

Send it data coming from the two different files:
$ echo Hello world test1! >> test1
$ echo Hello world test2! >> test2

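Split events on runs of two or more newlines; the newlines are excluded from the events:
tail("file", delim="\n\n+", delimMode="exclude")

Split on a closing tag and append the delimiter to the previous event:
tail("file", delim="</a>", delimMode="prev")

Tail a log file in which each entry starts with four digits (like a year from a date stamp), keeping the delimiter with the next event: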
tail("file", delim="\\n\\d\\d\\d\\d", delimMode="next")
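
To make the three delimiter modes concrete, here is a minimal Python sketch of how a regex delimiter could carve a text into events. It is not Flume's implementation: split_events is a hypothetical function, Python's re module stands in for the Java regex engine, and the patterns are written as Python raw strings rather than with the doubled backslashes used in the Flume spec above.

```python
import re


def split_events(text, delim, delim_mode):
    """Split text into events at each regex match of delim.

    delim_mode decides where the matched delimiter goes:
    "exclude" drops it, "prev" appends it to the event before it,
    "next" prepends it to the event after it.
    """
    events = []
    start = 0          # where the current event's body begins
    carried = ""       # delimiter waiting to be prepended ("next" mode)
    for match in re.finditer(delim, text):
        body = carried + text[start:match.start()]
        carried = ""
        if delim_mode == "prev":
            body += match.group()
        elif delim_mode == "next":
            carried = match.group()
        elif delim_mode != "exclude":
            raise ValueError("unknown delim_mode: " + delim_mode)
        if body:
            events.append(body)
        start = match.end()
    tail = carried + text[start:]
    if tail:
        events.append(tail)
    return events


paragraphs = split_events("a\n\nb\n\n\nc", r"\n\n+", "exclude")
links = split_events("<a>x</a><a>y</a>", "</a>", "prev")
entries = split_events("2012 first\n2013 second", r"\n\d\d\d\d", "next")
```

In the last call the newline and the four digits stay at the front of the second event, which is why the "next" mode suits log entries that begin with a date stamp.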

Start a traditional syslog-like UDP server listening on port 5140:
$ flume dump 'syslogUdp(5140)'

Start the equivalent TCP server on the same port:
$ flume dump 'syslogTcp(5140)'

Flume Event Sources
console, text("filename"), tail("filename"), multitail("file1"[, "file2"[, …]]), asciisynth(msg_count,msg_size), syslogUdp(port), syslogTcp(port)

What is Sqoop?

Sqoop is a tool for efficiently moving data between relational databases and HDFS.
Sqoop provides a method to import data from tables in a relational database into HDFS and, vice versa, to populate database tables from files in HDFS. It is an open source tool written at Cloudera.

Sqoop (SQL-to-Hadoop) is a straightforward command-line tool with the following capabilities:
  • Imports individual tables or entire databases to files in HDFS
  • Generates Java classes to allow you to interact with your imported data
  • Provides the ability to import from SQL databases straight into your Hive data warehouse
After setting up an import job in Sqoop, you can get started working with SQL database-backed data from your Hadoop MapReduce cluster in minutes.

Importing a table named USERS into HDFS could be done with the command:
you@db$ sqoop --connect jdbc:mysql:// --table USERS --local --hive-import

This would connect to the MySQL database on this server and import the USERS table into HDFS. The --local option instructs Sqoop to take advantage of a local MySQL connection, which performs very well. The --hive-import option means that after reading the data into HDFS, Sqoop will connect to the Hive metastore, create a table named USERS with the same columns and types (translated into their closest analogues in Hive), and load the data into the Hive warehouse directory on HDFS (instead of a subdirectory of your HDFS home directory).

To store the data in SequenceFiles instead, use --as-sequencefile:
you@db$ sqoop --connect jdbc:mysql:// --table USERS --as-sequencefile

Imports tables from an RDBMS into HDFS
– Just one table
– All tables in a database
– Just portions of a table
– It supports a WHERE clause

Sqoop: Importing To Hive Tables
The Sqoop option --hive-import will automatically create a Hive table from the imported data
– Imports the data
– Generates the Hive CREATE TABLE statement
– Runs the statement
– Note: This will move the imported table into Hive’s warehouse directory

Sqoop imports data to HDFS as delimited text files or SequenceFiles; the default is a comma-delimited text file.
Sqoop can also take data from HDFS and insert it into an already existing table in an RDBMS with the command:
sqoop export [options]

Sqoop Basic Syntax

sqoop tool-name [tool-options]
tool-names are import, import-all-tables, list-tables
tool-options are --connect, --username, --password
Example: import the rows with id > 1000 from a table called example in a database called sample in a MySQL RDBMS
sqoop import --username prince --password test --connect jdbc:mysql:// --table example --where "id > 1000"
The sqoop help command lists the available tools.

What is HBase?

HBase is a database: the Hadoop database. It is indexed by rowkey, column key, and timestamp. HBase stores structured and semistructured data naturally so you can load it with tweets and parsed log files and a catalog of all your products right along with their customer reviews. It can store unstructured data too, as long as it’s not too large.

HBase is designed to run on a cluster of computers instead of a single computer. The cluster can be built using commodity hardware; HBase scales horizontally as you add more machines to the cluster. Each node in the cluster provides a bit of storage, a bit of cache, and a bit of computation as well. This makes HBase incredibly flexible and forgiving. No node is unique, so if one of those machines breaks down, you simply replace it with another. This adds up to a powerful, scalable approach to data that, until now, hasn’t been commonly available to mere mortals.

HBase Architecture

HBase Tables and Regions

A table is made up of any number of regions.
A region is specified by its startKey and endKey.
  • Empty table: (Table, NULL, NULL)
  • Two-region table: (Table, NULL, “com.ABC.www”) and (Table, “com.ABC.www”, NULL)
Each region may live on a different node and is made up of several HDFS files and blocks, each of which is replicated by Hadoop
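
Because regions are contiguous key ranges, finding the region for a row key is a binary search over the region start keys. The Python sketch below uses the two-region table from the list above; find_region is a hypothetical helper (not the HBase client API), None stands in for NULL, and it assumes start keys are inclusive and end keys exclusive.

```python
import bisect

# The two-region table from the text: None marks an open-ended boundary.
regions = [(None, b"com.ABC.www"), (b"com.ABC.www", None)]
# Sorted start keys of every region after the first one.
split_keys = [start for start, _ in regions[1:]]


def find_region(row_key: bytes):
    """Return the (startKey, endKey) pair of the region that holds row_key."""
    # bisect_right counts the split keys <= row_key, which is the region index.
    return regions[bisect.bisect_right(split_keys, row_key)]
```

For example, find_region(b"com.AAA.www") falls in the first region, while the split key b"com.ABC.www" itself belongs to the second.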

HBase Tables:-
  • Tables are sorted by Row in lexicographical order
  • Table schema only defines its column families
  • Each family consists of any number of columns
  • Each column consists of any number of versions
  • Columns only exist when inserted, NULLs are free
  • Columns within a family are sorted and stored together
  • Everything except table names is byte[]
  • HBase table format: (Row, Family:Column, Timestamp) -> Value
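
The (Row, Family:Column, Timestamp) -> Value mapping can be modeled as nested dictionaries. The following Python sketch is a hypothetical in-memory stand-in (TinyTable is not the HBase client API) that shows versioned cells, free NULLs, and lexicographic row ordering.

```python
from collections import defaultdict


class TinyTable:
    """Hypothetical in-memory stand-in for one table; not the HBase client API."""

    def __init__(self):
        # row -> "family:qualifier" -> {timestamp: value}
        self.rows = defaultdict(lambda: defaultdict(dict))

    def put(self, row: bytes, column: bytes, timestamp: int, value: bytes) -> None:
        """Store one version of a cell; nothing is stored for absent columns."""
        self.rows[row][column][timestamp] = value

    def get(self, row: bytes, column: bytes, max_versions: int = 1):
        """Return up to max_versions (timestamp, value) pairs, newest first."""
        versions = self.rows.get(row, {}).get(column, {})
        return sorted(versions.items(), reverse=True)[:max_versions]

    def scan(self):
        """Return row keys in lexicographic byte order, as a table scan would."""
        return sorted(self.rows)


table = TinyTable()
table.put(b"row-10", b"cf:message", 1, b"old")
table.put(b"row-10", b"cf:message", 2, b"new")
table.put(b"row-2", b"cf:message", 1, b"other")
```

Note that b"row-10" sorts before b"row-2": the ordering is byte-wise, not numeric, which matters when designing row keys.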

HBase uses HDFS as its reliable storage layer. HDFS handles checksums, replication, and failover.

HBase consists of:
  • Java API, gateways for REST, Thrift, and Avro
  • Master manages the cluster
  • RegionServers manage data
  • ZooKeeper is used as the “neural network” and coordinates the cluster
Data is stored in memory and flushed to disk on regular intervals or based on size
  • Small flushes are merged in the background to keep number of files small
  • Reads read memory stores first and then disk based files second
  • Deletes are handled with “tombstone” markers
After data is written to the WAL, the RegionServer saves KeyValues in the memstore
  • Flush to disk is based on size, set by hbase.hregion.memstore.flush.size
  • Default size is 64MB
  • Uses snapshot mechanism to write flush to disk while still serving from it and accepting new data at the same time
There are two types of compactions: minor and major
Minor Compactions
  • Combine last “few” flushes
  • Triggered by number of storage files
Major Compactions
  • Rewrite all storage files
  • Drop deleted data and those values exceeding TTL and/or number of versions
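
The write path described above (memstore, flushes, tombstones, minor and major compactions) can be sketched as a toy store in Python. This is an illustration under simplifying assumptions, not HBase code: TinyStore is hypothetical, flush_size counts entries rather than bytes, and TTL and version limits are not modeled.

```python
TOMBSTONE = object()  # marker written in place of a value when a key is deleted


class TinyStore:
    """Hypothetical single-family store: one memstore plus flushed files."""

    def __init__(self, flush_size=2):
        self.flush_size = flush_size  # entries, standing in for a size in bytes
        self.memstore = {}
        self.files = []               # oldest first; each file is a dict snapshot

    def put(self, key, value):
        self.memstore[key] = value
        if len(self.memstore) >= self.flush_size:
            self.flush()

    def delete(self, key):
        self.put(key, TOMBSTONE)      # a delete is just a write of a marker

    def flush(self):
        if self.memstore:
            self.files.append(self.memstore)
            self.memstore = {}

    def get(self, key):
        """Check the memstore first, then files from newest to oldest."""
        for source in [self.memstore] + self.files[::-1]:
            if key in source:
                value = source[key]
                return None if value is TOMBSTONE else value
        return None

    def minor_compact(self, count=2):
        """Merge the newest `count` files into one; markers are kept."""
        if len(self.files) < 2:
            return
        merged = {}
        for snapshot in self.files[-count:]:
            merged.update(snapshot)
        self.files = self.files[:-count] + [merged]

    def major_compact(self):
        """Rewrite all files into one, dropping deleted keys and their markers."""
        merged = {}
        for snapshot in self.files:   # later files overwrite earlier ones
            merged.update(snapshot)
        live = {k: v for k, v in merged.items() if v is not TOMBSTONE}
        self.files = [live] if live else []


store = TinyStore(flush_size=2)
store.put("a", 1)
store.put("b", 2)      # second entry triggers the first flush
store.delete("a")
store.put("c", 3)      # triggers the second flush, tombstone included
files_before = len(store.files)
store.major_compact()
```

The minor compaction keeps tombstones because an older file that was not part of the merge may still hold the deleted key; only a major compaction, which sees every file, can drop them safely.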
Key Cardinality:-
The best performance is gained from using row keys
  • Time range bound reads can skip store files
  • So can Bloom Filters
  • Selecting column families reduces the amount of data to be scanned    

Fold, Store, and Shift:-
All values are stored with the full coordinates, including: Row Key, Column Family, Column Qualifier, and Timestamp
  • Folds columns into “row per column”
  • NULLs are cost free as nothing is stored
  • Versions are multiple “rows” in folded table
DDI stands for Denormalization, Duplication, and Intelligent Keys
Block Cache
Region Splits

HBase Shell and Commands

HBase Install

$ mkdir hbase-install
$ cd hbase-install
$ wget
$ tar xvfz hbase-0.92.1.tar.gz

Configuration changes in HBase are made as <property> entries in hbase-site.xml.

$ hbase shell
hbase(main):001:0> list
0 row(s) in 0.5710 seconds

Create a table
hbase(main):002:0> create 'mytable', 'cf'
hbase(main):003:0> list
1 row(s) in 0.0080 seconds

hbase(main):004:0> put 'mytable', 'first', 'cf:message', 'hello HBase'

hbase(main):007:0> get 'mytable', 'first'
hbase(main):008:0> scan 'mytable'

Describe a table
hbase(main):003:0> describe 'users'
{NAME => 'users', FAMILIES => [{NAME => 'info', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
1 row(s) in 0.0330 seconds

Configurable block size
hbase(main):002:0> create 'mytable',{NAME => 'colfam1', BLOCKSIZE => '65536'}

Block cache
Some workloads don’t benefit from putting data into a read cache; for instance, if a certain table or column family in a table is only accessed for sequential scans or isn’t accessed a lot and you don’t care if Gets or Scans take a little longer. By default, the block cache is enabled. You can disable it at the time of table creation or by altering the table:
hbase(main):002:0> create 'mytable',{NAME => 'colfam1', BLOCKCACHE => 'false'}

Aggressive caching
You can choose some column families to have a higher priority in the block cache (LRU cache). This comes in handy if you expect more random reads on one column family compared to another. This configuration is also done at table-instantiation time:
hbase(main):002:0> create 'mytable',
{NAME => 'colfam1', IN_MEMORY => 'true'}
The default value for the IN_MEMORY parameter is false.

Bloom filters
hbase(main):007:0> create 'mytable',{NAME => 'colfam1', BLOOMFILTER => 'ROWCOL'}
The default value for the BLOOMFILTER parameter is NONE. A row-level bloom filter is enabled with ROW, and a qualifier-level bloom filter is enabled with ROWCOL. The row-level bloom filter checks for the non-existence of the particular rowkey in the block, and the qualifier-level bloom filter checks for the non-existence of the row and column qualifier combination. The overhead of the ROWCOL bloom filter is higher than that of the ROW bloom filter.
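
A bloom filter answers "definitely absent" or "possibly present" for a key. The Python sketch below shows the idea for row keys; it is not HBase's implementation, and RowBloomFilter, the bit count, and the SHA-256 based hashing are all illustrative assumptions.

```python
import hashlib


class RowBloomFilter:
    """Minimal bloom filter over row keys; sizes and hashing are illustrative."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: bytes):
        # Derive num_hashes bit positions by salting one SHA-256 hash.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(bytes([i]) + key).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: bytes) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: bytes) -> bool:
        """False means definitely absent; True means the block must be read."""
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


row_filter = RowBloomFilter()
row_filter.add(b"row-1")
```

A ROWCOL-style filter would add the row key concatenated with the column qualifier instead of the row key alone, which means more entries per block and hence the higher overhead mentioned above.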

TTL (Time To Live)
You can set the TTL while creating the table like this:
hbase(main):002:0> create 'mytable', {NAME => 'colfam1', TTL => '18000'}
This command sets the TTL on the column family colfam1 as 18,000 seconds = 5 hours. Data in colfam1 that is older than 5 hours is deleted during the next major compaction.
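
The TTL arithmetic can be checked with a short Python sketch. is_expired is a hypothetical helper; it assumes cell timestamps are in milliseconds since the epoch, while the TTL is given in seconds.

```python
TTL_SECONDS = 18000  # the value used in the create command above


def is_expired(cell_timestamp_ms: int, now_ms: int, ttl_seconds: int = TTL_SECONDS) -> bool:
    """Return True when the cell is older than the TTL (timestamps in milliseconds)."""
    return now_ms - cell_timestamp_ms > ttl_seconds * 1000


ttl_hours = TTL_SECONDS / 3600
```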

Compression
You can enable compression on a column family when creating tables like this:
hbase(main):002:0> create 'mytable',
{NAME => 'colfam1', COMPRESSION => 'SNAPPY'}
Note that data is compressed only on disk. It’s kept uncompressed in memory (MemStore or block cache) or while transferring over the network.

Cell versioning
Versions are also configurable at a column family level and can be specified at the time of table instantiation:
hbase(main):002:0> create 'mytable', {NAME => 'colfam1', VERSIONS => 1}
hbase(main):002:0> create 'mytable',
{NAME => 'colfam1', VERSIONS => 1, TTL => '18000'}
hbase(main):002:0> create 'mytable', {NAME => 'colfam1', VERSIONS => 5,

Description of a table
hbase(main):004:0> describe 'follows'
{NAME => 'follows', coprocessor$1 => 'file:///U true users/ndimiduk/repos/hbaseia twitbase/target/twitbase-1.0.0.jar|HBaseIA.TwitBase.coprocessors.FollowsObserver|1001|', FAMILIES => [{NAME => 'f', BLOOMFILTER => 'NONE', REPLICATION_SCOPE =>'0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
1 row(s) in 0.0330 seconds

Tuning HBase
hbase(main):003:0> help 'status'

hbase(main):019:0> split 'mytable' , 'G'
Alter table
hbase(main):020:0> alter 't', NAME => 'f', VERSIONS => 1

hbase(main):023:0> truncate 't'
Truncating 't' table (it may take a while):
- Disabling table...
- Dropping table...
- Creating table...
0 row(s) in 14.3190 seconds

HBase Data Model

These six concepts form the foundation of the HBase data model.

Table: HBase organizes data into tables. Table names are Strings and composed of characters that are safe for use in a file system path.
Row: Within a table, data is stored according to its row. Rows are identified uniquely by their rowkey. Rowkeys don’t have a data type and are always treated as a byte[].
Column family: Data within a row is grouped by column family. Column families also impact the physical arrangement of data stored in HBase. For this reason, they must be defined up front and aren’t easily modified. Every row in a table has the same column families, although a row need not store data in all its families. Column family names are Strings and composed of characters that are safe for use in a file system path.
Column qualifier: Data within a column family is addressed via its column qualifier, or column. Column qualifiers need not be specified in advance. Column qualifiers need not be consistent between rows. Like rowkeys, column qualifiers don’t have a data type and are always treated as a byte[].
Cell: A combination of rowkey, column family, and column qualifier uniquely identifies a cell. The data stored in a cell is referred to as that cell’s value. Values also don’t have a data type and are always treated as a byte[].
Version: Values within a cell are versioned. Versions are identified by their timestamp, a long. When a version isn’t specified, the current timestamp is used as the basis for the operation. The number of cell value versions retained by HBase is configured via the column family. The default number of cell versions is three.