Re: Flink runner. Optimization for sideOutput with tags

2016-12-06 Thread Aljoscha Krettek
I'm having a look at your PRs now. I think the change is good, and it's
actually quite simple too.

Thanks for looking into this!

On Mon, 5 Dec 2016 at 05:48 Alexey Demin  wrote:

> Aljoscha
>
> I was mistaken about the Flink runtime =)
>
> What do you think about the following modification to
> FlinkStreamingTransformTranslators:
>
> Move the split out of the for-loop:
>
> SplitStream<RawUnionValue> splitStream = unionOutputStream.split(
>     new OutputSelector<RawUnionValue>() {
>       @Override
>       public Iterable<String> select(RawUnionValue value) {
>         return Lists.newArrayList(String.valueOf(value.getUnionTag()));
>       }
>     });
>
> and change filtered to
>
> DataStream<Object> filtered = splitStream.select(String.valueOf(outputTag))
>     .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
>       @Override
>       public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
>         out.collect(value.getValue());
>       }
>     }).returns(outputTypeInfo);
>
> With this implementation we always transfer data only to the necessary
> output, instead of broadcasting every value to all outputs.
>
> P.S. I know this code is not production ready; it's only an idea for discussion.
> But for people who use side outputs only for alerting it can reduce CPU
> usage (serialization is applied only to the targeted value, not to all
> elements for every output).
>
> Thanks,
> Alexey Diomin
>
>
> 2016-12-04 23:57 GMT+04:00 Alexey Demin :
>
> > Hi
> >
> > Here is a very simple example:
> > https://gist.github.com/xhumanoid/287af191314d5d867acf509129bd4931
> >
> > Sometimes we need meta-information about the element being processed.
> >
> > If I understood the code in FlinkStreamingTransformTranslators (line 557)
> > correctly, the main problem is not in the translators but in the Flink
> > runtime, which doesn't know about the tags and simply broadcasts when a
> > transformation has 2 outputs.
> >
> > Correct me if I'm mistaken.
> >
> >
> > >> this is a bit of a dangerous setting
> >
> > I know about the dangers of object-reuse, but we never use an object after
> > collect.
> > In some cases we need more performance, and serialization on every
> > transformation is very expensive; but merging all business logic into one
> > DoFn would make the processing unmaintainable.
> >
> > >> I think your stack trace is not complete, at least I can't seem to see
> > the root exception.
> >
> > We took this stack trace on a live system with jstack. It's not an exception.
> >
> > Thanks,
> > Alexey Diomin
> >
> >
> > 2016-11-29 21:33 GMT+04:00 Aljoscha Krettek :
> >
> >> Hi Alexey,
> >> I think it should be possible to optimise this particular transformation
> >> by
> >> using a split/select pattern in Flink. (See split and select here:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.2/
> >> dev/datastream_api.html#datastream-transformations).
> >> The current implementation is not very optimised; my main goal was to
> >> make all features of Beam work before going into individual optimisations.
> >>
> >> About object-reuse in Flink Streaming: this is a bit of a dangerous
> >> setting
> >> and can lead to unexpected results with certain patterns. I think your
> >> stack trace is not complete, at least I can't seem to see the root
> >> exception.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Mon, 28 Nov 2016 at 07:33 Alexey Demin  wrote:
> >>
> >> > Hi
> >> >
> >> > If we try to use sideOutput with TupleTag and the Flink config
> >> > enableObjectReuse, then we get the following stack trace:
> >> >
> >> > at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
> >> > at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
> >> > at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
> >> > at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
> >> > at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
> >> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> >> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> >> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> >> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> >> > at
> >> >
> >> > 

Test failure on beam-sdks-java-maven-archetypes-examples-java8

2016-12-06 Thread Manu Zhang
Guys,

Has anyone seen the following failure on the latest master ?

[INFO] java.lang.IllegalStateException: Failed to validate
gs://apache-beam-samples/shakespeare/*
[INFO] at
it.pkg.MinimalWordCountJava8Test.testMinimalWordCountJava8(MinimalWordCountJava8Test.java:63)
[INFO] Caused by: java.io.IOException: Unable to match files in bucket
apache-beam-samples, prefix shakespeare/ against pattern shakespeare/[^/]*
[INFO] at
it.pkg.MinimalWordCountJava8Test.testMinimalWordCountJava8(MinimalWordCountJava8Test.java:63)
[INFO] Caused by: java.net.SocketTimeoutException: connect timed out
[INFO] at
it.pkg.MinimalWordCountJava8Test.testMinimalWordCountJava8(MinimalWordCountJava8Test.java:63)
[INFO]
[INFO] Running it.pkg.DebuggingWordCountTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
2.079 sec - in it.pkg.DebuggingWordCountTest
[INFO] Running it.pkg.complete.game.HourlyTeamScoreTest
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
1.894 sec - in it.pkg.complete.game.HourlyTeamScoreTest
[INFO] Running it.pkg.complete.game.GameStatsTest
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.03
sec - in it.pkg.complete.game.GameStatsTest
[INFO]
[INFO] Results :
[INFO]
[INFO] Tests in error:
[INFO]   MinimalWordCountJava8Test.testMinimalWordCountJava8:63 »
IllegalState Failed t...
[INFO]
[INFO] Tests run: 19, Failures: 0, Errors: 1, Skipped: 0

It could be my network as I live in China.

Manu


Re: [DISCUSS] ExecIO

2016-12-06 Thread Eugene Kirpichov
Ben - the issues of "things aren't hung, there is a shell command running",
aren't they general to all DoFn's? i.e. I don't see why the runner would
need to know that a shell command is running, but not that, say, a heavy
monolithic computation is running. What's the benefit to the runner in
knowing that the DoFn contains a shell command?

By saying "making sure that all shell commands finish", I suppose you're
referring to the possibility of leaks if the user initiates a shell command
and forgets to wait for it? I think that should be solvable again without
Beam intervention, by making a utility class for running shell commands
which implements AutoCloseable, and documenting that you have to use it that
way.
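
Roughly something like this (an untested sketch on top of plain
ProcessBuilder; the class name and methods are made up, not an existing Beam
API):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical AutoCloseable wrapper around a shell command. */
public class ShellCommand implements AutoCloseable {
  private final Process process;

  public ShellCommand(String... command) throws IOException {
    // Merge stderr into stdout so callers get a single output stream.
    this.process = new ProcessBuilder(command).redirectErrorStream(true).start();
  }

  /** Blocks until the command finishes and returns its output lines. */
  public List<String> waitForOutput() throws IOException, InterruptedException {
    List<String> lines = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        lines.add(line);
      }
    }
    process.waitFor();
    return lines;
  }

  /** Kills the process if the caller never waited for it, so nothing leaks. */
  @Override
  public void close() {
    if (process.isAlive()) {
      process.destroy();
    }
  }
}

Used with try-with-resources inside a DoFn's processElement, the child
process can't be leaked even if the DoFn throws.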

Ken - I think the question here is: are we ok with a situation where the
runner doesn't check or care whether the shell command can run, and the
user accepts this risk and studies what commands will be available on the
worker environment provided by the runner they use in production, before
productionizing a pipeline with those commands.

Upon some thought I think it's ok. Of course, this carries an obligation
for runners to document their worker environment and its changes across
versions. Though for many runners such documentation may be trivial:
"whatever your YARN cluster has, the runner doesn't change it in any way"
and it may be good enough for users. And for other runners, like Dataflow,
such documentation may also be trivial: "no guarantees whatsoever, only
what you stage in --filesToStage is available".

I can also see Beam develop to a point where we'd want all runners to be
able to run your DoFn in a user-specified Docker container, and manage
those intelligently - but I think that's quite a while away and it doesn't
have to block work on a utility for executing shell commands. Though it'd
be nice if the utility was forward-compatible with that future world.

On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré  wrote:

> Hi Eugene,
>
> thanks for the extended questions.
>
> I think we have two levels of expectations here:
> - end-user responsibility
> - worker/runner responsibility
>
> 1/ From an end-user perspective, the end-user has to know that using a
> system command (via ExecIO), and more generally speaking anything which
> relies on worker resources (for instance a local filesystem directory
> available only on a worker), can fail if the expected resource is not
> present on all workers. So, basically, all workers should have the same
> topology. That's what I'm assuming for the PR.
> For example, if I have my Spark cluster using the same Mesos/Docker setup,
> then the user knows that all nodes in the cluster will have the same
> setup and thus the same resources (it could be provided by DevOps for instance).
> On the other hand, running on Dataflow is different because I don't
> "control" the nodes (bootstrapping or resources), but in that case, the
> user knows it (he knows the runner he's using).
>
> 2/ As you said, we can expect that the runner can deal with some
> requirements (expressed depending on the pipeline and the runner), and
> that the runner can know which workers provide capabilities matching
> those requirements.
> Then, the end user is no longer responsible: the runner will try to
> determine if the pipeline can be executed, and where a DoFn has to be run
> (on which worker).
>
> For me, it's two different levels where 2 is smarter but 1 can also make
> sense.
>
> WDYT ?
>
> Regards
> JB
>
> On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
> > Hi JB,
> >
> > Thanks for bringing this to the mailing list. I also think that this is
> > useful in general (and that use cases for Beam are more than just classic
> > bigdata), and that there are interesting questions here at different
> levels
> > about how to do it right.
> >
> > I suggest to start with the highest-level question [and discuss the
> > particular API only after agreeing on this, possibly in a separate
> thread]:
> > how to deal with the fact that Beam gives no guarantees about the
> > environment on workers, e.g. which commands are available, which shell or
> > even OS is being used, etc. Particularly:
> >
> > - Obviously different runners will have a different environment, e.g.
> > Dataflow workers are not going to have Hadoop commands available because
> > they are not running on a Hadoop cluster. So, pipelines and transforms
> > developed using this connector will be necessarily non-portable between
> > different runners. Maybe this is ok? But we need to give users a clear
> > expectation about this. How do we phrase this expectation and where do we
> > put it in the docs?
> >
> > - I'm concerned that this puts additional compatibility requirements on
> > runners - it becomes necessary for a runner to document the environment
> of
> > its workers (OS, shell, privileges, guaranteed-installed packages, access
> > to other things on the host machine e.g. whether or not the worker runs
> in
> > its own container, etc.) 

Re: [PROPOSAL] "IOChannelFactory" Redesign and Make it Configurable

2016-12-06 Thread Kenneth Knowles
Thanks for the thorough answers. It all sounds good to me.

On Tue, Dec 6, 2016 at 12:57 PM, Pei He  wrote:

> Thanks Kenn for the feedback and questions.
>
> I responded inline.
>
> On Mon, Dec 5, 2016 at 7:49 PM, Kenneth Knowles 
> wrote:
>
> > I really like this document. It is easy to read and informative. Three
> > things not addressed by the document:
> >
> > 1. Major Beam use cases. I'm sure we have a few in the SDK that could be
> > outlined in terms of the new API with pseudocode.
>
>
> (I am writing pseudocode directly with FileSystem interface to demonstrate.
> However, clients will use the utility FileSystems. This is for us to have a
> layer between the file systems providers' interface and the client
> interface. We can add utility functions to FileSystems for common use
> patterns as needed.)
>
> Major Beam use cases are the following:
> A. FileBasedSource:
> // a. Get input URIs and file sizes from user-provided specs.
> // Note: I updated the match() to be a bulk operation after I sent my last email.
> List<MatchResult> results = match(specList);
> List<Metadata> inputMetadataList = FluentIterable.from(results)
>     .transformAndConcat(
>         new Function<MatchResult, Iterable<Metadata>>() {
>           @Override
>           public Iterable<Metadata> apply(MatchResult result) {
>             return Arrays.asList(result.metadata());
>           }
>         })
>     .toList();
>
> // b. Read from a start offset to support the source splitting.
> SeekableByteChannel seekChannel = open(fileUri);
> seekChannel.position(source.getStartOffset());
> seekChannel.read(...);
>
> B. FileBasedSink:
> // bulk rename temporary files to output files
> rename(tempUris, outputUris);
>
> C. General file operations:
> a. resolve paths
> b. create file to write, open file to read (for example in tests).
> c. bulk delete files/directories
>
>
>
> 2. Related work. How does this differ from other filesystem APIs and why?
>
> We need three sets of functionalities:
> 1. resolve paths.
> 2. read and write channels.
> 3. bulk file management operations (bulk delete/rename/match).
>
> And they are available from Java nio, hadoop FileSystem APIs, and other
> standard libraries such as java.net.URI.
>
> The current IOChannelFactory interface uses Java nio for (1) and (2), and
> defines its own interface for (3).
>
> In my redesign, I made the following choices:
> For (1), I replaced Java nio with URI, because it is standardized and
> precise and doesn't require additional implementation of a Path interface
> from file system providers.
>
> For (2), I kept the use of Java nio (Writable/SeekableByteChannel), since
> I don't see anything that needs to improve and I don't see any better
> alternatives (hadoop's FSDataInput/OutputStream provide the same
> functionality, but require additional dependencies).
>
> For (3), the reasons that I didn't choose Java nio or hadoop are:
> 1. Beam needs a bulk operations API for better performance; Java nio
> and hadoop FileSystems are single-file-based APIs.
> 2. Have APIs that are file-system agnostic. For example, we can use URI
> instead of Path.
> 3. Have APIs that are minimal, and easy to implement by file system
> providers.
> 4. Introduce fewer dependencies.
> 5. It is easy to build an adaptor based on Java nio or hadoop interfaces.
>
> 3. Discussion of non-Java languages. It would be good to know what classes
> > in e.g. Python we might use in place of URI, SeekableByteChannel, etc.
>
> I don't want to mislead people here without a thorough investigation. As
> you can see from your second question, that would require iterations on
> design and prototyping.
>
> I didn't introduce any Java-specific requirements in the redesign.
> Resolving paths, seeking with channels or streams, and file management
> operations are language independent. And I'm pretty sure there are Python
> libraries for that.
>
> However, I am happy to hear thoughts and get help from people working on
> the Python SDK.
>
>
> > On Mon, Dec 5, 2016 at 4:41 PM, Pei He  wrote:
> >
> > > I have received a lot of comments in "Part 1: IOChannelFactory
> > > Redesign" [1]. And, I have updated the design based on the feedback.
> > >
> > > Now, I feel it is close to be ready for implementation, and I would
> like
> > to
> > > summarize the changes:
> > > 1. Replaced FilePath with URI for resolving files paths.
> > > 2. Required match(String spec) to handle ambiguities in user-provided
> > > strings (see the match() javadoc in the design doc for details).
> > > 3. Changed Metadata to use Future.get() paradigm, and removed
> > exception().
> > > 4. Changed methods on FileSystem interface to be protected (visible for
> > > implementors), and created FileSystems utility (visible for callers).
> > > 5.  Simplified FileSystem interface by moving operation options, such
> as
> > > DeleteOptions, MatchOptions, to the FileSystems utility.
> > > 6. Simplified FileSystem interface by requiring certain behaviors, such
> > 

Re: [PROPOSAL] "IOChannelFactory" Redesign and Make it Configurable

2016-12-06 Thread Pei He
Thanks Kenn for the feedback and questions.

I responded inline.

On Mon, Dec 5, 2016 at 7:49 PM, Kenneth Knowles 
wrote:

> I really like this document. It is easy to read and informative. Three
> things not addressed by the document:
>
> 1. Major Beam use cases. I'm sure we have a few in the SDK that could be
> outlined in terms of the new API with pseudocode.


(I am writing pseudocode directly with FileSystem interface to demonstrate.
However, clients will use the utility FileSystems. This is for us to have a
layer between the file systems providers' interface and the client
interface. We can add utility functions to FileSystems for common use
patterns as needed.)

Major Beam use cases are the following:
A. FileBasedSource:
// a. Get input URIs and file sizes from user-provided specs.
// Note: I updated the match() to be a bulk operation after I sent my last email.
List<MatchResult> results = match(specList);
List<Metadata> inputMetadataList = FluentIterable.from(results)
    .transformAndConcat(
        new Function<MatchResult, Iterable<Metadata>>() {
          @Override
          public Iterable<Metadata> apply(MatchResult result) {
            return Arrays.asList(result.metadata());
          }
        })
    .toList();

// b. Read from a start offset to support the source splitting.
SeekableByteChannel seekChannel = open(fileUri);
seekChannel.position(source.getStartOffset());
seekChannel.read(...);

B. FileBasedSink:
// bulk rename temporary files to output files
rename(tempUris, outputUris);

C. General file operations:
a. resolve paths
b. create file to write, open file to read (for example in tests).
c. bulk delete files/directories



2. Related work. How does this differ from other filesystem APIs and why?

We need three sets of functionalities:
1. resolve paths.
2. read and write channels.
3. bulk file management operations (bulk delete/rename/match).

And they are available from Java nio, hadoop FileSystem APIs, and other
standard libraries such as java.net.URI.

The current IOChannelFactory interface uses Java nio for (1) and (2), and
defines its own interface for (3).

In my redesign, I made the following choices:
For (1), I replaced Java nio with URI, because it is standardized and
precise and doesn't require additional implementation of a Path interface
from file system providers.

For (2), I kept the use of Java nio (Writable/SeekableByteChannel), since
I don't see anything that needs to improve and I don't see any better
alternatives (hadoop's FSDataInput/OutputStream provide the same
functionality, but require additional dependencies).

For (3), the reasons that I didn't choose Java nio or hadoop are:
1. Beam needs a bulk operations API for better performance; Java nio
and hadoop FileSystems are single-file-based APIs.
2. Have APIs that are file-system agnostic. For example, we can use URI
instead of Path.
3. Have APIs that are minimal, and easy to implement by file system
providers.
4. Introduce fewer dependencies.
5. It is easy to build an adaptor based on Java nio or hadoop interfaces.

3. Discussion of non-Java languages. It would be good to know what classes
> in e.g. Python we might use in place of URI, SeekableByteChannel, etc.

I don't want to mislead people here without a thorough investigation. As
you can see from your second question, that would require iterations on
design and prototyping.

I didn't introduce any Java-specific requirements in the redesign.
Resolving paths, seeking with channels or streams, and file management
operations are language independent. And I'm pretty sure there are Python
libraries for that.

However, I am happy to hear thoughts and get help from people working on
the Python SDK.


> On Mon, Dec 5, 2016 at 4:41 PM, Pei He  wrote:
>
> > I have received a lot of comments in "Part 1: IOChannelFactory
> > Redesign" [1]. And, I have updated the design based on the feedback.
> >
> > Now, I feel it is close to be ready for implementation, and I would like
> to
> > summarize the changes:
> > 1. Replaced FilePath with URI for resolving files paths.
> > 2. Required match(String spec) to handle ambiguities in user-provided
> > strings (see the match() javadoc in the design doc for details).
> > 3. Changed Metadata to use Future.get() paradigm, and removed
> exception().
> > 4. Changed methods on FileSystem interface to be protected (visible for
> > implementors), and created FileSystems utility (visible for callers).
> > 5.  Simplified FileSystem interface by moving operation options, such as
> > DeleteOptions, MatchOptions, to the FileSystems utility.
> > 6. Simplified FileSystem interface by requiring certain behaviors, such
> as
> > creating recursively, throwing for missing files.
> >
> > Any thoughts / feedback?
> > --
> > Pei
> >
> > [1]
> > https://docs.google.com/document/d/11TdPyZ9_zmjokhNWM3Id-
> > XJsVG3qel2lhdKTknmZ_7M/edit#
> >
> > On Wed, Nov 30, 2016 at 1:32 PM, Pei He  wrote:
> >
> > > Thanks JB for the feedback.
> > >
> > > Yes, we should 

Re: Increase stream parallelism after reading from UnboundedSource

2016-12-06 Thread Amit Sela
I think it is common in batch (and micro-batch for streaming) because at
any given time you're computing a "chunk" (pick your naming, we have lots
of them ;-) ), and slicing up this chunk to distribute across more CPUs if
available is clearly better. But I was wondering about "event-at-a-time"
processors and everything in-between - such as bundles that may be of size
1, but might contain more elements.
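
Just to make the fan-out concrete in Beam terms, I imagine something roughly
like this after the read (untested sketch; "lines" is assumed to be the
PCollection<String> coming out of the unbounded read, and the shard count and
trigger setup are arbitrary choices, not an existing transform):

import java.util.concurrent.ThreadLocalRandom;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

final int numShards = 10; // arbitrary fan-out factor
PCollection<String> redistributed = lines
    // Assign a random shard key so downstream work can spread across workers.
    .apply(ParDo.of(new DoFn<String, KV<Integer, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), c.element()));
      }
    }))
    // On an unbounded collection GroupByKey needs a trigger; fire per element
    // and discard panes so this acts as a pure redistribution.
    .apply(Window.<KV<Integer, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO))
    .apply(GroupByKey.<Integer, String>create())
    // Flatten the grouped values back out as individual elements.
    .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        for (String value : c.element().getValue()) {
          c.output(value);
        }
      }
    }));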

On Tue, Dec 6, 2016 at 10:18 PM Raghu Angadi 
wrote:

> On Sun, Dec 4, 2016 at 11:48 PM, Amit Sela  wrote:
>
> > For any downstream computation, is it common for stream processors to
> > "fan-out/parallelise" the stream by shuffling the data into more
> > streams/partitions/bundles ?
> >
>
> I think so. It is pretty common in batch processing too.
>


Re: Increase stream parallelism after reading from UnboundedSource

2016-12-06 Thread Raghu Angadi
On Sun, Dec 4, 2016 at 11:48 PM, Amit Sela  wrote:

> For any downstream computation, is it common for stream processors to
> "fan-out/parallelise" the stream by shuffling the data into more
> streams/partitions/bundles ?
>

I think so. It is pretty common in batch processing too.


Re: HiveIO

2016-12-06 Thread Ismaël Mejía
Hello,

If you really need to read/write via Hive, remember that you can use the
Hive JDBC driver and achieve this with Beam using the JdbcIO (this is
probably less efficient for the streaming case, but still a valid solution).
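
For example, reading could look roughly like this (untested sketch; the
driver class, URL, query, table and coders are just placeholders to
illustrate the idea, please check the JdbcIO javadoc for the exact options):

import java.sql.ResultSet;

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<KV<String, Long>> rows = pipeline.apply(
    JdbcIO.<KV<String, Long>>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "org.apache.hive.jdbc.HiveDriver", "jdbc:hive2://localhost:10000/default"))
        .withQuery("SELECT word, count FROM word_counts")
        .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
        .withRowMapper(new JdbcIO.RowMapper<KV<String, Long>>() {
          @Override
          public KV<String, Long> mapRow(ResultSet resultSet) throws Exception {
            // Map each result row to a KV of word and count.
            return KV.of(resultSet.getString(1), resultSet.getLong(2));
          }
        }));

Writing can go through JdbcIO.write() with a prepared INSERT statement in a
similar way.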

Ismaël


On Tue, Dec 6, 2016 at 12:04 PM, Vinoth Chandar  wrote:

> Great. Thanks!
>
> Thanks,
> Vinoth
>
> > On Dec 6, 2016, at 2:06 AM, Jean-Baptiste Onofré 
> wrote:
> >
> > Hi,
> >
> > Ismaël and I started HiveIO.
> >
> > I have several IOs ready to propose as PR, but, in order to limit the
> number of open PRs, I would like to merge the pending ones.
> >
> > I will let you know when the branches/PRs will be available.
> >
> > Regards
> > JB
> >
> >> On 12/05/2016 11:40 PM, Vinoth Chandar wrote:
> >> Hi guys,
> >>
> >> Saw a post around HiveIO on the users list with a PR followup. I am
> >> interested in this too and can pitch in on development and testing.
> >>
> >> Who & where is this work happening?
> >>
> >> Thanks
> >> VInoth
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>


Re: HiveIO

2016-12-06 Thread Vinoth Chandar
Great. Thanks!

Thanks,
Vinoth

> On Dec 6, 2016, at 2:06 AM, Jean-Baptiste Onofré  wrote:
> 
> Hi,
> 
> Ismaël and I started HiveIO.
> 
> I have several IOs ready to propose as PR, but, in order to limit the number 
> of open PRs, I would like to merge the pending ones.
> 
> I will let you know when the branches/PRs will be available.
> 
> Regards
> JB
> 
>> On 12/05/2016 11:40 PM, Vinoth Chandar wrote:
>> Hi guys,
>> 
>> Saw a post around HiveIO on the users list with a PR followup. I am
>> interested in this too and can pitch in on development and testing.
>> 
>> Who & where is this work happening?
>> 
>> Thanks
>> VInoth
>> 
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: [DISCUSS] ExecIO

2016-12-06 Thread Jean-Baptiste Onofré

Hi Eugene,

thanks for the extended questions.

I think we have two levels of expectations here:
- end-user responsibility
- worker/runner responsibility

1/ From an end-user perspective, the end-user has to know that using a
system command (via ExecIO), and more generally speaking anything which
relies on worker resources (for instance a local filesystem directory
available only on a worker), can fail if the expected resource is not
present on all workers. So, basically, all workers should have the same
topology. That's what I'm assuming for the PR.
For example, if I have my Spark cluster using the same Mesos/Docker setup,
then the user knows that all nodes in the cluster will have the same
setup and thus the same resources (it could be provided by DevOps for instance).
On the other hand, running on Dataflow is different because I don't
"control" the nodes (bootstrapping or resources), but in that case, the
user knows it (he knows the runner he's using).


2/ As you said, we can expect that the runner can deal with some
requirements (expressed depending on the pipeline and the runner), and
that the runner can know which workers provide capabilities matching
those requirements.
Then, the end user is no longer responsible: the runner will try to
determine if the pipeline can be executed, and where a DoFn has to be run
(on which worker).


For me, it's two different levels where 2 is smarter but 1 can also make 
sense.


WDYT ?

Regards
JB

On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:

Hi JB,

Thanks for bringing this to the mailing list. I also think that this is
useful in general (and that use cases for Beam are more than just classic
bigdata), and that there are interesting questions here at different levels
about how to do it right.

I suggest to start with the highest-level question [and discuss the
particular API only after agreeing on this, possibly in a separate thread]:
how to deal with the fact that Beam gives no guarantees about the
environment on workers, e.g. which commands are available, which shell or
even OS is being used, etc. Particularly:

- Obviously different runners will have a different environment, e.g.
Dataflow workers are not going to have Hadoop commands available because
they are not running on a Hadoop cluster. So, pipelines and transforms
developed using this connector will be necessarily non-portable between
different runners. Maybe this is ok? But we need to give users a clear
expectation about this. How do we phrase this expectation and where do we
put it in the docs?

- I'm concerned that this puts additional compatibility requirements on
runners - it becomes necessary for a runner to document the environment of
its workers (OS, shell, privileges, guaranteed-installed packages, access
to other things on the host machine e.g. whether or not the worker runs in
its own container, etc.) and to keep it stable - otherwise transforms and
pipelines with this connector will be non-portable between runner versions
either.

Another way to deal with this is to give up and say "the environment on the
workers is outside the scope of Beam; consult your runner's documentation
or use your best judgment as to what the environment will be, and use this
at your own risk".

What do others think?

On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré  wrote:

Hi beamers,

Today, Beam is mainly focused on data processing.
Since the beginning of the project, we have been discussing extending
the use-case coverage via DSLs and extensions (like for machine
learning), or via IO.

Especially with IO, we can see Beam used for data integration and data
ingestion.

In this area, I'm proposing a first IO: ExecIO:

https://issues.apache.org/jira/browse/BEAM-1059
https://github.com/apache/incubator-beam/pull/1451

Actually, this IO is mainly an ExecFn that executes system commands
(again, keep in mind we are discussing data integration/ingestion,
not data processing).

For convenience, this ExecFn is wrapped in Read and Write (as a regular IO).

Clearly, this IO/Fn depends on the worker where it runs. But it's under
the user's responsibility.

During the review, Eugene and I discussed about:
- is it an IO or just a fn ?
- is it OK to have worker specific IO ?

IMHO, an IO makes a lot of sense and it's very convenient for end
users. They can do something like:

PCollection output =
pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));

The pipeline will execute myscript and the output PCollection will contain
the command execution stdout/stderr.

On the other hand, they can do:

pcollection.apply(ExecIO.write());

where PCollection contains the commands to execute.

Generally speaking, end users can call ExecFn wherever they want in the
pipeline steps:

PCollection output = pipeline.apply(ParDo.of(new ExecIO.ExecFn()));

The input collection contains the commands to execute, and the output
collection contains the command execution results (stdout/stderr).
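
To give an idea, the Fn itself is roughly something like this (simplified
sketch, not the actual code from the PR; it just shells out with
ProcessBuilder and emits the output lines):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;

/** Simplified sketch: runs each input command and emits its output lines. */
public class ExecFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    ProcessBuilder builder = new ProcessBuilder("sh", "-c", c.element());
    builder.redirectErrorStream(true); // merge stderr into stdout
    Process process = builder.start();
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        c.output(line);
      }
    }
    int exitCode = process.waitFor();
    if (exitCode != 0) {
      throw new IOException("Command failed with exit code " + exitCode + ": " + c.element());
    }
  }
}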

Generally speaking, I'm 

Re: HiveIO

2016-12-06 Thread Jean-Baptiste Onofré

Hi,

Ismaël and I started HiveIO.

I have several IOs ready to propose as PR, but, in order to limit the 
number of open PRs, I would like to merge the pending ones.


I will let you know when the branches/PRs will be available.

Regards
JB

On 12/05/2016 11:40 PM, Vinoth Chandar wrote:

Hi guys,

Saw a post around HiveIO on the users list with a PR followup. I am
interested in this too and can pitch in on development and testing.

Who & where is this work happening?

Thanks
VInoth



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com