2016년 3월 24일 목요일

IntelliJ에서 spark 개발환경 구성 방법

IntelliJ에서 spark 관련 개발환경 구성 방법

목적

IntelliJ에서 spark 관련 코드를 개발하고 빌드할 수 있는 환경을 만들 수 있는 방법을 조사하여 정리한다.

개발환경 구성방법

개발환경을 구성하는 방법에는 다음 세 가지가 있다.
1.     Spark를 빌드한 jar 파일을 사용하는 방법.
2.     SBT를 사용하는 방법
3.     Maven을 사용하는 방법
위 세 가지 방법 중에 1번 방법은 프로젝트를 구성하고 관리하는 방향과는 거리가 있고 spark 빌드할 필요가 현재로서는 없고 부담되는 작업이 될 수 있어 여기서는 다루지 않았다.

SBT (Simple Build Tool)

스칼라로 작성된 스칼라 코드를 빌드할 수 있는 툴이다.
의존성 관리에 Ivy를 사용한다.
SBT와 관련된 정보는 다음 url을 참조할 수 있다.
http://www.scala-sbt.org/index.html
구성 방법은 다음과 같다.
1.     IntelliJ scala SBT 플러그인을 설치한다.
2.     프로젝트 생성 시 SBT를 선택한다.

3.     그 다음 화면에서 SBT 버전과 Scala 버전을 맞추고 프로젝트를 생성한다.

4.     프로젝트가 생성되면 우측하단에 프로그레스바가 보이면서 관련된 작업을 한다. 이때 수분이 걸릴 수 있으니 대기해야 한다.
5.     Spark 코드를 빌드해보기 위해 spark 예제 중, SparkPi를 추가한다. (SparkPi spark를 다운로드 받으면 그 안에 포함되어 있다.)
6.     지금까지 하면 프로젝트는 다음과 같은 구조를 갖는다.

7.     현재 spark를 참조하지 못하므로 코드 상에서 에러가 발생한다.  build.sbt 파일에 spark를 참조할 수 있는 설정을 넣는다. 설정을 넣은 build.sbt 파일의 내용은 다음과 같다.


8.     메뉴 View->Tool Windows->SBT Console 을 실행한다.
9.     SBT Console 화면에서 Start SBT 버튼을 클릭한다.

10.  위와 같은 화면의 커서에서 ‘package’ 명령을 실행한다.

11.  위와 같은 화면이 나오면 package 작업에 성공함을 의미한다. 해당 프로젝트의 target 디렉토리에 가면 생성된 jar 파일을 확인할 수 있다.

Maven

Maven을 사용하여 개발환경 구성하는 방법은 다음과 같다.
1.     프로젝트를 생성 시, Maven을 선택한다. 그리고 scala archetype을 추가한다.

2.     추가 시, 다음과 같이 정보를 입력한다.
A.     GroupId: net.alchim31.maven
B.     ArtifactId: scala-archetype-simple
C.     Version: 1.6
3.     추가한 archetype을 선택하여 프로젝트 생성을 진행한다.
4.     예제로 생성한 프로젝트에 빌드 테스트를 위해 SparkPi.scala 파일을 추가한다.
5.     Pom.xml 파일을 보면 사용하는 scala.version 2.11.5로 들어가 있다. 본인이 작업 시에 설치된 scala 버전은 2.11.7 버전이므로 이 정보를 수정하였다.
6.     Pom.xml  파일에 scala-core의 의존성을 추가한다.
<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactid>spark-core_2.11</artifactId>
     <version>1.6.0</version>
     <scope>test</scope>
</dependency>

   


7.     메뉴 View->Tool Windows->Maven Project를 실행한다.
8.     Maven Project 화면에서 package를 더블클릭하여 실행한다.
9.     실행하면 “bad option: '-make:transitive'”이란 에러가 발생한다. Pom.xml  파일에서 이 부분을 주석처리한다.
10.  다시 한번 package를 실행한다. 그러면 테스트 부분에서 JUnitRunner가 없다는 에러가 발생한다. 그러면 다음 의존성을 추가해준다.
<dependency>
    <groupId>org.specs2</groupId>
    <artifactId>specs2-junit_${scala.compat.version}</artifactId>
    <version>2.4.16</version>
    <scope>test</scopre>
</dependency>



11.   이후에 package를 실행하면 성공함을 확인할 수 있다.

정리

