2016년 6월 29일 수요일

CDH 5.7.1 환경에서 Spark & FiloDB 애플리케이션 실행 시, NoSuchMethodError: akka.util.Helpers$.ConfigOps(Lcom/typesafe/config/Config;) 에러 문제

Spark와 FiloDB를 함께 사용하여 애플리케이션을 만들어 테스트해보려 Cloudera CDH 장비에 올려서 실행하였지만 다음과 같은 에러가 발생하면서 프로그램이 실행되지 않았다.

- CDH는 5.7.1이 설치되어 있다. CDH의 spark를 사용하여 실행하였다. -

Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: akka.util.Helpers$.ConfigOps(Lcom/typesafe/config/Config;)Lcom/typesafe/config/Config;
 at akka.cluster.ClusterSettings.(ClusterSettings.scala:28)
 at akka.cluster.Cluster.(Cluster.scala:67)
 at akka.cluster.Cluster$.createExtension(Cluster.scala:42)
 at akka.cluster.Cluster$.createExtension(Cluster.scala:37)
 at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:654)
 at akka.actor.ExtensionId$class.apply(Extension.scala:79)
 at akka.cluster.Cluster$.apply(Cluster.scala:37)
 at akka.cluster.ClusterActorRefProvider.createRemoteWatcher(ClusterActorRefProvider.scala:66)
 at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:186)
 at akka.cluster.ClusterActorRefProvider.init(ClusterActorRefProvider.scala:58)
 at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
 at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
 at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
 at filodb.spark.FiloSetup$class.system(FiloSetup.scala:37)
 at filodb.spark.FiloDriver$.system$lzycompute(FiloSetup.scala:62)
 at filodb.spark.FiloDriver$.system(FiloSetup.scala:62)
 at filodb.coordinator.CoordinatorSetup$class.coordinatorActor(CoordinatorSetup.scala:66)
 at filodb.spark.FiloDriver$.coordinatorActor$lzycompute(FiloSetup.scala:62)
 at filodb.spark.FiloDriver$.coordinatorActor(FiloSetup.scala:62)
 at filodb.spark.FiloDriver$$anonfun$init$1.apply$mcV$sp(FiloSetup.scala:75)
 at filodb.spark.FiloDriver$$anonfun$init$1.apply(FiloSetup.scala:70)
 at filodb.spark.FiloDriver$$anonfun$init$1.apply(FiloSetup.scala:70)
 at scala.Option.getOrElse(Option.scala:120)
 at filodb.spark.FiloDriver$.init(FiloSetup.scala:70)
 at filodb.spark.FiloContext$.createOrUpdateDataset$extension(FiloContext.scala:57)
 at filodb.spark.FiloContext$.saveAsFilo$extension(FiloContext.scala:122)
 at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:65)
 at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
 at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
 at com.igloosec.rule.analyzer.RuleAnalyzer$$anonfun$createKafkaStream$4.apply(RuleAnalyzer.scala:69)
 at com.igloosec.rule.analyzer.RuleAnalyzer$$anonfun$createKafkaStream$4.apply(RuleAnalyzer.scala:57)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
 at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
 at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
 at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
 at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)


왜 이런 에러가 발생하는 지에 대해 구글링을 많이 해보았지만 뚜렷한 이유를 찾을 수 없었다.

처음에는 com.typesafe.config 패키지의 문제로 판단하여 내가 작성한 애플리케이션이 사용하는 버전과 CDH 상에서 사용하는 버전이 달라서 그런거라고 생각하여 이 패키지의 버전을 맞춰보려 하였다.

그러나 여전히 위와 같은 문제가 발생하였다.

좀 더 시간이 지나 com.typesafe.config 패키지의 문제가 아니라 akka 버전이 맞지 않아서 발생하는 문제로 파악되었다.

CDH의 spark에서 사용하는 akka 버전과 FiloDB의 akka 버전 중, 두 개 모두에서 사용가능한 akka 버전을 사용해보려고 하였다.

CDH 5.7.1 버전에서는 spark 1.6.0-cdh5.7.1 버전을 사용한다. 이 버전에서 사용하는 akka는 패키지는 다음과 같다.
com.spark-project.akka:akka-core_2.10:2.2.3-shaded-protobuf
com.spark-project.akka:akka-remote_2.10:2.2.3-shaded-protobuf
com.spark-project.akka:akka-slf4j_2.10:2.2.3-shaded-protobuf

FiloDB는 다음 버전의 akka 패키지를 사용한다.
com.typesafe.akka:akka-core_2.10:2.3.15
com.typesafe.akka:akka-remote_2.10:2.3.15
com.typesafe.akka:akka-sfl4j_2.10:2.3.15
com.typesafe.akka:akka-cluster_2.10:2.3.15

