Re: Preferred locations (or data locality) for batch pipelines.

2016-10-03 Thread Dan Halperin
See if this is the right interpretation:

* Hadoop's InputSplit has a getLocations method that in some cases exposes
useful information about the underlying data locality.
* Beam jobs may run on the same cluster as the HDFS storage nodes (e.g.),
in which case it's useful to expose the locality to runners to assign
mappers (e.g.) to be near the data.

In that case, I think it makes perfect sense to expose the `getLocations`
on the actual HDFS sources. To do this, we would need to make the HDFS
Source an actual BoundedSource with a getter for the locations -- rather
than an anonymous inner class. See here: https://github.com/apache/incubator-beam/blob/master/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java#L180
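To make that concrete, here is a minimal, dependency-free Java sketch of the idea (the `LocatedSplit` name and its fields are made up for illustration; the real change would be on `HDFSFileSource` itself): a named split type simply carries the hosts Hadoop reports and exposes them through a getter a runner can query.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy stand-in for a named (non-anonymous) bounded-source split that
 *  carries data locality, mirroring Hadoop's InputSplit#getLocations. */
public class LocatedSplit {
  private final String path;
  private final long start;
  private final long length;
  private final List<String> hosts; // nodes holding replicas of this block

  public LocatedSplit(String path, long start, long length, List<String> hosts) {
    this.path = path;
    this.start = start;
    this.length = length;
    this.hosts = hosts;
  }

  /** The getter a runner would query to place work near the data. */
  public List<String> getLocations() {
    return Collections.unmodifiableList(hosts);
  }

  @Override
  public String toString() {
    return path + " [" + start + "+" + length + "] @ " + hosts;
  }

  public static void main(String[] args) {
    LocatedSplit split = new LocatedSplit(
        "hdfs:///data/part-00000", 0L, 128L << 20,
        Arrays.asList("node1", "node2", "node3"));
    System.out.println(split.getLocations()); // prints: [node1, node2, node3]
  }
}
```

The point is only that a concrete class (unlike an anonymous inner class) gives the runner a stable place to find the locality information.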

If that's right, makes sense to me!

Dan

On Mon, Sep 26, 2016 at 12:55 PM, Amit Sela  wrote:

> Thanks for the thorough response Dan, what you mentioned is very interesting
> and would clearly benefit runners.
>
> I was actually talking about something more "old-school", and specific to
> batch.
> If running a job on YARN - via MapReduce, Spark, etc. - you'd prefer that
> YARN would assign tasks working on splits locally.
>
> Spark does this for HDFS/HBase/S3:
> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L241
>
> Since for most(?) open-source runners YARN is the preferred/popular
> resource manager, and HDFS is the preferred filesystem, I was wondering if
> that's something that could be shared across runners rather than
> re-written per runner.
> I'm talking about obtaining the locations of the input splits, and passing
> them to the runners to choose how to use them.
>
> I wonder if there's a need for that besides the Spark runner though, since
> it's only relevant for batch. I opened https://issues.apache.org/jira/browse/BEAM-673
> as
> a "runner-spark" component for now.
>
> Thanks,
> Amit
>
>
> On Mon, Sep 26, 2016 at 10:39 PM Dan Halperin  >
> wrote:
>
> > Hi Amit,
> >
> > Sorry to be late to the thread, but I've been traveling. I'm not sure I
> > fully grokked the question, but here's one attempt at an answer:
> >
> > In general, any options on where a pipeline is executed should be
> > runner-specific. One example: for Dataflow, we have the zone
> > <https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java#L167>
> > option,
> > which can be used to control what GCE zone VMs are launched in. I could
> > imagine similar things for Spark/Yarn, etc.
> >
> > I think your question may be a bit deeper: given a pipeline without such
> > explicit configuration from the user, can a runner do something smart? I
> > think the answer to that is also yes. Today, we have DisplayData and soon
> > we will have the Runner API -- these expose in a standard way information
> > about file paths, BigQuery tables, Bigtable clusters, Kafka clusters,
> etc.,
> > that may be used by the pipeline. Once the Runner API is standardized and
> > implemented, a runner ought to be able to inspect the metadata and say
> > "hey, I see you're reading from this Kafka cluster, let's try to be near
> > it". For example.
> >
> > Does that answer the question / did I miss something?
> >
> > Thanks,
> > Dan
> >
> > On Thu, Sep 22, 2016 at 8:29 AM, Amit Sela  wrote:
> >
> > > Generally this makes sense, though I thought that this is what
> > > IOChannelFactory was (also) about, and eventually the runner needs to
> > > facilitate the splitting/partitioning of the source, so I was wondering
> > if
> > > the source could have a generic mechanism for locality as well.
> > >
> > > On Thu, Sep 22, 2016 at 6:11 PM Jesse Anderson 
> > > wrote:
> > >
> > > > I think the runners should. Each framework has put far more effort
> into
> > > > data locality than Beam should. Beam should just take advantage of
> it.
> > > >
> > > > On Thu, Sep 22, 2016, 7:57 AM Amit Sela 
> wrote:
> > > >
> > > > > Not where in the file, where in the cluster.
> > > > >
> > > > > Like you said - mapper - in MapReduce the mapper instance will
> > *prefer*
> > > > to
> > > > > start on the same machine as the node hosting its data (unless that's
> > > changed,
> > > > > I've been out of touch with MR for a while...).
> > > > >
> > > > > And for Spark -
> > > > >
> > > > >
> > > > https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html
> > > > > .
> > > > >
> > > > > As for Flink, it's a streaming-first engine (sort of the opposite
> of
> > > > Spark,
> > > > > being a batch-first engine) so I *assume* they don't have this
> notion
> > > and
> > > > > simply "stream" input.

Re: Preferred locations (or data locality) for batch pipelines.

2016-09-26 Thread Amit Sela
Thanks for the thorough response Dan, what you mentioned is very interesting
and would clearly benefit runners.

I was actually talking about something more "old-school", and specific to
batch.
If running a job on YARN - via MapReduce, Spark, etc. - you'd prefer that
YARN would assign tasks working on splits locally.

Spark does this for HDFS/HBase/S3:
https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L241
.

Since for most(?) open-source runners YARN is the preferred/popular
resource manager, and HDFS is the preferred filesystem, I was wondering if
that's something that could be shared across runners rather than
re-written per runner.
I'm talking about obtaining the locations of the input splits, and passing
them to the runners to choose how to use them.
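A dependency-free sketch of what "passing them to the runners" could look like (all names here are hypothetical): given each split's preferred hosts and the set of worker hosts, pick a data-local worker when one exists and fall back to round-robin otherwise.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy scheduler: assign input splits to workers, preferring hosts
 *  that hold the split's data (names and structure are illustrative). */
public class LocalityAssigner {

  /** Returns split index -> worker host. Picks a data-local worker when
   *  one exists, otherwise falls back to round-robin over all workers. */
  public static Map<Integer, String> assign(
      List<List<String>> splitHosts, List<String> workers) {
    Map<Integer, String> assignment = new LinkedHashMap<>();
    int rr = 0; // round-robin cursor for the non-local fallback
    for (int i = 0; i < splitHosts.size(); i++) {
      String chosen = null;
      for (String host : splitHosts.get(i)) {
        if (workers.contains(host)) { // a worker runs on a replica's node
          chosen = host;
          break;
        }
      }
      if (chosen == null) {
        chosen = workers.get(rr++ % workers.size()); // no locality available
      }
      assignment.put(i, chosen);
    }
    return assignment;
  }

  public static void main(String[] args) {
    List<List<String>> splits = Arrays.asList(
        Arrays.asList("node1", "node2"), // split 0 replicated on node1, node2
        Arrays.asList("node9"));         // split 1 on a node with no worker
    System.out.println(assign(splits, Arrays.asList("node2", "node3")));
    // prints: {0=node2, 1=node2}
  }
}
```

A real runner would feed this into YARN container requests (or Spark's preferred-locations mechanism) rather than compute a static map, but the local-first-with-fallback shape is the shared part.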

I wonder if there's a need for that besides the Spark runner though, since
it's only relevant for batch. I opened https://issues.apache.org/jira/browse/BEAM-673 as
a "runner-spark" component for now.

Thanks,
Amit


On Mon, Sep 26, 2016 at 10:39 PM Dan Halperin 
wrote:

> Hi Amit,
>
> Sorry to be late to the thread, but I've been traveling. I'm not sure I
> fully grokked the question, but here's one attempt at an answer:
>
> In general, any options on where a pipeline is executed should be
> runner-specific. One example: for Dataflow, we have the zone
> <https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java#L167>
> option,
> which can be used to control what GCE zone VMs are launched in. I could
> imagine similar things for Spark/Yarn, etc.
>
> I think your question may be a bit deeper: given a pipeline without such
> explicit configuration from the user, can a runner do something smart? I
> think the answer to that is also yes. Today, we have DisplayData and soon
> we will have the Runner API -- these expose in a standard way information
> about file paths, BigQuery tables, Bigtable clusters, Kafka clusters, etc.,
> that may be used by the pipeline. Once the Runner API is standardized and
> implemented, a runner ought to be able to inspect the metadata and say
> "hey, I see you're reading from this Kafka cluster, let's try to be near
> it". For example.
>
> Does that answer the question / did I miss something?
>
> Thanks,
> Dan
>
> On Thu, Sep 22, 2016 at 8:29 AM, Amit Sela  wrote:
>
> > Generally this makes sense, though I thought that this is what
> > IOChannelFactory was (also) about, and eventually the runner needs to
> > facilitate the splitting/partitioning of the source, so I was wondering
> if
> > the source could have a generic mechanism for locality as well.
> >
> > On Thu, Sep 22, 2016 at 6:11 PM Jesse Anderson 
> > wrote:
> >
> > > I think the runners should. Each framework has put far more effort into
> > > data locality than Beam should. Beam should just take advantage of it.
> > >
> > > On Thu, Sep 22, 2016, 7:57 AM Amit Sela  wrote:
> > >
> > > > Not where in the file, where in the cluster.
> > > >
> > > > Like you said - mapper - in MapReduce the mapper instance will
> *prefer*
> > > to
> > > > start on the same machine as the node hosting its data (unless that's
> > changed,
> > > > I've been out of touch with MR for a while...).
> > > >
> > > > And for Spark -
> > > >
> > > >
> > > https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html
> > > > .
> > > >
> > > > As for Flink, it's a streaming-first engine (sort of the opposite of
> > > Spark,
> > > > being a batch-first engine) so I *assume* they don't have this notion
> > and
> > > > simply "stream" input.
> > > >
> > > > Dataflow - no idea...
> > > >
> > > > On Thu, Sep 22, 2016 at 5:45 PM Jesse Anderson <je...@smokinghand.com>
> > > > wrote:
> > > >
> > > > > I've only ever seen that being used to figure out which file the
> > > > > runner/mapper/operation is working on. Otherwise, I haven't seen
> > those
> > > > > operations care where in the file they're working.
> > > > >
> > > > > On Thu, Sep 22, 2016 at 5:57 AM Amit Sela 
> > > wrote:
> > > > >
> > > > > > Wouldn't it force all runners to implement this for all
> distributed
> > > > > > filesystems? It's true that each runner has its own
> > "partitioning"
> > > > > > mechanism, but I assume (maybe I'm wrong) that open-source
> runners
> > > use
> > > > > the
> > > > > > Hadoop InputFormat/InputSplit for that.. and the proper
> connectors
> > > for
> > > > > that
> > > > > > to run on top of s3/gs.
> > > > > >
> > > > > > If this is wrong, each runner should take care of its own, but
> if
> > > not,
> > > > > we
> > > > > > could have a generic solution for runners, no ?
> > > > > >
> > > > > > Thanks,
> > > > > > Amit
> > > > > >
> > > > > > On Thu, Sep 22, 2016 at 3:30 

Re: Preferred locations (or data locality) for batch pipelines.

2016-09-26 Thread Dan Halperin
Hi Amit,

Sorry to be late to the thread, but I've been traveling. I'm not sure I
fully grokked the question, but here's one attempt at an answer:

In general, any options on where a pipeline is executed should be
runner-specific. One example: for Dataflow, we have the zone
<https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java#L167>
option,
which can be used to control what GCE zone VMs are launched in. I could
imagine similar things for Spark/Yarn, etc.

I think your question may be a bit deeper: given a pipeline without such
explicit configuration from the user, can a runner do something smart? I
think the answer to that is also yes. Today, we have DisplayData and soon
we will have the Runner API -- these expose in a standard way information
about file paths, BigQuery tables, Bigtable clusters, Kafka clusters, etc.,
that may be used by the pipeline. Once the Runner API is standardized and
implemented, a runner ought to be able to inspect the metadata and say
"hey, I see you're reading from this Kafka cluster, let's try to be near
it". For example.
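As a toy illustration of that inspection step (the keys and the `hintFor` helper are invented; real DisplayData items are richer than a flat string map), a runner could scan source metadata and derive a placement hint:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a runner scanning DisplayData-like key/value metadata to
 *  derive a placement hint (keys, values, and hintFor are invented). */
public class PlacementHint {

  public static String hintFor(Map<String, String> displayData) {
    String pattern = displayData.get("filePattern");
    if (pattern != null && pattern.startsWith("hdfs://")) {
      // hdfs://namenode:8020/... -> prefer the cluster behind that namenode
      String rest = pattern.substring("hdfs://".length());
      return "near:" + rest.split("[:/]")[0];
    }
    String brokers = displayData.get("bootstrapServers"); // Kafka-style metadata
    if (brokers != null) {
      return "near:" + brokers.split("[:,]")[0];
    }
    return "anywhere"; // nothing to infer; any placement is fine
  }

  public static void main(String[] args) {
    Map<String, String> dd = new HashMap<>();
    dd.put("filePattern", "hdfs://namenode:8020/logs/*");
    System.out.println(hintFor(dd)); // prints: near:namenode
  }
}
```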

Does that answer the question / did I miss something?

Thanks,
Dan

On Thu, Sep 22, 2016 at 8:29 AM, Amit Sela  wrote:

> Generally this makes sense, though I thought that this is what
> IOChannelFactory was (also) about, and eventually the runner needs to
> facilitate the splitting/partitioning of the source, so I was wondering if
> the source could have a generic mechanism for locality as well.
>
> On Thu, Sep 22, 2016 at 6:11 PM Jesse Anderson 
> wrote:
>
> > I think the runners should. Each framework has put far more effort into
> > data locality than Beam should. Beam should just take advantage of it.
> >
> > On Thu, Sep 22, 2016, 7:57 AM Amit Sela  wrote:
> >
> > > Not where in the file, where in the cluster.
> > >
> > > Like you said - mapper - in MapReduce the mapper instance will *prefer*
> > to
> > > start on the same machine as the node hosting its data (unless that's
> changed,
> > > I've been out of touch with MR for a while...).
> > >
> > > And for Spark -
> > >
> > >
> > https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html
> > > .
> > >
> > > As for Flink, it's a streaming-first engine (sort of the opposite of
> > Spark,
> > > being a batch-first engine) so I *assume* they don't have this notion
> and
> > > simply "stream" input.
> > >
> > > Dataflow - no idea...
> > >
> > > On Thu, Sep 22, 2016 at 5:45 PM Jesse Anderson 
> > > wrote:
> > >
> > > > I've only ever seen that being used to figure out which file the
> > > > runner/mapper/operation is working on. Otherwise, I haven't seen
> those
> > > > operations care where in the file they're working.
> > > >
> > > > On Thu, Sep 22, 2016 at 5:57 AM Amit Sela 
> > wrote:
> > > >
> > > > > Wouldn't it force all runners to implement this for all distributed
> > > > > filesystems? It's true that each runner has its own
> "partitioning"
> > > > > mechanism, but I assume (maybe I'm wrong) that open-source runners
> > use
> > > > the
> > > > > Hadoop InputFormat/InputSplit for that.. and the proper connectors
> > for
> > > > that
> > > > > to run on top of s3/gs.
> > > > >
> > > > > If this is wrong, each runner should take care of its own, but if
> > not,
> > > > we
> > > > > could have a generic solution for runners, no ?
> > > > >
> > > > > Thanks,
> > > > > Amit
> > > > >
> > > > > On Thu, Sep 22, 2016 at 3:30 PM Jean-Baptiste Onofré <j...@nanthrax.net>
> > > > > wrote:
> > > > >
> > > > > > Hi Amit,
> > > > > >
> > > > > > as the purpose is to remove IOChannelFactory, then I would
> suggest
> > > it's
> > > > > > a runner concern. The Read.Bounded should "locate" the bundles
> on an
> > > > > > executor close to the read data (even if it's not always possible
> > > > > > depending on the source).
> > > > > >
> > > > > > My $0.01
> > > > > >
> > > > > > Regards
> > > > > > JB
> > > > > >
> > > > > > On 09/22/2016 02:26 PM, Amit Sela wrote:
> > > > > > > It's not new that batch pipelines can optimize on data locality,
> > my
> > > > > > question
> > > > > > > is regarding this responsibility in Beam.
> > > > > > > If runners should implement a generic Read.Bounded support,
> > should
> > > > they
> > > > > > > also implement locating the input blocks ? or should it be a
> part
> > > > > > > of IOChannelFactory implementations ? or another way to go at
> it
> > > that
> > > > > I'm
> > > > > > > missing ?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Amit.
> > > > > > >
> > > > > >
> > > > > > --
> > > > > > Jean-Baptiste Onofré
> > > > > > jbono...@apache.org
> > > > > > http://blog.nanthrax.net
> > > > > > Talend - http://www.talend.com
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Preferred locations (or data locality) for batch pipelines.

2016-09-22 Thread Amit Sela
Not where in the file, where in the cluster.

Like you said - mapper - in MapReduce the mapper instance will *prefer* to
start on the same machine as the node hosting its data (unless that's changed,
I've been out of touch with MR for a while...).

And for Spark -
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html
.
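For reference, Spark's scheduler ranks candidate placements by locality level (PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY). Here is a stripped-down, dependency-free Java sketch of the classification step (method names and rack handling are simplified for illustration; this is not Spark's actual code):

```java
import java.util.Arrays;
import java.util.List;

/** Stripped-down classifier for Spark-style locality levels; Spark's real
 *  scheduler also knows PROCESS_LOCAL and NO_PREF, omitted here. */
public class LocalityLevel {

  /** hostsWithData: nodes holding the block; sameRack: whether the executor's
   *  host shares a rack with one of them (rack topology assumed known). */
  public static String classify(
      String executorHost, List<String> hostsWithData, boolean sameRack) {
    if (hostsWithData.contains(executorHost)) {
      return "NODE_LOCAL"; // executor runs on a node holding the data
    }
    if (sameRack) {
      return "RACK_LOCAL"; // same rack, so only one switch hop away
    }
    return "ANY"; // no locality; data crosses the network
  }

  public static void main(String[] args) {
    List<String> replicas = Arrays.asList("node1", "node2");
    System.out.println(classify("node1", replicas, false)); // prints: NODE_LOCAL
    System.out.println(classify("node3", replicas, true));  // prints: RACK_LOCAL
    System.out.println(classify("node7", replicas, false)); // prints: ANY
  }
}
```

Spark also waits briefly (spark.locality.wait) for a better level to become available before falling back to a worse one.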

As for Flink, it's a streaming-first engine (sort of the opposite of Spark,
being a batch-first engine) so I *assume* they don't have this notion and
simply "stream" input.

Dataflow - no idea...

On Thu, Sep 22, 2016 at 5:45 PM Jesse Anderson 
wrote:

> I've only ever seen that being used to figure out which file the
> runner/mapper/operation is working on. Otherwise, I haven't seen those
> operations care where in the file they're working.
>
> On Thu, Sep 22, 2016 at 5:57 AM Amit Sela  wrote:
>
> > Wouldn't it force all runners to implement this for all distributed
> > filesystems? It's true that each runner has its own "partitioning"
> > mechanism, but I assume (maybe I'm wrong) that open-source runners use
> the
> > Hadoop InputFormat/InputSplit for that.. and the proper connectors for
> that
> > to run on top of s3/gs.
> >
> > If this is wrong, each runner should take care of its own, but if not,
> we
> > could have a generic solution for runners, no ?
> >
> > Thanks,
> > Amit
> >
> > On Thu, Sep 22, 2016 at 3:30 PM Jean-Baptiste Onofré 
> > wrote:
> >
> > > Hi Amit,
> > >
> > > as the purpose is to remove IOChannelFactory, then I would suggest it's
> > > a runner concern. The Read.Bounded should "locate" the bundles on an
> > > executor close to the read data (even if it's not always possible
> > > depending on the source).
> > >
> > > My $0.01
> > >
> > > Regards
> > > JB
> > >
> > > On 09/22/2016 02:26 PM, Amit Sela wrote:
> > > > It's not new that batch pipelines can optimize on data locality, my
> > > question
> > > > is regarding this responsibility in Beam.
> > > > If runners should implement a generic Read.Bounded support, should
> they
> > > > also implement locating the input blocks ? or should it be a part
> > > > of IOChannelFactory implementations ? or another way to go at it that
> > I'm
> > > > missing ?
> > > >
> > > > Thanks,
> > > > Amit.
> > > >
> > >
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
>


Re: Preferred locations (or data locality) for batch pipelines.

2016-09-22 Thread Jesse Anderson
I've only ever seen that being used to figure out which file the
runner/mapper/operation is working on. Otherwise, I haven't seen those
operations care where in the file they're working.

On Thu, Sep 22, 2016 at 5:57 AM Amit Sela  wrote:

> Wouldn't it force all runners to implement this for all distributed
> filesystems? It's true that each runner has its own "partitioning"
> mechanism, but I assume (maybe I'm wrong) that open-source runners use the
> Hadoop InputFormat/InputSplit for that.. and the proper connectors for that
> to run on top of s3/gs.
>
> If this is wrong, each runner should take care of its own, but if not, we
> could have a generic solution for runners, no ?
>
> Thanks,
> Amit
>
> On Thu, Sep 22, 2016 at 3:30 PM Jean-Baptiste Onofré 
> wrote:
>
> > Hi Amit,
> >
> > as the purpose is to remove IOChannelFactory, then I would suggest it's
> > a runner concern. The Read.Bounded should "locate" the bundles on an
> > executor close to the read data (even if it's not always possible
> > depending on the source).
> >
> > My $0.01
> >
> > Regards
> > JB
> >
> > On 09/22/2016 02:26 PM, Amit Sela wrote:
> > > It's not new that batch pipelines can optimize on data locality, my
> > question
> > > is regarding this responsibility in Beam.
> > > If runners should implement a generic Read.Bounded support, should they
> > > also implement locating the input blocks ? or should it be a part
> > > of IOChannelFactory implementations ? or another way to go at it that
> I'm
> > > missing ?
> > >
> > > Thanks,
> > > Amit.
> > >
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>


Re: Preferred locations (or data locality) for batch pipelines.

2016-09-22 Thread Jean-Baptiste Onofré

Hi Amit,

as the purpose is to remove IOChannelFactory, then I would suggest it's
a runner concern. The Read.Bounded should "locate" the bundles on an
executor close to the read data (even if it's not always possible
depending on the source).


My $0.01

Regards
JB

On 09/22/2016 02:26 PM, Amit Sela wrote:

It's not new that batch pipelines can optimize on data locality; my question
is about where this responsibility lives in Beam.
If runners should implement generic Read.Bounded support, should they
also implement locating the input blocks? Or should it be a part
of IOChannelFactory implementations? Or is there another way to go at it that I'm
missing?

Thanks,
Amit.



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com