Eval

For details, see the [official documentation](http://pig.apache.org/docs/r0.16.0/udf.html#eval-functions).

Algebraic functions implement the Algebraic interface, which provides a mechanism for processing an entire bag incrementally. The result must be a scalar type; for example, an algebraic function cannot return a tuple, bag, or map. The defining property of these functions is that they can be applied to arbitrary, unsorted subsets of the data: they are first invoked in the map phase, then in the combine phase, and finally in the reduce phase, which emits the result. Pig's built-in functions of this kind include COUNT, MIN, MAX, and AVG.

    public interface Algebraic {
        // Names the EvalFunc class invoked in the map phase
        public String getInitial();
        // Names the EvalFunc class invoked in the combine phase
        public String getIntermed();
        // Names the EvalFunc class invoked in the reduce phase
        public String getFinal();
    }
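
To make the three phases concrete, here is a sketch of an algebraic count, closely modelled on the COUNT example in the Pig documentation (the class name Count is illustrative): Initial counts the tuples of each map-side bag, Intermed sums partial counts in the combiner, and Final emits the total in the reducer.

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.pig.Algebraic;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class Count extends EvalFunc<Long> implements Algebraic {
        @Override
        public Long exec(Tuple input) throws IOException {
            return count(input); // fallback when Pig cannot use the algebraic path
        }
        public String getInitial() { return Initial.class.getName(); }
        public String getIntermed() { return Intermed.class.getName(); }
        public String getFinal() { return Final.class.getName(); }

        // Map phase: count the tuples in this chunk's bag
        public static class Initial extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                return TupleFactory.getInstance().newTuple(count(input));
            }
        }

        // Combine phase: sum the partial counts
        public static class Intermed extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                return TupleFactory.getInstance().newTuple(sum(input));
            }
        }

        // Reduce phase: sum the partial counts and emit the final value
        public static class Final extends EvalFunc<Long> {
            @Override
            public Long exec(Tuple input) throws IOException {
                return sum(input);
            }
        }

        protected static Long count(Tuple input) throws ExecException {
            DataBag values = (DataBag) input.get(0);
            return values.size();
        }

        protected static Long sum(Tuple input) throws ExecException {
            DataBag values = (DataBag) input.get(0);
            long sum = 0;
            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
                sum += (Long) it.next().get(0);
            }
            return sum;
        }
    }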

The Accumulator interface provides a memory-efficient mechanism for processing the data in a bag. It is typically used for computations that cannot be expressed algebraically; for example, a function that must see its input in sorted order cannot be split across the map, combine, and reduce phases, but it can still consume the data incrementally through this interface. The data is passed to the function in chunks (usually one or more chunks per key).

    public interface Accumulator<T> {
        // Called repeatedly with a tuple holding a single field: an inner bag
        // containing a chunk (subset) of the group's tuples
        public void accumulate(Tuple b) throws IOException;
        // Called once, after every chunk for the current key has been
        // passed to accumulate()
        public T getValue();
        // Called after getValue(), before processing of the next key begins
        public void cleanup();
    }

Accumulator functions are generally used to process the output of a GROUP operation. GROUP emits one tuple per distinct value of the group key: the first field is the key itself, and the second is a bag containing all tuples that match the key. Such group tuples can therefore be very large.
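
As an illustration, here is a minimal sketch of a count implemented against the Accumulator interface (the class AccumulatorCount is hypothetical, not a Pig built-in). Note that an accumulator UDF still extends EvalFunc, so Pig can fall back to exec() when it cannot run the function in accumulative mode:

    import java.io.IOException;
    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class AccumulatorCount extends EvalFunc<Long> implements Accumulator<Long> {
        private long count = 0;

        @Override
        public void accumulate(Tuple b) throws IOException {
            // b holds a single field: an inner bag with one chunk of the group
            DataBag bag = (DataBag) b.get(0);
            count += bag.size();
        }

        @Override
        public Long getValue() {
            return count;
        }

        @Override
        public void cleanup() {
            count = 0; // reset state before the next key
        }

        @Override
        public Long exec(Tuple input) throws IOException {
            // Fallback when Pig cannot run the UDF in accumulative mode
            cleanup();
            accumulate(input);
            Long result = getValue();
            cleanup();
            return result;
        }
    }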

The simplest form of eval function is a plain extension of the EvalFunc class.

For example:

    public class UPPER extends EvalFunc<String> {

        @Override
        public String exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null) {
                return null;
            }
            try {
                String str = (String) input.get(0);
                return str.toUpperCase();
            } catch (ClassCastException e) {
                warn("unable to cast input " + input.get(0) + " of class "
                        + input.get(0).getClass() + " to String", PigWarning.UDF_WARNING_1);
                return null;
            } catch (Exception e) {
                warn("Error processing input " + input.get(0), PigWarning.UDF_WARNING_1);
                return null;
            }
        }

        // Declare the output type (a single chararray field) to Pig
        @Override
        public Schema outputSchema(Schema input) {
            return new Schema(new FieldSchema(
                    getSchemaName(getClass().getName().toLowerCase(), input),
                    DataType.CHARARRAY));
        }

        // Tell Pig which argument types this UDF accepts, so it can
        // insert casts where necessary
        @Override
        public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
            List<FuncSpec> funcList = new ArrayList<FuncSpec>();
            funcList.add(new FuncSpec(getClass().getName(),
                    new Schema(new FieldSchema(null, DataType.CHARARRAY))));
            return funcList;
        }

        @Override
        public boolean allowCompileTimeCalculation() {
            return true;
        }
    }
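
To use the function, package it in a jar, register the jar with REGISTER in the script, and call it like a built-in, e.g. `B = FOREACH A GENERATE UPPER(name);`. Because getArgToFuncMapping declares a chararray argument, Pig knows to cast a bytearray input to chararray before exec() is called.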

Filter functions are a special kind of eval function that return only Boolean values. A custom filter function extends the FilterFunc class.

    public class IsGoodQuality extends FilterFunc {

        @Override
        public Boolean exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0) {
                return false;
            }
            try {
                Object obj = input.get(0);
                if (obj == null) {
                    return false;
                }
                int i = (Integer) obj;
                return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
            } catch (ExecException e) {
                throw new IOException(e);
            }
        }

        // Declare the expected argument type (a single integer field)
        @Override
        public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
            List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>();
            funcSpecs.add(new FuncSpec(this.getClass().getName(),
                    new Schema(new Schema.FieldSchema(null, DataType.INTEGER))));
            return funcSpecs;
        }
    }
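
Once registered, a filter function is used in a FILTER statement, for example `good_records = FILTER records BY IsGoodQuality(quality);`; rows for which it returns false are discarded. Here too, getArgToFuncMapping lets Pig cast a bytearray quality field to an integer before the function sees it.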

Load

A load function specifies how data from external storage is loaded into Pig.

To implement a simple load function, extend the LoadFunc class. The CutLoadFunc below cuts fixed column ranges out of each line of a text file; the ranges are given as a string in the constructor and parsed by a helper Range class:

    public class CutLoadFunc extends LoadFunc {

        private List<Range> ranges;
        private final TupleFactory factory = TupleFactory.getInstance();
        private RecordReader reader;

        public CutLoadFunc(String cutPattern) {
            ranges = Range.parse(cutPattern);
        }

        @Override
        public void setLocation(String location, Job job) throws IOException {
            FileInputFormat.setInputPaths(job, location);
        }

        @Override
        public InputFormat getInputFormat() throws IOException {
            return new TextInputFormat();
        }

        @Override
        public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
            this.reader = reader;
        }

        @Override
        public Tuple getNext() throws IOException {
            try {
                if (!reader.nextKeyValue()) {
                    return null; // no more input
                }
                Text value = (Text) reader.getCurrentValue();
                String line = value.toString();
                Tuple tuple = factory.newTuple(ranges.size());
                for (int i = 0; i < ranges.size(); i++) {
                    Range range = ranges.get(i);
                    if (range.getEnd() > line.length()) {
                        continue; // leave the field unset if the line is too short
                    }
                    // Fields are left as bytearray; Pig casts them later as needed
                    tuple.set(i, new DataByteArray(range.getSubstring(line)));
                }
                return tuple;
            } catch (InterruptedException e) {
                throw new ExecException(e);
            }
        }
    }
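
Assuming the Range helper accepts patterns such as '16-19,88-92' (one start-end pair per output field, which is what the loop above implies), a script might use the loader along the lines of `records = LOAD 'input' USING CutLoadFunc('16-19,88-92');`.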

LoadFunc also has four methods with default implementations that can be overridden:

    // Returns a list of HDFS files to place in the distributed cache
    public List<String> getCacheFiles()
    // Returns a list of local files to ship to the cluster
    public List<String> getShipFiles()
    // Called by Pig on both the front end and the back end to pass the
    // UDF a unique signature, under which it can store state between calls
    public void setUDFContextSignature(String signature)
    // Called by the Pig runtime to convert a relative load location
    // into an absolute path
    public String relativeToAbsolutePath(String location, Path curDir)
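
For example, a loader that needs a lookup file available on every task could override getCacheFiles; this is only a sketch, and the path and symlink name are made up. The part after `#` is the local symlink name under which tasks open the file:

    // Inside a LoadFunc subclass: place an HDFS file in the distributed cache
    @Override
    public List<String> getCacheFiles() {
        // '/data/lookup.txt' lives on HDFS; tasks open it locally as 'lookup.txt'
        return java.util.Collections.singletonList("/data/lookup.txt#lookup.txt");
    }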

Store

A store function specifies how data is written out, for example as Hadoop SequenceFiles, which is what the SequenceFileStoreFunc below does.

    public class SequenceFileStoreFunc extends StoreFunc {

        protected RecordWriter<NullWritable, Tuple> writer;
        private final String compressionType;
        private final String compressionCodecClass;

        // Pig passes UDF constructor arguments as strings from the script
        public SequenceFileStoreFunc(String compressionType,
                                     String compressionCodecClass) {
            this.compressionType = compressionType;
            this.compressionCodecClass = compressionCodecClass;
        }

        @Override
        public OutputFormat getOutputFormat() throws IOException {
            return new SequenceFileOutputFormat<NullWritable, Tuple>();
        }

        @Override
        public void setStoreLocation(String location, Job job) throws IOException {
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(DefaultTuple.class);
            if (compressionType != null && compressionCodecClass != null) {
                // Fall back to DefaultCodec if no codec is configured on the job
                Class<? extends CompressionCodec> codecClass =
                        FileOutputFormat.getOutputCompressorClass(job, DefaultCodec.class);
                SequenceFileOutputFormat.setOutputCompressorClass(job, codecClass);
                SequenceFileOutputFormat.setOutputCompressionType(job,
                        SequenceFile.CompressionType.valueOf(compressionType));
            }
            FileOutputFormat.setOutputPath(job, new Path(location));
        }

        @Override
        public void prepareToWrite(RecordWriter recordWriter) throws IOException {
            writer = recordWriter;
        }

        @Override
        public void putNext(Tuple tuple) throws IOException {
            if (tuple != null) {
                try {
                    // Copy into a DefaultTuple, the concrete value class
                    // declared in setStoreLocation
                    Tuple t = new DefaultTuple();
                    for (Object val : tuple.getAll()) {
                        t.append(val);
                    }
                    writer.write(NullWritable.get(), t);
                } catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }
        }
    }
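
The field-by-field copy in putNext() is deliberate: setStoreLocation() declared DefaultTuple as the output value class, but the tuples Pig hands to the store function may be a different Tuple implementation, so copying guarantees that what reaches the writer matches the declared class.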