2016년 12월 18일 일요일

HBase write 성능 튜닝 - Kafka topic partition 개수를 늘려서

대체적인 구성은 다음과 같이 이전에 잡아놓은 구성과 같다.
1. Flafka를 사용.
2. Kafka partition 개수에 맞게 flume 설정의 kafka sink 개수와 kafka source, hbase sink 개수를 설정.
3. Table을 pre-split 한다. (16개로 pre-split 하였음.)
다른 점은 HBase 설정을 변경한 점도 있지만 그것 외에 다른 점은 다음과 같다.
1. Kafka topic의 partition 개수를 edge node들의 총 디스크 수보다 크게 잡는다.
      - Edge node가 3개에 총 15개의 디스크가 있다. 이에 따라 partition 개수를 20개로 주었다.
      - 디스크 총 개수에 맞춰서 Partition개수를 맞추는 것은 놀고 있는 디스크가 생기기 않게 하기 위함이다.
전에는 최대 write 성능이 worker node를 3대로 하든 6대로 하든 초당 16만~17만 정도였다. (WAL을 끄면 30만)
그러나 위와 같이 kafka partition을 늘려주니 hbase 성능이 초당 30만 건 정도의  write할 수 있었다. 

HBase table pre-split region 개수 구하기

HBase에 write를 하다보면 어떤 region에는 write를 하지 않는 경우가 생기게 된다. 
그게 하나 이상의 노드가 되면 해당 노드들은 작업을 하지 않게 되고 다른 노드에 write가 몰리면서 부하가 생기게 된다.
이처럼 write 작업이 되지 않는 구멍이 안생기도록 하기 위해 hbase table의 region을 미리 분할하는데 이를 pre-split이라고 한다.
그러면 pre-split하기 적절한 region 개수는 어떻게 구해야 할까?
그것을 다음과 같은 방식으로 할 수 있다.
우선 다음과 같은 hbase의 설정 값을 계산식에서 사용한다.
1. Region 서버의 자바 힙 크기 = 32GiB
2. hbase.regionserver.global.memstore.upperLimit =0.4
3. hbase.hregion.memstore.flush.size = 128MiB
위의 1과 2의 값을 곱하면 region 서버의 자바 힙에서 write를 위해 사용하는 힙 크기를 구할 수 있다.
이를 memstore에서 flush 하는 크기로 나누면 region 개수를 구할 수 있다.
( 1 * 2) / 3 = 102.4
따라서 위의 값을 가지고 연산하면 102.4가 나오므로 약 102개의 region으로 나누는게 좋다는 걸 알 수 있다.

2016년 11월 30일 수요일

mybatis-guice에 bonecp 연동

mybatis-guice는 mybatis sql mapper와 google guice를 함께 사용하는 프레임워크다.

제 생각대로 mybatis-guice의 장점을 간단하게 정리하자면 다음과 같다.

mybatis는 알려진 대로 sql문을 xml 파일로 따로 관리할 수 있게 해주는 라이브러리인데 여기에 guice를 사용하여 xml을 사용하지 않고 java코드에서 sql문을 관리할 수 있게 해준다. xml 파일로 관리 하는 것 자체가 짜증스럽고 mybatis로 시작하는 것 자체가 어렵게 느껴진다면 mybatis-guice가 좀 더 간단하고 편안하게 다가올 수 있다.

다음 url에서 mybatis-guice에 대한 더 많은 내용을 확인할 수 있다.
http://www.mybatis.org/guice/


이번 글에서는 이 mybatis-guice에 DB connection pool 라이브러리인 bonecp를 연동하는 방법에 대해 간다하게 알아보겠다. mybatis-guice 홈페이지에 자세한 내용이 없어 여기에 간단하게 남기려고 한다.



위의 코드에서와 같이 간단하게 bonecp를 연동해서 사용할 수 있다.
간단하지만 홈페이지엔 어떻게 하는 건지 잘 나와 있지 않아 쪼~금 애를 먹었다. 암튼 알아두면 용이할 듯.

나머지 mybatis-guice를 사용하는 방법은 홈페이지를 참고하면 된다.

2016년 11월 6일 일요일

삼성전자 spark-cep 테스트

삼성전자 spark-cep란?

삼성전자에서 오픈소스로 만든 spark 기반의 cep 엔진이다. Esper 처럼 쿼리문에 WINDOW, SLIDE 기능이 있어 5분 또는 10분 간격으로 처리 구간을 지정하여 쿼리를 실행할 수 있게 해준다.
Github에 오픈소스로 있으면 다음 url에서 확인할 수 있다.

테스트 목적

삼성의 spark-cep를 테스트한 목적은 현재 이글루의 rule base 엔진을 spark-cep를 사용하여 대체가능한 지에 대해 확인하기 위해서이다.
이를 위해 다음과 같은 기술적인 부분들에 대한 확인이 필요하다.
  1. 실시간으로 다수의 rule을 처리할 수 있는가?
  2. 처리할 수 있다면 그 성능은 빠른가?
  3. Rule의 적용 및 수정이 용이한가?
따라서 이 테스트의 목적은 위 세가지에 대해 기능 및 성능을 확인하여 spark-cep를 활용할 수 있는 지를 판단하기 위함이다.

테스트 구성

