티스토리 뷰

빅데이터/hive

[hive] UDAF 구현 예제

hs_seo 2017. 3. 27. 15:33

UDAF는 AbstractGenericUDAFResolver를 상속하여 구현한다.

  • Resolver 클래스
    • 전달파라미터를 체크
    • 실제 처리 프로세스 구현체(GenericUDAFEvaluator 구현)를 반환
  • Evaluator 클래스 
    • init(), merge(), terminatePartial() 등의 실제 처리 구현

<Evaluator 클래스 주요 구현>
  • getNewAggregationBuffer() - 집계에 사용할 AggregationBuffer 반환
  • reset - aggregation 이 재사용될 때의 처리
  • init - 입력 받는 아규먼트와 반환값의 타입을 지정
  • iterate - 매퍼가 동작하는 동안 반복하는 작업
  • terminatePartial - 부분적으로 집계작업을 종료할 때 작업
  • merge - 집계작업의 결과를 머지할 때 
  • terminate - 작업이 종료될 때

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
 * Hive UDAF resolver that sums a single column of INT values, or of STRING
 * values that hold decimal integers.
 *
 * <p>The resolver validates the argument list and hands back the evaluator
 * matching the argument's primitive type. Both evaluators accumulate into a
 * plain {@code int}, so the total silently wraps on overflow. NULL inputs and
 * null partial results are skipped rather than treated as errors.
 */
public class SumInt extends AbstractGenericUDAFResolver {

    /**
     * Checks the argument types and selects the evaluator to run.
     *
     * @param info type information of the arguments passed to the function
     * @return a {@link SumStringEvaluator} for a STRING argument, or a
     *         {@link SumIntEvaluator} for an INT argument
     * @throws SemanticException (as {@link UDFArgumentTypeException}) if the
     *         argument count is not one, the argument is not primitive, or
     *         its primitive type is neither STRING nor INT
     */
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        // Exactly one argument is accepted.
        if (info.length != 1) {
            throw new UDFArgumentTypeException(info.length - 1, "Exactly one argument is expected.");
        }

        // Anything that is not a primitive category is rejected up front.
        if (info[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1.");
        }

        // STRING -> SumStringEvaluator, INT -> SumIntEvaluator, everything else is an error.
        PrimitiveCategory category = ((PrimitiveTypeInfo) info[0]).getPrimitiveCategory();
        if (category == PrimitiveCategory.STRING) {
            return new SumStringEvaluator();
        }
        if (category == PrimitiveCategory.INT) {
            return new SumIntEvaluator();
        }
        throw new UDFArgumentTypeException(0, "Only string, int type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1.");
    }

    /**
     * Evaluator that parses each string input as an int and sums the results.
     *
     * @author User
     */
    public static class SumStringEvaluator extends GenericUDAFEvaluator {

        /** Inspector for {@code parameters[0]} as handed to {@link #init}. */
        private PrimitiveObjectInspector inputOI;

        /** Holds the running total for one aggregation. */
        static class SumAggregationBuffer implements AggregationBuffer {
            int sum;
        }

        /**
         * Remembers the inspector of the first parameter and declares a Java
         * int as the result type for both partial and final results.
         */
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // NOTE(review): the same field is used by iterate() and merge(); this relies on
            // Hive passing the partial-result inspector here in merge modes - confirm.
            inputOI = (PrimitiveObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }

        /** Creates a buffer whose total starts at zero. */
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            SumAggregationBuffer buffer = new SumAggregationBuffer();
            reset(buffer);
            return buffer;
        }

        /** Clears the total so the buffer can be reused. */
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggregationBuffer) agg).sum = 0;
        }

        /**
         * Adds one input row to the total. Missing or NULL inputs are skipped.
         *
         * @throws NumberFormatException if the value's string form is not a parsable int
         */
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters.length == 0) {
                return;
            }
            // Resolve the value once instead of calling the inspector twice.
            Object value = inputOI.getPrimitiveJavaObject(parameters[0]);
            if (value != null) {
                ((SumAggregationBuffer) agg).sum += Integer.parseInt(value.toString());
            }
        }

        /** Returns the current total as the partial result. */
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }

        /**
         * Folds a partial total into this buffer. A null partial contributes
         * nothing instead of raising a NullPointerException.
         */
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial == null) {
                return;
            }
            Object value = inputOI.getPrimitiveJavaObject(partial);
            if (value != null) {
                ((SumAggregationBuffer) agg).sum += Integer.parseInt(value.toString());
            }
        }

        /** Returns the final total. */
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }
    }

    /**
     * Evaluator that sums int inputs.
     *
     * @author User
     */
    public static class SumIntEvaluator extends GenericUDAFEvaluator {

        /** Inspector for {@code parameters[0]} as handed to {@link #init}. */
        private IntObjectInspector inputOI;

        /** Holds the running total for one aggregation. */
        static class SumAggregationBuffer implements AggregationBuffer {
            int sum;
        }

        /**
         * Remembers the inspector of the first parameter and declares a Java
         * int as the result type for both partial and final results.
         */
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // NOTE(review): the same field is used by iterate() and merge(); this relies on
            // Hive passing the partial-result inspector here in merge modes - confirm.
            inputOI = (IntObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }

        /** Creates a buffer whose total starts at zero. */
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            SumAggregationBuffer buffer = new SumAggregationBuffer();
            reset(buffer);
            return buffer;
        }

        /** Clears the total so the buffer can be reused. */
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggregationBuffer) agg).sum = 0;
        }

        /**
         * Adds one input row to the total. Missing or NULL inputs are skipped,
         * matching the behavior of {@link SumStringEvaluator#iterate}.
         */
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            // Guard against NULL column values; unwrapping a null would throw.
            if (parameters.length != 0 && parameters[0] != null) {
                ((SumAggregationBuffer) agg).sum += inputOI.get(parameters[0]);
            }
        }

        /** Returns the current total as the partial result. */
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }

        /**
         * Folds a partial total into this buffer. A null partial contributes
         * nothing instead of raising a NullPointerException.
         */
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                ((SumAggregationBuffer) agg).sum += inputOI.get(partial);
            }
        }

        /** Returns the final total. */
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }
    }
}
view raw SumInt.java hosted with ❤ by GitHub






반응형
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함