Hadoop Useful Utility Classes

Some handy classes for working with Hadoop, MapReduce, and HBase.

IDENTITYMAPPER  / IDENTITYREDUCER

org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
org.apache.hadoop.mapreduce.Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

jar : hadoop-core.jar

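If your mappers and reducers simply pass their inputs through to the outputs, use these classes as they are. There is no need to recreate them: in the new API (org.apache.hadoop.mapreduce), the base Mapper and Reducer classes already behave as identity functions.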


SHELL  / SHELLCOMMANDEXECUTOR

org.apache.hadoop.util.Shell
org.apache.hadoop.util.Shell.ShellCommandExecutor

jar : hadoop-core.jar

Handy for executing commands on the local machine and inspecting their output.

import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
String[] cmd = { "ls", "/usr" };
ShellCommandExecutor shell = new ShellCommandExecutor(cmd);
shell.execute();
System.out.println("* shell exit code : " + shell.getExitCode());
System.out.println("* shell output: \n" + shell.getOutput());
 
/* output:
* shell exit code : 0
* shell output:
X11
X11R6
bin
etc
include
lib
libexec
llvm-gcc-4.2
local
sbin
share
standalone
*/

STRINGUTILS

org.apache.hadoop.util.StringUtils

jar : hadoop-core.jar

This class has lots of functions for dealing with strings. I will highlight a few.

StringUtils.byteDesc() : User-friendly / human-readable byte lengths

How many kilobytes or megabytes is 1000000 bytes? This will tell you.

import org.apache.hadoop.util.StringUtils;
 
// --- human readable byte lengths -----
System.out.println ("1024 bytes = " + StringUtils.byteDesc(1024));
System.out.println ("67108864 bytes = " + StringUtils.byteDesc(67108864));
System.out.println ("1000000 bytes = " + StringUtils.byteDesc(1000000));
 
/* produces:
1024 bytes = 1 KB
67108864 bytes = 64 MB
1000000 bytes = 976.56 KB
*/
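
To see where a figure like 976.56 KB comes from, here is a small stand-alone sketch of the arithmetic in plain Java. It is not Hadoop's implementation; the class name ByteDescSketch and the method describe() are made up for this illustration, and the output format is simply modeled on the sample output above.

```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

// Hypothetical stand-alone sketch; NOT Hadoop's StringUtils.byteDesc().
final class ByteDescSketch {
    private static final String[] UNITS = { "B", "KB", "MB", "GB", "TB" };

    // Divide by 1024 until the value drops below 1024, then keep at most two decimals.
    static String describe(long bytes) {
        double value = bytes;
        int unit = 0;
        while (value >= 1024 && unit < UNITS.length - 1) {
            value /= 1024;
            unit++;
        }
        DecimalFormat twoDecimals =
                new DecimalFormat("#.##", DecimalFormatSymbols.getInstance(Locale.ROOT));
        return twoDecimals.format(value) + " " + UNITS[unit];
    }

    public static void main(String[] args) {
        System.out.println(describe(1024));      // 1 KB
        System.out.println(describe(67108864));  // 64 MB
        System.out.println(describe(1000000));   // 976.56 KB (1000000 / 1024 = 976.5625)
    }
}
```

The units are binary: 1 KB here means 1024 bytes, which is why 1000000 bytes comes out slightly under 1000 KB.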

StringUtils.byteToHexString() : Convert Bytes to Hex strings and vice-versa

We deal with byte arrays all the time in Hadoop / MapReduce. Converting them to hex strings is a handy way to print them when debugging.

import org.apache.hadoop.util.StringUtils;
 
// ----------------- String Utils : bytes <--> hex ---------
String s = "aj89y1_xxy";
byte[] b = s.getBytes();
String hex = StringUtils.byteToHexString(b);
byte[] b2 = StringUtils.hexStringToByte(hex);
String s2 = new String(b2);
System.out.println(s + " --> " + hex + " <-- " + s2);
 
/* output:
aj89y1_xxy --> 616a383979315f787879 <-- aj89y1_xxy
*/
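
If you want to check the hex string by hand, the conversion is just two hex digits per byte. Below is a rough plain-Java sketch of the round trip; it does not use Hadoop, and HexSketch, toHex() and fromHex() are hypothetical names chosen for this example.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone sketch of a bytes <--> hex round trip; not Hadoop code.
final class HexSketch {
    private static final char[] DIGITS = "0123456789abcdef".toCharArray();

    // Each byte becomes two hex digits: high nibble first, then low nibble.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(DIGITS[(b >> 4) & 0x0f]).append(DIGITS[b & 0x0f]);
        }
        return sb.toString();
    }

    // Each pair of hex digits becomes one byte.
    static byte[] fromHex(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("hex string must have an even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        String s = "aj89y1_xxy";
        String hex = toHex(s.getBytes(StandardCharsets.UTF_8));
        String back = new String(fromHex(hex), StandardCharsets.UTF_8);
        System.out.println(s + " --> " + hex + " <-- " + back);
        // aj89y1_xxy --> 616a383979315f787879 <-- aj89y1_xxy
    }
}
```

The sketch names the charset explicitly, so the bytes do not depend on the platform default.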

StringUtils.formatTime() : human-readable elapsed time

How long is 1000000 ms? See below.

import org.apache.hadoop.util.StringUtils;
 
System.out.println ("1000000 ms = " + StringUtils.formatTime(1000000));
long t1 = System.currentTimeMillis();
// execute a command
long t2 = System.currentTimeMillis();
t2 += 10000000; // adjust for this demo
System.out.println ("operation took : " + StringUtils.formatTimeDiff(t2, t1));
 
/* output:
1000000 ms = 16mins, 40sec
operation took : 2hrs, 46mins, 40sec
*/
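
The numbers above are easy to verify: 1000000 ms is 1000 seconds, which is 16 minutes and 40 seconds. Here is a minimal plain-Java sketch of that breakdown. It is an illustration only, not Hadoop's code; ElapsedTimeSketch and format() are hypothetical names, and the output format is modeled on the sample output above.

```java
// Hypothetical stand-alone sketch; NOT Hadoop's StringUtils.formatTime().
final class ElapsedTimeSketch {

    // Split a duration in milliseconds into hours, minutes and seconds.
    static String format(long millis) {
        long totalSeconds = millis / 1000;
        long hours = totalSeconds / 3600;
        long minutes = (totalSeconds % 3600) / 60;
        long seconds = totalSeconds % 60;

        StringBuilder sb = new StringBuilder();
        if (hours != 0) {
            sb.append(hours).append("hrs, ");
        }
        if (minutes != 0) {
            sb.append(minutes).append("mins, ");
        }
        sb.append(seconds).append("sec");  // seconds are always printed
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(format(1000000));   // 16mins, 40sec
        System.out.println(format(10000000));  // 2hrs, 46mins, 40sec
    }
}
```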


HADOOP CLUSTER STATUS

ClusterStatus : org.apache.hadoop.mapred.ClusterStatus

jar : hadoop-core.jar

Find out how many nodes are in the cluster, the maximum number of map and reduce tasks, how many tasks are currently running, and so on.

package sujee;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
/*
compile into a jar and execute:
hadoop jar a.jar sujee.TestCluster
 
output:
number of task trackers: 1
maximum map tasks : 3
currently running map tasks : 0
 
*/
 
public class TestCluster extends Configured implements Tool
{
    public static void main(String[] args) throws Exception
    {
        int result = ToolRunner.run(new Configuration(), new TestCluster(), args);
        System.exit(result);
    }
 
    @Override
    public int run(String[] args) throws Exception
    {
        // getConf() returns the Configuration that ToolRunner set on this Tool
        JobConf dummyJob = new JobConf(getConf(), TestCluster.class);
        JobClient jobClient = new JobClient(dummyJob);
        ClusterStatus clusterStatus = jobClient.getClusterStatus();
        System.out.println("number of task trackers: " + clusterStatus.getTaskTrackers());
        System.out.println("maximum map tasks : " + clusterStatus.getMaxMapTasks());
        System.out.println("currently running map tasks : " + clusterStatus.getMapTasks());
        return 0;
    }
}

HBase Handy Classes


BYTES

org.apache.hadoop.hbase.util.Bytes

jar : hbase*.jar

handy utility for dealing with bytes and byte arrays

Bytes.toBytes() : convert objects to bytes

import java.util.Arrays;
import org.apache.hadoop.hbase.util.Bytes;
 
// -------- Bytes.toBytes() : converting objects to bytes ------
int i = 10000;
byte [] intBytes = Bytes.toBytes(i);
System.out.println ("int " + i + " in bytes : " + Arrays.toString(intBytes));
float f = (float) 999.993;
byte [] floatBytes = Bytes.toBytes(f);
System.out.println ("float " + f + " in bytes : " + Arrays.toString(floatBytes));
String s = "foobar";
byte [] stringBytes = Bytes.toBytes(s);
System.out.println ("string " + s + " in bytes : " + Arrays.toString(stringBytes));
 
/* output:
int 10000 in bytes : [0, 0, 39, 16]
float 999.993 in bytes : [68, 121, -1, -115]
string foobar in bytes : [102, 111, 111, 98, 97, 114]
*/
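
The int output above ([0, 0, 39, 16]) is the big-endian encoding of 10000 (0x00002710). As a rough illustration, the same bytes can be produced with the JDK alone. This sketch does not use HBase; BigEndianSketch and its methods are hypothetical names, and the UTF-8 charset for strings is an assumption of the sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-alone sketch using only the JDK; not HBase's Bytes class.
final class BigEndianSketch {

    // ByteBuffer writes big-endian by default: most significant byte first.
    static byte[] intToBytes(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Read the int back from its 4-byte big-endian form.
    static int bytesToInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Assumption of this sketch: strings are encoded as UTF-8.
    static byte[] stringToBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] intBytes = intToBytes(10000);
        System.out.println(Arrays.toString(intBytes));                 // [0, 0, 39, 16]
        System.out.println(bytesToInt(intBytes));                      // 10000
        System.out.println(Arrays.toString(stringToBytes("foobar")));  // [102, 111, 111, 98, 97, 114]
    }
}
```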

Bytes.add()  : create composite keys

// ---------- Bytes.add() : creating composite keys ---------
// rowkey = int + long
int i = 0;
long timestamp = System.currentTimeMillis();
byte [] rowkey2 = Bytes.add(Bytes.toBytes(i), Bytes.toBytes(timestamp));
System.out.println ("rowkey2 (" + rowkey2.length + ") : " + Arrays.toString(rowkey2));
 
// add also supports adding 3 byte arrays
// rowkey = int + str + long
byte[] rowkey3 = Bytes.add(Bytes.toBytes(0) , Bytes.toBytes("hello"), Bytes.toBytes(timestamp));
System.out.println ("rowkey3 (" + rowkey3.length + ") : " + Arrays.toString(rowkey3));
 
/* output:
rowkey2 (12) : [0, 0, 0, 0, 0, 0, 1, 50, 8, -101, 99, -41]
rowkey3 (17) : [0, 0, 0, 0, 104, 101, 108, 108, 111, 0, 0, 1, 50, 8, -101, 99, -41]
*/
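
A composite key is only useful if you can take it apart again. Here is a minimal JDK-only sketch that builds the same int + string + long layout and reads the fields back by offset. It does not use HBase; CompositeKeySketch, buildKey(), idOf() and timestampOf() are hypothetical names, and the timestamp value is the one that appears in the sample output above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-alone sketch using only the JDK; not HBase's Bytes class.
final class CompositeKeySketch {

    // Layout: 4-byte int, then the UTF-8 bytes of the tag, then an 8-byte long.
    static byte[] buildKey(int id, String tag, long timestamp) {
        byte[] tagBytes = tag.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + tagBytes.length + Long.BYTES)
                .putInt(id)
                .put(tagBytes)
                .putLong(timestamp)
                .array();
    }

    // The int always sits in the first 4 bytes.
    static int idOf(byte[] key) {
        return ByteBuffer.wrap(key).getInt();
    }

    // The long always sits in the last 8 bytes, whatever the tag length is.
    static long timestampOf(byte[] key) {
        return ByteBuffer.wrap(key, key.length - Long.BYTES, Long.BYTES).getLong();
    }

    public static void main(String[] args) {
        long timestamp = 0x132089B63D7L;  // the timestamp bytes from the output above
        byte[] key = buildKey(0, "hello", timestamp);
        System.out.println("key (" + key.length + ") : " + Arrays.toString(key));
        // key (17) : [0, 0, 0, 0, 104, 101, 108, 108, 111, 0, 0, 1, 50, 8, -101, 99, -41]
        System.out.println(idOf(key) + " / " + (timestampOf(key) == timestamp));  // 0 / true
    }
}
```

Because the int and the long are fixed width, they can be read from the front and the back of the key even though the string in the middle has a variable length.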
