Friday, February 19, 2016

Generate RC file from Java code

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class RCComplexTypeWriter {
 private static class Identity {
 String ssn;
 String address;
   public Identity(String ssn, String address) {
     this.ssn = ssn;
     this.address = address;
   }
 }

 private static class Person {
   String name;
   Map<String, List<String>> contactInfo;
   List<Identity> parents;
   Map<String, Identity> relatives;
   Identity identity;
 }

 public void buildAndSaveFile(Configuration conf, String path) {
   StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
           .getReflectionObjectInspector(Person.class, ObjectInspectorOptions.JAVA);
       String cols = ObjectInspectorUtils.getFieldNames(oi);
       Properties props = new Properties();
       props.setProperty(serdeConstants.LIST_COLUMNS, cols);
       props.setProperty(serdeConstants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));
       ColumnarSerDe serde;
       try {
        serde = new ColumnarSerDe();
serde.initialize(new Configuration(), props);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
return;
}

       Person outerStruct = new Person();
       outerStruct.name = "Steven Washington";
       List array1 = new ArrayList<String>();
       array1.add("583-195-1121");
       array1.add("583-195-1122");
       array1.add("583-195-1123");
       array1.add("583-195-1124");
       List array2 = new ArrayList<String>();
       array2.add("john@yahoooo.com");
       array2.add("mary@yahoooo.com");
       array2.add("mark@yahoooo.com");
       outerStruct.contactInfo =  new TreeMap<String, List<String>>();
       outerStruct.contactInfo.put("Phone", array1);
       outerStruct.contactInfo.put("Email", array2);
       Identity is1 = new Identity("383-19-1111", "123 Fremont Blvd, Fremont CA 94555");
       Identity is2 = new Identity("383-19-1112", "124 Fremont Blvd, Fremont CA 94555");
       outerStruct.parents = new ArrayList<Identity>(2);
       outerStruct.parents.add(is1);
       outerStruct.parents.add(is2);
       outerStruct.relatives = new TreeMap<String, Identity>();
       outerStruct.relatives.put(new String("Uncle"), new Identity("383-19-8881", "223 Fremont Blvd, Fremont CA 94555"));
       outerStruct.relatives.put(new String("FatherInLaw"), new Identity("383-19-8882", "224 Fremont Blvd, Fremont CA 94555"));
       outerStruct.identity = new Identity("383-19-9991", "111 Fremont Blvd, Fremont CA 94555");
       try {
BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
System.out.println("Row is [" + braw.toString() + "]");
int numColumns = braw.size();
System.out.println("Write file with " + numColumns + " columns...");
conf.set(RCFile.COLUMN_NUMBER_CONF_STR, String.valueOf(numColumns)); final FileSystem fs = FileSystem.get(conf);
RCFile.Writer rcWriter = new RCFile.Writer(fs, conf, new Path(path));

rcWriter.append(braw);
rcWriter.close();
System.out.println("Write " + numColumns + " columns successfullly.");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
return;
}
 }
}

Friday, December 18, 2015

A simple way for a reducer to get the number of records processed by the MapReduce job's mappers

JobClient client = new JobClient(jobConf);
RunningJob mapreduceJob = client.getJob(JobID.forName( jobConf.get("mapred.job.id") ));
long mapProcessedRecordCounter = mapreduceJob .getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getCounter();

Wednesday, November 4, 2015

A simple way to calculate PageRank in R

library(igraph)
source("http://michael.hahsler.net/SMU/ScientificCompR/code/map.R")

# Directed example graph with edges 1->2, 1->3, 2->1, 3->4, 4->3
g <- graph.formula(1 -+ 2, 1 -+ 3, 2 -+ 1, 3 -+ 4, 4 -+ 3)
plot(g)

# Damping factor shared by every PageRank run below
damping_factor <- 0.70

# Normal PageRank
page.rank(g, damping = damping_factor)$vector

# Topic-specific PageRank (nodes '1' and '2' are the interesting topic, equally weighted)
equal_topic_weights <- c(1/2, 1/2, 0, 0)
page.rank(g, damping = damping_factor, personalized = equal_topic_weights)$vector

# Topic-specific PageRank with weights (node '1' has weight 2, node '2' has weight 1)
skewed_topic_weights <- c(2/3, 1/3, 0, 0)
page.rank(g, damping = damping_factor, personalized = skewed_topic_weights)$vector

More info:

http://michael.hahsler.net/SMU/LearnROnYourOwn/code/igraph.html
http://stats.stackexchange.com/questions/175517/calculating-personalized-pagerank-in-r

Tuesday, October 27, 2015

Good articles for shingle, minhash and LSH

