티스토리 뷰
UDAF는 AbstractGenericUDAFResolver를 상속하여 구현한다.
- Resolver 클래스
- 전달파라미터를 체크
- 실제 처리 프로세스 구현체(GenericUDAFEvaluator 구현)를 반환
- Evaluator 클래스
- init(), merge(), terminatePartial() 등의 실제 처리 구현
<Evaluator 클래스 주요 구현>
- getNewAggregationBuffer() - 집계에 사용할 AggregationBuffer 반환
- reset - aggregation 이 재사용될 때의 처리
- init - 입력 받는 아규먼트와 반환값의 타입을 지정
- iterate - 매퍼가 동작하는 동안 반복하는 작업
- terminatePartial - 부분적으로 집계작업을 종료할 때 작업
- merge - 집계작업의 결과를 머지할 때
- terminate - 작업이 종료될 때
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; | |
import org.apache.hadoop.hive.ql.metadata.HiveException; | |
import org.apache.hadoop.hive.ql.parse.SemanticException; | |
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; | |
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; | |
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; | |
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; | |
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; | |
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; | |
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; | |
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; | |
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; | |
public class SumInt extends AbstractGenericUDAFResolver { | |
@Override | |
public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException { | |
// 파라미터는 하나만 받음 | |
if (info.length != 1) { | |
throw new UDFArgumentTypeException(info.length - 1, "Exactly one argument is expected."); | |
} | |
// 파라미터의 카테고리가 프리미티브 타입이 아니면 예외 처리 | |
if (info[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { | |
throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1."); | |
} | |
// 전달된 파라미터의 타입이 스트링이면 SumStringEvaluator, 아니면 SumIntEvaluator 처리 | |
if (((PrimitiveTypeInfo)info[0]).getPrimitiveCategory() == PrimitiveCategory.STRING) { | |
return new SumStringEvaluator(); | |
} else if (((PrimitiveTypeInfo)info[0]).getPrimitiveCategory() == PrimitiveCategory.INT) { | |
return new SumIntEvaluator(); | |
} else { | |
throw new UDFArgumentTypeException(0, "Only string, int type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1."); | |
} | |
} | |
/** | |
* 문자열 int 를 변환하여 sum 하는 Evaluator | |
* | |
* @author User | |
* | |
*/ | |
public static class SumStringEvaluator extends GenericUDAFEvaluator { | |
private PrimitiveObjectInspector inputOI; | |
static class SumAggregationBuffer implements AggregationBuffer { | |
int sum; | |
} | |
@Override | |
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { | |
super.init(m, parameters); | |
inputOI = (PrimitiveObjectInspector) parameters[0]; | |
return PrimitiveObjectInspectorFactory.javaIntObjectInspector; | |
} | |
@Override | |
public AggregationBuffer getNewAggregationBuffer() throws HiveException { | |
SumAggregationBuffer sum = new SumAggregationBuffer(); | |
reset(sum); | |
return sum; | |
} | |
@Override | |
public void reset(AggregationBuffer agg) throws HiveException { | |
((SumAggregationBuffer) agg).sum = 0; | |
} | |
@Override | |
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { | |
// 전달받은 파라미터가 없거나, null 일경우 처리 | |
if(parameters.length != 0 && inputOI.getPrimitiveJavaObject(parameters[0]) != null) { | |
((SumAggregationBuffer) agg).sum += Integer.parseInt(inputOI.getPrimitiveJavaObject(parameters[0]).toString()); | |
} | |
} | |
@Override | |
public Object terminatePartial(AggregationBuffer agg) throws HiveException { | |
return ((SumAggregationBuffer) agg).sum; | |
} | |
@Override | |
public void merge(AggregationBuffer agg, Object partial) throws HiveException { | |
((SumAggregationBuffer) agg).sum += Integer.parseInt(inputOI.getPrimitiveJavaObject(partial).toString()); | |
} | |
@Override | |
public Object terminate(AggregationBuffer agg) throws HiveException { | |
return ((SumAggregationBuffer) agg).sum; | |
} | |
} | |
/** | |
* int 값을 sum 하는 Evaluator | |
* | |
* @author User | |
* | |
*/ | |
public static class SumIntEvaluator extends GenericUDAFEvaluator { | |
private IntObjectInspector inputOI; | |
static class SumAggregationBuffer implements AggregationBuffer { | |
int sum; | |
} | |
@Override | |
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { | |
super.init(m, parameters); | |
inputOI = (IntObjectInspector) parameters[0]; | |
return PrimitiveObjectInspectorFactory.javaIntObjectInspector; | |
} | |
@Override | |
public AggregationBuffer getNewAggregationBuffer() throws HiveException { | |
SumAggregationBuffer sum = new SumAggregationBuffer(); | |
reset(sum); | |
return sum; | |
} | |
@Override | |
public void reset(AggregationBuffer agg) throws HiveException { | |
((SumAggregationBuffer) agg).sum = 0; | |
} | |
@Override | |
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { | |
((SumAggregationBuffer) agg).sum += inputOI.get(parameters[0]); | |
} | |
@Override | |
public Object terminatePartial(AggregationBuffer agg) throws HiveException { | |
return ((SumAggregationBuffer) agg).sum; | |
} | |
@Override | |
public void merge(AggregationBuffer agg, Object partial) throws HiveException { | |
((SumAggregationBuffer) agg).sum += inputOI.get(partial); | |
} | |
@Override | |
public Object terminate(AggregationBuffer agg) throws HiveException { | |
return ((SumAggregationBuffer) agg).sum; | |
} | |
} | |
} |
hive 매뉴얼 UDAF 작성 예제 - https://cwiki.apache.org/confluence/display/Hive/GenericUDAFCaseStudy
UDAF 작성 예제 #1 - http://stackoverflow.com/questions/6445339/collect-set-in-hive-keep-duplicates
UDAF 구현 예제 #2 - https://www.linkedin.com/pulse/hive-functions-udfudaf-udtf-examples-gaurav-singh
반응형
'빅데이터 > hive' 카테고리의 다른 글
[hive] 쿼리를 이용하여 파일시스템에 데이터를 쓰기(INSERT OVERWRITE DIRECTORY) (0) | 2017.04.06 |
---|---|
[hive] 문자열을 맵으로 변화하기 위한 str_to_map() 함수 (0) | 2017.04.05 |
[hive] UDF 구현 예제 (0) | 2017.03.23 |
[hive] 하이브의 UDF, UDAF, UDTF (0) | 2017.03.22 |
[hive] 하이브의 CSV 서데 사용 방법 (0) | 2017.03.08 |
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- bash
- mysql
- oozie
- 알고리즘
- emr
- 하이브
- 하둡
- 백준
- 오류
- Python
- S3
- ubuntu
- error
- hbase
- 파이썬
- build
- nodejs
- 다이나믹
- AWS
- java
- HDFS
- SPARK
- Tez
- 정올
- Linux
- HIVE
- yarn
- SQL
- Hadoop
- airflow
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
글 보관함