spark-cep에서는 레디스를 사용하므로 레디스를 구성해야 했다.
레디스 클러스터를 다음과 같이 노드 sn1, sn2, sn3 세 개에 구성하였다.


그런데 테스트 시에 spark-cep에서 레디스 클러스터에 접속할 수가 없어 확인해보니 spark-cep 자체에서 레디스 클러스터를 지원하지 않게 되어 있었다.
레디스 클러스터 구성을 삭제하고 다음과 같이 하나의 노드만 사용하도록 하였다.


테스트 결과

우선, 위 테스트 목적의 세 가지 항목에 대한 답은 다음과 같다.
  1. 없다. 구조 상 1개만 가능하다. 아니면 하나의 쿼리에 여러가지를 처리할 수 있게 만들어야 한다.
  2. 1에서 처리가 되지 않으므로 성능 테스트를 진행할 수 없었다.
  3. Standard한 sql 문을 사용하므로 적용 및 수정에 용이하다.
위 내용에 대해 몇 가지 덫붙이자면, spark-cep란 오픈소스로는 WINDOW와 SLIDE 쿼리문을 사용하여 시간간격을 지정해서  데이터를 분석할 수 있지만 다수의 쿼리를 처리할 수는 없었다.
그리고 레디스를 활용하여 데이터 분석에 대해 성능적으로 도움을 주는 부분이 있을 거라 생각했지만 해당 부분은 spark의 원래 기능인 DataFrame을 그대로 사용하고 있어 spark의 원기능이 제공해주는 성능에서 크게 벗어나지 않고 있음을 알 수 있었다.
레디스는 분석한 결과를 저장하는 용도로 사용하고 있다.
spark 버전도 1.5 버전을 사용하고 있어 현재 cloudera 장비에 설치된 1.6 버전에서는 구동이 잘 되지 않았다. 그래서 장비에 spark 1.5 버전을 따로 올려서 테스트하였다.
spark-cep 코드를 컴파일할 때에도 1.6 버전을 사용하면 에러가 발생하여 빌드를 할 수가 없었다.
현재 spark가 2.0 버전에서는 structured streaming 기능이 추가되어 spark-cep에서 제공하는 WINDOW, SLIDE 쿼리문 같은 기능을 사용할 수 있게 되었다. 이런 상황에서 spark를 활용한 실시간 분석을 사용한다면 2.0에서 코드를 개발해서 하는 게 현재 spark-cep의 분석처리 성능보다 더 나을 거라고 보인다.
spark-cep도 코드를 마지막으로 커밋한 게 2015년 11월 4일로 1년이 넘었다. 이를 봐서는 여기서 더 발전시켜 나갈 의향이 없어보인다.
또한 머신러닝으로 실시간 이상 현상을 탐지하는 보안 기술을 가진 제품이 나오는 추세에 cep를 사용하는 것 자체가 시대에 뒤떨어지고 기술적, 보안적으로 확장성이 떨어진다고 생각된다. 

2016년 10월 5일 수요일

Hadoop 3.0 새로 추가된 기능들에 대해 살펴보자.

Hadoop 3.0 alpha 버전이 릴리즈되었다.
3.0에 추가된 새로운 기능들과 그 특성에 대해 한번 알아보자.

Minimum required Java version increased from Java 7 to Java 8

모든 Hadoop JAR 파일들이 Java 8으로 컴파일되었다. Java 7을 아직 사용 중이라면 Java 8으로 업그레이드해야 한다.

Support for erasure encoding in HDFS

Erasure encoding은 replication에 비해 storage를 크게 절약하면서도 데이터를 견고하게 저장할 수 있게 한다. 디폴트 3x 의 오버헤드를 가지는 기존 replication 방식에 비해 1.4x 정도로 오버헤드가 줄게 된다.
방식은 RAID에서 사용하는 Erasure encoding 방식과 동일하다. 파일의 데이터 저장은 3개의 디스크에 나누어서 이뤄지고 3개의 디스크에 저장된 bit들의 parity bit를 따로 저장한다. 나중에 3개의 디스크 중 하나에서 장애가 발생하면 나머지 디스크들에 있는 bit들과 parity bit를 사용하여 장애가 발생한 디스크의 bit를 복구하여 사용하는 방식이다.
이 방식을 기존의 서비스를 운영하는 시스템에 적용하려면 기존의 모든 데이터를 Erasure encoding을 적용하여 구성해야 하므로 그에 따른 비용을 고려해서 적용해야할 것으로 보인다. 그럼에도 불구하고 적용을 한 이후에 기존에 사용하던 시스템의 저장공간이 2배로 늘어나게 되므로 하드웨어 비용과 확장성을 봤을 때 정말 매력적인 기능이 되지 않을까 싶다.

YARN Timeline Service v.2

YARN Timeline Service v.2 에 대해 살펴보기 전에 YARN Timeline Server가 어떤 역할을 하는 지에 대해 알아보자.
YARN Timeline Server는 YARN에서 실행되는 application의 현재 정보 및 히스토리 정보를 관리한다.
  • 완료된 애플리케이션들에 대한 일반적인 정보 관리
    • 애플리케이션 레벨의 일반적인 정보로 queue-name, 사용자 정보, 컨테이너 리스트 등등으로 리소스 매니저에 의해 history-store에 저장된다. 그리고 web-UI에서 보여준다.
  • 실행 중이거나 완료된 애플리케이션의 per-framework 정보
    • 애플리케이션 또는 프레임워크에 특정한 정보를 의미한다. Hadoop MapReduce를 예로 들면 map task, reduce task의 개수 등이 이에 해당된다.
    • Timeline Server에 TimelineClient를 사용하여 특정 정보를 올릴 수 있고 REST API를 통해 정보를 쿼리할 수 있다.