http://matthewcasperson.blogspot.com/2013/11/minhash-for-dummies.html

http://infolab.stanford.edu/~ullman/mmds/bookL.pdf   (Chapter 3, around page 83)

Doc --> Shingle --> hash --> minhash (may lose accuracy by using a much smaller signature matrix) --> LSH (does not compute the similarity for every pair; false positives and false negatives are possible)

50,000 bytes --> 400,000 bytes --> 200,000 bytes --> 1,000 bytes (based on 250 signatures) -->  additional b (band) x r (row) hash matrix

Friday, September 25, 2015

How to override cluster jar files

If you need to use your own version of a jar file in your MapReduce job, you can do so through the following steps:

1. Bundle the jar file into your map reduce jar or pass the jar file through distributed cache.

2. On your MapReduce client, update the mapred-site.xml file to include the following setting:

mapreduce.job.user.classpath.first=true

(Of course, you need to call conf.addResource() in your client code to load mapred-site.xml.)

3. Restart your client.  

Saturday, July 25, 2015

Dynamically add jar files to classpath

    /**
     * Adds jar files to classpath.
     * @param file the folder for jar files.
     * @param classLoader the URLClassLoader
     * @throws IOException
     */
public void addCustomJars(File file, URLClassLoader classLoader) {
File[] jarFiles = file.listFiles();
if (jarFiles != null) {
if (logger.isDebugEnabled()) {
      URL[] urls = classLoader.getURLs();
      for (URL url : urls) {
      logger.debug("URL before custom jars are added:" + url.toString());
      }
}

   Class<?> sysclass = URLClassLoader.class;
    Method method = null;
try {
method = sysclass.getDeclaredMethod("addURL",parameters);
} catch (NoSuchMethodException e1) {
logger.error("Unable to find addURL method", e1);
return;
} catch (SecurityException e1) {
logger.error("Unable to get addURL method", e1);
return;
  }
    method.setAccessible(true);

    // add each jar file under such folder
for (File jarFile : jarFiles) {
if (jarFile.isFile() && jarFile.getName().endsWith("jar")) {
try {
             method.invoke(classLoader,new Object[]{ jarFile.toURI().toURL() });
} catch (Exception e) {
logger.error("Failed to add classpath for " + jarFile.getName(), e);;
}
}
}

if (logger.isDebugEnabled()) {
      URL[] urls = classLoader.getURLs();
      for (URL url : urls) {
        logger.debug("URL after custom jars are added:" + url.toString());
      }
}
}

In your main:

// The cast assumes the current thread's context class loader is a URLClassLoader.
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
URLClassLoader classLoader = (URLClassLoader) contextLoader;

// Folder whose jar files should be appended to the class loader.
File jarFolder = new File("path_to_jar_files");
addCustomJars(jarFolder, classLoader);


Wednesday, June 10, 2015

Skip Hadoop mapper when condition is satisfied

If conf is an instance of JobConf:

// Register the custom map runner (the FindAndExitMapRunner sample class) on the job.
conf.setMapRunnerClass(FindAndExitMapRunner.class);

A sample implementation of such a map runner (here named FindAndExitMapRunner):

public class FindAndExitMapRunner<K1, V1, K2, V2> extends MapRunner<K1, V1, K2, V2> {
private Mapper<K1, V1, K2, V2> mapper;
private boolean incrProcCount;
public static final String FIND_INDICATE = "Exit after finding at least one data";
public static final String REACH_MAX = "Exit after reaching max lines";
 
@SuppressWarnings("unchecked")
public void configure(JobConf job) {
this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
//increment processed counter only if skipping feature is enabled
this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
SkipBadRecords.getAutoIncrMapperProcCount(job);
}
@Override
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter) throws IOException {
   try {
    // allocate key & value instances that are re-used for all entries
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
    // map pair to output
    mapper.map(key, value, output, reporter);
    if(incrProcCount) {
    reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
    SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
    }
    }
    } catch (IOException e) {
    // break if exception is thrown
    if (!FIND_INDICATE.equalsIgnoreCase(e.getMessage()) && 
    !REACH_MAX.equalsIgnoreCase(e.getMessage())) {
    // re-throw except if it is not what we expect.
    throw e;
    }
} finally {
    mapper.close();
    }
}

protected Mapper<K1, V1, K2, V2> getMapper() {
return mapper;
}
}

Then throw an exception from your mapper when your condition is satisfied:

// The runner only treats an IOException carrying this message as an exit signal;
// a plain Exception would not be caught by its catch (IOException e) block.
throw new IOException(FindAndExitMapRunner.FIND_INDICATE);