ENVIRONMENT
I run these commands from a Linux EC2 instance. It doesn't have to be a powerful instance, since it doesn't do much work, so an m1.small is fine. The following needs to be installed:
EC2 tools
We use these tools to launch and monitor EMR jobs.
Follow the guides at
https://help.ubuntu.com/community/EC2StartersGuide
and
http://aws.amazon.com/developertools/351?_encoding=UTF8&jiveRedirect=1
s3cmd
We use s3cmd to copy files to and from S3. Get it from:
http://s3tools.org/s3cmd
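For example, here is a quick sketch of typical s3cmd usage ('my_bucket' is just the example bucket name used throughout this article):
# one-time setup; prompts for your AWS access key and secret key
s3cmd --configure
# copy a file to S3 and back
s3cmd put my.jar s3://my_bucket/my.jar
s3cmd get s3://my_bucket/my.jar
# list the bucket contents
s3cmd ls s3://my_bucket/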
INPUT PATHS
For testing MR jobs on the local hadoop instance, we might use an input path like 'hdfs://localhost:9000/input'.
For running on EMR, we can use S3 as input: 's3://my_bucket/input'
Since Hadoop supports reading from S3 natively, an S3 input path works just like an HDFS URL.
So how do we do this without hard-coding the path into the code? We pass it as a command-line argument. The following examples illustrate how to pass two arguments on the command line.
HADOOP on development machine:
hadoop jar my.jar my.TestMR hdfs://localhost:9000/input hdfs://localhost:9000/output
HADOOP cluster running on EC2:
hadoop jar my.jar my.TestMR s3://my_bucket/input s3://my_bucket/output
EMR:
elastic-mapreduce --create --name "MyJob" --num-instances "5" --master-instance-type "m1.large" --slave-instance-type "c1.xlarge" --jar s3://my_bucket/my.jar --main-class my.TestMR --arg s3://my_bucket/input --arg s3://my_bucket/output
Here is skeleton code that takes the input and output paths as command-line arguments:
/**
 * amazon-emr-beyond-basics/
 * takes two arguments: input path and output path
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestMR extends Configured implements Tool
{
    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new TestMR(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception
    {
        if (args.length < 2)
        {
            System.err.printf("Usage: %s [generic options] <input_path> <output_path>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Configuration conf = getConf();
        Job job = new Job(conf, "TestMR");
        job.setJarByClass(TestMR.class);
        // TestMapper and TestReducer are this job's mapper and reducer classes (not shown here)
        job.setMapperClass(TestMapper.class);
        job.setReducerClass(TestReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // input and output paths come from the command line, not hard-coded
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
INPUT ARGUMENTS AS PROPERTY FILES
As we have seen, we can supply any number of arguments on the command line. But when doing so, we are also hard-coding each parameter's position: the first parameter is input_dir, the second is output_dir, and so on. This can lead to inflexible programs. What if we want to override only the third argument? We still have to supply all the arguments. And if we need to pass in a lot of arguments, this method is not very handy.
Let's put all our arguments into a property file and feed that to our program. Java property files are plain text files containing one key=value pair per line.
Here is an example
my.input=xxxx
my.output=xxxx
my.db.host=xxxx
my.db.dbname=xxxxx
my.db.user=xxxxx
my.db.pass=xxxx
my.memcached.host=xxxx
my.foo=xxxx
my.bar=xxxxx
Here the properties are prefixed with 'my' just so we don't accidentally override any system properties.
Next we should place this property file in HDFS or S3.
HDFS:
hadoop dfs -copyFromLocal my.conf hdfs://localhost:9000/my.conf
EMR:
copy this file to your bucket using s3cmd or the Firefox S3 organizer
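For example, with s3cmd ('my_bucket' is the example bucket used in this article):
s3cmd put my.conf s3://my_bucket/my.conf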
Now provide this file as an argument to the MR job:
HADOOP on development machine:
hadoop jar my.jar my.TestMR hdfs://localhost:9000/my.conf
EMR:
elastic-mapreduce --create --name "MyJob" --num-instances "5" --master-instance-type "m1.large" --slave-instance-type "c1.xlarge" --jar s3://my_bucket/my.jar --main-class my.TestMR --arg s3://my_bucket/my.conf
Here is how we access the property file in our MR job
package amazonemr;
import java.net.URI;
import java.util.Enumeration;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ParseProperties extends Configured implements Tool
{
    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new ParseProperties(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception
    {
        if (args.length < 1)
        {
            System.err.printf("Usage: %s <config file>\n", getClass().getSimpleName());
            // ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Configuration conf = getConf();

        // read the property file from HDFS or S3
        String configFileLocation = args[0];
        Path configFilePath = new Path(configFileLocation);
        FileSystem fs = FileSystem.get(URI.create(configFileLocation), conf);
        FSDataInputStream fdis = fs.open(configFilePath);

        Properties props = new Properties();
        props.load(fdis);
        System.out.println("Loaded Properties:\n" + props.toString());

        // set the properties into the Configuration object so they are available to Mappers and Reducers
        Enumeration<?> keys = props.propertyNames();
        while (keys.hasMoreElements())
        {
            String key = (String) keys.nextElement();
            String value = props.getProperty(key);
            conf.set(key, value);
        }

        // set up a job using the configuration
        Job job = new Job(conf, "MyJob");
        // other job setup goes here...
        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }
}
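Inside a mapper (or reducer), those values can then be read back from the job configuration. Here is a minimal sketch, assuming the example property names above; MyMapper is a hypothetical mapper, not part of the job code shown earlier:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, Text>
{
    private String dbHost;

    @Override
    protected void setup(Context context)
    {
        // the keys here are the example properties from the my.conf file above
        Configuration conf = context.getConfiguration();
        dbHost = conf.get("my.db.host");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        // ... use dbHost (and any other properties) in the actual map logic ...
        context.write(new Text(dbHost), value);
    }
}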
MULTIPLE JAR FILES
EMR allows uploading a single JAR file. What if we need extra JARs, such as a JSON library? We need to repackage the extra JARs into a single JAR, which we then upload to our S3 bucket so it can be used for launching MR jobs.
#!/bin/bash
## http://sujee.net/tech/articles/amazon-emr-beyond-basics/
CLASS_DIR=classes
mkdir -p $CLASS_DIR
rm -rf $CLASS_DIR/*
# compile
javac -d $CLASS_DIR -sourcepath src -classpath $HADOOP_INSTALL/conf:$HADOOP_INSTALL/lib/*:$HADOOP_INSTALL/*:lib/* $(find src -name "*.java")
rm -f my.jar
# extract other jars so we can bundle them together
cd $CLASS_DIR; jar xf ../lib/opencsv-2.2.jar; cd ..
cd $CLASS_DIR; jar xf ../lib/mysql-connector-java-5.1.10-bin.jar; cd ..
cd $CLASS_DIR; jar xf ../lib/memcached-2.5.jar; cd ..
# bundle everything into a single jar
jar cf my.jar -C $CLASS_DIR .
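A quick way to sanity-check the bundled JAR and upload it to S3 (a sketch; 'my_bucket' is the example bucket used throughout this article):
# confirm the dependency classes made it into the bundle
jar tf my.jar | grep -i opencsv | head
# upload the bundled JAR to S3 for use with EMR
s3cmd put my.jar s3://my_bucket/my.jar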
LAUNCHING AND MONITORING EMR JOBS
There is a web UI to submit a MapReduce job and monitor its progress. We will look at an alternative: launching an EMR job from the command line and monitoring its progress.
The following scripts do this.
The launch script is split in two parts. The first part is configurable; the second part is generic and does not need to change much. That is why I have split the script this way: the bottom script can be called from any launch script.
#!/bin/bash
## http://sujee.net/tech/articles/amazon-emr-beyond-basics/
# config
# if changing machine type, also change mapred config file
MASTER_INSTANCE_TYPE="m1.large"
SLAVE_INSTANCE_TYPE="c1.xlarge"
INSTANCES=5
export JOBNAME="MyMR"
export TIMESTAMP=$(date +%Y%m%d-%H%M%S)
# end config
echo "==========================================="
echo $(date +%Y%m%d.%H%M%S) " > $0 : starting...."
export t1=$(date +%s)
#if the line breaks don't work, join the following lines and remove all '\'
export JOBID=$(elastic-mapreduce --plain-output --create --name "${JOBNAME}__${TIMESTAMP}" \
--num-instances "$INSTANCES" --master-instance-type "$MASTER_INSTANCE_TYPE" \
--slave-instance-type "$SLAVE_INSTANCE_TYPE" \
--jar s3://my_bucket/my.jar --main-class my.TestMR \
--log-uri s3://my_bucket/emr-logs/ )
sh ./emr-wait-for-completion.sh
Some explanations:
instances : we control the instance types (--master-instance-type, --slave-instance-type) and the number of instances (--num-instances). This is a great feature of EMR: we can requisition a cluster that fits our needs. For example, if a small job needs only 5 instances we get 5; if a larger job needs 20 instances we can get 20.
to specify different machine types for the name node and the slave nodes, we use '--master-instance-type' and '--slave-instance-type'. The name node doesn't do much; the slave nodes do the heavy lifting. So in this case we make the name node 'm1.large' and the slaves 'c1.xlarge'.
logging : with '--log-uri' we save the logs to S3
The script 'emr-wait-for-completion.sh' is below. This script is called from our run script.
#!/bin/bash
echo "=== $JOBID started...."
LOGDIR="/var/logs/hadoop-logs/${JOBNAME}__${JOBID}__${TIMESTAMP}"
mkdir -p "${LOGDIR}"
## stuff below is to wait till the job is done
# credit ekampf (https://gist.github.com/762371)
STATUS=$(elastic-mapreduce --list --nosteps | grep $JOBID | awk '{print $2}')
while [ "$STATUS" = "STARTING" -o "$STATUS" = "BOOTSTRAPPING" ]
do
    sleep 60
    STATUS=$(elastic-mapreduce --list --nosteps | grep $JOBID | awk '{print $2}')
done
t2=$(date +%s)
echo "=== Job started RUNNING in " $(expr $t2 - $t1) " seconds. status : $STATUS"
if [ "$STATUS" = "RUNNING" ]
then
    elastic-mapreduce --list | grep "$JOBID"
    MASTER_NODE=$(elastic-mapreduce --list | grep "$JOBID" | awk '{print $3}')
    echo "Task tracker interface : http://$MASTER_NODE:9100"
    echo "Namenode interface : http://$MASTER_NODE:9101"
fi
while [ "$STATUS" = "RUNNING" ]
do
    sleep 60
    STATUS=$(elastic-mapreduce --list --nosteps | grep $JOBID | awk '{print $2}')
    # keep pulling the EMR logs down from S3 so we can follow progress locally
    s3cmd sync "s3://my_bucket/emr-logs/${JOBID}/" "${LOGDIR}/" > /dev/null 2> /dev/null
    cp -f "${LOGDIR}/steps/1/syslog" "${LOGDIR}/mapreduce.log" 2> /dev/null
done
t3=$(date +%s)
diff=$(expr $t3 - $t1)
elapsed="$(expr $diff / 3600)-hours-$(expr $diff % 3600 / 60)-mins"
s3cmd sync "s3://my_bucket/emr-logs/${JOBID}/" "${LOGDIR}/" > /dev/null 2> /dev/null
cp "${LOGDIR}/steps/1/syslog" "${LOGDIR}/mapreduce.log" 2> /dev/null
s3cmd del -r "s3://my_bucket/emr-logs/${JOBID}/" > /dev/null 2> /dev/null
echo $(date +%Y%m%d.%H%M%S) " > $0 : finished in $elapsed. status: $STATUS"
touch "${LOGDIR}/job-finished-in-$elapsed"
echo "==========================================="
Here is how the script is launched
sh ./run-emr-testMR.sh [input arguments]
or to run in background
nohup sh ./run-emr-testMR.sh > emr.out 2>&1 &
The output will look like this:
20110129.090002 > run-emr-testMR.sh : starting....
=== j-J6USL8HDRX93 launched....
=== Job started RUNNING in 302 seconds. status : RUNNING
j-J6USL8HDRX93 RUNNING ec2-50-16-29-197.compute-1.amazonaws.com TestMR__20110129-090002
Task tracker interface : http://ec2-50-16-29-197.compute-1.amazonaws.com:9100
Namenode interface : http://ec2-50-16-29-197.compute-1.amazonaws.com:9101
20110129.104454 > ./emr-wait-for-completion.sh : finished in 1-hours-27-mins. status: SHUTTING_DOWN
Here is what it does:
when the job starts, it prints out the NameNode status URL and the TaskTracker status URL
it monitors the job progress: every minute it copies the logs created by EMR into a directory under '/var/logs/hadoop-logs'. We do this so we can track the progress from our own machine. This directory can be made accessible via a webserver.
it uses s3cmd to transfer the log files
the script terminates when our EMR job is completed (success or failure)
CONFIGURING HADOOP CLUSTER
We launched the EMR job above with the default configuration. What if we want to tweak the Hadoop settings? The following script shows how to do that.
#!/bin/bash
# config
# if changing machine type, also change mapred config file
MASTER_INSTANCE_TYPE="m1.large"
SLAVE_INSTANCE_TYPE="c1.xlarge"
INSTANCES=5
export JOBNAME="MyMR"
export TIMESTAMP=$(date +%Y%m%d-%H%M%S)
# end config
echo "==========================================="
echo $(date +%Y%m%d.%H%M%S) " > $0 : starting...."
export t1=$(date +%s)
#if the line breaks don't work, join the following lines and remove all '\'
export JOBID=$(elastic-mapreduce --plain-output --create --name "${JOBNAME}__${TIMESTAMP}" \
    --num-instances "$INSTANCES" --master-instance-type "$MASTER_INSTANCE_TYPE" \
    --slave-instance-type "$SLAVE_INSTANCE_TYPE" \
    --jar s3://my_bucket/my.jar --main-class my.TestMR \
    --log-uri s3://my_bucket/emr_logs/ \
    --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
    --args "--core-config-file,s3://my_bucket/config-core-site.xml,--mapred-config-file,s3://my_bucket/config-mapred-site-m1-xl.xml")
sh ./emr-wait-for-completion.sh
We use a 'bootstrap action' and supply config-core-site.xml and config-mapred-site.xml
(these are just examples, not recommended settings; a sketch of what such a file might look like follows the list below)
config-core-site.xml
adjusts memory for in memory sort
config-mapred-site.xml : we have two variants, depending on the type of instance
config-mapred-site-m1-l.xml (for m1.large slaves)
config-mapred-site-m1-xl.xml (for m1.xlarge slaves)
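For illustration only, here is a rough sketch of what a config-mapred-site-m1-xl.xml along these lines might contain; the property values are placeholders I made up for the example, not tuning advice:
<?xml version="1.0"?>
<!-- hypothetical config-mapred-site-m1-xl.xml: per-slave task slots sized for m1.xlarge instances -->
<configuration>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>8</value>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>4</value>
  </property>
</configuration>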
LOGGING AND DEBUGGING
We can track our job progress at task tracker interface (on port 9100 of master node). e.g: http://ec2-50-16-29-197.compute-1.amazonaws.com:9100
In order to view the name node and task tracker web pages, you need to have access to machines launched with 'ElasticMapReduce-master' security group. I usually give my IP address access to ports 1-65535.
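For example, using the EC2 tools mentioned in the ENVIRONMENT section (a sketch: 'ElasticMapReduce-master' is the default group name EMR creates, and the IP address is a placeholder for your own):
# allow your own IP to reach the master node's web UIs (replace the placeholder IP with your address)
ec2-authorize ElasticMapReduce-master -P tcp -p 9100 -s 203.0.113.10/32
ec2-authorize ElasticMapReduce-master -P tcp -p 9101 -s 203.0.113.10/32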
Note, however, that accessing output from individual mappers requires an SSH tunnel. More on this later.
Also, all these logs go away after our cluster terminates. This is why we copy the logs to our machine with 's3cmd sync'. That way we can go back and check our logs for debugging purposes.
MORE:
So far we have looked at some handy scripts and tips for working with Amazon EMR.
The mrjob framework, a Python-based framework developed and open-sourced by Yelp engineers, is also worth a look.