YARN Timeline Server가 어떤 역할을 하는 지를 알아보았다. 그러면 v.2에 대해 알아보자.
v.2는 v.1에 비해 scalability와 reliability가 향상되었다. 그리고 flow와 aggregation 정보를 제공함으로써 Usability 를 향상시켰다.

  • Scalability
    • v.1에서는 writer/reader 그리고 storage가 한 개의 인스턴스로 제한되어 작은 클러스터에서 더 크게 확장할 수가 없었다. v.2에서는 확장되고 분산된 writer와 storage를 사용한다.
    • Collector(Writer)와 reader를 분리한다. Collector는 각 YARN 애플리케이션마다 한 개가 할당된다. Reader는 분리된 인스턴스로 REST API를 통해 쿼리처리만 하도록 한다.
    • Backing storage로 HBase가 사용된다. 데이터 read/write는 HBase를 통해 이뤄진다.
Timeline Service v.2 architecture

  • Usability improvements
    • 사용자들은 YARN 애플리케이션의 flows 혹은 YARN 애플리케이션들의 논리적 그룹 수준의 정보를 얻기를 원한다. v.2에서는 이러한 정보들을 제공해준다.
    • 다음 그림은 각 YARN 애플리케이션 간에 관계를 보여준다.
Flow Hierarchy

그러나 아직은 security 처리가 되지 않아 테스트 용도로만 사용하기를 권하고 있다.

Shell script rewrite

Hadoop shell script가 오랫동안 가지고 있던 버그를 수정함과 동시에 새로운 기능들이 추가되었다. 주의할 점은 기존 설치된 쉘 버전과는 호환성이 유지되지 않는다는 점이다.
자세한 사항은 Unix Shell GuideUnix Shell API 문서를 참고하기 바란다.

MapReduce task-level native optimization

Map Collector의 output을 내는 부분을 C/C++로 만들어 JNI를 통해 사용하도록 수정되었다. 본 수정으로 인해 셔플이 많이 일어나는 작업같은 경우는 30% 이상의 성능을 향상시키게 된다.

Support for more than 2 NameNodes.

기존의 HDFS의 고가용성은 하나의 NameNode에 하나의 Standby NameNode를 두고 edits 로그를 세 개의 JournalNode들에 저장하는 방식으로 사용되었다. 
그러나 좀 더 높은 수준의 fault-tolerance가 요구되어지면서 다수의 Standby NameNode를 동시에 운영할 수 새로운 기능이 추가되었다. 예를 들어 3개의 NameNode와 5개의 JournalNode를 구성하면 해당 클러스터는 두 개 노드의 장애에 견딜 수 있게 된다. 
- Active NameNode는 1개이다. -

Default ports of multiple services have been changed.

Hadoop 서비스에서 사용되던 디폴트 포트들이 수정되었다. Linux의 ephemeral port 범위(32768-61000)를 사용할 때, 서비스가 올라오면서 다른 애플리케이션과의 충돌로 포트에 바인드를 못하는 경우가 발생한다. 충돌이 발생하지 않게 디폴트 포트들이 다음과 같이 변경되었다.
  • Namenode ports: 50470 --> 9871, 50070 --> 9870, 8020 --> 9820 
  • Secondary NN ports: 50091 --> 9869, 50090 --> 9868 
  • Datanode ports: 50020 --> 9867, 50010 --> 9866, 50475 --> 9865, 50075 --> 9864
  • KMS: 16000 --> 9600

Support for Microsoft Azure Data Lake filesystem connector

Hadoop-compatible filesystem으로 Microsoft Azure Data Lake를 지원한다. 
Azure Data Lake라는 게 대략 살펴보니 클라우드 스토리지를 제공하면서 데이터 분석을 할 수 있게 해주는 서비스로 보인다. Azure Data Lake를 Hotonworks, Cloudera 플랫폼에서도 호환이 되도록 지원하고 있다.


Intra-datanode balancer

하나의 데이터 노드에서는 여러 개의 디스크를 관리한다. 정상적인 운영에선 디스크들에 동일한 데이터량이 쓰이게 되므로 문제가 없다. 그러나 디스크를 추가하거나 교체하게 되면 해당 데이터 노드에 심각한 데이터 불균형 현상이 발생하게 된다. 이런 현상은 HDFS balancer에서 처리되고 있지 않다. HDFS balancer는 inter- 에는 신경쓰지만 intra- 부분에 신경쓰지 않는다.
위와 같은 문제점을 처리하기 위한 "hdfs diskbalacer"라는 커맨드가 추가되었다. 해당 커맨드의 자셍한 내용은 HDFS Disk Balancer에서 확인할 수 있다.


Reworked daemon and task heap management

