본문 바로가기
빅데이터/hive

[hive] UDF 구현 예제

by hs_seo 2017. 3. 23.
하이브의 UDF는 두가지 방법으로 구현할 수 있다. 
  • UDF 클래스를 상속
    • evaluate() 함수를 구현하여 처리
  • GenericUDF 클래스를 상속
    • initialize(), evaluate(), getDisplayString() 함수를 구현하여 처리

UDF 클래스를 상속하는 방법이 간단하고 처리하기 쉽다.
GenericUDF를 사용하면 다음의 장점이 있다고 한다. 

A Generic User-defined function (GenericUDF) for the use with Hive. 

New GenericUDF classes need to inherit from this GenericUDF class. 

The GenericUDF are superior to normal UDFs in the following ways: 


1. It can accept arguments of complex types, and return complex types. 

2. It can accept variable length of arguments. 

3. It can accept an infinite number of function signature - for example, it's easy to write a GenericUDF that accepts array, array> and so on (arbitrary levels of nesting). 

4. It can do short-circuit evaluations using DeferedObject.


각각의 구현 방법은 다음과 같다. 

UDF 예제 위치 - 바로가기


<UDF 클래스 구현>

  • evaluate() 메소드를 하나에서 여러개 구현 가능
  • 파라미터의 타입에 따라 하이브가 메소드를 선택하여 실행 (메소드 오버로등)

import java.util.Map;


import org.apache.hadoop.hive.ql.exec.UDF;

import org.apache.hadoop.io.Text;


public class SampleUDF extends UDF {


public Text evaluate(Text text) {

// 입력받은 문자를 대문자로 반환

return new Text(text.toString().toUpperCase());

}

public int evaluate(int number) {

// 입력받은 숫자에 1을 더하여 반환

return number + 1;

}

public String evaluate(Map<String, String> map, String key) {

// 입력받은 키의 밸류가 있으면 반환하고, 없으면 None를 반환

return map.containsKey(key) ? map.get(key) : "None";

}

}


<사용방법>

ADD JAR hdfs://localhost:8020/sample.jar;


CREATE TEMPORARY FUNCTION func AS 'sdk.hive.hadoop.SampleUDF';


select func('k');

select func(1);

select func(map) from test;




<GenericUDF 클래스 구현>

  • List 의 문자열의 길이를 보두 더하여 출력하는 샘플

import java.util.List;


import org.apache.hadoop.hive.ql.exec.Description;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;

import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;

import org.apache.hadoop.hive.ql.metadata.HiveException;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;

import org.apache.hadoop.hive.serde2.lazy.LazyString;

import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

import org.apache.hadoop.io.IntWritable;


@Description(name="sumListStringLength", value="_FUNC_(value) - Returns value that sum list string length.", extended="Example:\n  > SELECT _FUNC_(Array<String>) FROM table LIMIT 1;")

public class ListGenericUDF extends GenericUDF {


ListObjectInspector listOi;

@Override

public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

// initialize 함수에서는 다음과 같은 역활을 진행

// 입력받은 파라미터에 대한 검증

// 반환하는 파라미터에 대한 검증

// 함수에 입력받는 파라미터 개수 확인

if(arguments.length != 1) 

throw new UDFArgumentLengthException("function argument need 1.");

// 파라미터의 타입 확인

ObjectInspector inspector = arguments[0];

if( !(inspector instanceof ListObjectInspector) )

throw new UDFArgumentException("function argument need List");

listOi = (ListObjectInspector) inspector;

// 입력받는 리스트내 엘리먼트의 객체 타입 확인

if( !(listOi.getListElementObjectInspector() instanceof StringObjectInspector) ) 

throw new UDFArgumentException("array argument need ");

// 반환은 문자열의 수이므로 int 객체 반환

return PrimitiveObjectInspectorFactory.writableIntObjectInspector;

}


@SuppressWarnings("unchecked")

