Pig UDF class
package org.puneetha.pig.udf;

import java.io.IOException;

import org.apache.log4j.Logger;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

/***
 * Pig UDF that concatenates two chararray fields with a single space.
 *
 * @author Puneetha
 */
public final class ConcatStrPig extends EvalFunc<String> {

    private static final Logger logger = Logger.getLogger(ConcatStrPig.class);

    @Override
    public String exec(final Tuple input) throws IOException {
        // Guard first: input may be null, so only log after the check.
        if (input == null || input.size() < 2) {
            return "NULL";
        }
        logger.debug("Tuple=" + input.toString());

        final String separator = " ";
        final StringBuilder result = new StringBuilder();
        try {
            final String param1 = (String) input.get(0);
            final String param2 = (String) input.get(1);
            result.append(param1);
            result.append(separator);
            result.append(param2);
        } catch (Exception e) {
            throw new IOException("Exception processing input row: ", e);
        }
        return result.toString();
    }
}
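Without a declared output schema, Pig treats the UDF's result as a bytearray. If you want downstream operators to see a chararray with no explicit cast, you can optionally override outputSchema as well. A minimal sketch (the field alias "concatenated" is illustrative, and the two extra imports go at the top of the class):

import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;

    // Optional override inside ConcatStrPig: declares that the UDF
    // returns a chararray, so scripts need no explicit cast.
    @Override
    public Schema outputSchema(Schema input) {
        return new Schema(new Schema.FieldSchema("concatenated", DataType.CHARARRAY));
    }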
Test case – TestNG
package org.puneetha.pig.udf;

import org.apache.pig.data.DefaultTuple;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/***
 * TestNG tests for ConcatStrPig.
 *
 * @author Puneetha
 */
public class ConcatStrPigTest {

    @DataProvider(name = "dataProvider")
    public static String[][] inputData() {
        return new String[][] {
            { "hello", "world", "hello world" },
            { "this is an", "example", "this is an example" }
        };
    }

    @Test(dataProvider = "dataProvider")
    public void testEvaluate(String param1, String param2, String expectedResultStr) throws Exception {
        DefaultTuple inputDefaultTuple = new DefaultTuple();
        inputDefaultTuple.append(param1);
        inputDefaultTuple.append(param2);
        // TestNG's assertEquals signature is (actual, expected).
        Assert.assertEquals(new ConcatStrPig().exec(inputDefaultTuple), expectedResultStr);
    }
}
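The guard clause in exec (a null tuple or fewer than two fields) deserves coverage too. A small additional case that could live in the same test class:

    @Test
    public void testShortInputReturnsNull() throws Exception {
        // A tuple with a single field should trip the guard in exec()
        // and return the literal string "NULL".
        DefaultTuple tuple = new DefaultTuple();
        tuple.append("lonely");
        Assert.assertEquals(new ConcatStrPig().exec(tuple), "NULL");
    }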
log4j.properties
# Root logger option
log4j.rootLogger=DEBUG, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <artifactId>custom</artifactId>
  <groupId>org.puneetha</groupId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>pig_udf</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.finalname>pig_udf_v1</project.finalname>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <cdh.version>cdh5.5.2</cdh.version>
    <hadoop.version>2.6.0-${cdh.version}</hadoop.version>
    <hive.version>1.1.0-${cdh.version}</hive.version>
    <pig.version>0.12.0-${cdh.version}</pig.version>
    <log4j.version>1.2.17</log4j.version>
    <maven_jar_plugin.version>2.5</maven_jar_plugin.version>
    <codehaus.version>1.2.1</codehaus.version>
    <testng.version>6.9.10</testng.version>
    <junit.version>4.8.1</junit.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.testng</groupId>
      <artifactId>testng</artifactId>
      <version>${testng.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-service</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.pig</groupId>
      <artifactId>pig</artifactId>
      <version>${pig.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.pig</groupId>
      <artifactId>pigunit</artifactId>
      <version>${pig.version}</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>${project.finalname}</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-clean-plugin</artifactId>
        <version>${maven_jar_plugin.version}</version>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>${codehaus.version}</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven_jar_plugin.version}</version>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>cloudera-repo</id>
      <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
</project>
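With this pom in place, mvn clean package builds the UDF jar; the finalName setting of pig_udf_v1 means the artifact lands at target/pig_udf_v1.jar, which is the file registered in the Pig script below. mvn test runs the TestNG suite on its own.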
Using the UDF
REGISTER '/home/localuser/pig_udf_v1.jar';
DEFINE udf_concat org.puneetha.pig.udf.ConcatStrPig();

-- Job name
SET job.name 'Test concatenation udf - pig';

A = LOAD '/user/cloudera/wordcount/output/' USING PigStorage('\t')
    AS (field1:CHARARRAY, field2:CHARARRAY);
B = FOREACH A GENERATE field1, field2, udf_concat(field1, field2);
DUMP B;
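Assuming the wordcount output contains tab-separated lines such as "hello&lt;TAB&gt;world", DUMP B prints tuples of the form (hello,world,hello world): the two original fields followed by the UDF's space-joined result.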