Hey folks,

Sorry for the confusion around sinks. Let me see if I can clear things up.

In Beam, a Source+Reader is a very integral part of the model. A source is
the root of a pipeline and it is where runners can do lots of important
things like creating bundles, producing watermarks, and taking checkpoints.

So we initially thought that we should have a generic Write transform that
would encapsulate common patterns when outputting data. We started with
Write+Sink.

The Write+Sink class is simply a wrapper for the pattern (1-time global
initialization, parallel writing, 1-time global finalization), but it is
nothing more than a bunch of ParDos wired together by side inputs for
control flow. However, this pattern ONLY applies naturally to PCollections
that trigger exactly 1 time across all windows -- really, only bounded
PCollections. So it simply does not work with unbounded PCollections.

Over time, we have learned that this was not a very general pattern.
Specifically, I believe there is exactly 1 use of the Sink today, which is
for files (FileBasedSink, and also HadoopFileSink). The global patterns for
FileBasedSink look like (1 time, create temp directory ; many times, write
a temp file ; 1 time, rename and renumber all files).

Most other places we write data need less structure. Kafka, Google Cloud
Datastore, Google Cloud Bigtable -- most of the time you simply insert one
record or a bundle of records in a transaction, but you don't have global
multi-transaction support across many bundles. These are implemented purely
as DoFns, because we can encapsulate bundle-based batching or transactions
at that level. See BigtableIO
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L538>
or DatastoreIO
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L896>
for examples.

Other times, we actually want *more* structure. For BigQueryIO in bounded
mode, we: 1) write all files in parallel ; 2) 1-time globally break temp
tables into groups that fit within 1 BigQuery import job ; 3) create
multiple temp tables in bigquery in parallel ; 4) concatenate all the temp
tables into the final table ; 5) delete all the temp files and the temp
tables.

The fact that Write+Sink has not proved that useful has slowed down our
movement on any sort of unbounded sink or streaming sink. Instead, I think
it makes more sense to identify the important patterns and build to those.

The Flink Rolling[File]Sink is a specific kind of sink that we should
incorporate into the SDK core alongside TextIO. But the key is that
RollingSink is not "streaming TextIO.Write" - it's actually a file sink
with specific semantics that are different than those of TextIO.Write.

Similarly, we could extend TextIO.Write to support a file pattern like
"-WW-SS-of-NN-GG", where "WW" is based on the window, "SS of NN" is the
index of a particular shard within a write, and "GG" is the generation
number aka the number of times this Window+Shard have triggered (based on
PaneInfo). And of course to support this we'd have to force a fixed
sharding pattern all the time when writing, which hurts efficiency and
runner autotuning. In general, this is hard to do write in a way that makes
sense with late data, accumulating mode, etc.
   (If you could enforce 1 triggering per window, we could drop the -GG.)

