Re: Proposal: Dynamic PipelineOptions

2016-08-04 Thread Frances Perry
+Amit, Aljoscha, Manu

Any comments from folks on the Flink, Spark, or Gearpump runners?

On Tue, Aug 2, 2016 at 11:10 AM, Robert Bradshaw <
rober...@google.com.invalid> wrote:

> Being able to "late-bind" parameters like input paths to a
> pre-constructed program would be a very useful feature, and I think is
> worth adding to Beam.
>
> Of the four API proposals, I have a strong preference for (4).
> Further, it seems that these need not be bound to the PipelineOptions
> object itself (i.e. a named RuntimeValueSupplier could be constructed
> off of a pipeline object), which the Python API makes less heavy use
> of (encouraging the user to use familiar, standard libraries for
> argument parsing), though of course such integration is useful to
> provide for convenience.
>
> - Robert
>
> On Fri, Jul 29, 2016 at 12:14 PM, Sam McVeety 
> wrote:
> > During the graph construction phase, the given SDK generates an initial
> > execution graph for the program.  At execution time, this graph is
> > executed, either locally or by a service.  Currently, Beam only supports
> > parameterization at graph construction time.  Both Flink and Spark supply
> > functionality that allows a pre-compiled job to be run without SDK
> > interaction with updated runtime parameters.
> >
> > In its current incarnation, Dataflow can read values of PipelineOptions at
> > job submission time, but this requires the presence of an SDK to properly
> > encode these values into the job.  We would like to build a common layer
> > into the Beam model so that these dynamic options can be properly provided
> > to jobs.
> >
> > Please see
> > https://docs.google.com/document/d/1I-iIgWDYasb7ZmXbGBHdok_IK1r1YAJ90JG5Fz0_28o/edit
> > for the high-level model, and
> > https://docs.google.com/document/d/17I7HeNQmiIfOJi0aI70tgGMMkOSgGi8ZUH-MOnFatZ8/edit
> > for the specific API proposal.
> >
> > Cheers,
> > Sam
>
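
For illustration, here is a minimal sketch of what such a late-bound parameter
could look like in Java, loosely following the "named RuntimeValueSupplier"
idea from Robert's reply. The factory method newRuntimeValue, the option name
"inputPath", and the IO overload accepting a supplier are hypothetical and not
part of any existing Beam API:

// Hypothetical sketch of a named, late-bound value attached to the pipeline
// rather than to PipelineOptions, as suggested above.
public interface RuntimeValueSupplier<T> extends Serializable {
  T get();                // resolved at execution time, not at graph construction
  boolean isAccessible(); // false while the graph is still being constructed
}

// Hypothetical usage: the parameter is declared while the graph is built,
// but its value is only bound when the job actually runs.
Pipeline p = Pipeline.create(options);
RuntimeValueSupplier<String> inputPath =
    p.newRuntimeValue("inputPath", String.class);   // hypothetical factory method
p.apply(TextIO.Read.from(inputPath))                // hypothetical overload taking a supplier
 .apply(ParDo.of(new ParseFn()));                   // ParseFn is a placeholder DoFn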


Re: [PROPOSAL] Having 2 Spark runners to support Spark 1 users while advancing towards better streaming implementation with Spark 2

2016-08-04 Thread Kenneth Knowles
+1

I definitely think it is important to support Spark 1 and 2 simultaneously,
and I agree that side-by-side seems the best way to do it. I'll refrain
from commenting on the specific technical aspects of the two runners and
focus just on the split: I am also curious about the answer to Dan's
question about what code is likely to be shared, if any.

On Thu, Aug 4, 2016 at 9:40 AM, Dan Halperin 
wrote:

> Can they share any substantial code? If not, they will really be separate
> runners.
>
> If so, would it make more sense to fork into runners/spark and
> runners/spark2?
>
>
>
> On Thu, Aug 4, 2016 at 9:33 AM, Ismaël Mejía  wrote:
>
> > +1
> >
> > In particular for three reasons:
> >
> > 1. The new DataSet API in Spark 2 and the new semantics it allows for the
> > runner (and the fact that we cannot backport this to the Spark 1 runner).
> > 2. The current performance regressions in Spark 2 (another reason to keep
> > the Spark 1 runner).
> > 3. The different dependencies between Spark versions (less important, but
> > also a source of runtime conflicts).
> >
> > Just two points:
> > 1. Considering the alpha state of the Structured Streaming API and the
> > performance regressions, I consider it important to preserve the previous
> > TransformTranslator in the Spark 2 runner, at least until Spark 2 releases
> > some stability fixes.
> > 2. Porting Read.Bound to the Spark 1 runner is a must; we must guarantee
> > the same IO compatibility in both runners to make this ‘split’ make sense.
> >
> > Negative points of the proposal:
> > - More maintenance work + tests to do, but still worth it, at least for
> > some time, given the current state.
> >
> > Extra comments:
> >
> > - This means that we will have two compatibility matrix columns now (at
> > least while we support Spark 1)?
> > - We should probably make clear to users the advantages/disadvantages of
> > both versions of the runner, and make clear that the Spark 1 runner will
> > be almost in maintenance mode (with not many new features).
> > - We must also decide later whether to deprecate the Spark 1 runner; this
> > will depend in part on the feedback from users + the progress/adoption of
> > Spark 2.
> >
> > Ismaël
> >
> > On Thu, Aug 4, 2016 at 8:39 AM, Amit Sela  wrote:
> >
> > > After discussions with JB, and understanding that a lot of companies
> > > running Spark will probably run 1.6.x for a while, we thought it would
> > > be a good idea to have (some) support for both branches.
> > >
> > > The SparkRunnerV1 will mostly support Batch, but could also support
> > > “KeyedState” workflows and Sessions. As for streaming, I suggest
> > > eliminating the awkward way it uses Beam Windows (see
> > > runners/spark#streaming) and only supporting Processing-Time windows.
> > >
> > > The SparkRunnerV2 will have batch/streaming support relying on Structured
> > > Streaming and the functionality it provides, and will provide in the
> > > future, to support the Beam model as best as it can.
> > >
> > > The runners will exist under “runners/spark/spark1” and
> > > “runners/spark/spark2”.
> > >
> > > If this proposal is accepted, I will change JIRA tickets according to a
> > > proposed roadmap for both runners.
> > >
> > > General roadmap:
> > >
> > >
> > > SparkRunnerV1 should mostly “cleanup” and get rid of the Window-mocking,
> > > while specifically declaring Unsupported where it should.
> > >
> > > Additional features:
> > >
> > > 1. Read.Bound support - actually supported in the SparkRunnerV2 branch
> > > that is at work, and it already passed some tests by JB and Ismael from
> > > Talend. I’ve also asked Michael Armbrust from Apache Spark to review
> > > this, and once it’s all set I’ll backport it to V1 as well.
> > > 2. Consider support for “Keyed-State”.
> > > 3. Consider support for “Sessions”.
> > >
> > > SparkRunnerV2 branch (incubator-beam/pull/495) is at work right now and
> > > I hope to have it out supporting (some) event-time windowing, triggers
> > > and accumulation modes for streaming.
> > >
> > > Thanks,
> > > Amit
> > >
> >
>


[PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-04 Thread Eugene Kirpichov
Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like to propose
"Splittable DoFn" - a major generalization of DoFn, which allows processing
of a single element to be non-monolithic, i.e. checkpointable and
parallelizable, and allows doing an unbounded amount of work per element.

This allows effectively replacing the current Bounded/UnboundedSource APIs
with DoFns that are much easier to code, more scalable and composable with
the rest of the Beam programming model, and enables many use cases that
were previously difficult or impossible, as well as some non-obvious new
use cases.

This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
meetings, and now the whole thing is written up in a document:

https://s.apache.org/splittable-do-fn

Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the
pipeline (e.g. easily implement a connector to a storage system that can
export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a
DoFn that simply polls a consumer and outputs new records in a while() loop
- Implement a log tailer by composing a DoFn that incrementally returns new
files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix
squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka reader written against
this API:

ProcessContinuation processElement(
    ProcessContext context, OffsetRangeTracker tracker) {
  try (KafkaConsumer consumer =
      Kafka.subscribe(context.element().topic,
                      context.element().partition)) {
    // Start reading from the beginning of the claimed offset range.
    consumer.seek(tracker.start());
    while (true) {
      ConsumerRecords records = consumer.poll(100 /* ms */);
      if (records == null) return done();
      for (ConsumerRecord record : records) {
        // Claim each offset before outputting; if the claim fails, the
        // remainder of the range has been split off and will be resumed later.
        if (!tracker.tryClaim(record.offset())) {
          return resume().withFutureOutputWatermark(record.timestamp());
        }
        context.output(record);
      }
    }
  }
}
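
To make the composition concrete, here is a rough sketch of how the "list
partitions" DoFn and the splittable reader above might be wired together in a
pipeline. The transform names (ListKafkaPartitionsFn, ReadKafkaPartitionFn) and
the KafkaPartition element type are hypothetical placeholders, not part of the
proposal itself:

// Hypothetical composition: an ordinary DoFn expands topics into partitions,
// and the splittable DoFn above then reads each partition indefinitely.
PCollection<String> topics = p.apply(Create.of("events", "logs"));
PCollection<KafkaPartition> partitions =
    topics.apply(ParDo.of(new ListKafkaPartitionsFn()));     // ordinary DoFn
PCollection<ConsumerRecord> records =
    partitions.apply(ParDo.of(new ReadKafkaPartitionFn()));  // splittable DoFn above

Because the reader is just a DoFn, the partition list could equally be produced
by an earlier step of the pipeline rather than by a hard-coded Create.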

The document describes in detail the motivations behind this feature, the
basic idea and API, open questions, and outlines an incremental delivery
plan.

The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
loosely related to "State and Timers for DoFn" [beam-state].

Please take a look and comment!

Thanks.

[BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
[new-do-fn] https://s.apache.org/a-new-do-fn
[beam-state] https://s.apache.org/beam-state