@Override

public Object evaluate(DeferredObject[] arguments) throws HiveException {

// arguments의 객체를 형변환 

List<LazyString> list = (List<LazyString>) listOi.getList(arguments[0].get());

if(list == null)

return null;

int sum = 0;

for(LazyString str : list) {

sum += str.getWritableObject().getLength();

}

return new IntWritable(sum);

}


@Override

public String getDisplayString(String[] children) {

StringBuffer buffer = new StringBuffer();

buffer.append("sumListStringLength(Array<String>), ");

for(String child : children) 

buffer.append(child).append(",");

return buffer.toString();

}


}


<사용방법>

ADD JAR hdfs://localhost:8020/sample.jar;


CREATE TEMPORARY FUNCTION func AS 'sdk.hive.hadoop.ListGenericUDF ';


select func(list) from test;




  • 맵에 키의 데이터가 있으면 반환하고, 없으면 None Data 를 반환

import java.util.Map;

import java.util.TreeMap;


import org.apache.hadoop.hive.ql.exec.UDFArgumentException;

import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;

import org.apache.hadoop.hive.ql.metadata.HiveException;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;

import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

import org.apache.hadoop.io.Text;


public class MapGenericUDF extends GenericUDF {


MapObjectInspector mapOi;

StringObjectInspector keyOi;

@Override

public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

// initialize 함수에서는 다음과 같은 역활을 진행

// 입력받은 파라미터에 대한 검증

// 반환하는 파라미터에 대한 검증

// 함수에 입력받는 파라미터 개수 확인

if(arguments.length != 2) 

throw new UDFArgumentLengthException("function argument need 2.");

// 파라미터의 타입 확인

ObjectInspector inspector = arguments[0];

if( !(inspector instanceof MapObjectInspector) )

throw new UDFArgumentException("function argument Map, String");

mapOi = (MapObjectInspector) inspector;

ObjectInspector strIns = arguments[1];

if( !(strIns instanceof StringObjectInspector) )

throw new UDFArgumentException("function argument Map, String");

keyOi = (StringObjectInspector) strIns;

// 맵의 키와 값의 객체 타입이 String 인지 확인 

if( !(mapOi.getMapKeyObjectInspector() instanceof StringObjectInspector) || !(mapOi.getMapValueObjectInspector() instanceof StringObjectInspector) ) 

throw new UDFArgumentException("Map type is Map<String, String>");

// 반환은 문자열의 수이므로 int 객체 반환

return PrimitiveObjectInspectorFactory.javaStringObjectInspector;

}


@SuppressWarnings("unchecked")

@Override

public Object evaluate(DeferredObject[] arguments) throws HiveException {

Map<Object, Object> objMap = (Map<Object, Object>) mapOi.getMap(arguments[0].get());

Map<String, String> map = converMapType(objMap);

Text key = keyOi.getPrimitiveWritableObject(arguments[1].get());


return map.containsKey(key.toString()) ? map.get(key.toString()) : "None Data";

}

/**

* Map의 키 타입이 String이 아니어서 값을 빼낼수 없을 수 있으므로 키, 밸류를 String 으로 변환 

* @param strMap

* @return

*/

public Map<String, String> converMapType(Map<Object, Object> strMap) {

Map<String, String> newMap = new TreeMap<String, String>();

for (Object keyObj : strMap.keySet()) {

newMap.put(keyObj.toString(), strMap.get(keyObj).toString());

}

return newMap;

}


@Override

public String getDisplayString(String[] children) {

return "Map<String, String>";

}


}


<사용방법>

ADD JAR hdfs://localhost:8020/sample.jar;


CREATE TEMPORARY FUNCTION func AS 'sdk.hive.hadoop.MapGenericUDF';


select func(map) from test;


좀더 복잡한 UDF는 하이브에 구현된 소스코드를 보고 참고하여 구현하면 된다. 






반응형