[root@john1 john]# cd /var/lib/hadoop-hdfs
[root@john1 hadoop-hdfs]# sudo -u hdfs spark-shell --master yarn --deploy-mode client
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc (master = yarn-client, app id = application_1486166793688_0005).
17/02/06 16:13:53 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.1.0
17/02/06 16:13:54 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.
scala> val logFile = "hdfs://john1.dg:8020/johnz/text/test.txt"
logFile: String = hdfs://john1.dg:8020/johnz/text/test.txt
scala> val file = sc.textFile(logFile)
file: org.apache.spark.rdd.RDD[String] = hdfs://john1.dg:8020/johnz/text/test.txt MapPartitionsRDD[1] at textFile at <console>:29
scala> val counts = file.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:31
scala> counts.collect()
res0: Array[(String, Int)] = Array((executes,1), (is,1), (expressive,1), (real,1), ((clustered),1), ("",1), (apache,1), (computing,1), (fast,,1), (job,1), (environment.,1), (spark,1), (a,1), (in,1), (which,1), (extremely,1), (distributed,1), (time,1), (and,1), (system,1))
Monday, February 6, 2017
Thursday, February 2, 2017
Solve KMS authentication issue under CDH5 HA environment
2017-01-31 21:53:41,538 WARN org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter: Authentication exception: User: testuser@DG.COM is not allowed to impersonate testuser
If the log above appears in the KMS log file (/var/log/kms-keytrustee/kms.log), add the properties below to the KMS configuration in Cloudera Manager (Cloudera Manager > Key Trustee KMS > Configuration > Key Management Server Proxy Default Group > Advanced). After adding them, we were able to submit jobs on the cluster as testuser:
<property>
<name>hadoop.kms.proxyuser.testuser.users</name>
<value>*</value>
</property>
<property>
<name>hadoop.kms.proxyuser.testuser.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.kms.proxyuser.testuser.groups</name>
<value>*</value>
</property>
On CDH 5.8.4 the configuration UI has changed, and the content of these XML properties has to be entered through the new UI instead.
Wednesday, January 18, 2017
Dynamically load jar file in Java
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

File customJarHome = new File(customJarHomePath);
URLClassLoader classLoader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
addCustomJars(customJarHome, classLoader);
public void addCustomJars(File file, URLClassLoader classLoader) {
File[] jarFiles = file.listFiles();
if (jarFiles != null) {
if (logger.isDebugEnabled()) {
URL[] urls = classLoader.getURLs();
for (URL url : urls) {
logger.debug("URL before custom jars are added:" + url.toString());
}
}
Class<?> sysclass = URLClassLoader.class;
Method method = null;
try {
method = sysclass.getDeclaredMethod("addURL", new Class[]{ URL.class });
} catch (NoSuchMethodException e1) {
logger.error("Unable to find addURL method", e1);
return;
} catch (SecurityException e1) {
logger.error("Unable to get addURL method", e1);
return;
}
method.setAccessible(true);
// add each jar file under such folder
for (File jarFile : jarFiles) {
if (jarFile.isFile() && jarFile.getName().endsWith(".jar")) {
try {
method.invoke(classLoader,new Object[]{ jarFile.toURI().toURL() });
logger.info("Add custom jar " + jarFile.toString());
} catch (Exception e) {
logger.error("Failed to add classpath for " + jarFile.getName(), e);;
}
}
}
if (logger.isDebugEnabled()) {
URL[] urls = classLoader.getURLs();
for (URL url : urls) {
logger.debug("URL after custom jars are added:" + url.toString());
}
}
}
}
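Once the jars have been added, classes inside them can be loaded through the same class loader. A minimal sketch (com.example.CustomPlugin is a hypothetical class name; classLoader and logger are the same objects used above):

try {
    // Hypothetical class that ships inside one of the custom jars.
    Class<?> pluginClass = Class.forName("com.example.CustomPlugin", true, classLoader);
    Object plugin = pluginClass.newInstance();
    logger.info("Loaded " + plugin.getClass().getName() + " from a custom jar");
} catch (Exception e) {
    logger.error("Custom class could not be loaded or instantiated", e);
}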
Monday, March 28, 2016
Two key properties to set if you write code for accessing Hadoop concurrently
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
conf.setBoolean("fs.maprfs.impl.disable.cache", true);
conf.setBoolean("fs.maprfs.impl.disable.cache", true);
conf.setBoolean("fs.s3.impl.disable.cache", true);
conf.setBoolean("fs.s3n.impl.disable.cache", true);
conf.setBoolean("fs.s3a.impl.disable.cache", true);
conf.setBoolean("fs.s3n.impl.disable.cache", true);
conf.setBoolean("fs.s3a.impl.disable.cache", true);
These settings disable caching of the FileSystem instance returned by FileSystem.get(conf). Without them, all threads share one cached instance, so when the first thread closes it the other threads get a "Filesystem closed" error.
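For illustration, here is a minimal sketch (the class name and the hdfs://john1.dg:8020 URI borrowed from the Spark example above are only placeholders) of two threads that each create and close their own FileSystem instance once caching is disabled:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PerThreadFileSystem {
    public static void main(String[] args) {
        final Configuration conf = new Configuration();
        conf.setBoolean("fs.hdfs.impl.disable.cache", true);   // each get() now returns a fresh instance

        Runnable task = new Runnable() {
            public void run() {
                FileSystem fs = null;
                try {
                    // Placeholder URI; with caching disabled this instance is not shared with other threads.
                    fs = FileSystem.get(new URI("hdfs://john1.dg:8020"), conf);
                    fs.listStatus(new Path("/"));
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Closing here no longer breaks FileSystem instances held by other threads.
                    try { if (fs != null) fs.close(); } catch (IOException ignored) { }
                }
            }
        };
        new Thread(task).start();
        new Thread(task).start();
    }
}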
Friday, February 19, 2016
Kerberos renewal trick when using a keytab to log in through a proxy user
When the ugi returned by UserGroupInformation.createProxyUser is used to call doAs, renewing the ticket (through checkTGTAndReloginFromKeytab) needs to go through its real user. If you call ugi.isFromKeytab(), it returns false because the ugi is just a proxy, but ugi.getRealUser().isFromKeytab() returns true. Here, ugi.getRealUser() returns the UserGroupInformation object for the real user (hdfs@HDP.DG). Calling checkTGTAndReloginFromKeytab should therefore go through the real user, not the proxy.
if (ugi.getRealUser().isFromKeytab()) {
    log("trying to reloginFromKeytab");
    UserGroupInformation ugiReal = ugi.getRealUser();
    ugiReal.checkTGTAndReloginFromKeytab();
    log("RealUser.getUserName: " + ugiReal.getUserName());
    log("RealUser.getRealAuthenticationMethod: " + ugiReal.getRealAuthenticationMethod().name());
    log("RealUser.hasKerberosCredentials: " + ugiReal.hasKerberosCredentials());
    readFile(conf, dirPath);
}
The ticket expiration time is controlled by the KDC's krb5.conf file (not the client's krb5.conf file). If you use Hortonworks: even if you set your own value for "ticket_lifetime", Hortonworks 2.2 will reset it when the keytab file is recreated and its services are restarted.
Key API:
UserGroupInformation.loginUserFromKeytabAndReturnUGI
UserGroupInformation.createProxyUser
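Putting these two calls together, here is a minimal sketch of the login-then-proxy pattern (the class name, principal, keytab path, proxied user name, and the listStatus call standing in for readFile are all placeholders, not from the original code):

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserSketch {
    // Principal, keytab path, proxied user name, and HDFS path below are placeholders.
    public static void runAsProxy(final Configuration conf, final String dirPath) throws Exception {
        UserGroupInformation.setConfiguration(conf);
        // The real (keytab-backed) user logs in once...
        UserGroupInformation realUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "hdfs@HDP.DG", "/path/to/hdfs.keytab");
        // ...and the proxy user is layered on top of it.
        final UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser("testuser", realUgi);

        // All Hadoop calls run as the proxy user; ticket renewal goes through
        // proxyUgi.getRealUser(), exactly as in the relogin snippet above.
        proxyUgi.doAs(new PrivilegedExceptionAction<Void>() {
            public Void run() throws Exception {
                FileSystem fs = FileSystem.get(conf);
                fs.listStatus(new Path(dirPath));   // stand-in for the post's readFile(conf, dirPath)
                return null;
            }
        });
    }
}

The important part is that proxyUgi.getRealUser() is the keytab-backed login, so the relogin check shown earlier keeps working.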
Generate RC file from Java code
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
public class RCComplexTypeWriter {
private static class Identity {
String ssn;
String address;
public Identity(String ssn, String address) {
this.ssn = ssn;
this.address = address;
}
}
private static class Person {
String name;
Map<String, List<String>> contactInfo;
List<Identity> parents;
Map<String, Identity> relatives;
Identity identity;
}
public void buildAndSaveFile(Configuration conf, String path) {
StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
.getReflectionObjectInspector(Person.class, ObjectInspectorOptions.JAVA);
String cols = ObjectInspectorUtils.getFieldNames(oi);
Properties props = new Properties();
props.setProperty(serdeConstants.LIST_COLUMNS, cols);
props.setProperty(serdeConstants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));
ColumnarSerDe serde;
try {
serde = new ColumnarSerDe();
serde.initialize(new Configuration(), props);
} catch (Exception e) {
// SerDe initialization failed; log and give up.
e.printStackTrace();
return;
}
Person outerStruct = new Person();
outerStruct.name = "Steven Washington";
List<String> array1 = new ArrayList<String>();
array1.add("583-195-1121");
array1.add("583-195-1122");
array1.add("583-195-1123");
array1.add("583-195-1124");
List<String> array2 = new ArrayList<String>();
array2.add("john@yahoooo.com");
array2.add("mary@yahoooo.com");
array2.add("mark@yahoooo.com");
outerStruct.contactInfo = new TreeMap<String, List<String>>();
outerStruct.contactInfo.put("Phone", array1);
outerStruct.contactInfo.put("Email", array2);
Identity is1 = new Identity("383-19-1111", "123 Fremont Blvd, Fremont CA 94555");
Identity is2 = new Identity("383-19-1112", "124 Fremont Blvd, Fremont CA 94555");
outerStruct.parents = new ArrayList<Identity>(2);
outerStruct.parents.add(is1);
outerStruct.parents.add(is2);
outerStruct.relatives = new TreeMap<String, Identity>();
outerStruct.relatives.put(new String("Uncle"), new Identity("383-19-8881", "223 Fremont Blvd, Fremont CA 94555"));
outerStruct.relatives.put(new String("FatherInLaw"), new Identity("383-19-8882", "224 Fremont Blvd, Fremont CA 94555"));
outerStruct.identity = new Identity("383-19-9991", "111 Fremont Blvd, Fremont CA 94555");
try {
BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
System.out.println("Row is [" + braw.toString() + "]");
int numColumns = braw.size();
System.out.println("Write file with " + numColumns + " columns...");
conf.set(RCFile.COLUMN_NUMBER_CONF_STR, String.valueOf(numColumns));
final FileSystem fs = FileSystem.get(conf);
RCFile.Writer rcWriter = new RCFile.Writer(fs, conf, new Path(path));
rcWriter.append(braw);
rcWriter.close();
System.out.println("Write " + numColumns + " columns successfullly.");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
return;
}
}
}
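A minimal driver for the class above could look like this (the driver class name and the /tmp/person.rc output path are assumptions; it only needs a Hadoop/Hive client classpath and a reachable default FileSystem):

import org.apache.hadoop.conf.Configuration;

public class RCComplexTypeWriterDriver {
    public static void main(String[] args) {
        // Uses the default FileSystem from the client configuration; the output path is a placeholder.
        Configuration conf = new Configuration();
        new RCComplexTypeWriter().buildAndSaveFile(conf, "/tmp/person.rc");
    }
}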
Friday, December 18, 2015
A simple way for a reducer to get how many records were processed by the job's mapper
JobClient client = new JobClient(jobConf);
RunningJob mapreduceJob = client.getJob(JobID.forName(jobConf.get("mapred.job.id")));
long mapProcessedRecordCounter = mapreduceJob.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getCounter();
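For example, here is a sketch of reading the counter from inside a reducer using the old mapred API (the CountingReducer class, its word-count style reduce body, and the mapInputRecords field are my own additions for illustration):

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;

public class CountingReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    private long mapInputRecords = -1;   // hypothetical field holding the mapper record count

    @Override
    public void configure(JobConf jobConf) {
        try {
            JobClient client = new JobClient(jobConf);
            RunningJob job = client.getJob(JobID.forName(jobConf.get("mapred.job.id")));
            mapInputRecords = job.getCounters()
                    .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS")
                    .getCounter();
        } catch (IOException e) {
            // Best effort: keep the default value if the counter cannot be read.
        }
    }

    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
        reporter.setStatus("mapper processed " + mapInputRecords + " records");
    }
}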