Hi Motty,

Yes, you can configure reconnection, timeouts, etc. on the ConnectionFactory
(especially when you use ActiveMqPooledConnectionFactory).
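To make the ConnectionFactory configuration concrete: with a plain ActiveMQConnectionFactory, reconnect behaviour is driven by the failover transport URL, as in the `failover://(...)?initialReconnectDelay=2000&maxReconnectAttempts=-1` strings used further down in this thread. The sketch below only assembles such a URL in plain Java, so it runs without a broker or the ActiveMQ jars. The helper `FailoverUrl.build` is hypothetical and the option values are illustrative. `initialReconnectDelay`, `maxReconnectAttempts` and `timeout` are failover transport options, but check the ActiveMQ documentation for your broker version's semantics and defaults.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

class FailoverUrl {

    /**
     * Hypothetical helper: builds an ActiveMQ failover URL such as
     * failover:(tcp://host-a:61616,tcp://host-b:61616)?initialReconnectDelay=2000
     * from broker addresses and failover options (kept in insertion order).
     */
    static String build(List<String> brokers, Map<String, String> options) {
        String url = "failover:(" + String.join(",", brokers) + ")";
        if (options.isEmpty()) {
            return url;
        }
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return url + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("initialReconnectDelay", "2000"); // ms to wait before the first reconnect attempt
        options.put("maxReconnectAttempts", "-1");    // -1: keep retrying forever
        options.put("timeout", "5000");               // ms a send may block while reconnecting

        String url = build(Arrays.asList("tcp://activemq-a:61616"), options);
        System.out.println(url);
        // With activemq-client on the classpath, the string would then be passed to
        // new ActiveMQConnectionFactory(url), and that factory to JmsIO.read().withConnectionFactory(...).
    }
}
```

Keeping the URL assembly separate from the factory makes it easy to list several brokers in one failover URL, so the client can switch to another broker instead of only retrying the dead one.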

For the split, I didn't mean the Spark cluster, but rather in terms of
workers.
When you use something like:

pipeline.apply(JmsIO.read().withQueue("foo"))

the runner can request a desired number of splits
(https://github.com/apache/beam/blob/master/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java#L407).
JmsIO then creates one consumer on the queue per split.
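The splitting behaviour described above can be pictured with a small, simplified model. This is not Beam's code: `SplitModel`, `ConsumerSpec` and `split` are hypothetical names. The model follows the thread: for a queue, each requested split becomes its own consumer. The topic branch (a single reader, since every subscriber of a topic receives every message and extra splits would duplicate data) is our assumption about why only queues benefit from splitting.

```java
import java.util.ArrayList;
import java.util.List;

class SplitModel {

    enum DestinationKind { QUEUE, TOPIC }

    /** Hypothetical descriptor of one split: the destination it reads and its index. */
    static final class ConsumerSpec {
        final String destination;
        final int index;

        ConsumerSpec(String destination, int index) {
            this.destination = destination;
            this.index = index;
        }

        @Override
        public String toString() {
            return destination + "#" + index;
        }
    }

    /** Returns one consumer per requested split for a queue, a single one for a topic (assumption). */
    static List<ConsumerSpec> split(String destination, DestinationKind kind, int desiredNumSplits) {
        int n = kind == DestinationKind.TOPIC ? 1 : Math.max(1, desiredNumSplits);
        List<ConsumerSpec> splits = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            splits.add(new ConsumerSpec(destination, i));
        }
        return splits;
    }

    public static void main(String[] args) {
        System.out.println(split("foo", DestinationKind.QUEUE, 4)); // [foo#0, foo#1, foo#2, foo#3]
        System.out.println(split("bar", DestinationKind.TOPIC, 4)); // [bar#0]
    }
}
```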

This scales consumption throughput, since there are concurrent consumers on
the queue.
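Why concurrent consumers on one queue help can be illustrated with plain `java.util.concurrent`, as a stand-in for a broker. In the sketch below, the hypothetical `CompetingConsumers.drain` fills an in-memory `LinkedBlockingQueue` and drains it with several threads. As with competing JMS consumers on a queue, each message is taken by exactly one consumer, so the work is divided rather than duplicated. With real, I/O-bound consumers, that division is where the throughput gain comes from. The in-memory queue is only a model, not a JMS client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class CompetingConsumers {

    /** Fills a queue with ids 0..messages-1, drains it with `consumers` threads, returns each one's share. */
    static List<List<Integer>> drain(int messages, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add(i);
        }

        List<List<Integer>> perConsumer = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            List<Integer> received = Collections.synchronizedList(new ArrayList<>());
            perConsumer.add(received);
            pool.submit(() -> {
                Integer msg;
                // poll() hands each element to exactly one caller and returns null once empty.
                while ((msg = queue.poll()) != null) {
                    received.add(msg);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<List<Integer>> result = drain(1000, 4);
        Set<Integer> distinct = new HashSet<>();
        int total = 0;
        for (int c = 0; c < result.size(); c++) {
            System.out.println("consumer " + c + " received " + result.get(c).size() + " messages");
            distinct.addAll(result.get(c));
            total += result.get(c).size();
        }
        // Every message is consumed once and none twice, so total and distinct are both 1000.
        System.out.println("total=" + total + " distinct=" + distinct.size());
    }
}
```

How the 1000 messages are shared among the four consumers varies from run to run; only the totals are fixed.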

Regards
JB

On 02/12/2018 07:32 PM, Motty Gruda wrote:
> Hi,
> 
> I managed to set up automatic reconnection through the ConnectionFactory; I
> didn't know it was possible. Thanks!
>
> What do you mean by using "split"? Now, when running on the Spark runner, if
> one of the brokers becomes unavailable, the entire pipeline is stuck on the
> following job:
> 
> org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:115)
> org.apache.beam.runners.spark.io.SparkUnboundedSource$ReadReportDStream.<init>(SparkUnboundedSource.java:171)
> org.apache.beam.runners.spark.io.SparkUnboundedSource.read(SparkUnboundedSource.java:113)
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$2.evaluate(StreamingTransformTranslator.java:125)
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$2.evaluate(StreamingTransformTranslator.java:119)
> org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:413)
> org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:399)
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:663)
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:446)
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:47)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:776)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:775)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:775)
> 
> 
> 
> The updated code:
>
> Pipeline p =
>     Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>
> ConnectionFactory factory_a = new ActiveMQConnectionFactory(
>     "failover://(tcp://activemq-a:61616)?initialReconnectDelay=2000&maxReconnectAttempts=-1");
> ConnectionFactory factory_b = new ActiveMQConnectionFactory(
>     "failover://(tcp://activemq-b:61616)?initialReconnectDelay=2000&maxReconnectAttempts=-1");
> ConnectionFactory factory_c = new ActiveMQConnectionFactory("tcp://activemq-c:61616");
>
> PCollection<JmsRecord> a = p.apply("ReadFromQueue",
>     JmsIO.read().withConnectionFactory(factory_a).withUsername("admin").withPassword("admin")
>         .withQueue("a"));
> PCollection<JmsRecord> b = p.apply("ReadFromQueue2",
>     JmsIO.read().withConnectionFactory(factory_b).withUsername("admin").withPassword("admin")
>         .withQueue("b"));
>
> PCollection<JmsRecord> combined =
>     PCollectionList.of(a).and(b).apply(Flatten.pCollections());
>
> combined
>     .apply(MapElements.into(TypeDescriptors.strings()).via((JmsRecord r) -> r.getPayload()))
>     .apply("WriteToQueue",
>         JmsIO.write().withConnectionFactory(factory_c).withUsername("admin")
>             .withPassword("admin").withQueue("c"));
>
> p.run().waitUntilFinish();
> 
> *When running the code above on Spark 2.2.0 and Beam 2.3.0-rc3, most of the
> messages simply disappeared inside the system!*
> *The messages were read from queues "a" and "b", but most of them never
> arrived at queue "c".*
> 
> On Mon, Feb 12, 2018 at 5:30 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> 
>     Hi,
> 
>     here you don't use split; you have different JmsIO reads from different
>     queues (not the same one). The two reads are not related.
>
>     If you kill the connection for one of them, you have to reconnect. That
>     can be done by configuration on the ConnectionFactory.
>
>     Is that what you want? Automatic reconnection?
> 
>     Regards
>     JB
> 
> 
>     On 11/02/2018 12:58, Motty Gruda wrote:
> 
>         runner: spark runner (1.6.3)
>         beam: 2.2.0
>         activemq: 5.14.3
> 
>         code:
> 
>         Pipeline p =
>             Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>
>         ConnectionFactory factory_a = new ActiveMQConnectionFactory("tcp://activemq-a:61616");
>         ConnectionFactory factory_b = new ActiveMQConnectionFactory("tcp://activemq-b:61616");
>         ConnectionFactory factory_c = new ActiveMQConnectionFactory("tcp://activemq-c:61616");
>
>         PCollection<JmsRecord> a = p.apply("ReadFromQueue",
>             JmsIO.read().withConnectionFactory(factory_a).withUsername("admin").withPassword("admin")
>                 .withQueue("a"));
>         PCollection<JmsRecord> b = p.apply("ReadFromQueue2",
>             JmsIO.read().withConnectionFactory(factory_b).withUsername("admin").withPassword("admin")
>                 .withQueue("b"));
>
>         PCollection<JmsRecord> combined =
>             PCollectionList.of(a).and(b).apply(Flatten.pCollections());
>
>         combined
>             .apply(MapElements.into(TypeDescriptors.strings()).via((JmsRecord r) -> r.getPayload()))
>             .apply("WriteToQueue",
>                 JmsIO.write().withConnectionFactory(factory_c).withUsername("admin")
>                     .withPassword("admin").withQueue("c"));
>
>         p.run().waitUntilFinish();
> 
>         After killing activemq-a, the job dies with the following stack trace:
> 
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1569_0 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1565_0 not found, computing it
>         18/02/11 11:39:59 INFO CoarseGrainedExecutorBackend: Got assigned task 924
>         18/02/11 11:39:59 INFO Executor: Running task 1.2 in stage 349.0 (TID 924)
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1561_0 not found, computing it
>         18/02/11 11:39:59 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1673_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1557_0 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1669_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1553_0 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1665_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1549_0 not found, computing it
>         18/02/11 11:39:59 INFO BlockManager: Found block rdd_1528_0 locally
>         18/02/11 11:39:59 INFO StateSpecFunctions: Continue reading from an existing CheckpointMark.
>         18/02/11 11:39:59 ERROR Executor: Exception in task 0.1 in stage 349.0 (TID 923)
>         java.lang.RuntimeException: Failed to read from reader.
>                 at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:204)
>                 at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:105)
>                 at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>                 at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>                 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>                 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>                 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>                 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>                 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>                 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>                 at org.apache.spark.scheduler.Task.run(Task.scala:89)
>                 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>                 at java.lang.Thread.run(Thread.java:745)
>         Caused by: java.io.IOException: java.lang.NullPointerException
>                 at org.apache.beam.sdk.io.jms.JmsIO$UnboundedJmsReader.advance(JmsIO.java:472)
>                 at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:249)
>                 at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
>                 at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
>                 ... 148 more
>         Caused by: java.lang.NullPointerException
>                 at org.apache.beam.sdk.io.jms.JmsIO$UnboundedJmsReader.advance(JmsIO.java:436)
>                 ... 151 more
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1661_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1657_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1653_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1649_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1645_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1641_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1637_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1633_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1629_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1625_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1621_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1617_1 not found, computing it
>         18/02/11 11:39:59 INFO CacheManager: Partition rdd_1613_1 not found, computing it
> 
> 
> 
>         Running the same code on Spark 2.2.0 and Beam 2.3.0-rc3, only half
>         of the messages sent ended up in queue c!
> 
> 
> 
>         On Sun, Feb 11, 2018 at 8:56 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> 
>             Hi Motty,
> 
>             For JMS, it depends on whether you are using queues or topics.
> 
>             Using queues, JmsIO creates several readers (concurrent consumers)
>             on the same queue. The checkpoint is based on the ACK: it's a
>             client ACK, and the source sends the ACK when the checkpoint is
>             finalized. If you close the connection for one source, the other
>             sources should continue to consume.
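The ACK-based checkpoint described above can be illustrated with a toy model. This is not JmsIO's code: `Broker`, `CheckpointMark` and `redeliveredAfterFailure` are hypothetical names, and the in-memory broker acknowledges individual messages (real JMS `CLIENT_ACKNOWLEDGE` acknowledges everything the session has consumed so far). It shows the property the checkpoint relies on: messages read but not yet covered by a finalized checkpoint are redelivered after a connection loss, while acknowledged ones are not.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

class ClientAckModel {

    /** Toy broker queue: delivered messages stay "in flight" until acknowledged. */
    static final class Broker {
        private final Deque<String> pending = new ArrayDeque<>();
        private final Deque<String> inFlight = new ArrayDeque<>();

        void send(String msg) {
            pending.addLast(msg);
        }

        /** Delivers the next message, or returns null if the queue is empty. */
        String receive() {
            String msg = pending.pollFirst();
            if (msg != null) {
                inFlight.addLast(msg);
            }
            return msg;
        }

        void acknowledge(Collection<String> msgs) {
            inFlight.removeAll(msgs);
        }

        /** Connection lost: unacknowledged messages go back to the head of the queue, in order. */
        void recover() {
            while (!inFlight.isEmpty()) {
                pending.addFirst(inFlight.pollLast());
            }
        }
    }

    /** Hypothetical checkpoint: remembers what was read and acks it only when finalized. */
    static final class CheckpointMark {
        private final List<String> messages = new ArrayList<>();

        void add(String msg) {
            messages.add(msg);
        }

        void finalizeCheckpoint(Broker broker) {
            broker.acknowledge(messages);
            messages.clear();
        }
    }

    /** Reads m1..m3, finalizes a checkpoint only after m2, then loses the connection. */
    static List<String> redeliveredAfterFailure() {
        Broker broker = new Broker();
        for (String m : Arrays.asList("m1", "m2", "m3", "m4")) {
            broker.send(m);
        }

        CheckpointMark mark = new CheckpointMark();
        mark.add(broker.receive());      // m1
        mark.add(broker.receive());      // m2
        mark.finalizeCheckpoint(broker); // m1 and m2 are acknowledged and gone for good

        mark.add(broker.receive());      // m3 is read, but its checkpoint is never finalized
        broker.recover();                // the connection dies before the ACK

        // A new reader (e.g. after a reconnect) sees m3 again, followed by m4.
        List<String> seenByNewReader = new ArrayList<>();
        String msg;
        while ((msg = broker.receive()) != null) {
            seenByNewReader.add(msg);
        }
        return seenByNewReader;
    }

    public static void main(String[] args) {
        System.out.println(redeliveredAfterFailure()); // [m3, m4]
    }
}
```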
> 
>             Can you describe your exact scenario (runner, pipeline, broker)?
> 
>             Regards
>             JB
> 
>             On 02/11/2018 07:43 AM, Motty Gruda wrote:
>              > Hey,
>              >
>              > How can errors in the IOs (for example, connection errors) be
>              > handled?
>              >
>              > I've tested a few scenarios with JmsIO. When I read from two
>              > different JMS connections and closed only one of them, the entire
>              > pipeline failed/froze. I would expect it to continue running with
>              > one source and try to reconnect to the second source until it's
>              > available again.
>              >
>              > Is this a bug in the IO itself? In the SDK? In the runner (I've
>              > tested with the direct runner and the Spark runner)?
> 
>             --
>             Jean-Baptiste Onofré
>             jbono...@apache.org
>             http://blog.nanthrax.net
>             Talend - http://www.talend.com
> 
> 
> 
> 
>         -- 
>         מוטי גרודה
>         Motty Gruda
> 
> 
> 
> 
> -- 
> מוטי גרודה
> Motty Gruda

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