우선, CDH에서 사용중인 akka만을 사용하여 애플리케이션 실행 시, ActorSelectMessage라는 class가 없다는 에러가 발생하면서 실행이 중지되었다.

FiloDB에서 사용중인 akka만을 사용하여 실행 시, 처음에서 얘기한 에러가 발생하면서 실행이 되지 않았다.

이후로 com.spark_project.akks:2.3.4_spark 버전, com.typesafe.akka:2.3.11 버전을 사용해 보았지만 두 버전 모두 ActorSelectMessage 클래스를 찾을 수 없다면서 실행되지 않았다.

그래서 maven의 shade 기능을 사용해 FiloDB를 jar로 묶을 때, akka 패키지를 relocation하여 spark에서 참조하는 akka 버전과 FiloDB에서 사용하는 akka 버전을 따로 가져가보려고 하였다.

하지만 이 또한, akka에서 akka 설정파일을 읽어들여 akka를 사용하는 구조라 하나의 jar에 동일한 설정파일을 두 개 두어야 하는 말도 안되는 일을 해주어야 하므로 불가능한 것으로 판단되었다.

여기까지 왔을 때, 사실 상 CDH의 spark에서는 실행이 불가능할 수도 있겠다는 생각에 spark 1.6.0 버전을 다운로드 받아 CDH 장비에 설치하고 이를 이용하여 애플리케이션을 실행하여 보았다.

해당 장비에서 standalone 모드로 실행해보니 프로그램이 정상적으로 실행되었다.

spark 1.6에서는 akka 2.3.11 버전에 의존성이 있어 문제가 되지 않은 것이다.

결론은 김빠지게도 CDH 장비에 따로 spark를 설치하여 사용해야 된다는 것이다.

꽤 며칠동안 헤매던 문제였지만 이렇게 보니 꽤나 단순한 문제였다.

개발자에겐 삽질은 어쩔 수 없는 일인가보다.

2016년 6월 8일 수요일

Spark Streaming & FiloDB Test

FiloDB의 쿼리 처리 성능이 매우 좋다고 하여 Spark Streaming과 함께 사용하여 어느 정도의 성능을 내는 지를 확인해보기로 하였다.
FiloDB는 아래 url에서 다운로드 받아 빌드해서 사용해야 한다.

테스트를 위해 작성한 Spark 애플리케이션은 다음과 같은 방식으로 작업을 처리한다.
  1. Kafka로부터 Apache, NetScreen, Sniper 로그를 받는다.
  2. 로그 데이터를 파싱한다.
  3. 파싱한 데이터를 FiloDB에 저장한다.
  4. FiloDB에 쿼리를 실행하여 결과 값을 보여준다.

실행 쿼리

  1. Apache
    1. FiloDB에 저장된 전체 apache 로그 count
    2. status가 '200'인 로그들에서의 evt_size 필드 값들의 평균
    3. ext2 필드 값이 'GET'인 로그의 count
    4. evt_size 필드 값이 '18000' 보다 작은 로그의 count
    5. timestamp 필드로 group by하여 로그 개수 count
  2. NetScreen
    1. FiloDB에 저장된 전체 netscreen 로그 count
    2. ext4 필드 값이 '1360'인 로그들에서의 duration 필드 값들의 평균
    3. s_port 필드 값이 54690이고 s_info 필드 값이 '130.1.242.138'인 로그들의 count
    4. xstatus 필드 값이 'Permit'이고 duration 필드 값이 '60'을 초과하는 로그들의 count
    5. protocol 필드 값이 '17'이고 duration 필드 값이 '64'와 같은 로그들의 count
  3. Sniper
    1. FiloDB에 저장된 전체 sniper로그 count
    2. method 필드 값이 '(0015)IP Spoofing'인 로그들의 count
    3. d_info 필드 값이 '130.1.2.58'인 로그들의 count
    4. protocol 필드 값이 'tcp'인 로그들의 count
    5. risk 필드 값이 'High'인 로그들의 count

쿼리실행 결과

Spark 애플리케이션을 본인의 노트북에서 "local[6]"으로 실행하여 테스트하였다. (메모리는 따로 설정을 하지 않아 디폴트 1G로 주어졌다.)
쿼리를 실행 할 때 걸리는 시간을 알아보기 위하여 아래와 같이 쿼리 실행 결과 사이사이에 시간을 남겨두었다.

