Tuesday, 25 August 2015

Data -> BigData -> Hadoop

 

Data is a collection of facts and figures, whether processed or unprocessed, meaningful or meaningless.

With each step toward digitization, data is expanding in every aspect: the amount of data, the speed at which it streams in, and the variety of its types.

Early days:
The web grew rapidly in the late 90s and early 2000s. It started with a few websites and web pages, went through dozens, and has now reached millions of web pages. At the very beginning, web results were curated by groups of people, but as the web grew, automation was needed: search engines, indexes, and then web crawlers were developed.

Problem:
The problem was first identified by Internet search companies, which had to query very large amounts of unorganized, distributed data. This gave birth to a new concept: Big Data.

BigData:
Big Data in an organization is the huge amount of data that cannot be processed in a reasonable time using traditional methods.
Big Data is a popular term used to describe the exponential growth and availability of data, both structured and unstructured.

So a question may come to mind for some of us: what are structured and unstructured data?

Structured data is the data used in traditional RDBMSs (e.g., SQL Server, MySQL) and is organized into well-defined structures such as tables, rows, and columns.
Unstructured data is data in raw form that cannot be processed by an RDBMS, e.g., data from Facebook, Twitter, etc.

As per the 3 Vs model, Big Data has 3 dimensions:
  • Volume: the amount of data.
  • Velocity: the speed at which data streams in.
  • Variety: the range of data formats.



The challenges of Big Data management result from the expansion of all three of the above dimensions, rather than just volume.

Two other dimensions have also been proposed that appear to contribute to the challenges of Big Data management. They are:
  • Variability: the increase in the range of values typical of a large data set.
  • Value: the need for valuation of enterprise data.

Processing Big Data
There was (and is) a need to process Big Data to extract valuable information for various benefits.
Relational DBMSs/traditional methods were helpless
Traditional systems were not capable of handling Big Data. A relational DBMS takes a long time to process large amounts of data, and it works well only with structured data.

This created an environment in need of a framework/tool that can process Big Data in which the data being processed does not have consistent relationships.

Birth of Hadoop.

The internet search engine vendors at that time were trying to find a way to return search results faster by distributing data and computation across different computers, so that multiple tasks could be accomplished together. One such project was Nutch (a web crawler).

Taking reference from Google's MapReduce paper and the Google File System paper, Doug Cutting worked on Nutch, and later Nutch (a project aimed at solving the above problem) was divided into two projects:
  1. Nutch: the web crawler portion.
  2. Hadoop: distributed storage and processing.


What is Hadoop?

To pull an increased load, we do not search for a giant ox; instead, multiple oxen pull together. Similarly, to store and process Big Data, big servers are not needed; multiple commodity machines can be used instead. Hadoop works along the same lines.

Hadoop is an open-source software framework for storing data and running applications on clusters of commodity hardware. It provides massive storage for any kind of data (Big Data), enormous processing power, and the ability to handle virtually limitless concurrent tasks or jobs.

Open-source software:
     Free to download, created and maintained by a community of developers.
Framework:
    Everything needed to develop and run applications is available.
Massive storage:
    Hadoop breaks Big Data into blocks, which are stored across clusters of commodity (low-cost) hardware.
Processing power:
    Hadoop processes large amounts of data concurrently, using multiple low-cost computers for fast results.
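The "massive storage" idea above, splitting a large input into fixed-size blocks and spreading them (with replicas) across commodity machines, can be sketched in a few lines of Python. This is a toy illustration only: real HDFS blocks default to tens or hundreds of megabytes, and placement is far smarter than round-robin.

```python
def split_into_blocks(data: bytes, block_size: int):
    """Split a byte stream into fixed-size blocks, the way HDFS does (toy)."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

def assign_to_nodes(blocks, nodes, replication=3):
    """Place each block (and its replicas) round-robin onto commodity nodes."""
    placement = {}
    for block_id in range(len(blocks)):
        placement[block_id] = [nodes[(block_id + r) % len(nodes)]
                               for r in range(replication)]
    return placement

blocks = split_into_blocks(b"x" * 300, block_size=128)
print([len(b) for b in blocks])   # [128, 128, 44] - the last block is partial
print(assign_to_nodes(blocks, ["dn1", "dn2", "dn3", "dn4"]))
```

Note how the last block is smaller than the block size; HDFS likewise does not pad partial final blocks.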

Pointers on Hadoop
  • Hadoop is an open-source Apache project.
  • It is written in Java.
  • It is scalable and hence supports high-performance-demanding applications.
  • The Hadoop framework makes it possible to store large amounts of data across the file systems of multiple computers.
  • Error handling is performed at the application layer.
  • It is based on Google's MapReduce.
  • It is widely used for Big Data processing.


Key Features of Hadoop

Flexible
    Supports multiple operating systems; MapReduce jobs can be written in programming languages other than Java.
    Works with both structured and unstructured data.
Scalable:
     New nodes can be added easily without affecting the system in any aspect.
Robust ecosystem:
     Hadoop has a very robust and rich ecosystem, consisting of various related projects, e.g., Hive, HBase, Pig.
Real-time:
     Hadoop is becoming more and more real-time.
Cloud:
     Hadoop is increasingly moving to the cloud.

Hadoop has had two major releases:
  1. Hadoop 1
  2. Hadoop 2

 Hadoop 1 :

 Hadoop 1 consists of:

####################

HDFS (Hadoop Distributed File System) - Storage

           There are no restrictions on the data that HDFS stores. Data may be unstructured and schema-less. By contrast, relational databases require that data be structured and schemas be defined before the data is stored. With HDFS, making sense of the data is the responsibility of the developer's code. HDFS can be considered a standard file system, but a distributed one: from the client's point of view, it looks like an ordinary file system (like the one on a laptop), but behind the scenes the file system actually runs on several machines. HDFS implements fail-over using data replication and has been designed to store and manipulate large data sets (in large files) with a write-once-read-many access model for files.

      The main components of HDFS are as described below:
  • Name Node is the master of the system. It maintains the name system (directories and files) and manages the blocks which are present on the Data Nodes.
  • Data Nodes are the slaves which are deployed on each machine and provide the actual storage. They are responsible for serving read and write requests for the clients.
  • Secondary Name Node is responsible for performing periodic checkpoints. In the event of Name Node failure, you can restart the Name Node using the checkpoint.

 
Note
    HDFS Client
      User applications access the file system using the HDFS client, a library that exports the HDFS file system interface.

Like most conventional file systems, HDFS supports operations to read, write and delete files, and operations to create and delete directories. The user references files and directories by paths in the namespace. The user application does not need to know that file system metadata and storage are on different servers, or that blocks have multiple replicas.

fs command:
Runs a generic file system user client. So the command to invoke the Hadoop fs client is: hadoop fs


Overview  (processes and file)

HDFS is made of several Java processes, usually running on separate physical machines (though this is not necessary), and follows a Master/Slave model:

The Name Node is responsible for storing the files' metadata (the locations of the files, etc.) and for recording all operations performed on the file system. All this data is stored on the local file system of the Name Node in two files called the "Edit Log" and the "FsImage":

    Edit Log: stores all changes that occur to the file system metadata. For instance, adding a new file to HDFS is recorded in this file. (present on the Name Node's local disk)

    FsImage: stores the entire file system namespace: the locations of files (or blocks) in the cluster. (present both on the Name Node's local disk and in RAM)

|-- edits_inprogress_0000000000000000003
|-- fsimage_0000000000000000000
|-- fsimage_0000000000000000000.md5
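The relationship "current namespace = FsImage snapshot + replayed Edit Log" can be sketched as a toy model; the real on-disk formats are binary and far richer, and the paths and block names below are purely illustrative.

```python
# Toy model: FsImage as a snapshot dict, Edit Log as a list of operations.
fsimage = {"/user/hdfs/a.txt": ["blk_1"]}        # last checkpointed namespace
edit_log = [
    ("add", "/user/hdfs/b.txt", ["blk_2", "blk_3"]),
    ("delete", "/user/hdfs/a.txt", None),
]

def replay(fsimage, edit_log):
    """Rebuild the current namespace by applying the edits to the snapshot."""
    ns = dict(fsimage)                           # start from the checkpoint
    for op, path, blocks in edit_log:
        if op == "add":
            ns[path] = blocks
        elif op == "delete":
            ns.pop(path, None)
    return ns

print(replay(fsimage, edit_log))   # {'/user/hdfs/b.txt': ['blk_2', 'blk_3']}
```

Checkpointing, discussed later in this post, is essentially running this replay ahead of time and saving the result as a new FsImage so the edit log can be emptied.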

Data Nodes: the Data Nodes simply store the data, i.e., the files' blocks, without any knowledge of HDFS metadata.

The files' blocks are stored on the local file systems of the Data Nodes and are replicated to other Data Nodes based on the configured replication factor.


 Operations on HDFS:
  • Write file
  • Read file


        Write File to HDFS:

1- The user invokes the HDFS client, passing arguments to create a file.
2- The data is not copied directly to a DataNode; instead it is buffered in a local file on the client side until the content reaches the block size.
3- Once the content is at least the size of one block, the HDFS client asks the NameNode to write the file, and computes a checksum.
4- The NameNode makes an entry in the file hierarchy/inventory and identifies the DataNode and block to be used for the write.
5- The NameNode returns the DataNode and block ID for both the primary and replica nodes.
6- The HDFS client flushes the data from the local file to the DataNode, and also stores the checksum on the DataNode:
rw-r--r-- 1 hduser hadoop 134217728 Sep 25 10:51 blk_1073741827
rw-r--r-- 1 hduser hadoop   1048583 Sep 25 10:51 blk_1073741827_1003.meta

7- A replication pipeline copies the data from the first DataNode to the second, and from the second to the third. The DataNode then informs the HDFS client of success.
8- The HDFS client informs the NameNode of the successful write, and the NameNode makes an entry in its persistent store.
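Steps 2-6 of the write path, buffering locally until a block is full and then flushing the block together with its checksum, can be sketched as follows. This is a toy model under loud assumptions: the class and block names are hypothetical, the block size is tiny (the HDFS default is 128 MB), and CRC32 stands in for the real checksum scheme.

```python
import zlib

BLOCK_SIZE = 16  # toy value; the HDFS default is 134217728 bytes (128 MB)

class ToyHdfsWriteClient:
    """Buffers data locally and 'flushes' a block plus its checksum,
    mimicking steps 2-6 of the write path above (names are hypothetical)."""
    def __init__(self):
        self.buffer = b""
        self.datanode = {}          # block_id -> data
        self.checksums = {}         # block_id -> crc32, like the .meta files

    def write(self, data: bytes):
        self.buffer += data
        # Flush a block only once a full block's worth of data is buffered.
        while len(self.buffer) >= BLOCK_SIZE:
            self._flush_block(self.buffer[:BLOCK_SIZE])
            self.buffer = self.buffer[BLOCK_SIZE:]

    def _flush_block(self, block: bytes):
        block_id = f"blk_{len(self.datanode) + 1}"
        self.datanode[block_id] = block
        self.checksums[block_id] = zlib.crc32(block)   # stored alongside data

client = ToyHdfsWriteClient()
client.write(b"hello hadoop, hello hdfs!!!!!!!!")   # 32 bytes -> 2 full blocks
print(sorted(client.datanode))                       # ['blk_1', 'blk_2']
```

The checksum stored per block is what the read path later uses to verify that the block content has not been corrupted.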



        Read File from HDFS:



1- The user invokes the HDFS client using the fs command, passing the file to read as an argument.
2- The HDFS client calculates the chunk index based on the offset of the file pointer.
3- The HDFS client sends the chunk index to the NameNode.
4- The NameNode sends back the primary DataNode, the other DataNodes containing replicas, and the block ID.
5- The HDFS client reads the block from the primary/first DataNode.
It also reads the checksum stored at the time the file was created. For each block stored, an additional checksum file is kept, which the HDFS client later uses to verify the content:
-rw-r--r-- 1 hduser hadoop 134217728 Sep 25 10:51 blk_1073741827
-rw-r--r-- 1 hduser hadoop   1048583 Sep 25 10:51 blk_1073741827_1003.meta
6- The client calculates the checksum of the data it has read and compares it with the stored one to verify correctness.
7- If the read or the checksum verification fails on one DataNode, the client retries from the next replica.
8- The HDFS client returns the data to the user.
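The checksum verification and fall-back to a replica described above can be sketched in a few lines. This is a toy version using CRC32 (the function name and data are illustrative, not the real client API):

```python
import zlib

def read_block(replicas, expected_crc):
    """Try each replica in turn; return the first whose CRC32 matches the
    stored checksum - a toy version of the client-side verification above."""
    for data in replicas:
        if zlib.crc32(data) == expected_crc:
            return data
    raise IOError("all replicas failed checksum verification")

good = b"block contents"
corrupt = b"block c0ntents"        # simulated corruption on one replica
crc = zlib.crc32(good)             # stored in the .meta file at write time

print(read_block([corrupt, good], crc) == good)   # True: fell back to replica 2
```

This is why replication matters for more than availability: a corrupted block on one DataNode is silently healed from another replica.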



Drawbacks of HDFS - Hadoop 1:
1. Single point of failure (no standby NameNode, i.e., no high availability).
2. Limited scalability, i.e., an overburdened NameNode (no support for multiple NameNodes).
3. No multi-tenancy (multiple jobs cannot run simultaneously).



Basic commands:







  • hadoop fs -ls /user/hdfs
  • hadoop fs -copyFromLocal test.txt /user/hdfs


####################

    Map Reduce framework - based on MapReduce Programming model


     So, let's first read about the MapReduce model.

    Map Reduce programming model

    To process Big Data, a parallel algorithm was needed.

    Multithreaded programming with shared state cannot be used easily, because it requires data sharing and may lead to deadlock: multiple threads may hold locks on different resources while depending on each other to release them.

    So another style of programming can be used that eliminates shared state completely, so that the complexity of coordination disappears: functional programming. In this style, data is passed only as parameters and return values. MapReduce is rooted in functional programming.

    MapReduce is a programming model designed for processing large volumes of data in parallel by dividing the work into a set of independent tasks. MapReduce programs are written in a particular style influenced by functional programming constructs, specifically idioms for processing lists of data.

    - The important innovation of MapReduce is the ability to take a query over a dataset, divide it, and run it in parallel over multiple nodes. Distributing the computation solves the issue of data too large to fit onto a single machine. Combine this technique with commodity Linux servers and you have a cost-effective alternative to massive computing arrays.

    Note
     In addition to Hadoop, you’ll find MapReduce inside MPP and NoSQL databases, such as Vertica or MongoDB.
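The list-processing idioms mentioned above can be illustrated with a tiny word count in the functional style. This is a plain Python sketch of the model, not Hadoop API code: there is no shared mutable state between the "map" and "reduce" phases, and the grouping step in the middle stands in for the framework's shuffle.

```python
from functools import reduce
from itertools import groupby

docs = ["big data hadoop", "hadoop big"]

# Map phase: emit a (word, 1) pair for every word, with no shared state.
mapped = [(w, 1) for doc in docs for w in doc.split()]

# Shuffle: group the pairs by key, as the framework does between the phases.
mapped.sort(key=lambda kv: kv[0])
grouped = {k: [v for _, v in g] for k, g in groupby(mapped, key=lambda kv: kv[0])}

# Reduce phase: fold each key's list of values into a single count.
counts = {k: reduce(lambda a, b: a + b, vs) for k, vs in grouped.items()}
print(counts)   # {'big': 2, 'data': 1, 'hadoop': 2}
```

Because each (word, 1) pair is independent, the map phase can be split across any number of machines, which is exactly the property MapReduce exploits.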


    Map Reduce implementation in Hadoop:

    MapReduce is a framework for performing distributed data processing using the MapReduce programming paradigm. In the MapReduce paradigm, each job has a user-defined map phase (which is a parallel, share-nothing processing of input; followed by a user-defined reduce phase where the output of the map phase is aggregated). Typically, HDFS is the storage system for both input and output of the MapReduce jobs.

    The main components of MapReduce are as described below:
    • JobTracker is the master of the system, which manages the jobs and resources in the cluster (the TaskTrackers). The JobTracker tries to schedule each map as close as possible to the actual data being processed, i.e., on the TaskTracker running on the same DataNode as the underlying block.
    • TaskTrackers are the slaves which are deployed on each machine. They are responsible for running the map and reduce tasks as instructed by the JobTracker.
    • JobHistoryServer is a daemon that serves historical information about completed applications. Typically, the JobHistoryServer can be co-deployed with the JobTracker, but it is recommended to run it as a separate daemon.

    The following illustration provides details of the core components for the Hadoop stack.

     

     Overview  (processes and file) 


    MapReduce is made of several Java processes, usually running on separate physical machines (though this is not necessary), and follows a Master/Slave model:

    1. Job Tracker - The client submits the job to the JobTracker, which allocates tasks to the different TaskTrackers.
    2. Task Tracker - A TaskTracker has multiple slots, each of which can run as a JVM and behave as a Mapper (responsible for running the map phase) or a Reducer (responsible for running the reduce phase).

      

    Operation - Map Reduce :
      
    1- The user submits a job, passing arguments such as the input and output files and the MapReduce implementation.
    2- The user keeps polling the JobTracker to get the result.
    3- The JobTracker talks to the NameNode for the block addresses.
    4- The NameNode looks into its FsImage and edit log (metadata) and returns the block addresses.
    5- The JobTracker identifies the TaskTracker nearest to the data: on the same node where the data resides if possible, otherwise in the same rack, otherwise in another rack.
    6a, 6b- The JobTracker submits the task to the TaskTracker, which has multiple slots, each capable of running a JVM as a mapper or reducer. The JobTracker submits the same task to another TaskTracker if it finds that a particular TaskTracker is really slow, so that the overall computation is not affected; this is called speculative execution.
    7- The TaskTracker writes the data to HDFS and notifies the JobTracker, which then notifies the user.


    Internal Data Processing of MapReduce 

    1. The file is loaded from HDFS.
    2. Input splitting is performed; an input split actually contains references to DataNode blocks.
    Note: the split() method of InputFormat is used for splitting. An InputSplit may be a little smaller than, equal to, or a little larger than the block size, i.e., almost the same. This happens because, for example, the last line of a data block may not end exactly at the block boundary, so the input split includes the remaining part of that line from the next block.
    3. The RecordReader of the InputFormat then reads each line and invokes map() for it.
    4. Each map() output is written to a circular buffer.
    5. Each time the circular buffer becomes 80% full (and finally at the end), a spill is written to local disk; before being written, each spill goes through partitioning, sorting, and combining.
    • Partitioning: an index is added to each <k, v> pair indicating which reducer will process it.
    • Sorting: sorting is done first by partition ID and then by key.
    • Combining: the values of each identical key are combined to form <k, v1, v2>.

    6. Finally, the spill files are merged to create each mapper's output.
    7. Each reducer fetches its portion of the output from every mapper and merges it.
    8. reduce() is called for each key, and the final output is written to HDFS.
    9. Later, the DataNodes send block reports to the NameNode to make it aware of the changes in HDFS.
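The partitioning, sorting, and combining of step 5 can be sketched as a toy spill function. Assumptions are flagged in the comments: real Hadoop's default partitioner hashes the key much like this, but the spill itself goes to disk, and the function and variable names here are hypothetical.

```python
def partition(key, num_reducers):
    """Choose the reducer for a key (the default partitioner hashes the key)."""
    return hash(key) % num_reducers

def spill(pairs, num_reducers=2):
    """Toy spill: partition each <k, v>, sort by (partition id, key),
    then combine the values of identical keys, as in step 5 above."""
    tagged = [(partition(k, num_reducers), k, v) for k, v in pairs]
    tagged.sort(key=lambda t: (t[0], t[1]))      # sorting step
    combined = {}
    for p, k, v in tagged:                       # combining step
        combined.setdefault((p, k), []).append(v)
    return combined

pairs = [("hadoop", 1), ("big", 1), ("hadoop", 1)]
result = spill(pairs)
print(result)   # the repeated key "hadoop" has its values combined into [1, 1]
```

Sorting by partition first means each reducer can later fetch one contiguous, already-sorted slice of every mapper's output, which is what makes the merge in step 7 cheap.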


                                                            Data processing in Hadoop


    Drawbacks of MapReduce - Hadoop 1:
        1. Single point of failure (the JobTracker).
        2. Overburdened JobTracker.
        3. Only the MapReduce programming model can be used.




    Since Hadoop 1 had these drawbacks, they were addressed in the next major release, i.e., Hadoop 2.


    Hadoop 2 :


    HDFS Hadoop 1 Drawback revisited:


    @ Drawback 1. Single point of failure (No standby name node i.e, no HighAvailability)

    Addressed: HDFS StandByNode

    HDFS StandByNode:

    Before talking about the Standby Node, I would like to discuss the nodes it extends for checkpointing.

                     



    The NameNode maintains its state via the FsImage; any operations that happen after the FsImage is created are tracked through the edit log. Each time something is requested from the NameNode, it first looks in the FsImage (loaded in RAM, so it is fast) and then in the edit log (on local disk).
    NameNode State = FsImage + Edit Log (the changes made after the last FsImage was created).

    Secondary Name Node:
    In Hadoop 1, there was the concept of a Secondary NameNode (SNN), which kept checkpoints of the NameNode. The Secondary NameNode downloads the edit logs and FsImage from the NameNode and merges them to form a new FsImage (called a checkpoint). It is not capable of uploading the result to the primary NameNode (PNN); the PNN itself fetches it.

    Checkpoint Node
    The other type of checkpointing uses a Checkpoint Node, which extends the SNN's features; additionally, it is capable of uploading the FsImage to the PNN, which replaces its existing FsImage with the new one and empties its edit log file.

    Backup Node: online streaming; there is no need to transfer files from the actual NameNode, since the Backup Node receives online updates of the metadata. It then uploads the FsImage to the PNN, which replaces its existing FsImage with the new one and empties its edit log file.

    Standby Node: this NameNode provides high availability. It gets the data block reports directly from the DataNodes, and the edit log is shared using either
     a. shared storage, or
     b. quorum-based storage.
    It then uploads the FsImage to the PNN, which replaces its existing FsImage with the new one and empties its edit log file.

    So now let's focus on addressing the high availability issue of HDFS.

    HDFS HA (High Availability) addresses the above problem by providing the option of running redundant (two) NameNodes in the same cluster. This allows a fast fail-over to a new NameNode in case a machine crashes, or a graceful fail-over for planned maintenance.

    Here, two separate machines are configured as NameNodes. At any point in time, exactly one of the NameNodes is in the active state and the other is in standby.

    a. Active: responsible for all client operations.
    b. Standby: acts as a slave, maintaining enough state to provide a fast fail-over. It is required to have the same hardware as the active NameNode.

    In order for the standby NameNode to keep its state synchronized with the active NameNode, the current implementation requires that both nodes have access to a shared storage device (an NFS mount from a NAS) or to quorum-based storage.

    Shared Storage

    Steps:
    1. Whenever the namespace is modified on the active NameNode, it directly logs a record of the modification to an edit log file stored in the shared directory.
    2. The standby NameNode keeps an eye on the log file and applies the same changes to its own namespace.
    3. On a fail-over, the standby NameNode makes sure that it has read all the edits before it is promoted to active.
    4. Also note that for a fast fail-over, the standby NameNode must have up-to-date information about the data blocks. So in this case the DataNodes are configured with both NameNodes and send heartbeats/reports to both.


    Below diagram:
    1- The active NN writes.
    2- The standby NN reads.
    3- The standby uploads the new FsImage (checkpointing).
    4- The DataNodes send heartbeats/reports to both NNs.

    Note:
    If a standby NN is used, there is no need for a Secondary, Checkpoint, or Backup Node; in fact, configuring one is an error.


    Quorum-based Storage

    Everything is the same as with shared storage, but instead of a shared storage device, a group of daemons, the Journal Nodes, is used. The active NN writes to the Journal Nodes (JNs) and the standby NN reads from them.

    Journal Node: these are lightweight and can run on machines hosting other daemons. The minimum number of Journal Nodes is 3, and the count should always be odd, so that an edit counts as committed once a majority (N/2 + 1) of the Journal Nodes acknowledge it, and the system keeps working as long as a majority is available. Reading from and writing to the Journal Nodes is done through a Quorum Journal Manager (QJM) on each NameNode.


    Note:
    The switch in both of the cases above has to be done manually by an administrator.



    To automate this switching in the case of a fail-over, a new component is introduced: ZooKeeper.


    ZooKeeper (Apache ZooKeeper) -
    A highly available service for maintaining small amounts of coordination data, notifying clients of changes in that data, and monitoring clients for failure.

    The implementation of automatic HDFS failover relies on ZooKeeper for the following:
    a. Fail-over detection:
    Each of the NNs maintains a persistent session in the ZooKeeper service. If the NN machine crashes, the session expires, notifying the other NN that a fail-over should be triggered.

    b. Active NN election:
    If the NN machine crashes, the other NN can acquire a special lock znode in ZooKeeper indicating that it should be the next active NN.


      Steps/components covered:
    1. ZKFC (ZooKeeper Failover Controller): it keeps pinging the NN JVM to see if it is healthy; while the NN is healthy, the ZKFC maintains a session in the ZooKeeper service.
    2. The NN whose session acquires the special lock znode becomes the active NN.
    3. If the QJM sees any problem with the edit logs, it notifies the NN JVM, which notifies the ZKFC, which deletes the session and hence releases the lock.
    4. The other NN then acquires the lock and becomes active.
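The lock-based election in the steps above can be modeled with a toy in-process "ZooKeeper". All class and method names here are hypothetical simplifications, not the real ZooKeeper API; the point is only the mechanism: whoever holds the lock znode is active, and deleting the session hands the lock to the peer.

```python
class ToyZooKeeper:
    """Toy model of the lock znode (hypothetical class, not the real API):
    whichever NameNode's session holds the lock is the active one."""
    def __init__(self):
        self.lock_holder = None                # current owner of the lock znode

    def try_acquire(self, nn_id):
        if self.lock_holder is None:
            self.lock_holder = nn_id
            return True
        return False

    def delete_session(self, nn_id):
        """What the ZKFC does when its NameNode turns out to be unhealthy."""
        if self.lock_holder == nn_id:
            self.lock_holder = None

zk = ToyZooKeeper()
print(zk.try_acquire("nn1"))   # True: nn1 becomes Active
print(zk.try_acquire("nn2"))   # False: nn2 stays Standby
zk.delete_session("nn1")       # nn1's ZKFC deletes the session, releasing the lock
print(zk.try_acquire("nn2"))   # True: nn2 takes over as Active
```

The real service adds what this toy omits: session expiry on missed heartbeats (so a crashed machine releases the lock without cooperating) and fencing, discussed below.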


      The image below shows the components involved: the ZKFC and the ZooKeeper service.

    3. Ping: the ZKFC pings the NN to check its health.
    5. It maintains the persistent session while the NN is healthy.
    6. The NameNode that acquires the lock in the ZooKeeper service becomes active, and the other is standby.


      Fencing: it is important that only one NN is active at a time, otherwise there is a chance of data loss or corruption. So if it cannot be verified that the previously active NN has given up its role, the fencing process cuts that NN off from the shared storage.




    Drawback 2. No scalability, i.e., an overburdened NameNode (no support for multiple NameNodes)

    Addressed: HDFS Federation

    HDFS Federation
                   NN: NameNode        DN:DataNode

    • In order to scale the NameNode horizontally, federation uses multiple independent NameNodes. These NameNodes are federated: they are independent and do not require coordination with each other.
    • The DataNodes are used as common storage for blocks by all the NameNodes.
    • Each DN registers with all the NNs in the cluster.
    • DNs send heartbeats and block reports to all NNs.
    • DNs also handle commands from the NNs.

    HDFS has two layers:
               a. NameSpace
               b. Block Storage Service (block pool and DataNode)


    a. NameSpace
    1. Keeps track of directories, files and blocks.
    2. Supports all namespace-related file system operations: create, modify, delete and list files and directories.
    3. Holds the FsImage and edit log.
    4. Exists in the NameNode.

    b. Block Storage Service

         Block Management (Pool)
    1. Exists in the NameNode.
    2. Handles DataNode cluster membership.
    3. Processes blocks.
    4. Handles block-related operations.
    5. Manages replicas.
    6. The component involved is the block pool.
         Storage
    1. In the DataNode.
    2. Stores blocks on the local file system and provides read/write access.
    Note

    1. A block pool contains the set of blocks belonging to a single namespace (NS).
    2. A DN stores blocks for all the pools in the cluster.
    3. Each block pool is managed independently.
    4. A namespace generates block IDs without coordination with the other NNs.
    5. The failure of one NN does not prevent the DNs from serving the other NNs.


    Benefits of HDFS Federation

    • Scaling
    • Performance: higher read/write throughput.
    • Isolation: multiple applications and users work in an isolated manner.




    MapReduce Hadoop 1 Drawback revisited:

    Drawbacks
        1. Single point of failure (the JobTracker).
        2. Overburdened JobTracker.
        3. Only the MapReduce programming model can be used.

    Addressed: YARN (Yet Another Resource Negotiator)


    YARN:

    1. Second-generation Hadoop.
    2. Cluster management technology.
    3. Characterized as a large-scale, distributed OS for Big Data operations.
    4. A subproject of the larger Apache Hadoop project.
    5. Sometimes called MapReduce 2.0.
    6. A software rewrite that decoupled resource management and scheduling capabilities from the data processing component.

    Application Working in YARN
    Armed with the knowledge of the above concepts, it will be useful to sketch how applications conceptually work in YARN.
    Application execution consists of the following steps:
    • Application submission.
    • Bootstrapping the ApplicationMaster instance for the application.
    • Application execution managed by the ApplicationMaster instance.
    Let’s walk through an application execution sequence (steps are illustrated in the diagram)
    1. A client program submits the application, including the necessary specifications to launch the application-specific ApplicationMaster itself.
    2. The ResourceManager assumes the responsibility to negotiate a specified container in which to start the ApplicationMaster, and then asks the NodeManager to launch the ApplicationMaster.
    3. The ApplicationMaster, on boot-up, registers with the ResourceManager – the registration allows the client program to query the ResourceManager for details, which allow it to  directly communicate with its own ApplicationMaster.
    4. During normal operation the ApplicationMaster negotiates appropriate resource containers via the resource-request protocol.
    5. On successful container allocations, the ApplicationMaster launches the container by providing the container launch specification to the NodeManager. The launch specification, typically, includes the necessary information to allow the container to communicate with the ApplicationMaster itself.
    6. The application code executing within the container then provides necessary information (progress, status etc.) to its ApplicationMaster via an application-specific protocol.
    7. During the application execution, the client that submitted the program communicates directly with the ApplicationMaster to get status, progress updates etc. via an application-specific protocol.
    8. Once the application is complete, and all necessary work has been finished, the ApplicationMaster deregisters with the ResourceManager and shuts down, allowing its own container to be repurposed.
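The eight steps above can be compressed into a toy, single-process sketch. Every class and method name here is an illustrative simplification, not the real YARN API; the aim is only to show the shape of the flow: register, negotiate containers, run tasks, deregister.

```python
class ResourceManager:
    """Toy RM (illustrative only): tracks registered AMs, hands out containers."""
    def __init__(self):
        self.next_id = 0
        self.apps = []

    def register(self, am):
        self.apps.append(am)           # step 3: the AM registers on boot-up

    def allocate(self, n):
        """Step 4: grant n numbered containers (no real scheduling here)."""
        ids = list(range(self.next_id, self.next_id + n))
        self.next_id += n
        return ids

    def deregister(self, am):
        self.apps.remove(am)           # step 8: the AM deregisters when done

class ApplicationMaster:
    """Toy AM: negotiates containers for its tasks, then cleans up."""
    def __init__(self, rm):
        self.rm = rm
        rm.register(self)

    def run(self, tasks):
        containers = self.rm.allocate(len(tasks))
        # steps 5-6: each container runs one task (a NodeManager would
        # actually launch it on the chosen node in real YARN)
        results = [task() for _, task in zip(containers, tasks)]
        self.rm.deregister(self)
        return results

rm = ResourceManager()
am = ApplicationMaster(rm)
print(am.run([lambda: "map done", lambda: "reduce done"]))
print(rm.apps)   # [] - the AM has deregistered, freeing its container
```

The key design point this preserves is that the ResourceManager only brokers resources; what runs inside each container is entirely the ApplicationMaster's business, which is how YARN supports models beyond MapReduce.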


    Note:
    1. A container is analogous to a slot (it contains resources), i.e., a JVM process.
    2. The ApplicationMaster is itself launched in a container.
    3. If the ApplicationMaster determines that it alone can do the data processing, it does so sequentially, i.e., the mapper and then the reducer; such jobs are called uber jobs. The ApplicationMaster decides whether to do it alone with the help of thresholds, for example a threshold on the number of mappers.
    4. Any operation that happens on a node is done via the NodeManager. As in the second step, the ResourceManager does not launch the ApplicationMaster directly; instead it requests the NodeManager to do so, and the NodeManager launches the ApplicationMaster.

    Flow Diagram:





    Hadoop can be run in 3 different modes. Different modes of Hadoop are

    1. Standalone Mode
           - Default mode of Hadoop.
           - Hadoop is configured to run in a non-distributed mode, as a single Java process.
           - HDFS is not utilized in this mode.
           - The local file system is used for input and output.
           - Used for debugging purposes.
           - No custom configuration is required in the 3 Hadoop configuration files (mapred-site.xml, core-site.xml, hdfs-site.xml).
           - Standalone mode is much faster than pseudo-distributed mode.

    2. Pseudo-Distributed Mode (Single-Node Cluster)
           - Configuration is required in the 3 files given above for this mode.
           - The replication factor is one for HDFS.
           - One node is used as Master Node / Data Node / Job Tracker / Task Tracker.
           - Used to test real code against HDFS.
           - Each Hadoop daemon runs in a separate Java process.
           - A pseudo-distributed cluster is a cluster where all daemons run on one node.

    3. Fully Distributed Mode (Multi-Node Cluster)
          - This is the production phase.
          - Data is used and distributed across many nodes.
          - Different nodes are used as Master Node / Data Node / Job Tracker / Task Tracker.

    Play around with Hadoop 

    Now that we have an idea of Hadoop as a product, we can do something practical.

    1. Get your Hadoop setup ready. This can be done using the classic Hadoop deliverable installed on a Linux flavor, or by deploying a sandbox provided by one of the partners, like Cloudera or Hortonworks, on Oracle VirtualBox. I have used the Hortonworks sandbox.

    2. Once the sandbox is deployed successfully, restart the sandbox VM.



    3. There are multiple ways to access this VM, depending on our need and the operation to be performed.
          a. For HDFS operations: PuTTY into the sandbox VM as below; the default user/password is root/hadoop.





            b. For copying the input file and the user's jar to the sandbox VM, use WinSCP (if you are using Windows as your development/local machine); user/password: root/hadoop.



              c. For sandbox vm console:
                         http://127.0.0.1:8888/

              d. For metric details:
                         http://127.0.0.1:8080/
                         user/password: admin/admin

              e. All other details are here in below image.

                   



    So our Hadoop setup is ready, and we have tried accessing it with different tools such as PuTTY and WinSCP, which work absolutely fine.

    4. Many sample apps come bundled with the sandbox; we can try running them first, and then move on to creating our own. When we want to create an application that will run on Hadoop, download and include in the classpath (or use Maven to resolve the dependencies) the jars hadoop-common-2.7.1.jar and hadoop-mapreduce-client-core-2.7.1.jar.

    5. Switch to the user hdfs.
        > su hdfs

    6.  To see the available samples run,

    Available samples:
     [hdfs@sandbox /]$ yarn jar /usr/hdp/2.3.0.0-2557/hadoop-mapreduce/hadoop-mapreduce-examples-2.7.1.2.3.0.0-2557.jar

    output
    An example program must be given as the first argument.
    Valid program names are:
      aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
      aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
      bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
      dbcount: An example job that count the pageview counts from a database.
      distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
      grep: A map/reduce program that counts the matches of a regex in the input.
      join: A job that effects a join over sorted, equally partitioned datasets
      multifilewc: A job that counts words from several files.
      pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
      pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
      randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
      randomwriter: A map/reduce program that writes 10GB of random data per node.
      secondarysort: An example defining a secondary sort to the reduce.
      sort: A map/reduce program that sorts the data written by the random writer.
      sudoku: A sudoku solver.
      teragen: Generate data for the terasort
      terasort: Run the terasort
      teravalidate: Checking results of terasort
      wordcount: A map/reduce program that counts the words in the input files.
      wordmean: A map/reduce program that counts the average length of the words in the input files.
      wordmedian: A map/reduce program that counts the median length of the words in the input files.
      wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files

    7. Check the input arguments each sample expects. Running a sample without arguments prints its usage:
    [hdfs@sandbox /]$ yarn jar /usr/hdp/2.3.0.0-2557/hadoop-mapreduce/hadoop-mapreduce-examples-2.7.1.2.3.0.0-2557.jar wordcount

    output
    Usage: wordcount <in> [<in>...] <out>

    [hdfs@sandbox /]$


    8. Run one of the available samples.

    To run wordcount on a single file:
    [root@sandbox /]# su hdfs
    [hdfs@sandbox /]$ hadoop fs -copyFromLocal abc.txt /user/hdfs/
    [hdfs@sandbox /]$ yarn jar /usr/hdp/2.3.0.0-2557/hadoop-mapreduce/hadoop-mapreduce-examples-2.7.1.2.3.0.0-2557.jar wordcount abc.txt def.txt

    output
    15/08/17 09:46:47 INFO impl.TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
    15/08/17 09:46:47 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
    15/08/17 09:46:47 INFO input.FileInputFormat: Total input paths to process : 1
    15/08/17 09:46:48 INFO mapreduce.JobSubmitter: number of splits:1
    15/08/17 09:46:48 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1439801989509_0005
    15/08/17 09:46:48 INFO impl.YarnClientImpl: Submitted application application_1439801989509_0005
    15/08/17 09:46:48 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1439801989509_0005/
    15/08/17 09:46:48 INFO mapreduce.Job: Running job: job_1439801989509_0005
    15/08/17 09:46:54 INFO mapreduce.Job: Job job_1439801989509_0005 running in uber mode : false
    15/08/17 09:46:54 INFO mapreduce.Job:  map 0% reduce 0%
    15/08/17 09:46:59 INFO mapreduce.Job:  map 100% reduce 0%
    15/08/17 09:47:04 INFO mapreduce.Job:  map 100% reduce 100%
    15/08/17 09:47:04 INFO mapreduce.Job: Job job_1439801989509_0005 completed successfully
    15/08/17 09:47:04 INFO mapreduce.Job: Counters: 49
            File System Counters
                    FILE: Number of bytes read=51
                    FILE: Number of bytes written=253029
                    FILE: Number of read operations=0
                    FILE: Number of large read operations=0
                    FILE: Number of write operations=0
                    HDFS: Number of bytes read=150
                    HDFS: Number of bytes written=29
                    HDFS: Number of read operations=6
                    HDFS: Number of large read operations=0
                    HDFS: Number of write operations=2
            Job Counters
                    Launched map tasks=1
                    Launched reduce tasks=1
                    Data-local map tasks=1
                    Total time spent by all maps in occupied slots (ms)=2762
                    Total time spent by all reduces in occupied slots (ms)=3086
                    Total time spent by all map tasks (ms)=2762
                    Total time spent by all reduce tasks (ms)=3086
                    Total vcore-seconds taken by all map tasks=2762
                    Total vcore-seconds taken by all reduce tasks=3086
                    Total megabyte-seconds taken by all map tasks=690500
                    Total megabyte-seconds taken by all reduce tasks=771500
            Map-Reduce Framework
                    Map input records=1
                    Map output records=6
                    Map output bytes=56
                    Map output materialized bytes=51
                    Input split bytes=118
                    Combine input records=6
                    Combine output records=4
                    Reduce input groups=4
                    Reduce shuffle bytes=51
                    Reduce input records=4
                    Reduce output records=4
                    Spilled Records=8
                    Shuffled Maps =1
                    Failed Shuffles=0
                    Merged Map outputs=1
                    GC time elapsed (ms)=87
                    CPU time spent (ms)=1250
                    Physical memory (bytes) snapshot=341491712
                    Virtual memory (bytes) snapshot=1648259072
                    Total committed heap usage (bytes)=264241152
            Shuffle Errors
                    BAD_ID=0
                    CONNECTION=0
                    IO_ERROR=0
                    WRONG_LENGTH=0
                    WRONG_MAP=0
                    WRONG_REDUCE=0
            File Input Format Counters
                    Bytes Read=32
            File Output Format Counters
                    Bytes Written=29
    [hdfs@sandbox /]$ hadoop fs -ls /user/hdfs/def.txt
    Found 2 items
    -rw-r--r--   1 hdfs hdfs          0 2015-08-17 09:47 /user/hdfs/def.txt/_SUCCESS
    -rw-r--r--   1 hdfs hdfs         29 2015-08-17 09:47 /user/hdfs/def.txt/part-r-00000
    [hdfs@sandbox /]$ hadoop fs -cat /user/hdfs/def.txt/part-r-00000
    ajay    2
    alok    1
    amar    1
    anand   2
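    Conceptually, all wordcount does is tokenize each line in the map phase and sum the per-word counts in the reduce phase. A minimal plain-Java sketch of the same logic (no Hadoop involved; the input string here is only a guess consistent with the counts above):

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class WordCountSketch {

    // Tokenize the input (map step) and sum the counts per word (reduce step).
    // A TreeMap keeps keys sorted, like the part-r-00000 output above.
    public static Map<String, Integer> countWords(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        StringTokenizer tokenizer = new StringTokenizer(text);
        while (tokenizer.hasMoreTokens()) {
            counts.merge(tokenizer.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical contents of abc.txt, consistent with the output above.
        System.out.println(countWords("ajay alok amar anand ajay anand"));
        // {ajay=2, alok=1, amar=1, anand=2}
    }
}
```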


    Similarly, we can run the other available samples, such as wordmean, wordmedian, and wordstandarddeviation.

    To run wordcount on multiple files, pass the path of the directory containing them:

    [hdfs@sandbox /]$ hadoop fs -copyFromLocal ijk.txt /user/hdfs/anand/
    [hdfs@sandbox /]$ hadoop fs -ls /user/hdfs/anand
    Found 2 items
    -rw-r--r--   1 hdfs hdfs         27 2015-08-17 10:18 /user/hdfs/anand/ijk.txt
    -rw-r--r--   1 hdfs hdfs         32 2015-08-17 10:16 /user/hdfs/anand/jkl.txt
    [hdfs@sandbox /]$ yarn jar /usr/hdp/2.3.0.0-2557/hadoop-mapreduce/hadoop-mapreduce-examples-2.7.1.2.3.0.0-2557.jar wordcount /user/hdfs/anand/ /user/hdfs/outp
    15/08/17 10:19:54 INFO impl.TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
    15/08/17 10:19:54 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
    15/08/17 10:19:54 INFO input.FileInputFormat: Total input paths to process : 2
    15/08/17 10:19:55 INFO mapreduce.JobSubmitter: number of splits:2
    15/08/17 10:19:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1439801989509_0009
    15/08/17 10:19:55 INFO impl.YarnClientImpl: Submitted application application_1439801989509_0009
    15/08/17 10:19:55 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1439801989509_0009/
    15/08/17 10:19:55 INFO mapreduce.Job: Running job: job_1439801989509_0009
    15/08/17 10:20:00 INFO mapreduce.Job: Job job_1439801989509_0009 running in uber mode : false
    15/08/17 10:20:00 INFO mapreduce.Job:  map 0% reduce 0%
    15/08/17 10:20:06 INFO mapreduce.Job:  map 100% reduce 0%
    15/08/17 10:20:13 INFO mapreduce.Job:  map 100% reduce 100%
    15/08/17 10:20:13 INFO mapreduce.Job: Job job_1439801989509_0009 completed successfully
    15/08/17 10:20:13 INFO mapreduce.Job: Counters: 49
            File System Counters
                    FILE: Number of bytes read=102
                    FILE: Number of bytes written=379609
                    FILE: Number of read operations=0
                    FILE: Number of large read operations=0
                    FILE: Number of write operations=0
                    HDFS: Number of bytes read=307
                    HDFS: Number of bytes written=56
                    HDFS: Number of read operations=9
                    HDFS: Number of large read operations=0
                    HDFS: Number of write operations=2
            Job Counters
                    Launched map tasks=2
                    Launched reduce tasks=1
                    Data-local map tasks=2
                    Total time spent by all maps in occupied slots (ms)=6985
                    Total time spent by all reduces in occupied slots (ms)=4195
                    Total time spent by all map tasks (ms)=6985
                    Total time spent by all reduce tasks (ms)=4195
                    Total vcore-seconds taken by all map tasks=6985
                    Total vcore-seconds taken by all reduce tasks=4195
                    Total megabyte-seconds taken by all map tasks=1746250
                    Total megabyte-seconds taken by all reduce tasks=1048750
            Map-Reduce Framework
                    Map input records=2
                    Map output records=10
                    Map output bytes=99
                    Map output materialized bytes=108
                    Input split bytes=248
                    Combine input records=10
                    Combine output records=8
                    Reduce input groups=7
                    Reduce shuffle bytes=108
                    Reduce input records=8
                    Reduce output records=7
                    Spilled Records=16
                    Shuffled Maps =2
                    Failed Shuffles=0
                    Merged Map outputs=2
                    GC time elapsed (ms)=139
                    CPU time spent (ms)=1740
                    Physical memory (bytes) snapshot=551854080
                    Virtual memory (bytes) snapshot=2480885760
                    Total committed heap usage (bytes)=398458880
            Shuffle Errors
                    BAD_ID=0
                    CONNECTION=0
                    IO_ERROR=0
                    WRONG_LENGTH=0
                    WRONG_MAP=0
                    WRONG_REDUCE=0
            File Input Format Counters
                    Bytes Read=59
            File Output Format Counters
                    Bytes Written=56
    [hdfs@sandbox /]$ hadoop fs -ls /user/hdfs/outp
    Found 2 items
    -rw-r--r--   1 hdfs hdfs          0 2015-08-17 10:20 /user/hdfs/outp/_SUCCESS
    -rw-r--r--   1 hdfs hdfs         56 2015-08-17 10:20 /user/hdfs/outp/part-r-00000
    [hdfs@sandbox /]$ hadoop fs -cat /user/hdfs/outp/part-r-00000
    ajay    2
    alok    1
    amar    1
    anand   3
    sanjay  1
    savita  1
    tushar  1

    WordMean
    [hdfs@sandbox /]$ yarn jar /usr/hdp/2.3.0.0-2557/hadoop-mapreduce/hadoop-mapreduce-examples-2.7.1.2.3.0.0-2557.jar wordmean abc.txt ghi
    15/08/17 09:59:51 INFO impl.TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
    15/08/17 09:59:52 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
    15/08/17 09:59:52 INFO input.FileInputFormat: Total input paths to process : 1
    15/08/17 09:59:52 INFO mapreduce.JobSubmitter: number of splits:1
    15/08/17 09:59:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1439801989509_0006
    15/08/17 09:59:53 INFO impl.YarnClientImpl: Submitted application application_1439801989509_0006
    15/08/17 09:59:53 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1439801989509_0006/
    15/08/17 09:59:53 INFO mapreduce.Job: Running job: job_1439801989509_0006
    15/08/17 09:59:59 INFO mapreduce.Job: Job job_1439801989509_0006 running in uber mode : false
    15/08/17 09:59:59 INFO mapreduce.Job:  map 0% reduce 0%
    15/08/17 10:00:04 INFO mapreduce.Job:  map 100% reduce 0%
    15/08/17 10:00:11 INFO mapreduce.Job:  map 100% reduce 100%
    15/08/17 10:00:11 INFO mapreduce.Job: Job job_1439801989509_0006 completed successfully
    15/08/17 10:00:11 INFO mapreduce.Job: Counters: 49
            File System Counters
                    FILE: Number of bytes read=39
                    FILE: Number of bytes written=252993
                    FILE: Number of read operations=0
                    FILE: Number of large read operations=0
                    FILE: Number of write operations=0
                    HDFS: Number of bytes read=150
                    HDFS: Number of bytes written=18
                    HDFS: Number of read operations=6
                    HDFS: Number of large read operations=0
                    HDFS: Number of write operations=2
            Job Counters
                    Launched map tasks=1
                    Launched reduce tasks=1
                    Data-local map tasks=1
                    Total time spent by all maps in occupied slots (ms)=2876
                    Total time spent by all reduces in occupied slots (ms)=4070
                    Total time spent by all map tasks (ms)=2876
                    Total time spent by all reduce tasks (ms)=4070
                    Total vcore-seconds taken by all map tasks=2876
                    Total vcore-seconds taken by all reduce tasks=4070
                    Total megabyte-seconds taken by all map tasks=719000
                    Total megabyte-seconds taken by all reduce tasks=1017500
            Map-Reduce Framework
                    Map input records=1
                    Map output records=12
                    Map output bytes=174
                    Map output materialized bytes=39
                    Input split bytes=118
                    Combine input records=12
                    Combine output records=2
                    Reduce input groups=2
                    Reduce shuffle bytes=39
                    Reduce input records=2
                    Reduce output records=2
                    Spilled Records=4
                    Shuffled Maps =1
                    Failed Shuffles=0
                    Merged Map outputs=1
                    GC time elapsed (ms)=102
                    CPU time spent (ms)=1430
                    Physical memory (bytes) snapshot=341884928
                    Virtual memory (bytes) snapshot=1672888320
                    Total committed heap usage (bytes)=264241152
            Shuffle Errors
                    BAD_ID=0
                    CONNECTION=0
                    IO_ERROR=0
                    WRONG_LENGTH=0
                    WRONG_MAP=0
                    WRONG_REDUCE=0
            File Input Format Counters
                    Bytes Read=32
            File Output Format Counters
                    Bytes Written=18
    The mean is: 4.333333333333333
     [hdfs@sandbox /]$ hadoop fs -ls /user/hdfs/ghi
    Found 2 items
    -rw-r--r--   1 hdfs hdfs          0 2015-08-17 10:00 /user/hdfs/ghi/_SUCCESS
    -rw-r--r--   1 hdfs hdfs         18 2015-08-17 10:00 /user/hdfs/ghi/part-r-00000
    [hdfs@sandbox /]$ hadoop fs -cat /user/hdfs/ghi/part-r-00000
    count   6
    length  26
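    The ghi output reveals how wordmean works: the job emits two aggregate records, the total word count and the total character length, and the reported mean is simply their ratio (26 / 6 here). A quick check, assuming those two values from the output file:

```java
public class WordMeanCheck {

    // wordmean reports total character length divided by total word count.
    public static double mean(long length, long count) {
        return (double) length / count;
    }

    public static void main(String[] args) {
        // count=6, length=26 come from the part-r-00000 output above.
        System.out.println("The mean is: " + mean(26, 6));
    }
}
```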

    WordMedian
     [hdfs@sandbox /]$ yarn jar /usr/hdp/2.3.0.0-2557/hadoop-mapreduce/hadoop-mapreduce-examples-2.7.1.2.3.0.0-2557.jar wordmedian /user/hdfs/abc.txt wordmedianout
    15/08/17 10:26:48 INFO impl.TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
    15/08/17 10:26:49 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
    15/08/17 10:26:49 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    15/08/17 10:26:49 INFO input.FileInputFormat: Total input paths to process : 1
    15/08/17 10:26:50 INFO mapreduce.JobSubmitter: number of splits:1
    15/08/17 10:26:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1439801989509_0010
    15/08/17 10:26:50 INFO impl.YarnClientImpl: Submitted application application_1439801989509_0010
    15/08/17 10:26:50 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1439801989509_0010/
    15/08/17 10:26:50 INFO mapreduce.Job: Running job: job_1439801989509_0010
    15/08/17 10:26:56 INFO mapreduce.Job: Job job_1439801989509_0010 running in uber mode : false
    15/08/17 10:26:56 INFO mapreduce.Job:  map 0% reduce 0%
    15/08/17 10:27:01 INFO mapreduce.Job:  map 100% reduce 0%
    15/08/17 10:27:08 INFO mapreduce.Job:  map 100% reduce 100%
    15/08/17 10:27:09 INFO mapreduce.Job: Job job_1439801989509_0010 completed successfully
    15/08/17 10:27:09 INFO mapreduce.Job: Counters: 49
            File System Counters
                    FILE: Number of bytes read=26
                    FILE: Number of bytes written=252719
                    FILE: Number of read operations=0
                    FILE: Number of large read operations=0
                    FILE: Number of write operations=0
                    HDFS: Number of bytes read=150
                    HDFS: Number of bytes written=8
                    HDFS: Number of read operations=6
                    HDFS: Number of large read operations=0
                    HDFS: Number of write operations=2
            Job Counters
                    Launched map tasks=1
                    Launched reduce tasks=1
                    Data-local map tasks=1
                    Total time spent by all maps in occupied slots (ms)=2974
                    Total time spent by all reduces in occupied slots (ms)=3426
                    Total time spent by all map tasks (ms)=2974
                    Total time spent by all reduce tasks (ms)=3426
                    Total vcore-seconds taken by all map tasks=2974
                    Total vcore-seconds taken by all reduce tasks=3426
                    Total megabyte-seconds taken by all map tasks=743500
                    Total megabyte-seconds taken by all reduce tasks=856500
            Map-Reduce Framework
                    Map input records=1
                    Map output records=6
                    Map output bytes=48
                    Map output materialized bytes=26
                    Input split bytes=118
                    Combine input records=6
                    Combine output records=2
                    Reduce input groups=2
                    Reduce shuffle bytes=26
                    Reduce input records=2
                    Reduce output records=2
                    Spilled Records=4
                    Shuffled Maps =1
                    Failed Shuffles=0
                    Merged Map outputs=1
                    GC time elapsed (ms)=114
                    CPU time spent (ms)=1350
                    Physical memory (bytes) snapshot=358936576
                    Virtual memory (bytes) snapshot=1683902464
                    Total committed heap usage (bytes)=264241152
            Shuffle Errors
                    BAD_ID=0
                    CONNECTION=0
                    IO_ERROR=0
                    WRONG_LENGTH=0
                    WRONG_MAP=0
                    WRONG_REDUCE=0
            File Input Format Counters
                    Bytes Read=32
            File Output Format Counters
                    Bytes Written=8
    The median is: 4

    Wordstandarddeviation
    [hdfs@sandbox /]$ yarn jar /usr/hdp/2.3.0.0-2557/hadoop-mapreduce/hadoop-mapreduce-examples-2.7.1.2.3.0.0-2557.jar wordstandarddeviation /user/hdfs/jkl.txt worddev
    15/08/17 10:30:24 INFO impl.TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
    15/08/17 10:30:24 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
    15/08/17 10:30:25 INFO input.FileInputFormat: Total input paths to process : 1
    15/08/17 10:30:25 INFO mapreduce.JobSubmitter: number of splits:1
    15/08/17 10:30:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1439801989509_0012
    15/08/17 10:30:25 INFO impl.YarnClientImpl: Submitted application application_1439801989509_0012
    15/08/17 10:30:26 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1439801989509_0012/
    15/08/17 10:30:26 INFO mapreduce.Job: Running job: job_1439801989509_0012
    15/08/17 10:30:32 INFO mapreduce.Job: Job job_1439801989509_0012 running in uber mode : false
    15/08/17 10:30:32 INFO mapreduce.Job:  map 0% reduce 0%
    15/08/17 10:30:37 INFO mapreduce.Job:  map 100% reduce 0%
    15/08/17 10:30:42 INFO mapreduce.Job:  map 100% reduce 100%
    15/08/17 10:30:42 INFO mapreduce.Job: Job job_1439801989509_0012 completed successfully
    15/08/17 10:30:42 INFO mapreduce.Job: Counters: 49
            File System Counters
                    FILE: Number of bytes read=56
                    FILE: Number of bytes written=253199
                    FILE: Number of read operations=0
                    FILE: Number of large read operations=0
                    FILE: Number of write operations=0
                    HDFS: Number of bytes read=150
                    HDFS: Number of bytes written=29
                    HDFS: Number of read operations=6
                    HDFS: Number of large read operations=0
                    HDFS: Number of write operations=2
            Job Counters
                    Launched map tasks=1
                    Launched reduce tasks=1
                    Data-local map tasks=1
                    Total time spent by all maps in occupied slots (ms)=2857
                    Total time spent by all reduces in occupied slots (ms)=3110
                    Total time spent by all map tasks (ms)=2857
                    Total time spent by all reduce tasks (ms)=3110
                    Total vcore-seconds taken by all map tasks=2857
                    Total vcore-seconds taken by all reduce tasks=3110
                    Total megabyte-seconds taken by all map tasks=714250
                    Total megabyte-seconds taken by all reduce tasks=777500
            Map-Reduce Framework
                    Map input records=1
                    Map output records=18
                    Map output bytes=264
                    Map output materialized bytes=56
                    Input split bytes=118
                    Combine input records=18
                    Combine output records=3
                    Reduce input groups=3
                    Reduce shuffle bytes=56
                    Reduce input records=3
                    Reduce output records=3
                    Spilled Records=6
                    Shuffled Maps =1
                    Failed Shuffles=0
                    Merged Map outputs=1
                    GC time elapsed (ms)=88
                    CPU time spent (ms)=1290
                    Physical memory (bytes) snapshot=349061120
                    Virtual memory (bytes) snapshot=1669423104
                    Total committed heap usage (bytes)=265289728
            Shuffle Errors
                    BAD_ID=0
                    CONNECTION=0
                    IO_ERROR=0
                    WRONG_LENGTH=0
                    WRONG_MAP=0
                    WRONG_REDUCE=0
            File Input Format Counters
                    Bytes Read=32
            File Output Format Counters
                    Bytes Written=29
    The standard deviation is: 0.4714045207910346
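    These statistics can be cross-checked from the word lengths alone. Assuming the input words match the earlier wordcount output (ajay x2, alok, amar, anand x2, i.e. lengths 4, 4, 4, 4, 5, 5 — an assumption, since the file contents are not shown), a plain-Java check reproduces the mean, median, and population standard deviation reported above:

```java
import java.util.Arrays;

public class WordStatsCheck {

    public static double mean(int[] lengths) {
        return Arrays.stream(lengths).average().getAsDouble();
    }

    // wordmedian picks the length at the 50th percentile of the sorted list.
    public static int median(int[] lengths) {
        int[] sorted = lengths.clone();
        Arrays.sort(sorted);
        return sorted[sorted.length / 2];
    }

    // Population standard deviation: sqrt of the mean squared deviation.
    public static double stddev(int[] lengths) {
        double m = mean(lengths);
        double sumSq = Arrays.stream(lengths)
                .mapToDouble(l -> (l - m) * (l - m))
                .sum();
        return Math.sqrt(sumSq / lengths.length);
    }

    public static void main(String[] args) {
        int[] lengths = {4, 4, 4, 4, 5, 5};  // assumed word lengths
        System.out.println("mean = " + mean(lengths));      // ~4.333
        System.out.println("median = " + median(lengths));  // 4
        System.out.println("stddev = " + stddev(lengths));  // ~0.4714
    }
}
```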



    Now, let's create a sample of our own:


    1. WordCount (self-written)

    Objective: to count the number of times each word occurs.


    InputFile:
    A text file whose words are to be counted.

    Steps:
    1.       Create a Java project.
    2.       Create a Java class WordCount.java:

    package org.myorg;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class WordCount {

        // Mapper: emits (word, 1) for every token of every input line.
        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    context.write(word, one);
                }
            }
        }

        // Reducer: sums the 1s emitted for each distinct word.
        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            Job job = Job.getInstance(conf, "wordcount"); // replaces the deprecated new Job(conf, name)
            // Without this, the job cannot locate the Map/Reduce classes at runtime (see step 7 below).
            job.setJarByClass(org.myorg.WordCount.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        }
    }


    3.       Include the jars hadoop-common-2.7.1.jar and hadoop-mapreduce-client-core-2.7.1.jar.
    4.       Export as a jar: right-click the project > Export > JAR file, e.g. wc.jar.
    5.       Copy the jar to a folder of your choice on the sandbox machine using WinSCP.
    6.       Create the input file in HDFS.
    7.       Make sure main() calls job.setJarByClass(org.myorg.WordCount.class); without it, the job fails at runtime because the mapper and reducer classes cannot be located.
    8.       Run: yarn jar /anand/wc.jar org.myorg.WordCount /user/hdfs/jkl.txt qqq

    To start the MapReduce job:


    [hdfs@sandbox anand]$ yarn jar /anand/wc.jar org.myorg.WordCount /user/hdfs/jkl.txt qqq
    15/08/17 11:48:05 INFO impl.TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
    15/08/17 11:48:05 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
    15/08/17 11:48:05 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    15/08/17 11:48:05 INFO input.FileInputFormat: Total input paths to process : 1
    15/08/17 11:48:06 INFO mapreduce.JobSubmitter: number of splits:1
    15/08/17 11:48:06 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1439801989509_0018
    15/08/17 11:48:06 INFO impl.YarnClientImpl: Submitted application application_1439801989509_0018
    15/08/17 11:48:06 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1439801989509_0018/
    15/08/17 11:48:06 INFO mapreduce.Job: Running job: job_1439801989509_0018
    15/08/17 11:48:13 INFO mapreduce.Job: Job job_1439801989509_0018 running in uber mode : false
    15/08/17 11:48:13 INFO mapreduce.Job:  map 0% reduce 0%
    15/08/17 11:48:19 INFO mapreduce.Job:  map 100% reduce 0%
    15/08/17 11:48:25 INFO mapreduce.Job:  map 100% reduce 100%
    15/08/17 11:48:25 INFO mapreduce.Job: Job job_1439801989509_0018 completed successfully
    15/08/17 11:48:25 INFO mapreduce.Job: Counters: 49
            File System Counters
                    FILE: Number of bytes read=74
                    FILE: Number of bytes written=253051
                    FILE: Number of read operations=0
                    FILE: Number of large read operations=0
                    FILE: Number of write operations=0
                    HDFS: Number of bytes read=150
                    HDFS: Number of bytes written=29
                    HDFS: Number of read operations=6
                    HDFS: Number of large read operations=0
                    HDFS: Number of write operations=2
            Job Counters
                    Launched map tasks=1
                    Launched reduce tasks=1
                    Data-local map tasks=1
                    Total time spent by all maps in occupied slots (ms)=3150
                    Total time spent by all reduces in occupied slots (ms)=3191
                    Total time spent by all map tasks (ms)=3150
                    Total time spent by all reduce tasks (ms)=3191
                    Total vcore-seconds taken by all map tasks=3150
                    Total vcore-seconds taken by all reduce tasks=3191
                    Total megabyte-seconds taken by all map tasks=787500
                    Total megabyte-seconds taken by all reduce tasks=797750
            Map-Reduce Framework
                    Map input records=1
                    Map output records=6
                    Map output bytes=56
                    Map output materialized bytes=74
                    Input split bytes=118
                    Combine input records=0
                    Combine output records=0
                    Reduce input groups=4
                    Reduce shuffle bytes=74
                    Reduce input records=6
                    Reduce output records=4
                    Spilled Records=12
                    Shuffled Maps =1
                    Failed Shuffles=0
                    Merged Map outputs=1
                    GC time elapsed (ms)=87
                    CPU time spent (ms)=1290
                    Physical memory (bytes) snapshot=354729984
                    Virtual memory (bytes) snapshot=1664598016
                    Total committed heap usage (bytes)=264241152
            Shuffle Errors
                    BAD_ID=0
                    CONNECTION=0
                    IO_ERROR=0
                    WRONG_LENGTH=0
                    WRONG_MAP=0
                    WRONG_REDUCE=0
            File Input Format Counters
                    Bytes Read=32
            File Output Format Counters
                    Bytes Written=29
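    One difference from the bundled sample is visible in the counters: here Combine input records=0, whereas the stock wordcount run showed Combine input records=6, because our job never registers a combiner. Since this Reduce class is associative and commutative, it can safely double as one. A one-line fragment to add to main() (assuming the job setup above):

```java
// Pre-aggregate each mapper's output locally before the shuffle,
// reducing the data transferred to the reducers.
job.setCombinerClass(Reduce.class);
```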
    To list the output files:

    [hdfs@sandbox anand]$ hadoop fs -ls /user/hdfs/qqq
    Found 2 items
    -rw-r--r--   1 hdfs hdfs          0 2015-08-17 11:48 /user/hdfs/qqq/_SUCCESS
    -rw-r--r--   1 hdfs hdfs         29 2015-08-17 11:48 /user/hdfs/qqq/part-r-00000

    To view the output file content:
    [hdfs@sandbox anand]$ hadoop fs -cat /user/hdfs/qqq/part-r-00000
    ajay    2
    alok    1
    amar    1
    anand   2




    2. Dictionary order

    Objective: to sort all the words encountered in the input file in dictionary order of their first letter.

    InputFile:
    A file containing Text which has to be sorted.
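    The Dictionary source itself is not listed here, but the core idea — key each word by its first letter so the shuffle sorts and groups the output alphabetically — can be sketched in plain Java (hypothetical class and input; nm.txt's real contents are not shown above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class DictionarySketch {

    // Group words under their first letter; the sorted TreeMap keys stand in
    // for the sorting the MapReduce shuffle applies to mapper output keys.
    public static Map<Character, List<String>> groupByFirstLetter(String text) {
        Map<Character, List<String>> groups = new TreeMap<>();
        StringTokenizer tokenizer = new StringTokenizer(text);
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken();
            groups.computeIfAbsent(word.charAt(0), k -> new ArrayList<>()).add(word);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Hypothetical input text.
        System.out.println(groupByFirstLetter("banana apple cherry avocado"));
        // {a=[apple, avocado], b=[banana], c=[cherry]}
    }
}
```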

    Mapreduce:
    [hdfs@sandbox anand]$ yarn jar dictionary.jar org.demo.Dictionary /user/hdfs/nm.txt  out_new23
    15/08/18 06:24:03 INFO impl.TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
    15/08/18 06:24:03 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
    15/08/18 06:24:03 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    15/08/18 06:24:04 INFO input.FileInputFormat: Total input paths to process : 1
    15/08/18 06:24:04 INFO mapreduce.JobSubmitter: number of splits:1
    15/08/18 06:24:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1439801989509_0039
    15/08/18 06:24:04 INFO impl.YarnClientImpl: Submitted application application_1439801989509_0039
    15/08/18 06:24:04 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1439801989509_0039/
    15/08/18 06:24:04 INFO mapreduce.Job: Running job: job_1439801989509_0039
    15/08/18 06:24:10 INFO mapreduce.Job: Job job_1439801989509_0039 running in uber mode : false
    15/08/18 06:24:10 INFO mapreduce.Job:  map 0% reduce 0%
    15/08/18 06:24:15 INFO mapreduce.Job:  map 100% reduce 0%
    15/08/18 06:24:21 INFO mapreduce.Job:  map 100% reduce 100%
    15/08/18 06:24:21 INFO mapreduce.Job: Job job_1439801989509_0039 completed successfully
    15/08/18 06:24:22 INFO mapreduce.Job: Counters: 49
            File System Counters
                    FILE: Number of bytes read=3994
                    FILE: Number of bytes written=260909
                    FILE: Number of read operations=0
                    FILE: Number of large read operations=0
                    FILE: Number of write operations=0
                    HDFS: Number of bytes read=1884
                    HDFS: Number of bytes written=1901
                    HDFS: Number of read operations=6
                    HDFS: Number of large read operations=0
                    HDFS: Number of write operations=2
            Job Counters
                    Launched map tasks=1
                    Launched reduce tasks=1
                    Data-local map tasks=1
                    Total time spent by all maps in occupied slots (ms)=3329
                    Total time spent by all reduces in occupied slots (ms)=3104
                    Total time spent by all map tasks (ms)=3329
                    Total time spent by all reduce tasks (ms)=3104
                    Total vcore-seconds taken by all map tasks=3329
                    Total vcore-seconds taken by all reduce tasks=3104
                    Total megabyte-seconds taken by all map tasks=832250
                    Total megabyte-seconds taken by all reduce tasks=776000
            Map-Reduce Framework
                    Map input records=14
                    Map output records=350
                    Map output bytes=3288
                    Map output materialized bytes=3994
                    Input split bytes=117
                    Combine input records=0
                    Combine output records=0
                    Reduce input groups=26
                    Reduce shuffle bytes=3994
                    Reduce input records=350
                    Reduce output records=26
                    Spilled Records=700
                    Shuffled Maps =1
                    Failed Shuffles=0
                    Merged Map outputs=1
                    GC time elapsed (ms)=89
                    CPU time spent (ms)=1380
                    Physical memory (bytes) snapshot=352882688
                    Virtual memory (bytes) snapshot=1665654784
                    Total committed heap usage (bytes)=265289728
            Shuffle Errors
                    BAD_ID=0
                    CONNECTION=0
                    IO_ERROR=0
                    WRONG_LENGTH=0
                    WRONG_MAP=0
                    WRONG_REDUCE=0
            File Input Format Counters
                    Bytes Read=1767
            File Output Format Counters

    Output
    [hdfs@sandbox anand]$ hadoop fs -cat /user/hdfs/out_new23/part-r-00000
    -       - - -
    A       and and and as and and and abode a and Ajanta and are a as a also and all attract are a are and and all a after and a a and and and and
    B       been binding Baba by Bhagat Bose boast beloved Buddha Bose Brahmaputra bloom biggest
    C       country country country country country civilisations caves country culture Chandra C.V centuries coasts Chand Chandra China can country
    D       democracy Dr droughts described diversity delight
    E       earth Ellora
    F       fields from frontiers for few Fort from Fatehpur fit fertile fields fed
    G       gods guard Gandhi greats Gangetic Gandhi given Ganga gods Godavari
    H       have her has have Himalayas has has have has Homi hindrance here
    I       It is India India is is in in in is India In it in is inherited India is is Its
    J       Jawaharlal Jagdish
    K       Krishna Kashmir Kaveri Kajuraho
    L       languages like Lajpat Lala like leaders like land land literature land lakes land like
    M       most many my my modern mountains many mighty my many Mahatma Mahatma mighty My most my Mahal
    N       Netaji Nilgiris Nehru natural north Narmada Nath
    O       of one of Ooty of on over of of our Ours on on oceans our of of of oldest of one of of of
    P       Prem people paradise place proud produced Pratap populous Patel places produced persons produced
    Q       Qutab
    R       Rabindra region Ramman Rai Rana Red running religions rivers rivers
    S       Sikri second Shiva sides Saratchandra speak science South same Singh spirit Subhash spirit Sardar sides Shivaji; secular state
    Special Characters      . , , , , , . , . . , , , , . , , , . . , , , , . . , , , , . . . . . , . . , . , , , , . .
    T       the the the The that three the the through the There Tagore the the through the together tourists' The Taj the The the the the the the temples the the the The the the times The the the the temples the The
    U       unity us
    V       various Valley villages valleys
    W       world world warriors we worship We world We world which without wonders which
    Y       yet Yamuna

    Source code:
    package org.demo;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class Dictionary {

        public static class Map extends Mapper<LongWritable, Text, Text, Text> {
            private Text one = new Text();
            private Text word = new Text();
            private Text specialCharacterKey = new Text("Special Characters");

            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    String t = tokenizer.nextToken();
                    String temp = t;
                    // Strip a trailing , . or " and record it under the "Special Characters" key
                    if (t.endsWith(",") || t.endsWith(".") || t.endsWith("\"")) {
                        t = t.substring(0, t.length() - 1);
                        context.write(specialCharacterKey, new Text(temp.substring(temp.length() - 1)));
                    }
                    // Strip a leading quote and record it as well
                    if (t.startsWith("\"")) {
                        t = t.substring(1);
                        context.write(specialCharacterKey, new Text(temp.substring(0, 1)));
                    }
                    // Key each word by its upper-cased first letter
                    word.set(t.substring(0, 1).toUpperCase());
                    one.set(t);
                    context.write(word, one);
                }
            }
        }

        public static class Reduce extends Reducer<Text, Text, Text, Text> {

            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Concatenate every word that shares this first letter
                String sum = "";
                for (Text val : values) {
                    sum = sum + val.toString() + " ";
                }
                context.write(key, new Text(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            Job job = new Job(conf, "dictionary");
            job.setJarByClass(org.demo.Dictionary.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        }
    }


    3. Expenses
    Objective - To calculate the aggregate expenditure for each category, e.g. Grocery, Medicine, etc.

    Input file:
    [hdfs@sandbox root]$ hadoop fs -cat /user/hdfs/exp.txt
    Mobile 1000
    Oil 5000
    Medicine 2000
    Medicine 1000
    Grocery 3000
    Grocery 4000
    Medicine 1000
    Flour 2000
    Mobile 2000
    Medicine 1000
    Education 40000
    Grocery 2000
    Grocery 2000
    Medicine 2000
    Drinks 5000
    Mobile 2000
    Medicine 3000
    Grocery 7000
    Education 20000
    Medicine 1000
    Mobile 2000
    Medicine 1000
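    The mapper in the Source Code below folds related items into broader categories: Flour and Oil are counted as Grocery, and anything unrecognised lands in Other. A minimal sketch of that classification rule outside Hadoop (the class name CategoryRule is just for illustration):

    ```java
    import java.util.Arrays;
    import java.util.List;

    public class CategoryRule {
        private static final List<String> GROCERY_ITEMS = Arrays.asList("Grocery", "Flour", "Oil");
        private static final List<String> KNOWN = Arrays.asList("Medicine", "Mobile", "Education");

        // Maps a raw expense name to the category key the mapper would emit
        public static String categorize(String item) {
            if (GROCERY_ITEMS.contains(item)) return "Grocery";
            if (KNOWN.contains(item)) return item;
            return "Other"; // e.g. Drinks
        }

        public static void main(String[] args) {
            System.out.println(categorize("Oil"));    // Grocery
            System.out.println(categorize("Drinks")); // Other
        }
    }
    ```

    This is why the output below has no separate Oil or Flour rows: their amounts are summed into the Grocery total.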

    MapReduce:

    [hdfs@sandbox anand]$ yarn jar expenses.jar org.demo.Expense /user/hdfs/exp.txt  out_new25
    15/08/18 06:27:39 INFO impl.TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
    15/08/18 06:27:39 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
    15/08/18 06:27:40 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    15/08/18 06:27:40 INFO input.FileInputFormat: Total input paths to process : 1
    15/08/18 06:27:40 INFO mapreduce.JobSubmitter: number of splits:1
    15/08/18 06:27:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1439801989509_0041
    15/08/18 06:27:41 INFO impl.YarnClientImpl: Submitted application application_1439801989509_0041
    15/08/18 06:27:41 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1439801989509_0041/
    15/08/18 06:27:41 INFO mapreduce.Job: Running job: job_1439801989509_0041
    15/08/18 06:27:47 INFO mapreduce.Job: Job job_1439801989509_0041 running in uber mode : false
    15/08/18 06:27:47 INFO mapreduce.Job:  map 0% reduce 0%
    15/08/18 06:27:52 INFO mapreduce.Job:  map 100% reduce 0%
    15/08/18 06:27:57 INFO mapreduce.Job:  map 100% reduce 100%
    15/08/18 06:27:57 INFO mapreduce.Job: Job job_1439801989509_0041 completed successfully
    15/08/18 06:27:57 INFO mapreduce.Job: Counters: 49
            File System Counters
                    FILE: Number of bytes read=334
                    FILE: Number of bytes written=253589
                    FILE: Number of read operations=0
                    FILE: Number of large read operations=0
                    FILE: Number of write operations=0
                    HDFS: Number of bytes read=441
                    HDFS: Number of bytes written=68
                    HDFS: Number of read operations=6
                    HDFS: Number of large read operations=0
                    HDFS: Number of write operations=2
            Job Counters
                    Launched map tasks=1
                    Launched reduce tasks=1
                    Data-local map tasks=1
                    Total time spent by all maps in occupied slots (ms)=2795
                    Total time spent by all reduces in occupied slots (ms)=2983
                    Total time spent by all map tasks (ms)=2795
                    Total time spent by all reduce tasks (ms)=2983
                    Total vcore-seconds taken by all map tasks=2795
                    Total vcore-seconds taken by all reduce tasks=2983
                    Total megabyte-seconds taken by all map tasks=698750
                    Total megabyte-seconds taken by all reduce tasks=745750
            Map-Reduce Framework
                    Map input records=23
                    Map output records=23
                    Map output bytes=282
                    Map output materialized bytes=334
                    Input split bytes=118
                    Combine input records=0
                    Combine output records=0
                    Reduce input groups=5
                    Reduce shuffle bytes=334
                    Reduce input records=23
                    Reduce output records=5
                    Spilled Records=46
                    Shuffled Maps =1
                    Failed Shuffles=0
                    Merged Map outputs=1
                    GC time elapsed (ms)=85
                    CPU time spent (ms)=1260
                    Physical memory (bytes) snapshot=339275776
                    Virtual memory (bytes) snapshot=1672081408
                    Total committed heap usage (bytes)=264241152
            Shuffle Errors
                    BAD_ID=0
                    CONNECTION=0
                    IO_ERROR=0
                    WRONG_LENGTH=0
                    WRONG_MAP=0
                    WRONG_REDUCE=0
            File Input Format Counters
                    Bytes Read=323
            File Output Format Counters
                    Bytes Written=68


    Output

    [hdfs@sandbox anand]$ hadoop fs -cat /user/hdfs/out_new25/part-r-00000
    Education       60000
    Grocery 29000
    Medicine        12000
    Mobile  7000
    Other   5000
    [hdfs@sandbox anand]$

    Source Code:

    package org.demo;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class Expense {

        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
            private Text groceryKey = new Text("Grocery");
            private static IntWritable groceryValue;
            private Text medicalKey = new Text("Medicine");
            private static IntWritable medicalValue;
            private Text mobileKey = new Text("Mobile");
            private static IntWritable mobileValue;
            private Text educationKey = new Text("Education");
            private static IntWritable educationValue;
            private Text otherKey = new Text("Other");
            private static IntWritable otherValue;
            private String exp;

            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                if (tokenizer.countTokens() == 2) {
                    exp = tokenizer.nextToken();
                    // Flour and Oil are folded into the Grocery category
                    if (Arrays.asList("Grocery", "Flour", "Oil").contains(exp)) {
                        groceryValue = new IntWritable(Integer.parseInt(tokenizer.nextToken()));
                        context.write(groceryKey, groceryValue);
                    } else if (exp.equals("Medicine")) {
                        medicalValue = new IntWritable(Integer.parseInt(tokenizer.nextToken()));
                        context.write(medicalKey, medicalValue);
                    } else if (exp.equals("Mobile")) {
                        mobileValue = new IntWritable(Integer.parseInt(tokenizer.nextToken()));
                        context.write(mobileKey, mobileValue);
                    } else if (exp.equals("Education")) {
                        educationValue = new IntWritable(Integer.parseInt(tokenizer.nextToken()));
                        context.write(educationKey, educationValue);
                    } else {
                        // Anything unrecognised is aggregated under "Other"
                        otherValue = new IntWritable(Integer.parseInt(tokenizer.nextToken()));
                        context.write(otherKey, otherValue);
                    }
                }
            }
        }

        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Sum all amounts recorded under this category
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            Job job = new Job(conf, "Expenses");
            job.setJarByClass(org.demo.Expense.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        }

    }



    4. Weather

    Objective - To retrieve the maximum and minimum temperatures from a weather report.

    Input file:
    A weather report in the fixed-width format shown below:

    23907 20150712  2.423  -98.08   30.62    34.1    21.7    27.9    27.8     0.0    26.94 C    50.9    23.8    34.8 -9999.0 -9999.0 -9999.0   0.175   0.238 -99.000 -99.000 -99.000    28.6    27.9 -9999.0 -9999.0 -9999.0
    23907 20150713  2.423  -98.08   30.62    35.5    22.6    29.1    28.8     0.0    28.29 C    52.5    24.8    36.0 -9999.0 -9999.0 -9999.0   0.169   0.233 -99.000 -99.000 -99.000    29.3    28.5 -9999.0 -9999.0 -9999.0
    23907 20150714  2.423  -98.08   30.62    36.0    21.8    28.9    28.3     0.0    28.69 C    52.3    24.1    35.6 -9999.0 -9999.0 -9999.0   0.163   0.228 -99.000 -99.000 -99.000    29.4    28.7 -9999.0 -9999.0 -9999.0

    23907 20150715  2.423  -98.08   30.62    34.5    22.0    28.3    27.7     0.0    28.70 C    52.1    24.3    35.4    90.3    11.4    49.1   0.157   0.223 -99.000 -99.000 -99.000    29.5    28.8 -9999.0 -9999.0 -9999.0
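    The mapper reads these records by fixed column offsets rather than by splitting on whitespace: characters 6-14 hold the date, 39-45 the maximum temperature, and 47-53 the minimum. A standalone sketch of that slicing against the first record above (FixedWidthDemo is an illustrative name; the record is truncated after the minimum-temperature column):

    ```java
    public class FixedWidthDemo {
        // First record from the sample input, truncated after the minimum-temperature column
        static final String RECORD =
            "23907 20150712  2.423  -98.08   30.62    34.1    21.7";

        public static void main(String[] args) {
            String date = RECORD.substring(6, 14);                             // "20150712"
            float tempMax = Float.parseFloat(RECORD.substring(39, 45).trim()); // 34.1
            float tempMin = Float.parseFloat(RECORD.substring(47, 53).trim()); // 21.7
            System.out.println(date + " max=" + tempMax + " min=" + tempMin);
        }
    }
    ```

    These offsets only hold for this particular report layout; a differently formatted feed would need different slice positions.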

    MapReduce:

    [hdfs@sandbox anand]$ yarn jar weather.jar org.demo.MyMaxMin /user/hdfs/weather_data.txt  out_new25


    Output:



    Source Code:

    package org.demo;

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.conf.Configuration;

    public class MyMaxMin {

        // Mapper

        /**
         * MaxTemperatureMapper is static and extends the Mapper abstract class
         * with the four Hadoop generic types LongWritable, Text, Text, Text.
         */
        public static class MaxTemperatureMapper extends
                Mapper<LongWritable, Text, Text, Text> {

            /**
             * @method map
             * Reads one record as text. The date sits in columns 6-14, the maximum
             * temperature in columns 39-45, and the minimum temperature in columns
             * 47-53. Records with temp_Max > 35 or temp_Min < 10 are passed to the
             * reducer.
             */
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                // Convert the record (a single line) to a String
                String line = value.toString();

                // Skip empty lines
                if (!(line.length() == 0)) {

                    // Date
                    String date = line.substring(6, 14);

                    // Maximum temperature
                    float temp_Max = Float.parseFloat(line.substring(39, 45).trim());

                    // Minimum temperature
                    float temp_Min = Float.parseFloat(line.substring(47, 53).trim());

                    // If the maximum temperature is greater than 35, it is a hot day
                    if (temp_Max > 35.0) {
                        context.write(new Text("Hot Day " + date),
                                new Text(String.valueOf(temp_Max)));
                    }

                    // If the minimum temperature is less than 10, it is a cold day
                    if (temp_Min < 10) {
                        context.write(new Text("Cold Day " + date),
                                new Text(String.valueOf(temp_Min)));
                    }
                }
            }
        }

        // Reducer

        /**
         * MaxTemperatureReducer is static and extends the Reducer abstract class
         * with the four Hadoop generic types Text, Text, Text, Text.
         */
        public static class MaxTemperatureReducer extends
                Reducer<Text, Text, Text, Text> {

            /**
             * @method reduce
             * Takes a key and its list of values from the mapper and emits the first
             * temperature recorded for that key. Note that the second parameter must
             * be an Iterable, not an Iterator; otherwise this method never overrides
             * the framework's default identity reduce and is silently skipped.
             */
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {

                String temperature = values.iterator().next().toString();
                context.write(key, new Text(temperature));
            }
        }

        /**
         * @method main
         * Sets all the configuration properties; acts as the driver for the
         * MapReduce job.
         */
        public static void main(String[] args) throws Exception {

            // Read the default cluster configuration from the configuration XML files
            Configuration conf = new Configuration();

            // Initialize the job with the default configuration of the cluster
            Job job = new Job(conf, "weather example");

            // Assign the driver class
            job.setJarByClass(org.demo.MyMaxMin.class);

            // Key type coming out of the mapper
            job.setMapOutputKeyClass(Text.class);

            // Value type coming out of the mapper
            job.setMapOutputValueClass(Text.class);

            // Define the mapper class
            job.setMapperClass(MaxTemperatureMapper.class);

            // Define the reducer class
            job.setReducerClass(MaxTemperatureReducer.class);

            // Input format class, responsible for parsing the dataset into key/value pairs
            job.setInputFormatClass(TextInputFormat.class);

            // Output format class
            job.setOutputFormatClass(TextOutputFormat.class);

            // The second command-line argument is the output path
            Path outputPath = new Path(args[1]);

            // Configure the input path for the job
            FileInputFormat.addInputPath(job, new Path(args[0]));

            // Configure the output path for the job
            FileOutputFormat.setOutputPath(job, outputPath);

            // Delete the output path from HDFS automatically so we don't have to delete it explicitly
            outputPath.getFileSystem(conf).delete(outputPath, true);

            // Exit with a non-zero status if the job fails
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }


    5. CommonFriend Finder:
    Objective - To find the common friends of each pair of people.

    Input:
    A:B C D
    B:A C D E
    C:A B D E
    D:A B C E
    E:B C D
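    The job keys each mapper record by the sorted pair of people (e.g. "AB" for the pair A and B), attaching the full friend list; the reducer then intersects the lists that arrive for that pair. The intersection step can be sketched on its own. Here CommonFriendsDemo is an illustrative, token-based variant of the character-based commonString helper in the source below:

    ```java
    import java.util.Arrays;

    public class CommonFriendsDemo {
        // Intersects two friend lists given as "B C D"-style strings
        public static String common(String a, String b) {
            StringBuilder res = new StringBuilder();
            for (String f : a.split(" ")) {
                if (Arrays.asList(b.split(" ")).contains(f)) {
                    res.append(f);
                }
            }
            return res.toString();
        }

        public static void main(String[] args) {
            // For the pair (A, B): A's friends are "B C D", B's are "A C D E"
            System.out.println(common("B C D", "A C D E")); // CD
        }
    }
    ```

    So for the sample input above, A and B have C and D in common.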

    Map-Reduce
    [hdfs@sandbox anand]$ yarn jar commonFriend.jar org.demo.Finder /user/hdfs/friendlist.txt  count_new13

    Output:


    SourceCode:
    package org.demo;

    import java.io.IOException;
    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class Finder {

        // Returns the characters common to both strings
        public static String commonString(String str1, String str2) {
            String res = "";
            char[] arr1 = str1.toCharArray();
            char[] arr2 = str2.toCharArray();
            for (int i = 0; i < arr1.length; i++) {
                for (int j = 0; j < arr2.length; j++) {
                    if (arr1[i] == arr2[j]) {
                        res = res + arr1[i];
                        break; // avoid appending the same character once per duplicate match
                    }
                }
            }
            return res;
        }

        public static class Map extends Mapper<LongWritable, Text, Text, Text> {
            private static Text keyMapper = new Text();
            String person;
            String friends;

            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                person = line.substring(0, 1);
                friends = line.substring(2, line.length());
                char[] ch = friends.toCharArray();
                for (int i = 0; i < ch.length; i++) {
                    if (ch[i] == ' ') continue; // skip the separators between friend names
                    // Build the pair key in sorted order so (A,B) and (B,A) meet in the same reducer
                    if (ch[i] < person.toCharArray()[0])
                        keyMapper.set(ch[i] + person);
                    else
                        keyMapper.set(person + ch[i]);
                    context.write(keyMapper, new Text(friends));
                }
            }
        }

        public static class Reduce extends Reducer<Text, Text, Text, Text> {

            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                ArrayList<String> l = new ArrayList<String>();
                for (Text val : values) {
                    l.add(val.toString());
                }
                // Intersect all the friend lists received for this pair
                String res = l.get(0);
                for (String s : l) {
                    res = commonString(res, s);
                }
                context.write(key, new Text(res));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration c = new Configuration();
            Job job = new Job(c, "Finder");
            job.setJarByClass(org.demo.Finder.class);
            // Expected output key/value types for both the mapper and the reducer
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        }
    }



    6. Different length word counter
    Objective - To find the number of words of each length.


    InputFile:
    A file containing text whose words are to be counted by length.

    MapReduce:
    [hdfs@sandbox anand]$ yarn jar wc_diff.jar org.demo.AnalyzeText /user/hdfs/tesxt.txt  count_new10

    Output:


    SourceCode:

    package org.demo;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class AnalyzeText {

        public static class Map extends Mapper<LongWritable, Text, IntWritable, Text> {
            private static IntWritable length = new IntWritable();
            private static Text word = new Text();
            String val;

            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    val = tokenizer.nextToken();
                    // Emit (word length, word)
                    length.set(val.length());
                    word.set(val);
                    context.write(length, word);
                }
            }
        }

        public static class Reduce extends Reducer<IntWritable, Text, IntWritable, IntWritable> {

            public void reduce(IntWritable key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Count how many words have this length
                int sum = 0;
                for (Text val : values) {
                    sum++;
                }
                context.write(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "AnalyzeText");
            job.setJarByClass(org.demo.AnalyzeText.class);

            // Needed because the mapper's output key/value types differ from the reducer's
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);

            // Expected output key/value types of the reducer
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        }
    }


    Hadoop Maven sample Ex:

    To resolve the dependencies of a Hadoop project, include the required jars via Maven repositories. To do so, follow the steps below:

    pom.xml
    A. Create a maven project

    B. Define the artifacts below in pom.xml, with the versions you need

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>com.bogotobogo.hadoop</groupId>
      <artifactId>wordcount</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>

      <name>wordcount</name>
      <url>http://maven.apache.org</url>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>

      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-core</artifactId>
          <version>0.20.2</version>
        </dependency>
      </dependencies>
    </project>
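    Note that hadoop-core 0.20.2 is the old, pre-YARN artifact. Since the examples above run on a YARN cluster (Hadoop 2.x), the hadoop-client artifact is usually the dependency to declare instead. A sketch, assuming version 2.7.1 (match the version to your cluster):

    ```xml
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.1</version>
    </dependency>
    ```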