+dev

On Mon, Sep 19, 2016 at 4:37 PM, Dan Halperin <dhalp...@google.com> wrote:

> Hey folks,
>
> Sorry for the confusion around sinks. Let me see if I can clear things up.
>
> In Beam, a Source+Reader is a very integral part of the model. A source is
> the root of a pipeline and it is where runners can do lots of important
> things like creating bundles, producing watermarks, and taking checkpoints.
>
> So we initially thought that we should have a generic Write transform that
> would encapsulate common patterns when outputting data. We started with
> Write+Sink.
>
> The Write+Sink class is simply a wrapper for the pattern (1-time global
> initialization, parallel writing, 1-time global finalization), but it is
> nothing more than a bunch of ParDos wired together by side inputs for
> control flow. However, this pattern ONLY applies naturally to PCollections
> that trigger exactly 1 time across all windows -- really, only bounded
> PCollections. So it simply does not work with unbounded PCollections.
>
> Over time, we have learned that this was not a very general pattern.
> Specifically, I believe there is exactly 1 use of the Sink today, which is
> for files (FileBasedSink, and also HadoopFileSink). The global patterns for
> FileBasedSink look like (1 time, create temp directory ; many times, write
> a temp file ; 1 time, rename and renumber all files).
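
To make the three-step file pattern above concrete, here is a minimal, Beam-free sketch in plain Java. The class and method names (ThreePhaseFileWrite, initialize, writeBundle, finalizeWrite) and the "part-00000-of-00002" naming are assumptions for illustration only; this is not the actual FileBasedSink code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: one-time init, many writes, one-time finalize.
class ThreePhaseFileWrite {

    /** Step 1 (once): create a temp directory inside the output directory. */
    static Path initialize(Path outputDir) throws IOException {
        Files.createDirectories(outputDir);
        return Files.createTempDirectory(outputDir, "tmp-");
    }

    /** Step 2 (many times, possibly in parallel): write one bundle to its own temp file. */
    static Path writeBundle(Path tempDir, List<String> records) throws IOException {
        Path tempFile = Files.createTempFile(tempDir, "bundle-", ".tmp");
        Files.write(tempFile, records); // one record per line, UTF-8
        return tempFile;
    }

    /**
     * Step 3 (once): rename and renumber all temp files, then delete the temp directory.
     * Assumes tempFiles lists every file in tempDir; otherwise the delete fails.
     */
    static List<Path> finalizeWrite(Path outputDir, Path tempDir, List<Path> tempFiles,
                                    String prefix) throws IOException {
        List<Path> sorted = new ArrayList<>(tempFiles);
        Collections.sort(sorted); // fixed order so the numbering is reproducible
        List<Path> finalFiles = new ArrayList<>();
        int total = sorted.size();
        for (int i = 0; i < total; i++) {
            Path target = outputDir.resolve(String.format("%s-%05d-of-%05d", prefix, i, total));
            Files.move(sorted.get(i), target, StandardCopyOption.REPLACE_EXISTING);
            finalFiles.add(target);
        }
        Files.delete(tempDir);
        return finalFiles;
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("sink-demo");
        Path tmp = initialize(out);
        List<Path> temps = new ArrayList<>();
        temps.add(writeBundle(tmp, Arrays.asList("a", "b")));
        temps.add(writeBundle(tmp, Arrays.asList("c")));
        System.out.println(finalizeWrite(out, tmp, temps, "part"));
    }
}
```

The final step only makes sense once the total number of files is known, which is why this shape fits input that completes exactly once.
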
>
> Most other places we write data need less structure. Kafka, Google Cloud
> Datastore, Google Cloud Bigtable -- most of the time you simply insert one
> record or a bundle of records in a transaction, but you don't have global
> multi-transaction support across many bundles. These are implemented purely
> as DoFns, because we can encapsulate bundle-based batching or transactions
> at that level. See BigtableIO
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L538>
> or DatastoreIO
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L896>
> for examples.
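
For readers who have not looked at those classes, the bundle-based batching idea can be sketched without any Beam dependency. This is only an illustration under assumptions: RecordStore and BatchingWriter are hypothetical names, and the three lifecycle methods merely mirror the start-bundle / process-element / finish-bundle shape of a DoFn. It is not the BigtableIO or DatastoreIO code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an external store client; not a real Beam or vendor API. */
interface RecordStore {
    void commitBatch(List<String> records);
}

/** Beam-free sketch of a writer that batches records within a single bundle. */
class BatchingWriter {
    private final RecordStore store;
    private final int maxBatchSize;
    private List<String> buffer = new ArrayList<>();

    BatchingWriter(RecordStore store, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.store = store;
        this.maxBatchSize = maxBatchSize;
    }

    /** Called once before the first element of a bundle. */
    void startBundle() {
        buffer = new ArrayList<>();
    }

    /** Called per element: buffer it and flush when the batch is full. */
    void processElement(String record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Called once after the last element of a bundle: flush the remainder. */
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            store.commitBatch(new ArrayList<>(buffer)); // hand over a copy
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BatchingWriter writer = new BatchingWriter(batch -> System.out.println(batch), 2);
        writer.startBundle();
        for (String r : new String[] {"r1", "r2", "r3"}) {
            writer.processElement(r);
        }
        writer.finishBundle();
    }
}
```

Nothing here coordinates across bundles, which is the point: each bundle commits on its own, so no global initialization or finalization step is needed.
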
>
> Other times, we actually want *more* structure. For BigQueryIO in bounded
> mode, we: 1) write all files in parallel ; 2) 1-time globally break temp
> files into groups that fit within 1 BigQuery import job ; 3) create
> multiple temp tables in BigQuery in parallel ; 4) concatenate all the temp
> tables into the final table ; 5) delete all the temp files and the temp
> tables.
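
Step 2 above is essentially a packing problem. A minimal sketch of one way to do it is a greedy, in-order grouping bounded by a file count and a byte total. The class name LoadJobPartitioner and the limits passed in are assumptions for illustration; the real BigQuery limits and the actual BigQueryIO logic are not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: group temp file sizes so each group fits one import job.
class LoadJobPartitioner {

    /**
     * Greedily packs file sizes, in order, into groups with at most maxFiles
     * entries and at most maxBytes total. A single file larger than maxBytes
     * still gets a group of its own.
     */
    static List<List<Long>> partition(List<Long> fileSizes, int maxFiles, long maxBytes) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            boolean wouldOverflow = current.size() >= maxFiles || currentBytes + size > maxBytes;
            if (wouldOverflow && !current.isEmpty()) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(partition(Arrays.asList(4L, 4L, 4L, 10L, 1L), 2, 10L));
    }
}
```

This step needs to see every file before it can run, which is another reason the overall flow assumes bounded input.
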
>
> The fact that Write+Sink has not proved that useful has slowed down our
> movement on any sort of unbounded sink or streaming sink. Instead, I think
> it makes more sense to identify the important patterns and build to those.
>
> The Flink Rolling[File]Sink is a specific kind of sink that we should
> incorporate into the SDK core alongside TextIO. But the key is that
> RollingSink is not "streaming TextIO.Write" - it's actually a file sink
> with specific semantics that are different than those of TextIO.Write.
>
> Similarly, we could extend TextIO.Write to support a file pattern like
> "-WW-SS-of-NN-GG", where "WW" is based on the window, "SS of NN" is the
> index of a particular shard within a write, and "GG" is the generation
> number, i.e. the number of times this Window+Shard has triggered (based on
> PaneInfo). Of course, to support this we'd have to force a fixed
> sharding pattern all the time when writing, which hurts efficiency and
> runner autotuning. In general, this is hard to do right in a way that makes
> sense with late data, accumulating mode, etc.
>    (If you could enforce 1 triggering per window, we could drop the -GG.)
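
A small sketch of what such a "-WW-SS-of-NN-GG" suffix could look like in plain Java follows. Everything in it is an assumption for illustration: the class name WindowedShardNames, the choice of the window start in UTC as the "WW" part, the zero-padding widths, and passing the generation in as a plain number (in a pipeline it would presumably be derived from PaneInfo). This is not an existing TextIO.Write feature.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical sketch of a window + shard + generation file name suffix.
class WindowedShardNames {
    // File-name-safe UTC timestamp (no colons), used here as the "WW" part.
    private static final DateTimeFormatter WINDOW_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'", Locale.ROOT)
            .withZone(ZoneOffset.UTC);

    /** Builds "-WW-SS-of-NN-GG" from a window start, shard index, shard count and generation. */
    static String suffix(Instant windowStart, int shard, int numShards, long generation) {
        if (numShards <= 0 || shard < 0 || shard >= numShards) {
            throw new IllegalArgumentException("shard must be in [0, numShards)");
        }
        if (generation < 0) {
            throw new IllegalArgumentException("generation must be non-negative");
        }
        return String.format(Locale.ROOT, "-%s-%05d-of-%05d-%d",
            WINDOW_FORMAT.format(windowStart), shard, numShards, generation);
    }

    public static void main(String[] args) {
        // Shard 3 of 10, third firing (generation 2) of the window starting at 16:00 UTC.
        System.out.println("out" + suffix(Instant.parse("2016-09-19T16:00:00Z"), 3, 10, 2));
    }
}
```

Note that numShards has to be known and fixed up front for the name to be stable across firings, which is the fixed-sharding cost described above.
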
>
> Hope that helps explain why we don't have any awesome streaming sinks yet
> in the SDK. However, we'd love for someone to start a Beam port of
> RollingFileSink or something similar which has sensible semantics in the
> presence of windowing, triggers, multiple firing, accumulating mode...
> (Possibly just rejecting some inputs it can't handle).
>
> Thanks!
> Dan
>
>
>
> On Mon, Sep 19, 2016 at 2:11 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Hi Thomas,
>>
>> thanks for the update.
>>
>> Stupid question: right now, most of the IO sinks use a simple DoFn (to
>> write the PCollection elements).
>>
>> Does it mean that, when possible, we should use the Sink abstract class,
>> with a Writer that can write bundles (using open(), write(), close()
>> methods)? In that case, will the Window create the bundle?
>>
>> Regards
>> JB
>>
>> On 09/19/2016 10:46 PM, Thomas Groh wrote:
>>
>>> The model doesn't support dynamically creating PCollections, so the
>>> proposed transform producing a Bounded PCollection from a window of an
>>> Unbounded PCollection means that you end up with a Bounded Pipeline - in
>>> which case it is preferable to use a Bounded Read instead. As you're
>>> reading from an unbounded input, it may be necessary to write the
>>> windowed values to a sink that supports continuous updates, from which
>>> you can execute a Bounded Pipeline over the appropriate set of input
>>> instead of on some arbitrary chunk of records (e.g. the window).
>>>
>>> The current Write implementation works effectively only for sinks that
>>> can assume they can do something exactly once and finish. This is not an
>>> assumption that can be made within arbitrary pipelines. Sinks that
>>> currently only work for Bounded inputs are generally written with
>>> assumptions that mean they do not work in the presence of multiple
>>> trigger firings, late data, or across multiple windows, and handling
>>> these facets of the model requires different behavior or configuration
>>> from different sinks.
>>>
>>> On Mon, Sep 19, 2016 at 9:53 AM, Emanuele Cesena <emanu...@shopkick.com
>>> <mailto:emanu...@shopkick.com>> wrote:
>>>
>>>     Oh yes, even better I’d say.
>>>
>>>     Best,
>>>
>>>
>>>     > On Sep 19, 2016, at 9:48 AM, Jean-Baptiste Onofré <j...@nanthrax.net
>>>     <mailto:j...@nanthrax.net>> wrote:
>>>     >
>>>     > Hi Emanuele,
>>>     >
>>>     > +1 to support Unbounded sink, but also, a very convenient function
>>>     would be a Window to create a bounded collection as a subset of an
>>>     unbounded collection.
>>>     >
>>>     > Regards
>>>     > JB
>>>     >
>>>     > On 09/19/2016 05:59 PM, Emanuele Cesena wrote:
>>>     >> Hi,
>>>     >>
>>>     >> This is a great insight. Is there any plan to support unbounded
>>>     sinks in Beam?
>>>     >>
>>>     >> On the temp kafka->kafka solution, this is exactly what we’re
>>>     doing (and I wish to change). We have production stream pipelines
>>>     that are kafka->kafka. Then we have 2 main use cases: kafka connect
>>>     to dump into hive and go batch from there, and druid for real time
>>>     reporting.
>>>     >>
>>>     >> However this makes prototyping really slow, and I wanted to
>>>     introduce Beam to short cut from kafka to anywhere.
>>>     >>
>>>     >> Best,
>>>     >>
>>>     >>
>>>     >>> On Sep 18, 2016, at 10:38 PM, Aljoscha Krettek
>>>     <aljos...@apache.org <mailto:aljos...@apache.org>> wrote:
>>>     >>>
>>>     >>> Hi,
>>>     >>> right now, writing to a Beam "Sink" is only supported for
>>>     bounded streams, as you discovered. An unbounded stream cannot be
>>>     transformed to a bounded stream using a window; this will just
>>>     "chunk" the stream differently but it will still be unbounded.
>>>     >>>
>>>     >>> The options you have right now for writing are to write to your
>>>     external datastore using a DoFn, to use KafkaIO to write to a Kafka
>>>     topic, or to use UnboundedFlinkSink to wrap a Flink Sink for use in a
>>>     Beam pipeline. The latter would allow you to use, for example,
>>>     BucketingSink or RollingSink from Flink. I'm only mentioning
>>>     UnboundedFlinkSink for completeness; I would not recommend using it
>>>     since your program will only work on the Flink runner. The way to
>>>     go, IMHO, would be to write to Kafka and then take the data from
>>>     there and ship it to some final location such as HDFS.
>>>     >>>
>>>     >>> Cheers,
>>>     >>> Aljoscha
>>>     >>>
>>>     >>> On Sun, 18 Sep 2016 at 23:17 Emanuele Cesena
>>>     <emanu...@shopkick.com <mailto:emanu...@shopkick.com>> wrote:
>>>     >>> Thanks I’ll look into it, even if it’s not really the feature I
>>>     need (exactly because it will stop execution).
>>>     >>>
>>>     >>>
>>>     >>>> On Sep 18, 2016, at 2:11 PM, Chawla,Sumit
>>>     <sumitkcha...@gmail.com <mailto:sumitkcha...@gmail.com>> wrote:
>>>     >>>>
>>>     >>>> Hi Emanuele
>>>     >>>>
>>>     >>>> KafkaIO supports withMaxNumRecords(X), which will
>>>     create a bounded source from Kafka. However, the pipeline will
>>>     finish once X records have been read.
>>>     >>>>
>>>     >>>> Regards
>>>     >>>> Sumit Chawla
>>>     >>>>
>>>     >>>>
>>>     >>>> On Sun, Sep 18, 2016 at 2:00 PM, Emanuele Cesena
>>>     <emanu...@shopkick.com <mailto:emanu...@shopkick.com>> wrote:
>>>     >>>> Hi,
>>>     >>>>
>>>     >>>> Thanks for the hint - I’ll debug better but I thought I did that:
>>>     >>>>
>>>     >>>> https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java#L140
>>>     >>>>
>>>     >>>> Best,
>>>     >>>>
>>>     >>>>
>>>     >>>>> On Sep 18, 2016, at 1:54 PM, Jean-Baptiste Onofré
>>>     <j...@nanthrax.net <mailto:j...@nanthrax.net>> wrote:
>>>     >>>>>
>>>     >>>>> Hi Emanuele
>>>     >>>>>
>>>     >>>>> You have to use a window to create a bounded collection from
>>>     an unbounded source.
>>>     >>>>>
>>>     >>>>> Regards
>>>     >>>>> JB
>>>     >>>>>
>>>     >>>>> On Sep 18, 2016, at 21:04, Emanuele Cesena
>>>     <emanu...@shopkick.com <mailto:emanu...@shopkick.com>> wrote:
>>>     >>>>> Hi,
>>>     >>>>>
>>>     >>>>> I wrote a while ago about a simple example I was building to
>>>     test KafkaIO:
>>>     >>>>>
>>>     >>>>> https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java
>>>     >>>>>
>>>     >>>>> Issues with Flink should be fixed now, and I’m trying to run the
>>>     example on master and Flink 1.1.2.
>>>     >>>>> I’m currently getting:
>>>     >>>>> Caused by: java.lang.IllegalArgumentException: Write can only
>>>     be applied to a Bounded PCollection
>>>     >>>>>
>>>     >>>>> What is the recommended way to go here?
>>>     >>>>> - is there a way to create a bounded collection from an
>>>     unbounded one?
>>>     >>>>> - is there a plan to let TextIO write unbounded collections?
>>>     >>>>> - is there another recommended “simple sink” to use?
>>>     >>>>>
>>>     >>>>> Thank you much!
>>>     >>>>>
>>>     >>>>> Best,
>>>     >>>>
>>>     >>>> --
>>>     >>>> Emanuele Cesena, Data Eng.
>>>     >>>> http://www.shopkick.com
>>>     >>>>
>>>     >>>> Il corpo non ha ideali
>>>     >>>>
>>>     >>>>
>>>     >>>>
>>>     >>>>
>>>     >>>>
>>>     >>>
>>>     >>
>>>     >
>>>     > --
>>>     > Jean-Baptiste Onofré
>>>     > jbono...@apache.org <mailto:jbono...@apache.org>
>>>     > http://blog.nanthrax.net
>>>     > Talend - http://www.talend.com
>>>
>>
>
>
