我的习惯是用java写pig udf,为了图方便,我尝试用写udf,发现非常简单,对于测试或者快速验证非常有价值,下面有一个简单的例子。更多的信息,参考官方链接:
下面的例子用来计算一个分布的总熵:
import math
@outputSchema("element:tuple(entropy:double, cid:chararray, total_cnt:long)")
def CalcEntropy(bag):
max_cnt = 0
category = ""
total_cnt = 0
cnt_list = []
bad_cnt=0
for ele in bag:
cid = ele[1]
cnt = ele[2]
try:
total_cnt += cnt
except:
bad_cnt+=1
print "bad cnt:%d" % (bad_cnt)
continue
cnt_list.append(cnt)
if( max_cnt < cnt):
max_cnt = cnt
category = cid
if total_cnt == 0:
return (0, category, 0)
entropy = 0.0
for a in cnt_list:
prob = a*1.0/total_cnt
entropy += prob * math.log(prob, 2)
entropy = -entropy
return (entropy, category, total_cnt)
的pig udf就是一个函数,从输入得到输出,非常直接。除此外,还需要定义一个,用@来定义,非常直观,比java版的清晰很多。使用的时候不需要编译,边用边改,非常便利。
在pig中使用的时候可以像这样:
register 'PythonUdf.py' using jython as pyudf;
register 'jython-standalone.jar';
raw_data = load '$input_data_path' using PigStorage('t')
as (cid:chararray, ngram:chararray, freq:long);
group_ngram_cid = group raw_data by (ngram, cid);
count_ngram_cid = foreach group_ngram_cid
generate flatten(group) as (ngram, cid),
SUM(raw_data.freq) as total_cnt;
group_ngram = group count_ngram_cid by (ngram);
entropy_ngram = foreach group_ngram generate
group as ngram,
flatten(pyudf.CalcEntropy(count_ngram_cid)) as (entropy, cid, total_cnt);
filter_data = filter entropy_ngram by (total_cnt >= 4);
trim_result = foreach filter_data generate ngram, entropy, cid;
rmf $output_data_path;
store trim_result into '$output_data_path' using PigStorage('t');
唯一比java版的udf复杂的是必须要注册“-.jar”文件,该文件可以再$/lib/下找到。
我对了解不多。我个人觉得,pig udf方面,相比java,最主要的优势是开发方便,调试方便,所见即所得。对于原型开发和快速计算非常有价值。
发表回复