개발환경 구성을 SBT, Maven으로 구성할 수 있는 방법을 알아보았다. 그렇다면 무엇을 쓰는 것이 좋을까?”라는 의문이 생기게 된다. 이와 관련해서 구글링을 통해 확인한 바로는 SBT Maven에 비해 빠르고 효율적이지만(이는 Maven은 매번 새로 컴파일을 하지만 SBT는 컴파일한 작업을 캐시하기 때문) 작고 간단한 프로젝트에 좋다는 얘기가 많다. 소스의 크기가 커지거나 하면 SBT로 관리하기에는 무리가 있다는 얘기이다. 또한 maven은 일반적으로 많이 쓰여서 잘 알고 있는 반면 SBT scala 빌드를 위한 DSL로 만들어진 빌드 툴로 기존에 maven을 알고 있는 사람이라면 새로 학습을 해야 한다.
개인적인 생각으로 차세대TF팀에서는 maven을 사용하기를 바란다. 왜냐하면 본인이 maven을 많이 써봤고 익숙하다. 또한 프로젝트 크기가 커질 경우의 관리에 대해서도 고려해야 한다고 생각하기 때문이다.

이상으로 IntelliJ에서 spark 개발환경 구성에 대한 내용을 마무리한다.

Spark Streaming 코드레벨단에서의 성능개선

Spark Streaming을 작성하여 동작 시, 옵션으로 코어 개수, 익스큐터의 총 개수(yarn 모드로 동작 시), 메모리 사이즈, 각종 데이터에 쓰이는 로컬 디스크 개수 등의 하드웨어 자원등의 할당으로 성능 개선을 할 수 있다.
그러나 코드레벨 단에서 병렬화 처리가 제대로 되지 않아 주어진 자원을 충분히 활용하지 못하게 되는 문제가 발생하게 된다.
위와 같은 문제로 현재까지 팀내에서 경험을 통해 성능을 개선했던 내용에 대해 아래에 정리하였다.

  1. Spark Streaming으로 데이터를 받을 경우에 리시버의 수를 적절히 증가시켜주어야 한다.
    1. Kafka를 사용했을 때를 예로 들어보면 검색등을 통해 쉽게 찾아보면 다음과 같이 DStream을 생성하여 사용할 수 있다.
      val conf = new SparkConf().setAppName("Test").setMaster("local[8]")
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc, Seconds(5))
      val topics = List(("TestTopic", 10)).toMap
      val kafkaDStreams = KafkaUtils.createStream(ssc, "test1.zookeeper.com,test2.zookeeper.com,test3.zookeeper.com", "ConsumerGroupId", topics)
    2. 이 경우에는 DStream을 하나만 사용하게 된다. 그러므로 초당 처리해야할 데이터 유입이 더 많다면 다음과 같이 리시버의 개수를 늘려주어야 한다.
      ...
      val topics = List(("TestTopic", 10)).toMap
      val numInputDStreams = 5
      val kafkaDStreams  = (1 to numInputDStreams).map(_ => KafkaUtils.createStream(ssc, "test1.zookeeper.com,test2.zookeeper.com,test3.zookeeper.com", "ConsumerGroupId", topics))
      val unifiedStream = ssc.union(kafkaDStreams)
    3. 복수개의 DStream을 생성한 후에 union을 하여 하나의 DStream으로 사용할 수 있게 한다. 위와 같이 할 경우 초당 유입할 수 있는 데이터의 수가 크게 증가하게 된다.
    4. 주의할 사항은 예시한 샘플 코드에서는 로컬pc에서 8개의 thread를 사용하도록 되어 있다. 'numInputDStreams' 값이 실행 thread 개수보다 많거나 비슷하게 되어 있으면 RDD가 정상적으로 동작하지 않으므로 테스트 시에 이 점을 주의해야한다.
  2. 병렬화 수준을 높이기 위한 RDD의 파티션의 개수 조정
    1. repartition() 메소드를 사용하여 병렬화 개수가 너무 많거나 적을 경우 조정이 가능하다. repartition() 메소드 사용 시 데이터 셔플이 발생할 수 있다.
    2. 파티션 개수를 줄인다면 coalesce() 메소들 쓸 수 있다. 이 메소드 사용 시엔 데이터 셔플이 발생하지 않는다.
    3. 실제 RDD의 연산에서는 다음과 같은 코드로 간단하게 repartition()을 수행할 수 있다.
      ...
      unifiedStream.repartition(10)
      ...
    4. 실제 저 파티션 개수를 조정하면서 최적의 성능을 적절한 개수를 파악해나가는 게 좋다.


