Monday, March 28, 2016

Key properties to set if you write code that accesses Hadoop concurrently

conf.setBoolean("fs.hdfs.impl.disable.cache", true);
conf.setBoolean("fs.maprfs.impl.disable.cache", true);
conf.setBoolean("fs.s3.impl.disable.cache", true);
conf.setBoolean("fs.s3n.impl.disable.cache", true);
conf.setBoolean("fs.s3a.impl.disable.cache", true);

These settings disable the FileSystem instance cache, so each call to FileSystem.get(conf) returns a new instance. Otherwise the threads share one cached instance, and when the first thread closes it, the other threads get a "Filesystem closed" error.
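To make the failure mode concrete, here is a minimal sketch in plain Java. It does not use Hadoop at all: ToyFileSystem, ToyFileSystemFactory and CacheDemo are hypothetical stand-ins invented for this illustration, and the factory only imitates the "return the cached instance unless caching is disabled" decision described above.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a file system handle; this is NOT Hadoop's FileSystem class. */
class ToyFileSystem {
    private boolean closed = false;

    void close() {
        closed = true;
    }

    String read(String path) {
        if (closed) {
            // Mirrors the symptom described in the post.
            throw new IllegalStateException("Filesystem closed");
        }
        return "contents of " + path;
    }
}

/** Toy factory that imitates the cache-or-create decision (an assumption, not Hadoop code). */
class ToyFileSystemFactory {
    private static final Map<String, ToyFileSystem> CACHE = new HashMap<>();

    static synchronized ToyFileSystem get(String scheme, boolean disableCache) {
        if (disableCache) {
            return new ToyFileSystem(); // every caller gets a private instance
        }
        // With the cache on, every caller for the same scheme shares one instance.
        return CACHE.computeIfAbsent(scheme, s -> new ToyFileSystem());
    }

    static synchronized void clearCache() {
        CACHE.clear();
    }
}

class CacheDemo {
    /** Returns true if a second handle still works after the first one is closed. */
    static boolean secondHandleSurvives(boolean disableCache) {
        ToyFileSystemFactory.clearCache();
        ToyFileSystem first = ToyFileSystemFactory.get("hdfs", disableCache);
        ToyFileSystem second = ToyFileSystemFactory.get("hdfs", disableCache);
        first.close();
        try {
            second.read("/tmp/data");
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("cache enabled:  " + secondHandleSurvives(false)); // false
        System.out.println("cache disabled: " + secondHandleSurvives(true));  // true
    }
}
```

With the cache enabled, both callers hold the same object, so closing one breaks the other. With the cache disabled, each caller owns its instance and is responsible for closing it.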

Friday, February 19, 2016

Kerberos renewal trick when using a keytab to log in through a proxy user

When the ugi returned by UserGroupInformation.createProxyUser is used to call "doAs", ticket renewal (through checkTGTAndReloginFromKeytab) needs to go through its real user. If you call ugi.isFromKeytab(), it returns 'false' because ugi is just a proxy. But if you call ugi.getRealUser().isFromKeytab(), it returns 'true'. Here, ugi.getRealUser() returns the UserGroupInformation object for the real user (hdfs@HDP.DG). So call 'checkTGTAndReloginFromKeytab' on the real user, not on the proxy.

        // getRealUser() returns null when ugi is not a proxy user, so check before using it.
        UserGroupInformation ugiReal = ugi.getRealUser();
        if (ugiReal != null && ugiReal.isFromKeytab()) {
            log("trying to reloginFromKeytab");
            // Renew the ticket on the real (keytab-based) user, not on the proxy.
            ugiReal.checkTGTAndReloginFromKeytab();
            log("RealUser.getUserName: " + ugiReal.getUserName());
            log("RealUser.getRealAuthenticationMethod: " + ugiReal.getRealAuthenticationMethod().name());
            log("RealUser.hasKerberosCredentials: " + ugiReal.hasKerberosCredentials());
            readFile(conf, dirPath);
        }


The ticket expiration time is controlled by the KDC's krb5.conf file (not the client's krb5.conf file).

If you use Hortonworks (HW):

Even if you set your own value for "ticket_lifetime", Hortonworks 2.2 resets it when the keytab file is recreated and its services are restarted.

Key API: 

UserGroupInformation.loginUserFromKeytabAndReturnUGI
UserGroupInformation.createProxyUser


Generate RC file from Java code

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class RCComplexTypeWriter {
  private static class Identity {
    String ssn;
    String address;

    public Identity(String ssn, String address) {
      this.ssn = ssn;
      this.address = address;
    }
  }

  private static class Person {
    String name;
    Map<String, List<String>> contactInfo;
    List<Identity> parents;
    Map<String, Identity> relatives;
    Identity identity;
  }

  public void buildAndSaveFile(Configuration conf, String path) {
    // Derive the column names and types from the Person class through reflection.
    StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
        .getReflectionObjectInspector(Person.class, ObjectInspectorOptions.JAVA);
    String cols = ObjectInspectorUtils.getFieldNames(oi);
    Properties props = new Properties();
    props.setProperty(serdeConstants.LIST_COLUMNS, cols);
    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));
    ColumnarSerDe serde;
    try {
      serde = new ColumnarSerDe();
      serde.initialize(new Configuration(), props);
    } catch (Exception e) {
      e.printStackTrace();
      return;
    }

    // Build one sample row that exercises map, list and struct columns.
    Person outerStruct = new Person();
    outerStruct.name = "Steven Washington";
    List<String> array1 = new ArrayList<String>();
    array1.add("583-195-1121");
    array1.add("583-195-1122");
    array1.add("583-195-1123");
    array1.add("583-195-1124");
    List<String> array2 = new ArrayList<String>();
    array2.add("john@yahoooo.com");
    array2.add("mary@yahoooo.com");
    array2.add("mark@yahoooo.com");
    outerStruct.contactInfo = new TreeMap<String, List<String>>();
    outerStruct.contactInfo.put("Phone", array1);
    outerStruct.contactInfo.put("Email", array2);
    Identity is1 = new Identity("383-19-1111", "123 Fremont Blvd, Fremont CA 94555");
    Identity is2 = new Identity("383-19-1112", "124 Fremont Blvd, Fremont CA 94555");
    outerStruct.parents = new ArrayList<Identity>(2);
    outerStruct.parents.add(is1);
    outerStruct.parents.add(is2);
    outerStruct.relatives = new TreeMap<String, Identity>();
    outerStruct.relatives.put("Uncle", new Identity("383-19-8881", "223 Fremont Blvd, Fremont CA 94555"));
    outerStruct.relatives.put("FatherInLaw", new Identity("383-19-8882", "224 Fremont Blvd, Fremont CA 94555"));
    outerStruct.identity = new Identity("383-19-9991", "111 Fremont Blvd, Fremont CA 94555");

    try {
      // Serialize the row into one BytesRef per column.
      BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
      System.out.println("Row is [" + braw.toString() + "]");
      int numColumns = braw.size();
      System.out.println("Write file with " + numColumns + " columns...");
      // The RCFile writer reads the column count from the configuration.
      conf.set(RCFile.COLUMN_NUMBER_CONF_STR, String.valueOf(numColumns));
      final FileSystem fs = FileSystem.get(conf);
      RCFile.Writer rcWriter = new RCFile.Writer(fs, conf, new Path(path));

      rcWriter.append(braw);
      rcWriter.close();
      System.out.println("Wrote " + numColumns + " columns successfully.");
    } catch (Exception e) {
      e.printStackTrace();
      return;
    }
  }
}

Friday, December 18, 2015

A simple way for a reducer to get how many records were processed by the MapReduce job's mappers

JobClient client = new JobClient(jobConf);
RunningJob mapreduceJob = client.getJob(JobID.forName(jobConf.get("mapred.job.id")));
long mapProcessedRecordCounter = mapreduceJob.getCounters()
        .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS")
        .getCounter();

Wednesday, November 4, 2015

A simple way to calculate PageRank in R

library(igraph)
source("http://michael.hahsler.net/SMU/ScientificCompR/code/map.R")

# 1->2; 1->3, 2->1, 3->4, 4->3
g <- graph.formula(1 -+ 2, 1 -+ 3, 2 -+ 1, 3 -+ 4, 4 -+ 3)
plot(g)

# Normal PageRank
page.rank(g, damping=0.70)$vector

# Topic-specific PageRank (assume nodes '1' and '2' are the topics of interest)
page.rank(g, damping=0.70, personalized=c(1/2, 1/2, 0, 0))$vector

# Topic-specific PageRank with weights (node '1' has weight 2, node '2' has weight 1)
page.rank(g, damping=0.70, personalized=c(2/3, 1/3, 0, 0))$vector
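To show what the "personalized" vector changes, here is a minimal power-iteration sketch in plain Java. It is not igraph's implementation, only the textbook update rank = (1 - d) * teleport + d * (rank spread over out-links), and it assumes every node has at least one out-link (true for the four-node graph above). The class name PersonalizedPageRank and the 0-based node numbering are choices made for this illustration.

```java
import java.util.Arrays;

class PersonalizedPageRank {
    /**
     * links[i] lists the nodes that node i points to (0-based).
     * teleport is the personalization vector and must sum to 1.
     * Assumes every node has at least one out-link (no dangling nodes).
     */
    static double[] rank(int[][] links, double damping, double[] teleport, int iterations) {
        int n = links.length;
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n); // start from the uniform distribution
        for (int it = 0; it < iterations; it++) {
            double[] next = new double[n];
            // Teleport step: jump to a node chosen from the personalization vector.
            for (int i = 0; i < n; i++) {
                next[i] = (1.0 - damping) * teleport[i];
            }
            // Follow-a-link step: each node splits its rank evenly over its out-links.
            for (int i = 0; i < n; i++) {
                for (int target : links[i]) {
                    next[target] += damping * rank[i] / links[i].length;
                }
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        // Same graph as above, shifted to 0-based: 1->2, 1->3, 2->1, 3->4, 4->3
        int[][] links = { {1, 2}, {0}, {3}, {2} };
        double[] uniform = {0.25, 0.25, 0.25, 0.25};
        double[] topic = {0.5, 0.5, 0.0, 0.0};
        System.out.println("normal:         " + Arrays.toString(rank(links, 0.70, uniform, 100)));
        System.out.println("topic-specific: " + Arrays.toString(rank(links, 0.70, topic, 100)));
    }
}
```

Concentrating the teleport mass on nodes 1 and 2 raises their scores relative to the normal run, because every random jump now lands on one of them.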

More info:

http://michael.hahsler.net/SMU/LearnROnYourOwn/code/igraph.html
http://stats.stackexchange.com/questions/175517/calculating-personalized-pagerank-in-r

Tuesday, October 27, 2015

Good articles on shingling, MinHash and LSH

http://matthewcasperson.blogspot.com/2013/11/minhash-for-dummies.html

http://infolab.stanford.edu/~ullman/mmds/bookL.pdf   (Chapter 3, around page 83)

Doc --> Shingle --> hash --> minhash (may lose accuracy by using a much smaller signature matrix) -->  LSH (does not compute the similarity for every pair; false positives and false negatives are possible)

50,000 bytes --> 400,000 bytes --> 200,000 bytes --> 1,000 bytes (based on 250 signatures) -->  additional b (band) x r (row) hash matrix
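The shingle, hash and minhash stages of the pipeline above can be sketched in plain Java as follows. This is an illustration under assumptions, not code from either article: character shingles are hashed with String.hashCode, the 250 hash functions are random linear functions modulo the prime 2^31 - 1, and the class name MinHashSketch is made up. A signature of 250 int values takes 250 x 4 = 1,000 bytes, which matches the size quoted above. The LSH banding stage is left out.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

class MinHashSketch {
    private static final long PRIME = 2147483647L; // 2^31 - 1
    private final long[] a; // multipliers, one per hash function
    private final long[] b; // offsets, one per hash function

    MinHashSketch(int numHashes, long seed) {
        Random random = new Random(seed);
        a = new long[numHashes];
        b = new long[numHashes];
        for (int i = 0; i < numHashes; i++) {
            a[i] = 1 + random.nextInt(Integer.MAX_VALUE - 1); // 1 .. PRIME - 1
            b[i] = random.nextInt(Integer.MAX_VALUE);         // 0 .. PRIME - 1
        }
    }

    /** Doc --> shingle --> hash: the set of hashed k-character shingles. */
    static Set<Integer> shingles(String text, int k) {
        Set<Integer> result = new HashSet<>();
        for (int i = 0; i + k <= text.length(); i++) {
            result.add(text.substring(i, i + k).hashCode());
        }
        return result;
    }

    /** hash --> minhash: for each hash function keep the minimum over all shingles. */
    int[] signature(Set<Integer> shingles) {
        int[] sig = new int[a.length];
        for (int i = 0; i < a.length; i++) {
            long min = Long.MAX_VALUE;
            for (int s : shingles) {
                long x = (s & 0xffffffffL) % PRIME; // treat the hash as unsigned
                long h = (a[i] * x + b[i]) % PRIME; // fits in a long: both factors < 2^31
                min = Math.min(min, h);
            }
            sig[i] = (int) min;
        }
        return sig;
    }

    /** Fraction of signature positions that agree; estimates the Jaccard similarity. */
    static double estimateSimilarity(int[] s1, int[] s2) {
        int equal = 0;
        for (int i = 0; i < s1.length; i++) {
            if (s1[i] == s2[i]) {
                equal++;
            }
        }
        return (double) equal / s1.length;
    }

    public static void main(String[] args) {
        MinHashSketch minHash = new MinHashSketch(250, 42L);
        int[] s1 = minHash.signature(shingles("the quick brown fox jumps over the lazy dog", 5));
        int[] s2 = minHash.signature(shingles("the quick brown fox jumped over a lazy dog", 5));
        System.out.println("estimated similarity: " + estimateSimilarity(s1, s2));
    }
}
```

Both documents must be signed by the same MinHashSketch instance so that position i in each signature comes from the same hash function.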

Friday, September 25, 2015

How to override cluster jar files

If you need to use your own version of a jar file in your MapReduce job, you can do so through the following steps:

1. Bundle the jar file into your map reduce jar or pass the jar file through distributed cache.

2. On your MapReduce client, add the following property to the mapred-site.xml file:

    <property>
      <name>mapreduce.job.user.classpath.first</name>
      <value>true</value>
    </property>

(Of course, your client code needs to call conf.addResource() to load mapred-site.xml.)

3. Restart your client.