Eval
For details, see the [official documentation](http://pig.apache.org/docs/r0.16.0/udf.html#eval-functions).
Algebraic functions implement the Algebraic interface, which provides a mechanism for processing an entire bag in stages. The result must be a scalar type; for example, the result of an algebraic function cannot be a tuple, bag, or map. The defining property of these functions is that they can be computed over arbitrary, unordered subsets of the data and the partial results then combined. They are first invoked in the map phase, then in the combine phase, and finally in the reduce phase, which emits the final result. Pig's built-in functions of this kind include COUNT, MIN, MAX, and AVG.
public interface Algebraic {
    // name of the EvalFunc class to apply in the map phase
    public String getInitial();
    // name of the EvalFunc class to apply in the combine phase
    public String getIntermed();
    // name of the EvalFunc class to apply in the reduce phase
    public String getFinal();
}
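To see how the three phases fit together, here is a condensed sketch modeled on the built-in COUNT described in the documentation linked above; the class name MyCount and the helper methods are simplified for illustration.

import java.io.IOException;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class MyCount extends EvalFunc<Long> implements Algebraic {
    // non-algebraic fallback: count the tuples in the input bag directly
    @Override
    public Long exec(Tuple input) throws IOException {
        return count(input);
    }

    // map phase: emit a partial count for each chunk of the bag
    public String getInitial()  { return Initial.class.getName(); }
    // combine phase: sum the partial counts
    public String getIntermed() { return Intermed.class.getName(); }
    // reduce phase: sum the partial counts and return the scalar result
    public String getFinal()    { return Final.class.getName(); }

    public static class Initial extends EvalFunc<Tuple> {
        @Override
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(count(input));
        }
    }
    public static class Intermed extends EvalFunc<Tuple> {
        @Override
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(sum(input));
        }
    }
    public static class Final extends EvalFunc<Long> {
        @Override
        public Long exec(Tuple input) throws IOException {
            return sum(input);
        }
    }

    static Long count(Tuple input) throws ExecException {
        DataBag bag = (DataBag) input.get(0);
        return bag.size();
    }
    static Long sum(Tuple input) throws ExecException {
        DataBag bag = (DataBag) input.get(0);
        long total = 0;
        for (Tuple t : bag) {
            total += (Long) t.get(0);
        }
        return total;
    }
}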
The Accumulator interface provides a memory-efficient way to process the data in a bag. It is typically used for computations that cannot be expressed algebraically, for example functions that require their input to be sorted before they run. Data is passed to the function in chunks (usually one or more chunks per key).
public interface Accumulator<T> {
    // called repeatedly with a tuple whose single field is a bag
    // containing a subset of the tuples for the current key
    public void accumulate(Tuple b) throws IOException;
    // called once, after all chunks have been passed to accumulate(),
    // to return the final value for the key
    public T getValue();
    // called after getValue(), before the next key is processed, to reset state
    public void cleanup();
}
Such functions are generally used to process the output of a GROUP operation. GROUP produces one tuple per distinct value of the group key: the tuple's first field is the group key itself, and its second field is a bag containing all of the tuples that match that key. A group tuple can therefore be very large.
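A minimal sketch of an accumulating count over such group bags, modeled on the way the built-in COUNT implements Accumulator (the class name is illustrative):

import java.io.IOException;
import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Counts the tuples for each key without ever holding the whole bag in memory.
public class AccumulatorCount extends EvalFunc<Long> implements Accumulator<Long> {
    private long intermediateCount = 0L;

    @Override
    public void accumulate(Tuple b) throws IOException {
        // b has a single field: a bag holding a chunk of the tuples for the current key
        DataBag bag = (DataBag) b.get(0);
        for (Tuple t : bag) {
            if (t != null && t.size() > 0 && t.get(0) != null) {
                intermediateCount += 1;
            }
        }
    }

    @Override
    public Long getValue() {
        return intermediateCount;
    }

    @Override
    public void cleanup() {
        // reset state before the next key
        intermediateCount = 0L;
    }

    // exec() is still required by EvalFunc; it is used when the
    // accumulator optimization is not applied
    @Override
    public Long exec(Tuple input) throws IOException {
        accumulate(input);
        Long result = getValue();
        cleanup();
        return result;
    }
}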
The simplest kind of eval function is a plain extension of the EvalFunc class, for example:
public class UPPER extends EvalFunc<String> {
    public UPPER() {
    }

    public String exec(Tuple input) throws IOException {
        if (input != null && input.size() != 0 && input.get(0) != null) {
            try {
                String str = (String) input.get(0);
                return str.toUpperCase();
            } catch (ClassCastException e) {
                this.warn("unable to cast input " + input.get(0) + " of class "
                        + input.get(0).getClass() + " to String", PigWarning.UDF_WARNING_1);
                return null;
            } catch (Exception e) {
                this.warn("Error processing input " + input.get(0), PigWarning.UDF_WARNING_1);
                return null;
            }
        } else {
            return null;
        }
    }

    // declare the output type as chararray
    public Schema outputSchema(Schema input) {
        return new Schema(new FieldSchema(
                this.getSchemaName(this.getClass().getName().toLowerCase(), input),
                DataType.CHARARRAY));
    }

    // tell Pig this function expects a single chararray argument
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        funcList.add(new FuncSpec(this.getClass().getName(),
                new Schema(new FieldSchema((String) null, DataType.CHARARRAY))));
        return funcList;
    }

    // the result can be computed at compile time for constant input
    public boolean allowCompileTimeCalculation() {
        return true;
    }
}
Filter functions are a special kind of eval function that return only boolean values. A custom filter function extends the FilterFunc class.
public class IsGoodQuality extends FilterFunc {
    @Override
    public Boolean exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return false;
        }
        try {
            Object obj = input.get(0);
            if (obj == null) {
                return false;
            }
            int i = (Integer) obj;
            return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
        } catch (ExecException e) {
            throw new IOException(e);
        }
    }

    // declare the expected argument type (a single int field)
    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>();
        funcSpecs.add(new FuncSpec(this.getClass().getName(),
                new Schema(new Schema.FieldSchema(null, DataType.INTEGER))));
        return funcSpecs;
    }
}
Load
A load function specifies how data is loaded into Pig from external storage.
To implement a simple load function, extend the LoadFunc class.
public class CutLoadFunc extends LoadFunc {
    private List<Range> ranges;
    private final TupleFactory factory = TupleFactory.getInstance();
    private RecordReader recordReader;

    public CutLoadFunc(String cutPattern) {
        ranges = Range.parse(cutPattern);
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.recordReader = reader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!recordReader.nextKeyValue()) {
                return null;
            }
            Text value = (Text) recordReader.getCurrentValue();
            String line = value.toString();
            Tuple tuple = factory.newTuple(ranges.size());
            for (int i = 0; i < ranges.size(); i++) {
                Range range = ranges.get(i);
                // leave the field null if the line is too short for this range
                if (range.getEnd() > line.length()) {
                    continue;
                }
                tuple.set(i, new DataByteArray(range.getSubstring(line)));
            }
            return tuple;
        } catch (InterruptedException e) {
            throw new ExecException(e);
        }
    }
}
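CutLoadFunc depends on a Range helper that is not shown above. A minimal sketch of what it could look like, assuming 1-based, inclusive column specs such as "16-19,88-92", follows; the field and method names simply mirror the calls made in CutLoadFunc.

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: a 1-based, inclusive column range such as "16-19".
public class Range {
    private final int start;
    private final int end;

    public Range(int start, int end) {
        this.start = start;
        this.end = end;
    }

    public int getEnd() {
        return end;
    }

    // Extract this range's columns from a line of text.
    public String getSubstring(String line) {
        return line.substring(start - 1, end);
    }

    // Parse a comma-separated list of ranges, e.g. "16-19,88-92".
    public static List<Range> parse(String spec) {
        List<Range> ranges = new ArrayList<Range>();
        for (String part : spec.split(",")) {
            String[] bounds = part.trim().split("-");
            ranges.add(new Range(Integer.parseInt(bounds[0]),
                                 Integer.parseInt(bounds[1])));
        }
        return ranges;
    }
}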
LoadFunc also provides default implementations for four methods:
// HDFS files that should be placed in the distributed cache
public List<String> getCacheFiles()
// local files that should be shipped to the cluster
public List<String> getShipFiles()
// Pig calls this to pass a unique signature the UDF can use to store
// and retrieve state in the UDFContext, on both the front end and the back end
public void setUDFContextSignature(String signature)
// called by the Pig runtime to convert a relative load location to an absolute path
public String relativeToAbsolutePath(String location, Path curDir)
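setUDFContextSignature is commonly used as a key for storing per-invocation state in the UDFContext, so that values set on the front end are available again on the back end. A minimal sketch of that pattern, with an illustrative class name and property key:

import java.util.Properties;
import org.apache.pig.impl.util.UDFContext;

// The methods shown here would normally live inside a LoadFunc or StoreFunc subclass.
public class SignatureAwareLoadFunc {
    private String signature;

    public void setUDFContextSignature(String signature) {
        // Pig passes a signature that uniquely identifies this UDF invocation
        this.signature = signature;
    }

    private Properties props() {
        // properties stored here on the front end can be read back on the back end
        return UDFContext.getUDFContext()
                .getUDFProperties(getClass(), new String[] { signature });
    }

    public void rememberSchema(String schemaString) {
        props().setProperty("schema", schemaString);   // illustrative key
    }

    public String recallSchema() {
        return props().getProperty("schema");
    }
}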
Store
A store function specifies how data should be written out, for example as Hadoop SequenceFiles.
public class SequenceFileStoreFunc extends StoreFunc {
    protected RecordWriter writer;
    private String compressionType;
    private String compressionCodecClass;

    // compression settings (both optional) are supplied when the function is defined
    public SequenceFileStoreFunc(String compressionType, String compressionCodecClass) {
        this.compressionType = compressionType;
        this.compressionCodecClass = compressionCodecClass;
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new SequenceFileOutputFormat();
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(DefaultTuple.class);
        if (compressionType != null && compressionCodecClass != null) {
            Class codecClass =
                    FileOutputFormat.getOutputCompressorClass(job, DefaultCodec.class);
            SequenceFileOutputFormat.setOutputCompressorClass(job, codecClass);
            SequenceFileOutputFormat.setOutputCompressionType(job,
                    SequenceFile.CompressionType.valueOf(compressionType));
        }
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @Override
    public void prepareToWrite(RecordWriter recordWriter) throws IOException {
        writer = recordWriter;
    }

    @Override
    public void putNext(Tuple tuple) throws IOException {
        if (tuple != null) {
            try {
                // copy into a DefaultTuple so the configured output value class matches
                Tuple t = new DefaultTuple();
                for (Object val : tuple.getAll()) {
                    t.append(val);
                }
                writer.write(NullWritable.get(), t);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }
}