하둡 데몬들과 맵리듀스 태스크들을 위한 힙 관리 관련 일련의 수정이 이뤄졌다.
HADOOP-10950 => 데몬 힙 사이즈를 설정하는 새로운 방법을 소개한다. 호스트의 메모리 사이즈에 근거하여 auto-tuning이 가능하다. 그리고 HADOOP_HEAPSIZE는 deprecated 되었다.
MAPREDUCE-5785 => map과 reduce의 힙 사이즈를 설정을 단순화하였다. 따라서 Java option으로 map과 reduce의 힙 사이즈 설정을 해 줄 필요가 없어졌다. 기존에 존재하고 있는 설정은 이 변경사항에 영향을 미치지 않는다.



참고자료

http://hadoop.apache.org/docs/r3.0.0-alpha1/index.html









2016년 8월 30일 화요일

내가 쓰는 git 명령어

git을 사용하다 보면 명령어를 잊어버리게 된다.

그래서 여기에 간략하게 적어두고 필요하면 다시 보고자 한다.


repository에 push 하기 => git push origin master
add된 상태의 파일을 add 취소하기=> git rm --cached filename

Node 제거 후 kudu에서의 leader election 문제 및 해결

Kudu 0.9.1 버전을 사용하였다.
Kudu를 사용 중에 cluster의 노드를 6개에서 3개로 줄이게 되었다.
그 이후로 kudu가 정상동작하지 않아서 로그를 살펴보았더니 계속해서 leader를 선출하려고 시도하고 있었다. 그리고 제거한 다른 3개의 노드에 계속해서 접근 시도를 하였다.
Cloudera community에도 글을 올려서 문제점을 찾으려 노력하였다. (아래 url에서 확인할 수 있다.)

kudu가 정상적으로 동작하지도 않고 서비스를 올려둔 상태로 두면 로그 파일 용량만 급격히 증가하여 문제가 발생하므로 한 동안 서비스를 중지해두었다. 
HBase 문제등 다른 사항들을 먼저 처리하면서 시간이 흘렀다.
우선순위가 높은 문제들을 처리한 뒤, 다시 kudu를 살펴보기 시작하였다. 문득, kudu에 생성한 테이블을 삭제하면 어떨까란 생각이 들었다. 왜냐하면 해당 테이블은 노드가 6개일 때 생성된 테이블이므로 cluster에서 제거된 노드의 정보를 가지고 있어서 문제가 될 수도 있겠구나란 생각이 들었기 때문이다.
테이블을 삭제하였다. (어차피 테스트 데이터라 상관없었음) 그랬더니 leader를 선출하는 시도를 더 이상 하지 않게 되었다.

이러한 내용을 cloudera community에 올리니 답변이 왔다. 답변은 kudu를 지속적으로 서비스하기 위해서는 단순하게 node를 제거해서는 안된다였다. 그냥 node를 제거하면 데이터를 잃거나 서비스가 비정상적으로 될 수 있다고 하였다.
안전하게 node를 제거하기 위해서는 제거하려는  node를 5분 간격으로 shut down하여 완전하게 kudu cluster에서 삭제되도록 하라는 것이었다. 이를 빨리 하려면 --follower_unavailable_considered_failed_sec 값을 master에서 2 또는 3 같은 작은 값으로 설정하여 process 속도를 빠르게 하라는 방법도 알려주었다.
아래 글은 해당 답변 원문이다.
Hi Park, you cannot simply remove half of the nodes and expect Kudu to keep running. It's likely that you will lose data. Imagine you have the following scenario: 
2 tablets (I, II), 6 tablet servers (A, B, C, D, E, F) 
Imagine also you have 3 replicas each. Imagine that, by luck, you have the following replicas hosted on tablet servers: 
tablet I (A, B, C) and tablet II (D, E, F) 
Now, take tablet servers D, E, F offline. There are no copies of tablet II and Kudu will not be able to operate. That table cannot recover. 
Kudu is even more strict than that. It can only operate if a majority of the replicas remain online. One replica is not enough to recover if the replication factor is higher than 1. 
We still need to create safe decommission tools for Kudu. Right now, you will have to shut down one machine every 5 minutes (by default) to get the effect of removing nodes from the cluster permanently in a safe manner. To work around this, you could set --follower_unavailable_considered_failed_sec on the master to a shorter value, say 2 or 3 minutes or something, to speed up the process: http://kudu.apache.org/docs/configuration_reference.html#kudu-master_follower_unavailable_considered...

Hope this helps,
Mike

아직 kudu에서는 노드를 제거하는 부분에서의 처리가 미흡한 부분이 있는 것으로 보인다.

2016년 8월 8일 월요일

HBase write 성능 튜닝

HBase에 데이터 적재를 가능한 많이 할 수 있도록 HBase write 성능 튜닝을 하였다.

튜닝 결과

