본문 바로가기

빅데이터405

[airflow] 에어플로우 컨셉(airflow concepts) 에어플로우 컨셉 에어플로우 공식 컨셉 문서를 통해 작업을 실행하고, 모니터링 하는 방법을 알아보겠습니다. 핵심 구상 DAG 파이썬으로 정의한 작업의 모음 작업 = Task = Operator 동작의 실행순서, 동작 시간, 공통 파라미터 등을 정의 에어플로우의 DAG_FOLDER에 파이썬 파일을 생성하면 에어플로우가 주기적으로 해당 폴더를 스캔하여 인식함 Scope 에어플로우는 파이썬 파일에 선언된 DAG를 모두 로딩 DAG는 글로벌 영역으로 선언되어야 함 자주 사용하는 패턴등을 SubDagOperator로 구현할 수도 있음 기본 파라미터 default_args는 모든 오퍼레이터에 적용 됨 공통 파라미터를 모든 오퍼레이터에 전달 default_args = { 'start_date': date.. 2020. 8. 2.
[hive] Gzip파일 처리 중 Unexpected end of input stream 오류 해결 방법 하이브에서 Gzip 파일로 작업 할 때 0byte 파일이 존재하면 아래와 같이 Unexpected end of input stream 오류가 발생합니다. Caused by: java.io.EOFException: Unexpected end of input stream at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.. 2020. 7. 17.
[hive] 맵 조인 처리 기준 사이즈 확인 맵 조인은 작은 크기의 테이블을 메모리에 적재하여 조인을 처리합니다. 이 때 테이블 데이터의 크기는 자바 객체의 사이즈입니다. 실제 파일의 사이즈와 다릅니다. 예를 들어 다음의 테이블 table_orc, table_txt는 동일한 데이터를 보관하고 있는 테이블입니다. 65,454개의 데이터를 저장 형식만 다르게 하여 저장한 것입니다. 맵 조인에는 객체 사이즈(rawDataSize)를 이용하기 때문에 hive.auto.convert.join.noconditionaltask.size를 설정할 때 객체 사이즈 기준으로 설정해야 합니다. 기본 설정이 10MB 일때 table_orc는 셔플 조인으로 처리되고, table_txt는 맵 조인으로 처리됩니다. table_orc를 맵조인으로 처리하기 위해서는 hive.a.. 2020. 6. 11.
[hive] 맵조인과 셔플조인(Map Join vs Shuffle Join) hive의 맵조인과 셔플조인의 차이를 확인해 보겠습니다. 다음의 조인 쿼리에서 table_a는 14.7G이고, table_b는 5KB입니다. 이 테이블을 조인할 때 각 조인에 따른 성능을 확인해 보면 셔플 조인일 때는 리듀서 단계가 추가되고 맵 조인에 비하여 2배의 시간이 더 걸리는 것을 확인할 수 있습니다. # table_a와 table_b를 조인하여 join_test 테이블 생성 # table_a: 14.7 GB # table_b: 5 KB CREATE TABLE join_test AS select a.deviceid, b.cnty_cd from db_a.table_a a, db_b.table_b b where a.date = '20191020' and a.code = b.code_cd ; 작업 시간.. 2020. 6. 9.
[pyspark] pyspark에서 udf를 이용하는 방법 pyspark에서 UDF를 이용하는 방법을 알아보겠습니다. 2020. 6. 8.
[pyspark] pyspark와 Hive 연동 pyspark와 Hive 서버와 연동하는 방법을 알아보겠습니다. 2020. 6. 8.
[pyspark] 데이터프레임 생성 pyspark에서 스파크세션(SparkSession)과 스파크컨텍스트(SparkContext)를 이용해서 데이터프레임을 생성하는 방법을 알아보겠습니다. 2020. 6. 8.
[pyspark] CSV파일로 데이터프레임 생성 pyspark에서 스파크세션을 이용하여 CSV파일을 읽어서 데이터프레임을 생성하는 예제를 알아보겠습니다. ; 2020. 6. 4.
[hadoop] [스크랩] Line에서 하둡 클러스터를 운영하면서 발생한 장애 상황 대응 방법 Line에서 하둡 클러스터를 운영하면서 발생한 장애 상황을 정리한 것입니다. 데이터 엔지니어링 관련 소프트웨어 장애 대응 사례 에서 상세한 내용을 확인할 수 있습니다. 짧게 요약하면 다음과 같습니다. hadoop.registry.rm.enabled=false로 설정 HDFS의 휴지통 설정을 켜놓으면 삭제 데이터가 많을 때는 HDFS에 부담이 될 수 있으므로 삭제 간격을 잘 조절 Zeepline의 버그에 의한 오류가 발생할 수 있으니 버전업, 버그 리포트를 잘 확인 하이브 테이블의 파티션이 많으면 스파크 드라이버가 힘들 수 있으니 파티션을 잘 설정 Apache Hadoop YARN 리소스 매니저 failover 발생 문제와 해결 방안 현상 하둡 클러스터에서 동작하는 애플리케이션의 수가 늘어나면서 리소스 매.. 2020. 6. 4.
[Kafka] 카프카 운영시 주의 사항 스크랩 KAFKA에 대해서 알아보자 카카오는 각 서버가 시스템 적으로 묶여 있어서 장애가 발생하면 동시에 다운되어 카프카를 이용하여 커플링을 줄여줌. 파티션은 8~20개로 설정하고 처리 속도에 따라 적절하게 설정하는 것이 중요함 Kafka 기본 개념잡기 Kafka 운영자가 말하는 처음 접하는 Kafka 파티션 순서에 따른 메시지 순서: 파티션의 개수가 여러개 일때 메시지는 파티션의 여러 위치에 저장되어 구독자가 메시지를 가져올 때 메시지의 발생 순서와 구독자가 받은 메시지의 순서가 꼭 일치하지는 않음. 메시지 처리 순서가 중요한 서비스는 Kafka를 이용하지 않는 것이 좋을 것 같고, 이용해야 한다면 파티션을 1개로 설정하던지 다른 방법을 통해 메시지의 순서를 정렬할 수 있는 방법을 찾아야 함 Kafka 운영자.. 2020. 6. 3.
[flume] HTTP로 플룸 모니터링 설정하는 방법 Flume 에이전트는 HTTP를 이용한 REST API로 플룸 작업 모니터링을 위한 방법을 제공합니다. 기본적으로 JMX, Ganglia, JSON 모니터링과 사용자 커스텀 모니터링 방법을 제공합니다. 여기서는 JSON 모니터링 방법을 알아보겠습니다. HTTP 모니터링 설정 JSON 모니터링은 에이전트를 실행할 때 flume.monitoring.type, flume.monitoring.port를 설정하는 것으로 간단하게 설정할 수 있습니다. 에이전트를 실행할 때 모니터링 옵션을 설정하여 실행하고, 아래와 같이 curl 명령으로 현재 플룸 에이전트의 상태를 간단하게 모니터링 할 수 있습니다. # 모니터링 설정 $ bin/flume-ng agent --conf-file conf/example.conf --n.. 2020. 6. 3.
[flume] ChannelFullException 오류 확인 플룸 메모리 채널을 이용할 때 ChannelFullException이 발생하는 경우가 있습니다. 보통 메모리 채널과 연결된 싱크에서 데이터가 쌓이는 속도보다 빠르게 데이터를 처리하지 못하여 발생합니다. 메모리 채널과 파일 싱크를 연결했을 때 메모리 채널에 데이터가 쌓이는 속도보다 파일을 쓰는 속도가 느릴때 발생할 수 있습니다. Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight at org.apache.flume.channel.MemoryChan.. 2020. 6. 2.
[hive] collect_list()와 같은 UDAF 함수의 GC 오류 해결 방법 하이브 기본 UDAF를 사용하는 중에 Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded 오류가 발생하는 경우가 있습니다. collect_list(), collect_set()을 이용하는 경우 발생할 수 있는데 너무 많은 데이터가 집계되어 JVM의 힙사이즈를 넘어설 때 많이 발생합니다. 이럴 때는 하나의 맵에서 처리하는 데이터를 줄이고, 컨테이너의 메모리를 늘려서 문제를 해결할 수 있습니다. -- 매퍼 개수 조정 set mapreduce.input.fileinputformat.split.maxsize=8000000; set mapreduce.input.fileinputformat.split.minsize=4000000; set tez.gr.. 2020. 6. 1.
[hive] 하이브 매크로(macro) 하이브는 매크로를 이용할 수 있습니다. 생성 기본 문법은 아래와 같습니다. 칼럼 이름을 표현식에 사용할 수 있습니다. 기본 빌트인 함수를 이용하여 매크로를 생성할 수도 있습니다. 매크로는 현재 세션에만 유지됩니다. CREATE TEMPORARY MACRO macro_name([col_name col_type, ...]) expression; 매크로를 생성하는 방법은 아래와 같습니다. CREATE TEMPORARY MACRO fixed_number() 42; CREATE TEMPORARY MACRO string_len_plus_two(x string) length(x) + 2; CREATE TEMPORARY MACRO simple_add (x int, y int) x + y; CREATE TEMPORARY.. 2020. 4. 22.
[빅데이터][스크랩] 네이버의 하둡 클러스터 운영 주의 사항 네이버에서 발표한 자료를 통해 하둡 클러스터를 운영하면서 주의해야 할 사항에 대해서 알아보겠습니다. 네임노드 네임노드는 힙메모리에 HDFS에 존재하는 모든 파일의 메타정보를 저장하고 있습니다. 네임노드 JVM의 힙메모리 사이즈(만 블록당 1G의 메모리가 사용)에 따라 전체 파일, 블록의 개수가 제한됩니다. 파일, 블록이 많이 생성되어 메모리의 한계치에 도달하면 JVM의 힙메모리를 늘려야 합니다. 하지만 메모리 설정을 변경하려면 네임노드를 재기동해야 하고, 네임노드는 재기동 할 때 블록 정보를 재구축하기 위해 fsimage, edits 파일을 읽어서 블록정보를 재구축합니다. 또한 데이터노드로 부터 블록 정보를 받아서 결과를 연동합니다. 이 과정에서 파일 개수가 많을수록 재기동에 걸리는 시간이 길어지고, 이.. 2020. 3. 24.
[oozie] 우지에서 replaceAll 함수를 이용하여 문자열 치환하기 우지에서 기본 EL 함수를 이용하여 문자열을 치환(replace)하는 방법을 알아보겠습니다. 문자열을 치환하는 함수는 replaceAll입니다. 이 함수와 정규식을 이용하여 문자열을 치환할 수 있습니다. replaceAll(String src, String regex, String replacement) 정규식을 이용할 수 있기 때문에 대소문자 구분없이 변경을 위해서 (?i)를 넣어주면 됩니다. mapred.job.queue.name ${replaceAll(param, 'AA', 'aa')} ${replaceAll(param, '(?i)AA', 'aa') eq 'aa') Basic_EL_Constants 2020. 3. 18.
[hive] This command is not allowed on an ACID table default.table_name with a non-ACID transaction manager 오류 해결 방법 하이브 트랜잭션 테이블을 조회할 때 발생하는 오류는 아래 두 가지 입니다. 트랜잭션 테이블은 현재 세션이 트랜잭션 세션이어야 하고, hive.support.concurrency=true일 때만 조회할 수 있습니다. This command is not allowed on an ACID table default.table_name with a non-ACID transaction manager FAILED: RuntimeException [Error 10264]: To use DbTxnManager you must set hive.support.concurrency=true -- 설정을 하지 않은 상태에서 ACID 테이블인 table_name을 조회할 때 오류 발생 hive (default)> select .. 2020. 3. 17.
[빅데이터] 하이브 메타스토어 통합을 지원하는 waggle-dance waggle-dance는 여러 개의 하이브 메타스토어를 통합하여 하나의 메타스토어처럼 처리할 수 있게 해주는 서비스입니다. 아래의 쿼리에서 x, y테이블은 다른 메타스토어에 존재하는데 waggle-dance가 이 메타스토어를 통합하여 하나의 메타스토어에 요청하는 것처럼 처리할 수 있게 도와줍니다. hive> SELECT * FROM local.x, remote.y WHERE x.id = y.id AND x.foo > 1 AND y.bar = 2; 특징 호텔스 닷컴에서 개발하여 오픈 소스로 변경 여러 곳에 배포된 하이브 메타스토어를 통합하여 조회할 수 있는 기능을 제공 다중 하이브 메타스토어 환경에서 통합된 정보를 제공할 수 있는 장점 테이블의 select, join 처리를 위한 통합 엔드 포인트 제공 아.. 2020. 3. 12.
[빅데이터] Apache Livy Apache livy는 REST Aapi를 이용해서 스파크 작업을 요청할 수 있는 서비스입니다. REST Api와 자바, 스칼라 라이브러리를 이용해서 작업을 요청할 수 있습니다. 다음의 특징을 가집니다. 멀티 클라이언트에서 여러 개의 스파크 작업을 요청할 수 있음 작업 간 RDD와 데이터 프레임 공유가 가능 여러 개의 스파크 컨텍스트를 관리할 수 있고, 스파크 컨텍스트는 얀이나 메조스 같은 클러스터에서 실행(Livy 서버에서 실행되지 않음) 스파크 작업은 JAR, 자바/스칼라 API, 코드 조각을 통해 요청 보안 통신을 이용해 안정성 제공 REST API 요청 방법 # POST 방식으로 작업 실행 # curl 옵션 -X: 전송방식, -H: 헤더정보추가 -d: POST 파라미터(json 형식) # file.. 2020. 3. 10.
[빅데이터 아키텍처] 네이버의 빅데이터 플랫폼 네이버는 HBase와 엘라스틱서치 기반으로 빅데이터 플랫폼을 구성하고 있는 것으로 보입니다. 데이터로그(DataLog) 엘라스틱서치 기반 2017년에 구축한 로그 통합 관리 플랫폼 검색 서비스의 모든 로그를 한곳에 모아 효율적인 분석을 위한 환경을 제공 초당 22만건 실시간 색인이 가능 데이터스토어(DataStore) HBase 기반 데이터 카탈로그를 통해 보관된 데이터의 목록, 상세정보, 생산자와 소비자를 한눈에 알 수 있도록 제공 저장된 데이터의 효율적인 활용을 위해 SQL 기반의 처리 시스템을 구축 비슷한 형태의 요청이 많으므로 SQL 템플릿을 제공하여 처리할 수 있도록 지원(Hue) 빠른 처리를 위해 가공테이블을 제공. 자주 사용되는 데이터를 미리 테이블로 분리하여 적재 하이브의 ORC, 파티션,.. 2020. 1. 31.
[빅데이터 아키텍처] 멜론의 빅데이터 플랫폼 멜론은 초기에는 IBM 네티자 데이터웨어하우스를 도입해 데이터 분석을 처리하였으나, 스케일아웃의 어려움과 비용 부담으로 인하여 오픈 소스를 이용한 빅데이터 플랫폼을 자체 운영하는 것으로 결정하였습니다. 멜론은 마우스의 움직임, 검색, 음악 선택, 클릭 패턴 등 이용자들의 행동을 종합적으로 관찰하고 이를 분석하기 위해 노력하고 있습니다. 멜론의 빅데이터 플랫폼 멜론의 빅데이터 처리 순서는 수집/분석/서비스 단계를 따릅니다. 수집 데이터베이스 데이터 스쿱을 이용하여 수집 로그 데이터 플룸을 이용하여 한시간마다 쉘 스크립트(scp)로 수집 허드슨을 이용하여 배치 데이터를 수집 분석 실시간 분석과 배치 분석을 제공 Hive를 이용하여 분석 결과 제공 데이터의 종류에 따라 MR, Mahout, Tajo, Spar.. 2020. 1. 29.
[hive] 구체화 뷰(Materialized View) 하이브 3.0에서 제공하는 구체화 뷰(Materialized Views)에 대해서 알아보겠습니다. 뷰(View)는 논리적인 테이블입니다. 데이터 검색을 위한 구조는 가지고 있지만 실제 데이터는 가지고 있지 않습니다. 구체화 뷰(M-View)는 물리적인 테이블입니다. 구체화 뷰를 생성할 때 데이터를 별도의 저장공간에 저장하여 뷰를 사용할 때 속도를 높일 수 있습니다. 보통 데이터웨어 하우스에서 쿼리의 속도를 높이는데 많이 사용됩니다. 하이브에서 규체화 뷰는 LLAP, Calcite(CBO) 기능과 협력하여 쿼리의 속도를 높이는 데 사용됩니다. 구체화 뷰 생성 구체화 뷰는 생성되는 시점에 테이블의 데이터를 취합하여 데이터를 저장합니다. 이 과정에서 맵리듀스 작업이 발생합니다. 구체화 뷰를 저장하는 기본 서데.. 2020. 1. 21.
[hive] UDF에서 발생하는 argument type mismatch 오류 수정 hive udf에서 java.lang.illegalargumentexception argument type mismatch 오류가 발생하는 경우는 파라미터로 전달하는 타입이 설정과 달라서 발생합니다. 아래와 같은 경우 evaluate UDF의 입력값으로 String 이 전달되어야 하는데 다른 타입이 전달되면 오류가 발생합니다. 일반적인 경우에는 타입이 다르다는 것을 알 수 있지만 함수의 중첩으로 처리하는 경우에는 이 오류를 정확하게 확인하기가 어렵습니다. public class SampleUDF extends UDF { public Text evaluate(String text) { // 입력받은 문자를 대문자로 반환 return new Text(text.toUpperCase()); } } 함수의 중첩 .. 2020. 1. 14.
[spark-sql] tez.lib.uris is not defined 오류 처리 방법 Spark SQL을 이용할 때 tez.lib.uris is not defined in the configuration 오류가 발생하는 경우 hive-site.xml파일에 tez 환경 설정을 넣어주면 됩니다. : org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: org.apache.tez.dag.api.TezUncheckedException: Invalid configuration of tez jars, tez.lib.uris is not defined in the configuration; 설정 추가 tez.lib.uris hdfs:///apps/tez/tez.tar.gz tez.use.cluster.hadoop-libs true 2020. 1. 14.
[hdfs] HDFS WebHDFS REST API 사용법 HDFS는 REST API를 이용하여 파일을 조회하고, 생성, 수정, 삭제하는 기능을 제공합니다. 이 기능을 이용하여 원격지에서 HDFS의 내용에 접근하는 것이 가능합니다. REST API 설정 REST API를 사용하기 위해서는 hdfs-site.xml에 다음의 설정이 되어 있어야 합니다. -- webhdfs 사용여부 설정 dfs.webhdfs.enabled=true; -- webhdfs 포트 설정 dfs.namenode.http-address=0.0.0.0:50070; REST API 사용 방법 위에서 설정한 http 포트로 curl 명령을 이용하여 ls명령을 날리는 예제는 다음과 같습니다. -- /user/hadoop 위치를 조회 $ curl -s http://$(hostname -f):50070/.. 2020. 1. 9.
[hive] 벡터화(vectorized) 처리 하이브 성능 향상의 한 방법인 벡터화(vectorized) 처리는 한 번에 처리하는 데이터의 양을 늘려서 CPU 사용률을 높이고, 처리속도를 빠르게 하는 기법입니다. 검색, 필터, 집계, 조인 처리에서 사용되고, 한 번에 1024개의 행을 동시에 처리하여 속도를 높입니다. 벡터화 설정을 하면 1024행(row)의 블록으로 한번에 작업을 처리합니다. 하나의 블록에서 열(column)은 배열로 처리됩니다. 아래의 클래스와 같이 칼럼이 ColumnVector클래스 배열로 한 번에 읽어서 처리합니다. 조회, 필터링 등에 벡터화를 이용하면 한번에 처리하는 작업이 증가하여 속도가 빨라지게 됩니다. 16억 건의 데이터를 이용해서 count명령을 처리한 결과 벡터화 처리를 하지 않으면 67.6초, 벡터화 처리를 하면 .. 2020. 1. 7.
[hive] 하이브의 조인방식(hive join) 하이브의 세가지 조인 방식에 대해서 알아보겠습니다. 셔플조인 매퍼에서 각 테이블을 읽고 셔플 단계에서 조인되는 키를 기준으로 파티션닝후 셔플을 진행하여 각 리듀서에서 조인을 수행 어떤형태의 데이터 크기와 구성에도 사용 가능 가장 자원을 많이 사용하고 느린 조인 방식 맵조인 (브로드캐스트 조인, 맵사이드 조인) 작은 사이즈의 테이블이 메모리에 올라가고, 각 매퍼에서 조인을 수행후 결과를 반환하는 방식 가장 큰 테이블에서 가장 빠른 단일 스캔 작은 테이블은 메모리에 들어갈 정도로 작아야 함 -- 맵조인 사용여부 설정, 3개 이상의 테이블을 조인할 때 맵조인 사용여부 설정하는 옵션 hive> set hive.auto.convert.join=true; hive> set hive.auto.convert.join... 2020. 1. 6.
[hadoop-hdfs] HDFS 포맷 하는 방법 HDFS를 포맷하는 방법에 대해서 알아보겠습니다. HDFS를 포맷하면 데이터가 모두 사라집니다. 반드시 백업을 해두고 진행하는 것이 좋습니다. 작업 순서 작업 순서는 AWS EMR의 HDFS를 기준으로 작성되었습니다. 각 제조사의 하둡마다 순서가 바뀔수 있지만 전체적인 맥락은 변경되지 않습니다. 서버 종료 네임노드, 데이타노드 종료를 종료합니다. 모든 노드의 네임노드 프로세스와 데이타노드 프로세스를 종료합니다. 네임노드 포맷 포맷 hdfs namenode -format 명령으로 네임노드를 포맷합니다. 네임노드를 포맷하면 dfs.namenode.name.dir경로의 fsimage와 edits 파일이 초기화 됩니다. 네임노드를 포맷하면 클러스터 ID가 신규로 생성됩니다. 이 정보는 dfs.namednoe.n.. 2019. 12. 26.
[hive] Blobstore 기능으로 처리 속도 증가 하이브 2.2.0 버전부터 Blobstore기능을 제공합니다. Blobstore Blobstore는 하이브 작업시에 생성되는 임시 파일을 S3에 작성하지 않고, HDFS에 작성하는 기능을 제공합니다. HDFS가 S3보다 IO속도가 빠르기 때문에 작업의 속도가 빨라지게 됩니다. 해당 기능을 이용하였을 때 1.5배 정도의 속도 증가가 이루어졌습니다. 하이브에서 TEZ로 작업하고 파일 머지까지 발생하는 작업으로 테스트 결과 MR속도와 파일 머지 속도가 빨라서 전체 작업시간이 다음과 같이 HDFS를 사용하는 경우가 1.5배 빠르게 나왔습니다. 하지만 HDFS를 사용하는 경우 임시파일의 저장으로 인한 작업 공간의 사용, 네임노드 관리로 인한 과부하 등의 오버헤드가 발생하기 때문에 작업의 형태에 따라 적절한 선택과.. 2019. 12. 19.
[hadoop] hadoop fs 명령의 OutOfMemory 오류 수정 hadoop fs 명령에서 OutOfMemory 오류가 발생하는 경우가 있습니다. 아래와 같이 디렉토리의 정보를 가져올 때 오류가 발생합니다. $ hadoop fs -ls /app/logs # # java.lang.OutOfMemoryError: GC overhead limit exceeded # -XX:OnOutOfMemoryError="kill -9 %p" # Executing /bin/sh c "kill -9 1234"... KILLED 원인 및 해결방법 주로 파일 개수가 많을 때 발생합니다. 이런 경우 하둡 클라이언트의 메모리를 늘려주면 됩니다. 아래와 같이 입력하여 메모리 설정을 늘려주고 fs 명령을 입력하면 됩니다. export HADOOP_CLIENT_OPTS="-Xmx2048m" 2019. 12. 17.