Re: Handling errors in IOs

2018-02-12 Jean-Baptiste Onofré
Hi Motty,

Yes, you can configure reconnect, timeout, etc. on the ConnectionFactory
(especially when you use ActiveMqPooledConnectionFactory).
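To illustrate configuring reconnect behaviour on the factory itself, the sketch below only builds an ActiveMQ failover transport URI string in plain Java. It does not contact a broker. It assumes the failover options `initialReconnectDelay` and `maxReconnectAttempts`, with `-1` assumed to mean "retry forever" on ActiveMQ 5.6 and later. The helper `failoverUri` is hypothetical. You would pass the resulting string to `new ActiveMQConnectionFactory(...)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds an ActiveMQ failover URI string; it never talks to a broker.
class FailoverUri {

  // Wraps one or more broker URLs in "failover:(...)" and appends the options as a query string.
  static String failoverUri(Map<String, String> options, String... brokerUrls) {
    String uri = "failover:(" + String.join(",", brokerUrls) + ")";
    if (options.isEmpty()) {
      return uri;
    }
    StringJoiner query = new StringJoiner("&");
    // A LinkedHashMap keeps options in insertion order, so the output is deterministic.
    options.forEach((key, value) -> query.add(key + "=" + value));
    return uri + "?" + query;
  }

  public static void main(String[] args) {
    Map<String, String> options = new LinkedHashMap<>();
    options.put("initialReconnectDelay", "2000"); // wait 2 s before the first reconnect attempt
    options.put("maxReconnectAttempts", "-1");    // assumption: -1 = keep retrying (ActiveMQ 5.6+)
    // prints: failover:(tcp://activemq-a:61616)?initialReconnectDelay=2000&maxReconnectAttempts=-1
    System.out.println(failoverUri(options, "tcp://activemq-a:61616"));
  }
}
```

If you list several broker URLs inside the parentheses, the client can fail over between brokers. With a single URL, it keeps reconnecting to the same broker.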

By "split", I didn't mean anything about the Spark cluster; I meant it in terms
of workers.
When you use something like:

pipeline.apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("foo"))

the runner can request a desired number of splits
(https://github.com/apache/beam/blob/master/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java#L407).
The IO then creates one consumer on the queue per split.

This scales consumption throughput, because there are concurrent consumers on
the queue.
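To make the split idea concrete, here is a stdlib-only model. It is not JmsIO's code. An in-memory `BlockingQueue` stands in for the JMS queue "foo", and `desiredNumSplits` reader threads consume from it concurrently. The name `desiredNumSplits` is borrowed from Beam's `UnboundedSource.split(int desiredNumSplits, PipelineOptions)`. `SplitModel` and `consumeWithSplits` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stdlib-only model of "one consumer per split" on a single queue; not Beam or JMS code.
class SplitModel {

  // Starts desiredNumSplits concurrent readers on the same queue and returns
  // how many messages each reader consumed before the queue stayed empty.
  static List<Integer> consumeWithSplits(BlockingQueue<String> queue, int desiredNumSplits) {
    ExecutorService pool = Executors.newFixedThreadPool(desiredNumSplits);
    List<Future<Integer>> readers = new ArrayList<>();
    for (int split = 0; split < desiredNumSplits; split++) {
      readers.add(pool.submit(() -> {
        int consumed = 0;
        // poll() removes the head atomically, so no two readers get the same message.
        while (queue.poll(100, TimeUnit.MILLISECONDS) != null) {
          consumed++;
        }
        return consumed;
      }));
    }
    List<Integer> perReader = new ArrayList<>();
    try {
      for (Future<Integer> reader : readers) {
        perReader.add(reader.get());
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("a reader failed", e);
    } finally {
      pool.shutdown();
    }
    return perReader;
  }

  public static void main(String[] args) {
    BlockingQueue<String> foo = new LinkedBlockingQueue<>();
    for (int i = 0; i < 10_000; i++) {
      foo.add("message-" + i);
    }
    List<Integer> perReader = consumeWithSplits(foo, 4);
    int total = perReader.stream().mapToInt(Integer::intValue).sum();
    // How the work is shared varies per run; the total is always 10000.
    System.out.println("per reader: " + perReader + ", total: " + total);
  }
}
```

Each message reaches exactly one reader, so adding splits raises throughput without duplicating data. The 100 ms poll timeout exists only so the model terminates once the queue is drained. A real unbounded reader keeps polling.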

Regards
JB

On 02/12/2018 07:32 PM, Motty Gruda wrote:
> Hi,
>
> I managed to set the automatic reconnect through the ConnectionFactory; I
> didn't know it was possible. Thanks!
>
> What do you mean by using "split"? Now, when running on the Spark runner, if
> one of the brokers becomes unavailable, the entire pipeline gets stuck on the
> following job:
> [...]

Re: Handling errors in IOs

2018-02-12 Motty Gruda
Hi,

I managed to set the automatic reconnect through the ConnectionFactory; I
didn't know it was possible. Thanks!

What do you mean by using "split"? Now, when running on the Spark runner, if
one of the brokers becomes unavailable, the entire pipeline gets stuck on the
following job:

org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:115)
org.apache.beam.runners.spark.io.SparkUnboundedSource$ReadReportDStream.<init>(SparkUnboundedSource.java:171)
org.apache.beam.runners.spark.io.SparkUnboundedSource.read(SparkUnboundedSource.java:113)
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$2.evaluate(StreamingTransformTranslator.java:125)
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$2.evaluate(StreamingTransformTranslator.java:119)
org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:413)
org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:399)
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:663)
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:446)
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:47)
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:776)
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:775)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:775)



The updated code:
Pipeline p =
    Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

ConnectionFactory factory_a = new ActiveMQConnectionFactory(
    "failover://(tcp://activemq-a:61616)?initialReconnectDelay=2000=-1");
ConnectionFactory factory_b = new ActiveMQConnectionFactory(
    "failover://(tcp://activemq-b:61616)?initialReconnectDelay=2000=-1");
ConnectionFactory factory_c = new ActiveMQConnectionFactory("tcp://activemq-c:61616");

PCollection<JmsRecord> a = p.apply("ReadFromQueue",
    JmsIO.read().withConnectionFactory(factory_a).withUsername("admin").withPassword("admin")
        .withQueue("a"));
PCollection<JmsRecord> b = p.apply("ReadFromQueue2",
    JmsIO.read().withConnectionFactory(factory_b).withUsername("admin").withPassword("admin")
        .withQueue("b"));

PCollection<JmsRecord> combined =
    PCollectionList.of(a).and(b).apply(Flatten.<JmsRecord>pCollections());

combined.apply(MapElements.into(TypeDescriptors.strings()).via((JmsRecord r) -> r.getPayload()))
    .apply("WriteToQueue",
        JmsIO.write().withConnectionFactory(factory_c).withUsername("admin")
            .withPassword("admin").withQueue("c"));

p.run().waitUntilFinish();

*When running the code above on Spark 2.2.0 and Beam 2.3.0-rc3, most of
the messages simply disappeared inside the system!!!*
*The messages were read from queues "a" and "b", but most of them never
arrived at queue "c".*



On Mon, Feb 12, 2018 at 5:30 AM, Jean-Baptiste Onofré wrote:

> Hi,
>
> Here you are not using splits; you have two different JmsIO reads
> consuming from two different queues. The two reads are not related.
>
> If you kill the connection of one of them, that read has to reconnect.
> That can be done by configuration on the ConnectionFactory.
>
> Is that what you want? Automatic reconnection?
>
> Regards
> JB

Re: Handling errors in IOs

2018-02-11 Jean-Baptiste Onofré

Hi,

Here you are not using splits; you have two different JmsIO reads consuming
from two different queues. The two reads are not related.

If you kill the connection of one of them, that read has to reconnect. That
can be done by configuration on the ConnectionFactory.

Is that what you want? Automatic reconnection?
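You can think of the effect of reconnect configuration as a retry schedule. The model below is a simplification, not ActiveMQ's implementation. It assumes the failover transport's `initialReconnectDelay`, `backOffMultiplier` and `maxReconnectDelay` options combine as "start at the initial delay, multiply after each failed attempt, never exceed the cap", with exponential back-off enabled. `ReconnectSchedule` and `delays` are hypothetical names, and the values are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a capped exponential back-off schedule; not ActiveMQ's implementation.
class ReconnectSchedule {

  // Delay (ms) before each of the first `attempts` reconnect tries: start at
  // initialDelayMs, multiply by `multiplier` after every failure, never exceed maxDelayMs.
  static List<Long> delays(long initialDelayMs, double multiplier, long maxDelayMs, int attempts) {
    List<Long> result = new ArrayList<>();
    double delay = initialDelayMs;
    for (int i = 0; i < attempts; i++) {
      result.add(Math.min((long) delay, maxDelayMs));
      delay *= multiplier;
    }
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical values: 2 s initial delay, doubling, capped at 30 s.
    System.out.println(delays(2000, 2.0, 30_000, 6)); // prints [2000, 4000, 8000, 16000, 30000, 30000]
  }
}
```

With `maxReconnectAttempts=-1`, this schedule would continue indefinitely at the capped delay.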

Regards
JB

On 11/02/2018 12:58, Motty Gruda wrote:

> runner: spark runner (1.6.3)
> beam: 2.2.0
> activemq: 5.14.3
> [...]

Re: Handling errors in IOs

2018-02-11 Motty Gruda
runner: spark runner (1.6.3)
beam: 2.2.0
activemq: 5.14.3

code:

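import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline p =
    Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

ConnectionFactory factory_a = new ActiveMQConnectionFactory("tcp://activemq-a:61616");
ConnectionFactory factory_b = new ActiveMQConnectionFactory("tcp://activemq-b:61616");
ConnectionFactory factory_c = new ActiveMQConnectionFactory("tcp://activemq-c:61616");

// Two independent unbounded reads, one per broker/queue.
PCollection<JmsRecord> a = p.apply("ReadFromQueue",
    JmsIO.read().withConnectionFactory(factory_a).withUsername("admin").withPassword("admin")
        .withQueue("a"));
PCollection<JmsRecord> b = p.apply("ReadFromQueue2",
    JmsIO.read().withConnectionFactory(factory_b).withUsername("admin").withPassword("admin")
        .withQueue("b"));

// Merge both streams into a single PCollection.
PCollection<JmsRecord> combined =
    PCollectionList.of(a).and(b).apply(Flatten.<JmsRecord>pCollections());

// Extract the text payload and write it to queue "c" on the third broker.
combined.apply(MapElements.into(TypeDescriptors.strings()).via((JmsRecord r) -> r.getPayload()))
    .apply("WriteToQueue",
        JmsIO.write().withConnectionFactory(factory_c).withUsername("admin")
            .withPassword("admin").withQueue("c"));

p.run().waitUntilFinish();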

After killing activemq-a, the job dies with the following stack trace:

18/02/11 11:39:59 INFO CacheManager: Partition rdd_1569_0 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1565_0 not found, computing it
18/02/11 11:39:59 INFO CoarseGrainedExecutorBackend: Got assigned task 924
18/02/11 11:39:59 INFO Executor: Running task 1.2 in stage 349.0 (TID 924)
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1561_0 not found, computing it
18/02/11 11:39:59 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1673_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1557_0 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1669_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1553_0 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1665_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1549_0 not found, computing it
18/02/11 11:39:59 INFO BlockManager: Found block rdd_1528_0 locally
18/02/11 11:39:59 INFO StateSpecFunctions: Continue reading from an existing CheckpointMark.
18/02/11 11:39:59 ERROR Executor: Exception in task 0.1 in stage 349.0 (TID 923)
java.lang.RuntimeException: Failed to read from reader.
at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:204)
at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:105)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
at 

Re: Handling errors in IOs

2018-02-10 Jean-Baptiste Onofré
Hi Motty,

For JMS, it depends on whether you are using queues or topics.

With queues, JmsIO creates several readers (concurrent consumers) on the same
queue. The checkpoint is based on the ACK: it is a client ACK, and the source
sends the ACK when the checkpoint is finalized. If you close the connection for
one source, the other sources should continue to consume.
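The ACK-based checkpointing described above can be modeled in plain Java. This is a sketch under stated assumptions, not JmsIO or ActiveMQ code. A toy broker keeps delivered-but-unacknowledged messages in flight. The reader records every message it reads in the current checkpoint mark, and only finalizing the mark acknowledges them. This follows the spirit of JMS `CLIENT_ACKNOWLEDGE` and Beam's `UnboundedSource.CheckpointMark.finalizeCheckpoint()`. `ToyBroker` and `ToyCheckpointMark` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of "ACK when the checkpoint is finalized"; not JmsIO or ActiveMQ code.
class AckOnFinalize {

  // A broker that keeps delivered messages in flight until they are acknowledged.
  static class ToyBroker {
    final Deque<String> pending = new ArrayDeque<>();
    final Set<String> inFlight = new LinkedHashSet<>();

    String deliver() {
      String msg = pending.poll();
      if (msg != null) {
        inFlight.add(msg);
      }
      return msg;
    }

    void acknowledge(List<String> msgs) {
      inFlight.removeAll(msgs);
    }

    // Consumer connection lost: unacknowledged messages go back to the head of the queue, in order.
    void connectionLost() {
      List<String> unacked = new ArrayList<>(inFlight);
      for (int i = unacked.size() - 1; i >= 0; i--) {
        pending.addFirst(unacked.get(i));
      }
      inFlight.clear();
    }
  }

  // Records what was read since the last checkpoint; finalizing acknowledges exactly those messages.
  static class ToyCheckpointMark {
    private final ToyBroker broker;
    private final List<String> readSinceLastCheckpoint = new ArrayList<>();

    ToyCheckpointMark(ToyBroker broker) {
      this.broker = broker;
    }

    void record(String msg) {
      readSinceLastCheckpoint.add(msg);
    }

    void finalizeCheckpoint() {
      broker.acknowledge(readSinceLastCheckpoint);
      readSinceLastCheckpoint.clear();
    }
  }

  public static void main(String[] args) {
    ToyBroker broker = new ToyBroker();
    for (int i = 0; i < 5; i++) {
      broker.pending.add("m" + i);
    }
    ToyCheckpointMark mark = new ToyCheckpointMark(broker);
    mark.record(broker.deliver()); // m0
    mark.record(broker.deliver()); // m1
    mark.finalizeCheckpoint();     // m0 and m1 are acknowledged
    mark.record(broker.deliver()); // m2 is read, but this checkpoint is never finalized
    broker.connectionLost();       // the reader dies before finalization
    System.out.println("pending after failure: " + broker.pending); // prints pending after failure: [m2, m3, m4]
  }
}
```

If a reader's connection dies before its checkpoint is finalized, the model's broker puts the unacknowledged messages back on the queue, so they are redelivered rather than lost.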

Can you describe your exact scenario (runner, pipeline, broker)?

Regards
JB

On 02/11/2018 07:43 AM, Motty Gruda wrote:
> Hey,
> 
> How can errors in the IOs (for example, connection errors) be handled?
>
> I've tested a few scenarios with JmsIO. When I read from two different JMS
> connections and closed only one of them, the entire pipeline failed/froze.
> I would expect it to continue running with one source and keep trying to
> reconnect to the second source until it's available again.
>
> Is this a bug in the IO itself? In the SDK? In the runner (I've tested with
> the direct runner and the Spark runner)?

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com