Apache Spark: Uncaught exception in thread driver-heartbeater

I wrote the simple Spark program below, using Spark's StreamingContext and SQLContext.

Note: The issue is reproducible even without the StreamingContext. I have updated the program below to reflect this.

Note: Downgrading Spark to 1.4.1 (I was using 1.5.2) seems to have fixed the issue for me. The issue is also reproducible with Spark 1.5.1.
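
For reference, the version pin only involves the Spark dependency declarations in the build; a minimal build.sbt fragment for the downgrade might look like the sketch below (the project settings are placeholders, only the artifact coordinates are the standard Spark ones):

scalaVersion := "2.10.6"

val sparkVersion = "1.4.1"  // downgraded from 1.5.2

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion,
  "org.apache.spark" %% "spark-sql"       % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)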

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

def main(args: Array[String]) {
    // local[*]: run locally with one worker thread per core
    val sc = new SparkContext("local[*]", "test")
    val sqc = new SQLContext(sc)

    // read JSON records and count the distinct Product.SerialNumber values
    val dataFrame = sqc.read.json(sc.textFile("<dir>"))
    println(dataFrame.groupBy("Product.SerialNumber").count().count())
    sc.stop()
}

This gives the exception below at the beginning, but execution proceeds and the result is printed.

15/11/25 15:48:55 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.util.Utils$.deserialize(Utils.scala:91)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:440)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:430)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:430)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:428)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:428)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:472)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:472)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
        at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
        ... 33 more 

After two minutes, the exception below occurs and execution is terminated. Until then, execution proceeds flawlessly and no issue or exception is reported.

15/11/25 15:51:44 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 179219 ms exceeds timeout 120000 ms
15/11/25 15:51:44 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 179219 ms
15/11/25 15:51:44 WARN TaskSetManager: Lost task 4.0 in stage 193.0 (TID 7688, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 ERROR TaskSetManager: Task 4 in stage 193.0 failed 1 times; aborting job
15/11/25 15:51:44 WARN TaskSetManager: Lost task 7.0 in stage 193.0 (TID 7691, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 WARN TaskSetManager: Lost task 6.0 in stage 193.0 (TID 7690, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 WARN TaskSetManager: Lost task 5.0 in stage 193.0 (TID 7689, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 WARN SparkContext: Killing executors is only supported in coarse-grained mode
15/11/25 15:51:45 ERROR JobScheduler: Error running job streaming job 1448446890000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 193.0 failed 1 times, most recent failure: Lost task 4.0 in stage 193.0 (TID 7688, localhost): ExecutorLostFailure (executor driver lost)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
        at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1402)
        at main$$anonfun$main$1.apply(Main.scala:72)
        at main$$anonfun$main$1.apply(Main.scala:68)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:218)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:217)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 193.0 failed 1 times, most recent failure: Lost task 4.0 in stage 193.0 (TID 7688, localhost): ExecutorLostFailure (executor driver lost)

Answers


You may have forgotten to add some dependency jars when you submit the Spark job. Try assembling your project (so that all dependencies are included) before you submit it to Spark:

sbt assembly

By the way, when I run

sbt console

and execute commands inside the Scala interpreter, I hit the same problem you describe. But if I assemble the project first and run the job with

spark-submit --class className target/scala-2.10/xxx-assembly-0.1.0.jar someArgs

it works.

ref: Apache Spark 1.5 with Cassandra : Class cast exception
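
For completeness, a minimal sbt-assembly setup might look like the following sketch; the plugin version and the dependency list are assumptions, not something taken from the question:

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt -- mark Spark as "provided" so the fat jar only bundles your own dependencies
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.5.2" % "provided"
)

Running sbt assembly then produces the target/scala-2.10/<name>-assembly-<version>.jar that gets passed to spark-submit.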


Try doing

val dataFrame = sqc.read.json(sc.textFile("<dir>")).cache()

I was having this exact same problem; running a .count() operation on the same DataFrame too many times resulted in this error.

If that doesn't help, try this:

val dataFrame = sqc.read.json(sc.textFile("<dir>"))
// cache the counted result (groupBy alone returns a GroupedData, which cannot be cached)
val serialNumberDF = dataFrame.groupBy("Product.SerialNumber").count().cache()
println(serialNumberDF.count())

My guess is that re-evaluating the DataFrame over and over (since DataFrames are lazily evaluated) caused an error somewhere. Also, with large amounts of data, using a DataFrame in multiple places without caching can be quite costly in terms of performance.
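
Applied to the program in the question, the idea is simply to materialize the grouped counts once and reuse the cached result; a rough sketch (the input path stays a placeholder):

// cache the parsed JSON so repeated actions don't re-read and re-parse the files
val dataFrame = sqc.read.json(sc.textFile("<dir>")).cache()

// materialize the per-serial-number counts once
val counts = dataFrame.groupBy("Product.SerialNumber").count().cache()

println(counts.count())   // number of distinct serial numbers
// any further use of counts now reads the cached data instead of recomputing it

counts.unpersist()
dataFrame.unpersist()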


In our case (Spark 1.6.1), the same errors were showing up somewhat randomly while running tests via sbt, and the issue actually seems to be an sbt one. The workaround (mentioned in the link above) is to run the tests in a forked JVM:

fork in test := true
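
In build.sbt this is a one-line setting; a minimal sketch (sbt 0.13.x syntax; "fork in Test" scopes the setting to the test configuration, and everything besides the fork setting is illustrative):

// build.sbt
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided"

// run tests in a forked JVM so Spark's classloading does not clash with sbt's
fork in Test := true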