Hope that helps explain why we don't have any awesome streaming sinks yet
in the SDK. However, we'd love for someone to start a Beam port of
RollingFileSink or something similar which has sensible semantics in the
presence of windowing, triggers, multiple firing, accumulating mode...
(Possibly just rejecting some inputs it can't handle).

Thanks!
Dan



On Mon, Sep 19, 2016 at 2:11 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Thomas,
>
> thanks for the update.
>
> Stupid question: right now, most of the IO Sink use a simple DoFn (to
> write the PCollection elements).
>
> Does it mean that, when possible, we should use Sink abstract class, with
> a Writer that can write bundles (using open(), write(), close() methods) ?
> In that case, will the Window create the bundle ?
>
> Regards
> JB
>
> On 09/19/2016 10:46 PM, Thomas Groh wrote:
>
>> The model doesn't support dynamically creating PCollections, so the
>> proposed transform producing a Bounded PCollection from a window of an
>> Unbounded PCollection means that you end up with a Bounded Pipeline - in
>> which case it is preferable to use a Bounded Read instead. As you're
>> reading from an unbounded input, it may be necessary to write the
>> windowed values to a sink that supports continuous updates, from which
>> you can execute a Bounded Pipeline over the appropriate set of input
>> instead of on some arbitrary chunk of records (e.g. the window).
>>
>> The current Write implementation works effectively only for sinks that
>> can assume they can do something exactly once and finish. This is not an
>> assumption that can be made within arbitrary pipelines. Sinks that
>> currently only work for Bounded inputs are generally written with
>> assumptions that mean they do not work in the presence of multiple
>> trigger firings, late data, or across multiple windows, and handling
>> these facets of the model requires different behavior or configuration
>> from different sinks.
>>
>> On Mon, Sep 19, 2016 at 9:53 AM, Emanuele Cesena <emanu...@shopkick.com
>> <mailto:emanu...@shopkick.com>> wrote:
>>
>>     Oh yes, even better I’d say.
>>
>>     Best,
>>
>>
>>     > On Sep 19, 2016, at 9:48 AM, Jean-Baptiste Onofré <j...@nanthrax.net
>>     <mailto:j...@nanthrax.net>> wrote:
>>     >
>>     > Hi Emanuele,
>>     >
>>     > +1 to support Unbounded sink, but also, a very convenient function
>>     would be a Window to create a bounded collection as a subset of a
>>     unbounded collection.
>>     >
>>     > Regards
>>     > JB
>>     >
>>     > On 09/19/2016 05:59 PM, Emanuele Cesena wrote:
>>     >> Hi,
>>     >>
>>     >> This is a great insight. Is there any plan to support unbounded
>>     sink in Beam?
>>     >>
>>     >> On the temp kafka->kafka solution, this is exactly what we’re
>>     doing (and I wish to change). We have production stream pipelines
>>     that are kafka->kafka. Then we have 2 main use cases: kafka connect
>>     to dump into hive and go batch from there, and druid for real time
>>     reporting.
>>     >>
>>     >> However this makes prototyping really slow, and I wanted to
>>     introduce Beam to short cut from kafka to anywhere.
>>     >>
>>     >> Best,
>>     >>
>>     >>
>>     >>> On Sep 18, 2016, at 10:38 PM, Aljoscha Krettek
>>     <aljos...@apache.org <mailto:aljos...@apache.org>> wrote:
>>     >>>
>>     >>> Hi,
>>     >>> right now, writing to a Beam "Sink" is only supported for
>>     bounded streams, as you discovered. An unbounded stream cannot be
>>     transformed to a bounded stream using a window, this will just
>>     "chunk" the stream differently but it will still be unbounded.
>>     >>>
>>     >>> The options you have right now for writing are to write to your
>>     external datastore using a DoFn, using KafkaIO to write to a Kafka
>>     topic or to use UnboundedFlinkSink to wrap a Flink Sink for use in a
>>     Beam pipeline. The latter would allow you to use, for example,
>>     BucketingSink or RollingSink from Flink. I'm only mentioning
>>     UnboundedFlinkSink for completeness, I would not recommend using it
>>     since your program will only work on the Flink runner. The way to
>>     go, IMHO, would be to write to Kafka and then take the data from
>>     there and ship it to some final location such as HDFS.
>>     >>>
>>     >>> Cheers,
>>     >>> Aljoscha
>>     >>>
>>     >>> On Sun, 18 Sep 2016 at 23:17 Emanuele Cesena
>>     <emanu...@shopkick.com <mailto:emanu...@shopkick.com>> wrote:
>>     >>> Thanks I’ll look into it, even if it’s not really the feature I
>>     need (exactly because it will stop execution).
>>     >>>
>>     >>>
>>     >>>> On Sep 18, 2016, at 2:11 PM, Chawla,Sumit
>>     <sumitkcha...@gmail.com <mailto:sumitkcha...@gmail.com>> wrote:
>>     >>>>
>>     >>>> Hi Emanuele
>>     >>>>
>>     >>>> KafkaIO  supports withMaxNumRecords(X) support which will
>>     create a bounded source from Kafka. However, the pipeline will
>>     finish once X number of records are read.
>>     >>>>
>>     >>>> Regards
>>     >>>> Sumit Chawla
>>     >>>>
>>     >>>>
>>     >>>> On Sun, Sep 18, 2016 at 2:00 PM, Emanuele Cesena
>>     <emanu...@shopkick.com <mailto:emanu...@shopkick.com>> wrote:
>>     >>>> Hi,
>>     >>>>
>>     >>>> Thanks for the hint - I’ll debug better but I thought I did that:
>>     >>>>
>>     https://github.com/ecesena/beam-starter/blob/master/src/main
>> /java/com/dataradiant/beam/examples/StreamWordCount.java#L140
>>     <https://github.com/ecesena/beam-starter/blob/master/src/mai
>> n/java/com/dataradiant/beam/examples/StreamWordCount.java#L140>
>>     >>>>
>>     >>>> Best,
>>     >>>>
>>     >>>>
>>     >>>>> On Sep 18, 2016, at 1:54 PM, Jean-Baptiste Onofré
>>     <j...@nanthrax.net <mailto:j...@nanthrax.net>> wrote:
>>     >>>>>
>>     >>>>> Hi Emanuele
>>     >>>>>
>>     >>>>> You have to use a window to create a bounded collection from
>>     an unbounded source.
>>     >>>>>
>>     >>>>> Regards
>>     >>>>> JB
>>     >>>>>
>>     >>>>> On Sep 18, 2016, at 21:04, Emanuele Cesena
>>     <emanu...@shopkick.com <mailto:emanu...@shopkick.com>> wrote:
>>     >>>>> Hi,
>>     >>>>>
>>     >>>>> I wrote a while ago about a simple example I was building to
>>     test KafkaIO:
>>     >>>>>
>>     https://github.com/ecesena/beam-starter/blob/master/src/main
>> /java/com/dataradiant/beam/examples/StreamWordCount.java
>>     <https://github.com/ecesena/beam-starter/blob/master/src/mai
>> n/java/com/dataradiant/beam/examples/StreamWordCount.java>
>>     >>>>>
>>     >>>>> Issues with Flink should be fixed now, and I’m try to run the
>>     example on master and Flink 1.1.2.
>>     >>>>> I’m currently getting:
>>     >>>>> Caused by: java.lang.IllegalArgumentException: Write can only
>>     be applied to a Bounded PCollection
>>     >>>>>
>>     >>>>> What is the recommended way to go here?
>>     >>>>> - is there a way to create a bounded collection from an
>>     unbounded one?
>>     >>>>> - is there a plat to let TextIO write unbounded collections?
>>     >>>>> - is there another recommended “simple sink” to use?
>>     >>>>>
>>     >>>>> Thank you much!
>>     >>>>>
>>     >>>>> Best,
>>     >>>>
>>     >>>> --
>>     >>>> Emanuele Cesena, Data Eng.
>>     >>>> http://www.shopkick.com
>>     >>>>
>>     >>>> Il corpo non ha ideali
>>     >>>>
>>     >>>>
>>     >>>>
>>     >>>>
>>     >>>>
>>     >>>
>>     >>> --
>>     >>> Emanuele Cesena, Data Eng.
>>     >>> http://www.shopkick.com
>>     >>>
>>     >>> Il corpo non ha ideali
>>     >>>
>>     >>>
>>     >>>
>>     >>>
>>     >>
>>     >
>>     > --
>>     > Jean-Baptiste Onofré
>>     > jbono...@apache.org <mailto:jbono...@apache.org>
>>     > http://blog.nanthrax.net
>>     > Talend - http://www.talend.com
>>
>>     --
>>     Emanuele Cesena, Data Eng.
>>     http://www.shopkick.com
>>
>>     Il corpo non ha ideali
>>
>>
>>
>>
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Reply via email to