
Amazon Elastic Map Reduce (EMR) Beyond Basics

ENVIRONMENT

I run these commands from a Linux EC2 instance.  It doesn't have to be a 'powerful' instance, as it doesn't do much work, so an m1.small is fine.  The following needs to be installed:

ec2 tools

We use these tools to launch and monitor EMR jobs.
Follow the setup guides at:

https://help.ubuntu.com/community/EC2StartersGuide
and
http://aws.amazon.com/developertools/351?_encoding=UTF8&jiveRedirect=1
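
Note that the examples below also use Amazon's 'elastic-mapreduce' command line client (a Ruby tool), which has to be installed and configured with your AWS credentials separately.  Assuming the client is on your PATH, a quick way to verify both the install and the credentials is to list your job flows:

elastic-mapreduce --list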

s3cmd
Used to copy files to and from S3.  Get it from:

http://s3tools.org/s3cmd
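
Before the first use, s3cmd needs your AWS access key and secret key; its interactive setup only has to be run once:

s3cmd --configure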

INPUT PATHS

For testing MR jobs on a local Hadoop instance, we might use an input path like 'hdfs://localhost:9000/input'.
For running on EMR, we can use S3 as input: 's3://my_bucket/input'.
Since Hadoop supports reading from S3 natively, an S3 input path works just like an HDFS URL.

So how do we do this without hard-coding the path into the code?  We pass it as a command line argument.  The following examples illustrate how to pass two arguments on the command line.

HADOOP on development machine:
hadoop jar my.jar   my.TestMR   hdfs://localhost:9000/input hdfs://localhost:9000/output


HADOOP cluster running on EC2:
hadoop jar my.jar   my.TestMR   s3://my_bucket/input  s3://my_bucket/output

EMR:
elastic-mapreduce   --create --name "MyJob"   --num-instances "5"  --master-instance-type "m1.large"  --slave-instance-type "c1.xlarge"  --jar s3://my_bucket/my.jar --main-class my.TestMR    --arg  s3://my_bucket/input   --arg  s3://my_bucket/output

Here is skeleton code that takes the input and output paths as command line arguments:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * http://sujee.net/tech/articles/amazon-emr-beyond-basics/
 * takes two arguments: input path and output path
 */
public class TestMR extends Configured implements Tool
{

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new TestMR(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception
    {
        if (args.length < 2)
        {
            System.err.printf("Usage: %s [generic options] <input_path> <output_path>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Configuration conf = getConf();

        Job job = new Job(conf, "TestMR");
        job.setJarByClass(TestMR.class);
        // TestMapper and TestReducer are your own mapper/reducer classes
        job.setMapperClass(TestMapper.class);
        job.setReducerClass(TestReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // input/output paths come from the command line, so nothing is hard-coded
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

}

INPUT ARGUMENTS AS PROPERTY FILES

As we have seen, we can supply any number of arguments on the command line.  But when doing so, we are also hard-coding each argument's position: the first parameter is the input dir, the second parameter is the output dir, and so on.  This can lead to inflexible programs.  What if we want to override only the third argument?  We still have to supply all the args.  And if we need to pass in a lot of arguments, this method is not very handy.

Let's put all our arguments into a property file and feed that to our program.  Java property files are plain text files containing one key=value pair per line.
Here is an example:

my.input=xxxx
my.output=xxxx
my.db.host=xxxx
my.db.dbname=xxxxx
my.db.user=xxxxx
my.db.pass=xxxx
my.memcached.host=xxxx
my.foo=xxxx
my.bar=xxxxx

Here the properties are prefixed with 'my' just so we don't end up overriding any system properties by accident.

Next we should place this property file  in HDFS or S3.

HDFS:
hadoop  dfs -copyFromLocal   my.conf  hdfs://localhost:9000/my.conf

EMR:
Copy this file to your bucket using s3cmd or the Firefox S3 Organizer.
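For example, with s3cmd (the bucket name is just a placeholder):

s3cmd put my.conf s3://my_bucket/my.conf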

Now provide this file as an argument to the MR job:

HADOOP on development machine:
hadoop jar my.jar   my.TestMR   hdfs://localhost:9000/my.conf


EMR:
elastic-mapreduce   --create --name "MyJob"   --num-instances "5"  --master-instance-type "m1.large"  --slave-instance-type "c1.xlarge"  --jar s3://my_bucket/my.jar --main-class my.TestMR    --arg  s3://my_bucket/my.conf

Here is how we access the property file in our MR job:

package amazonemr;

import java.net.URI;
import java.util.Enumeration;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ParseProperties extends Configured implements Tool
{
    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new ParseProperties(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception
    {
        if (args.length < 1)
        {
            System.err.printf("Usage: %s <config file>\n", getClass().getSimpleName());
            // ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        Configuration conf = getConf();

        String configFileLocation = args[0];
        Path configFilePath = new Path(configFileLocation);
        FileSystem fs = FileSystem.get(URI.create(configFileLocation),  conf);

        FSDataInputStream fdis = fs.open(configFilePath);
        Properties props = new Properties();
        props.load(fdis);
        fdis.close();
        System.out.println("Loaded properties:\n" + props.toString());

        // copy the properties into the Configuration object so they are available to mappers and reducers
        Enumeration<?> keys = props.propertyNames();
        while (keys.hasMoreElements())
        {
            String key = (String) keys.nextElement();
            String value = props.getProperty(key);
            conf.set(key, value);
        }

        // setup a job using the configuration
        Job job = new Job(conf, "MyJob");
        // other job setup goes here...
        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }
}
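
Once the driver has copied the properties into the job Configuration, mappers and reducers can read them back from the task context.  A minimal sketch of a mapper doing this (the property name 'my.memcached.host' is just one of the example keys above, and the pass-through map logic is only for illustration):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TestMapper extends Mapper<LongWritable, Text, Text, Text>
{
    private String memcachedHost;

    @Override
    protected void setup(Context context)
    {
        // read the value that the driver placed into the Configuration
        memcachedHost = context.getConfiguration().get("my.memcached.host");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        // use the property however the job needs it; here we just tag each record with it
        context.write(new Text(memcachedHost), value);
    }
}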

MULTIPLE JAR FILES

An EMR job flow takes a single JAR file.  What if we need extra JARs, like a JSON library?  We need to repackage the extra jars into a single jar.  We then upload this JAR to our S3 bucket, so it can be used for launching MR jobs.

#!/bin/bash

## http://sujee.net/tech/articles/amazon-emr-beyond-basics/

CLASS_DIR=classes

mkdir -p $CLASS_DIR
rm -rf $CLASS_DIR/*

# compile
javac -d $CLASS_DIR -sourcepath src   -classpath $HADOOP_INSTALL/conf:$HADOOP_INSTALL/lib/*:$HADOOP_INSTALL/*:lib/*    $(find src -name "*.java")

rm -f my.jar

# extract other jars so we can bundle them together
cd $CLASS_DIR;  jar xf ../lib/opencsv-2.2.jar;  cd ..
cd $CLASS_DIR;  jar xf ../lib/mysql-connector-java-5.1.10-bin.jar;  cd ..
cd $CLASS_DIR;  jar xf ../lib/memcached-2.5.jar;  cd ..

# bundle everything into a single jar
jar cf my.jar -C $CLASS_DIR .
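
Before uploading, it is worth a quick sanity check that the dependency classes actually made it into the bundle, and then pushing the jar to the bucket (the bucket name is a placeholder):

jar tf my.jar | grep -i opencsv
s3cmd put my.jar s3://my_bucket/my.jar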


LAUNCHING AND MONITORING EMR JOBS

There is a web UI to submit a MapReduce job and monitor its progress.  We will look at an alternative: launching an EMR job from the command line and monitoring its progress.

The following scripts do this.

The launch script is split into two parts.  The first part is configurable.  The second part is generic and does not need to be changed much; that is why I have split the script this way.  The bottom script can be called from any launch script.

#!/bin/bash
## http://sujee.net/tech/articles/amazon-emr-beyond-basics/

# config
# if changing machine type, also change mapred config file
MASTER_INSTANCE_TYPE="m1.large"
SLAVE_INSTANCE_TYPE="c1.xlarge"
INSTANCES=5
export JOBNAME="MyMR"
export TIMESTAMP=$(date +%Y%m%d-%H%M%S)
# end config

echo "==========================================="
echo $(date +%Y%m%d.%H%M%S) " > $0 : starting...."


export t1=$(date +%s)

#if the line breaks don't work, join the following lines and remove all '\'
export JOBID=$(elastic-mapreduce --plain-output  --create --name "${JOBNAME}__${TIMESTAMP}"  \
--num-instances "$INSTANCES"  --master-instance-type "$MASTER_INSTANCE_TYPE" \
--slave-instance-type "$SLAVE_INSTANCE_TYPE" \
--jar s3://my_bucket/my.jar --main-class my.TestMR  \
--log-uri s3://my_bucket/emr-logs/ )


sh ./emr-wait-for-completion.sh

Some explanations:
Instances: we control the instance type and the number of instances (--num-instances).  This is a great feature of EMR: we can requisition a cluster that fits our needs.  For example, if a small job needs only 5 instances we get 5; if a larger job needs 20 instances we can get 20.
To specify different machine types for the name node and the slave nodes, we use '--master-instance-type' and '--slave-instance-type'.  The name node doesn't do much; the slave nodes do the heavy lifting.  So in this case we make the name node 'm1.large' and the slaves 'c1.xlarge'.
Logging: with '--log-uri' we save the logs to S3.

The script 'emr-wait-for-completion.sh' is below.  This script is called from our run script.

#!/bin/bash

echo "=== $JOBID started...."

LOGDIR="/var/logs/hadoop-logs/${JOBNAME}__${JOBID}__${TIMESTAMP}"
mkdir -p "${LOGDIR}"

## stuff below is to wait till the jobs is done

# credit ekampf (https://gist.github.com/762371)
STATUS=$(elastic-mapreduce --list --nosteps | grep $JOBID | awk '{print $2}')

while  [  "$STATUS" = "STARTING"  -o   "$STATUS" = "BOOTSTRAPPING"   ]
do
    sleep 60
    STATUS=$(elastic-mapreduce --list --nosteps | grep $JOBID | awk '{print $2}')
done
t2=$(date +%s)
echo "=== Job started RUNNING in " $(expr $t2 - $t1) " seconds.  status : $STATUS"


if [ "$STATUS" = "RUNNING" ]
then
    elastic-mapreduce --list | grep "$JOBID"
    MASTER_NODE=$(elastic-mapreduce --list | grep "$JOBID"| awk '{print $3}')
    echo "Task tracker interface : http://$MASTER_NODE:9100"
    echo "Namenode interface : http://$MASTER_NODE:9101"
fi

while [ "$STATUS" =  "RUNNING"  ]
do
    sleep 60
    STATUS=$(elastic-mapreduce --list --nosteps | grep $JOBID | awk '{print $2}')
    s3cmd sync "s3://my_bucket/emr-logs/${JOBID}/" "${LOGDIR}/"  > /dev/null 2> /dev/null
    cp -f "${LOGDIR}/steps/1/syslog" "${LOGDIR}/mapreduce.log" 2> /dev/null
done

t3=$(date +%s)
diff=$(expr $t3 - $t1)
elapsed="$(expr $diff / 3600)-hours-$(expr $diff % 3600 / 60)-mins"

s3cmd sync "s3://my_bucket/emr-logs/${JOBID}/" "${LOGDIR}/"  > /dev/null 2> /dev/null
cp "${LOGDIR}/steps/1/syslog" "${LOGDIR}/mapreduce.log" 2> /dev/null
s3cmd del -r  "s3://my_bucket/emr-logs/${JOBID}/"   > /dev/null 2> /dev/null

echo $(date +%Y%m%d.%H%M%S) " > $0 : finished in $elapsed.  status: $STATUS"
touch "${LOGDIR}/job-finished-in-$elapsed"
echo "==========================================="

Here is how the script is launched:

sh ./run-emr-testMR.sh   [input arguments]

or, to run it in the background:

nohup sh ./run-emr-testMR.sh  > emr.out 2>&1 &

The output will look like this:

20110129.090002  > run-emr-testMR.sh : starting....

=== j-J6USL8HDRX93 launched....
=== Job started RUNNING in  302  seconds.  status : RUNNING
j-J6USL8HDRX93      RUNNING        ec2-50-16-29-197.compute-1.amazonaws.com          TestMR__20110129-090002
Task tracker interface : http://ec2-50-16-29-197.compute-1.amazonaws.com:9100
Namenode interface : http://ec2-50-16-29-197.compute-1.amazonaws.com:9101
20110129.104454  > ./emr-wait-for-completion.sh : finished in 1-hours-27-mins.  status: SHUTTING_DOWN

Here is what it does:
when the job starts, it prints out the NameNode status URL and the TaskTracker status URL
it monitors the job's progress: every minute it uses s3cmd to copy the logs created by EMR into a directory under '/var/logs/hadoop-logs', so we can track the progress from our own machine; this directory can be made accessible via a web server (see the example below)
the script terminates when our EMR job is completed (success or fail)
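
For example, a throwaway web server in the log directory is enough to browse the logs from a browser while the job runs (this sketch assumes Python 2 is available on the monitoring instance):

cd /var/logs/hadoop-logs
python -m SimpleHTTPServer 8080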


CONFIGURING HADOOP CLUSTER

So far we have launched EMR jobs with the default configuration.  What if we want to tweak Hadoop settings?  The following script shows how to do that.

#!/bin/bash

# config
# if changing machine type, also change mapred config file
MASTER_INSTANCE_TYPE="m1.large"
SLAVE_INSTANCE_TYPE="c1.xlarge"
INSTANCES=5
export JOBNAME="MyMR"
export TIMESTAMP=$(date +%Y%m%d-%H%M%S)
# end config

echo "==========================================="
echo $(date +%Y%m%d.%H%M%S) " > $0 : starting...."


export t1=$(date +%s)

#if the line breaks don't work, join the following lines and remove all '\'

export JOBID=$(elastic-mapreduce --plain-output  --create --name "${JOBNAME}__${TIMESTAMP}" \
--num-instances "$INSTANCES"  --master-instance-type "$MASTER_INSTANCE_TYPE"  --slave-instance-type "$SLAVE_INSTANCE_TYPE" \
--jar s3://my_bucket/my.jar  --main-class my.TestMR  \
--log-uri s3://my_bucket/emr-logs/  \
--bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
--args "--core-config-file,s3://my_bucket/config-core-site.xml,\
--mapred-config-file,s3://my_bucket/config-mapred-site-m1-xl.xml")


sh ./emr-wait-for-completion.sh


We use the 'configure-hadoop' bootstrap action and supply config-core-site.xml and config-mapred-site.xml (these are just examples, not recommended settings):
config-core-site.xml : adjusts memory for the in-memory sort (a sketch follows below)
config-mapred-site.xml : we have two variants, depending on the slave instance type:
config-mapred-site-m1-l.xml  (for m1.large slaves)
config-mapred-site-m1-xl.xml  (for m1.xlarge slaves)
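
These config files are just standard Hadoop XML configuration files.  As a rough sketch, a config-core-site.xml that bumps the in-memory sort buffer might look like this (the value is purely illustrative, not a recommendation):

<?xml version="1.0"?>
<configuration>
  <property>
    <name>io.sort.mb</name>
    <value>200</value>
  </property>
</configuration>
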
LOGGING AND DEBUGGING

We can track our job's progress via the TaskTracker interface (on port 9100 of the master node), e.g. http://ec2-50-16-29-197.compute-1.amazonaws.com:9100

In order to view the NameNode and TaskTracker web pages, you need access to the machines launched in the 'ElasticMapReduce-master' security group.  I usually give my own IP address access to ports 1-65535.
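
With the EC2 API tools from the environment section, opening those ports to your own IP address would look roughly like this (the IP address below is a placeholder):

ec2-authorize ElasticMapReduce-master -P tcp -p 1-65535 -s 203.0.113.7/32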

Note, however, that accessing the output of individual mappers requires an SSH tunnel.  More on this later.

Also, all these logs go away once the cluster terminates.  That is why we copy them to our machine with 's3cmd sync'; this way we can go back and check the logs for debugging.

MORE:

So far we have looked at some handy scripts and tips for working with Amazon EMR.
The MRJob framework, a Python-based framework developed and open-sourced by Yelp engineers, is also worth a look.

