Re: Spark Dataset.cache is not thread safe

2019-07-22 Thread Amit Sharma
Please update me if anyone knows how to handle it.

On Sun, Jul 21, 2019 at 7:18 PM Amit Sharma  wrote:

> Hi, I wrote code in a Future block which reads data from a Dataset and
> caches it for use later in the code. I am facing an issue where the data
> cached via data.cache() gets replaced by a concurrently running thread.
> Is there any way to avoid this condition?
>
> val dailyData = callDetailsDS.collect.toList
> val adjustedData = dailyData.map(callDataPerDay => Future {
>
>   val data = callDetailsDS.filter(callDetailsDS(DateColumn) geq (/* some conditional date */))
>   data.cache()
>
>   // ...
>
> })
>
>
>
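
One way to avoid the clash is to keep each filtered Dataset in a val that is
local to its Future, so no concurrent thread can replace another thread's
reference, and to unpersist it when that thread is done. A minimal sketch,
assuming a callDetailsDS Dataset and a "DateColumn" column as in the snippet
above; the processPerDay helper and the count() stand-in are hypothetical:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.lit

// One Future per day; `data` is local to each Future, so the cache() and
// unpersist() here cannot clobber a Dataset cached by another thread.
def processPerDay(callDetailsDS: Dataset[Row], days: List[java.sql.Date]): Unit = {
  val futures = days.map { day =>
    Future {
      val data = callDetailsDS.filter(callDetailsDS("DateColumn") geq lit(day))
      data.cache()
      try data.count()            // stand-in for the real per-day work
      finally data.unpersist()
    }
  }
  futures.foreach(f => Await.result(f, Duration.Inf))
}

Note that cache() is lazy; the count() is what forces materialization before
the cached data is used.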


Re: Spark 2.3 Dataframe GroupBy operation throws IllegalArgumentException on Large dataset

2019-07-22 Thread Bobby Evans
You are missing a lot of the stack trace that could explain the exception.
All it shows is that an exception happened while writing out the ORC file,
not what the underlying exception is; there should be at least one more
"Caused by" under the one you included.
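
For reference, the innermost failure can be recovered by walking the
exception's cause chain. A small sketch using only the standard
java.lang.Throwable API (nothing Spark-specific):

// Print every level of a Throwable's cause chain so the innermost
// "Caused by" (the real failure) is not lost when logging.
def printCauseChain(t: Throwable): Unit = {
  var cur: Throwable = t
  while (cur != null) {
    println(s"${cur.getClass.getName}: ${cur.getMessage}")
    cur.getStackTrace.take(5).foreach(frame => println(s"  at $frame"))
    cur = cur.getCause
  }
}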

Thanks,

Bobby

On Mon, Jul 22, 2019 at 5:58 AM Balakumar iyer S wrote:

> Hi,
>
> I am trying to perform a group-by followed by an aggregate collect_set
> operation on a two-column dataset with schema (LeftData int, RightData int).
>
> Code snippet:
>
>   val wind_2 = dframe.groupBy("LeftData").agg(collect_set(array("RightData")))
>
>   wind_2.write.mode(SaveMode.Append).format("orc").save(args(1))
>
> The above code works fine on a smaller dataset but throws the following
> error on a large dataset (where each key in the LeftData column needs to
> be grouped with approximately 64k values).
>
> Could someone assist me with this? Should I set any configuration to
> accommodate such large values?
>
> ERROR
> -
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
>
> Caused by: org.apache.spark.SparkException: Task failed while writing rows.
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>
> --
> REGARDS
> BALAKUMAR SEETHARAMAN
>
>


Spark 2.3 Dataframe GroupBy operation throws IllegalArgumentException on Large dataset

2019-07-22 Thread Balakumar iyer S
Hi,

I am trying to perform a group-by followed by an aggregate collect_set
operation on a two-column dataset with schema (LeftData int, RightData int).

Code snippet:

  val wind_2 = dframe.groupBy("LeftData").agg(collect_set(array("RightData")))

  wind_2.write.mode(SaveMode.Append).format("orc").save(args(1))

The above code works fine on a smaller dataset but throws the following
error on a large dataset (where each key in the LeftData column needs to
be grouped with approximately 64k values).

Could someone assist me with this? Should I set any configuration to
accommodate such large values?
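
For completeness, here is a self-contained sketch of the job above, with the
imports the snippet needs. The object name, the appName, and reading the
(LeftData int, RightData int) input as ORC from args(0) are assumptions;
dframe, wind_2, and the output path args(1) are from the snippet:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{array, collect_set}

object GroupByCollectSet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("GroupByCollectSet").getOrCreate()

    // Assumed: the two-column input is read from ORC at args(0).
    val dframe = spark.read.orc(args(0))

    // For each LeftData key, collect the set of single-element arrays of
    // RightData; with ~64k values per key, each output row becomes very large.
    val wind_2 = dframe.groupBy("LeftData").agg(collect_set(array("RightData")))

    wind_2.write.mode(SaveMode.Append).format("orc").save(args(1))
  }
}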

ERROR
-
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)


Caused by: org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)

-- 
REGARDS
BALAKUMAR SEETHARAMAN