Monday, February 6, 2017

My first Scala program on Spark: word count

[root@john1 john]# cd /var/lib/hadoop-hdfs
[root@john1 hadoop-hdfs]# sudo -u hdfs spark-shell --master yarn --deploy-mode client
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc (master = yarn-client, app id = application_1486166793688_0005).
17/02/06 16:13:53 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.1.0
17/02/06 16:13:54 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.

scala> val logFile = "hdfs://john1.dg:8020/johnz/text/test.txt"
logFile: String = hdfs://john1.dg:8020/johnz/text/test.txt

scala> val file = sc.textFile(logFile)
file: org.apache.spark.rdd.RDD[String] = hdfs://john1.dg:8020/johnz/text/test.txt MapPartitionsRDD[1] at textFile at <console>:29

scala> val counts = file.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:31

scala> counts.collect()
res0: Array[(String, Int)] = Array((executes,1), (is,1), (expressive,1), (real,1), ((clustered),1), ("",1), (apache,1), (computing,1), (fast,,1), (job,1), (environment.,1), (spark,1), (a,1), (in,1), (which,1), (extremely,1), (distributed,1), (time,1), (and,1), (system,1))
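For reference, here is a rough Java equivalent of the same word count. This is only a sketch assuming Spark 1.6's Java API; the input path is the one used above, the app name is made up, and the master (e.g. yarn-client) is expected to be supplied by spark-submit.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {
    // The master (e.g. yarn-client) is expected to come from spark-submit.
    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    JavaRDD<String> file = sc.textFile("hdfs://john1.dg:8020/johnz/text/test.txt");

    // Split each line into words.
    JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
      }
    });

    // Pair each word with 1, then sum the counts per word.
    JavaPairRDD<String, Integer> counts = words
        .mapToPair(new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String word) {
            return new Tuple2<String, Integer>(word, 1);
          }
        })
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) {
            return a + b;
          }
        });

    List<Tuple2<String, Integer>> result = counts.collect();
    for (Tuple2<String, Integer> t : result) {
      System.out.println(t._1() + ": " + t._2());
    }
    sc.stop();
  }
}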

Thursday, February 2, 2017

Solve a KMS authentication issue in a CDH5 HA environment

2017-01-31 21:53:41,538 WARN org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter: Authentication exception: User: testuser@DG.COM is not allowed to impersonate testuser

When the above message appears in the KMS log file (/var/log/kms-keytrustee/kms.log), add the properties below to the KMS configuration in Cloudera Manager (Cloudera Manager > Key Trustee KMS > Configuration > Key Management Server Proxy Default Group > Advanced). After that, we were able to submit jobs on the cluster as the testuser user:

<property>
   <name>hadoop.kms.proxyuser.testuser.users</name>
   <value>*</value>
</property>
<property>
   <name>hadoop.kms.proxyuser.testuser.hosts</name>
   <value>*</value>
</property>
<property>
   <name>hadoop.kms.proxyuser.testuser.groups</name>
   <value>*</value>
</property>



On CDH 5.8.4 the UI has changed, and the content of these XML properties has to be entered through the new UI.

Wednesday, January 18, 2017

Dynamically load jar files in Java


import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

// customJarHomePath points at the folder that holds the extra jar files.
File customJarHome = new File(customJarHomePath);
URLClassLoader classLoader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
addCustomJars(customJarHome, classLoader);


public void addCustomJars(File file, URLClassLoader classLoader) {
    // "logger" is assumed to be a class-level logger field in the enclosing class.
    File[] jarFiles = file.listFiles();
    if (jarFiles != null) {
        if (logger.isDebugEnabled()) {
            for (URL url : classLoader.getURLs()) {
                logger.debug("URL before custom jars are added: " + url.toString());
            }
        }

        // URLClassLoader.addURL is protected, so reach it through reflection.
        Class<?> sysclass = URLClassLoader.class;
        Method method = null;
        try {
            method = sysclass.getDeclaredMethod("addURL", URL.class);
        } catch (NoSuchMethodException e1) {
            logger.error("Unable to find addURL method", e1);
            return;
        } catch (SecurityException e1) {
            logger.error("Unable to get addURL method", e1);
            return;
        }
        method.setAccessible(true);

        // Add each jar file found directly under the given folder.
        for (File jarFile : jarFiles) {
            if (jarFile.isFile() && jarFile.getName().endsWith(".jar")) {
                try {
                    method.invoke(classLoader, new Object[]{ jarFile.toURI().toURL() });
                    logger.info("Added custom jar " + jarFile.toString());
                } catch (Exception e) {
                    logger.error("Failed to add classpath for " + jarFile.getName(), e);
                }
            }
        }

        if (logger.isDebugEnabled()) {
            for (URL url : classLoader.getURLs()) {
                logger.debug("URL after custom jars are added: " + url.toString());
            }
        }
    }
}
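Once the jars are registered, classes inside them can be resolved by name through the same class loader. A small usage sketch follows (the class name is hypothetical); note that this reflection approach relies on the context class loader being a URLClassLoader, which holds on Java 7/8 but not on Java 9 and later.

// Load a class that lives in one of the newly added jars (the name is hypothetical).
Class<?> pluginClass = Class.forName("com.example.MyPlugin", true, classLoader);
Object plugin = pluginClass.newInstance();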

Monday, March 28, 2016

Key properties to set if you write code that accesses Hadoop concurrently

conf.setBoolean("fs.hdfs.impl.disable.cache", true);
conf.setBoolean("fs.maprfs.impl.disable.cache", true);
  conf.setBoolean("fs.s3.impl.disable.cache", true);
  conf.setBoolean("fs.s3n.impl.disable.cache", true);
  conf.setBoolean("fs.s3a.impl.disable.cache", true);

These settings disable the caching of FileSystem instances returned by FileSystem.get(conf). Otherwise, all threads share one cached instance, and when the first thread closes it the other threads get a "Filesystem closed" error.
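A minimal sketch of the pattern these settings enable, assuming HDFS is reachable through the default Configuration and using a hypothetical /tmp path: with the cache disabled, each call to FileSystem.get(conf) hands back a separate instance, so a thread closing its own instance no longer breaks the others. (FileSystem.newInstance(conf) is another way to bypass the cache without changing configuration.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConcurrentHdfsAccess {
    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        conf.setBoolean("fs.hdfs.impl.disable.cache", true);

        Runnable task = new Runnable() {
            public void run() {
                try {
                    // Each thread gets its own FileSystem instance because the cache is disabled.
                    FileSystem fs = FileSystem.get(conf);
                    try {
                        fs.exists(new Path("/tmp"));  // any call, just to exercise the connection
                    } finally {
                        fs.close();  // closing here no longer affects other threads
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}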

Friday, February 19, 2016

Kerberos renewal trick when using keytab to login through proxy

When the ugi returned by UserGroupInformation.createProxyUser is used to call "doAs", renewing the ticket (through checkTGTAndReloginFromKeytab) needs to go through its real user.  If you call ugi.isFromKeytab(), it returns 'false' because the ugi is just a proxy, but ugi.getRealUser().isFromKeytab() returns 'true'.  Here, ugi.getRealUser() returns the UserGroupInformation object for the real user (hdfs@HDP.DG).  The call to 'checkTGTAndReloginFromKeytab' should therefore be made on the real user, not the proxy.

        if (ugi.getRealUser().isFromKeytab()) {
               log("trying to reloginFromKeytab");
               UserGroupInformation ugiReal = ugi.getRealUser();
               ugiReal.checkTGTAndReloginFromKeytab();
               log("RealUser.getUserName: " + ugiReal.getUserName());
               log("RealUser.getRealAuthenticationMethod: " + ugiReal.getRealAuthenticationMethod().name());
               log("RealUser.hasKerberosCredentials: " + ugiReal.hasKerberosCredentials());
               readFile(conf, dirPath);
        }


The ticket expiration time is controlled by KDC's krb5.conf file (not the client's krb5.conf file).  

If you use Hortonworks:

Even if you set your own value for "ticket_lifetime", Hortonworks 2.2 will reset it when the keytab file is recreated and its services are restarted.

Key APIs (a combined usage sketch follows the list):

UserGroupInformation.loginUserFromKeytabAndReturnUGI
UserGroupInformation.createProxyUser
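A minimal sketch that puts these two calls together with the relogin check from the snippet above, assuming Kerberos is enabled on the cluster; the keytab path and the HDFS directory below are hypothetical, and the principal is the hdfs@HDP.DG real user mentioned earlier.

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserRelogin {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "kerberos");
    UserGroupInformation.setConfiguration(conf);

    // Log in the real (service) user from its keytab, then wrap it in a proxy UGI for the end user.
    UserGroupInformation realUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        "hdfs@HDP.DG", "/etc/security/keytabs/hdfs.headless.keytab");  // hypothetical keytab path
    final UserGroupInformation ugi = UserGroupInformation.createProxyUser("testuser", realUgi);

    // Renewal must go through the real user, not the proxy.
    if (ugi.getRealUser().isFromKeytab()) {
      ugi.getRealUser().checkTGTAndReloginFromKeytab();
    }

    // Any Hadoop access on behalf of the proxied user happens inside doAs.
    ugi.doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        fs.listStatus(new Path("/tmp"));  // hypothetical directory
        return null;
      }
    });
  }
}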


Generate an RC file from Java code

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class RCComplexTypeWriter {
  private static class Identity {
    String ssn;
    String address;

    public Identity(String ssn, String address) {
      this.ssn = ssn;
      this.address = address;
    }
  }

  private static class Person {
    String name;
    Map<String, List<String>> contactInfo;
    List<Identity> parents;
    Map<String, Identity> relatives;
    Identity identity;
  }

  public void buildAndSaveFile(Configuration conf, String path) {
    // Derive the columnar schema (column names and types) from the Person class via reflection.
    StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
        .getReflectionObjectInspector(Person.class, ObjectInspectorOptions.JAVA);
    String cols = ObjectInspectorUtils.getFieldNames(oi);
    Properties props = new Properties();
    props.setProperty(serdeConstants.LIST_COLUMNS, cols);
    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));
    ColumnarSerDe serde;
    try {
      serde = new ColumnarSerDe();
      serde.initialize(new Configuration(), props);
    } catch (Exception e) {
      e.printStackTrace();
      return;
    }

    // Build one sample row with nested struct, map, and list columns.
    Person outerStruct = new Person();
    outerStruct.name = "Steven Washington";
    List<String> array1 = new ArrayList<String>();
    array1.add("583-195-1121");
    array1.add("583-195-1122");
    array1.add("583-195-1123");
    array1.add("583-195-1124");
    List<String> array2 = new ArrayList<String>();
    array2.add("john@yahoooo.com");
    array2.add("mary@yahoooo.com");
    array2.add("mark@yahoooo.com");
    outerStruct.contactInfo = new TreeMap<String, List<String>>();
    outerStruct.contactInfo.put("Phone", array1);
    outerStruct.contactInfo.put("Email", array2);
    Identity is1 = new Identity("383-19-1111", "123 Fremont Blvd, Fremont CA 94555");
    Identity is2 = new Identity("383-19-1112", "124 Fremont Blvd, Fremont CA 94555");
    outerStruct.parents = new ArrayList<Identity>(2);
    outerStruct.parents.add(is1);
    outerStruct.parents.add(is2);
    outerStruct.relatives = new TreeMap<String, Identity>();
    outerStruct.relatives.put("Uncle", new Identity("383-19-8881", "223 Fremont Blvd, Fremont CA 94555"));
    outerStruct.relatives.put("FatherInLaw", new Identity("383-19-8882", "224 Fremont Blvd, Fremont CA 94555"));
    outerStruct.identity = new Identity("383-19-9991", "111 Fremont Blvd, Fremont CA 94555");

    try {
      // Serialize the row into the columnar byte layout expected by RCFile, then append it.
      BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
      System.out.println("Row is [" + braw.toString() + "]");
      int numColumns = braw.size();
      System.out.println("Write file with " + numColumns + " columns...");
      conf.set(RCFile.COLUMN_NUMBER_CONF_STR, String.valueOf(numColumns));
      final FileSystem fs = FileSystem.get(conf);
      RCFile.Writer rcWriter = new RCFile.Writer(fs, conf, new Path(path));
      rcWriter.append(braw);
      rcWriter.close();
      System.out.println("Wrote " + numColumns + " columns successfully.");
    } catch (Exception e) {
      e.printStackTrace();
      return;
    }
  }
}
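A minimal driver to exercise the class, as a sketch only; the output path below is hypothetical, and the Configuration is expected to pick up your cluster settings (or default to the local file system for testing).

import org.apache.hadoop.conf.Configuration;

public class RCComplexTypeWriterDriver {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Write one row of the complex Person type to the given RC file path (hypothetical).
    new RCComplexTypeWriter().buildAndSaveFile(conf, "/tmp/person.rc");
  }
}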

Friday, December 18, 2015

A simple way for a reducer to learn how many records the mappers of a MapReduce job processed

JobClient client = new JobClient(jobConf);
RunningJob mapreduceJob = client.getJob(JobID.forName(jobConf.get("mapred.job.id")));
long mapProcessedRecordCounter = mapreduceJob.getCounters()
        .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getCounter();
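A sketch of where this snippet typically sits, namely the configure() method of an old-API (org.apache.hadoop.mapred) reducer; the reducer class and its key/value types below are hypothetical.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;

public class CountingReducer extends MapReduceBase
    implements Reducer<Text, LongWritable, Text, LongWritable> {

  private long totalMapInputRecords = -1;

  @Override
  public void configure(JobConf jobConf) {
    try {
      // Look up our own running job and read the aggregated mapper counter.
      JobClient client = new JobClient(jobConf);
      RunningJob mapreduceJob = client.getJob(JobID.forName(jobConf.get("mapred.job.id")));
      totalMapInputRecords = mapreduceJob.getCounters()
          .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getCounter();
    } catch (IOException e) {
      // Leave the counter at -1 if the job client cannot be reached.
    }
  }

  @Override
  public void reduce(Text key, Iterator<LongWritable> values,
      OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
    // totalMapInputRecords is now available to the reduce logic, e.g. to compute percentages.
  }
}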