unsubscribe

2018-07-19 Thread Saiguang Che
unsubscribe stonnya...@gmail.com


unsubscribe

2018-07-19 Thread Apurva Desai
unsubscribe apur...@google.com


Re: Write bulks files from streaming app

2018-07-19 Thread Raghu Angadi
One option (but it requires more code): write smaller files with frequent
triggers to directory_X, and once the window properly closes, copy all the
files into a single file in your own DoFn. This is certainly more code on
your part, but might be worth it. You can use the Wait.on() transform to run
your finalizer DoFn right after the window that writes the smaller files closes.
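
For illustration, a rough sketch of that approach (not tested; it reuses the
names from your pseudocode below, GenericRecord stands in for your element
type, and ConcatFilesFn is a hypothetical DoFn that would copy the listed
shards into one HDFS file):

WriteFilesResult<String> smallFiles = data.apply(
    FileIO.<String, GenericRecord>writeDynamic()
        .by(destinationFn)
        .via(AvroIO.sink(avroClass))
        .to("directory_X")
        .withNaming(namingFn)
        .withTempDirectory(tmp)
        .withNumShards(shards));
        // a real pipeline also needs .withDestinationCoder(...)

// One element per written shard, emitted only once the window's write
// has finalized.
PCollection<KV<String, String>> filenames =
    smallFiles.getPerDestinationOutputFilenames();

// Collect the shard paths per destination and concatenate them in a custom
// DoFn. Wait.on() can also gate any other finalizer input on the write
// completing, e.g.
// otherInput.apply(Wait.on(filenames)).apply(ParDo.of(new FinalizerFn()));
filenames
    .apply(GroupByKey.create())
    .apply(ParDo.of(new ConcatFilesFn()));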


On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek  wrote:

> Hey,
>
> I am looking for advice.
>
> I am trying to do stream processing with Beam on the Flink runtime:
> reading data from Kafka, doing some processing with it (not important
> here), and at the same time storing the consumed data to history storage
> (HDFS) for archiving and reprocessing.
>
> Now, the part that writes batches to HDFS is giving me a hard time.
> Logically, I want to do:
>
> fileIO = FileIO.writeDynamic()
> .by(destinationFn)
> .via(AvroIO.sink(avroClass))
> .to(path)
> .withNaming(namingFn)
> .withTempDirectory(tmp)
> .withNumShards(shards)
>
> data
>.withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>.saveTo(fileIO)
>
>
> This write generates 3 operators in the Flink execution graph, which I do
> not fully understand yet.
>
> Now, the problem is that I am not able to run this at scale.
>
> If I want to write files big enough to avoid having lots of files on
> HDFS, I keep running into OOMs. With Flink, I use the RocksDB state
> backend, and I was warned about this JIRA, which is probably related to
> my OOM:
> https://issues.apache.org/jira/browse/FLINK-8297
> Therefore, I need to trigger more often with smaller batches, which leads
> to too many files on HDFS.
>
> The question here is whether there is some path I do not see to make this
> work (writing bulks of data of my chosen size to HDFS without running
> into memory trouble). Also, keeping in state the whole window of data
> designated for writing to the filesystem involves more IO.
>
> Thanks for any thoughts and guidelines,
> Jozef
>
>


Re: Pass complex type to FlinkPipelineOptions

2018-07-19 Thread Lukasz Cwik
You can't set it via the command line; you'll need to set it
programmatically.

Normally, complex types are configured using a JSON map, as shown in this
commit:
https://github.com/apache/beam/commit/f1d4d41279ad93370a6151902fae6dbd13dde53b#diff-03b47195a404600b2f4576d4ab5fbbb3

This requires the pipeline option to have a JSON -> complex type -> JSON
mapping defined. Please consider contributing one if you find it useful.
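
For the programmatic route, a minimal sketch (my assumptions: Flink's
RocksDBStateBackend is on the classpath and the checkpoint URI is just a
placeholder; note the constructor throws IOException):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
// RocksDB state backend with incremental checkpointing enabled.
options.setStateBackend(
    new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
Pipeline pipeline = Pipeline.create(options);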

On Thu, Jul 19, 2018 at 1:48 AM Jozef Vilcek  wrote:

> Hey,
>
> I am using Beam with Flink and want to set `stateBackend` via the pipeline
> options available here
>
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java#L120
>
> The property's type is an abstract class. I have not been able to figure
> out what to pass on the command line to initialize it. How can I provide
> such a complex abstract type? Is it possible?
> .. I am trying to use the RocksDB backend with incremental checkpointing ..
>


Re: Generating a window identifier while using a Trigger AfterProcessingTime.pastFirstElementInPane()

2018-07-19 Thread Lukasz Cwik
Note that @StartBundle is not correlated with a new pane; a bundle is an
arbitrary, runner-chosen batch of elements that can span any number of
windows and panes, restricted only by the triggering semantics.

You can introspect the PaneInfo to see the firing index; index 0 represents
the first firing. I don't believe there is a way to know which firing is
the last without using a trigger that produces a known number of firings
(e.g., a watermark trigger with no speculative or late firings).
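
For illustration, a minimal sketch of reading the pane metadata inside a
DoFn via ProcessContext.pane() (PaneInfo is
org.apache.beam.sdk.transforms.windowing.PaneInfo; deriving a session
identifier from the window plus the key is my assumption, not the only
option):

.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>>() {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        PaneInfo pane = c.pane();
        // pane.getIndex() == 0 (equivalently pane.isFirst()) marks the first
        // firing for this window; the window plus the key stays stable
        // across repeated firings, so it can serve as a session identifier.
        String sessionId = window.toString() + "/" + c.element().getKey();
        System.out.println("window=" + window + " paneIndex=" + pane.getIndex()
            + " isLast=" + pane.isLast() + " sessionId=" + sessionId);
        c.output(c.element());
    }
}));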

On Thu, Jul 19, 2018 at 6:12 AM Juan Carlos Garcia 
wrote:

> Hi Folks,
>
> I would like to ask if it's possible to be notified when a window is
> created or closed while processing a batch of data. (Sorry for the long
> post.)
>
> My scenario:
> I am using a Session window with a GapDuration of 2 minutes (for testing).
> During this processing we are assigning a session identifier to the
> incoming messages so we can identify them later in ElasticSearch / other
> tools. The process works as expected as long as we don't introduce any
> trigger (during @ProcessElement we have the Iterable of elements from
> this window, and from there we can just generate our session identifier),
> i.e.:
>
> 
> PCollection<KV<String, Iterable<String>>> windowedResult = input
> .apply("Session",
> Window.into(Sessions.withGapDuration(Duration.standardMinutes(2))))
> .apply("Create KV of Users", ParDo.of(new CreateMyKV()))
> .apply(GroupByKey.create())
> .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String,
> Iterable<String>>>() {
> @ProcessElement
> public void processElement(ProcessContext c, BoundedWindow _window) {
> System.out.println("-- window:" + _window);
> System.out.println("session:" + UUID.randomUUID().toString());
> System.out.println(c.element().getValue());
> System.out.println("--");
> c.output(c.element());
> }
> }));
> 
>
> After I added the trigger "AfterProcessingTime.pastFirstElementInPane()",
> each fired pane doesn't contain any indication of the window it belongs
> to, and there is no way (at least I couldn't find one) to actually hook
> into it and generate a session identifier for the elements that belong to
> the same window.
>
> The behavior of @StartBundle is that it fires for each pane, and the
> behavior of @Setup is not consistent, as it fires more times than we have
> windows, or sometimes fewer.
>
> Any advice on this matter is welcome. By the way, in production we are
> using the SparkRunner (which only supports processing-time triggers
> according to the capability matrix); please find below a JUnit class I am
> using to validate this behavior.
>
> 
> public class SessionWindowTest {
> private long TIME = System.currentTimeMillis();
>
> @Test
> public void testSessionWindowWorkAsExpected() {
> final List<String> testMesages = new LinkedList<>();
> TIME = System.currentTimeMillis();
>
> //
> // 3 Windows of data
> //
>
> IntStream.range(0, 10).forEach(i -> {
> testMesages.add("{\"user_id\":123456789,
> \"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
> TIME += TimeUnit.SECONDS.toMillis(1);
> });
> TIME += TimeUnit.MINUTES.toMillis(5);
> IntStream.range(0, 10).forEach(i -> {
> testMesages.add("{\"user_id\":123456789,
> \"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
> TIME += TimeUnit.SECONDS.toMillis(2);
> });
> TIME += TimeUnit.MINUTES.toMillis(6);
> IntStream.range(0, 10).forEach(i -> {
> testMesages.add("{\"user_id\":123456789,
> \"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
> TIME += TimeUnit.SECONDS.toMillis(4);
> });
>
> Pipeline pipe = Pipeline.create();
> PCollection<String> input = pipe.apply("Create",
> Create.of(testMesages));
>
> PCollection<KV<String, Iterable<String>>> windowedResult =
> input.apply("setting the time",
> ParDo.of(new DoFn<String, String>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> Any deserialize =
> JsonIterator.deserialize(c.element());
> c.outputWithTimestamp(c.element(), new
> Instant(deserialize.get("time").toLong()));
> }
> }))
> .apply("Session",
>
> Window.into(Sessions.withGapDuration(Duration.standardMinutes(2)))
> //
> .withAllowedLateness(Duration.standardSeconds(1))
> //.discardingFiredPanes()
> //
> .triggering(AfterProcessingTime.pastFirstElementInPane())
> )
> .apply("Create KV of Users", ParDo.of(new CreateUserKV()))
> .apply(GroupByKey.create())
> .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>,
> KV<String, Iterable<String>>>() {
> private int counter = 0;
>
> @StartBundle
> public void startBundle() {
> System.out.println("--BUNDLE--");
> 

Re: Open Source Challenge, Guadalajara on August 16th.

2018-07-19 Thread jb

Hi Arianne,

Thanks for the invite! I would love to be a speaker there!

Let's talk privately about the logistics details (connection, etc.).

Thanks again, much appreciated!

Regards
JB

On 2018-07-19 18:28, Arianne Lisset Navarro Lepe wrote:

Forgot to mention: the format is a videoconference!

Regards,

-Arianne Navarro


- Original message -
From: "Arianne Lisset Navarro Lepe" 
To: j...@nanthrax.net
Cc: user@beam.apache.org
Subject: Re: Open Source Challenge, Guadalajara on August 16th.
Date: Thu, Jul 19, 2018 11:24 AM

Hello JB,

We have a new date (August 16th) for the event to bring on board new
contributors in Guadalajara, MX.

We are looking to have a talk about the Apache Foundation, pretty
similar to the one you gave at the Apache Beam Summit in San
Francisco.

Would you like to be the speaker =) ? Tentative dates are July 31st
or August 1st, but we can adjust that depending on your schedule.

Thanks !

Regards,
- Arianne Navarro


- Original message -
From: "Jean-Baptiste Onofré" 
To: user@beam.apache.org
Cc:
Subject: Re: Open Source Challenge, Guadalajara on August 2nd.
Date: Wed, Jun 20, 2018 9:51 AM

Hi,

You can count on me to help new contributors!

I'm pretty sure that others will jump in too.

Thanks!

I'm looking forward to new contributions!

Regards
JB

On 20/06/2018 15:10, Arianne Lisset Navarro Lepe wrote:

Hi Beam Community,

Here in Mexico we are starting a consortium to foster an
open-source-first culture; we have named it OSOM (Open Source Mexico).

As part of our activities, we are hosting the Open Source Challenge
on August 2nd at the IBM Guadalajara offices, with the intention of
bringing on board new contributors to the open community.

*We are looking for Beam coaches who can provide (remote) guidance to
the teams that choose Beam for the challenge.*

Take a look at the event description; there are some blank spaces on
page 4 for the coaches and speakers. Feel free to write your name =)
and then we can coordinate times.

https://docs.google.com/document/d/1jvwcpzn1mp5xCqcDnqcxne-1_RS5zY-DH35-Wr5eTng/edit?usp=sharing

Thanks

- Arianne Navarro




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Re: Open Source Challenge, Guadalajara on August 16th.

2018-07-19 Thread Arianne Lisset Navarro Lepe
Hello JB,
 
We have a new date (August 16th) for the event to bring on board new contributors in Guadalajara, MX.
 
We are looking to have a talk about the Apache Foundation, pretty similar to the one you gave at the Apache Beam Summit in San Francisco.
 
Would you like to be the speaker =) ? Tentative dates are July 31st or August 1st, but we can adjust that depending on your schedule.
 
 
Thanks !
 
Regards,
- Arianne Navarro
 
 
 
 
 



Beam SparkRunner and Spark KryoSerializer problem

2018-07-19 Thread Juan Carlos Garcia
Folks,

Is anyone out there using the SparkRunner with the Spark KryoSerializer?

We are being forced to use the less efficient 'JavaSerializer' with Spark
because we face the following exception:


Exception in thread "main" java.lang.RuntimeException:
org.apache.spark.SparkException: Job aborted due to stage failure:
Exception while getting task result:
com.esotericsoftware.kryo.KryoException: Unable to find class:
org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1875283985
Serialization trace:
factory (org.apache.beam.runners.core.metrics.MetricsMap)
counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
metricsContainers
(org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
metricsContainers
(org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
at
org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
at
org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
at
org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
at
org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
at
org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
at
org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
at
org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
at
org.apache.beam.examples.BugWithKryoOnSpark.main(BugWithKryoOnSpark.java:75)


I created a JIRA ticket and attached an example project to it:
https://issues.apache.org/jira/browse/BEAM-4597
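
For reference, a sketch of where the serializer switch lives (an assumption
about the submission setup, not the exact project config; with spark-submit
the equivalent is --conf spark.serializer=...):

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .setAppName("BugWithKryoOnSpark")
    // Workaround: Spark's default Java serialization, which handles the
    // Beam metrics classes in the trace above but is slower.
    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
    // The setting that triggers the KryoException above:
    // .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");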

Any feedback is appreciated.

-- 

JC


Generating a window identifier while using a Trigger AfterProcessingTime.pastFirstElementInPane()

2018-07-19 Thread Juan Carlos Garcia
Hi Folks,

I would like to ask if it's possible to be notified when a window is
created or closed while processing a batch of data. (Sorry for the long
post.)

My scenario:
I am using a Session window with a GapDuration of 2 minutes (for testing).
During this processing we are assigning a session identifier to the
incoming messages so we can identify them later in ElasticSearch / other
tools. The process works as expected as long as we don't introduce any
trigger (during @ProcessElement we have the Iterable of elements from
this window, and from there we can just generate our session identifier),
i.e.:


PCollection<KV<String, Iterable<String>>> windowedResult = input
.apply("Session",
Window.into(Sessions.withGapDuration(Duration.standardMinutes(2))))
.apply("Create KV of Users", ParDo.of(new CreateMyKV()))
.apply(GroupByKey.create())
.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String,
Iterable<String>>>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow _window) {
System.out.println("-- window:" + _window);
System.out.println("session:" + UUID.randomUUID().toString());
System.out.println(c.element().getValue());
System.out.println("--");
c.output(c.element());
}
}));


After I added the trigger "AfterProcessingTime.pastFirstElementInPane()",
each fired pane doesn't contain any indication of the window it belongs
to, and there is no way (at least I couldn't find one) to actually hook
into it and generate a session identifier for the elements that belong to
the same window.

The behavior of @StartBundle is that it fires for each pane, and the
behavior of @Setup is not consistent, as it fires more times than we have
windows, or sometimes fewer.

Any advice on this matter is welcome. By the way, in production we are
using the SparkRunner (which only supports processing-time triggers
according to the capability matrix); please find below a JUnit class I am
using to validate this behavior.


public class SessionWindowTest {
private long TIME = System.currentTimeMillis();

@Test
public void testSessionWindowWorkAsExpected() {
final List<String> testMesages = new LinkedList<>();
TIME = System.currentTimeMillis();

//
// 3 Windows of data
//

IntStream.range(0, 10).forEach(i -> {
testMesages.add("{\"user_id\":123456789,
\"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
TIME += TimeUnit.SECONDS.toMillis(1);
});
TIME += TimeUnit.MINUTES.toMillis(5);
IntStream.range(0, 10).forEach(i -> {
testMesages.add("{\"user_id\":123456789,
\"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
TIME += TimeUnit.SECONDS.toMillis(2);
});
TIME += TimeUnit.MINUTES.toMillis(6);
IntStream.range(0, 10).forEach(i -> {
testMesages.add("{\"user_id\":123456789,
\"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
TIME += TimeUnit.SECONDS.toMillis(4);
});

Pipeline pipe = Pipeline.create();
PCollection<String> input = pipe.apply("Create",
Create.of(testMesages));

PCollection<KV<String, Iterable<String>>> windowedResult =
input.apply("setting the time",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Any deserialize =
JsonIterator.deserialize(c.element());
c.outputWithTimestamp(c.element(), new
Instant(deserialize.get("time").toLong()));
}
}))
.apply("Session",

Window.into(Sessions.withGapDuration(Duration.standardMinutes(2)))
//
.withAllowedLateness(Duration.standardSeconds(1))
//.discardingFiredPanes()
//
.triggering(AfterProcessingTime.pastFirstElementInPane())
)
.apply("Create KV of Users", ParDo.of(new CreateUserKV()))
.apply(GroupByKey.create())
.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>,
KV<String, Iterable<String>>>() {
private int counter = 0;

@StartBundle
public void startBundle() {
System.out.println("--BUNDLE--");
}

@Setup
public void setupFn() {
System.out.println("--SETUP--");
}
@ProcessElement
public void processElement(ProcessContext c,
BoundedWindow _window) {
System.out.println("-- window:" + _window);
System.out.println("session:" +
UUID.randomUUID().toString());
//System.out.println(c.element().getValue());
System.out.println("--");
c.output(c.element());
}
}));

PipelineResult run = pipe.run();
Assert.assertTrue("Pipeline is done",
run.getState() == PipelineResult.State.DONE);
}
}

Write bulks files from streaming app

2018-07-19 Thread Jozef Vilcek
Hey,

I am looking for advice.

I am trying to do stream processing with Beam on the Flink runtime:
reading data from Kafka, doing some processing with it (not important
here), and at the same time storing the consumed data to history storage
(HDFS) for archiving and reprocessing.

Now, the part that writes batches to HDFS is giving me a hard time.
Logically, I want to do:

fileIO = FileIO.writeDynamic()
.by(destinationFn)
.via(AvroIO.sink(avroClass))
.to(path)
.withNaming(namingFn)
.withTempDirectory(tmp)
.withNumShards(shards)

data
   .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
   .saveTo(fileIO)


This write generates 3 operators in the Flink execution graph, which I do
not fully understand yet.

Now, the problem is that I am not able to run this at scale.

If I want to write files big enough to avoid having lots of files on HDFS,
I keep running into OOMs. With Flink, I use the RocksDB state backend, and
I was warned about this JIRA, which is probably related to my OOM:
https://issues.apache.org/jira/browse/FLINK-8297
Therefore, I need to trigger more often with smaller batches, which leads
to too many files on HDFS.

The question here is whether there is some path I do not see to make this
work (writing bulks of data of my chosen size to HDFS without running into
memory trouble). Also, keeping in state the whole window of data designated
for writing to the filesystem involves more IO.

Thanks for any thoughts and guidelines,
Jozef


Pass complex type to FlinkPipelineOptions

2018-07-19 Thread Jozef Vilcek
Hey,

I am using Beam with Flink and want to set `stateBackend` via the pipeline
options available here
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java#L120

The property's type is an abstract class. I have not been able to figure
out what to pass on the command line to initialize it. How can I provide
such a complex abstract type? Is it possible?
.. I am trying to use the RocksDB backend with incremental checkpointing ..