Hi Dan,

thanks again for the detailed explanation.

I will prepare some questions for you ;)

Regards
JB

On 09/20/2016 01:37 AM, Dan Halperin wrote:
Hey folks,

Sorry for the confusion around sinks. Let me see if I can clear things up.

In Beam, a Source+Reader is an integral part of the model. A source
is the root of a pipeline and it is where runners can do lots of
important things like creating bundles, producing watermarks, and taking
checkpoints.

So we initially thought that we should have a generic Write transform
that would encapsulate common patterns when outputting data. We started
with Write+Sink.

The Write+Sink class is simply a wrapper for the pattern (1-time global
initialization, parallel writing, 1-time global finalization), but it is
nothing more than a bunch of ParDos wired together by side inputs for
control flow. However, this pattern ONLY applies naturally to
PCollections that trigger exactly 1 time across all windows -- really,
only bounded PCollections. So it simply does not work with unbounded
PCollections.

Over time, we have learned that this was not a very general pattern.
Specifically, I believe there is exactly 1 use of the Sink today, which
is for files (FileBasedSink, and also HadoopFileSink). The global
patterns for FileBasedSink look like (1 time, create temp directory ;
many times, write a temp file ; 1 time, rename and renumber all files).
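
To make the three phases concrete, here is a minimal sketch in plain Python. It is not the Beam API: the function names, the "_temp" directory, and the "out-SSSSS-of-NNNNN" naming are assumptions made up for illustration. The point is that the final step needs the complete list of temp files, which is why the pattern only fits a collection that fires exactly once.

```python
import os
import tempfile


def initialize(output_dir):
    """One-time global step: create a temp directory for in-progress files."""
    temp_dir = os.path.join(output_dir, "_temp")  # hypothetical layout
    os.makedirs(temp_dir, exist_ok=True)
    return temp_dir


def write_bundle(temp_dir, bundle_id, records):
    """Runs many times (in a real runner, in parallel): one temp file per bundle."""
    path = os.path.join(temp_dir, "bundle-%s.tmp" % bundle_id)
    with open(path, "w") as f:
        for record in records:
            f.write(record + "\n")
    return path


def finalize(output_dir, temp_dir, temp_paths, prefix="out"):
    """One-time global step: rename and renumber all temp files.

    This needs the full list of temp files, so it can only run after
    every bundle has been written.
    """
    total = len(temp_paths)
    final_paths = []
    for index, temp_path in enumerate(sorted(temp_paths)):
        name = "%s-%05d-of-%05d" % (prefix, index, total)
        final_path = os.path.join(output_dir, name)
        os.rename(temp_path, final_path)
        final_paths.append(final_path)
    os.rmdir(temp_dir)
    return final_paths


def run_write(output_dir, bundles):
    """Wire the three phases together sequentially."""
    temp_dir = initialize(output_dir)
    temp_paths = [write_bundle(temp_dir, i, b) for i, b in enumerate(bundles)]
    return finalize(output_dir, temp_dir, temp_paths)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out:
        for path in run_write(out, [["a", "b"], ["c"]]):
            print(os.path.basename(path))
```

With an unbounded input there is no moment at which "all temp files" is known, so the rename-and-renumber step has nothing well-defined to run on.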

Most other places we write data need less structure. Kafka, Google Cloud
Datastore, Google Cloud Bigtable -- most of the time you simply insert
one record or a bundle of records in a transaction, but you don't have
global multi-transaction support across many bundles. These are
implemented purely as DoFns, because we can encapsulate bundle-based
batching or transactions at that level. See BigtableIO
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L538>
or DatastoreIO
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L896>
for examples.
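
As a rough illustration of bundle-level batching, the sketch below mimics a DoFn-style lifecycle (start of bundle, one call per element, end of bundle) in plain Python. The class names, the max_batch_size parameter, and the in-memory client are all hypothetical stand-ins, not the actual BigtableIO or DatastoreIO code.

```python
class InMemoryClient:
    """Hypothetical stand-in for a datastore client; records each commit."""

    def __init__(self):
        self.commits = []

    def commit(self, records):
        self.commits.append(records)


class BatchingWriter:
    """Mimics a DoFn lifecycle: start_bundle, process per element, finish_bundle."""

    def __init__(self, client, max_batch_size=500):
        self.client = client
        self.max_batch_size = max_batch_size  # assumed per-request limit
        self.batch = []

    def start_bundle(self):
        self.batch = []

    def process(self, record):
        self.batch.append(record)
        if len(self.batch) >= self.max_batch_size:
            self._flush()

    def finish_bundle(self):
        # Flush whatever is left so nothing outlives the bundle.
        self._flush()

    def _flush(self):
        if self.batch:
            self.client.commit(list(self.batch))
            self.batch = []


def run_bundle(writer, records):
    """Drive one bundle through the writer, as a runner would."""
    writer.start_bundle()
    for record in records:
        writer.process(record)
    writer.finish_bundle()
```

Each commit stands alone here; nothing coordinates across bundles, which is exactly the "less structure" case.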

Other times, we actually want *more* structure. For BigQueryIO in
bounded mode, we: 1) write all temp files in parallel ; 2) 1-time globally
break the temp files into groups that each fit within 1 BigQuery import
job ; 3) create multiple temp tables in BigQuery in parallel ; 4)
concatenate all the temp tables into the final table ; 5) delete all the
temp files and the temp tables.
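
The grouping in step 2 could look something like the sketch below, assuming the things being grouped are the files written in step 1 and that an import job is limited by file count and total bytes. The function name and both limits are hypothetical; this is plain Python, not what BigQueryIO actually does.

```python
def partition_files(file_sizes, max_files_per_job, max_bytes_per_job):
    """Greedily pack (name, size) pairs into groups that fit one import job.

    A single file larger than max_bytes_per_job still gets a group of its
    own; rejecting it would be a separate policy decision.
    """
    groups = []
    current, current_bytes = [], 0
    for name, size in file_sizes:
        over_count = len(current) + 1 > max_files_per_job
        over_bytes = current_bytes + size > max_bytes_per_job
        if current and (over_count or over_bytes):
            groups.append(current)
            current, current_bytes = [], 0
        current.append(name)
        current_bytes += size
    if current:
        groups.append(current)
    return groups
```

Like the finalization step of the file sink, this needs to see every file once, globally, before any import job can start.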

The fact that Write+Sink has not proved that useful has slowed down our
movement on any sort of unbounded sink or streaming sink. Instead, I
think it makes more sense to identify the important patterns and build
to those.

The Flink Rolling[File]Sink is a specific kind of sink that we should
incorporate into the SDK core alongside TextIO. But the key is that
RollingSink is not "streaming TextIO.Write" - it's actually a file sink
with specific semantics that are different than those of TextIO.Write.

Similarly, we could extend TextIO.Write to support a file pattern like
"-WW-SS-of-NN-GG", where "WW" is based on the window, "SS of NN" is the
index of a particular shard within a write, and "GG" is the generation
number, aka the number of times this Window+Shard has triggered (based
on PaneInfo). And of course to support this we'd have to force a fixed
sharding pattern all the time when writing, which hurts efficiency and
runner autotuning. In general, this is hard to do right in a way that
makes sense with late data, accumulating mode, etc.
   (If you could enforce 1 triggering per window, we could drop the -GG.)
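
A small sketch of what such a naming scheme might compute, in plain Python. The function, the zero-padding, and the GenerationTracker class are made up for illustration; the tracker just stands in for whatever pane information a runner would supply.

```python
from collections import defaultdict


def shard_filename(prefix, window, shard, num_shards, generation=None):
    """Build "<prefix>-WW-SS-of-NN[-GG]" for one firing of one shard."""
    name = "%s-%s-%05d-of-%05d" % (prefix, window, shard, num_shards)
    if generation is not None:
        # Omitted when each window is known to fire exactly once.
        name += "-%d" % generation
    return name


class GenerationTracker:
    """Counts how many times each (window, shard) pair has fired so far."""

    def __init__(self):
        self._firings = defaultdict(int)

    def next_generation(self, window, shard):
        generation = self._firings[(window, shard)]
        self._firings[(window, shard)] += 1
        return generation
```

Note that num_shards has to be fixed up front for the "SS-of-NN" part to be stable across firings, which is the forced-sharding cost mentioned above.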

Hope that helps explain why we don't have any awesome streaming sinks
yet in the SDK. However, we'd love for someone to start a Beam port of
RollingFileSink or something similar which has sensible semantics in the
presence of windowing, triggers, multiple firing, accumulating mode...
(Possibly just rejecting some inputs it can't handle).

Thanks!
Dan



On Mon, Sep 19, 2016 at 2:11 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

    Hi Thomas,

    thanks for the update.

    Stupid question: right now, most of the IO sinks use a simple DoFn
    (to write the PCollection elements).

    Does it mean that, when possible, we should use the Sink abstract
    class, with a Writer that can write bundles (using open(), write(),
    close() methods)?
    In that case, will the Window create the bundle?

    Regards
    JB

    On 09/19/2016 10:46 PM, Thomas Groh wrote:

        The model doesn't support dynamically creating PCollections, so the
        proposed transform producing a Bounded PCollection from a window of an
        Unbounded PCollection means that you end up with a Bounded Pipeline - in
        which case it is preferable to use a Bounded Read instead. As you're
        reading from an unbounded input, it may be necessary to write the
        windowed values to a sink that supports continuous updates, from which
        you can execute a Bounded Pipeline over the appropriate set of input
        instead of on some arbitrary chunk of records (e.g. the window).

        The current Write implementation works effectively only for sinks that
        can assume they can do something exactly once and finish. This is not an
        assumption that can be made within arbitrary pipelines. Sinks that
        currently only work for Bounded inputs are generally written with
        assumptions that mean they do not work in the presence of multiple
        trigger firings, late data, or across multiple windows, and handling
        these facets of the model requires different behavior or configuration
        from different sinks.

        On Mon, Sep 19, 2016 at 9:53 AM, Emanuele Cesena
        <emanu...@shopkick.com> wrote:

            Oh yes, even better I’d say.

            Best,


            > On Sep 19, 2016, at 9:48 AM, Jean-Baptiste Onofré
            > <j...@nanthrax.net> wrote:
            >
            > Hi Emanuele,
            >
            > +1 to support Unbounded sink, but also, a very convenient
            > function would be a Window to create a bounded collection as a
            > subset of an unbounded collection.
            >
            > Regards
            > JB
            >
            > On 09/19/2016 05:59 PM, Emanuele Cesena wrote:
            >> Hi,
            >>
            >> This is a great insight. Is there any plan to support unbounded
            >> sinks in Beam?
            >>
            >> On the temp kafka->kafka solution, this is exactly what we’re
            >> doing (and I wish to change). We have production stream pipelines
            >> that are kafka->kafka. Then we have 2 main use cases: kafka connect
            >> to dump into hive and go batch from there, and druid for real-time
            >> reporting.
            >>
            >> However this makes prototyping really slow, and I wanted to
            >> introduce Beam as a shortcut from kafka to anywhere.
            >>
            >> Best,
            >>
            >>
            >>> On Sep 18, 2016, at 10:38 PM, Aljoscha Krettek
            >>> <aljos...@apache.org> wrote:
            >>>
            >>> Hi,
            >>> right now, writing to a Beam "Sink" is only supported for
            >>> bounded streams, as you discovered. An unbounded stream cannot be
            >>> transformed to a bounded stream using a window; this will just
            >>> "chunk" the stream differently but it will still be unbounded.
            >>>
            >>> The options you have right now for writing are to write to your
            >>> external datastore using a DoFn, to use KafkaIO to write to a
            >>> Kafka topic, or to use UnboundedFlinkSink to wrap a Flink Sink for
            >>> use in a Beam pipeline. The latter would allow you to use, for
            >>> example, BucketingSink or RollingSink from Flink. I'm only
            >>> mentioning UnboundedFlinkSink for completeness; I would not
            >>> recommend using it since your program will only work on the Flink
            >>> runner. The way to go, IMHO, would be to write to Kafka and then
            >>> take the data from there and ship it to some final location such
            >>> as HDFS.
            >>>
            >>> Cheers,
            >>> Aljoscha
            >>>
            >>> On Sun, 18 Sep 2016 at 23:17 Emanuele Cesena
            >>> <emanu...@shopkick.com> wrote:
            >>> Thanks I’ll look into it, even if it’s not really the feature I
            >>> need (exactly because it will stop execution).
            >>>
            >>>
            >>>> On Sep 18, 2016, at 2:11 PM, Chawla,Sumit
            >>>> <sumitkcha...@gmail.com> wrote:
            >>>>
            >>>> Hi Emanuele
            >>>>
            >>>> KafkaIO supports withMaxNumRecords(X), which will create a
            >>>> bounded source from Kafka. However, the pipeline will finish
            >>>> once X records have been read.
            >>>>
            >>>> Regards
            >>>> Sumit Chawla
            >>>>
            >>>>
            >>>> On Sun, Sep 18, 2016 at 2:00 PM, Emanuele Cesena
            >>>> <emanu...@shopkick.com> wrote:
            >>>> Hi,
            >>>>
            >>>> Thanks for the hint - I’ll debug better but I thought I did that:
            >>>>
            >>>> https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java#L140
            >>>>
            >>>> Best,
            >>>>
            >>>>
            >>>>> On Sep 18, 2016, at 1:54 PM, Jean-Baptiste Onofré
            >>>>> <j...@nanthrax.net> wrote:
            >>>>>
            >>>>> Hi Emanuele
            >>>>>
            >>>>> You have to use a window to create a bounded collection from
            >>>>> an unbounded source.
            >>>>>
            >>>>> Regards
            >>>>> JB
            >>>>>
            >>>>> On Sep 18, 2016, at 21:04, Emanuele Cesena
            >>>>> <emanu...@shopkick.com> wrote:
            >>>>> Hi,
            >>>>>
            >>>>> I wrote a while ago about a simple example I was building to
            >>>>> test KafkaIO:
            >>>>>
            >>>>> https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java
            >>>>>
            >>>>> Issues with Flink should be fixed now, and I’m trying to run
            >>>>> the example on master and Flink 1.1.2.
            >>>>> I’m currently getting:
            >>>>> Caused by: java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection
            >>>>>
            >>>>> What is the recommended way to go here?
            >>>>> - is there a way to create a bounded collection from an
            >>>>>   unbounded one?
            >>>>> - is there a plan to let TextIO write unbounded collections?
            >>>>> - is there another recommended “simple sink” to use?
            >>>>>
            >>>>> Thank you much!
            >>>>>
            >>>>> Best,
            >>>>
            >>>> --
            >>>> Emanuele Cesena, Data Eng.
            >>>> http://www.shopkick.com
            >>>>
            >>>> Il corpo non ha ideali
            >>>>
            >>>>
            >>>>
            >>>>
            >>>>
            >>>
            >>> --
            >>> Emanuele Cesena, Data Eng.
            >>> http://www.shopkick.com
            >>>
            >>> Il corpo non ha ideali
            >>>
            >>>
            >>>
            >>>
            >>
            >
            > --
            > Jean-Baptiste Onofré
            > jbono...@apache.org
            > http://blog.nanthrax.net
            > Talend - http://www.talend.com

            --
            Emanuele Cesena, Data Eng.
            http://www.shopkick.com

            Il corpo non ha ideali






    --
    Jean-Baptiste Onofré
    jbono...@apache.org
    http://blog.nanthrax.net
    Talend - http://www.talend.com



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
