Friday, February 19, 2016

Kerberos renewal trick when using keytab to login through proxy

When the ugi returned by UserGroupInformation.createProxyUser is used to call doAs, ticket renewal (through checkTGTAndReloginFromKeytab) needs to go through its real user.  If you call ugi.isFromKeytab(), it returns false because ugi is just a proxy.  But if you call ugi.getRealUser().isFromKeytab(), it returns true.  Here, ugi.getRealUser() returns the UserGroupInformation object for the real user (hdfs@HDP.DG).  So checkTGTAndReloginFromKeytab should be called on the real user, not the proxy.

if (ugi.getRealUser().isFromKeytab()) {
    log("trying to reloginFromKeytab");
    UserGroupInformation ugiReal = ugi.getRealUser();
    ugiReal.checkTGTAndReloginFromKeytab();
    log("RealUser.getUserName: " + ugiReal.getUserName());
    log("RealUser.getRealAuthenticationMethod: " + ugiReal.getRealAuthenticationMethod().name());
    log("RealUser.hasKerberosCredentials: " + ugiReal.hasKerberosCredentials());
    readFile(conf, dirPath);
}


The ticket expiration time is controlled by the KDC's krb5.conf file (not the client's krb5.conf file).

If you use Hortonworks:

Even if you set your own value for "ticket_lifetime", Hortonworks HDP 2.2 will reset it when the keytab file is recreated and its services are restarted.

Key APIs:

UserGroupInformation.loginUserFromKeytabAndReturnUGI
UserGroupInformation.createProxyUser
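
For reference, here is a minimal sketch of how these two calls and the relogin trick above fit together. The keytab path and the proxied user name ("someuser") are placeholders, not values from this cluster:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyRenewExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        // Real (keytab) user: checkTGTAndReloginFromKeytab must run against this UGI.
        UserGroupInformation realUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "hdfs@HDP.DG", "/etc/security/keytabs/hdfs.headless.keytab");

        // Proxy user: isFromKeytab() returns false here, but getRealUser() points back to realUgi.
        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser("someuser", realUgi);

        // Renew through the real user, not the proxy.
        proxyUgi.getRealUser().checkTGTAndReloginFromKeytab();

        proxyUgi.doAs(new PrivilegedExceptionAction<Void>() {
            public Void run() throws Exception {
                // access HDFS (or other Hadoop services) as the proxied user here
                return null;
            }
        });
    }
}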


Generate RC file from Java code

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class RCComplexTypeWriter {
    private static class Identity {
        String ssn;
        String address;

        public Identity(String ssn, String address) {
            this.ssn = ssn;
            this.address = address;
        }
    }

    private static class Person {
        String name;
        Map<String, List<String>> contactInfo;
        List<Identity> parents;
        Map<String, Identity> relatives;
        Identity identity;
    }

    public void buildAndSaveFile(Configuration conf, String path) {
        // Derive column names and types from the Person class via reflection.
        StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
                .getReflectionObjectInspector(Person.class, ObjectInspectorOptions.JAVA);
        String cols = ObjectInspectorUtils.getFieldNames(oi);
        Properties props = new Properties();
        props.setProperty(serdeConstants.LIST_COLUMNS, cols);
        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));

        ColumnarSerDe serde;
        try {
            serde = new ColumnarSerDe();
            serde.initialize(new Configuration(), props);
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        Person outerStruct = new Person();
        outerStruct.name = "Steven Washington";
        List<String> array1 = new ArrayList<String>();
        array1.add("583-195-1121");
        array1.add("583-195-1122");
        array1.add("583-195-1123");
        array1.add("583-195-1124");
        List<String> array2 = new ArrayList<String>();
        array2.add("john@yahoooo.com");
        array2.add("mary@yahoooo.com");
        array2.add("mark@yahoooo.com");
        outerStruct.contactInfo = new TreeMap<String, List<String>>();
        outerStruct.contactInfo.put("Phone", array1);
        outerStruct.contactInfo.put("Email", array2);
        Identity is1 = new Identity("383-19-1111", "123 Fremont Blvd, Fremont CA 94555");
        Identity is2 = new Identity("383-19-1112", "124 Fremont Blvd, Fremont CA 94555");
        outerStruct.parents = new ArrayList<Identity>(2);
        outerStruct.parents.add(is1);
        outerStruct.parents.add(is2);
        outerStruct.relatives = new TreeMap<String, Identity>();
        outerStruct.relatives.put("Uncle", new Identity("383-19-8881", "223 Fremont Blvd, Fremont CA 94555"));
        outerStruct.relatives.put("FatherInLaw", new Identity("383-19-8882", "224 Fremont Blvd, Fremont CA 94555"));
        outerStruct.identity = new Identity("383-19-9991", "111 Fremont Blvd, Fremont CA 94555");

        try {
            // Serialize the Person object into one row of column bytes.
            BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
            System.out.println("Row is [" + braw.toString() + "]");
            int numColumns = braw.size();
            System.out.println("Write file with " + numColumns + " columns...");
            conf.set(RCFile.COLUMN_NUMBER_CONF_STR, String.valueOf(numColumns));
            final FileSystem fs = FileSystem.get(conf);
            RCFile.Writer rcWriter = new RCFile.Writer(fs, conf, new Path(path));
            rcWriter.append(braw);
            rcWriter.close();
            System.out.println("Wrote " + numColumns + " columns successfully.");
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
    }
}
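
For completeness, here is a small read-back sketch (not from the original post) that uses RCFile.Reader to scan the rows written above; conf and path are assumed to be the same values passed to buildAndSaveFile():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;

public class RCComplexTypeReader {
    public static void readAndPrint(Configuration conf, String path) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        RCFile.Reader reader = new RCFile.Reader(fs, new Path(path), conf);
        LongWritable rowID = new LongWritable();
        BytesRefArrayWritable row = new BytesRefArrayWritable();
        while (reader.next(rowID)) {
            reader.getCurrentRow(row);
            System.out.println("Row " + rowID.get() + " has " + row.size() + " columns");
        }
        reader.close();
    }
}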

Friday, December 18, 2015

A simple way for a reducer to get how many records were processed by the mappers in a MapReduce job

JobClient client = new JobClient(jobConf);
RunningJob mapreduceJob = client.getJob(JobID.forName(jobConf.get("mapred.job.id")));
long mapProcessedRecordCounter = mapreduceJob.getCounters()
        .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getCounter();
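
A sketch of where this snippet typically lives: the reducer's configure() method in the old "mapred" API, so the counter is fetched once per task. The class name and key/value types below are just for illustration:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;

public class CountingReducer extends MapReduceBase
        implements Reducer<Text, LongWritable, Text, LongWritable> {

    private long mapProcessedRecordCounter = -1;

    public void configure(JobConf jobConf) {
        try {
            JobClient client = new JobClient(jobConf);
            RunningJob mapreduceJob = client.getJob(JobID.forName(jobConf.get("mapred.job.id")));
            mapProcessedRecordCounter = mapreduceJob.getCounters()
                    .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS")
                    .getCounter();
        } catch (IOException e) {
            throw new RuntimeException("Unable to read MAP_INPUT_RECORDS counter", e);
        }
    }

    public void reduce(Text key, Iterator<LongWritable> values,
            OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
        // mapProcessedRecordCounter is available here, e.g. to normalize per-key counts
        output.collect(key, new LongWritable(mapProcessedRecordCounter));
    }
}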

Wednesday, November 4, 2015

A simple way to calculate PageRank in R

library(igraph)
source("http://michael.hahsler.net/SMU/ScientificCompR/code/map.R")

# Edges: 1->2, 1->3, 2->1, 3->4, 4->3
g <- graph.formula(1 -+ 2, 1 -+ 3, 2 -+ 1, 3 -+ 4, 4 -+ 3)
plot(g)

# Normal PageRank
page.rank(g, damping=0.70)$vector

# Topic-specific PageRank (assume nodes '1' and '2' are the topic of interest)
page.rank(g, damping=0.70, personalized=c(1/2, 1/2, 0, 0))$vector

# Topic-specific PageRank with weights (node '1' has weight 2 while node '2' has weight 1)
page.rank(g, damping=0.70, personalized=c(2/3, 1/3, 0, 0))$vector

More info:

http://michael.hahsler.net/SMU/LearnROnYourOwn/code/igraph.html
http://stats.stackexchange.com/questions/175517/calculating-personalized-pagerank-in-r

Tuesday, October 27, 2015

Good articles on shingling, minhash and LSH

http://matthewcasperson.blogspot.com/2013/11/minhash-for-dummies.html

http://infolab.stanford.edu/~ullman/mmds/bookL.pdf   (Chapter 3, around page 83)

Doc --> shingles --> hashed shingles --> minhash (may lose some accuracy by compressing into a much smaller signature matrix) --> LSH (avoids computing the similarity for every pair; may produce false positives or false negatives)

50,000 bytes --> 400,000 bytes --> 200,000 bytes --> 1,000 bytes (based on 250 signatures) --> an additional b (bands) x r (rows) hash matrix
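
To make the shingling and minhash steps concrete, here is a small illustrative Java sketch (not taken from any of the articles above). It hashes k-character shingles, builds a signature of n minhash values using random hash functions of the form (a*x + b) mod p, and estimates Jaccard similarity as the fraction of matching signature positions; the banding (LSH) step is omitted:

import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class MinHashSketch {

    // Hash each k-character shingle of the document to an int.
    static Set<Integer> shingleHashes(String doc, int k) {
        Set<Integer> hashes = new HashSet<Integer>();
        for (int i = 0; i + k <= doc.length(); i++) {
            hashes.add(doc.substring(i, i + k).hashCode());
        }
        return hashes;
    }

    // Build a signature of numHashes minhash values for one document.
    static long[] signature(Set<Integer> shingles, int numHashes, long seed) {
        long prime = 2147483647L;          // 2^31 - 1, keeps a * x within a long
        Random rnd = new Random(seed);     // same seed => same hash family for every doc
        long[] sig = new long[numHashes];
        for (int i = 0; i < numHashes; i++) {
            long a = 1 + rnd.nextInt((int) (prime - 1));
            long b = rnd.nextInt((int) prime);
            long min = Long.MAX_VALUE;
            for (int s : shingles) {
                long x = (s & 0xffffffffL) % prime;   // treat the shingle hash as unsigned
                long h = (a * x + b) % prime;
                if (h < min) {
                    min = h;
                }
            }
            sig[i] = min;
        }
        return sig;
    }

    // The fraction of positions on which two signatures agree approximates Jaccard similarity.
    static double estimatedSimilarity(long[] s1, long[] s2) {
        int same = 0;
        for (int i = 0; i < s1.length; i++) {
            if (s1[i] == s2[i]) {
                same++;
            }
        }
        return (double) same / s1.length;
    }

    public static void main(String[] args) {
        long[] sigA = signature(shingleHashes("the quick brown fox jumps over the lazy dog", 5), 250, 42L);
        long[] sigB = signature(shingleHashes("the quick brown fox jumped over the lazy dog", 5), 250, 42L);
        System.out.println("Estimated Jaccard similarity: " + estimatedSimilarity(sigA, sigB));
    }
}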

Friday, September 25, 2015

How to override cluster jar files

If you need to use your own version of a jar file in your MapReduce job, you can do so through the following steps:

1. Bundle the jar file into your MapReduce jar, or pass the jar file through the distributed cache.

2. From your MapReduce client, update the mapred-site.xml file to include the following property (see the sketch after these steps):

mapreduce.job.user.classpath.first=true

(Of course, you need to call conf.addResource() to load mapred-site.xml in your client code.)

3. Restart your client.  
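
The same client-side setup expressed in code; the mapred-site.xml path, jar path, and job name below are only examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class UserClasspathFirstExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // step 2: pick up the edited mapred-site.xml on the client (example path)
        conf.addResource(new Path("/etc/hadoop/conf/mapred-site.xml"));
        // or set the flag directly instead of editing the file
        conf.set("mapreduce.job.user.classpath.first", "true");

        Job job = Job.getInstance(conf, "my-job");
        // step 1 (distributed cache variant): ship your own jar with the job
        job.addFileToClassPath(new Path("/user/me/lib/my-custom.jar"));
        // ... configure mapper/reducer/input/output as usual, then submit
    }
}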

Saturday, July 25, 2015

Dynamically add jar files to classpath

/**
 * Adds all jar files in the given folder to the classpath.
 * @param file the folder that contains the jar files.
 * @param classLoader the URLClassLoader to add the jars to.
 */
public void addCustomJars(File file, URLClassLoader classLoader) {
    File[] jarFiles = file.listFiles();
    if (jarFiles != null) {
        if (logger.isDebugEnabled()) {
            URL[] urls = classLoader.getURLs();
            for (URL url : urls) {
                logger.debug("URL before custom jars are added:" + url.toString());
            }
        }

        // URLClassLoader.addURL is protected, so look it up via reflection.
        Class<?> sysclass = URLClassLoader.class;
        Method method = null;
        try {
            method = sysclass.getDeclaredMethod("addURL", new Class[] { URL.class });
        } catch (NoSuchMethodException e1) {
            logger.error("Unable to find addURL method", e1);
            return;
        } catch (SecurityException e1) {
            logger.error("Unable to get addURL method", e1);
            return;
        }
        method.setAccessible(true);

        // Add each jar file found in the folder.
        for (File jarFile : jarFiles) {
            if (jarFile.isFile() && jarFile.getName().endsWith(".jar")) {
                try {
                    method.invoke(classLoader, new Object[] { jarFile.toURI().toURL() });
                } catch (Exception e) {
                    logger.error("Failed to add classpath for " + jarFile.getName(), e);
                }
            }
        }

        if (logger.isDebugEnabled()) {
            URL[] urls = classLoader.getURLs();
            for (URL url : urls) {
                logger.debug("URL after custom jars are added:" + url.toString());
            }
        }
    }
}

In your main:

// This cast assumes the context class loader is a URLClassLoader (true for the default
// system class loader on Java 8 and earlier).
URLClassLoader classLoader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
addCustomJars(new File("path_to_jar_files"), classLoader);