먼저 튜닝 결과부터 얘기하겠다.
튜닝 결과 3개의 region server 환경 하에서 최대 초당 30만개의 로그를 HBase에  저장할 수 있었다.
이는 120 바이트 크기의 apache 로그일 경우의 얘기이고 1KB 크기의 netscreen 로그의 경우엔 초당 16만~17만 정도였다.
Apache, sniper, netscreen 세 개의 로그를 소스에서 대략 9만~10만 건을 동시에 전송하여 HBase에 저장할 경우에는 평균 초당 27만~28만 정도가 나왔다.
이는 테이블 한 개 또는 그 이상의 테이블에 데이터를 저장하더라도 결국, 저장할 수 있는 데이터의 총량이 한정되어 있다는 얘기가 될 수 있다. (물론 설정을 통해 더 늘려줄 수는 있을 듯 하나, cloudera manager 자체에서는 리소스 설정에 제한이 되어 있어 그 수준을 더 넘겨서 설정하기는 힘들듯)
하지만 이 정도의 성능을 내기 위해서는 WAL 파일을 쓰지 않도록 설정해야 가능하다. WAL 파일을 쓰지 않게 하면 region server에서 region들간에 충돌이 나거나 하는 문제 발생 시, 복구가 정상적으로 이뤄지지 않게 된다.
WAL 파일을 쓰도록 설정할 경우엔 apache로그로 초당 14만~15만 건 정도를 write할 수 있었다.
위와 같은 성능에 따라 실제 서비스에 적용할 때에는 최대 성능보다는 2~3만 건 이하가 서비스에 적당하다고 생각하고 구성해야 한다.
왜냐하면 HBase에 write 시, memstore에서 flush 발생 또는 compaction이 진행되면 그 영향에 따라 write 되는 로그 건수가 줄어들게 되기 때문이다.
물론, flush나 compaction 뒤에 뒤처진만큼 많이 저장하려고 하지만 HBase 자체에서 처리할 수 있는 최대용량을 넘어서면 지연이 발생하게 된다. (노드를 추가하면 처리 용량을 늘릴 수 있음)
가령, WAL 파일을 쓰게 설정한 환경에서 초당 10만 건 이하로 로그를 수집한다면 문제가 되지 않겠지만 HBase 최대 수집 건수인 14만~15만 건을 수집하려면 데이터 수집에 지연이 발생할 수 있게 된다.
이러한 점을 고려하여 운영을 하는 게 중요하다.
결과에 대해 한 가지 덧붙이자면, 여기에 정리한 설정들을 가지고 이만큼의 결과를 냈지만 이게 최선이랄 수는 없다.
설정 값을 조금 바꾸거나 리소스를 좀 더 늘리거나 한다면 좀 더 나은 결과를 얻을 수도 있을 것이다. 따라서 이러한 설정 값으로 이 정도 성능이 나올 수 있다고 참고하는 수준으로 보면 좋을 듯하다.

환경

Kafka cluster 환경은 'Flume과 Kafka를 사용한 초당 100만개 로그 수집 테스트'에서 설명한 바와 같다.
이번에는 HBase에 데이터를 적재하므로 Kafka cluster의 노드와 같은 서버 3대를 region 서버로 가진 환경에서 테스트가 진행되었다.
따라서 3대의 kafka cluster로부터 데이터를 받아서 3대의 region 서버에 데이터를 저장하는 식으로 HBase 튜닝을 진행하였다.

설정

여기서 default 값은 cloudera manager에서 관리하는 default 값을 의미한다.
그리고 이전에 cloudera 엔지니어가 설정한 값도 있으므로 여기에 포함되어 있지 않은 값도 있을 수 있다.
여기서는 HBase 튜닝에 필요했던 설정 값에 대해서만 다루겠다.
HBase 튜닝 시에는 read를 많이 하느냐 또는 write를 많이 하느냐를 따져서 해당 사용용도에 맞게 튜닝을 해야 한다.
여기서는 write가 많으므로 write를 많이 할 수 있는 설정을 하였다.

HDFS 설정


이름
default 값
설정 값
설명
dfs.block.size
128MiB256MiB
HDFS의 블록 크기.
한 개의 WAL 파일이 block 크기로 인해 나눠지지 않게 하기 위함.
dfs.datanode.handler.count364datanode에서 사용되는 thread 수
dfs.namenode.handler.count3064namenode에서 사용되는 thread 수
dfs.datanode.max.xcievers40964096datanode 내/외로 데이터 전송 시 사용되는 최대 thread 수.
dfs.datanode.balance.bandwidthPerSec10MB/s10MB/sdatanode간 load balancing의 속도 제한.

HBase pre-split regions

HBase의 region 서버에 데이터를 저장하는데 저장되는 장소를 region이라고 불린다.
이 region을 rowkey를 분할 할당하여 나누어 주지 않으면 한 두 군데의 region에만 데이터를 쓸려고 하는 문제가 발생되어 성능이 저해되는 문제가 있다.
따라서 pre-splitting을 통해 각 region에서는 해당 rowkey를 가진 데이터만 처리하도록 하게 하여 모든 region에서 골고루 데이터를 처리할 수 있도록 한다.
이 부분은 성능에 크게 영향을 미치므로 꼭 해주어야 한다.
ApacheLog, SniperLog, NetScreenLog 테이블을 다음과 같은 hbase shell 명령을 통해 생성하여 region이 분리된 테이블들을 만들었다.
create 'ApacheLog',{NAME=>'cf',TTL=>432000,COMPRESSION=>'SNAPPY'}, SPLITS=>['b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p']
create 'SniperLog',{NAME=>'cf',TTL=>432000,COMPRESSION=>'SNAPPY'}, SPLITS=>['b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p']
create 'NetScreenLog',{NAME=>'cf',TTL=>432000,COMPRESSION=>'SNAPPY'}, SPLITS=>['b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p']

