Re: Windowing in a batch pipeline

2017-11-08 Thread Robert Bradshaw
On Wed, Nov 8, 2017 at 5:33 PM, Jacob Marble  wrote:
> Good evening. I'm trying to nail down windowing. The concept is clear, just
> struggling with writing a working pipeline. Tonight the goal is to group
> events by key and window, in a batch pipeline. All data is "late" because
> it's a batch pipeline, and I expect nothing to be dropped or processed in a
> "late" context.

Traditionally, in a batch pipeline we consider no data to be late, as
we have perfect knowledge of the watermark.

> Read sections 7 and 8 of the Beam Programming Guide roughly twice.
> Sifted through the examples; WindowedWordCount is close, but it doesn't use
> triggering, which is where (2b) is probably off track.
>
> 1)
> PCollection is created through a series of transforms, including a
> Join.leftOuterJoin(). Apply a timestamp with something simple:
>
> collection.apply("add window timestamp",
>  ParDo.of(new DoFn<Foo, Foo>() {
>   @ProcessElement
>   public void map(ProcessContext context) {
>Foo element = context.element();
>Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
>context.outputWithTimestamp(element, timestamp);
>   }
>  }));
>
> This fails with "java.lang.IllegalArgumentException: Cannot output with
> timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
> than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> Javadoc for details on changing the allowed skew."
>
> Is this expected? I don't care about skew, just want to set the timestamp
> per element.
>
> I worked around this by applying the timestamp earlier in the pipeline,
> right after a TextIO.read(). Why does that fix the problem?

I would suspect that very-far-in-the-future timestamp is the end of
the global window, which gets set as the timestamp of the result of a
group-by-key.

You can set your timestamps earlier, as you have done, but in this
case they will get reset after passing through any GBK. It's possible
you could get what you want by setting TimestampCombiner to EARLIEST
(see 
https://github.com/apache/beam/blob/v2.1.1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L47)
but probably the right solution is to set the allowed timestamp skew
to infinity (or Long.MAX_VALUE or similar).

Generally this skew limit is needed in streaming to hold the watermark back
the right amount... It's definitely not intuitive in your case; we should
think about whether there's something better we could do here.
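
As a rough sketch (reusing the Foo DoFn from your snippet; Duration here is
the Joda-time class), lifting the skew limit would look something like:

collection.apply("add window timestamp",
 ParDo.of(new DoFn<Foo, Foo>() {
  // Allow output timestamps arbitrarily far behind the input timestamp.
  @Override
  public Duration getAllowedTimestampSkew() {
   return Duration.millis(Long.MAX_VALUE);
  }

  @ProcessElement
  public void map(ProcessContext context) {
   Foo element = context.element();
   Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
   context.outputWithTimestamp(element, timestamp);
  }
 }));

The TimestampCombiner alternative would be roughly
Window.into(...).withTimestampCombiner(TimestampCombiner.EARLIEST), applied
before the group-by-key.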

> 2a)
> After applying the timestamp, let's window!
>
> collection.apply("window into sessions",
>  Window.into(Sessions.withGapDuration(Duration.standardMinutes(10))))
>  .apply("key by something, reduce")
>  .apply(TextIO.write()...)
>
> Now I see an output file, what joy! But the output file is empty. Confirmed
> that the PCollection feeding TextIO.write() is seeing data. Maybe this is
> because the default trigger is incorrect for my use case? I expected not to
> need triggering in batch context, but the DefaultTrigger Javadoc makes me
> believe otherwise.
>
> 2b)
> How about the Never.ever() trigger? Javadoc: "Using this trigger will only
> produce output when the watermark passes the end of the {@link BoundedWindow
> window}". I don't know, but let's try. There's some error about allowed
> lateness and firing panes, so I'll try values that look standard:
>
> collection.apply("window into sessions",
>  Window.into(Sessions.withGapDuration(Duration.standardMinutes(10)))
>   .triggering(Never.ever()).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes())
>  .apply("key by something, reduce")
>  .apply(TextIO.write()...)
>
> This yields a new error:
> "java.lang.IllegalStateException: TimestampCombiner moved element from
> 294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
> of global window) for window
> org.apache.beam.sdk.transforms.windowing.GlobalWindow"
>
> So I'm probably looking in the wrong place.

I think if you resolve the issues above then this will take care of itself.

- Robert


Windowing in a batch pipeline

2017-11-08 Thread Jacob Marble
Good evening. I'm trying to nail down windowing. The concept is clear, just
struggling with writing a working pipeline. Tonight the goal is to group
events by key and window, in a batch pipeline. All data is "late" because
it's a batch pipeline, and I expect nothing to be dropped or processed in a
"late" context.

Read sections 7 and 8 of the Beam Programming Guide roughly twice.
Sifted through the examples; WindowedWordCount is close, but it doesn't use
triggering, which is where (2b) is probably off track.

1)
PCollection is created through a series of transforms, including a
Join.leftOuterJoin(). Apply a timestamp with something simple:

collection.apply("add window timestamp",
 ParDo.of(new DoFn<Foo, Foo>() {
  @ProcessElement
  public void map(ProcessContext context) {
   Foo element = context.element();
   Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
   context.outputWithTimestamp(element, timestamp);
  }
 }));

This fails with "java.lang.IllegalArgumentException: Cannot output with
timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
Javadoc for details on changing the allowed skew."

*Is this expected? I don't care about skew, just want to set the timestamp
per element.*

I worked around this by applying the timestamp earlier in the pipeline,
right after a TextIO.read(). Why does that fix the problem?

2a)
After applying the timestamp, let's window!

collection.apply("window into sessions",
 Window.into(Sessions.withGapDuration(Duration.standardMinutes(10))))
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

Now I see an output file, what joy! *But the output file is empty.* Confirmed
that the PCollection feeding TextIO.write() is seeing data. Maybe this is
because the default trigger is incorrect for my use case? I expected not to
need triggering in batch context, but the DefaultTrigger Javadoc makes me
believe otherwise.

2b)
How about the Never.ever() trigger? Javadoc: "Using this trigger will only
produce output when the watermark passes the end of the {@link
BoundedWindow window}". I don't know, but let's try. There's some error
about allowed lateness and firing panes, so I'll try values that look
standard:

collection.apply("window into sessions",
 Window.into(Sessions.withGapDuration(Duration.standardMinutes(10)))
  .triggering(Never.ever()).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes())
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

This yields a new error:
"java.lang.IllegalStateException: TimestampCombiner moved element from
294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
of global window) for window org.apache.beam.sdk.transforms.windowing.GlobalWindow"

So I'm probably looking in the wrong place.

Thanks!

Jacob


Re: design pattern for enriching data via db lookups?

2017-11-08 Thread Lukasz Cwik
For joining with external data you have some options:
* Do direct calls to the external datastore and perform your own in-memory
caching/expiration. You control exactly what happens and when it happens,
but as you have done this in the past you know what this entails.
* Ingest the external data and perform a CoGBK (CoGroupByKey) on a common
key. Works well for datasets which have comparable data sizes. (See the
rough sketch after this list.)
* Ingest the external data and use it as a map/multimap side input.
Works well for datasets where one dataset is much smaller than the other
(especially if the dataset can fit in memory).
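
A rough sketch of the CoGBK approach, assuming both collections have already
been keyed by the common join key (Doc and RefRecord are placeholder element
types for illustration, not anything from your pipeline; the join classes are
from org.apache.beam.sdk.transforms.join):

PCollection<KV<String, Doc>> docsById = ...;
PCollection<KV<String, RefRecord>> refById = ...;

final TupleTag<Doc> docTag = new TupleTag<>();
final TupleTag<RefRecord> refTag = new TupleTag<>();

PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(docTag, docsById)
        .and(refTag, refById)
        .apply(CoGroupByKey.create());

joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, Doc>() {
  @ProcessElement
  public void process(ProcessContext c) {
    CoGbkResult result = c.element().getValue();
    // All values from both collections that share this key.
    for (Doc doc : result.getAll(docTag)) {
      for (RefRecord ref : result.getAll(refTag)) {
        // enrich doc with ref and output it
      }
    }
  }
}));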

Based on your dataset being small, I would suggest using the side input
approach. When you ingest the external data, you can perform any transforms
that are required to get to the common key, including running the Beam SQL
stuff. The SQL stuff is available for Cloud Dataflow but not yet officially
supported. As for ingesting the external data, it all depends on where it
is coming from, but the IO connector closest to your data source is the best
choice.
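
A rough sketch of the side input approach (again with placeholder element and
collection names; View and PCollectionView are from the Beam Java SDK):

// External data ingested as key/value pairs, turned into a map side input.
PCollection<KV<String, String>> externalData = ...;
final PCollectionView<Map<String, String>> lookup =
    externalData.apply(View.asMap());

docs.apply(ParDo.of(new DoFn<Doc, Doc>() {
  @ProcessElement
  public void process(ProcessContext c) {
    // The whole external dataset is available in memory on each worker.
    Map<String, String> map = c.sideInput(lookup);
    Doc doc = c.element();
    // enrich doc using map.get(...) and emit the result
    c.output(doc);
  }
}).withSideInputs(lookup));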


On Thu, Nov 2, 2017 at 10:26 AM, Christopher Bockman wrote:

> Hi,
>
> We're evaluating Beam and trying to figure out if it meets our needs
> (suspect it does!), and, if so, how to best set up our code.
>
> Many thanks in advance.
>
> *Basic scenario:*
>
> * Data (for machine learning/prediction) comes in.  Data is a set of
> documents which are 100% independent.  We want to apply some
> transformations to those items on a per-doc basis.
>
> - Many of the transformations are easily and obviously encapsulated in
> beam user code.
>
> - *However, we'd also like to enrich the data via queries to external
> databases.  How do we do that efficiently *(largely in time, but also in
> compute resources)*?*
>
> *Additional constraints:*
>
> - We are developing on Google Cloud, if it matters.
>
> - Ideally we can achieve below in Python (versus Java), to ease porting
> existing code.
>
> *Some examples:*
>
> 1) *Key-value query.*
>
> Text comes in, and we'd like to do some pre-processing to the text, and
> then look up certain subsets of that text against an external database.
> Those found mappings need to be associated with the underlying text.
>
> E.g., imagine we're doing Named Entity Recognition and trying to augment
> with a large, fine-grained external gazetteer.
>
> "I went to the Los Angeles Dodgers game."  (RIP)
>
> Perhaps we generate ngrams ("I", ..., "I went", "went to", ..., "I went
> to", ..., "Los Angeles Dodgers", ...) and then find that "Los Angeles
> Dodgers" maps to entity 123456, and "Los Angeles" maps to 234567, and we
> want to map those back into the underlying document.
>
> 2) *More sophisticated query.*
>
> We do a bunch of calculations on the data, and then derive some
> intermediary result, and need to look that result up against an external
> database to generate a final result for the data.  These queries might
> require a bit more SQL sophistication (table joining, looking up multiple
> rows and filtering, etc.).
>
> * Scenario #1 is more important than #2, because, worst case, we can
> probably cast most of our external enrichment to a key-value paradigm.
>
> *The concern: the database query becomes the choke point*
>
> * The most naive implementation would seem to be to write user code that
> grabs each doc and does a remote database lookup for that doc.
>
> We initially had this implemented (outside of Beam), but found that
> (unsurprisingly) *the round-trip to the database became a blocker*--code
> would just be waiting on the DB round-trip and so processing slowed down
> dramatically (compared to keeping the db local via an ultimately
> unmanageable local SQLite instance).
>
> Our solution was to 1) implement multi-threading (to limit the db queries
> blocking) and 2) implement local caching of lookups (using
> https://dogpilecache.readthedocs.io/en/latest/usage.html).  Both of these
> dramatically sped things up for the single-machine (non-Beam) scenario.
>
> *Is there an equivalent (direct code or design pattern) of either #1 or #2
> in Beam?  *(The answer to #1 might just be that Beam automatically adds
> more documents to be processed when it realizes things are slower than they
> "should be"?)
>
> *Possible solution?: pull the table(s), in full, down to the Beam cluster*
>
> * The tables we're working with aren't terribly large by modern standards
> (although I'd like to design for this potentially changing)--maybe a few GB
> at most, and probably easily shaved down to 100s of MBs.  Conceptually--if
> quicker--we could (I think?) do something like pull the entire table down
> in a PCollection and then use that data "locally" (i.e., within the Beam
> cluster).
>
> - Presumably, for key-value lookup, we could write some query to
> efficiently cross-reference the two PCollection's (i.e., the db and the
> actual source 

Re: London Apache Beam meetup 2: call for speakers

2017-11-08 Thread Matthias Baetens
No worries JB, I'll send you a message on how we can plan around this
(reschedule the meetup or postpone your session).
Thanks for the heads-up, have fun in Singapore!

Best,
Matthias

On Tue, Nov 7, 2017 at 04:52, Jean-Baptiste Onofré wrote:

> Hi,
>
> unfortunately, I have to decline the invite as I will be at Strata
> Singapore at
> the same time :(
>
> I'm very sorry about that. You can count on me for the 3rd edition !
>
> Regards
> JB
>
> On 11/07/2017 01:41 AM, Matthias Baetens wrote:
> > Hi all!
> >
> > Hope you are well.
> > We are back for a second edition of the London Apache Beam meetup,
> aiming for
> > the 5th of December.
> >
> > We are pretty excited to announce that our first speaker will be
> > Jean-Baptiste Onofré  himself!
> >
> > If you have an interesting *use-case* to share and are in London on the
> *5th of
> > December*, don't hesitate to reach out to me :)
> > Else: keep track of the meetup page
> >  to be updated on our
> > activity in the space.
> >
> > Best,
> > Matthias
> > --
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
--


Re: IBM Streams now supports Apache Beam Java applications

2017-11-08 Thread Kenneth Knowles
This is very cool!

I'm curious about the Cloud/Streams specific features - can you give a gist
of those?

Kenn

On Tue, Nov 7, 2017 at 2:45 PM, Daniel Debrunner  wrote:

> We are excited to announce the release of IBM Streams Runner for Apache
> Beam.
>
> Beam users can submit Beam 2.0 Java applications to IBM Cloud
> Streaming Analytics Service (free trials are available,
> https://console.bluemix.net/catalog/services/streaming-analytics ).
>
> Besides the Beam API, this runner also exposes several IBM
> Cloud/Streams specific features.
>
> Find more details here:
> https://ibmstreams.github.io/streamsx.documentation/docs/
> beamrunner/beamrunner-1-intro
>
> If you have any questions or requirements regarding Streams Runner,
> feel free to post them on the StreamsDev forum:
> https://developer.ibm.com/answers/smartspace/streamsdev/index.html.
>
> Best,
> IBM Streams Team
>