Re: Infinite loop in org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter

2019-04-03 Thread Brett Marcott
Thanks for the recommendation, Russell. I haven't looked into that code yet,
but the docs didn't seem to indicate whether it writes SSTables directly
instead of going through the normal write path.


On Wed, Apr 3, 2019, 11:11 AM Russell Spitzer wrote:

> I would recommend using the Spark Cassandra Connector instead of the
> Hadoop-based writers. The Hadoop code has not had a lot of love in a long time.
> See
>
> https://github.com/datastax/spark-cassandra-connector


Re: Infinite loop in org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter

2019-04-03 Thread Russell Spitzer
I would recommend using the Spark Cassandra Connector instead of the
Hadoop-based writers. The Hadoop code has not had a lot of love in a long time. See

https://github.com/datastax/spark-cassandra-connector
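
For example, a write with the connector's Java API looks roughly like this
(a sketch, not production code; the keyspace, table, and row class here are
made up, and the row class's fields have to match your table's columns):

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.io.Serializable;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ConnectorWriteExample
{
    // Made-up row class; getters/setters map to the table's columns.
    public static class KV implements Serializable
    {
        private String key;
        private int value;

        public KV() { }
        public KV(String key, int value) { this.key = key; this.value = value; }

        public String getKey() { return key; }
        public void setKey(String key) { this.key = key; }
        public int getValue() { return value; }
        public void setValue(int value) { this.value = value; }
    }

    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf()
            .setAppName("connector-write-example")
            .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<KV> rows =
            sc.parallelize(Arrays.asList(new KV("a", 1), new KV("b", 2)));

        // Save the RDD into the (made-up) existing table my_keyspace.my_table.
        javaFunctions(rows)
            .writerBuilder("my_keyspace", "my_table", mapToRow(KV.class))
            .saveToCassandra();

        sc.stop();
    }
}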


Infinite loop in org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter

2019-04-03 Thread Brett Marcott
Hi folks,

I am noticing my Spark jobs getting stuck when using
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter/CqlBulkOutputFormat.

It seems that whenever there is a stream failure, looping forever is the
expected behavior given how the code is written.

Here are the logs from one executor:
19/04/03 15:35:06 INFO streaming.StreamResultFuture: [Stream 
#59290530-5625-11e9-a2bb-8bc7b49d56b0] Session with /10.82.204.173 is complete
19/04/03 15:35:06 WARN streaming.StreamResultFuture: [Stream 
#59290530-5625-11e9-a2bb-8bc7b49d56b0] Stream failed


On stream failure, it seems StreamResultFuture sets the exception on the
underlying AbstractFuture.
AFAIK this should then cause every get() on that future to throw a new
ExecutionException.
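
A minimal standalone Guava example of the behavior I mean (my own names, not
Cassandra code): once setException has been called, every get() builds and
throws a fresh ExecutionException wrapping the same cause.

import java.util.concurrent.ExecutionException;
import com.google.common.util.concurrent.SettableFuture;

public class FailedFutureDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        SettableFuture<Void> future = SettableFuture.create();
        future.setException(new RuntimeException("Stream failed"));

        for (int i = 0; i < 2; i++)
        {
            try
            {
                future.get(); // throws immediately, never blocks
            }
            catch (ExecutionException e)
            {
                // A distinct ExecutionException instance per call, matching
                // the fillInStackTrace frames in the thread dumps below.
                System.out.println(e + " @"
                    + Integer.toHexString(System.identityHashCode(e)));
            }
        }
    }
}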

The problem seems to lie in the fact that CqlBulkRecordWriter swallows the
ExecutionException and continues in a while loop:
https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274
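
For anyone who doesn't want to click through, the loop has roughly this shape
(a schematic paraphrase of the linked code, not a verbatim copy):

while (true)
{
    try
    {
        future.get(1000, TimeUnit.MILLISECONDS);
        break; // only reached if the stream succeeds
    }
    catch (ExecutionException | TimeoutException te)
    {
        // A permanently failed future is handled the same way as one that
        // is still running: report progress and call get() again. But a
        // failed future throws immediately on every get(), so the loop
        // can never exit.
        if (progress != null)
            progress.progress();
        if (context != null)
            HadoopCompat.progress(context);
    }
    catch (InterruptedException e)
    {
        throw new IOException(e);
    }
}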
 


When taking consecutive thread dumps of the same process, I see that the only
thread doing work is constantly creating new ExecutionExceptions (the memory
location of the ExecutionException was different in each thread dump):
java.lang.Throwable.fillInStackTrace(Native Method)
java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding
Monitor(java.util.concurrent.ExecutionException@80240763)
java.lang.Throwable.<init>(Throwable.java:310)
java.lang.Exception.<init>(Exception.java:102)
java.util.concurrent.ExecutionException.<init>(ExecutionException.java:90)
com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:357)
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:257)
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:237)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1131)
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:285)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

It seems the logic right below the while loop in the linked code above, which
checks for failed hosts/stream sessions, should perhaps have been inside the
while loop, so that a failed stream is detected instead of retried forever?
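
For example, treating the ExecutionException as fatal rather than lumping it
in with the timeout (a rough, untested sketch reusing the names from the
linked code; checking the failed sessions on each iteration would work
similarly):

while (true)
{
    try
    {
        future.get(1000, TimeUnit.MILLISECONDS);
        break;
    }
    catch (ExecutionException e)
    {
        // The future is permanently failed; retrying get() cannot succeed.
        throw new IOException("Stream failed", e.getCause());
    }
    catch (TimeoutException te)
    {
        // Still streaming: report progress and poll again.
        if (progress != null)
            progress.progress();
        if (context != null)
            HadoopCompat.progress(context);
    }
    catch (InterruptedException e)
    {
        throw new IOException(e);
    }
}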

Thanks,

Brett