rowkey를 사전순서대로 파악하여 region이 나누어진다.

분할된 region은 다음과 같은 HBase UI 화면에서 확인할 수 있다.


HBase 설정


Read 성능을 증진 시키기 위해서는 HFile 수가 적을 수록 좋다. (즉, 한 개의 파일에 데이터가 많이 들어가 있을수록 좋다)
Write 성능을 증진시키려면 compaction을 적게 할 수록 좋다.
두 가지 모두에 맞게 튜닝을 할 수 없으므로 read 또는 write에 중점을 맞춰서 튜닝을 해야한다.
여기서는 write에 중점을 두어 튜닝을 하였다.
설정에 들어가기 앞서 아래 그림과 같은 방식으로 write가 진행된다는 것을 이해해야 한다.
그리고 HFile은 compaction을 통해 합쳐서 파일의 수를 줄이게 된다. 따라서 이러한 구조를 이해하고 write 성능을 고려해야한다.
이름
default 값
설정 값
설명
hbase.client.write.buffer2MiB8MiB
클라이언트에서 서버로 데이터 전송을 할 때 사용되는 버퍼.
버퍼 크기를 키워서 rpc 횟수를 줄이도록 한다.
hbase.regionserver.handler.count30100region 서버에서 데이터 처리를 위해 사용되는 thread 수.
hbase.hregion.memstore.flush.size128MiB128Mib
MemStore가 이 크기 이상을 가지면 flush가 진행됨.
default 값이 좋음.
hbase.regionserver.global.memstore.upperLimit0.40.5
region 서버의 힙영역에서의 MemStore의 크기 비율로 이 크기를 넘기면 MemStore에 쓰기를 차단하고 강제로 flush 하게 됨.
크기가 클 수록 write 성능에 좋음.
  • hfile.block.cache.size와 hbase.regionserver.global.memstore.upperLimit의 합이 0.8 (80%)를 넘을 수 없게 되어 있다. 이는 아마 read cache 와 memstore의 크기의 합이 전체 힙영역 중 대부분을 차지해 버리면 HBase의 다른 구성 요소들이 충분한 메모리를 할당받을 수 없기 때문인 듯하다.
hbase.regionserver.global.memstore.lowerLimit0.380.38
MemStore 크기가 lowLimit에 도달하면 soft한 flush가 발생한다.
MemStore에 쓰기를 하면서 flush도 진행함을 의미한다.
예로 MemStore 크기가 힙영역의 38%를 차지하면 soft flush를 발생시킴.
hbase.hregion.memstore.block.multiplier28
MemStore 크기가 'hbase.hregion.block.memstore' 값과 'hbase.hregion.flush.size' 바이트 값을 곱한 값만큼 증가할 경우 쓰기를 차단하고 강제로 flush함.
크기가 클수록 write에 좋음.
hbase.hregion.max.filesize10MiB10MiB
HStoreFile(위 그림에서의 HFile로 생각하면 됨)의 최대 크기입니다. 열 패밀리 HStoreFile 중 하나라도 이 값을 초과하면 호스팅 HRegion이 두 개로 분할된다.
default 값이 좋음.
hbase.hstore.blockingStoreFiles10200
한 HStore에 이 수보다 많은 HStoreFile이 있을 경우, 압축이 완료될 때까지 또는 'hbase.hstore.blockingWaitTime'에 지정된 값이 초과될 때까지 이 HRegion에 대한 업데이트가 차단된다.
값이 클수록 write 성능에 좋음.
hfile.block.cache.size0.40.3
HFile/StoreFile에서 사용하는 블록 캐시에 할당할 최대 힙(-Xmx 설정)의 백분율이다. 해제하려면 이 값을 0으로 설정하면 된다.
write 성능을 향상 시키기 위해 값을 줄이고 hbase.regionserver.global.memstore.upperLimit을 늘려주는 게 좋다.
hbase.hstore.compactionThreshold33
이 HStoreFile 수를 초과하는 HStore가 하나라도 있으면 압축이 실행되어 모든 HStoreFile 파일을 하나의 HStoreFile로 다시 씁니다. 
이 값이 클수록 압축으로 인한 시간 지연이 발생한다.
hbase.hregion.majorcompaction7일7일
주기적으로 하나의 region에서 여러개의 HStoreFile들을 하나의 파일로 압축하는 일을 한다.
되도록이면 서비스 이용 시간이 적은 시간대에 이뤄지는게 좋음.
(본 튜닝 테스트에서는 이 부분에 대해서 고려하지 않았음.)
HBase RegionServer의 Java 힙 크기4GiB32GiBregion 서버에서 사용할 힙 크기.

