Re: Handling errors in IOs

2018-02-10 Thread Jean-Baptiste Onofré
Hi Motty,

For JMS, it depends on whether you are using queues or topics.

With queues, JmsIO creates several readers (concurrent consumers) on the same
queue. Checkpointing is based on the ACK (it's a client ACK, and the source
sends the ACK when the checkpoint is finalized). If you close the connection
for one source, the other sources should continue to consume.
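The client-ACK flow described above can be modeled in plain Java (an illustrative sketch only, not the actual JmsIO internals; all class and method names here are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Illustrative model of client-ACK consumption (not the JmsIO API):
// the broker keeps a message in the queue until the consumer ACKs it,
// which JmsIO only does once the checkpoint is finalized. A reader
// failing before the ACK therefore leaves the message for redelivery.
public class ClientAckQueue {
    private final ArrayDeque<String> pending = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();

    public void publish(String msg) { pending.add(msg); }

    // The reader receives the message, but the broker still holds it.
    public String receive() {
        String msg = pending.peek();
        if (msg != null) delivered.add(msg);
        return msg;
    }

    // Called when the checkpoint is finalized: the broker may now drop it.
    public void ack() { pending.poll(); }

    public int unacked() { return pending.size(); }
}
```

In this model a message stays "pending" on the broker between receive() and ack(), mirroring why a crashed reader does not lose data.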

Can you describe your exact scenario (runner, pipeline, broker)?

Regards
JB

On 02/11/2018 07:43 AM, Motty Gruda wrote:
> Hey,
> 
> How can errors in the IOs be treated (for example, connection errors)?
> 
> I've tested a few scenarios with the JmsIO. When I read from two different JMS
> connections and closed only one of them, the entire pipeline failed/froze.
> I would expect it to continue running with one source and try to reconnect to
> the second source until it's available again.
> 
> Is this a bug in the IO itself? In the SDK? In the runner (I've tested with the
> direct runner and the Spark runner)?

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Handling errors in IOs

2018-02-10 Thread Motty Gruda
Hey,

How can errors in the IOs be treated (for example, connection errors)?

I've tested a few scenarios with the JmsIO. When I read from two different
JMS connections and closed only one of them, the entire pipeline
failed/froze.
I would expect it to continue running with one source and try to reconnect
to the second source until it's available again.

Is this a bug in the IO itself? In the SDK? In the runner (I've tested with
the direct runner and the Spark runner)?


Re: Trying to understand Unable to encode element exceptions

2018-02-10 Thread Carlos Alonso
I've added a comment with a link to our working Stateful and timely
processing solution:
https://github.com/spotify/scio/issues/448#issuecomment-364705100

On Fri, Jan 26, 2018 at 1:43 AM Neville Li  wrote:

> Here's a fix to #1020
> https://github.com/spotify/scio/pull/1032
>
> On Sun, Jan 21, 2018 at 4:36 PM Neville Li  wrote:
>
>> Awesome!
>> We haven't wrapped any stateful processing API in Scala, but if you have a
>> working snippet or ideas it'd be great to share them in that ticket.
>>
>> On Sat, Jan 20, 2018 at 4:31 PM Carlos Alonso 
>> wrote:
>>
>>> Thanks Neville!!
>>>
>>> Your recommendation worked great. Thanks for your help!!
>>>
>>> As a side note, I found this issue:
>>> https://github.com/spotify/scio/issues/448
>>>
>>> I can share our experience there, as our job with scio + stateful +
>>> timely processing is working fine as of today.
>>>
>>> Regards!!
>>>
>>> On Fri, Jan 19, 2018 at 6:21 PM Neville Li 
>>> wrote:
>>>
 Welcome.

 Added an issue so we may improve this in the future:
 https://github.com/spotify/scio/issues/1020


 On Fri, Jan 19, 2018 at 11:14 AM Carlos Alonso 
 wrote:

> To build the beam transform I was following this example:
> https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala
>
> To be honest, I don't know how to apply timely and stateful processing
> without using a beam transform, or how to rewrite it using the scio
> built-in you suggest. Could you please give me an example?
>
> Thanks for your help!
>
> On Fri, Jan 19, 2018 at 5:04 PM Neville Li 
> wrote:
>
>> That happens when you mix beam transforms into scio and defeats the
>> safety we have in place. Map the values into something beam-serializable
>> first or rewrite the transform with a scio built-in which takes care of
>> KvCoder.
>>
>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso 
>> wrote:
>>
>>> I'm following this example:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>
>>> because I'm building something very similar to a group into batches
>>> functionality. If I don't set the coder manually, this exception arises:
>>> https://pastebin.com/xxdDMXSf
>>>
>>> Thanks!
>>>
>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li 
>>> wrote:
>>>
 You shouldn't manually set a coder in most cases. It defaults to
 KryoAtomicCoder for most Scala types.
 More details:

 https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders

 On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso 
 wrote:

> Might it be because I'm using
> .setCoder(KvCoder.of(StringUtf8Coder.of(),
> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
> at some point in the pipeline
> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
> outputs a SerializableCoder)?
>
> This is something I've always wondered. How does one specify a
> coder for a case class?
>
> Regards
>
> On Fri, 19 Jan 2018 at 15:51, Neville Li 
> wrote:
>
>> Not sure why it falls back to SerializableCoder. Can you file a
>> GH issue with, ideally, a snippet that can reproduce the problem?
>>
>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso 
>> wrote:
>>
>>> Hi everyone!!
>>>
>>> I'm building a pipeline to store items from a Google PubSub
>>> subscription into GCS buckets. In order to do it I'm using both 
>>> stateful
>>> and timely processing and after building and testing the project 
>>> locally I
>>> tried to run it on Google Dataflow and I started getting those 
>>> errors.
>>>
>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>
>>> The item I'm trying to serialize is a KV[String,
>>> MessageWithAttributes] and MessageWithAttributes is a case class 
>>> defined as
>>> (content: String, attrs: Map[String, String])
>>>
>>> The underlying cause is java.io.NotSerializableException:
>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's Scio as
>>> well), which may suggest that the issue is in serializing the Map, but to be
>>> honest, I don't know what it means or how to fix it.
>>>
>>> Can 
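A common way out of this kind of NotSerializableException on a wrapped map is to copy it into a plain java.util.HashMap (which is Serializable) before it crosses a serialization boundary. A minimal plain-Java sketch (class and method names are illustrative, not from scio or Beam):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: copying a possibly non-serializable Map view
// into a plain HashMap makes the object graph Java-serializable.
public class MapCopy {
    public static <K, V> HashMap<K, V> toSerializable(Map<K, V> view) {
        return new HashMap<>(view);
    }

    // Round-trips an object through Java serialization, as a coder would.
    public static byte[] serialize(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }
}
```

Applied to the case above, this would mean rebuilding the attrs map as a plain HashMap before the element reaches the SerializableCoder.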

Re: Stateful processing with session window

2018-02-10 Thread Carlos Alonso
Hi Maurizio, I'm not a very experienced user here, I'm actually just getting
started with all this, but I'm going to give this one a try and see if I
can help.

What I think is happening is that the third 'a' you see is actually in a
different window from the other a's. State being per key and window
means that Beam keeps state for each key-window pair; therefore, if your
'a' counter is being reset, it is probably because it is actually a
different state, and since the key is the same, the only possibility is that
the window is different. You can debug your pipeline and check whether my
guess is right by also printing the window information of your elements.
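The per-key-and-window scoping described above can be sketched in plain Java (an illustrative model, not the Beam state API; the class and method names are made up):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model (not the Beam state API): state cells are scoped
// per (key, window) pair, so the same key seen in a different window
// starts from a fresh counter.
public class KeyWindowState {
    private final Map<String, Integer> counters = new HashMap<>();

    // Returns the current count for this key in this window, then increments it.
    public int countAndIncrement(String key, String windowId) {
        String cell = key + "@" + windowId;
        int current = counters.getOrDefault(cell, 0);
        counters.put(cell, current + 1);
        return current;
    }
}
```

In this model, 'a' in window w1 counts 0, 1, ... while 'a' in a different window w2 starts again at 0, which matches the restart described below.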

Hope it helps.

On Fri, Feb 9, 2018 at 4:46 PM Maurizio Sambati 
wrote:

> Hi everyone,
>
> I'm trying to write a simple pipeline to experiment both stateful
> processing and session window.
>
> I have an event stream; each event has a timestamp and a session key. I
> want to group by session and enrich all events using a common per-session
> state. In this case I'm just replacing the event with an incremental
> counter.
>
> So, let's say I have a source that outputs an event every second and my
> stream is [a, a, b, a, a, c, a, a, b, c, c, c, a, a] (I'm writing only
> the session key, as the value is irrelevant to the issue I'm
> experiencing).
>
> I want the following output: [<a,0>, <a,1>, <b,0>, <a,2>, <a,3>, ...]
> (actually the order is not important)
>
> Unfortunately, my code doesn't work as I expected, and I'm not able to
> understand why (to be honest, I haven't found many resources on the
> topic). What I actually get is something like:
>
> a, 0
> a, 1
> b, 0
> a, 0   <-- ???
> a, 2   <-- ???
> c, 0
> ...
>
> That makes me wonder whether I have actually understood how state is
> related to the key-window pair, or whether I have just misunderstood how
> windowing/triggering works.
>
> My pipeline looks something like:
>
> p.apply(TextIO.read().from("input.json"))
>  .apply(MapElements.via(new ParseTableRowJson()))
>  .apply(new AugmentEvents())
>  .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
>    @ProcessElement
>    public void processElement(ProcessContext c) {
>      LOG.info(c.element().getKey() + ": " + c.element().getValue());
>    }
>  }));
>
> ...
>
> static class AugmentEvents
>     extends PTransform<PCollection<TableRow>, PCollection<KV<String, Long>>> {
>   @Override
>   public PCollection<KV<String, Long>> expand(PCollection<TableRow> input) {
>     return input
>       .apply(ParDo.of(new ExtractSessionIdAndTimestamp()))
>       .apply(new ComputeSessions());
>   }
> }
>
> static class ComputeSessions
>     extends PTransform<PCollection<KV<String, TableRow>>, PCollection<KV<String, Long>>> {
>   @Override
>   public PCollection<KV<String, Long>> expand(PCollection<KV<String, TableRow>> events) {
>     return events
>       .apply(Window.<KV<String, TableRow>>into(
>             Sessions.withGapDuration(Duration.standardMinutes(10)))
>         .triggering(AfterPane.elementCountAtLeast(1))
>         .discardingFiredPanes()
>         .withAllowedLateness(Duration.standardMinutes(10)))
>       .apply(ParDo.of(new StatefulCount()));
>   }
> }
>
> static class StatefulCount extends DoFn<KV<String, TableRow>, KV<String, Long>> {
>   @StateId("storage")
>   private final StateSpec<ValueState<Integer>> storageSpec =
>       StateSpecs.value(VarIntCoder.of());
>
>   @ProcessElement
>   public void processElement(ProcessContext context, BoundedWindow window,
>       @StateId("storage") ValueState<Integer> storage) {
>     Integer val = storage.read();
>     if (val == null) {
>       val = 0;
>     }
>     int current = val.intValue();
>     context.output(KV.of(context.element().getKey(), (long) current));
>     storage.write(current + 1);
>   }
> }
>
> Maurizio
>
>
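For reference, the session-window behaviour under discussion can be approximated in plain Java (an illustrative model of gap-based sessions, not Beam's actual merging implementation; names are made up). With a 10-minute gap, two 'a' events more than 10 minutes apart fall into different sessions, and a per-key-and-window counter would then restart at 0.

```java
// Illustrative model of gap-based session windows (not Beam's API):
// events for one key belong to the same session while each event
// arrives before the previous session end (last timestamp + gap).
public class SessionAssigner {

    // Counts merged sessions for ascending event times, in seconds.
    public static int countSessions(long[] sortedTimes, long gapSeconds) {
        int sessions = 0;
        long sessionEnd = Long.MIN_VALUE;
        for (long t : sortedTimes) {
            if (t >= sessionEnd) {
                sessions++;               // gap exceeded: a new session starts
            }
            sessionEnd = t + gapSeconds;  // each event extends the session
        }
        return sessions;
    }
}
```

With a 600-second gap, events at t = 0, 1, 3 form one session, while events at t = 0, 1, 700 form two.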