이 외에도 Kyro를 사용한 직렬화라든가 메모리 관리 등등 여러 방법이 있겠으나 실제 직면했던 문제에서 개선해나갔던 내용에 대해서만 남긴다.
결론적으로 어떤 작업을 처리하기 위해 Spark 코드를 작성하였다면 가지고 있는 리소스의 충분한 활용을 위해 옵션을 어떻게 주고 코드레벨에서의 병렬화 처리에 대한 처리 및 튜닝을 해야 한다는 교훈을 얻었다.
그렇다면 어떠한 문제에 대해 Spark를 활용하고자 한다면 다음과 같은 순으로 작업이 진행되어야 한다고 생각한다.
문제 파악 및 해결방안 마련 -> 설계 -> Spark를 활용하여 구현 -> 테스트 -> 성능 튜닝 -> 릴리즈

성능개선에 대해 몇 가지를 얘기했지만 아무래도 가장 중요한 건 Spark의 내부구조 및 동작방식을 제대로 이해할 필요가 있지 않나 싶다.
인터넷에 관련 정보가 많이 있으니 열심히 공부해보자!

클라우데라 블로그 글 정리 - How-to: Build a Complex Event Processing App on Apache Spark and Drools


본 글은 클라우데라 블로그의 글을 정리한 내용이다. 원문은 아래 링크를 따라가면 볼 수 있다.
대략적인 블로그 글의 내용은 CDH를 business execution engine과 결합하여 CEP(Complex Event Processing)로서 사용할 수 있다는 내용으로 이를 무엇을 사용해서 어떻게 구현했다는 내용이다.


  1. CEP에 대해서
    1. Event Processing이란?
      1. 데이터 스트림을 추적 및 분석하여 좀 더 나은 insight와 결정을 할 수 있도록 해주는 것을 의미
    2. CEP란?
      1. Event Processing의 일종으로 여러 소스로부터 얻어낸 데이터를 결합하여 여러 이벤트들간의 패턴 및 복잡한 관계를 찾아내는 것을 의미
      2. CEP는 여러 데이터 소스들로부터 기회와 위협을 확인하고 이를 실시간으로 경고를 할 수 있도록 해준다.
      3. 오늘날 CEP는 매우 다양한 분야에서 사용되고 있다.
        1. 금융: 거래 분석, 금융사기 탐지
        2. 항공: 운항 모니터링
        3. 의료: 클레임 프로세싱, 환자 모니터링
        4. 에너와 통신: 정전 탐지
    3. 빅데이터
      1. 기하급수적인 데이터의 증가로 빅데이터 처리 필요함.
      2. CDH를 사용하여 이를 해결할 수 있다.
  2. Architecture and Design
    1. CEP를 만들기 위해 CDH를 사용한 architecture는 다음과 같다.
    2. 위 그림과 같은 architecture에서 사용된 컴포넌트들은 다음과 같다.
      1. Ingest: 이벤트 수집을 위해  Apache Kafka or Apache Flume 사용.
      2. Storage: 수집한 이벤트 저장에는 Apache HBase(또는 미래에는 아마 Kudu)를 사용.
      3. Alerting: Apache Kafka 또는 다른 직접적인 API를 통합하여 경고를 할 수 있게 함.
      4. Stream processing
        1. spark streaming을 사용하여 event processing을 처리함. 
        2. Processing은 마이크로 배치로 처리되며 다음과 같은 일을 한다.
          1. Parsing
          2. Lookup
          3. Persistence
          4. Building of current state from a series of historical events
          5. Custom processing logic
        3. 일례로 다양한 sliding-window위의 여러 Spark RDD stream을 join하여 실시간에 가깝게 insight와 trend를 얻을 수 있다.
        4. 이 배치 작업은 매번 수초 간격으로 진행되며 수초보다 적은 end-to-end latency가 나게 한다.
      5. Business process management
        1. Rules framework는 technical 또는 non-technical한 사용자들도 복잡한 business logic을 디자인할 수 있게 해준다.
        2. 이 글에서는 Apahce spark와 Drools를 함께 사용하여 business의 요구사항에 대해 평가해본다.
      6. Metrics: OpenTSDB와 같은 time-series 데이터베이스의 대쉬보드를 통해 metrics를 제공한다. 또한 Cloudera Search + HUE를 사용해 같은 기능을 사용할 수 있다.
    3. 예제로 sepsis-shock criteria(패혈증 쇼크 기준)을 Drools를 사용하여 동작시켜보자. 해당 조건들은 이곳에 언급된 내용들을 다룬다.

      1. 위 그림에서 볼 수 있듯이 24시간내에 환자의 두 개의 vital이 범위를 넘어갈 경우 SIRS 기준에 부합하게 된다. 
      2. 이 vital들은 다른 시각에 기준에 도달하게 된다. 따라서 매시간 환자의 상태를 재구성하기 위해 HBase를 사용하여 매시간 vital을 읽고 rules를 적용할 수 있게 한다.
      3. 예제를 위해 모든 event마다 모든 환자들의 vital을 가져온다고 가정하자. 그리고 snapshot/state-building step은 스킵한다.
      4. 환자가 SIRS 기준에 부합하게 되면 즉시 sepsis, severe sepsis, septic shock등을 순서대로 체크해야한다. 이 평가 프로세스의 flow chart는 다음과 같다.
      5. 모든 조건들을 찾아내고 업무자들에게 친숙하게 데모를 만들기 위해 Drools decision tables를 사용한다. 이러한 접근은 business logic을 Java/Scala code 또는 custom syntax로 가지는 것보다 좀 더 많은 관중(business analyst 포함)들에게 가시성을 제공한다.
      6. 다음은 sepsis calculator를 충족시키기 위해 만들어진 decision table이다.
      7. 위 그림에 대하여
        1. 연한 빨강색으로 채워진 모든 셀들은 code로 돌아가도록 링크되어 있다.
        2. 오렌지색으로 채워진 모든 셀들은 rule이 성공적으로 평가된 이후에 정해진 값을 가진다.
        3. 녹색으로 채워진 모든 셀들은 주어진 rule을 만족시키기 위해 들어오는 데이터들의 범위들과 값들을 가진다.
        4. 모든 파란색으로 채워진 모든 셀들은 rule들의 이름과 그 조건들이다.
      8. 다음은 Spark와 Drools의 통합의 몇 가지 목표들이다.
        1. Rule들의 실행을 Spark/Streaming으로부터 seamless하게 만든다.
        2. 단순성을 위해 rule 엔진의 stateless 부분을 사용하라. 일정한 시간 간격을 유지하며 상태를 저장하는 spark의 sliding window를 사용할 수 있다.
        3. Rule을 요구사항에 기반하여 순차적 또는 무작위로 실행하라.
        4. 몇몇 metrics를 계산하기 위해 rule 실행 결과를 spark dataframe에 넣어라.
  3. Coding
    1. 지금부터 위 목표를 달성하기 위한 절차와 코드 스니펫을 보여주겠다. 모든 코드는 https://github.com/mganta/sprue 에서 다운로드 받을 수 있다.
      1. 각 파티션에 session factory를 한번만 초기화한다. 그리고 모든 다음 dstream 실행에서 재사용한다.

        KieContainer kContainer = kieServices.newKieContainer(kieRepository.getDefaultReleaseId());
        kContainer.newStatelessKieSession();


      2. 들어오는 데이터를 HBase에 저장한다.
        //store incoming data in hbase
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertIncomingDataIntoHBase, true)
      3. RDD의 각 이벤트마다 모든 rule들을 실행하고 평가 결과를 RDD와 함께 리턴한다.
        //evaluate all the rules for the batch
            patientStream.foreachRDD(rdd =&gt; {
             val evaluatedPatients = rdd.mapPartitions(partitionOfRecords =&gt; {
                val ksession = KieSessionFactory.getKieSession(xlsFileName)
                val patients = partitionOfRecords.map(patient =&gt; {
                  ksession.execute(patient)
                  patient
                })
                patients
              })
      4. RDD를 dataframe으로 변환하고 몇몇 metrics을 계산한다.
        //convert to dataframe
                val patientdf = sqc.applySchema(evaluatedPatients, classOf[Patient])
                //compute statistics
                val countMatrix = patientdf.groupBy("location").agg(max("evaluationDate"), sum("sirsFlag"), sum("sepsisFlag"), sum("severeSepsisFlag"), sum("septicShockFlag"), sum("organDysfunctionSyndrome"))
               countMatrix.show()
      5. HBase에 업데이트를 저장한다.
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertEvaluatedDataIntoHBase, true
      6. Time-series rest api를 호출하고 마이크로 배치 metrics를 게시한다. Time-series 대쉬보드는 이 데이터를 읽을 수 있다.(여기서  OpenTSDB를 어떻게 설치하는 지를 배울 수 있다.)
        //opentsdb update statistics
        countMatrix.foreach(row =&gt; {
        TSDBUpdater.loadPatientStats(row.getString(0), row.getLong(1), row.getLong(2), row.getLong(3), row.getLong(4), row.getLong(5), row.getLong(6))
          })
      7. 위의 모든 절차들은 여기 spark driver code와 링크되어 있다.
      8. 이 예제는 무작위로 선출된 환자의 데이터 스트림 생성하기 위해 QueueStream을 사용한다. 실제 시나리오에서는 각 이벤트에서 hl7 메시지를 받게 된다.
      9. 예제를 실행하면 들어오는 각 이벤트에서 rule이 실해되는 걸 볼 수 있다. 그리고 각 상태의 그룹화된 metrics을 볼 수 있다. 이는 아래와 같다.
        Total Patients in batch: 212
        Patients with atleast one condition: 137
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199233|            3|              2|                    2|                   0|                            0|
        |      NE|      1443198199233|            8|              4|                    4|                   1|                            0|
        |      TX|      1443198199233|           10|              8|                    8|                   7|                            1|
        |      NM|      1443198199232|            8|              6|                    6|                   3|                            0|
        |      NY|      1443198199233|            6|              4|                    3|                   3|                            0|
        |      OK|      1443198199233|            7|              5|                    3|                   1|                            0|
        |      VA|      1443198199232|           16|             14|                   12|                   7|                            1|
        |      IL|      1443198199233|            5|              3|                    3|                   1|                            0|
        |      CA|      1443198199233|            7|              6|                    6|                   4|                            0|
        |      KS|      1443198199233|           12|              8|                    8|                   6|                            0|
        |      LA|      1443198199233|           14|              8|                    7|                   2|                            1|
        |      SC|      1443198199233|           12|              9|                    6|                   4|                            0|
        |      FL|      1443198199233|            7|              4|                    4|                   2|                            0|
        |      MN|      1443198199233|            8|              5|                    5|                   2|                            0|
        |      GA|      1443198199233|           14|             12|                   12|                   6|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        Total Patients in batch: 247
        Patients with atleast one condition: 170
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199242|            1|              1|                    1|                   1|                            0|
        |      NE|      1443198199241|           14|             11|                   11|                   5|                            1|
        |      TX|      1443198199242|           13|             10|                    9|                   6|                            0|
        |      NM|      1443198199241|           11|              4|                    3|                   2|                            0|
        |      NY|      1443198199241|           13|              7|                    6|                   4|                            0|
        |      OK|      1443198199241|            7|              5|                    5|                   3|                            0|
        |      VA|      1443198199242|           10|              6|                    6|                   2|                            0|
        |      IL|      1443198199241|           17|             12|                   12|                   5|                            1|
        |      CA|      1443198199241|           17|              8|                    7|                   2|                            0|
        |      KS|      1443198199242|           12|              8|                    7|                   5|                            2|
        |      LA|      1443198199242|           15|             11|                    9|                   4|                            2|
        |      SC|      1443198199241|           13|             10|                    9|                   5|                            0|
        |      FL|      1443198199241|           10|              5|                    5|                   1|                            0|
        |      MN|      1443198199241|           10|              6|                    6|                   3|                            0|
        |      GA|      1443198199241|            7|              2|                    1|                   1|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
      10.  또는 OpenTSDB를 set up하면 다음과 같은 그림을 볼 수 있다.
  4. 결론
    1. 복잡한 시스템을 설계할 때에는 rule 엔진을 사용하는게 유익한 선택이다.(Logic과 데이터의 분리는 domain expert에게 logic 결정에 대한 insight를 주면서 유연한 시스템을 낳는다.)
    2. 지금까지 본 바와 같이 CDH(for Spark, HBase, Kafka)를 rule엔진과 결합하여 사용하는게 복잡한 business logic을 평가하고 실시간으로 이를 동작시키는데 도움이 되는 걸 알 수 있다.

--------------------------------------------------------------------------------------------------------------------------------------------

위와 같은 내용이 블로그의 내용이다.
한줄로 정리하자면 클라우데라의 CDH 플랫폼 위에 Spark와 Drools를 사용하여 CEP로서의 역할을 훌륭히 해낼 수 있다는 내용이다.