위와 같은 설정들을 보면 HBase 튜닝하는 게 어려운 일이라는 것을 알 수 있다.
그리고 위 설정들의 튜닝으로 성능을 어느 정도 올린 이후에는 write 성능의 경우, flush와 compaction이 서비스에 영향을 주지 않을 정도로 부드럽게 이뤄지도록 조정해주어야 한다.
가령, 이런 문제가 발생할 수 있다. MemStore 크기가 커져서 flush를 하려는데 지금 존재하고 있는 HStoreFile의 수가 많아(blockingStoreFiles 설정에 따라 flush 막음) flush를 하지 못하게 될 수 있다. 이러면 장애가 발생한다.
또는 flush 크기가 너무 크거나 compaction하는 파일들의 크기가 커서 지연이 발생할 수 있다.
또한 region의 크기가 너무 커지면 자동으로 region을 분할하게 된다. 이렇게 분할되는 region이 많을수록 성능에 문제가 있을 수 있다. 따라서 수집하는 데이터 양이 많다면 hbase.hregion.max.filesize 크기 설정을 적절하게 키워주는 게 좋다.
서비스는 24/7 이뤄져하므로 이러한 일들도 고려해야 한다.

Flume 설정

Flume 구성은 다음 블로그 내용에서와 같이 하였다.

그림으로 보면 아래와 같다.
위 그림에서 HDFS Sink를 HBase Sink를 사용했다는 점이 다르다.

실제 flume 설정 내용은 다음과 같다. (HBase sink 성능을 내기 위해서 WAL 파일을 쓰지 않게 했다.)
설정에서 사용된 KafkaSource는 kafka 파티션 키를 설정하여 해당 파티션으로부터 데이터를 받을 수 있게 수정된 버전이다.
HBase sink에서 사용되는 PreSplittedEventSerializer는 pre-split 된 테이블에 맞게 rowkey 맨 앞에 설정한 값을 넣어서 rowkey를 만들도록 구현하였다.
tier3.sources = apache_src1 apache_src2 apache_src3 apache_src4 apache_src5 apache_src6 apache_src7 apache_src8 apache_src9 apache_src10 sniper_src1 sniper_src2 sniper_src3 sniper_src4 sniper_src5 sniper_src6 sniper_src7 sniper_src8 sniper_src9 sniper_src10 netscreen_src1 netscreen_src2 netscreen_src3 netscreen_src4 netscreen_src5 netscreen_src6 netscreen_src7 netscreen_src8 netscreen_src9 netscreen_src10
tier3.sinks = apache_sink1 apache_sink2 apache_sink3 apache_sink4 sniper_sink1 sniper_sink2 sniper_sink3 sniper_sink4 netscreen_sink1 netscreen_sink2 netscreen_sink3 netscreen_sink4
tier3.channels = apache_ch1 netscreen_ch1 sniper_ch1


# Apache Log Source
tier3.sources.apache_src1.type = com.igloosec.flume.source.kafka.KafkaSource
tier3.sources.apache_src1.channels = apache_ch1
tier3.sources.apache_src1.zookeeperConnect = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sources.apache_src1.topic = ApacheLog
tier3.sources.apache_src1.batchSize = 8000
tier3.sources.apache_src1.key = 0
tier3.sources.apache_src1.groupId = ApacheLogConsumerHDFS
tier3.sources.apache_src1.kafka.socket.receive.buffer.bytes=16777216


......

tier3.sources.apache_src10.type = com.igloosec.flume.source.kafka.KafkaSource
tier3.sources.apache_src10.channels = apache_ch1
tier3.sources.apache_src10.zookeeperConnect = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sources.apache_src10.topic = ApacheLog
tier3.sources.apache_src10.batchSize = 8000
tier3.sources.apache_src10.key = 9
tier3.sources.apache_src10.groupId = ApacheLogConsumerHDFS
tier3.sources.apache_src10.kafka.socket.receive.buffer.bytes=16777216



# Apache Log HBase sink

tier3.sinks.apache_sink1.channel = apache_ch1

tier3.sinks.apache_sink1.type = asynchbase

tier3.sinks.apache_sink1.zookeeperQuorum = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sinks.apache_sink1.table = ApacheLog
tier3.sinks.apache_sink1.columnFamily = cf
tier3.sinks.apache_sink1.serializer = com.igloosec.flume.sink.hbase.PreSplittedEventSerializer
tier3.sinks.apache_sink1.serializer.rowPrefixes=a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p
tier3.sinks.apache_sink1.batchSize=3000
tier3.sinks.apache_sink1.enableWal=false


......

tier3.sinks.apache_sink4.channel = apache_ch1
tier3.sinks.apache_sink4.type = asynchbase
tier3.sinks.apache_sink4.zookeeperQuorum = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sinks.apache_sink4.table = ApacheLog
tier3.sinks.apache_sink4.columnFamily = cf
tier3.sinks.apache_sink4.serializer = com.igloosec.flume.sink.hbase.PreSplittedEventSerializer
tier3.sinks.apache_sink4.serializer.rowPrefixes=a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p
tier3.sinks.apache_sink4.batchSize=3000
tier3.sinks.apache_sink4.enableWal=false



# Use a channel which buffers events in memory

tier3.channels.apache_ch1.type = memory

tier3.channels.apache_ch1.capacity = 100000

tier3.channels.apache_ch1.transactionCapacity = 15000

# Sniper Log Source
tier3.sources.sniper_src1.type = com.igloosec.flume.source.kafka.KafkaSource
tier3.sources.sniper_src1.channels = sniper_ch1
tier3.sources.sniper_src1.zookeeperConnect = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sources.sniper_src1.topic = SniperLog
tier3.sources.sniper_src1.batchSize = 8000
tier3.sources.sniper_src1.key = 0
tier3.sources.sniper_src1.groupId = ApacheLogConsumerHDFS
tier3.sources.sniper_src1.kafka.socket.receive.buffer.bytes=16777216


