My habit has been to write Pig UDFs in Java. To save effort, I tried writing a UDF in Python (via Jython) instead and found it very simple; it is quite valuable for testing or quick verification. A simple example follows. For more information, refer to the official Pig documentation.

The example below computes the entropy of a count distribution:
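(Here "entropy" means the Shannon entropy of the count distribution, H = -Σ p_i * log2(p_i) with p_i = cnt_i / total_cnt; the loop at the end of the function accumulates this sum and negates it once.)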

import math
@outputSchema("element:tuple(entropy:double, cid:chararray, total_cnt:long)")
def CalcEntropy(bag):
    # bag holds (ngram, cid, cnt) tuples; return the Shannon entropy of the
    # count distribution, the cid with the largest count, and the total count.
    max_cnt = 0
    category = ""
    total_cnt = 0
    cnt_list = []
    bad_cnt = 0
    for ele in bag:
        cid = ele[1]
        cnt = ele[2]
        try:
            total_cnt += cnt
        except:
            # skip malformed rows (e.g. null counts)
            bad_cnt += 1
            print "bad cnt:%d" % (bad_cnt)
            continue
        cnt_list.append(cnt)
        if max_cnt < cnt:
            max_cnt = cnt
            category = cid
    if total_cnt == 0:
        return (0.0, category, 0)
    entropy = 0.0
    for a in cnt_list:
        prob = a * 1.0 / total_cnt
        entropy += prob * math.log(prob, 2)
    # negate once, outside the loop: H = -sum(p * log2(p))
    entropy = -entropy
    return (entropy, category, total_cnt)

A Python Pig UDF is just a function that maps its input to its output, which is very direct. Beyond that, you only need to declare an output schema with the @outputSchema decorator, which is intuitive and much clearer than the Java version. There is no compile step, so you can edit and rerun on the spot, which is very convenient.
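Because the UDF is plain Python, it can also be sanity-checked without launching Pig at all. Below is a minimal sketch of such a check (the file name quick_test.py, the sample bag, and the expected result are illustrative, not from the original script). Note that Pig only injects the outputSchema decorator when it loads the script through Jython, so a standalone run needs a no-op stand-in first:

# quick_test.py -- hypothetical standalone check of CalcEntropy
import __builtin__

# Pig normally provides outputSchema; install a no-op stand-in so that
# importing PythonUdf.py outside of Pig does not raise a NameError.
__builtin__.outputSchema = lambda schema: (lambda func: func)

from PythonUdf import CalcEntropy

# Pig passes the bag to the UDF as an iterable of (ngram, cid, cnt) tuples.
sample_bag = [("foo", "cat_a", 3), ("foo", "cat_b", 1)]
print CalcEntropy(sample_bag)   # roughly (0.811, 'cat_a', 4)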

In Pig, it can be used like this:

register 'PythonUdf.py' using jython as pyudf;
register 'jython-standalone.jar';
raw_data = load '$input_data_path' using PigStorage('\t')
	as (cid:chararray, ngram:chararray, freq:long);
group_ngram_cid = group raw_data by (ngram, cid);
count_ngram_cid = foreach group_ngram_cid 
    generate flatten(group) as (ngram, cid), 
    SUM(raw_data.freq) as total_cnt;
group_ngram = group count_ngram_cid by (ngram);
entropy_ngram = foreach group_ngram generate 
	group as ngram, 
	flatten(pyudf.CalcEntropy(count_ngram_cid)) as (entropy, cid, total_cnt);
filter_data = filter entropy_ngram by (total_cnt >= 4);
trim_result = foreach filter_data generate ngram, entropy, cid;
	
rmf $output_data_path;
store trim_result into '$output_data_path' using PigStorage('\t');
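The two $... placeholders are ordinary Pig parameters, so the script can be launched like this (script and path names are illustrative):

pig -param input_data_path=/path/to/input \
    -param output_data_path=/path/to/output \
    calc_entropy.pig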

The only thing more involved than the Java version is that you must register the jython-standalone.jar file, which can be found under $PIG_HOME/lib/.

I don't know Python particularly well. Personally, I think that for Pig UDFs its biggest advantages over Java are ease of development and ease of debugging: what you write is what you run. That is very valuable for prototyping and quick calculations.