Monday, March 28, 2016

Key properties to set if you write code that accesses Hadoop file systems concurrently

// Turn off the shared FileSystem instance cache, one property per file system scheme
// (hdfs, maprfs, s3, s3n, s3a), so that FileSystem.get(conf) does not hand the same
// cached instance to every caller.
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
conf.setBoolean("fs.maprfs.impl.disable.cache", true);
  conf.setBoolean("fs.s3.impl.disable.cache", true);
  conf.setBoolean("fs.s3n.impl.disable.cache", true);
  conf.setBoolean("fs.s3a.impl.disable.cache", true);

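These settings disable the FileSystem instance cache, so each call to FileSystem.get(conf) returns a new instance instead of a shared, cached one.  Otherwise, when the first thread closes its FileSystem instance, the other threads sharing that same cached instance will get a "Filesystem closed" error.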

Friday, February 19, 2016

Kerberos renewal trick when using keytab to login through proxy

When the ugi returned by UserGroupInformation.createProxyUser is used to call "doAs", renewing the ticket (through checkTGTAndReloginFromKeytab) needs to go through its real user.  If you call ugi.isFromKeytab(), it will return 'false' because ugi is just a proxy.  But if you call ugi.getRealUser().isFromKeytab(), it will return 'true'.  Here, ugi.getRealUser() returns the UserGroupInformation object for its real user (hdfs@HDP.DG).  'checkTGTAndReloginFromKeytab' should therefore be called on the real user, not on the proxy.

        if (ugi.getRealUser().isFromKeytab()) {
               log("trying to reloginFromKeytab");
               UserGroupInformation ugiReal = ugi.getRealUser();
               ugiReal.checkTGTAndReloginFromKeytab();
               log("RealUser.getUserName: " + ugiReal.getUserName());
               log("RealUser.getRealAuthenticationMethod: " + ugiReal.getRealAuthenticationMethod().name());
               log("RealUser.hasKerberosCredentials: " + ugiReal.hasKerberosCredentials());
               readFile(conf, dirPath);
        }


The ticket expiration time is controlled by KDC's krb5.conf file (not the client's krb5.conf file).  

If you use HW:

Even if you set your own value for "ticket_lifetime", Hortonworks 2.2 will reset it when the keytab file is recreated and its services are restarted.

Key API: 

UserGroupInformation.loginUserFromKeytabAndReturnUGI
UserGroupInformation.createProxyUser


Generate RC file from Java code

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class RCComplexTypeWriter {
 private static class Identity {
 String ssn;
 String address;
   public Identity(String ssn, String address) {
     this.ssn = ssn;
     this.address = address;
   }
 }

 private static class Person {
   String name;
   Map<String, List<String>> contactInfo;
   List<Identity> parents;
   Map<String, Identity> relatives;
   Identity identity;
 }

 public void buildAndSaveFile(Configuration conf, String path) {
   StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
           .getReflectionObjectInspector(Person.class, ObjectInspectorOptions.JAVA);
       String cols = ObjectInspectorUtils.getFieldNames(oi);
       Properties props = new Properties();
       props.setProperty(serdeConstants.LIST_COLUMNS, cols);
       props.setProperty(serdeConstants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));
       ColumnarSerDe serde;
       try {
        serde = new ColumnarSerDe();
serde.initialize(new Configuration(), props);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
return;
}

       Person outerStruct = new Person();
       outerStruct.name = "Steven Washington";
       List array1 = new ArrayList<String>();
       array1.add("583-195-1121");
       array1.add("583-195-1122");
       array1.add("583-195-1123");
       array1.add("583-195-1124");
       List array2 = new ArrayList<String>();
       array2.add("john@yahoooo.com");
       array2.add("mary@yahoooo.com");
       array2.add("mark@yahoooo.com");
       outerStruct.contactInfo =  new TreeMap<String, List<String>>();
       outerStruct.contactInfo.put("Phone", array1);
       outerStruct.contactInfo.put("Email", array2);
       Identity is1 = new Identity("383-19-1111", "123 Fremont Blvd, Fremont CA 94555");
       Identity is2 = new Identity("383-19-1112", "124 Fremont Blvd, Fremont CA 94555");
       outerStruct.parents = new ArrayList<Identity>(2);
       outerStruct.parents.add(is1);
       outerStruct.parents.add(is2);
       outerStruct.relatives = new TreeMap<String, Identity>();
       outerStruct.relatives.put(new String("Uncle"), new Identity("383-19-8881", "223 Fremont Blvd, Fremont CA 94555"));
       outerStruct.relatives.put(new String("FatherInLaw"), new Identity("383-19-8882", "224 Fremont Blvd, Fremont CA 94555"));
       outerStruct.identity = new Identity("383-19-9991", "111 Fremont Blvd, Fremont CA 94555");
       try {
BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
System.out.println("Row is [" + braw.toString() + "]");
int numColumns = braw.size();
System.out.println("Write file with " + numColumns + " columns...");
conf.set(RCFile.COLUMN_NUMBER_CONF_STR, String.valueOf(numColumns)); final FileSystem fs = FileSystem.get(conf);
RCFile.Writer rcWriter = new RCFile.Writer(fs, conf, new Path(path));

rcWriter.append(braw);
rcWriter.close();
System.out.println("Write " + numColumns + " columns successfullly.");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
return;
}
 }
}