......

tier3.sources.sniper_src10.type = com.igloosec.flume.source.kafka.KafkaSource
tier3.sources.sniper_src10.channels = sniper_ch1
tier3.sources.sniper_src10.zookeeperConnect = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sources.sniper_src10.topic = SniperLog
tier3.sources.sniper_src10.batchSize = 8000
tier3.sources.sniper_src10.key = 9
tier3.sources.sniper_src10.groupId = ApacheLogConsumerHDFS
tier3.sources.sniper_src10.kafka.socket.receive.buffer.bytes=16777216


# Sniper Log HBase sink
tier3.sinks.sniper_sink1.channel = sniper_ch1

tier3.sinks.sniper_sink1.type = asynchbase
tier3.sinks.sniper_sink1.zookeeperQuorum = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sinks.sniper_sink1.table = SniperLog
tier3.sinks.sniper_sink1.columnFamily = cf
tier3.sinks.sniper_sink1.serializer = com.igloosec.flume.sink.hbase.PreSplittedEventSerializer
tier3.sinks.sniper_sink1.serializer.rowPrefixes=a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p
tier3.sinks.sniper_sink1.batchSize=3000
tier3.sinks.sniper_sink1.enableWal=false


......

tier3.sinks.sniper_sink4.channel = sniper_ch1
tier3.sinks.sniper_sink4.type = asynchbase
tier3.sinks.sniper_sink4.zookeeperQuorum = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sinks.sniper_sink4.table = SniperLog
tier3.sinks.sniper_sink4.columnFamily = cf
tier3.sinks.sniper_sink4.serializer = com.igloosec.flume.sink.hbase.PreSplittedEventSerializer
tier3.sinks.sniper_sink4.serializer.rowPrefixes=a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p
tier3.sinks.sniper_sink4.batchSize=3000
tier3.sinks.sniper_sink4.enableWal=false



# Use a channel which buffers events in memory

tier3.channels.sniper_ch1.type = memory

tier3.channels.sniper_ch1.capacity = 100000

tier3.channels.sniper_ch1.transactionCapacity = 15000



# NetScreen Log Source

tier3.sources.netscreen_src1.type = com.igloosec.flume.source.kafka.KafkaSource

tier3.sources.netscreen_src1.channels = netscreen_ch1

tier3.sources.netscreen_src1.zookeeperConnect = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sources.netscreen_src1.topic = NetScreenLog
tier3.sources.netscreen_src1.batchSize = 8000
tier3.sources.netscreen_src1.key = 0
tier3.sources.netscreen_src1.groupId = ApacheLogConsumerHDFS


tier3.sources.netscreen_src1.kafka.socket.receive.buffer.bytes=16777216


......

tier3.sources.netscreen_src10.type = com.igloosec.flume.source.kafka.KafkaSource
tier3.sources.netscreen_src10.channels = netscreen_ch1
tier3.sources.netscreen_src10.zookeeperConnect = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sources.netscreen_src10.topic = NetScreenLog
tier3.sources.netscreen_src10.batchSize = 8000
tier3.sources.netscreen_src10.key = 9
tier3.sources.netscreen_src10.groupId = ApacheLogConsumerHDFS
tier3.sources.netscreen_src10.kafka.socket.receive.buffer.bytes=16777216



# NetScreen Log HBase sink

tier3.sinks.netscreen_sink1.channel = netscreen_ch1

tier3.sinks.netscreen_sink1.type = asynchbase

tier3.sinks.netscreen_sink1.zookeeperQuorum = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sinks.netscreen_sink1.table = NetScreenLog
tier3.sinks.netscreen_sink1.columnFamily = cf
tier3.sinks.netscreen_sink1.serializer = com.igloosec.flume.sink.hbase.PreSplittedEventSerializer
tier3.sinks.netscreen_sink1.serializer.rowPrefixes=a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p
tier3.sinks.netscreen_sink1.batchSize=3000
tier3.sinks.netscreen_sink1.enableWal=false


......

tier3.sinks.netscreen_sink4.channel = netscreen_ch1
tier3.sinks.netscreen_sink4.type = asynchbase
tier3.sinks.netscreen_sink4.zookeeperQuorum = mn1.igloosecurity.co.kr:2181,mn2.igloosecurity.co.kr:2181,mn3.igloosecurity.co.kr:2181
tier3.sinks.netscreen_sink4.table = NetScreenLog
tier3.sinks.netscreen_sink4.columnFamily = cf
tier3.sinks.netscreen_sink4.serializer = com.igloosec.flume.sink.hbase.PreSplittedEventSerializer
tier3.sinks.netscreen_sink4.serializer.rowPrefixes=a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p
tier3.sinks.netscreen_sink4.batchSize=3000
tier3.sinks.netscreen_sink4.enableWal=false


# Use a channel which buffers events in memory
tier3.channels.netscreen_ch1.type = memory
tier3.channels.netscreen_ch1.capacity = 100000
tier3.channels.netscreen_ch1.transactionCapacity = 15000

참고 자료

튜닝 관련 다음 자료들을 참고하였다.