- CDH는 5.7.1이 설치되어 있다. CDH의 spark를 사용하여 실행하였다. -
Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: akka.util.Helpers$.ConfigOps(Lcom/typesafe/config/Config;)Lcom/typesafe/config/Config; at akka.cluster.ClusterSettings.(ClusterSettings.scala:28) at akka.cluster.Cluster. (Cluster.scala:67) at akka.cluster.Cluster$.createExtension(Cluster.scala:42) at akka.cluster.Cluster$.createExtension(Cluster.scala:37) at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:654) at akka.actor.ExtensionId$class.apply(Extension.scala:79) at akka.cluster.Cluster$.apply(Cluster.scala:37) at akka.cluster.ClusterActorRefProvider.createRemoteWatcher(ClusterActorRefProvider.scala:66) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:186) at akka.cluster.ClusterActorRefProvider.init(ClusterActorRefProvider.scala:58) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588) at akka.actor.ActorSystem$.apply(ActorSystem.scala:111) at akka.actor.ActorSystem$.apply(ActorSystem.scala:104) at filodb.spark.FiloSetup$class.system(FiloSetup.scala:37) at filodb.spark.FiloDriver$.system$lzycompute(FiloSetup.scala:62) at filodb.spark.FiloDriver$.system(FiloSetup.scala:62) at filodb.coordinator.CoordinatorSetup$class.coordinatorActor(CoordinatorSetup.scala:66) at filodb.spark.FiloDriver$.coordinatorActor$lzycompute(FiloSetup.scala:62) at filodb.spark.FiloDriver$.coordinatorActor(FiloSetup.scala:62) at filodb.spark.FiloDriver$$anonfun$init$1.apply$mcV$sp(FiloSetup.scala:75) at filodb.spark.FiloDriver$$anonfun$init$1.apply(FiloSetup.scala:70) at filodb.spark.FiloDriver$$anonfun$init$1.apply(FiloSetup.scala:70) at scala.Option.getOrElse(Option.scala:120) at filodb.spark.FiloDriver$.init(FiloSetup.scala:70) at filodb.spark.FiloContext$.createOrUpdateDataset$extension(FiloContext.scala:57) at filodb.spark.FiloContext$.saveAsFilo$extension(FiloContext.scala:122) at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:65) at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) at com.igloosec.rule.analyzer.RuleAnalyzer$$anonfun$createKafkaStream$4.apply(RuleAnalyzer.scala:69) at com.igloosec.rule.analyzer.RuleAnalyzer$$anonfun$createKafkaStream$4.apply(RuleAnalyzer.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
왜 이런 에러가 발생하는 지에 대해 구글링을 많이 해보았지만 뚜렷한 이유를 찾을 수 없었다.
처음에는 com.typesafe.config 패키지의 문제로 판단하여 내가 작성한 애플리케이션이 사용하는 버전과 CDH 상에서 사용하는 버전이 달라서 그런거라고 생각하여 이 패키지의 버전을 맞춰보려 하였다.
그러나 여전히 위와 같은 문제가 발생하였다.
좀 더 시간이 지나 com.typesafe.config 패키지의 문제가 아니라 akka 버전이 맞지 않아서 발생하는 문제로 파악되었다.
CDH의 spark에서 사용하는 akka 버전과 FiloDB의 akka 버전 중, 두 개 모두에서 사용가능한 akka 버전을 사용해보려고 하였다.
CDH 5.7.1 버전에서는 spark 1.6.0-cdh5.7.1 버전을 사용한다. 이 버전에서 사용하는 akka는 패키지는 다음과 같다.
com.spark-project.akka:akka-core_2.10:2.2.3-shaded-protobuf
com.spark-project.akka:akka-remote_2.10:2.2.3-shaded-protobuf
com.spark-project.akka:akka-slf4j_2.10:2.2.3-shaded-protobuf
FiloDB는 다음 버전의 akka 패키지를 사용한다.
com.typesafe.akka:akka-core_2.10:2.3.15
com.typesafe.akka:akka-remote_2.10:2.3.15
com.typesafe.akka:akka-sfl4j_2.10:2.3.15
com.typesafe.akka:akka-cluster_2.10:2.3.15
우선, CDH에서 사용중인 akka만을 사용하여 애플리케이션 실행 시, ActorSelectMessage라는 class가 없다는 에러가 발생하면서 실행이 중지되었다.
FiloDB에서 사용중인 akka만을 사용하여 실행 시, 처음에서 얘기한 에러가 발생하면서 실행이 되지 않았다.
이후로 com.spark_project.akks:2.3.4_spark 버전, com.typesafe.akka:2.3.11 버전을 사용해 보았지만 두 버전 모두 ActorSelectMessage 클래스를 찾을 수 없다면서 실행되지 않았다.
그래서 maven의 shade 기능을 사용해 FiloDB를 jar로 묶을 때, akka 패키지를 relocation하여 spark에서 참조하는 akka 버전과 FiloDB에서 사용하는 akka 버전을 따로 가져가보려고 하였다.
하지만 이 또한, akka에서 akka 설정파일을 읽어들여 akka를 사용하는 구조라 하나의 jar에 동일한 설정파일을 두 개 두어야 하는 말도 안되는 일을 해주어야 하므로 불가능한 것으로 판단되었다.
여기까지 왔을 때, 사실 상 CDH의 spark에서는 실행이 불가능할 수도 있겠다는 생각에 spark 1.6.0 버전을 다운로드 받아 CDH 장비에 설치하고 이를 이용하여 애플리케이션을 실행하여 보았다.
해당 장비에서 standalone 모드로 실행해보니 프로그램이 정상적으로 실행되었다.
spark 1.6에서는 akka 2.3.11 버전에 의존성이 있어 문제가 되지 않은 것이다.
결론은 김빠지게도 CDH 장비에 따로 spark를 설치하여 사용해야 된다는 것이다.
꽤 며칠동안 헤매던 문제였지만 이렇게 보니 꽤나 단순한 문제였다.
개발자에겐 삽질은 어쩔 수 없는 일인가보다.