[jira] [Commented] (BEAM-5110) Reconcile Flink JVM singleton management with deployment

2018-08-14 Thread Ben Sidhom (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580545#comment-16580545
 ] 

Ben Sidhom commented on BEAM-5110:
--

I meant to mention in the previous comment: as of now, this is not a 
correctness issue but rather a performance/resources issue. We generally want 
to allow SDK environment reuse to amortize startup costs (and save memory).

The fact that the Python SDK does _not_ handle multiple control clients over a 
single channel is arguably an SDK harness bug or a configuration issue (e.g., 
we may need to allow the environment factory/manager to spawn new environments 
if requested). This is something that should be fleshed out in the fn-api 
contract and is orthogonal to JVM instance management in the portable Flink 
runner.

> Reconcile Flink JVM singleton management with deployment
> ---
>
> Key: BEAM-5110
> URL: https://issues.apache.org/jira/browse/BEAM-5110
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> [~angoenka] noticed through debugging that multiple instances of 
> BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when 
> executing in standalone cluster mode. This context factory is responsible for 
> maintaining singleton state across a TaskManager (JVM) in order to share SDK 
> Environments across workers in a given job. The multiple-loading breaks 
> singleton semantics and results in an indeterminate number of Environments 
> being created.
> It turns out that the [Flink classloading 
> mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html]
>  is determined by deployment mode. Note that "user code" as referenced by 
> this link is actually the Flink job server jar. Actual end-user code lives 
> inside of the SDK Environment and uploaded artifacts.
> In order to maintain singletons without resorting to IPC (for example, using 
> file locks and/or additional gRPC servers), we need to force non-dynamic 
> classloading. For example, this happens when jobs are submitted to YARN for 
> one-off deployments via `flink run`. However, connecting to an existing 
> (Flink standalone) deployment results in dynamic classloading.
> We should investigate this behavior and either document (and attempt to 
> enforce) deployment modes that are consistent with our requirements, or (if 
> possible) create a custom classloader that enforces singleton loading.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5110) Reconcile Flink JVM singleton management with deployment

2018-08-14 Thread Ben Sidhom (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580536#comment-16580536
 ] 

Ben Sidhom commented on BEAM-5110:
--

Sadly, after investigating this for a while, I think that the best option (for 
now) is to document (and possibly enforce in code where possible) the 
situations where dynamic code loading will not happen in the workers. Flink 
attempts to isolate user code with separate classloaders and Java does not 
expose a way to force shared classloaders (or even list unconnected 
classloaders) without special instrumentation.

The following deployment modes use regular system class loading for the Beam 
Job server and workers:
 * Direct YARN deployments (using bin/flink run -m yarn-cluster)
 * Standalone/Docker/Kubernetes/Mesos sessions where the job server jar has 
been dropped into the lib/ directory of distributions. In this case, Flink's 
default classloading strategy will not work because child (Operator-scoped) 
classloaders override system classloaders; any user code (e.g., Beam 
environment manager code where the job server is submitted to a 
RemoteEnvironment) will override Flink/system classes. In order to get around 
this, Flink's 
[classloader.resolve-order|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#classloader-resolve-order]
 must be set to parent-first _or_ classloader.parent-first-patterns-additional 
must be populated with the set of packages that require JVM singletons.

In general, modes where the job server jar is sent to a new RemoteEnvironment 
will break singleton semantics. We should at the very least log a warning when 
this is detected in the job server, if not disable this altogether.
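
A minimal sketch of how such a warning could be detected, assuming the check is simply "was this class defined by the system classloader or one of its ancestors". The class and method names below (SingletonLoadingCheck, isLoadedBySystemHierarchy, warnIfDynamicallyLoaded) are hypothetical and are not part of Beam or Flink; only standard JDK calls are used.

```java
import java.util.logging.Logger;

/** Hypothetical helper for illustration; not part of Beam or Flink. */
final class SingletonLoadingCheck {
  private static final Logger LOG = Logger.getLogger(SingletonLoadingCheck.class.getName());

  /** Returns true if clazz was defined by the system classloader or one of its ancestors. */
  static boolean isLoadedBySystemHierarchy(Class<?> clazz) {
    ClassLoader loader = clazz.getClassLoader();
    if (loader == null) {
      return true; // null means the bootstrap classloader
    }
    for (ClassLoader c = ClassLoader.getSystemClassLoader(); c != null; c = c.getParent()) {
      if (c == loader) {
        return true;
      }
    }
    return false; // defined by some other (for example, per-job) classloader
  }

  /** Logs a warning when clazz appears to have been loaded dynamically. Returns the check result. */
  static boolean warnIfDynamicallyLoaded(Class<?> clazz) {
    boolean ok = isLoadedBySystemHierarchy(clazz);
    if (!ok) {
      LOG.warning(
          clazz.getName()
              + " was not loaded by the system classloader; JVM singletons may be duplicated.");
    }
    return ok;
  }

  public static void main(String[] args) {
    System.out.println(warnIfDynamicallyLoaded(SingletonLoadingCheck.class));
  }
}
```

A check like this only tells us how the calling class itself was loaded; it cannot enumerate other classloaders that may hold their own copy of the class.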

> Reconcile Flink JVM singleton management with deployment
> ---
>
> Key: BEAM-5110
> URL: https://issues.apache.org/jira/browse/BEAM-5110
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> [~angoenka] noticed through debugging that multiple instances of 
> BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when 
> executing in standalone cluster mode. This context factory is responsible for 
> maintaining singleton state across a TaskManager (JVM) in order to share SDK 
> Environments across workers in a given job. The multiple-loading breaks 
> singleton semantics and results in an indeterminate number of Environments 
> being created.
> It turns out that the [Flink classloading 
> mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html]
>  is determined by deployment mode. Note that "user code" as referenced by 
> this link is actually the Flink job server jar. Actual end-user code lives 
> inside of the SDK Environment and uploaded artifacts.
> In order to maintain singletons without resorting to IPC (for example, using 
> file locks and/or additional gRPC servers), we need to force non-dynamic 
> classloading. For example, this happens when jobs are submitted to YARN for 
> one-off deployments via `flink run`. However, connecting to an existing 
> (Flink standalone) deployment results in dynamic classloading.
> We should investigate this behavior and either document (and attempt to 
> enforce) deployment modes that are consistent with our requirements, or (if 
> possible) create a custom classloader that enforces singleton loading.





[jira] [Created] (BEAM-5114) Create example uber jars for supported runners

2018-08-08 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-5114:


 Summary: Create example uber jars for supported runners
 Key: BEAM-5114
 URL: https://issues.apache.org/jira/browse/BEAM-5114
 Project: Beam
  Issue Type: New Feature
  Components: examples-java
Reporter: Ben Sidhom
Assignee: Ben Sidhom


Producing these artifacts has several benefits:
 * Gives an example of how to package user code for different runners
 * Makes ad-hoc testing of runner changes against real user pipelines easier
 * Enables end-to-end integration testing of pipelines against different runner 
services





[jira] [Commented] (BEAM-5110) Reconcile Flink JVM singleton management with deployment

2018-08-08 Thread Ben Sidhom (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16573681#comment-16573681
 ] 

Ben Sidhom commented on BEAM-5110:
--

Note that we can partially fix the issue by requiring that users who deploy in 
standalone "sessions" drop the Flink job server jar into the lib/ directory of 
all workers. That ensures that the necessary classes are available in the 
static top-level classloader.

However, classloading in Flink is inverted from the usual behavior: "By 
default, Flink inverts classloading order, meaning it looks into the user code 
classloader first, and only looks into the parent (application classloader) if 
the class is not part of the dynamically loaded user code."

Making sure this works in practice also requires a change in the Flink runner 
code where we create a connection to remote environments. We have to ensure 
that _no_ user jars are staged here because any classes defined here will be 
loaded dynamically and take precedence over the root classloader.
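
To illustrate the precedence problem, here is a toy model of the two resolution orders. It does not use real classloaders: ToyLoader is a hypothetical stand-in that maps a class name to the place its definition came from, and the strings "lib/" and "staged jar" are just labels.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of classloader resolution order; purely illustrative. */
final class ResolveOrderDemo {

  /** Hypothetical stand-in for a classloader: class name -> origin of its definition. */
  static final class ToyLoader {
    final ToyLoader parent;
    final Map<String, String> defined = new HashMap<>();

    ToyLoader(ToyLoader parent) {
      this.parent = parent;
    }

    /** Resolves name child-first (Flink's default, per the quote above) or parent-first. */
    String resolve(String name, boolean childFirst) {
      if (childFirst && defined.containsKey(name)) {
        return defined.get(name); // the child's own copy wins
      }
      if (parent != null) {
        String fromParent = parent.resolve(name, childFirst);
        if (fromParent != null) {
          return fromParent;
        }
      }
      return defined.get(name); // may be null if nobody defines the class
    }
  }

  public static void main(String[] args) {
    ToyLoader system = new ToyLoader(null);
    system.defined.put("BatchFactory", "lib/");
    ToyLoader userCode = new ToyLoader(system);
    userCode.defined.put("BatchFactory", "staged jar");
    System.out.println(userCode.resolve("BatchFactory", true));
    System.out.println(userCode.resolve("BatchFactory", false));
  }
}
```

In this model, a class that is present both in lib/ and in a staged jar resolves to the staged copy under child-first ordering, which is why staging no user jars matters.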

> Reconcile Flink JVM singleton management with deployment
> ---
>
> Key: BEAM-5110
> URL: https://issues.apache.org/jira/browse/BEAM-5110
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>
> [~angoenka] noticed through debugging that multiple instances of 
> BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when 
> executing in standalone cluster mode. This context factory is responsible for 
> maintaining singleton state across a TaskManager (JVM) in order to share SDK 
> Environments across workers in a given job. The multiple-loading breaks 
> singleton semantics and results in an indeterminate number of Environments 
> being created.
> It turns out that the [Flink classloading 
> mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html]
>  is determined by deployment mode. Note that "user code" as referenced by 
> this link is actually the Flink job server jar. Actual end-user code lives 
> inside of the SDK Environment and uploaded artifacts.
> In order to maintain singletons without resorting to IPC (for example, using 
> file locks and/or additional gRPC servers), we need to force non-dynamic 
> classloading. For example, this happens when jobs are submitted to YARN for 
> one-off deployments via `flink run`. However, connecting to an existing 
> (Flink standalone) deployment results in dynamic classloading.
> We should investigate this behavior and either document (and attempt to 
> enforce) deployment modes that are consistent with our requirements, or (if 
> possible) create a custom classloader that enforces singleton loading.





[jira] [Assigned] (BEAM-5110) Reconcile Flink JVM singleton management with deployment

2018-08-08 Thread Ben Sidhom (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-5110:


Assignee: Ben Sidhom

> Reconcile Flink JVM singleton management with deployment
> ---
>
> Key: BEAM-5110
> URL: https://issues.apache.org/jira/browse/BEAM-5110
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>
> [~angoenka] noticed through debugging that multiple instances of 
> BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when 
> executing in standalone cluster mode. This context factory is responsible for 
> maintaining singleton state across a TaskManager (JVM) in order to share SDK 
> Environments across workers in a given job. The multiple-loading breaks 
> singleton semantics and results in an indeterminate number of Environments 
> being created.
> It turns out that the [Flink classloading 
> mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html]
>  is determined by deployment mode. Note that "user code" as referenced by 
> this link is actually the Flink job server jar. Actual end-user code lives 
> inside of the SDK Environment and uploaded artifacts.
> In order to maintain singletons without resorting to IPC (for example, using 
> file locks and/or additional gRPC servers), we need to force non-dynamic 
> classloading. For example, this happens when jobs are submitted to YARN for 
> one-off deployments via `flink run`. However, connecting to an existing 
> (Flink standalone) deployment results in dynamic classloading.
> We should investigate this behavior and either document (and attempt to 
> enforce) deployment modes that are consistent with our requirements, or (if 
> possible) create a custom classloader that enforces singleton loading.





[jira] [Created] (BEAM-5110) Reconcile Flink JVM singleton management with deployment

2018-08-08 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-5110:


 Summary: Reconcile Flink JVM singleton management with deployment
 Key: BEAM-5110
 URL: https://issues.apache.org/jira/browse/BEAM-5110
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Ben Sidhom


[~angoenka] noticed through debugging that multiple instances of 
BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when 
executing in standalone cluster mode. This context factory is responsible for 
maintaining singleton state across a TaskManager (JVM) in order to share SDK 
Environments across workers in a given job. The multiple-loading breaks 
singleton semantics and results in an indeterminate number of Environments 
being created.

It turns out that the [Flink classloading 
mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html]
 is determined by deployment mode. Note that "user code" as referenced by this 
link is actually the Flink job server jar. Actual end-user code lives inside of 
the SDK Environment and uploaded artifacts.

In order to maintain singletons without resorting to IPC (for example, using 
file locks and/or additional gRPC servers), we need to force non-dynamic 
classloading. For example, this happens when jobs are submitted to YARN for 
one-off deployments via `flink run`. However, connecting to an existing (Flink 
standalone) deployment results in dynamic classloading.

We should investigate this behavior and either document (and attempt to 
enforce) deployment modes that are consistent with our requirements, or (if 
possible) create a custom classloader that enforces singleton loading.
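
For readers unfamiliar with the pattern, the following is a rough sketch of a JVM-level, job-scoped singleton of the kind described above. It is not the actual BatchFlinkExecutableStageContext.BatchFactory code; EnvironmentCache and Environment are hypothetical names used only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical job-scoped cache held in static (per-class) state. */
final class EnvironmentCache {

  /** Hypothetical placeholder for an expensive-to-start SDK environment. */
  static final class Environment {
    final String jobId;

    Environment(String jobId) {
      this.jobId = jobId;
    }
  }

  // Static state exists once per *loaded copy* of this class. If two classloaders
  // each define EnvironmentCache, there are two independent maps in the same JVM.
  private static final ConcurrentMap<String, Environment> BY_JOB = new ConcurrentHashMap<>();

  /** Returns the shared environment for jobId, creating it on first use. */
  static Environment forJob(String jobId) {
    return BY_JOB.computeIfAbsent(jobId, Environment::new);
  }

  public static void main(String[] args) {
    System.out.println(forJob("job-1") == forJob("job-1"));
  }
}
```

The static map is only shared among callers that see the same loaded class, so the pattern holds under system classloading and breaks as soon as each operator gets its own dynamically loaded copy.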





[jira] [Commented] (BEAM-4148) Local server api descriptors contain urls that work on Mac and Linux

2018-08-03 Thread Ben Sidhom (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568949#comment-16568949
 ] 

Ben Sidhom commented on BEAM-4148:
--

Basically, if the reference runner is manually creating its own Docker 
instances, it should stop doing that and start using the Docker factory.

> Local server api descriptors contain urls that work on Mac and Linux
> 
>
> Key: BEAM-4148
> URL: https://issues.apache.org/jira/browse/BEAM-4148
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Ben Sidhom
>Priority: Minor
>
> Docker for Mac does not allow host networking and thus will not allow SDK 
> harnesses to access runner services via `localhost`. Instead, a special DNS 
> name is used to refer to the host machine: docker.for.mac.host.internal. 
> (Note that this value sometimes changes between Docker releases).
> We should attempt to detect the host operating system and return different 
> API descriptors based on this.
> See 
> [https://github.com/bsidhom/beam/commit/3adaeb0d33dc26f0910c1f8af2821cce4ee0b965]
>  for how this might be done.
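
A minimal sketch of the OS-based selection described in the issue, assuming detection via the standard os.name system property. DockerHostResolver, hostFor, and apiDescriptorUrl are hypothetical names, the port is arbitrary, and this is not the code from the linked commit.

```java
import java.util.Locale;

/** Hypothetical helper that picks the host name an SDK harness should dial. */
final class DockerHostResolver {
  // Special DNS name from the issue text; it may change between Docker releases.
  static final String MAC_HOST = "docker.for.mac.host.internal";
  static final String DEFAULT_HOST = "localhost";

  /** Chooses the host name based on an os.name-style string. */
  static String hostFor(String osName) {
    boolean isMac = osName.toLowerCase(Locale.ROOT).contains("mac");
    return isMac ? MAC_HOST : DEFAULT_HOST;
  }

  /** Builds a host:port URL for an API service descriptor. */
  static String apiDescriptorUrl(String osName, int port) {
    return hostFor(osName) + ":" + port;
  }

  public static void main(String[] args) {
    System.out.println(apiDescriptorUrl(System.getProperty("os.name"), 8099));
  }
}
```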





[jira] [Commented] (BEAM-4148) Local server api descriptors contain urls that work on Mac and Linux

2018-08-03 Thread Ben Sidhom (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568948#comment-16568948
 ] 

Ben Sidhom commented on BEAM-4148:
--

The linked PR should have actually solved this for the Docker environment 
factory (https://github.com/apache/beam/pull/5392). I actually meant to close 
this since this is ideally the only place where Docker environments will be 
created.

 

Where are you still seeing this?

> Local server api descriptors contain urls that work on Mac and Linux
> 
>
> Key: BEAM-4148
> URL: https://issues.apache.org/jira/browse/BEAM-4148
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Ben Sidhom
>Priority: Minor
>
> Docker for Mac does not allow host networking and thus will not allow SDK 
> harnesses to access runner services via `localhost`. Instead, a special DNS 
> name is used to refer to the host machine: docker.for.mac.host.internal. 
> (Note that this value sometimes changes between Docker releases).
> We should attempt to detect the host operating system and return different 
> API descriptors based on this.
> See 
> [https://github.com/bsidhom/beam/commit/3adaeb0d33dc26f0910c1f8af2821cce4ee0b965]
>  for how this might be done.





[jira] [Commented] (BEAM-2930) Flink support for portable side input

2018-06-28 Thread Ben Sidhom (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526453#comment-16526453
 ] 

Ben Sidhom commented on BEAM-2930:
--

That works for the batch runner, but it will not work for the streaming runner. 
It depends on the scope of this bug.

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>






[jira] [Created] (BEAM-4615) Flink job server driver wrapper

2018-06-21 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4615:


 Summary: Flink job server driver wrapper
 Key: BEAM-4615
 URL: https://issues.apache.org/jira/browse/BEAM-4615
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink
Reporter: Ben Sidhom
Assignee: Ben Sidhom


This includes:
 * A gradle wrapper that can execute the Flink job server driver so that it can 
be easily run locally for testing/debugging.
 * A shadow ("uber") target that packages all portable Flink runner 
dependencies into a runnable jar. This jar can then be submitted to Flink 
clusters via `flink run`.

 





[jira] [Closed] (BEAM-4523) Implement Flink batch ExecutableStage context

2018-06-21 Thread Ben Sidhom (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom closed BEAM-4523.

   Resolution: Fixed
Fix Version/s: Not applicable

> Implement Flink batch ExecutableStage context
> -
>
> Key: BEAM-4523
> URL: https://issues.apache.org/jira/browse/BEAM-4523
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The ExecutableStage context is a wrapper for the job and stage bundle 
> factories and pooled artifact sources. It should take care of caching the 
> overall job bundle factory since we do not have access to job lifecycle hooks 
> in Flink but would like to reuse services and resources across operators 
> within a given job.
> FlinkExecutableStageContext already exists as a skeleton, but it needs to be 
> fleshed out.





[jira] [Closed] (BEAM-4231) Runner utility for Coder instantiation

2018-06-21 Thread Ben Sidhom (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom closed BEAM-4231.

   Resolution: Fixed
Fix Version/s: Not applicable

> Runner utility for Coder instantiation
> --
>
> Key: BEAM-4231
> URL: https://issues.apache.org/jira/browse/BEAM-4231
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Portable runners need to instantiate coders for communicating with SDK 
> harnesses in a consistent way. They cannot simply instantiate coders as 
> defined by PCollections because some component coders may only be known to 
> specific SDKs. Unknown coders should be length-prefixed; the underlying 
> elements should only be exposed to runners as byte strings.





[jira] [Assigned] (BEAM-4285) Flink batch state request handler

2018-06-08 Thread Ben Sidhom (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-4285:


Assignee: Ben Sidhom

> Flink batch state request handler
> -
>
> Key: BEAM-4285
> URL: https://issues.apache.org/jira/browse/BEAM-4285
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>
> In order to support side inputs, Flink needs a state service request handler. 
> As in the non-portable runner, we can start by handling batch side inputs with 
> Flink broadcast variables.
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
>  or 
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
>  can be used as a starting point. 





[jira] [Created] (BEAM-4404) Fix docker run arguments

2018-05-24 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4404:


 Summary: Fix docker run arguments
 Key: BEAM-4404
 URL: https://issues.apache.org/jira/browse/BEAM-4404
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Thomas Groh


Docker options need to be separate from container entrypoint arguments. See 
[https://github.com/bsidhom/beam/commit/65c152d14d54d1505bfc2637aeb32189e00d160d].

DockerCommand in master does not have this fix.
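
As a sketch of the separation being asked for: docker run expects its own options before the image name and the container's arguments after it. The helper below is hypothetical (DockerRunArgs is not Beam's DockerCommand) and only assembles the argument list in that order.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical builder that keeps docker options and container arguments apart. */
final class DockerRunArgs {

  /**
   * Builds: docker run [dockerOptions...] image [containerArgs...].
   * Options placed after the image would be passed to the container entrypoint instead.
   */
  static List<String> build(String image, List<String> dockerOptions, List<String> containerArgs) {
    List<String> command = new ArrayList<>();
    command.add("docker");
    command.add("run");
    command.addAll(dockerOptions); // interpreted by docker itself
    command.add(image);
    command.addAll(containerArgs); // forwarded to the container entrypoint
    return command;
  }

  public static void main(String[] args) {
    List<String> options = new ArrayList<>();
    options.add("-d");
    List<String> entrypointArgs = new ArrayList<>();
    entrypointArgs.add("--id=1");
    System.out.println(String.join(" ", build("example-image", options, entrypointArgs)));
  }
}
```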





[jira] [Commented] (BEAM-4267) Implement a reusable library that can run an ExecutableStage with a given Environment

2018-05-22 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16484482#comment-16484482
 ] 

Ben Sidhom commented on BEAM-4267:
--

Yep! Although in order to actually run stuff end-to-end, we'll also need to 
have job submission and artifact staging and retrieval wired in.

> Implement a reusable library that can run an ExecutableStage with a given 
> Environment
> -
>
> Key: BEAM-4267
> URL: https://issues.apache.org/jira/browse/BEAM-4267
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Axel Magnuson
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Build off of the interfaces introduced in 
> [BEAM-3327|https://github.com/apache/beam/pull/5152] to provide a reusable 
> execution library to runners.





[jira] [Created] (BEAM-4286) Pooled artifact source

2018-05-14 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4286:


 Summary: Pooled artifact source
 Key: BEAM-4286
 URL: https://issues.apache.org/jira/browse/BEAM-4286
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Ben Sidhom
Assignee: Ben Sidhom


Because DistributedCache lifetimes are tied to operator lifetimes in Flink, we 
need a way to wrap operator-scoped artifact sources. Artifacts are inherently 
job-scoped and should be the same throughout a job's lifetime. For this reason, 
it is safe to pool artifact sources and serve artifacts from an arbitrary 
pooled source as long as the underlying source is still in scope.

We need a pooled source in order to satisfy the bundle factory interfaces. 
Using the job-scoped and stage-scoped bundle factories allows us to cache and 
reuse different components that serve SDK harnesses. Because the distributed 
cache lifetimes are specific to Flink, the pooled artifact source should 
probably live in a runner-specific directory.
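
One way the pooling idea could look, as a sketch under the assumption that each operator registers its source when it opens and releases it when it closes. SourcePool and Lease are hypothetical names, and the generic type S stands in for whatever artifact source interface the runner uses.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical pool of operator-scoped sources that all serve the same job-scoped artifacts. */
final class SourcePool<S> {

  /** Handle returned on registration; closing it takes the source out of the pool. */
  interface Lease extends AutoCloseable {
    @Override
    void close();
  }

  private final Set<S> live = new LinkedHashSet<>();

  /** Adds an operator-scoped source to the pool for as long as the lease is open. */
  synchronized Lease register(S source) {
    live.add(source);
    return () -> release(source);
  }

  private synchronized void release(S source) {
    live.remove(source);
  }

  /** Returns an arbitrary source that is still in scope. */
  synchronized S any() {
    if (live.isEmpty()) {
      throw new IllegalStateException("No artifact source is currently in scope");
    }
    return live.iterator().next();
  }

  public static void main(String[] args) {
    SourcePool<String> pool = new SourcePool<>();
    try (Lease lease = pool.register("operator-a")) {
      System.out.println(pool.any());
    }
  }
}
```

Serving from an arbitrary live source is safe here only because, as noted above, artifacts are the same throughout a job's lifetime.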





[jira] [Closed] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-05-14 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom closed BEAM-2898.

   Resolution: Fixed
Fix Version/s: Not applicable

> Flink supports chaining/fusion of single-SDK stages
> ---
>
> Key: BEAM-2898
> URL: https://issues.apache.org/jira/browse/BEAM-2898
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
> Fix For: Not applicable
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> The Fn API supports fused stages, which avoids unnecessarily round-tripping 
> the data over the Fn API between stages. The Flink runner should use that 
> capability for better performance.





[jira] [Commented] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-05-14 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474569#comment-16474569
 ] 

Ben Sidhom commented on BEAM-2898:
--

I'm going to mark this as fixed since we have fusion and executable stage 
representation. Fusion is exercised by the portable translator.

> Flink supports chaining/fusion of single-SDK stages
> ---
>
> Key: BEAM-2898
> URL: https://issues.apache.org/jira/browse/BEAM-2898
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
> Fix For: Not applicable
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> The Fn API supports fused stages, which avoids unnecessarily round-tripping 
> the data over the Fn API between stages. The Flink runner should use that 
> capability for better performance.





[jira] [Created] (BEAM-4285) Flink batch state request handler

2018-05-14 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4285:


 Summary: Flink batch state request handler
 Key: BEAM-4285
 URL: https://issues.apache.org/jira/browse/BEAM-4285
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Ben Sidhom


In order to support side inputs, Flink needs a state service request handler. As 
in the non-portable runner, we can start by handling batch side inputs with Flink 
broadcast variables.

[https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
 or 
[https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
 can be used as a starting point. 





[jira] [Created] (BEAM-4271) Executable stages allow side input coders to be set and/or queried

2018-05-10 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4271:


 Summary: Executable stages allow side input coders to be set 
and/or queried
 Key: BEAM-4271
 URL: https://issues.apache.org/jira/browse/BEAM-4271
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Ben Sidhom


ProcessBundleDescriptors may contain side input references from inner 
PTransforms. These side inputs do not have explicit coders; instead, SDK 
harnesses use the PCollection coders by default.

Using the default PCollection coder as specified at pipeline construction is in 
general not the correct thing to do. When PCollection elements are 
materialized, any coders unknown to a runner are length-prefixed. This means that 
materialized PCollections do not use their original element coders. Side inputs 
are delivered to SDKs via MultimapSideInput StateRequests. The responses to 
these requests are expected to contain all of the values for a given key (and 
window), coded with the PCollection KV.value coder, concatenated. However, at 
the time of serving these requests on the runner side, we do not have enough 
information to reconstruct the original value coders.

There are different ways to address this issue. For example:
 * Modify the associated PCollection coder to match the coder that the runner 
uses to materialize elements. This means that anywhere a given PCollection is 
used within a given bundle, it will use the runner-safe coder. This may 
introduce inefficiencies but should be "correct".
 * Annotate side inputs with explicit coders. This guarantees that the key and 
value coders used by the runner match the coders used by SDKs. Furthermore, it 
allows the _runners_ to specify coders. This involves changes to the proto 
models and all SDKs.
 * Annotate side input state requests with both key and value coders. This 
inverts the expected responsibility and has the SDK determine runner coders. 
Additionally, because runners do not understand all SDK types, additional coder 
substitution will need to be done at request handling time to make sure that 
the requested coder can be instantiated and will remain consistent with the SDK 
coder. This requires only small changes to SDKs because they may opt to use 
their default PCollection coders.

All of these approaches have their own downsides. Explicit side input coders 
are probably the right thing to do long-term, but the simplest change for now 
is to modify PCollection coders to match exactly how they're materialized.
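
A rough sketch of what "modify PCollection coders to match how they're materialized" could mean, using a simplified coder tree rather than Beam's proto model. CoderSpec and runnerSafe are hypothetical; the set of coders the runner understands is an assumption made for the example, and "example:coder:custom:v1" is a made-up SDK-specific URN.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical coder rewriting on a simplified coder tree; not Beam's proto model. */
final class RunnerSafeCoders {
  static final String BYTES = "beam:coder:bytes:v1";
  static final String KV = "beam:coder:kv:v1";
  static final String LENGTH_PREFIX = "beam:coder:length_prefix:v1";

  // Assumption for this sketch: the runner can instantiate only these coders itself.
  static final Set<String> KNOWN = new HashSet<>(Arrays.asList(BYTES, KV));

  /** Simplified coder: a URN plus component coders. */
  static final class CoderSpec {
    final String urn;
    final List<CoderSpec> components;

    CoderSpec(String urn, CoderSpec... components) {
      this.urn = urn;
      this.components = Arrays.asList(components);
    }

    @Override
    public String toString() {
      return components.isEmpty() ? urn : urn + components;
    }
  }

  /** Replaces every coder the runner does not understand with length-prefixed bytes. */
  static CoderSpec runnerSafe(CoderSpec coder) {
    if (!KNOWN.contains(coder.urn)) {
      // The runner treats the element as an opaque, length-delimited byte string.
      return new CoderSpec(LENGTH_PREFIX, new CoderSpec(BYTES));
    }
    CoderSpec[] rewritten = new CoderSpec[coder.components.size()];
    for (int i = 0; i < rewritten.length; i++) {
      rewritten[i] = runnerSafe(coder.components.get(i));
    }
    return new CoderSpec(coder.urn, rewritten);
  }

  public static void main(String[] args) {
    CoderSpec original = new CoderSpec(KV, new CoderSpec(BYTES), new CoderSpec("example:coder:custom:v1"));
    System.out.println(runnerSafe(original));
  }
}
```

For the bytes on the wire to agree, the SDK side would have to apply the matching length prefix to the same component, which is the "runner-safe coder everywhere in the bundle" cost mentioned in the first option.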





[jira] [Commented] (BEAM-2594) Python shim for submitting to the ULR

2018-05-08 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467995#comment-16467995
 ] 

Ben Sidhom commented on BEAM-2594:
--

I'm not sure exactly what this entails. It depends on exactly what we want the 
"ULR" to be responsible for. To start with, I think it's reasonable to just 
submit to a running job service with the appropriate pipeline options. 
Eventually, it should actually spin up the job service it will use. However, 
before it can actually start up any job services, we need to nail down our job 
service "entry point" story.

> Python shim for submitting to the ULR
> -
>
> Key: BEAM-2594
> URL: https://issues.apache.org/jira/browse/BEAM-2594
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Kenneth Knowles
>Priority: Minor
>  Labels: portability
>
> Python SDK should support submission of portable pipelines to the ULR, as per 
> https://s.apache.org/beam-job-api.





[jira] [Closed] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-07 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom closed BEAM-3972.

   Resolution: Fixed
Fix Version/s: Not applicable

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> The non-portable runner uses rehydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.





[jira] [Created] (BEAM-4231) Runner utility for Coder instantiation

2018-05-03 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4231:


 Summary: Runner utility for Coder instantiation
 Key: BEAM-4231
 URL: https://issues.apache.org/jira/browse/BEAM-4231
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Ben Sidhom


Portable runners need to instantiate coders for communicating with SDK 
harnesses in a consistent way. They cannot simply instantiate coders as defined 
by PCollections because some component coders may only be known to specific 
SDKs. Unknown coders should be length-prefixed; the underlying elements should 
only be exposed to runners as byte strings.
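
A minimal sketch of the length-prefixing idea, in Python for brevity. This is 
not the runner utility itself: the Coder class, the KNOWN_URNS set, and 
length_prefix_unknown are hypothetical names, and the URN strings are used 
only for illustration. Known coders keep their structure; anything else is 
replaced by a length-prefixed bytes coder, so the runner only ever sees opaque 
byte strings for it.

```python
from dataclasses import dataclass
from typing import Tuple

# Hypothetical set of coder URNs that the runner can instantiate itself.
KNOWN_URNS = {"beam:coder:kv:v1", "beam:coder:bytes:v1", "beam:coder:varint:v1"}
LENGTH_PREFIX_URN = "beam:coder:length_prefix:v1"
BYTES_URN = "beam:coder:bytes:v1"


@dataclass(frozen=True)
class Coder:
    """Illustrative stand-in for a coder spec: a URN plus component coders."""
    urn: str
    components: Tuple["Coder", ...] = ()


def length_prefix_unknown(coder: Coder) -> Coder:
    """Return the coder a runner would use to talk to the SDK harness."""
    if coder.urn == LENGTH_PREFIX_URN:
        # Already length-prefixed: the runner treats the payload as bytes.
        return Coder(LENGTH_PREFIX_URN, (Coder(BYTES_URN),))
    if coder.urn in KNOWN_URNS:
        # Known coder: keep it, but recurse into its components.
        return Coder(
            coder.urn,
            tuple(length_prefix_unknown(c) for c in coder.components),
        )
    # Unknown (SDK-specific) coder: expose elements only as length-prefixed bytes.
    return Coder(LENGTH_PREFIX_URN, (Coder(BYTES_URN),))
```

For example, a KV coder whose value coder is SDK-specific keeps its KV 
structure and varint key, while the value becomes length-prefixed bytes.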





[jira] [Commented] (BEAM-4228) The FlinkRunner shouldn't require all of the values for a key to fit in memory

2018-05-02 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461769#comment-16461769
 ] 

Ben Sidhom commented on BEAM-4228:
--

For context, see [https://github.com/apache/beam/pull/5226/files#r185652571].

> The FlinkRunner shouldn't require all of the values for a key to fit in memory
> --
>
> Key: BEAM-4228
> URL: https://issues.apache.org/jira/browse/BEAM-4228
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Thomas Groh
>Priority: Major
>
> The use of a reducer that adds all of the elements that it consumes to a list 
> is the primary way in which this occurs - if instead, we produce a filtered 
> iterable, or a collection of filtered iterables, we can lazily iterate over 
> all of the contained elements without having to buffer all of the elements.
>  
> For an example of where this occurs, see {{Concatenate}} in  
> {{FlinkBatchPortablePipelineTranslator}}.
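
A minimal sketch of the difference, in Python rather than the runner's Java. 
The function names are hypothetical and this is not the Concatenate 
implementation; it only contrasts a reducer that buffers every value in a list 
with one that hands back a lazy iterable over the same groups.

```python
from itertools import chain
from typing import Iterable, Iterator, List


def concatenate_eagerly(groups: Iterable[Iterable[int]]) -> List[int]:
    """Buffers every element in memory before returning (the problematic shape)."""
    buffered: List[int] = []
    for group in groups:
        buffered.extend(group)
    return buffered


def concatenate_lazily(groups: Iterable[Iterable[int]]) -> Iterator[int]:
    """Yields elements one at a time; no group is consumed until it is needed."""
    return chain.from_iterable(groups)
```

With the lazy version, memory use is bounded by what the consumer holds on to, 
not by the number of values for the key.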





[jira] [Resolved] (BEAM-3914) 'Unzip' flattens before performing fusion

2018-04-30 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom resolved BEAM-3914.
--
   Resolution: Fixed
Fix Version/s: Not applicable

> 'Unzip' flattens before performing fusion
> -
>
> Key: BEAM-3914
> URL: https://issues.apache.org/jira/browse/BEAM-3914
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: Not applicable
>
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> This consists of duplicating nodes downstream of a flatten that exist within 
> an environment, and reintroducing the flatten immediately upstream of a 
> runner-executed transform (the flatten should be executed within the runner)





[jira] [Assigned] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.

2018-04-30 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-3327:


Assignee: Axel Magnuson  (was: Ben Sidhom)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 24h 50m
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments





[jira] [Commented] (BEAM-4149) Java SDK Harness should populate worker id in control plane headers

2018-04-30 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459052#comment-16459052
 ] 

Ben Sidhom commented on BEAM-4149:
--

The workaround is technically fine for now, but it will break as soon as we 
have multiple concurrent environments and need to run test cases for this. More 
importantly, having an empty worker id makes it harder to debug test and 
runtime failures due to SDK/runner communication issues.

> Java SDK Harness should populate worker id in control plane headers
> ---
>
> Key: BEAM-4149
> URL: https://issues.apache.org/jira/browse/BEAM-4149
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Ben Sidhom
>Assignee: Luke Cwik
>Priority: Minor
>
> The Java SDK harness currently does nothing to populate control plane headers 
> with the harness worker id. This id is necessary in order to identify 
> harnesses when multiple are run from the same runner control server.
> Note that this affects the _Java_ harness specifically (e.g., when running a 
> local process or in-memory harness). When the harness is launched within the 
> Docker container, the Go boot code takes care of setting this: 
> https://github.com/apache/beam/blob/dffe50924f34d3cc994008703f01e802c99913d2/sdks/java/container/boot.go#L70





[jira] [Updated] (BEAM-4147) Artifact source abstractions

2018-04-25 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-4147:
-
Summary: Artifact source abstractions  (was: Portable runner Job API 
abstractions)

> Artifact source abstractions
> 
>
> Key: BEAM-4147
> URL: https://issues.apache.org/jira/browse/BEAM-4147
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Axel Magnuson
>Priority: Major
>
> We need a way to wire in arbitrary runner artifact storage backends into the 
> job server and through to artifact staging on workers. This requires some new 
> abstractions in front of the job service.





[jira] [Assigned] (BEAM-4176) Portable batch runner passes all ValidatesRunner tests that non-portable runner passes

2018-04-25 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-4176:


Assignee: (was: Aljoscha Krettek)

> Portable batch runner passes all ValidatesRunner tests that non-portable 
> runner passes
> --
>
> Key: BEAM-4176
> URL: https://issues.apache.org/jira/browse/BEAM-4176
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Major
>
> We need this as a sanity check that runner execution is correct.





[jira] [Created] (BEAM-4176) Portable batch runner passes all ValidatesRunner tests that non-portable runner passes

2018-04-25 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4176:


 Summary: Portable batch runner passes all ValidatesRunner tests 
that non-portable runner passes
 Key: BEAM-4176
 URL: https://issues.apache.org/jira/browse/BEAM-4176
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Ben Sidhom
Assignee: Aljoscha Krettek


We need this as a sanity check that runner execution is correct.





[jira] [Commented] (BEAM-4063) Flink runner supports cluster-wide artifact deployments through the Distributed Cache

2018-04-25 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453033#comment-16453033
 ] 

Ben Sidhom commented on BEAM-4063:
--

Note that the DistributedCache already supports registering artifacts by 
arbitrary names–the registered artifact name mapping is stored in memory in a 
map and written out using Java serialization.

> Flink runner supports cluster-wide artifact deployments through the 
> Distributed Cache
> -
>
> Key: BEAM-4063
> URL: https://issues.apache.org/jira/browse/BEAM-4063
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>
> As of now, Flink effectively has a dependency on an external storage system 
> for artifact management. This is because the Flink Distributed Cache does not 
> actually distribute and cache blobs itself, but rather expects that each node 
> in a running cluster has access to a well-known artifact resource.
> We should get this for free whenever 
> [https://github.com/apache/flink/pull/5580] is merged (likely in 1.5). For 
> now, we will have to defer to external storage systems like GCS or HDFS.





[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive

2018-04-23 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448782#comment-16448782
 ] 

Ben Sidhom commented on BEAM-2421:
--

[https://github.com/apache/beam/pull/4783] adds support for the Flink batch 
runner.

> Migrate Apache Beam to use impulse primitive as the only root primitive
> ---
>
> Key: BEAM-2421
> URL: https://issues.apache.org/jira/browse/BEAM-2421
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Luke Cwik
>Priority: Major
>  Labels: portability
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded 
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...





[jira] [Commented] (BEAM-4149) Java SDK Harness should populate worker id in control plane headers

2018-04-23 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448638#comment-16448638
 ] 

Ben Sidhom commented on BEAM-4149:
--

As a workaround for now, we should explicitly set "worker_id" to the empty 
string in the header accessor.
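
Roughly, the workaround amounts to something like the following sketch 
(Python for brevity; the real accessor lives in the Java runner code and is 
not shown here). The function name get_sdk_worker_id and the plain-mapping 
representation of the headers are assumptions for illustration.

```python
from typing import Mapping, Optional

WORKER_ID_KEY = "worker_id"  # header key named in the comment above


def get_sdk_worker_id(headers: Optional[Mapping[str, str]]) -> str:
    """Return the worker id from control-plane headers, or "" when it is absent."""
    if not headers:
        return ""
    return headers.get(WORKER_ID_KEY, "")
```

A harness that sets no header is then treated as the worker with the empty id, 
which only works while there is a single harness per control server.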

> Java SDK Harness should populate worker id in control plane headers
> ---
>
> Key: BEAM-4149
> URL: https://issues.apache.org/jira/browse/BEAM-4149
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Ben Sidhom
>Assignee: Luke Cwik
>Priority: Minor
>
> The Java SDK harness currently does nothing to populate control plane headers 
> with the harness worker id. This id is necessary in order to identify 
> harnesses when multiple are run from the same runner control server.
> Note that this affects the _Java_ harness specifically (e.g., when running a 
> local process or in-memory harness). When the harness is launched within the 
> Docker container, the Go boot code takes care of setting this: 
> https://github.com/apache/beam/blob/dffe50924f34d3cc994008703f01e802c99913d2/sdks/java/container/boot.go#L70





[jira] [Updated] (BEAM-4106) Merge staging file options between Dataflow runner and portable runner

2018-04-20 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-4106:
-
Description: 
Both runners (will) require staging file options. This should be refactored 
such that the same options are used.

Java pipelines stage all entries on the classpath (assuming a URLClassLoader is 
used) by default. The new merged pipeline options should have similar behavior 
and be documented as such.

  was:Both runners (will) require staging file options. This should be 
refactored such that the same options are used.


> Merge staging file options between Dataflow runner and portable runner
> --
>
> Key: BEAM-4106
> URL: https://issues.apache.org/jira/browse/BEAM-4106
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core, runner-dataflow
>Reporter: Ben Sidhom
>Assignee: Kenneth Knowles
>Priority: Trivial
>
> Both runners (will) require staging file options. This should be refactored 
> such that the same options are used.
> Java pipelines stage all entries on the classpath (assuming a URLClassLoader 
> is used) by default. The new merged pipeline options should have similar 
> behavior and be documented as such.





[jira] [Assigned] (BEAM-4151) Prevent control client pool service from leaking stale client references

2018-04-20 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-4151:


Assignee: Ben Sidhom  (was: Kenneth Knowles)

> Prevent control client pool service from leaking stale client references
> 
>
> Key: BEAM-4151
> URL: https://issues.apache.org/jira/browse/BEAM-4151
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>
> The pool service that connects incoming gRPC control clients to consumers may 
> leak client references. See 
> [https://github.com/apache/beam/pull/5189#discussion_r183144364] for context.
> It's worth considering whether we actually expect to have enough incoming 
> control clients to make this an issue.





[jira] [Created] (BEAM-4151) Prevent control client pool service from leaking stale client references

2018-04-20 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4151:


 Summary: Prevent control client pool service from leaking stale 
client references
 Key: BEAM-4151
 URL: https://issues.apache.org/jira/browse/BEAM-4151
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Kenneth Knowles


The pool service that connects incoming gRPC control clients to consumers may 
leak client references. See 
[https://github.com/apache/beam/pull/5189#discussion_r183144364] for context.

It's worth considering whether we actually expect to have enough incoming 
control clients to make this an issue.





[jira] [Assigned] (BEAM-2889) Flink runs portable pipelines

2018-04-20 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-2889:


Assignee: Ben Sidhom  (was: Aljoscha Krettek)

> Flink runs portable pipelines
> -
>
> Key: BEAM-2889
> URL: https://issues.apache.org/jira/browse/BEAM-2889
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
>
> Flink should run pipelines using the full portability API as currently 
> defined:
> https://s.apache.org/beam-fn-api 
> https://s.apache.org/beam-runner-api
> https://s.apache.org/beam-job-api
> https://s.apache.org/beam-fn-api-container-contract
> This issue tracks its adoption of the portability framework. New Fn API and 
> other features will be tracked separately.





[jira] [Updated] (BEAM-3792) Python submits portable pipelines to the Flink-served endpoint.

2018-04-20 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-3792:
-
Issue Type: Bug  (was: Sub-task)
Parent: (was: BEAM-2889)

> Python submits portable pipelines to the Flink-served endpoint.
> ---
>
> Key: BEAM-3792
> URL: https://issues.apache.org/jira/browse/BEAM-3792
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>






[jira] [Updated] (BEAM-2588) Portable Flink Runner Job API

2018-04-20 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-2588:
-
Issue Type: New Feature  (was: Sub-task)
Parent: (was: BEAM-2889)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs via the job API (https://s.apache.org/beam-job-api).





[jira] [Updated] (BEAM-2795) FlinkRunner: translate using SDK-agnostic means

2018-04-20 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-2795:
-
Issue Type: Improvement  (was: Sub-task)
Parent: (was: BEAM-2889)

> FlinkRunner: translate using SDK-agnostic means
> ---
>
> Key: BEAM-2795
> URL: https://issues.apache.org/jira/browse/BEAM-2795
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
> Fix For: 2.5.0
>
>






[jira] [Updated] (BEAM-2897) Configurable container/process management

2018-04-20 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-2897:
-
Issue Type: New Feature  (was: Sub-task)
Parent: (was: BEAM-2889)

> Configurable container/process management
> -
>
> Key: BEAM-2897
> URL: https://issues.apache.org/jira/browse/BEAM-2897
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
> Fix For: Not applicable
>
>
> Flink should support configurable container/process management as per 
> https://s.apache.org/beam-fn-api-container-contract.





[jira] [Updated] (BEAM-3886) Python SDK harness uses State API from ProcessBundleDescriptor

2018-04-20 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-3886:
-
Description: 
The Python harness should pull the state api descriptor from the current 
process bundle descriptor when processing bundles.

As a minor optimization and to make implementing new runners easier, the 
harness should not talk to the State server unless it's actually needed.

  was:The Python harness always talks to the State API, even if it is never 
used by the current process bundle. As a minor optimization and to make 
implementing new runners easier, the harness should not talk to the State 
server unless it's actually needed.


> Python SDK harness uses State API from ProcessBundleDescriptor
> --
>
> Key: BEAM-3886
> URL: https://issues.apache.org/jira/browse/BEAM-3886
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Ben Sidhom
>Assignee: Ahmet Altay
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The Python harness should pull the state api descriptor from the current 
> process bundle descriptor when processing bundles.
> As a minor optimization and to make implementing new runners easier, the 
> harness should not talk to the State server unless it's actually needed.
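
One way to picture the requested behavior is the sketch below. It is not the 
Python harness code: the StateClients class, the connect callback, and the 
"state_api_url" descriptor field are all hypothetical. The point is only that 
the state endpoint comes from the bundle descriptor and that no connection is 
opened until a bundle actually asks for state.

```python
from typing import Callable, Dict, Optional


class StateClients:
    """Opens state connections lazily, keyed by the URL in the bundle descriptor."""

    def __init__(self, connect: Callable[[str], object]) -> None:
        self._connect = connect
        self._clients: Dict[str, object] = {}

    def for_bundle(self, descriptor: dict) -> Optional[object]:
        # "state_api_url" is a hypothetical field name for this sketch.
        url = descriptor.get("state_api_url")
        if not url:
            # The descriptor names no state service, so never connect.
            return None
        if url not in self._clients:
            self._clients[url] = self._connect(url)
        return self._clients[url]
```

A harness would call for_bundle only from code paths that read or write state, 
so bundles without state never contact the State server.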





[jira] [Updated] (BEAM-3886) Python SDK harness uses State API from ProcessBundleDescriptor

2018-04-20 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-3886:
-
Summary: Python SDK harness uses State API from ProcessBundleDescriptor  
(was: Python SDK harness does not contact State API if not needed)

> Python SDK harness uses State API from ProcessBundleDescriptor
> --
>
> Key: BEAM-3886
> URL: https://issues.apache.org/jira/browse/BEAM-3886
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Ben Sidhom
>Assignee: Ahmet Altay
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The Python harness always talks to the State API, even if it is never used by 
> the current process bundle. As a minor optimization and to make implementing 
> new runners easier, the harness should not talk to the State server unless 
> it's actually needed.





[jira] [Created] (BEAM-4149) Java SDK Harness should populate worker id in control plane headers

2018-04-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4149:


 Summary: Java SDK Harness should populate worker id in control 
plane headers
 Key: BEAM-4149
 URL: https://issues.apache.org/jira/browse/BEAM-4149
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-harness
Reporter: Ben Sidhom
Assignee: Luke Cwik


The Java SDK harness currently does nothing to populate control plane headers 
with the harness worker id. This id is necessary in order to identify harnesses 
when multiple are run from the same runner control server.

Note that this affects the _Java_ harness specifically (e.g., when running a 
local process or in-memory harness). When the harness is launched within the 
Docker container, the Go boot code takes care of setting this: 
https://github.com/apache/beam/blob/dffe50924f34d3cc994008703f01e802c99913d2/sdks/java/container/boot.go#L70





[jira] [Created] (BEAM-4148) Local server api descriptors contain urls that work on Mac and Linux

2018-04-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4148:


 Summary: Local server api descriptors contain urls that work on 
Mac and Linux
 Key: BEAM-4148
 URL: https://issues.apache.org/jira/browse/BEAM-4148
 Project: Beam
  Issue Type: Improvement
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Kenneth Knowles


Docker for Mac does not allow host networking and thus will not allow SDK 
harnesses to access runner services via `localhost`. Instead, a special DNS 
name is used to refer to the host machine: docker.for.mac.host.internal. (Note 
that this value sometimes changes between Docker releases).

We should attempt to detect the host operating system and return different API 
descriptors based on this.

See 
[https://github.com/bsidhom/beam/commit/3adaeb0d33dc26f0910c1f8af2821cce4ee0b965]
 for how this might be done.
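
As a rough illustration of the detection step (Python for brevity, and not 
taken from the linked commit), the sketch below picks the host name that a 
containerized SDK harness should use to reach runner services. The function 
names are hypothetical, and the Mac host name is the one quoted above, which 
may differ between Docker releases.

```python
import platform

# Host name quoted in this issue; it may change between Docker releases.
DOCKER_FOR_MAC_HOST = "docker.for.mac.host.internal"


def host_for_sdk_harness(system: str = "") -> str:
    """Return the host name a containerized harness should dial."""
    # platform.system() reports "Darwin" on macOS and "Linux" on Linux.
    system = system or platform.system()
    return DOCKER_FOR_MAC_HOST if system == "Darwin" else "localhost"


def api_service_url(port: int, system: str = "") -> str:
    """Build the URL to place in an API service descriptor."""
    return f"{host_for_sdk_harness(system)}:{port}"
```

The optional system argument exists only so the choice can be exercised in 
tests without running on each operating system.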





[jira] [Created] (BEAM-4147) Portable runner Job API abstractions

2018-04-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4147:


 Summary: Portable runner Job API abstractions
 Key: BEAM-4147
 URL: https://issues.apache.org/jira/browse/BEAM-4147
 Project: Beam
  Issue Type: New Feature
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Axel Magnuson


We need a way to wire in arbitrary runner artifact storage backends into the 
job server and through to artifact staging on workers. This requires some new 
abstractions in front of the job service.





[jira] [Created] (BEAM-4146) Python SDK sets environment in portable pipelines

2018-04-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4146:


 Summary: Python SDK sets environment in portable pipelines
 Key: BEAM-4146
 URL: https://issues.apache.org/jira/browse/BEAM-4146
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Reporter: Ben Sidhom
Assignee: Ahmet Altay


Environments must be set in any non-runner-executed transforms. See 
[https://github.com/bsidhom/beam/commit/0362fd1f25] for a possible approach 
until canonical image urls are created.





[jira] [Created] (BEAM-4145) Java SDK Harness populates control request headers with worker id

2018-04-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4145:


 Summary: Java SDK Harness populates control request headers with 
worker id
 Key: BEAM-4145
 URL: https://issues.apache.org/jira/browse/BEAM-4145
 Project: Beam
  Issue Type: New Feature
  Components: sdk-java-harness
Reporter: Ben Sidhom
Assignee: Luke Cwik


Runner code needs to be able to identify incoming harness connections by the 
worker ids that it assigns to them on creation. This is currently done by the 
Go boot code when the harness runs in a Docker container. However, in-process 
harnesses never specify worker ids. This prevents in-process harnesses from 
being multiplexed by a runner (most likely the ULR and test code).
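
To make the multiplexing problem concrete, here is a small sketch in Python. 
The ControlClientPool class is hypothetical and much simpler than the runner's 
actual pool; it only shows why connections need distinct worker ids.

```python
from typing import Dict


class ControlClientPool:
    """Hypothetical pool mapping worker ids to connected control clients."""

    def __init__(self) -> None:
        self._clients: Dict[str, object] = {}

    def register(self, worker_id: str, client: object) -> None:
        """Record an incoming harness connection under its worker id."""
        if worker_id in self._clients:
            raise ValueError(f"duplicate worker id: {worker_id!r}")
        self._clients[worker_id] = client

    def take(self, worker_id: str) -> object:
        """Hand the connection for worker_id to the caller and forget it."""
        return self._clients.pop(worker_id)
```

Two in-process harnesses that both report an empty worker id would collide on 
the second register call, so the runner could not tell them apart.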



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3673) FlinkRunner: Harness manager for connecting operators to SDK Harnesses

2018-04-19 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-3673:


Assignee: Axel Magnuson

> FlinkRunner: Harness manager for connecting operators to SDK Harnesses
> --
>
> Key: BEAM-3673
> URL: https://issues.apache.org/jira/browse/BEAM-3673
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Axel Magnuson
>Priority: Major
>
> SDK harnesses require a common set of gRPC services to operate. The role of 
> the harness manager is to provide a uniform interface that multiplexes data 
> streams and auxiliary data between SDK environments and operators within a 
> given job.
> Note that multiple operators may communicate with a single SDK environment to 
> amortize container initialization cost. Environments are _not_ shared between 
> different jobs.
> The initial implementation will shell out to local docker, but the harness 
> manager should eventually support working with externally-managed 
> environments (e.g., created by Kubernetes).
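
The sharing rule described above can be sketched as a cache keyed by job and 
environment (Python for brevity). The HarnessManager class here and its 
start_environment callback are hypothetical; the real manager also has to 
handle gRPC services, data streams, and cleanup, none of which is shown.

```python
from typing import Callable, Dict, Tuple


class HarnessManager:
    """Hypothetical cache of SDK environments keyed by (job id, image)."""

    def __init__(self, start_environment: Callable[[str], object]) -> None:
        self._start = start_environment
        self._environments: Dict[Tuple[str, str], object] = {}

    def get_environment(self, job_id: str, image: str) -> object:
        """Return the shared environment for this job, starting it if needed."""
        key = (job_id, image)
        if key not in self._environments:
            # The startup cost is paid once per job and image.
            self._environments[key] = self._start(image)
        return self._environments[key]
```

Operators in the same job that ask for the same image get the same 
environment, while a second job with the same image gets its own.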





[jira] [Assigned] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.

2018-04-19 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-3327:


Assignee: Ben Sidhom  (was: Axel Magnuson)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
>  Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments





[jira] [Assigned] (BEAM-4109) Support arbitrary artifact names in local file artifact staging service

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-4109:


Assignee: (was: Kenneth Knowles)

> Support arbitrary artifact names in local file artifact staging service
> ---
>
> Key: BEAM-4109
> URL: https://issues.apache.org/jira/browse/BEAM-4109
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Ben Sidhom
>Priority: Minor
>
> The local-file based artifact staging service implementation stores artifacts 
> in a flat directory under the exact names they are given by the artifact 
> staging request. This assumes that all artifact names are safe file names and 
> requires staging clients to manually escape names.
> Instead, the staging service should perform its own escaping/mapping 
> transparently and allow clients to specify arbitrary artifact staging names.
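
One possible mapping, sketched in Python, is to derive the stored file name 
from a hash of the artifact name. This is an assumption for illustration, not 
the staging service's actual scheme; the function names and the "artifact_" 
prefix are hypothetical. A real service would also need to record the original 
name (for example in a manifest) so that it can be reported back on retrieval.

```python
import hashlib
import os


def staged_file_name(artifact_name: str) -> str:
    """Map an arbitrary artifact name to a safe, flat file name."""
    digest = hashlib.sha256(artifact_name.encode("utf-8")).hexdigest()
    return f"artifact_{digest}"


def staged_path(staging_dir: str, artifact_name: str) -> str:
    """Return where the artifact would be stored under staging_dir."""
    return os.path.join(staging_dir, staged_file_name(artifact_name))
```

Because the result contains only hex digits and a fixed prefix, names with 
path separators or ".." cannot escape the flat staging directory.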





[jira] [Created] (BEAM-4131) Python SDK harness container image contains SDK and dependencies

2018-04-18 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4131:


 Summary: Python SDK harness container image contains SDK and 
dependencies
 Key: BEAM-4131
 URL: https://issues.apache.org/jira/browse/BEAM-4131
 Project: Beam
  Issue Type: New Feature
  Components: sdk-py-harness
Reporter: Ben Sidhom
Assignee: Robert Bradshaw


The Docker image for the SDK harness should contain SDK code and dependencies 
so that this does not need to be downloaded from the artifact retrieval service 
at each boot.

This is required for operation with portable runners right now because the 
python client does not currently stage the SDK itself (as it does with the 
Dataflow runner).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-4130) Portable Flink runner entry point

2018-04-18 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4130:


 Summary: Portable Flink runner entry point
 Key: BEAM-4130
 URL: https://issues.apache.org/jira/browse/BEAM-4130
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink
Reporter: Ben Sidhom


The portable Flink runner exists as a Job Service that runs somewhere. We need 
a main entry point that itself spins up the job service (and artifact staging 
service). The main program itself should be packaged into an uberjar such that 
it can be run locally or submitted to a Flink deployment via `flink run`.





[jira] [Updated] (BEAM-2597) FlinkRunner ExecutableStage batch operator

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-2597:
-
Description: This operator will execute user code in the context of an SDK 
harness by constructing a ProcessBundleDescriptor from an ExecutableStage 
(physical stage plan) and sending instructions/elements over the control and 
data planes.  (was:  

To run non-Java SDK code is to put together an operator that manages a Fn API 
client DoFnRunner and an SDK harness Fn API server.)

> FlinkRunner ExecutableStage batch operator
> --
>
> Key: BEAM-2597
> URL: https://issues.apache.org/jira/browse/BEAM-2597
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
>
> This operator will execute user code in the context of an SDK harness by 
> constructing a ProcessBundleDescriptor from an ExecutableStage (physical 
> stage plan) and sending instructions/elements over the control and data 
> planes.





[jira] [Updated] (BEAM-2597) FlinkRunner ExecutableStage operator

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-2597:
-
Description: 
 

To run non-Java SDK code is to put together an operator that manages a Fn API 
client DoFnRunner and an SDK harness Fn API server.

  was:
To run non-Java SDK code is to put together an operator that manages a Fn API 
client DoFnRunner and an SDK harness Fn API server.

(filing to organize steps, details of this may evolve as it is implemented)


> FlinkRunner ExecutableStage operator
> 
>
> Key: BEAM-2597
> URL: https://issues.apache.org/jira/browse/BEAM-2597
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
>
>  
> To run non-Java SDK code is to put together an operator that manages a Fn API 
> client DoFnRunner and an SDK harness Fn API server.





[jira] [Updated] (BEAM-2597) FlinkRunner ExecutableStage batch operator

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-2597:
-
Summary: FlinkRunner ExecutableStage batch operator  (was: FlinkRunner 
ExecutableStage operator)

> FlinkRunner ExecutableStage batch operator
> --
>
> Key: BEAM-2597
> URL: https://issues.apache.org/jira/browse/BEAM-2597
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
>
>  
> To run non-Java SDK code is to put together an operator that manages a Fn API 
> client DoFnRunner and an SDK harness Fn API server.





[jira] [Updated] (BEAM-2597) FlinkRunner ExecutableStage operator

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-2597:
-
Summary: FlinkRunner ExecutableStage operator  (was: FlinkRunner Fn API 
based ParDo operator)

> FlinkRunner ExecutableStage operator
> 
>
> Key: BEAM-2597
> URL: https://issues.apache.org/jira/browse/BEAM-2597
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
>
> To run non-Java SDK code is to put together an operator that manages a Fn API 
> client DoFnRunner and an SDK harness Fn API server.
> (filing to organize steps, details of this may evolve as it is implemented)





[jira] [Resolved] (BEAM-2897) Configurable container/process management

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom resolved BEAM-2897.
--
   Resolution: Duplicate
Fix Version/s: Not applicable

> Configurable container/process management
> -
>
> Key: BEAM-2897
> URL: https://issues.apache.org/jira/browse/BEAM-2897
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
> Fix For: Not applicable
>
>
> Flink should support configurable container/process management as per 
> https://s.apache.org/beam-fn-api-container-contract.





[jira] [Created] (BEAM-4128) Handle WindowInto transforms in portable Flink runner directly

2018-04-18 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4128:


 Summary: Handle WindowInto transforms in portable Flink runner 
directly
 Key: BEAM-4128
 URL: https://issues.apache.org/jira/browse/BEAM-4128
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink
Reporter: Ben Sidhom
Assignee: Aljoscha Krettek


This is mostly an optimization, but by having the runner itself handle 
well-known WindowFns (e.g., global windowing, fixed windowing), we can avoid 
round trips to the SDK harness. This requires some pipeline surgery 
pre-translation if we want it to work efficiently in conjunction with fusion.
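
For a fixed-window WindowFn, the assignment the runner would perform itself is plain arithmetic, which is why a round trip to the SDK harness is avoidable. A minimal Python sketch, assuming millisecond timestamps; `assign_fixed_window` is a hypothetical name used for illustration and is not a Beam API:

```python
from typing import Tuple


def assign_fixed_window(timestamp_ms: int, size_ms: int, offset_ms: int = 0) -> Tuple[int, int]:
    """Return the half-open [start, end) fixed window containing timestamp_ms."""
    if size_ms <= 0:
        raise ValueError("size_ms must be positive")
    # Python's % yields a non-negative result for a positive modulus,
    # so timestamps before the epoch land in the correct window too.
    start = timestamp_ms - ((timestamp_ms - offset_ms) % size_ms)
    return start, start + size_ms
```

Global windowing is simpler still (every element maps to the same window), so neither case needs user code to run.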





[jira] [Assigned] (BEAM-4127) Flink portable runner translates streaming pipelines by proto

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-4127:


Assignee: (was: Aljoscha Krettek)

> Flink portable runner translates streaming pipelines by proto
> -
>
> Key: BEAM-4127
> URL: https://issues.apache.org/jira/browse/BEAM-4127
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Major
>






[jira] [Updated] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-3972:
-
Summary: Flink runner translates batch pipelines directly by proto  (was: 
Flink runner translates pipelines directly by proto)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>
> The non-portable runner uses rehydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.





[jira] [Created] (BEAM-4127) Flink portable runner translates streaming pipelines by proto

2018-04-18 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4127:


 Summary: Flink portable runner translates streaming pipelines by 
proto
 Key: BEAM-4127
 URL: https://issues.apache.org/jira/browse/BEAM-4127
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink
Reporter: Ben Sidhom
Assignee: Aljoscha Krettek








[jira] [Updated] (BEAM-4109) Support arbitrary artifact names in local file artifact staging service

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-4109:
-
Issue Type: Bug  (was: Improvement)

> Support arbitrary artifact names in local file artifact staging service
> ---
>
> Key: BEAM-4109
> URL: https://issues.apache.org/jira/browse/BEAM-4109
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Kenneth Knowles
>Priority: Minor
>
> The local-file based artifact staging service implementation stores artifacts 
> in a flat directory under the exact names they are given by the artifact 
> staging request. This assumes that all artifact names are safe file names and 
> requires staging clients to manually escape names.
> Instead, the staging service should perform its own escaping/mapping 
> transparently and allow clients to specify arbitrary artifact staging names.





[jira] [Closed] (BEAM-4056) Identify Side Inputs by PTransform ID and local name

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom closed BEAM-4056.

   Resolution: Fixed
Fix Version/s: Not applicable

> Identify Side Inputs by PTransform ID and local name
> 
>
> Key: BEAM-4056
> URL: https://issues.apache.org/jira/browse/BEAM-4056
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> This is necessary in order to correctly identify side inputs during all 
> phases of portable pipeline execution (fusion, translation, and SDK 
> execution).





[jira] [Closed] (BEAM-3994) Use typed sinks and sources for FnApiControlClientPoolService

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom closed BEAM-3994.

   Resolution: Fixed
Fix Version/s: Not applicable

> Use typed sinks and sources for FnApiControlClientPoolService
> -
>
> Key: BEAM-3994
> URL: https://issues.apache.org/jira/browse/BEAM-3994
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> We operate with blocking queues directly when managing control clients with 
> the FnApiControlClientPoolService. This makes interactions with the client 
> pool difficult to understand. We should instead make client sources and sinks 
> explicit.





[jira] [Closed] (BEAM-4069) Empty pipeline options can be gracefully serialized/deserialized

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom closed BEAM-4069.

   Resolution: Fixed
Fix Version/s: Not applicable

> Empty pipeline options can be gracefully serialized/deserialized
> 
>
> Key: BEAM-4069
> URL: https://issues.apache.org/jira/browse/BEAM-4069
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> PipelineOptionsTranslation.fromProto currently crashes with a 
> NullPointerException when passed an empty options Struct. This is due to 
> ProxyInvocationHandler.Deserializer expecting a non-empty enclosing Struct.
> Empty pipeline options may be passed by SDKs interacting with a job server, 
> so this case needs to be handled. Note that testing a round-trip of an 
> effectively-empty Java PipelineOptions object is not sufficient to catch this 
> because "empty" Java options still contain default fields not defined in 
> other SDKs.





[jira] [Created] (BEAM-4116) Add a timeout to artifact stager

2018-04-18 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4116:


 Summary: Add a timeout to artifact stager
 Key: BEAM-4116
 URL: https://issues.apache.org/jira/browse/BEAM-4116
 Project: Beam
  Issue Type: Improvement
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Kenneth Knowles


ArtifactServiceStager currently blocks indefinitely while waiting for all 
artifacts to stage. It would be nice to have a configurable timeout here.
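
The general shape of a bounded wait can be sketched as follows. This is an illustration in Python rather than the Java ArtifactServiceStager itself; `stage_all` and `stage_one` are hypothetical names, and the worker count is an arbitrary assumption:

```python
from concurrent.futures import ThreadPoolExecutor, wait


def stage_all(stage_one, artifacts, timeout_seconds):
    """Stage every artifact concurrently; raise TimeoutError if any is unfinished in time."""
    executor = ThreadPoolExecutor(max_workers=4)  # assumed pool size
    try:
        futures = [executor.submit(stage_one, artifact) for artifact in artifacts]
        # wait() returns once all futures finish or the timeout elapses.
        done, not_done = wait(futures, timeout=timeout_seconds)
        if not_done:
            # cancel() only stops tasks that have not started yet;
            # an upload that is already running is not interrupted.
            for future in not_done:
                future.cancel()
            raise TimeoutError(
                f"{len(not_done)} artifact(s) still staging after {timeout_seconds}s")
        # result() re-raises any exception thrown while staging.
        return [future.result() for future in futures]
    finally:
        # Do not block the caller on stragglers.
        executor.shutdown(wait=False)
```

Note that a timeout here only unblocks the caller; actually aborting an in-flight upload would need cooperation from the staging call itself.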





[jira] [Created] (BEAM-4109) Support arbitrary artifact names in local file artifact staging service

2018-04-17 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4109:


 Summary: Support arbitrary artifact names in local file artifact 
staging service
 Key: BEAM-4109
 URL: https://issues.apache.org/jira/browse/BEAM-4109
 Project: Beam
  Issue Type: Improvement
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Kenneth Knowles


The local-file based artifact staging service implementation stores artifacts 
in a flat directory under the exact names they are given by the artifact 
staging request. This assumes that all artifact names are safe file names and 
requires staging clients to manually escape names.

Instead, the staging service should perform its own escaping/mapping 
transparently and allow clients to specify arbitrary artifact staging names.
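
One possible mapping is reversible percent-encoding, sketched below in Python for illustration. The function names and the `artifact_` prefix are assumptions, not part of the existing staging service, and the sketch does not address file-name length limits:

```python
from urllib.parse import quote, unquote

# Hypothetical prefix: keeps names such as "." and ".." from being special on disk.
_PREFIX = "artifact_"


def to_file_name(artifact_name: str) -> str:
    """Map an arbitrary artifact name to a file name that is safe in a flat directory."""
    # safe="" also escapes "/", so the result never contains a path separator.
    return _PREFIX + quote(artifact_name, safe="")


def from_file_name(file_name: str) -> str:
    """Recover the original artifact name from a file name produced by to_file_name."""
    if not file_name.startswith(_PREFIX):
        raise ValueError(f"not a staged artifact file name: {file_name!r}")
    return unquote(file_name[len(_PREFIX):])
```

Because the mapping is invertible, the service can list the staging directory and report the original names back to clients without keeping a separate index.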





[jira] [Created] (BEAM-4106) Merge staging file options between Dataflow runner and portable runner

2018-04-17 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4106:


 Summary: Merge staging file options between Dataflow runner and 
portable runner
 Key: BEAM-4106
 URL: https://issues.apache.org/jira/browse/BEAM-4106
 Project: Beam
  Issue Type: Improvement
  Components: runner-core, runner-dataflow
Reporter: Ben Sidhom
Assignee: Kenneth Knowles


Both runners (will) require staging file options. This should be refactored 
such that the same options are used.





[jira] [Assigned] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.

2018-04-16 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-3327:


Assignee: Axel Magnuson  (was: Ben Sidhom)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments





[jira] [Created] (BEAM-4071) Portable Runner Job API shim

2018-04-13 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4071:


 Summary: Portable Runner Job API shim
 Key: BEAM-4071
 URL: https://issues.apache.org/jira/browse/BEAM-4071
 Project: Beam
  Issue Type: New Feature
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Ben Sidhom


There needs to be a way to execute Java-SDK pipelines against a portable job 
server. The job server itself is expected to be started up out-of-band. The 
"PortableRunner" should take an option indicating the Job API endpoint and 
defer other runner configurations to the backend itself.





[jira] [Updated] (BEAM-2588) Portable Flink Runner Job API

2018-04-13 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-2588:
-
Description: The portable Flink runner needs to be wired into a job server 
so that it can accept jobs through the Job API (https://s.apache.org/beam-job-api).  
(was: Whatever the result of https://s.apache.org/beam-job-api we will need a 
way for the JVM-based FlinkRunner to receive and run pipelines authored in 
Python.)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs through the Job API (https://s.apache.org/beam-job-api).





[jira] [Commented] (BEAM-2588) Portable Flink Runner Job API

2018-04-13 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437781#comment-16437781
 ] 

Ben Sidhom commented on BEAM-2588:
--

I've renamed this bug to reflect the fact that the portable Flink runner will 
effectively be its own runner entrypoint. The Job API itself does not actually 
deal with Java "Runners" at all.

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>
> Whatever the result of https://s.apache.org/beam-job-api we will need a way 
> for the JVM-based FlinkRunner to receive and run pipelines authored in Python.





[jira] [Updated] (BEAM-2588) Portable Flink Runner Job API

2018-04-13 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom updated BEAM-2588:
-
Summary: Portable Flink Runner Job API  (was: FlinkRunner shim for serving 
Job API)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>
> Whatever the result of https://s.apache.org/beam-job-api we will need a way 
> for the JVM-based FlinkRunner to receive and run pipelines authored in Python.





[jira] [Commented] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-13 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437763#comment-16437763
 ] 

Ben Sidhom commented on BEAM-2898:
--

Pipeline fusion has been implemented in runner-core. With support for impulse, 
the Flink runner will support pipeline fusion once translation by proto has 
been implemented.

> Flink supports chaining/fusion of single-SDK stages
> ---
>
> Key: BEAM-2898
> URL: https://issues.apache.org/jira/browse/BEAM-2898
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> The Fn API supports fused stages, which avoids unnecessarily round-tripping 
> the data over the Fn API between stages. The Flink runner should use that 
> capability for better performance.





[jira] [Assigned] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-13 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-2898:


Assignee: Ben Sidhom

> Flink supports chaining/fusion of single-SDK stages
> ---
>
> Key: BEAM-2898
> URL: https://issues.apache.org/jira/browse/BEAM-2898
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> The Fn API supports fused stages, which avoids unnecessarily round-tripping 
> the data over the Fn API between stages. The Flink runner should use that 
> capability for better performance.





[jira] [Commented] (BEAM-4068) Consistent option specification between SDKs and runners by URN

2018-04-13 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437686#comment-16437686
 ] 

Ben Sidhom commented on BEAM-4068:
--

See [https://github.com/axelmagn/beam/pull/5/files] for an example. Note that 
the specific runner options there are not relevant, but that runners in general 
require arbitrary options.

> Consistent option specification between SDKs and runners by URN
> ---
>
> Key: BEAM-4068
> URL: https://issues.apache.org/jira/browse/BEAM-4068
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Kenneth Knowles
>Priority: Minor
>
> Pipeline options are materialized differently by different SDKs. However, in 
> some cases, runners require Java-specific options that are not available 
> elsewhere. We should decide on well-known URNs and use them across SDKs where 
> applicable.





[jira] [Created] (BEAM-4069) Empty pipeline options can be gracefully serialized/deserialized

2018-04-13 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4069:


 Summary: Empty pipeline options can be gracefully 
serialized/deserialized
 Key: BEAM-4069
 URL: https://issues.apache.org/jira/browse/BEAM-4069
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Ben Sidhom


PipelineOptionsTranslation.fromProto currently crashes with a 
NullPointerException when passed an empty options Struct. This is due to 
ProxyInvocationHandler.Deserializer expecting a non-empty enclosing Struct.

Empty pipeline options may be passed by SDKs interacting with a job server, so 
this case needs to be handled. Note that testing a round-trip of an 
effectively-empty Java PipelineOptions object is not sufficient to catch this 
because "empty" Java options still contain default fields not defined in other 
SDKs.





[jira] [Created] (BEAM-4068) Consistent option specification between SDKs and runners by URN

2018-04-13 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4068:


 Summary: Consistent option specification between SDKs and runners 
by URN
 Key: BEAM-4068
 URL: https://issues.apache.org/jira/browse/BEAM-4068
 Project: Beam
  Issue Type: New Feature
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Kenneth Knowles


Pipeline options are materialized differently by different SDKs. However, in 
some cases, runners require Java-specific options that are not available 
elsewhere. We should decide on well-known URNs and use them across SDKs where 
applicable.





[jira] [Created] (BEAM-4067) Add portable Flink test runner

2018-04-13 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4067:


 Summary: Add portable Flink test runner
 Key: BEAM-4067
 URL: https://issues.apache.org/jira/browse/BEAM-4067
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink
Reporter: Ben Sidhom
Assignee: Aljoscha Krettek


The portable Flink runner cannot be tested through the normal mechanisms used 
for ValidatesRunner tests because it requires a job server to be constructed 
out of band and for pipelines to be run through it. We should implement a shim 
that acts as a standard Java SDK Runner that spins up the necessary server 
(possibly in-process) and runs against it.





[jira] [Created] (BEAM-4063) Flink runner supports cluster-wide artifact deployments through the Distributed Cache

2018-04-12 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4063:


 Summary: Flink runner supports cluster-wide artifact deployments 
through the Distributed Cache
 Key: BEAM-4063
 URL: https://issues.apache.org/jira/browse/BEAM-4063
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink
Reporter: Ben Sidhom
Assignee: Aljoscha Krettek


As of now, Flink effectively has a dependency on an external storage system for 
artifact management. This is because the Flink Distributed Cache does not 
actually distribute and cache blobs itself, but rather expects that each node 
in a running cluster has access to a well-known artifact resource.

We should get this for free whenever 
[https://github.com/apache/flink/pull/5580] is merged (likely in 1.5). For now, 
we will have to defer to external storage systems like GCS or HDFS.





[jira] [Created] (BEAM-4056) Identify Side Inputs by PTransform ID and local name

2018-04-11 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-4056:


 Summary: Identify Side Inputs by PTransform ID and local name
 Key: BEAM-4056
 URL: https://issues.apache.org/jira/browse/BEAM-4056
 Project: Beam
  Issue Type: New Feature
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Ben Sidhom


This is necessary in order to correctly identify side inputs during all phases 
of portable pipeline execution (fusion, translation, and SDK execution).





[jira] [Closed] (BEAM-3779) Enable deserialization of a non-java Pipeline

2018-04-11 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom closed BEAM-3779.

   Resolution: Won't Do
Fix Version/s: Not applicable

> Enable deserialization of a non-java Pipeline
> -
>
> Key: BEAM-3779
> URL: https://issues.apache.org/jira/browse/BEAM-3779
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
> Fix For: Not applicable
>
>
> Currently, rehydrating a Pipeline works on the PCollection and PTransform 
> levels with the use of RawPTransform, but the runner-core-construction-java 
> utilities will throw if the runner attempts to deserialize a 
> WindowingStrategy or Coder which contains non-Java custom (or otherwise 
> unknown) Coders or WindowFns.
>  
> Use a strategy like RawPTransform to deserialize WindowFns and Coders, so 
> they can be interacted with as intermediate tokens in the java form.





[jira] [Commented] (BEAM-3779) Enable deserialization of a non-java Pipeline

2018-04-11 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16434685#comment-16434685
 ] 

Ben Sidhom commented on BEAM-3779:
--

This is no longer relevant as we have decided to use protos directly during 
portable pipeline translation.

> Enable deserialization of a non-java Pipeline
> -
>
> Key: BEAM-3779
> URL: https://issues.apache.org/jira/browse/BEAM-3779
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Ben Sidhom
>Priority: Major
>  Labels: portability
>
> Currently, rehydrating a Pipeline works on the PCollection and PTransform 
> levels with the use of RawPTransform, but the runner-core-construction-java 
> utilities will throw if the runner attempts to deserialize a 
> WindowingStrategy or Coder which contains non-Java custom (or otherwise 
> unknown) Coders or WindowFns.
>  
> Use a strategy like RawPTransform to deserialize WindowFns and Coders, so 
> they can be interacted with as intermediate tokens in the java form.





[jira] [Created] (BEAM-3994) Use typed sinks and sources for FnApiControlClientPoolService

2018-04-03 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3994:


 Summary: Use typed sinks and sources for 
FnApiControlClientPoolService
 Key: BEAM-3994
 URL: https://issues.apache.org/jira/browse/BEAM-3994
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Ben Sidhom


We operate with blocking queues directly when managing control clients with the 
FnApiControlClientPoolService. This makes interactions with the client pool 
difficult to understand. We should instead make client sources and sinks 
explicit.
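
To illustrate the idea: instead of passing the raw queue around, each side gets a narrow view that exposes only the operation it needs. The sketch below is Python for brevity (the real service is Java), and `ClientSink`, `ClientSource`, and `make_client_pool` are hypothetical names:

```python
import queue
from typing import Generic, Tuple, TypeVar

T = TypeVar("T")


class ClientSink(Generic[T]):
    """Write-only view: the control service offers newly connected clients here."""

    def __init__(self, backing: "queue.Queue[T]") -> None:
        self._backing = backing

    def put(self, client: T) -> None:
        self._backing.put(client)


class ClientSource(Generic[T]):
    """Read-only view: runner code takes the next available client from here."""

    def __init__(self, backing: "queue.Queue[T]") -> None:
        self._backing = backing

    def take(self, timeout_seconds: float) -> T:
        try:
            return self._backing.get(timeout=timeout_seconds)
        except queue.Empty:
            raise TimeoutError("no control client connected in time") from None


def make_client_pool() -> Tuple[ClientSink, ClientSource]:
    """Create a sink/source pair that share one hidden queue."""
    backing: "queue.Queue" = queue.Queue()
    return ClientSink(backing), ClientSource(backing)
```

With this split, a reader of the calling code can tell from the type alone whether a component produces clients or consumes them.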





[jira] [Created] (BEAM-3976) SdkHarnessClient is thread-safe

2018-03-30 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3976:


 Summary: SdkHarnessClient is thread-safe
 Key: BEAM-3976
 URL: https://issues.apache.org/jira/browse/BEAM-3976
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Axel Magnuson


In general, we want to share access to a given SDK harness among multiple 
runner workers as a way to amortize container startup and resource costs. 
Because control messages are multiplexed over the same shared control 
connection, this sharing currently requires external locking on the same shared 
lock object. This is error-prone and difficult to verify. The SdkHarnessClient 
should use internal mechanisms to provide thread-safety.
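
A rough sketch of what "internal mechanisms" could look like: the client owns its lock, so callers cannot forget to take it. This is Python for illustration only; `ThreadSafeHarnessClient` and the `send` callable are hypothetical stand-ins, not the actual SdkHarnessClient design:

```python
import threading


class ThreadSafeHarnessClient:
    """Serializes writes to a shared control connection with a private lock."""

    def __init__(self, send):
        # send(instruction_id, payload) is an assumed callable that writes
        # one control message to the shared connection.
        self._send = send
        self._lock = threading.Lock()
        self._next_id = 0

    def submit(self, payload):
        """Send one instruction and return its unique instruction id."""
        # Id allocation and the write happen under the same lock, so ids are
        # unique and messages from different workers never interleave.
        with self._lock:
            self._next_id += 1
            instruction_id = str(self._next_id)
            self._send(instruction_id, payload)
        return instruction_id
```

Multiple runner workers can then share one instance without coordinating on an external lock object.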





[jira] [Commented] (BEAM-3970) Java SDK harness supports window_into

2018-03-29 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419746#comment-16419746
 ] 

Ben Sidhom commented on BEAM-3970:
--

Scratch that last comment. I didn't realize that window mapping was distinct 
from window assignment.

> Java SDK harness supports window_into
> -
>
> Key: BEAM-3970
> URL: https://issues.apache.org/jira/browse/BEAM-3970
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Ben Sidhom
>Assignee: Luke Cwik
>Priority: Major
>
> The Java SDK harness does not currently register a PTransformRunnerFactory 
> for beam:transform:window_into:v1. We need this functionality for GroupByKey 
> transforms.





[jira] [Commented] (BEAM-3970) Java SDK harness supports window_into

2018-03-29 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419739#comment-16419739
 ] 

Ben Sidhom commented on BEAM-3970:
--

It looks like 
[WindowMappingFnRunner|https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java]
 should already support this. If so, we need to investigate why this is not 
being picked up by ExecutableStages.

> Java SDK harness supports window_into
> -
>
> Key: BEAM-3970
> URL: https://issues.apache.org/jira/browse/BEAM-3970
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Ben Sidhom
>Assignee: Luke Cwik
>Priority: Major
>
> The Java SDK harness does not currently register a PTransformRunnerFactory 
> for beam:transform:window_into:v1. We need this functionality for GroupByKey 
> transforms.





[jira] [Created] (BEAM-3971) Pipeline translation utilities should not use SDK construction classes

2018-03-29 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3971:


 Summary: Pipeline translation utilities should not use SDK 
construction classes
 Key: BEAM-3971
 URL: https://issues.apache.org/jira/browse/BEAM-3971
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Kenneth Knowles


In general, portable runners will require access to pipeline information not 
available in rehydrated pipelines while constructing physical plans. 
Translation utilities should operate directly on protos or on thin, 
information-preserving wrappers.

The pipeline fusion utilities already operate on protos directly and can be 
used as an example of how this could be done.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3972) Flink runner translates pipelines directly by proto

2018-03-29 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3972:


 Summary: Flink runner translates pipelines directly by proto
 Key: BEAM-3972
 URL: https://issues.apache.org/jira/browse/BEAM-3972
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Ben Sidhom
Assignee: Ben Sidhom


The non-portable runner uses rehydrated pipelines which lack necessary 
information. The portable Flink runner needs to translate pipelines directly by 
proto in order to wire components into individual executable stages correctly.





[jira] [Created] (BEAM-3970) Java SDK harness supports window_into

2018-03-29 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3970:


 Summary: Java SDK harness supports window_into
 Key: BEAM-3970
 URL: https://issues.apache.org/jira/browse/BEAM-3970
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-harness
Reporter: Ben Sidhom
Assignee: Luke Cwik


The Java SDK harness does not currently register a PTransformRunnerFactory for 
beam:transform:window_into:v1. We need this functionality for GroupByKey 
transforms.





[jira] [Created] (BEAM-3966) Move core utilities into a new top-level module

2018-03-28 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3966:


 Summary: Move core utilities into a new top-level module
 Key: BEAM-3966
 URL: https://issues.apache.org/jira/browse/BEAM-3966
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Ben Sidhom
Assignee: Kenneth Knowles


As part of a longer-term dependency cleanup, fn-execution and similar utilities 
should be moved into a new top-level module (util?) that can be shared among 
runner and/or SDK code while clearly delineating the boundary between runner 
and SDK.





[jira] [Created] (BEAM-3952) GreedyStageFuserTest broken

2018-03-27 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3952:


 Summary: GreedyStageFuserTest broken
 Key: BEAM-3952
 URL: https://issues.apache.org/jira/browse/BEAM-3952
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Ben Sidhom
Assignee: Thomas Groh


The materializesWithDifferentEnvConsumer test is currently failing due to a bad 
assertion. The fused subgraph contains the parDo.out PCollection but the test 
expects an empty output.





[jira] [Created] (BEAM-3888) Python SDK propagates pipeline options correctly when talking to Job API

2018-03-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3888:


 Summary: Python SDK propagates pipeline options correctly when 
talking to Job API
 Key: BEAM-3888
 URL: https://issues.apache.org/jira/browse/BEAM-3888
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-py-core
Reporter: Ben Sidhom
Assignee: Ahmet Altay


Pipeline options are not propagated correctly from the Python SDK to the 
runner. At least part of the problem is due to option key casing (e.g., between 
Java and Python). Additionally, some runners require special options for 
configuration.
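
The casing mismatch can be illustrated with a small sketch. It assumes, purely 
for illustration, that the Python side spells option keys in snake_case and the 
Java side expects lowerCamelCase. The helpers to_camel_case and 
translate_options are hypothetical names and are not part of the Beam SDK.

```python
def to_camel_case(key):
    """Convert a snake_case option key to lowerCamelCase."""
    head, *rest = key.split("_")
    return head + "".join(part.capitalize() for part in rest)


def translate_options(options):
    """Return a copy of `options` with every key converted to lowerCamelCase."""
    return {to_camel_case(k): v for k, v in options.items()}


# Hypothetical option names, chosen only to show the key rewrite.
python_options = {"job_name": "wordcount", "temp_location": "/tmp/beam"}
java_style_options = translate_options(python_options)
```

A translation step like this would not cover the second point in the issue: 
runner-specific options still have to be supplied separately.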





[jira] [Created] (BEAM-3887) Python SDK harness retrieves element coders from gRPC read/write nodes

2018-03-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3887:


 Summary: Python SDK harness retrieves element coders from gRPC 
read/write nodes
 Key: BEAM-3887
 URL: https://issues.apache.org/jira/browse/BEAM-3887
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-py-core
Reporter: Ben Sidhom
Assignee: Ahmet Altay


The Python harness does not correctly get coders from the gRPC read/write 
transforms but rather from PCollection inputs/outputs. This prevents the 
harness from correctly receiving/transmitting prefix-encoded elements (seen as 
prefix-encoded byte arrays by the runner).

It should instead retrieve input and output coders from the gRPC nodes.





[jira] [Created] (BEAM-3886) Python SDK harness does not contact State API if not needed

2018-03-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3886:


 Summary: Python SDK harness does not contact State API if not 
needed
 Key: BEAM-3886
 URL: https://issues.apache.org/jira/browse/BEAM-3886
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-py-core
Reporter: Ben Sidhom
Assignee: Ahmet Altay


The Python harness always talks to the State API, even if it is never used by 
the current process bundle. As a minor optimization and to make implementing 
new runners easier, the harness should not talk to the State server unless it's 
actually needed.
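
One way to get this behavior is to open the connection lazily. The following 
is a minimal sketch under stated assumptions: LazyStateClient and fake_connect 
are hypothetical names, and a plain dict stands in for the State API channel. 
It is not the harness implementation.

```python
class LazyStateClient:
    """Defers opening the state connection until the first request."""

    def __init__(self, connect):
        # `connect` is a zero-argument callable that opens the connection.
        self._connect = connect
        self._connection = None

    def get(self, key):
        # Connect on first use only; later calls reuse the same connection.
        if self._connection is None:
            self._connection = self._connect()
        return self._connection[key]


opened = []  # records every time a connection is actually opened


def fake_connect():
    """Stand-in for dialing the State server (assumption for illustration)."""
    opened.append(True)
    return {"counter": 1}


client = LazyStateClient(fake_connect)
# Nothing has been opened at this point; the call below triggers the connect.
value = client.get("counter")
```

A bundle that never calls get() never triggers fake_connect, so a runner 
without a State server would not be contacted at all.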





[jira] [Created] (BEAM-3885) Python SDK Read IO is expressed as Impulse -> ParDo

2018-03-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3885:


 Summary: Python SDK Read IO is expressed as Impulse -> ParDo
 Key: BEAM-3885
 URL: https://issues.apache.org/jira/browse/BEAM-3885
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-py-core
Reporter: Ben Sidhom
Assignee: Ahmet Altay


Portable runners cannot understand SDK-specific Read transforms. The Python SDK 
will need to rewrite Read as Impulse followed by a ParDo that actually does the 
IO.
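
The shape of the rewrite can be sketched in plain Python, without the Beam 
SDK. Impulse is modeled as a source that emits a single empty byte string, and 
the ParDo performs the read when that element arrives. The functions par_do 
and read_lines, and the in-memory records, are hypothetical stand-ins.

```python
def impulse():
    """Model of Impulse: emits exactly one empty byte string."""
    yield b""


def par_do(fn, elements):
    """Model of ParDo: apply `fn` to each element and flatten the outputs."""
    for element in elements:
        yield from fn(element)


def read_lines(_trigger):
    """Hypothetical DoFn body: ignore the trigger and perform the actual IO."""
    # A real source would open a file or a connection here.
    yield from ["alpha", "beta", "gamma"]


# Read expressed as Impulse followed by a ParDo that does the IO.
records = list(par_do(read_lines, impulse()))
```

Because the only root is the impulse, the runner needs no knowledge of the 
source; all SDK-specific logic lives inside the function run by the ParDo.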



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3884) Python SDK supports Impulse as a primitive transform

2018-03-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3884:


 Summary: Python SDK supports Impulse as a primitive transform
 Key: BEAM-3884
 URL: https://issues.apache.org/jira/browse/BEAM-3884
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-py-core
Reporter: Ben Sidhom
Assignee: Ahmet Altay


Portable runners require that Impulse transforms be the only root nodes of a 
pipeline. The Python SDK should provide Impulse for pipeline construction.
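
The root-node requirement can be expressed as a simple check. This sketch 
assumes a simplified, hypothetical pipeline representation (a dict of 
primitive transforms, each with a "urn" and a list of "inputs") rather than 
the actual pipeline proto, and non_impulse_roots is an illustrative name.

```python
IMPULSE_URN = "beam:transform:impulse:v1"


def non_impulse_roots(transforms):
    """Return the ids of root transforms (no inputs) that are not Impulse."""
    return sorted(
        tid
        for tid, transform in transforms.items()
        if not transform["inputs"] and transform["urn"] != IMPULSE_URN
    )


# Hypothetical pipeline: one Impulse root and one SDK-specific root.
pipeline = {
    "start": {"urn": IMPULSE_URN, "inputs": []},
    "legacy_read": {"urn": "custom:read:v1", "inputs": []},
    "parse": {"urn": "beam:transform:pardo:v1", "inputs": ["start.out"]},
}
offending = non_impulse_roots(pipeline)
```

An empty result would mean every root is an Impulse; here the SDK-specific 
read is reported as the one root a portable runner could not execute.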



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3883) Python SDK stages artifacts when talking to job server

2018-03-19 Thread Ben Sidhom (JIRA)
Ben Sidhom created BEAM-3883:


 Summary: Python SDK stages artifacts when talking to job server
 Key: BEAM-3883
 URL: https://issues.apache.org/jira/browse/BEAM-3883
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-py-core
Reporter: Ben Sidhom
Assignee: Ahmet Altay


The Python SDK does not currently stage its user-defined functions or 
dependencies when talking to the job API. Artifacts that need to be staged 
include the user code itself, any SDK components not included in the container 
image, and the list of Python packages that must be installed at runtime.

 

Artifacts that are currently expected can be found in the harness boot code: 
https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52





[jira] [Assigned] (BEAM-2588) FlinkRunner shim for serving Job API

2018-03-07 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-2588:


Assignee: Axel Magnuson

> FlinkRunner shim for serving Job API
> 
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>
> Whatever the result of https://s.apache.org/beam-job-api we will need a way 
> for the JVM-based FlinkRunner to receive and run pipelines authored in Python.




