[jira] [Commented] (BEAM-5110) Reconcile Flink JVM singleton management with deployment
[ https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580545#comment-16580545 ] Ben Sidhom commented on BEAM-5110:
--
I meant to mention in the previous comment: as of now, this is not a correctness issue but rather a performance/resource issue. We generally want to allow SDK environment reuse to amortize startup costs (and save memory). The fact that the Python SDK does _not_ handle multiple control clients over a single channel is arguably an SDK harness bug or a configuration issue (e.g., we may need to allow the environment factory/manager to spawn new environments if requested). This is something that should be fleshed out in the Fn API contract and is orthogonal to JVM instance management in the portable Flink runner.

> Reconcile Flink JVM singleton management with deployment
> ---
>
> Key: BEAM-5110
> URL: https://issues.apache.org/jira/browse/BEAM-5110
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> [~angoenka] noticed through debugging that multiple instances of
> BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when
> executing in standalone cluster mode. This context factory is responsible for
> maintaining singleton state across a TaskManager (JVM) in order to share SDK
> Environments across workers in a given job. The multiple loading breaks
> singleton semantics and results in an indeterminate number of Environments
> being created.
> It turns out that the [Flink classloading
> mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html]
> is determined by deployment mode. Note that "user code" as referenced by
> this link is actually the Flink job server jar. Actual end-user code lives
> inside of the SDK Environment and uploaded artifacts.
> In order to maintain singletons without resorting to IPC (for example, using
> file locks and/or additional gRPC servers), we need to force non-dynamic
> classloading. For example, this happens when jobs are submitted to YARN for
> one-off deployments via `flink run`. However, connecting to an existing
> (Flink standalone) deployment results in dynamic classloading.
> We should investigate this behavior and either document (and attempt to
> enforce) deployment modes that are consistent with our requirements, or (if
> possible) create a custom classloader that enforces singleton loading.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5110) Reconcile Flink JVM singleton management with deployment
[ https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580536#comment-16580536 ] Ben Sidhom commented on BEAM-5110:
--
Sadly, after investigating this for a while, I think that the best option (for now) is to document (and possibly enforce in code where possible) the situations where dynamic code loading will not happen in the workers. Flink attempts to isolate user code with separate classloaders, and Java does not expose a way to force shared classloaders (or even list unconnected classloaders) without special instrumentation.

The following deployment modes use regular system class loading for the Beam job server and workers:
* Direct YARN deployments (using bin/flink run -m yarn-cluster)
* Standalone/Docker/Kubernetes/Mesos sessions where the job server jar has been dropped into the lib/ directory of the distribution.

In the latter case, Flink's default classloading strategy will not work because child (operator-scoped) classloaders override system classloaders; any user code (e.g., Beam environment manager code where the job server is submitted to a RemoteEnvironment) will override Flink/system classes. In order to get around this, Flink's [classloader.resolve-order|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#classloader-resolve-order] must be set to parent-first _or_ classloader.parent-first-patterns-additional must be populated with the set of packages that require JVM singletons.

In general, modes where the job server jar is sent to a new RemoteEnvironment will break singleton semantics. We should at the very least log a warning when this is detected in the job server, if not disable it altogether.
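As a sketch, the two configuration options described in the comment above would look roughly like this in flink-conf.yaml (key names are from the linked Flink 1.5 docs; the Beam package pattern shown is an illustrative assumption, not a verified value):

```yaml
# flink-conf.yaml — sketch, assuming Flink 1.5-era config keys.
# Option 1: resolve all classes parent-first (system classloader wins),
# so the job server classes in lib/ stay JVM singletons.
classloader.resolve-order: parent-first

# Option 2 (alternative): keep the default child-first order but pin the
# packages that hold JVM singletons. The pattern below is a hypothetical example.
# classloader.parent-first-patterns-additional: org.apache.beam.runners.flink
```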
[jira] [Created] (BEAM-5114) Create example uber jars for supported runners
Ben Sidhom created BEAM-5114:
Summary: Create example uber jars for supported runners
Key: BEAM-5114
URL: https://issues.apache.org/jira/browse/BEAM-5114
Project: Beam
Issue Type: New Feature
Components: examples-java
Reporter: Ben Sidhom
Assignee: Ben Sidhom

Producing these artifacts results in several benefits:
* Gives an example of how to package user code for different runners
* Makes ad-hoc testing of runner changes against real user pipelines easier
* Enables integration testing of end-to-end pipelines against different runner services

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5110) Reconcile Flink JVM singleton management with deployment
[ https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16573681#comment-16573681 ] Ben Sidhom commented on BEAM-5110:
--
Note that we can partially fix the issue by requiring that users who deploy in standalone "sessions" drop the Flink job server jar into the lib/ directory of all workers. That ensures that the necessary classes are available in the static top-level classloader. However, classloading in Flink is inverted from the usual behavior: "By default, Flink inverts classloading order, meaning it looks into the user code classloader first, and only looks into the parent (application classloader) if the class is not part of the dynamically loaded user code." Making sure this works in practice also requires a change in the Flink runner code where we create a connection to remote environments. We have to ensure that _no_ user jars are staged here, because any classes defined here will be loaded dynamically and take precedence over the root classloader.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-5110) Reconcile Flink JVM singleton management with deployment
[ https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-5110:
Assignee: Ben Sidhom

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-5110) Reconcile Flink JVM singleton management with deployment
Ben Sidhom created BEAM-5110:
Summary: Reconcile Flink JVM singleton management with deployment
Key: BEAM-5110
URL: https://issues.apache.org/jira/browse/BEAM-5110
Project: Beam
Issue Type: Bug
Components: runner-flink
Reporter: Ben Sidhom

[~angoenka] noticed through debugging that multiple instances of BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when executing in standalone cluster mode. This context factory is responsible for maintaining singleton state across a TaskManager (JVM) in order to share SDK Environments across workers in a given job. The multiple loading breaks singleton semantics and results in an indeterminate number of Environments being created.

It turns out that the [Flink classloading mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html] is determined by deployment mode. Note that "user code" as referenced by this link is actually the Flink job server jar. Actual end-user code lives inside of the SDK Environment and uploaded artifacts.

In order to maintain singletons without resorting to IPC (for example, using file locks and/or additional gRPC servers), we need to force non-dynamic classloading. For example, this happens when jobs are submitted to YARN for one-off deployments via `flink run`. However, connecting to an existing (Flink standalone) deployment results in dynamic classloading.

We should investigate this behavior and either document (and attempt to enforce) deployment modes that are consistent with our requirements, or (if possible) create a custom classloader that enforces singleton loading.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
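The failure mode in the issue can be reproduced in plain Java: a "singleton" held in a static field is only a singleton per classloader, so loading the factory class through two isolated (child-first-style) loaders yields two independent instances. The sketch below uses illustrative names only — it is not the Beam or Flink code.

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Method;

// Demonstrates why per-job dynamic classloading breaks static singletons:
// each loader that defines the class gets its own copy of the static state.
public class SingletonDemo {
    public static class Counter {
        public static int count = 0;
        public static int next() { return ++count; }
    }

    // A loader that defines Counter itself instead of delegating to its
    // parent, mimicking Flink's child-first user-code classloaders.
    public static class IsolatingLoader extends ClassLoader {
        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(Counter.class.getName())) {
                return super.loadClass(name, resolve); // delegate everything else normally
            }
            try (InputStream in = getResourceAsStream(name.replace('.', '/') + ".class")) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                for (int n; (n = in.read(buf)) != -1; ) out.write(buf, 0, n);
                byte[] bytes = out.toByteArray();
                return defineClass(name, bytes, 0, bytes.length); // a fresh copy of Counter
            } catch (Exception e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    // Load Counter through the given loader and bump its static counter reflectively.
    public static int bump(ClassLoader loader) throws Exception {
        Class<?> c = loader.loadClass(Counter.class.getName());
        Method m = c.getMethod("next");
        return (int) m.invoke(null);
    }

    public static void main(String[] args) throws Exception {
        // Both calls print 1: each loader sees its own static state.
        // A true JVM-wide singleton would have printed 1 then 2.
        System.out.println(bump(new IsolatingLoader()));
        System.out.println(bump(new IsolatingLoader()));
    }
}
```

This is exactly why forcing non-dynamic (parent-first) classloading restores singleton semantics: with a single defining loader there is a single copy of the static state.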
[jira] [Commented] (BEAM-4148) Local server api descriptors contain urls that work on Mac and Linux
[ https://issues.apache.org/jira/browse/BEAM-4148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568949#comment-16568949 ] Ben Sidhom commented on BEAM-4148:
--
Basically, if the reference runner is manually creating its own Docker instances, it should stop doing that and start using the Docker factory.

> Local server api descriptors contain urls that work on Mac and Linux
>
> Key: BEAM-4148
> URL: https://issues.apache.org/jira/browse/BEAM-4148
> Project: Beam
> Issue Type: Improvement
> Components: runner-core
> Reporter: Ben Sidhom
> Priority: Minor
>
> Docker for Mac does not allow host networking and thus will not allow SDK harnesses to access runner services via `localhost`. Instead, a special DNS name is used to refer to the host machine: docker.for.mac.host.internal. (Note that this value sometimes changes between Docker releases.)
> We should attempt to detect the host operating system and return different API descriptors based on this.
> See [https://github.com/bsidhom/beam/commit/3adaeb0d33dc26f0910c1f8af2821cce4ee0b965] for how this might be done.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-4148) Local server api descriptors contain urls that work on Mac and Linux
[ https://issues.apache.org/jira/browse/BEAM-4148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568948#comment-16568948 ] Ben Sidhom commented on BEAM-4148:
--
The linked PR should have actually solved this for the Docker environment factory (https://github.com/apache/beam/pull/5392). I actually meant to close this, since this is ideally the only place where Docker environments will be created. Where are you still seeing this?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
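The OS-specific host detection BEAM-4148 proposes can be sketched in a few lines; the class and method names here are illustrative assumptions, not the Beam API:

```java
// Sketch: pick the hostname SDK harness containers use to reach runner
// services, based on the host operating system.
public class DockerHost {
    // Special DNS name for the host on Docker for Mac; note the issue warns
    // this value sometimes changes between Docker releases.
    static final String MAC_HOST = "docker.for.mac.host.internal";

    public static String runnerHost() {
        String os = System.getProperty("os.name").toLowerCase();
        // Docker for Mac has no host networking, so `localhost` inside a
        // container does not reach the host; use the special DNS name instead.
        return os.contains("mac") ? MAC_HOST : "localhost";
    }

    public static void main(String[] args) {
        System.out.println(runnerHost());
    }
}
```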
[jira] [Commented] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526453#comment-16526453 ] Ben Sidhom commented on BEAM-2930:
--
That works for the batch runner, but it will not work for the streaming runner. It depends on the scope of this bug.

> Flink support for portable side input
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Henning Rohde
> Priority: Major
> Labels: portability

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4615) Flink job server driver wrapper
Ben Sidhom created BEAM-4615:
Summary: Flink job server driver wrapper
Key: BEAM-4615
URL: https://issues.apache.org/jira/browse/BEAM-4615
Project: Beam
Issue Type: New Feature
Components: runner-flink
Reporter: Ben Sidhom
Assignee: Ben Sidhom

This includes:
* A Gradle wrapper that can execute the Flink job server driver so that it can be easily run locally for testing/debugging.
* A shadow ("uber") target that packages all portable Flink runner dependencies into a runnable jar. This jar can then be submitted to Flink clusters via `flink run`.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-4523) Implement Flink batch ExecutableStage context
[ https://issues.apache.org/jira/browse/BEAM-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom closed BEAM-4523.
Resolution: Fixed
Fix Version/s: Not applicable

> Implement Flink batch ExecutableStage context
>
> Key: BEAM-4523
> URL: https://issues.apache.org/jira/browse/BEAM-4523
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Fix For: Not applicable
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> The ExecutableStage context is a wrapper for the job and stage bundle factories and pooled artifact sources. It should take care of caching the overall job bundle factory, since we do not have access to job lifecycle hooks in Flink but would like to reuse services and resources across operators within a given job.
> FlinkExecutableStageContext already exists as a skeleton, but it needs to be fleshed out.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
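The caching described above — one context per job, created on first use and shared by every operator in the JVM because Flink offers no job lifecycle hook — can be sketched with a concurrent map. Class and method names here are illustrative, not the actual FlinkExecutableStageContext API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a per-job cache of stage contexts. computeIfAbsent guarantees a
// single context per job id even when operators race to fetch it.
public class StageContextCache {
    public interface Context extends AutoCloseable {}

    private final Map<String, Context> contexts = new ConcurrentHashMap<>();

    // All operators of a given job resolve to the same Context instance.
    public Context forJob(String jobId) {
        return contexts.computeIfAbsent(jobId, id -> new Context() {
            @Override public void close() { /* release SDK environments here */ }
        });
    }

    public static void main(String[] args) {
        StageContextCache cache = new StageContextCache();
        System.out.println(cache.forJob("job-1") == cache.forJob("job-1")); // true
        System.out.println(cache.forJob("job-1") == cache.forJob("job-2")); // false
    }
}
```

Note this only works if the cache class itself is a JVM singleton, which is exactly what the classloading problem in BEAM-5110 threatens.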
[jira] [Closed] (BEAM-4231) Runner utility for Coder instantiation
[ https://issues.apache.org/jira/browse/BEAM-4231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom closed BEAM-4231.
Resolution: Fixed
Fix Version/s: Not applicable

> Runner utility for Coder instantiation
>
> Key: BEAM-4231
> URL: https://issues.apache.org/jira/browse/BEAM-4231
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Minor
> Fix For: Not applicable
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Portable runners need to instantiate coders for communicating with SDK harnesses in a consistent way. They cannot simply instantiate coders as defined by PCollections because some component coders may only be known to specific SDKs. Unknown coders should be length-prefixed; the underlying elements should only be exposed to runners as byte strings.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-4285) Flink batch state request handler
[ https://issues.apache.org/jira/browse/BEAM-4285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-4285:
Assignee: Ben Sidhom

> Flink batch state request handler
>
> Key: BEAM-4285
> URL: https://issues.apache.org/jira/browse/BEAM-4285
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
>
> In order to support side inputs, Flink needs a state service request handler. As in the non-portable runner, we can start by handling batch side inputs via Flink broadcast variables.
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java] can be used as a starting point.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4404) Fix docker run arguments
Ben Sidhom created BEAM-4404:
Summary: Fix docker run arguments
Key: BEAM-4404
URL: https://issues.apache.org/jira/browse/BEAM-4404
Project: Beam
Issue Type: Bug
Components: runner-core
Reporter: Ben Sidhom
Assignee: Thomas Groh

Docker options need to be separate from container entrypoint arguments. See [https://github.com/bsidhom/beam/commit/65c152d14d54d1505bfc2637aeb32189e00d160d]. DockerCommand in master does not have this fix.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-4267) Implement a reusable library that can run an ExecutableStage with a given Environment
[ https://issues.apache.org/jira/browse/BEAM-4267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16484482#comment-16484482 ] Ben Sidhom commented on BEAM-4267:
--
Yep! Although in order to actually run stuff end-to-end, we'll also need to have job submission and artifact staging and retrieval wired in.

> Implement a reusable library that can run an ExecutableStage with a given Environment
>
> Key: BEAM-4267
> URL: https://issues.apache.org/jira/browse/BEAM-4267
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Axel Magnuson
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Build off of the interfaces introduced in [BEAM-3327|https://github.com/apache/beam/pull/5152] to provide a reusable execution library to runners.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4286) Pooled artifact source
Ben Sidhom created BEAM-4286:
Summary: Pooled artifact source
Key: BEAM-4286
URL: https://issues.apache.org/jira/browse/BEAM-4286
Project: Beam
Issue Type: Bug
Components: runner-flink
Reporter: Ben Sidhom
Assignee: Ben Sidhom

Because DistributedCache lifetimes are tied to operator lifetimes in Flink, we need a way to wrap operator-scoped artifact sources. Artifacts are inherently job-scoped and should be the same throughout a job's lifetime. For this reason, it is safe to pool artifact sources and serve artifacts from an arbitrary pooled source as long as the underlying source is still in scope. We need a pooled source in order to satisfy the bundle factory interfaces. Using the job-scoped and stage-scoped bundle factories allows us to cache and reuse different components that serve SDK harnesses. Because the distributed cache lifetimes are specific to Flink, the pooled artifact source should probably live in a runner-specific directory.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages
[ https://issues.apache.org/jira/browse/BEAM-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom closed BEAM-2898.
Resolution: Fixed
Fix Version/s: Not applicable

> Flink supports chaining/fusion of single-SDK stages
>
> Key: BEAM-2898
> URL: https://issues.apache.org/jira/browse/BEAM-2898
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Henning Rohde
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Fix For: Not applicable
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> The Fn API supports fused stages, which avoids unnecessarily round-tripping the data over the Fn API between stages. The Flink runner should use that capability for better performance.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages
[ https://issues.apache.org/jira/browse/BEAM-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474569#comment-16474569 ] Ben Sidhom commented on BEAM-2898:
--
I'm going to mark this as fixed since we have fusion and executable stage representation. Fusion is exercised by the portable translator.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4285) Flink batch state request handler
Ben Sidhom created BEAM-4285:
Summary: Flink batch state request handler
Key: BEAM-4285
URL: https://issues.apache.org/jira/browse/BEAM-4285
Project: Beam
Issue Type: Bug
Components: runner-flink
Reporter: Ben Sidhom

In order to support side inputs, Flink needs a state service request handler. As in the non-portable runner, we can start by handling batch side inputs via Flink broadcast variables. [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java] can be used as a starting point.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4271) Executable stages allow side input coders to be set and/or queried
Ben Sidhom created BEAM-4271:
Summary: Executable stages allow side input coders to be set and/or queried
Key: BEAM-4271
URL: https://issues.apache.org/jira/browse/BEAM-4271
Project: Beam
Issue Type: Bug
Components: runner-core
Reporter: Ben Sidhom

ProcessBundleDescriptors may contain side input references from inner PTransforms. These side inputs do not have explicit coders; instead, SDK harnesses use the PCollection coders by default. Using the default PCollection coder as specified at pipeline construction is in general not the correct thing to do. When PCollection elements are materialized, any coders unknown to a runner are length-prefixed. This means that materialized PCollections do not use their original element coders.

Side inputs are delivered to SDKs via MultimapSideInput StateRequests. The responses to these requests are expected to contain all of the values for a given key (and window), coded with the PCollection KV.value coder, concatenated. However, at the time of serving these requests on the runner side, we do not have enough information to reconstruct the original value coders.

There are different ways to address this issue. For example:
* Modify the associated PCollection coder to match the coder that the runner uses to materialize elements. This means that anywhere a given PCollection is used within a given bundle, it will use the runner-safe coder. This may introduce inefficiencies but should be "correct".
* Annotate side inputs with explicit coders. This guarantees that the key and value coders used by the runner match the coders used by SDKs. Furthermore, it allows the _runners_ to specify coders. This involves changes to the proto models and all SDKs.
* Annotate side input state requests with both key and value coders. This inverts the expected responsibility and has the SDK determine runner coders. Additionally, because runners do not understand all SDK types, additional coder substitution will need to be done at request handling time to make sure that the requested coder can be instantiated and will remain consistent with the SDK coder. This requires only small changes to SDKs because they may opt to use their default PCollection coders.

All of these approaches have their own downsides. Explicit side input coders are probably the right thing to do long-term, but the simplest change for now is to modify PCollection coders to match exactly how they're materialized.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-2594) Python shim for submitting to the ULR
[ https://issues.apache.org/jira/browse/BEAM-2594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467995#comment-16467995 ] Ben Sidhom commented on BEAM-2594:
--
I'm not sure exactly what this entails. It depends on exactly what we want the "ULR" to be responsible for. To start with, I think it's reasonable to just submit to a running job service with the appropriate pipeline options. Eventually, it should actually spin up the job service it will use. However, before it can start up any job services, we need to nail down our job service "entry point" story.

> Python shim for submitting to the ULR
>
> Key: BEAM-2594
> URL: https://issues.apache.org/jira/browse/BEAM-2594
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Kenneth Knowles
> Priority: Minor
> Labels: portability
>
> Python SDK should support submission of portable pipelines to the ULR, as per https://s.apache.org/beam-job-api.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom closed BEAM-3972.
Resolution: Fixed
Fix Version/s: Not applicable

> Flink runner translates batch pipelines directly by proto
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Fix For: Not applicable
> Time Spent: 8h 40m
> Remaining Estimate: 0h
>
> The non-portable runner uses rehydrated pipelines, which lack necessary information. The portable Flink runner needs to translate pipelines directly by proto in order to wire components into individual executable stages correctly.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4231) Runner utility for Coder instantiation
Ben Sidhom created BEAM-4231:
Summary: Runner utility for Coder instantiation
Key: BEAM-4231
URL: https://issues.apache.org/jira/browse/BEAM-4231
Project: Beam
Issue Type: Bug
Components: runner-core
Reporter: Ben Sidhom
Assignee: Ben Sidhom

Portable runners need to instantiate coders for communicating with SDK harnesses in a consistent way. They cannot simply instantiate coders as defined by PCollections because some component coders may only be known to specific SDKs. Unknown coders should be length-prefixed; the underlying elements should only be exposed to runners as byte strings.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
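The length-prefixing idea above can be illustrated with a toy encoding: the runner wraps the SDK-specific element bytes in a length prefix, so it can split concatenated elements apart without understanding their contents. This is a sketch only — it is not Beam's LengthPrefixCoder wire format (which, for instance, may differ in how the length is encoded).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Toy length-prefixed encoding: 4-byte length, then the opaque payload.
public class LengthPrefix {
    // Prefix the payload with its length so element boundaries survive
    // concatenation without the runner decoding the payload itself.
    public static byte[] encode(byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(payload.length);
        out.write(payload);
        return bos.toByteArray();
    }

    // Read one length-prefixed element back out of a stream of encoded bytes.
    public static byte[] decode(DataInputStream in) throws IOException {
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream both = new ByteArrayOutputStream();
        both.write(encode("opaque-sdk-bytes".getBytes("UTF-8")));
        both.write(encode("more-bytes".getBytes("UTF-8")));
        // Concatenated encodings can still be split back into elements.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(both.toByteArray()));
        System.out.println(new String(decode(in), "UTF-8")); // opaque-sdk-bytes
        System.out.println(new String(decode(in), "UTF-8")); // more-bytes
    }
}
```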
[jira] [Commented] (BEAM-4228) The FlinkRunner shouldn't require all of the values for a key to fit in memory
[ https://issues.apache.org/jira/browse/BEAM-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461769#comment-16461769 ] Ben Sidhom commented on BEAM-4228:
--
For context, see [https://github.com/apache/beam/pull/5226/files#r185652571].

> The FlinkRunner shouldn't require all of the values for a key to fit in memory
>
> Key: BEAM-4228
> URL: https://issues.apache.org/jira/browse/BEAM-4228
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Thomas Groh
> Priority: Major
>
> The use of a reducer that adds all of the elements that it consumes to a list is the primary way in which this occurs - if instead, we produce a filtered iterable, or a collection of filtered iterables, we can lazily iterate over all of the contained elements without having to buffer all of the elements.
> For an example of where this occurs, see {{Concatenate}} in {{FlinkBatchPortablePipelineTranslator}}.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
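The fix the issue suggests — exposing the grouped values behind a lazy view instead of copying them into one big list — can be sketched as below. The names are illustrative; this is not the actual Concatenate or FlinkBatchPortablePipelineTranslator code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Lazily concatenates several iterables: values stream through one element at
// a time, so no combined in-memory list of all values is ever built.
public class LazyConcat {
    public static <T> Iterable<T> concat(final List<? extends Iterable<T>> parts) {
        return () -> new Iterator<T>() {
            private final Iterator<? extends Iterable<T>> outer = parts.iterator();
            private Iterator<T> inner = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Advance to the next non-empty chunk only when needed.
                while (!inner.hasNext() && outer.hasNext()) {
                    inner = outer.next().iterator();
                }
                return inner.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                return inner.next();
            }
        };
    }

    public static void main(String[] args) {
        Iterable<Integer> all = concat(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)));
        for (int x : all) System.out.println(x); // 1, 2, 3 — no buffering of all values
    }
}
```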
[jira] [Resolved] (BEAM-3914) 'Unzip' flattens before performing fusion
[ https://issues.apache.org/jira/browse/BEAM-3914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom resolved BEAM-3914. -- Resolution: Fixed Fix Version/s: Not applicable > 'Unzip' flattens before performing fusion > - > > Key: BEAM-3914 > URL: https://issues.apache.org/jira/browse/BEAM-3914 > Project: Beam > Issue Type: Improvement > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Fix For: Not applicable > > Time Spent: 4h > Remaining Estimate: 0h > > This consists of duplicating nodes downstream of a flatten that exist within > an environment, and reintroducing the flatten immediately upstream of a > runner-executed transform (the flatten should be executed within the runner) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-3327: Assignee: Axel Magnuson (was: Ben Sidhom) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 24h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-4149) Java SDK Harness should populate worker id in control plane headers
[ https://issues.apache.org/jira/browse/BEAM-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459052#comment-16459052 ] Ben Sidhom commented on BEAM-4149: -- The workaround is technically fine for now, but it will break as soon as we have multiple concurrent environments and need to run test cases for this. More importantly, having an empty worker id makes it harder to debug test and runtime failures due to SDK/runner communication issues. > Java SDK Harness should populate worker id in control plane headers > --- > > Key: BEAM-4149 > URL: https://issues.apache.org/jira/browse/BEAM-4149 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Luke Cwik >Priority: Minor > > The Java SDK harness currently does nothing to populate control plane headers > with the harness worker id. This id is necessary in order to identify > harnesses when multiple are run from the same runner control server. > Note that this affects the _Java_ harness specifically (e.g., when running a > local process or in-memory harness). When the harness is launched within the > Docker container, the go boot code takes care of setting this: > https://github.com/apache/beam/blob/dffe50924f34d3cc994008703f01e802c99913d2/sdks/java/container/boot.go#L70 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-4147) Artifact source abstractions
[ https://issues.apache.org/jira/browse/BEAM-4147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-4147: - Summary: Artifact source abstractions (was: Portable runner Job API abstractions) > Artifact source abstractions > > > Key: BEAM-4147 > URL: https://issues.apache.org/jira/browse/BEAM-4147 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Ben Sidhom >Assignee: Axel Magnuson >Priority: Major > > We need a way to wire in arbitrary runner artifact storage backends into the > job server and through to artifact staging on workers. This requires some new > abstractions in front of the job service. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-4176) Portable batch runner passes all ValidatesRunner tests that non-portable runner passes
[ https://issues.apache.org/jira/browse/BEAM-4176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-4176: Assignee: (was: Aljoscha Krettek) > Portable batch runner passes all ValidatesRunner tests that non-portable > runner passes > -- > > Key: BEAM-4176 > URL: https://issues.apache.org/jira/browse/BEAM-4176 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Priority: Major > > We need this as a sanity check that runner execution is correct. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4176) Portable batch runner passes all ValidatesRunner tests that non-portable runner passes
Ben Sidhom created BEAM-4176: Summary: Portable batch runner passes all ValidatesRunner tests that non-portable runner passes Key: BEAM-4176 URL: https://issues.apache.org/jira/browse/BEAM-4176 Project: Beam Issue Type: Bug Components: runner-flink Reporter: Ben Sidhom Assignee: Aljoscha Krettek We need this as a sanity check that runner execution is correct. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-4063) Flink runner supports cluster-wide artifact deployments through the Distributed Cache
[ https://issues.apache.org/jira/browse/BEAM-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453033#comment-16453033 ] Ben Sidhom commented on BEAM-4063: -- Note that the DistributedCache already supports registering artifacts by arbitrary names; the registered artifact name mapping is stored in memory in a map and written out using Java serialization. > Flink runner supports cluster-wide artifact deployments through the > Distributed Cache > - > > Key: BEAM-4063 > URL: https://issues.apache.org/jira/browse/BEAM-4063 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Ben Sidhom >Priority: Minor > > As of now, Flink effectively has a dependency on an external storage system > for artifact management. This is because the Flink Distributed Cache does not > actually distribute and cache blobs itself, but rather expects that each node > in a running cluster has access to a well-known artifact resource. > We should get this for free whenever > [https://github.com/apache/flink/pull/5580] is merged (likely in 1.5). For > now, we will have to defer to external storage systems like GCS or HDFS. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive
[ https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448782#comment-16448782 ] Ben Sidhom commented on BEAM-2421: -- [https://github.com/apache/beam/pull/4783] adds support for the Flink batch runner. > Migrate Apache Beam to use impulse primitive as the only root primitive > --- > > Key: BEAM-2421 > URL: https://issues.apache.org/jira/browse/BEAM-2421 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 4.5h > Remaining Estimate: 0h > > The impulse source emits a single byte array element within the global window. > This is in preparation for using SDF as the replacement for different bounded > and unbounded source implementations. > Before: > Read(Source) -> ParDo -> ... > Impulse -> SDF(Source) -> ParDo -> ... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-4149) Java SDK Harness should populate worker id in control plane headers
[ https://issues.apache.org/jira/browse/BEAM-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448638#comment-16448638 ] Ben Sidhom commented on BEAM-4149: -- As a workaround for now, we should explicitly set "worker_id" to the empty string in the header accessor. > Java SDK Harness should populate worker id in control plane headers > --- > > Key: BEAM-4149 > URL: https://issues.apache.org/jira/browse/BEAM-4149 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Luke Cwik >Priority: Minor > > The Java SDK harness currently does nothing to populate control plane headers > with the harness worker id. This id is necessary in order to identify > harnesses when multiple are run from the same runner control server. > Note that this affects the _Java_ harness specifically (e.g., when running a > local process or in-memory harness). When the harness is launched within the > Docker container, the go boot code takes care of setting this: > https://github.com/apache/beam/blob/dffe50924f34d3cc994008703f01e802c99913d2/sdks/java/container/boot.go#L70 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-4106) Merge staging file options between Dataflow runner and portable runner
[ https://issues.apache.org/jira/browse/BEAM-4106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-4106: - Description: Both runners (will) require staging file options. This should be refactored such that the same options are used. Java pipelines stage all entries on the classpath (assuming a URLClassLoader is used) by default. The new merged pipeline options should have similar behavior and be documented as such. was:Both runners (will) require staging file options. This should be refactored such that the same options are used. > Merge staging file options between Dataflow runner and portable runner > -- > > Key: BEAM-4106 > URL: https://issues.apache.org/jira/browse/BEAM-4106 > Project: Beam > Issue Type: Improvement > Components: runner-core, runner-dataflow >Reporter: Ben Sidhom >Assignee: Kenneth Knowles >Priority: Trivial > > Both runners (will) require staging file options. This should be refactored > such that the same options are used. > Java pipelines stage all entries on the classpath (assuming a URLClassLoader > is used) by default. The new merged pipeline options should have similar > behavior and be documented as such. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-4151) Prevent control client pool service from leaking stale client references
[ https://issues.apache.org/jira/browse/BEAM-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-4151: Assignee: Ben Sidhom (was: Kenneth Knowles) > Prevent control client pool service from leaking stale client references > > > Key: BEAM-4151 > URL: https://issues.apache.org/jira/browse/BEAM-4151 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Minor > > The pool service that connects incoming gRPC control clients to consumers may > leak client references. See > [https://github.com/apache/beam/pull/5189#discussion_r183144364] for context. > It's worth considering whether we actually expect to have enough incoming > control clients to make this an issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4151) Prevent control client pool service from leaking stale client references
Ben Sidhom created BEAM-4151: Summary: Prevent control client pool service from leaking stale client references Key: BEAM-4151 URL: https://issues.apache.org/jira/browse/BEAM-4151 Project: Beam Issue Type: Bug Components: runner-core Reporter: Ben Sidhom Assignee: Kenneth Knowles The pool service that connects incoming gRPC control clients to consumers may leak client references. See [https://github.com/apache/beam/pull/5189#discussion_r183144364] for context. It's worth considering whether we actually expect to have enough incoming control clients to make this an issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
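[Editor's illustration] One way to avoid retaining stale client references, sketched here as a hypothetical Python pool (the real service is Java and gRPC-based; class and method names below are illustrative): the pool hands each client to exactly one consumer and drops its own reference at that moment.

```python
import threading

class ControlClientPool:
    # Hypothetical sketch: incoming control clients are registered by
    # worker id, and take() transfers ownership to the consumer so the
    # pool itself keeps no stale references around.

    def __init__(self):
        self._lock = threading.Lock()
        self._clients = {}  # worker_id -> client

    def offer(self, worker_id, client):
        with self._lock:
            self._clients[worker_id] = client

    def take(self, worker_id):
        with self._lock:
            # pop() removes the pool's reference; once the consumer is
            # done, the client is garbage-collectible.
            return self._clients.pop(worker_id)
```

Whether this matters in practice depends, as the issue notes, on how many control clients a single pool service ever sees.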
[jira] [Assigned] (BEAM-2889) Flink runs portable pipelines
[ https://issues.apache.org/jira/browse/BEAM-2889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-2889: Assignee: Ben Sidhom (was: Aljoscha Krettek) > Flink runs portable pipelines > - > > Key: BEAM-2889 > URL: https://issues.apache.org/jira/browse/BEAM-2889 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Henning Rohde >Assignee: Ben Sidhom >Priority: Major > Labels: portability > > Flink should run pipelines using the full portability API as currently > defined: > https://s.apache.org/beam-fn-api > https://s.apache.org/beam-runner-api > https://s.apache.org/beam-job-api > https://s.apache.org/beam-fn-api-container-contract > This issue tracks its adoption of the portability framework. New Fn API and > other features will be tracked separately. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-3792) Python submits portable pipelines to the Flink-served endpoint.
[ https://issues.apache.org/jira/browse/BEAM-3792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-3792: - Issue Type: Bug (was: Sub-task) Parent: (was: BEAM-2889) > Python submits portable pipelines to the Flink-served endpoint. > --- > > Key: BEAM-3792 > URL: https://issues.apache.org/jira/browse/BEAM-3792 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Fix For: 2.5.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-2588: - Issue Type: New Feature (was: Sub-task) Parent: (was: BEAM-2889) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-2795) FlinkRunner: translate using SDK-agnostic means
[ https://issues.apache.org/jira/browse/BEAM-2795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-2795: - Issue Type: Improvement (was: Sub-task) Parent: (was: BEAM-2889) > FlinkRunner: translate using SDK-agnostic means > --- > > Key: BEAM-2795 > URL: https://issues.apache.org/jira/browse/BEAM-2795 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Fix For: 2.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-2897) Configurable container/process management
[ https://issues.apache.org/jira/browse/BEAM-2897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-2897: - Issue Type: New Feature (was: Sub-task) Parent: (was: BEAM-2889) > Configurable container/process management > - > > Key: BEAM-2897 > URL: https://issues.apache.org/jira/browse/BEAM-2897 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Henning Rohde >Priority: Major > Labels: portability > Fix For: Not applicable > > > Flink should support configurable container/process management as per > https://s.apache.org/beam-fn-api-container-contract. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-3886) Python SDK harness uses State API from ProcessBundleDescriptor
[ https://issues.apache.org/jira/browse/BEAM-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-3886: - Description: The Python harness should pull the state api descriptor from the current process bundle descriptor when processing bundles. As a minor optimization and to make implementing new runners easier, the harness should not talk to the State server unless it's actually needed. was:The Python harness always talks to the State API, even if it is never used by the current process bundle. As a minor optimization and to make implementing new runners easier, the harness should not talk to the State server unless it's actually needed. > Python SDK harness uses State API from ProcessBundleDescriptor > -- > > Key: BEAM-3886 > URL: https://issues.apache.org/jira/browse/BEAM-3886 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Ben Sidhom >Assignee: Ahmet Altay >Priority: Minor > Time Spent: 10m > Remaining Estimate: 0h > > The Python harness should pull the state api descriptor from the current > process bundle descriptor when processing bundles. > As a minor optimization and to make implementing new runners easier, the > harness should not talk to the State server unless it's actually needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-3886) Python SDK harness uses State API from ProcessBundleDescriptor
[ https://issues.apache.org/jira/browse/BEAM-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-3886: - Summary: Python SDK harness uses State API from ProcessBundleDescriptor (was: Python SDK harness does not contact State API if not needed) > Python SDK harness uses State API from ProcessBundleDescriptor > -- > > Key: BEAM-3886 > URL: https://issues.apache.org/jira/browse/BEAM-3886 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Ben Sidhom >Assignee: Ahmet Altay >Priority: Minor > Time Spent: 10m > Remaining Estimate: 0h > > The Python harness always talks to the State API, even if it is never used by > the current process bundle. As a minor optimization and to make implementing > new runners easier, the harness should not talk to the State server unless > it's actually needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4149) Java SDK Harness should populate worker id in control plane headers
Ben Sidhom created BEAM-4149: Summary: Java SDK Harness should populate worker id in control plane headers Key: BEAM-4149 URL: https://issues.apache.org/jira/browse/BEAM-4149 Project: Beam Issue Type: Bug Components: sdk-java-harness Reporter: Ben Sidhom Assignee: Luke Cwik The Java SDK harness currently does nothing to populate control plane headers with the harness worker id. This id is necessary in order to identify harnesses when multiple are run from the same runner control server. Note that this affects the _Java_ harness specifically (e.g., when running a local process or in-memory harness). When the harness is launched within the Docker container, the go boot code takes care of setting this: https://github.com/apache/beam/blob/dffe50924f34d3cc994008703f01e802c99913d2/sdks/java/container/boot.go#L70 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4148) Local server api descriptors contain urls that work on Mac and Linux
Ben Sidhom created BEAM-4148: Summary: Local server api descriptors contain urls that work on Mac and Linux Key: BEAM-4148 URL: https://issues.apache.org/jira/browse/BEAM-4148 Project: Beam Issue Type: Improvement Components: runner-core Reporter: Ben Sidhom Assignee: Kenneth Knowles Docker for Mac does not allow host networking and thus will not allow SDK harnesses to access runner services via `localhost`. Instead, a special DNS name is used to refer to the host machine: docker.for.mac.host.internal. (Note that this value sometimes changes between Docker releases). We should attempt to detect the host operating system and return different API descriptors based on this. See [https://github.com/bsidhom/beam/commit/3adaeb0d33dc26f0910c1f8af2821cce4ee0b965] for how this might be done. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
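[Editor's illustration] The detection suggested above can be sketched as a small Python helper (hypothetical function name; the linked commit shows how this might be done in the actual Java code):

```python
import platform

def host_address_for_containers(system=None) -> str:
    # Pick the address an SDK harness container should use to reach
    # services on the host. Docker for Mac has no host networking, so
    # "localhost" inside a container does not reach the host machine.
    system = system or platform.system()
    if system == "Darwin":
        # Special DNS name for the host; note it has changed between
        # Docker releases (newer releases use host.docker.internal).
        return "docker.for.mac.host.internal"
    return "localhost"
```

The optional `system` parameter exists only so the behavior for each OS can be exercised directly.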
[jira] [Created] (BEAM-4147) Portable runner Job API abstractions
Ben Sidhom created BEAM-4147: Summary: Portable runner Job API abstractions Key: BEAM-4147 URL: https://issues.apache.org/jira/browse/BEAM-4147 Project: Beam Issue Type: New Feature Components: runner-core Reporter: Ben Sidhom Assignee: Axel Magnuson We need a way to wire in arbitrary runner artifact storage backends into the job server and through to artifact staging on workers. This requires some new abstractions in front of the job service. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4146) Python SDK sets environment in portable pipelines
Ben Sidhom created BEAM-4146: Summary: Python SDK sets environment in portable pipelines Key: BEAM-4146 URL: https://issues.apache.org/jira/browse/BEAM-4146 Project: Beam Issue Type: Bug Components: sdk-py-core Reporter: Ben Sidhom Assignee: Ahmet Altay Environments must be set in any non-runner-executed transforms. See [https://github.com/bsidhom/beam/commit/0362fd1f25] for a possible approach until canonical image urls are created. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4145) Java SDK Harness populates control request headers with worker id
Ben Sidhom created BEAM-4145: Summary: Java SDK Harness populates control request headers with worker id Key: BEAM-4145 URL: https://issues.apache.org/jira/browse/BEAM-4145 Project: Beam Issue Type: New Feature Components: sdk-java-harness Reporter: Ben Sidhom Assignee: Luke Cwik Runner code needs to be able to identify incoming harness connections by the worker ids that it assigns to them on creation. This is currently done by the go boot code when the harness runs in a docker container. However, in-process harnesses never specify worker ids. This prevents in-process harnesses from being multiplexed by a runner (most likely the ULR and test code). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
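[Editor's illustration] In gRPC terms, the requirement above amounts to attaching a worker-id entry to the metadata of every control-plane call. A minimal Python sketch (the hypothetical `control_plane_metadata` helper is illustrative; the actual harness is Java):

```python
def control_plane_metadata(worker_id: str):
    # Metadata the harness would attach to each control-plane request so
    # the runner can tell concurrent harnesses apart. The "worker_id"
    # key matches the one set by the Go boot code for Docker harnesses.
    if not worker_id:
        raise ValueError("worker id must be non-empty for multiplexing")
    return (("worker_id", worker_id),)
```

An in-process harness that constructed its channel with this metadata would be distinguishable by the runner's control server, which is what multiplexing requires.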
[jira] [Assigned] (BEAM-3673) FlinkRunner: Harness manager for connecting operators to SDK Harnesses
[ https://issues.apache.org/jira/browse/BEAM-3673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-3673: Assignee: Axel Magnuson > FlinkRunner: Harness manager for connecting operators to SDK Harnesses > -- > > Key: BEAM-3673 > URL: https://issues.apache.org/jira/browse/BEAM-3673 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Axel Magnuson >Priority: Major > > SDK harnesses require a common set of gRPC services to operate. The role of > the harness manager is to provide a uniform interface that multiplexes data > streams and auxiliary data between SDK environments and operators within a > given job. > Note that multiple operators may communicate with a single SDK environment to > amortize container initialization cost. Environments are _not_ shared between > different jobs. > The initial implementation will shell out to local docker, but the harness > manager should eventually support working with externally-managed > environments (e.g., created by Kubernetes). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-3327: Assignee: Ben Sidhom (was: Axel Magnuson) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 8h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-4109) Support arbitrary artifact names in local file artifact staging service
[ https://issues.apache.org/jira/browse/BEAM-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-4109: Assignee: (was: Kenneth Knowles) > Support arbitrary artifact names in local file artifact staging service > --- > > Key: BEAM-4109 > URL: https://issues.apache.org/jira/browse/BEAM-4109 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Ben Sidhom >Priority: Minor > > The local-file based artifact staging service implementation stores artifacts > in a flat directory under the exact names they are given by the artifact > staging request. This assumes that all artifact names are safe file names and > requires staging clients to manually escape names. > Instead, the staging service should perform its own escaping/mapping > transparently and allow clients to specify arbitrary artifact staging names. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
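[Editor's illustration] One reversible escaping scheme that would satisfy the issue above is plain percent-encoding: every staged name maps to a single safe file name, and the original name is always recoverable. A Python sketch (the scheme itself is an assumption, not what the Java staging service does):

```python
from urllib.parse import quote, unquote

def to_file_name(artifact_name: str) -> str:
    # Percent-encode everything outside [A-Za-z0-9_.~-], so slashes,
    # spaces, etc. cannot escape the staging directory.
    return quote(artifact_name, safe="")

def from_file_name(file_name: str) -> str:
    # Inverse mapping: recover the artifact name the client requested.
    return unquote(file_name)
```

Because the mapping is injective and invertible, clients can use arbitrary names while the service keeps a flat, safe directory layout.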
[jira] [Created] (BEAM-4131) Python SDK harness container image contains SDK and dependencies
Ben Sidhom created BEAM-4131: Summary: Python SDK harness container image contains SDK and dependencies Key: BEAM-4131 URL: https://issues.apache.org/jira/browse/BEAM-4131 Project: Beam Issue Type: New Feature Components: sdk-py-harness Reporter: Ben Sidhom Assignee: Robert Bradshaw The Docker image for the SDK harness should contain SDK code and dependencies so that this does not need to be downloaded from the artifact retrieval service at each boot. This is required for operation with portable runners right now because the python client does not currently stage the SDK itself (as it does with the Dataflow runner). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4130) Portable Flink runner entry point
Ben Sidhom created BEAM-4130: Summary: Portable Flink runner entry point Key: BEAM-4130 URL: https://issues.apache.org/jira/browse/BEAM-4130 Project: Beam Issue Type: New Feature Components: runner-flink Reporter: Ben Sidhom The portable Flink runner exists as a Job Service that runs somewhere. We need a main entry point that itself spins up the job service (and artifact staging service). The main program itself should be packaged into an uberjar such that it can be run locally or submitted to a Flink deployment via `flink run`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-2597) FlinkRunner ExecutableStage batch operator
[ https://issues.apache.org/jira/browse/BEAM-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-2597: - Description: This operator will execute user code in the context of an SDK harness by constructing a ProcessBundleDescriptor from an ExecutableStage (physical stage plan) and sending instructions/elements over the control and data planes. (was: To run non-Java SDK code is to put together an operator that manages a Fn API client DoFnRunner and an SDK harness Fn API server.) > FlinkRunner ExecutableStage batch operator > -- > > Key: BEAM-2597 > URL: https://issues.apache.org/jira/browse/BEAM-2597 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Ben Sidhom >Priority: Major > Labels: portability > > This operator will execute user code in the context of an SDK harness by > constructing a ProcessBundleDescriptor from an ExecutableStage (physical > stage plan) and sending instructions/elements over the control and data > planes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-2597) FlinkRunner ExecutableStage operator
[ https://issues.apache.org/jira/browse/BEAM-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-2597: - Description: To run non-Java SDK code is to put together an operator that manages a Fn API client DoFnRunner and an SDK harness Fn API server. was: To run non-Java SDK code is to put together an operator that manages a Fn API client DoFnRunner and an SDK harness Fn API server. (filing to organize steps, details of this may evolve as it is implemented) > FlinkRunner ExecutableStage operator > > > Key: BEAM-2597 > URL: https://issues.apache.org/jira/browse/BEAM-2597 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Ben Sidhom >Priority: Major > Labels: portability > > > To run non-Java SDK code is to put together an operator that manages a Fn API > client DoFnRunner and an SDK harness Fn API server. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-2597) FlinkRunner ExecutableStage batch operator
[ https://issues.apache.org/jira/browse/BEAM-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-2597: - Summary: FlinkRunner ExecutableStage batch operator (was: FlinkRunner ExecutableStage operator) > FlinkRunner ExecutableStage batch operator > -- > > Key: BEAM-2597 > URL: https://issues.apache.org/jira/browse/BEAM-2597 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Ben Sidhom >Priority: Major > Labels: portability > > > To run non-Java SDK code is to put together an operator that manages a Fn API > client DoFnRunner and an SDK harness Fn API server. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-2597) FlinkRunner ExecutableStage operator
[ https://issues.apache.org/jira/browse/BEAM-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-2597: - Summary: FlinkRunner ExecutableStage operator (was: FlinkRunner Fn API based ParDo operator) > FlinkRunner ExecutableStage operator > > > Key: BEAM-2597 > URL: https://issues.apache.org/jira/browse/BEAM-2597 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Ben Sidhom >Priority: Major > Labels: portability > > To run non-Java SDK code is to put together an operator that manages a Fn API > client DoFnRunner and an SDK harness Fn API server. > (filing to organize steps, details of this may evolve as it is implemented) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-2897) Configurable container/process management
[ https://issues.apache.org/jira/browse/BEAM-2897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom resolved BEAM-2897. -- Resolution: Duplicate Fix Version/s: Not applicable > Configurable container/process management > - > > Key: BEAM-2897 > URL: https://issues.apache.org/jira/browse/BEAM-2897 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Priority: Major > Labels: portability > Fix For: Not applicable > > > Flink should support configurable container/process management as per > https://s.apache.org/beam-fn-api-container-contract. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4128) Handle WindowInto transforms in portable Flink runner directly
Ben Sidhom created BEAM-4128: Summary: Handle WindowInto transforms in portable Flink runner directly Key: BEAM-4128 URL: https://issues.apache.org/jira/browse/BEAM-4128 Project: Beam Issue Type: New Feature Components: runner-flink Reporter: Ben Sidhom Assignee: Aljoscha Krettek This is mostly an optimization, but by having the runner itself handle well-known WindowFns (e.g., global windowing, fixed windowing), we can avoid round trips to the SDK harness. This requires some pipeline surgery pre-translation if we want it to work efficiently in conjunction with fusion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
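[Editor's illustration] The optimization above rests on the fact that well-known WindowFns are identified by URN and are cheap to evaluate runner-side. A Python sketch of runner-native fixed-window assignment (the URN constants follow Beam model naming but should be treated as illustrative here):

```python
# Well-known WindowFn URNs a runner could recognize and handle natively,
# skipping the round trip to the SDK harness.
GLOBAL_WINDOWS_URN = "beam:window_fn:global_windows:v1"
FIXED_WINDOWS_URN = "beam:window_fn:fixed_windows:v1"

def assign_fixed_window(timestamp_ms: int, size_ms: int):
    # Return the half-open [start, end) fixed window containing the
    # element's timestamp: windows tile the timeline in size_ms steps.
    start = timestamp_ms - (timestamp_ms % size_ms)
    return (start, start + size_ms)
```

A translator could dispatch on the WindowFn URN and fall back to the SDK harness only for unknown (SDK-specific) window functions.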
[jira] [Assigned] (BEAM-4127) Flink portable runner translates streaming pipelines by proto
[ https://issues.apache.org/jira/browse/BEAM-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-4127: Assignee: (was: Aljoscha Krettek) > Flink portable runner translates streaming pipelines by proto > - > > Key: BEAM-4127 > URL: https://issues.apache.org/jira/browse/BEAM-4127 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Ben Sidhom >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-3972: - Summary: Flink runner translates batch pipelines directly by proto (was: Flink runner translates pipelines directly by proto) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4127) Flink portable runner translates streaming pipelines by proto
Ben Sidhom created BEAM-4127: Summary: Flink portable runner translates streaming pipelines by proto Key: BEAM-4127 URL: https://issues.apache.org/jira/browse/BEAM-4127 Project: Beam Issue Type: New Feature Components: runner-flink Reporter: Ben Sidhom Assignee: Aljoscha Krettek -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-4109) Support arbitrary artifact names in local file artifact staging service
[ https://issues.apache.org/jira/browse/BEAM-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-4109: - Issue Type: Bug (was: Improvement) > Support arbitrary artifact names in local file artifact staging service > --- > > Key: BEAM-4109 > URL: https://issues.apache.org/jira/browse/BEAM-4109 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Ben Sidhom >Assignee: Kenneth Knowles >Priority: Minor > > The local-file based artifact staging service implementation stores artifacts > in a flat directory under the exact names they are given by the artifact > staging request. This assumes that all artifact names are safe file names and > requires staging clients to manually escape names. > Instead, the staging service should perform its own escaping/mapping > transparently and allow clients to specify arbitrary artifact staging names. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
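The transparent escaping/mapping BEAM-4109 calls for could look like the following sketch (a hypothetical helper, not the actual staging-service code, and assuming ASCII names for brevity): percent-encode anything outside a conservative safe set so the result is always a flat, reversible file name.

```python
import string

# Conservative set of characters allowed verbatim in staged file names.
# '.' is deliberately excluded so names like ".." cannot traverse paths.
_SAFE = set(string.ascii_letters + string.digits + "-_")

def escape_artifact_name(name):
    """Map an arbitrary artifact name to a safe file name, reversibly."""
    return "".join(
        ch if ch in _SAFE else "%%%02X" % ord(ch) for ch in name)
```

Because '%' itself is escaped, the mapping is injective, so the service can recover the original artifact name when serving retrieval requests.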
[jira] [Closed] (BEAM-4056) Identify Side Inputs by PTransform ID and local name
[ https://issues.apache.org/jira/browse/BEAM-4056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom closed BEAM-4056. Resolution: Fixed Fix Version/s: Not applicable > Identify Side Inputs by PTransform ID and local name > > > Key: BEAM-4056 > URL: https://issues.apache.org/jira/browse/BEAM-4056 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Minor > Fix For: Not applicable > > Time Spent: 2h 50m > Remaining Estimate: 0h > > This is necessary in order to correctly identify side inputs during all > phases of portable pipeline execution (fusion, translation, and SDK > execution). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-3994) Use typed sinks and sources for FnApiControlClientPoolService
[ https://issues.apache.org/jira/browse/BEAM-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom closed BEAM-3994. Resolution: Fixed Fix Version/s: Not applicable > Use typed sinks and sources for FnApiControlClientPoolService > - > > Key: BEAM-3994 > URL: https://issues.apache.org/jira/browse/BEAM-3994 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Minor > Fix For: Not applicable > > Time Spent: 2h 20m > Remaining Estimate: 0h > > We operate with blocking queues directly when managing control clients with > the FnApiControlClientPoolService. This makes interactions with the client > pool difficult to understand. We should instead make client sources and sinks > explicit. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-4069) Empty pipeline options can be gracefully serialized/deserialized
[ https://issues.apache.org/jira/browse/BEAM-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom closed BEAM-4069. Resolution: Fixed Fix Version/s: Not applicable > Empty pipeline options can be gracefully serialized/deserialized > > > Key: BEAM-4069 > URL: https://issues.apache.org/jira/browse/BEAM-4069 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Minor > Fix For: Not applicable > > Time Spent: 1h 20m > Remaining Estimate: 0h > > PipelineOptionsTranslation.fromProto currently crashes with a > NullPointerException when passed an empty options Struct. This is due to > ProxyInvocationHandler.Deserializer expecting a non-empty enclosing Struct. > Empty pipeline options may be passed by SDKs interacting with a job server, > so this case needs to be handled. Note that testing a round-trip of an > effectively-empty Java PipelineOptions object is not sufficient to catch this > because "empty" Java options still contain default fields not defined in > other SDKs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4116) Add a timeout to artifact stager
Ben Sidhom created BEAM-4116: Summary: Add a timeout to artifact stager Key: BEAM-4116 URL: https://issues.apache.org/jira/browse/BEAM-4116 Project: Beam Issue Type: Improvement Components: runner-core Reporter: Ben Sidhom Assignee: Kenneth Knowles ArtifactServiceStager currently blocks indefinitely while waiting for all artifacts to stage. It would be nice to have a configurable timeout here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
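A configurable timeout of the sort BEAM-4116 proposes could be sketched with `concurrent.futures` (`stage_fn` is a hypothetical per-artifact upload callable, not the real ArtifactServiceStager API):

```python
import concurrent.futures

def stage_all(executor, stage_fn, artifacts, timeout_s):
    """Stage artifacts in parallel; raise if the deadline passes."""
    futures = [executor.submit(stage_fn, a) for a in artifacts]
    _, not_done = concurrent.futures.wait(futures, timeout=timeout_s)
    if not_done:
        for f in not_done:
            f.cancel()  # best-effort: cancels only not-yet-started uploads
        raise TimeoutError("%d of %d artifacts not staged within %ss"
                           % (len(not_done), len(futures), timeout_s))
    return [f.result() for f in futures]  # results in input order
```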
[jira] [Created] (BEAM-4109) Support arbitrary artifact names in local file artifact staging service
Ben Sidhom created BEAM-4109: Summary: Support arbitrary artifact names in local file artifact staging service Key: BEAM-4109 URL: https://issues.apache.org/jira/browse/BEAM-4109 Project: Beam Issue Type: Improvement Components: runner-core Reporter: Ben Sidhom Assignee: Kenneth Knowles The local-file based artifact staging service implementation stores artifacts in a flat directory under the exact names they are given by the artifact staging request. This assumes that all artifact names are safe file names and requires staging clients to manually escape names. Instead, the staging service should perform its own escaping/mapping transparently and allow clients to specify arbitrary artifact staging names. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4106) Merge staging file options between Dataflow runner and portable runner
Ben Sidhom created BEAM-4106: Summary: Merge staging file options between Dataflow runner and portable runner Key: BEAM-4106 URL: https://issues.apache.org/jira/browse/BEAM-4106 Project: Beam Issue Type: Improvement Components: runner-core, runner-dataflow Reporter: Ben Sidhom Assignee: Kenneth Knowles Both runners (will) require staging file options. This should be refactored such that the same options are used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-3327: Assignee: Axel Magnuson (was: Ben Sidhom) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 4h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4071) Portable Runner Job API shim
Ben Sidhom created BEAM-4071: Summary: Portable Runner Job API shim Key: BEAM-4071 URL: https://issues.apache.org/jira/browse/BEAM-4071 Project: Beam Issue Type: New Feature Components: runner-core Reporter: Ben Sidhom Assignee: Ben Sidhom There needs to be a way to execute Java-SDK pipelines against a portable job server. The job server itself is expected to be started up out-of-band. The "PortableRunner" should take an option indicating the Job API endpoint and defer other runner configurations to the backend itself. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-2588: - Description: The portable Flink runner needs to be wired into a job server so that it can accept jobs via the Job API (https://s.apache.org/beam-job-api). (was: Whatever the result of https://s.apache.org/beam-job-api we will need a way for the JVM-based FlinkRunner to receive and run pipelines authored in Python.) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs via the Job API (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437781#comment-16437781 ] Ben Sidhom commented on BEAM-2588: -- I've renamed this bug to reflect the fact that the portable Flink runner will effectively be its own runner entrypoint. The Job API itself does not actually deal with Java "Runners" at all. > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > > Whatever the result of https://s.apache.org/beam-job-api we will need a way > for the JVM-based FlinkRunner to receive and run pipelines authored in Python. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom updated BEAM-2588: - Summary: Portable Flink Runner Job API (was: FlinkRunner shim for serving Job API) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > > Whatever the result of https://s.apache.org/beam-job-api we will need a way > for the JVM-based FlinkRunner to receive and run pipelines authored in Python. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages
[ https://issues.apache.org/jira/browse/BEAM-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437763#comment-16437763 ] Ben Sidhom commented on BEAM-2898: -- Pipeline fusion has been implemented in runner-core. With support for impulse, the Flink runner will support pipeline fusion once translation by proto has been implemented. > Flink supports chaining/fusion of single-SDK stages > --- > > Key: BEAM-2898 > URL: https://issues.apache.org/jira/browse/BEAM-2898 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 4h 50m > Remaining Estimate: 0h > > The Fn API supports fused stages, which avoids unnecessarily round-tripping > the data over the Fn API between stages. The Flink runner should use that > capability for better performance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages
[ https://issues.apache.org/jira/browse/BEAM-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-2898: Assignee: Ben Sidhom > Flink supports chaining/fusion of single-SDK stages > --- > > Key: BEAM-2898 > URL: https://issues.apache.org/jira/browse/BEAM-2898 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 4h 50m > Remaining Estimate: 0h > > The Fn API supports fused stages, which avoids unnecessarily round-tripping > the data over the Fn API between stages. The Flink runner should use that > capability for better performance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-4068) Consistent option specification between SDKs and runners by URN
[ https://issues.apache.org/jira/browse/BEAM-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437686#comment-16437686 ] Ben Sidhom commented on BEAM-4068: -- See [https://github.com/axelmagn/beam/pull/5/files] for an example. Note that the specific runner options there are not relevant, but that runners in general require arbitrary options. > Consistent option specification between SDKs and runners by URN > --- > > Key: BEAM-4068 > URL: https://issues.apache.org/jira/browse/BEAM-4068 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Ben Sidhom >Assignee: Kenneth Knowles >Priority: Minor > > Pipeline options are materialized differently by different SDKs. However, in > some cases, runners require Java-specific options that are not available > elsewhere. We should decide on well-known URNs and use them across SDKs where > applicable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4069) Empty pipeline options can be gracefully serialized/deserialized
Ben Sidhom created BEAM-4069: Summary: Empty pipeline options can be gracefully serialized/deserialized Key: BEAM-4069 URL: https://issues.apache.org/jira/browse/BEAM-4069 Project: Beam Issue Type: Bug Components: runner-core Reporter: Ben Sidhom Assignee: Ben Sidhom PipelineOptionsTranslation.fromProto currently crashes with a NullPointerException when passed an empty options Struct. This is due to ProxyInvocationHandler.Deserializer expecting a non-empty enclosing Struct. Empty pipeline options may be passed by SDKs interacting with a job server, so this case needs to be handled. Note that testing a round-trip of an effectively-empty Java PipelineOptions object is not sufficient to catch this because "empty" Java options still contain default fields not defined in other SDKs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
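The fix for BEAM-4069 amounts to treating an absent or empty Struct as default options. A toy round-trip illustrating the intended behavior (plain dicts standing in for the proto Struct; not Beam's actual translation code):

```python
def options_to_struct(options):
    """Serialize options; None becomes an empty mapping rather than failing."""
    return dict(options or {})

def options_from_struct(struct):
    """Deserialize options; an empty or missing Struct yields defaults."""
    return dict(struct or {})
```

The key point is the degenerate cases: round-tripping `None` or `{}` must produce empty options, not a NullPointerException-style crash on a missing enclosing structure.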
[jira] [Created] (BEAM-4068) Consistent option specification between SDKs and runners by URN
Ben Sidhom created BEAM-4068: Summary: Consistent option specification between SDKs and runners by URN Key: BEAM-4068 URL: https://issues.apache.org/jira/browse/BEAM-4068 Project: Beam Issue Type: New Feature Components: runner-core Reporter: Ben Sidhom Assignee: Kenneth Knowles Pipeline options are materialized differently by different SDKs. However, in some cases, runners require Java-specific options that are not available elsewhere. We should decide on well-known URNs and use them across SDKs where applicable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4067) Add portable Flink test runner
Ben Sidhom created BEAM-4067: Summary: Add portable Flink test runner Key: BEAM-4067 URL: https://issues.apache.org/jira/browse/BEAM-4067 Project: Beam Issue Type: New Feature Components: runner-flink Reporter: Ben Sidhom Assignee: Aljoscha Krettek The portable Flink runner cannot be tested through the normal mechanisms used for ValidatesRunner tests because it requires a job server to be constructed out of band and for pipelines to be run through it. We should implement a shim that acts as a standard Java SDK Runner that spins up the necessary server (possibly in-process) and runs against it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4063) Flink runner supports cluster-wide artifact deployments through the Distributed Cache
Ben Sidhom created BEAM-4063: Summary: Flink runner supports cluster-wide artifact deployments through the Distributed Cache Key: BEAM-4063 URL: https://issues.apache.org/jira/browse/BEAM-4063 Project: Beam Issue Type: New Feature Components: runner-flink Reporter: Ben Sidhom Assignee: Aljoscha Krettek As of now, Flink effectively has a dependency on an external storage system for artifact management. This is because the Flink Distributed Cache does not actually distribute and cache blobs itself, but rather expects that each node in a running cluster has access to a well-known artifact resource. We should get this for free whenever [https://github.com/apache/flink/pull/5580] is merged (likely in 1.5). For now, we will have to defer to external storage systems like GCS or HDFS. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4056) Identify Side Inputs by PTransform ID and local name
Ben Sidhom created BEAM-4056: Summary: Identify Side Inputs by PTransform ID and local name Key: BEAM-4056 URL: https://issues.apache.org/jira/browse/BEAM-4056 Project: Beam Issue Type: New Feature Components: runner-core Reporter: Ben Sidhom Assignee: Ben Sidhom This is necessary in order to correctly identify side inputs during all phases of portable pipeline execution (fusion, translation, and SDK execution). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-3779) Enable deserialization of a non-java Pipeline
[ https://issues.apache.org/jira/browse/BEAM-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom closed BEAM-3779. Resolution: Won't Do Fix Version/s: Not applicable > Enable deserialization of a non-java Pipeline > - > > Key: BEAM-3779 > URL: https://issues.apache.org/jira/browse/BEAM-3779 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Fix For: Not applicable > > > Currently, rehydrating a Pipeline works on the PCollection and PTransform > levels with the use of RawPTransform, but the runner-core-construction-java > utilities will throw if the runner attempts to deserialize a > WindowingStrategy or Coder which contains non-Java custom (or otherwise > unknown) Coders or WindowFns. > > Use a strategy like RawPTransform to deserialize WindowFns and Coders, so > they can be interacted with as intermediate tokens in the java form. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-3779) Enable deserialization of a non-java Pipeline
[ https://issues.apache.org/jira/browse/BEAM-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16434685#comment-16434685 ] Ben Sidhom commented on BEAM-3779: -- This is no longer relevant as we have decided to use protos directly during portable pipeline translation. > Enable deserialization of a non-java Pipeline > - > > Key: BEAM-3779 > URL: https://issues.apache.org/jira/browse/BEAM-3779 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > > Currently, rehydrating a Pipeline works on the PCollection and PTransform > levels with the use of RawPTransform, but the runner-core-construction-java > utilities will throw if the runner attempts to deserialize a > WindowingStrategy or Coder which contains non-Java custom (or otherwise > unknown) Coders or WindowFns. > > Use a strategy like RawPTransform to deserialize WindowFns and Coders, so > they can be interacted with as intermediate tokens in the java form. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3994) Use typed sinks and sources for FnApiControlClientPoolService
Ben Sidhom created BEAM-3994: Summary: Use typed sinks and sources for FnApiControlClientPoolService Key: BEAM-3994 URL: https://issues.apache.org/jira/browse/BEAM-3994 Project: Beam Issue Type: Bug Components: runner-core Reporter: Ben Sidhom Assignee: Ben Sidhom We operate with blocking queues directly when managing control clients with the FnApiControlClientPoolService. This makes interactions with the client pool difficult to understand. We should instead make client sources and sinks explicit. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
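Making the sources and sinks explicit, as BEAM-3994 suggests, could look like this sketch (class and method names are hypothetical): the shared queue stays an implementation detail, while each caller sees only a one-way put or take interface.

```python
import queue

class ClientSink:
    """Write-only view of the client pool: the service adds new clients."""
    def __init__(self, q):
        self._q = q

    def put(self, client):
        self._q.put(client)

class ClientSource:
    """Read-only view of the client pool: the runner takes clients."""
    def __init__(self, q):
        self._q = q

    def take(self, timeout=None):
        return self._q.get(timeout=timeout)

def make_client_pool():
    """Create a paired source and sink backed by one hidden queue."""
    q = queue.Queue()
    return ClientSource(q), ClientSink(q)
```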
[jira] [Created] (BEAM-3976) SdkHarnessClient is thread-safe
Ben Sidhom created BEAM-3976: Summary: SdkHarnessClient is thread-safe Key: BEAM-3976 URL: https://issues.apache.org/jira/browse/BEAM-3976 Project: Beam Issue Type: Bug Components: runner-core Reporter: Ben Sidhom Assignee: Axel Magnuson In general, we want to share access to a given SDK harness among multiple runner workers as a way to amortize container startup and resource costs. Because control messages are multiplexed over the same shared control connection, this sharing currently requires external locking on the same shared lock object. This is error-prone and difficult to verify. The SdkHarnessClient should use internal mechanisms to provide thread-safety. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
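Internalizing the locking that BEAM-3976 describes could be sketched as follows (`send` is a hypothetical stand-in for writing one control message; none of these names are Beam's real API): the lock and per-call instruction ids live inside the client, so callers need no external synchronization.

```python
import itertools
import threading

class MultiplexingClient:
    """Serialize writes to a shared control channel inside the client."""
    def __init__(self, send):
        self._send = send
        self._lock = threading.Lock()
        self._ids = itertools.count()

    def request(self, payload):
        with self._lock:  # channel writes are serialized internally
            instruction_id = "instr-%d" % next(self._ids)
            self._send(instruction_id, payload)
            return instruction_id
```

The design point is that thread-safety becomes a property the client guarantees, rather than a convention every caller must remember to uphold.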
[jira] [Commented] (BEAM-3970) Java SDK harness supports window_into
[ https://issues.apache.org/jira/browse/BEAM-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419746#comment-16419746 ] Ben Sidhom commented on BEAM-3970: -- Scratch that last comment. I didn't realize that window mapping was distinct from window assignment. > Java SDK harness supports window_into > - > > Key: BEAM-3970 > URL: https://issues.apache.org/jira/browse/BEAM-3970 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Luke Cwik >Priority: Major > > The Java SDK harness does not currently register a PTransformRunnerFactory > for beam:transform:window_into:v1. We need this functionality for GroupByKey > transforms. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-3970) Java SDK harness supports window_into
[ https://issues.apache.org/jira/browse/BEAM-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419739#comment-16419739 ] Ben Sidhom commented on BEAM-3970: -- It looks like [WindowMappingFnRunner|https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java] should already support this. If so, we need to investigate why this is not being picked up by ExecutableStages. > Java SDK harness supports window_into > - > > Key: BEAM-3970 > URL: https://issues.apache.org/jira/browse/BEAM-3970 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Luke Cwik >Priority: Major > > The Java SDK harness does not currently register a PTransformRunnerFactory > for beam:transform:window_into:v1. We need this functionality for GroupByKey > transforms. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3971) Pipeline translation utilities should not use SDK construction classes
Ben Sidhom created BEAM-3971: Summary: Pipeline translation utilities should not use SDK construction classes Key: BEAM-3971 URL: https://issues.apache.org/jira/browse/BEAM-3971 Project: Beam Issue Type: Bug Components: runner-core Reporter: Ben Sidhom Assignee: Kenneth Knowles In general, portable runners will require access to pipeline information not available in rehydrated pipelines while constructing physical plans. Translation utilities should operate directly on protos or on thin, information-preserving wrappers. The pipeline fusion utilities already operate on protos directly and can be used as an example of how this could be done. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3972) Flink runner translates pipelines directly by proto
Ben Sidhom created BEAM-3972: Summary: Flink runner translates pipelines directly by proto Key: BEAM-3972 URL: https://issues.apache.org/jira/browse/BEAM-3972 Project: Beam Issue Type: Bug Components: runner-flink Reporter: Ben Sidhom Assignee: Ben Sidhom The non-portable runner uses rehydrated pipelines which lack necessary information. The portable Flink runner needs to translate pipelines directly by proto in order to wire components into individual executable stages correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3970) Java SDK harness supports window_into
Ben Sidhom created BEAM-3970: Summary: Java SDK harness supports window_into Key: BEAM-3970 URL: https://issues.apache.org/jira/browse/BEAM-3970 Project: Beam Issue Type: Bug Components: sdk-java-harness Reporter: Ben Sidhom Assignee: Luke Cwik The Java SDK harness does not currently register a PTransformRunnerFactory for beam:transform:window_into:v1. We need this functionality for GroupByKey transforms. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3966) Move core utilities into a new top-level module
Ben Sidhom created BEAM-3966: Summary: Move core utilities into a new top-level module Key: BEAM-3966 URL: https://issues.apache.org/jira/browse/BEAM-3966 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Ben Sidhom Assignee: Kenneth Knowles As part of a longer-term dependency cleanup, fn-execution and similar utilities should be moved into a new top-level module (util?) that can be shared among runner and/or SDK code while clearly delineating the boundary between runner and SDK. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3952) GreedyStageFuserTest broken
Ben Sidhom created BEAM-3952: Summary: GreedyStageFuserTest broken Key: BEAM-3952 URL: https://issues.apache.org/jira/browse/BEAM-3952 Project: Beam Issue Type: Bug Components: runner-core Reporter: Ben Sidhom Assignee: Thomas Groh The materializesWithDifferentEnvConsumer test is currently failing due to a bad assertion. The fused subgraph contains the parDo.out PCollection but the test expects an empty output. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3888) Python SDK propagates pipeline options correctly when talking to Job API
Ben Sidhom created BEAM-3888: Summary: Python SDK propagates pipeline options correctly when talking to Job API Key: BEAM-3888 URL: https://issues.apache.org/jira/browse/BEAM-3888 Project: Beam Issue Type: Sub-task Components: sdk-py-core Reporter: Ben Sidhom Assignee: Ahmet Altay Pipeline options are not propagated correctly from the Python SDK to the runner. At least part of the problem is due to option key casing (e.g., between Java and Python). Additionally, some runners require special options for configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3887) Python SDK harness retrieves element coders from gRPC read/write nodes
Ben Sidhom created BEAM-3887: Summary: Python SDK harness retrieves element coders from gRPC read/write nodes Key: BEAM-3887 URL: https://issues.apache.org/jira/browse/BEAM-3887 Project: Beam Issue Type: Sub-task Components: sdk-py-core Reporter: Ben Sidhom Assignee: Ahmet Altay The Python harness does not correctly get coders from the gRPC read/write transforms but rather from PCollection inputs/outputs. This prevents the harness from correctly receiving/transmitting prefix-encoded elements (seen as prefix-encoded byte arrays by the runner). It should instead retrieve input and output coders from the gRPC nodes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
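The prefix encoding at issue in BEAM-3887 is length-prefix framing of element bytes. A standalone sketch of the general idea, using base-128 varints (this is illustrative framing code, not the SDK's coder implementation):

```python
def encode_varint(n):
    """Encode a non-negative int as a base-128 varint (LEB128-style)."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # continuation bit set: more bytes follow
        else:
            out.append(bits)
            return bytes(out)

def length_prefixed(payload):
    """Frame an encoded element with its varint length prefix."""
    return encode_varint(len(payload)) + payload
```

A runner that only sees prefix-encoded byte arrays can split a stream back into elements using just this framing, without understanding the inner coder, which is why the harness must take its coders from the gRPC nodes rather than the raw PCollections.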
[jira] [Created] (BEAM-3886) Python SDK harness does not contact State API if not needed
Ben Sidhom created BEAM-3886: Summary: Python SDK harness does not contact State API if not needed Key: BEAM-3886 URL: https://issues.apache.org/jira/browse/BEAM-3886 Project: Beam Issue Type: Sub-task Components: sdk-py-core Reporter: Ben Sidhom Assignee: Ahmet Altay The Python harness always talks to the State API, even if it is never used by the current process bundle. As a minor optimization and to make implementing new runners easier, the harness should not talk to the State server unless it's actually needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
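The optimization BEAM-3886 asks for is ordinary lazy initialization: do not dial the State server until a bundle actually reads or writes state. A minimal sketch (`connect` is a hypothetical factory for the real gRPC channel):

```python
class LazyStateChannel:
    """Connect to the State API only on first use."""
    def __init__(self, connect):
        self._connect = connect
        self._channel = None

    def get(self):
        if self._channel is None:       # first use: dial the server now
            self._channel = self._connect()
        return self._channel

    @property
    def connected(self):
        return self._channel is not None
```

Bundles that never touch state then never open the connection, which also lets a new runner skip implementing the State service until it needs it.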
[jira] [Created] (BEAM-3885) Python SDK Read IO is expressed as Impulse -> ParDo
Ben Sidhom created BEAM-3885: Summary: Python SDK Read IO is expressed as Impulse -> ParDo Key: BEAM-3885 URL: https://issues.apache.org/jira/browse/BEAM-3885 Project: Beam Issue Type: Sub-task Components: sdk-py-core Reporter: Ben Sidhom Assignee: Ahmet Altay Portable runners cannot understand SDK-specific Read transforms. The Python SDK will need to rewrite Read as Impulse followed by a ParDo that actually does the IO. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3884) Python SDK supports Impulse as a primitive transform
Ben Sidhom created BEAM-3884: Summary: Python SDK supports Impulse as a primitive transform Key: BEAM-3884 URL: https://issues.apache.org/jira/browse/BEAM-3884 Project: Beam Issue Type: Sub-task Components: sdk-py-core Reporter: Ben Sidhom Assignee: Ahmet Altay Portable runners require Impulse to be the only root nodes of pipelines. The Python SDK should provide this for pipeline construction. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-3883) Python SDK stages artifacts when talking to job server
Ben Sidhom created BEAM-3883: Summary: Python SDK stages artifacts when talking to job server Key: BEAM-3883 URL: https://issues.apache.org/jira/browse/BEAM-3883 Project: Beam Issue Type: Sub-task Components: sdk-py-core Reporter: Ben Sidhom Assignee: Ahmet Altay The Python SDK does not currently stage its user-defined functions or dependencies when talking to the job API. Artifacts that need to be staged include the user code itself, any SDK components not included in the container image, and the list of Python packages that must be installed at runtime. Artifacts that are currently expected can be found in the harness boot code: [https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-2588) FlinkRunner shim for serving Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Sidhom reassigned BEAM-2588: Assignee: Axel Magnuson > FlinkRunner shim for serving Job API > > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > > Whatever the result of https://s.apache.org/beam-job-api we will need a way > for the JVM-based FlinkRunner to receive and run pipelines authored in Python. -- This message was sent by Atlassian JIRA (v7.6.3#76005)