Spark Exception
Hi, we are running a Spark Streaming job and it sometimes throws the two exceptions below. I don't understand the difference between these two exceptions: one timeout is 120 seconds and the other is 600 seconds. What could be the reason for these?

Error running job streaming job 1605709968000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
    at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445)
    at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047)

2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread heartbeat-receiver-event-loop-thread
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by BlockManagerHeartbeat
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251)
    at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
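If the immediate need is to keep the job alive while the root cause (commonly long GC pauses on the driver, or an overloaded driver handling too many block/heartbeat messages) is investigated, the relevant timeouts can be raised at submit time. A minimal sketch, assuming a spark-submit deployment; the script name is a placeholder, and note that spark.rpc.askTimeout falls back to spark.network.timeout when it is not set explicitly:

```shell
# Hypothetical spark-submit flags raising the RPC ask timeout from its
# 120s default. spark.network.timeout is the umbrella setting that
# spark.rpc.askTimeout inherits when not set explicitly.
spark-submit \
  --conf spark.network.timeout=300s \
  --conf spark.rpc.askTimeout=300s \
  your_streaming_job.py
```

This only buys headroom: if the driver is blocked for minutes at a time, raising the timeouts hides the symptom rather than fixing it, so it is worth checking driver GC logs and event-loop load as well.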
Re: Cannot perform operation after producer has been closed
I must say... *Spark has let me down in this case*. I am surprised an important issue like this hasn't been fixed in Spark 2.4. I am fighting a battle of 'Spark Structured Streaming' vs 'Flink' at work, and now, because Spark 2.4 can't handle this, *I've been asked to rewrite the code in Flink*. Moving to Spark 3.0 is not an easy option because Cloudera 6.2 doesn't have a Spark 3.0 parcel, so we can't upgrade to 3.0. So sad. Let me ask one more time: *is there no way to fix this in Spark 2.4?*

On Tue, Nov 10, 2020 at 11:33 AM Eric Beabes wrote:

> BTW, we are seeing this message as well: *"org.apache.kafka.common.KafkaException: Producer closed while send in progress"*. I am assuming this happens because of the previous issue, "producer has been closed", right? Or are they unrelated? Please advise. Thanks.
>
> On Tue, Nov 10, 2020 at 11:17 AM Eric Beabes wrote:
>
>> Thanks for the reply. We are on Spark 2.4. Is there no way to get this fixed in Spark 2.4?
>>
>> On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim wrote:
>>
>>> Which Spark version do you use? There's a known issue with the Kafka producer pool in Spark 2.x which was fixed in Spark 3.0, so you'd want to check whether your case is bound to the known issue or not.
>>>
>>> https://issues.apache.org/jira/browse/SPARK-21869
>>>
>>> On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes wrote:
>>>
>>> I know this is related to Kafka, but it happens during the Spark Structured Streaming job; that's why I am asking on this mailing list. How would you debug this or get around it in Spark Structured Streaming? Any tips would be appreciated. Thanks.
java.lang.IllegalStateException: Cannot perform operation after producer has been closed
    at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
    at org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
    at org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)
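For readers stuck on Spark 2.x and hitting the SPARK-21869 symptom above, one commonly suggested mitigation (not a fix) is to raise the expiry of the cached Kafka producer, so that long-running tasks are less likely to be handed a producer the cache has already closed. A sketch under the assumption that the job uses Spark 2.4's spark-sql-kafka-0-10 connector, whose producer cache honors spark.kafka.producer.cache.timeout (default 10 minutes); the script name is a placeholder:

```shell
# Hypothetical mitigation for Spark 2.4: extend the cached Kafka
# producer's lifetime beyond the longest expected task duration.
spark-submit \
  --conf spark.kafka.producer.cache.timeout=30m \
  your_structured_streaming_job.py
```

Whether this helps depends on whether the failure really is the cache expiring a producer mid-send; if tasks routinely outlive even the extended timeout, the proper fix remains the pooling rework that landed in Spark 3.0.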
Spark SQL check timestamp with other table and update a column.
Hi Team, I want to update col3 in table1 whenever col1 from table2 is less than col1 in table1, and update each record in table1 that way. I'm not getting the correct output.

Table 1:
col1|col2|col3
2020-11-17T20:50:57.777+|1|null

Table 2:
col1|col2|col3
2020-11-17T21:19:06.508+|1|win
2020-11-17T20:49:06.244+|1|win
2020-11-17T20:19:13.484+|1|Win

SQL tried:

select a.col1, a.col2, coalesce(a.col3, b.col3) as col3
from table1 a
left join table2 b
  on (a.col2 = b.col2) and (b.col1 < a.col1)

I am getting the following output:

2020-11-17T20:50:57.777+|1|Win 2020-11-17T21:19:06.508+|1|win
2020-11-17T20:50:57.777+|1|Win 2020-11-17T20:49:06.244+|1|win

I'm looking for only the second record in the output. The issue is that I'm getting one additional record whenever col1 from table2 is less than col1 in table1 with the above query.

Expected output:

2020-11-17T20:50:57.777+|1|Win
2020-11-17T20:49:06.244+|1|win

How do we achieve that correctly? I have many records like this. Thanks

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
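One way to read the question above is: for each table1 row, fill col3 from the single most recent table2 row whose col1 is earlier, rather than from every earlier row. A window function expresses that directly. Below is a minimal, hypothetical reproduction using Python's stdlib sqlite3 (whose ROW_NUMBER() behaves like Spark SQL's); the timestamps are simplified copies of the question's data with the truncated "+" offset dropped:

```python
import sqlite3

# Reproduce the two tables from the question in an in-memory database.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE table1 (col1 TEXT, col2 INTEGER, col3 TEXT)")
cur.execute("CREATE TABLE table2 (col1 TEXT, col2 INTEGER, col3 TEXT)")
cur.execute("INSERT INTO table1 VALUES ('2020-11-17T20:50:57.777', 1, NULL)")
cur.executemany("INSERT INTO table2 VALUES (?, ?, ?)",
                [("2020-11-17T21:19:06.508", 1, "win"),
                 ("2020-11-17T20:49:06.244", 1, "win"),
                 ("2020-11-17T20:19:13.484", 1, "Win")])

# Rank the candidate table2 rows per table1 key, newest first, and keep
# only the top-ranked one, so each table1 record yields exactly one row.
query = """
SELECT col1, col2, col3
FROM (
    SELECT a.col1, a.col2,
           COALESCE(a.col3, b.col3) AS col3,
           ROW_NUMBER() OVER (PARTITION BY a.col1, a.col2
                              ORDER BY b.col1 DESC) AS rn
    FROM table1 a
    LEFT JOIN table2 b
      ON a.col2 = b.col2 AND b.col1 < a.col1
)
WHERE rn = 1
"""
rows = cur.execute(query).fetchall()
print(rows)  # [('2020-11-17T20:50:57.777', 1, 'win')]
```

The same inner SELECT runs unchanged in Spark SQL (with proper timestamp columns instead of ISO strings). The LEFT JOIN keeps table1 rows that have no earlier table2 match, in which case col3 stays null.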
Need Unit test complete reference for Pyspark
Hi Users,

I have to write unit test cases for PySpark. I think pytest-spark and "spark-testing-base" are good test libraries. Can anyone please provide a complete reference for writing test cases in Python using these?

Kind Regards,
Sachit Murarka