**************************************apache 1: Wed Jun 08 15:01:31 KST 2016
+-----------+
|count(uuid)|
+-----------+
| 2270000|
+-----------+
**************************************apache 2: Wed Jun 08 15:01:32 KST 2016
+----------------+
| avg(evt_size)|
+----------------+
|7809.38996780936|
+----------------+
**************************************apache 3: Wed Jun 08 15:01:32 KST 2016
+-------------+
|count(method)|
+-------------+
| 2270000|
+-------------+
**************************************apache 4: Wed Jun 08 15:01:34 KST 2016
+-------------+
|count(method)|
+-------------+
| 1646548|
+-------------+
**************************************apache 5: Wed Jun 08 15:01:34 KST 2016
+-------------+----------------+
| timestamp|count(timestamp)|
+-------------+----------------+
|1465364070000| 20200|
|1465364086000| 18100|
|1465364114000| 17300|
|1465364135000| 15600|
|1465364171000| 17100|
|1465364151000| 15800|
|1465364178000| 14500|
|1465364134000| 16100|
|1465364066000| 20000|
|1465364155000| 17000|
|1465364150000| 14700|
|1465364071000| 17000|
|1465364107000| 16700|
|1465364130000| 15000|
|1465364091000| 17600|
|1465364087000| 17100|
|1465364062000| 18400|
|1465364063000| 17400|
|1465364143000| 14400|
|1465364147000| 16600|
+-------------+----------------+
only showing top 20 rows
**************************************apache finished: Wed Jun 08 15:01:35 KST 2016
**************************************netscreen 1: Wed Jun 08 15:01:35 KST 2016
+-----------+
|count(uuid)|
+-----------+
| 129900|
+-----------+
**************************************netscreen 2: Wed Jun 08 15:01:36 KST 2016
+------------------+
| avg(duration)|
+------------------+
|32.124710799313384|
+------------------+
**************************************netscreen 3: Wed Jun 08 15:01:36 KST 2016
+--------+
|count(1)|
+--------+
| 0|
+--------+
**************************************netscreen 4: Wed Jun 08 15:01:36 KST 2016
+--------+
|count(1)|
+--------+
| 28420|
+--------+
**************************************netscreen 5: Wed Jun 08 15:01:36 KST 2016
+--------+
|count(1)|
+--------+
| 24372|
+--------+
**************************************netscreen finished: Wed Jun 08 15:01:36 KST 2016
**************************************sniper 1: Wed Jun 08 15:01:36 KST 2016
+-----------+
|count(uuid)|
+-----------+
| 132900|
+-----------+
**************************************sniper 2: Wed Jun 08 15:01:36 KST 2016
+-----------+
|count(risk)|
+-----------+
| 27605|
+-----------+
**************************************sniper 3: Wed Jun 08 15:01:36 KST 2016
+-----------+
|count(risk)|
+-----------+
| 0|
+-----------+
**************************************sniper 4: Wed Jun 08 15:01:36 KST 2016
+-----------+
|count(risk)|
+-----------+
| 36648|
+-----------+
**************************************sniper 5: Wed Jun 08 15:01:36 KST 2016
+-----------+
|count(risk)|
+-----------+
| 66690|
+-----------+
**************************************sniper finished: Wed Jun 08 15:01:36 KST 2016
**************************************apache saved
**************************************netscreen saved
Process finished with exit code -1


위의 결과는 프로그램 마지막에 실행된 쿼리 결과이다. 대체적으로 로그당 5개의 쿼리들 처리되는데 1초가 채 걸리지 않는다.
다만 apache 로그에서 group by를 사용하는 쿼리 포함 2개 정도의 쿼리들의 실행 시간이 1초 정도 걸리는 경우가 좀 빈번하게 발생하였었다.
아무래도 group by 자체가 비용이 큰 쿼리라 좀 더 시간이 걸리는 것으로 보인다.
그러나 대체적으로 쿼리 처리를 빠르게 수행함을 알 수 있었다.
아마도 각각의 쿼리를 실행 및 FiloDB에 데이터를 저장하는 부분을 akka actor로 하여 병렬처리할 수 있게 하고 FiloDB의 튜닝을 통해 더욱 빠르게 처리할 수도 있을 거라고 보여진다.

Spark 애플리케이션 EPS

그리고 spark에서 처리하는  event 개수는 다음과 같다. 대체적으로 초당 2만 EPS 내외를 오간다.

10~20분 정도 실행시켜 보았을 때, 위와 같은 EPS를 처리하면서도 spark 애플리케이션은 안정적으로 실행이 되었었다.

결론 및 향후 진행

위와 같은 테스트 결과를 보았을 때, Spark Streaming과 FiloDB를 함께 사용하였을 때의 성능이 매우 좋다고 볼 수 있다. 
노트북에 올려서 저 정도의 성능을 보인다고 하면 서버에 클러스터 모드로 실행했을 경우의 성능이 더 나아지는 거야 건 불보듯 뻔한 일이다. 
앞으로는 쿼리를 개수를 좀 더 더하고 장비에 올려서 테스트를 해보도록 할려고 한다.
또한 계속적으로 Spark Streaming과 FiloDB를 적절히 사용하여 Rule 분석 엔진을 개발할 수 있는 방법들을 마련해가고자 한다.