[jira] [Commented] (BEAM-10079) Samza tests failing [Java 11]
[ https://issues.apache.org/jira/browse/BEAM-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125416#comment-17125416 ]

Xinyu Liu commented on BEAM-10079:
--

[~kenn]: hmm, this doesn't happen in our local fork at LinkedIn, so I'm not sure whether it has something to do with Java 11. We can take a look on our side.

> Samza tests failing [Java 11]
> -
>
> Key: BEAM-10079
> URL: https://issues.apache.org/jira/browse/BEAM-10079
> Project: Beam
> Issue Type: Sub-task
> Components: runner-samza
> Reporter: Pawel Pasterz
> Priority: P2
>
> Gradle task *_:runners:samza:test_* fails during Java 11 Precommit job
>
> Example stack trace:
> {code:java}
> > Task :runners:samza:test
> May 26, 2020 7:33:55 AM org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory$SamzaTimerState loadEventTimeTimers
> INFO: Loaded 0 event time timers in memory
> May 26, 2020 7:33:55 AM org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory$SamzaTimerState loadProcessingTimeTimers
> INFO: Loaded 0 processing time timers in memory
> May 26, 2020 7:33:56 AM org.apache.samza.util.Logging$class info
> INFO: Registering task instances with consumers.
> May 26, 2020 7:33:56 AM org.apache.samza.util.Logging$class info
> INFO: Starting consumer multiplexer.
> May 26, 2020 7:33:56 AM org.apache.samza.util.Logging$class info
> INFO: Entering run loop.
> May 26, 2020 7:33:56 AM org.apache.samza.processor.StreamProcessor$ContainerListener afterStart
> WARNING: Received container start notification for container: org.apache.samza.container.SamzaContainer@af148fb in stream processor: 1.
> May 26, 2020 7:33:56 AM org.apache.samza.util.Logging$class info
> INFO: End of stream reached for partition: SystemStreamPartition [11-PAssert_0_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_, 11-PAssert_0_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_, 0]
> May 26, 2020 7:33:56 AM org.apache.beam.runners.samza.runtime.OpAdapter processWatermark
> SEVERE: Op org.apache.beam.runners.samza.runtime.OpAdapter threw an exception during processing watermark
> java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IncompatibleClassChangeError: Method org.apache.samza.storage.kv.KeyValueStore.deleteAll(Ljava/util/List;)V must be InterfaceMethodref constant
>   at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:98)
>   at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
>   at org.apache.beam.runners.samza.runtime.GroupByKeyOp.fireTimer(GroupByKeyOp.java:225)
>   at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processWatermark(GroupByKeyOp.java:203)
>   at org.apache.beam.runners.samza.runtime.OpAdapter.processWatermark(OpAdapter.java:109)
>   at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:399)
>   at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
>   at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>   at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
>   at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>   at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
>   at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
>   at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
>   at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1106)
>   at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235)
>   at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:143)
>   at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
>   at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
>   at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>   at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
>   at
[jira] [Assigned] (BEAM-8459) Samza runner fails UsesStrictTimerOrdering category tests
[ https://issues.apache.org/jira/browse/BEAM-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xinyu Liu reassigned BEAM-8459: --- Assignee: Xinyu Liu > Samza runner fails UsesStrictTimerOrdering category tests > - > > Key: BEAM-8459 > URL: https://issues.apache.org/jira/browse/BEAM-8459 > Project: Beam > Issue Type: Bug > Components: runner-samza > Affects Versions: 2.17.0 > Reporter: Jan Lukavský > Assignee: Xinyu Liu > Priority: Major > > BEAM-7520 introduced a new set of validatesRunner tests that verify that timers are fired exactly in order of increasing timestamp. The Samza runner fails these added tests (they are currently ignored). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7059) SamzaRunner: fix the job.id inconsistency in the new Samza version
[ https://issues.apache.org/jira/browse/BEAM-7059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xinyu Liu updated BEAM-7059: Description: The new Samza 1.1.0 version introduces a backward incompatible change which requires setting both app.id and job.id. We need to make sure setting both in SamzaRunner. Also add more logs to it. was: The new Samza 1.1.0 version introduces a back-incompatible change which requires setting both app.id and job.id. We need to make sure setting both in SamzaRunner. Also add more logs to it. > SamzaRunner: fix the job.id inconsistency in the new Samza version > -- > > Key: BEAM-7059 > URL: https://issues.apache.org/jira/browse/BEAM-7059 > Project: Beam > Issue Type: Bug > Components: runner-samza >Reporter: Xinyu Liu >Assignee: Xinyu Liu >Priority: Major > > The new Samza 1.1.0 version introduces a backward incompatible change which > requires setting both app.id and job.id. We need to make sure setting both in > SamzaRunner. > Also add more logs to it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7059) SamzaRunner: fix the job.id inconsistency in the new Samza version
Xinyu Liu created BEAM-7059: --- Summary: SamzaRunner: fix the job.id inconsistency in the new Samza version Key: BEAM-7059 URL: https://issues.apache.org/jira/browse/BEAM-7059 Project: Beam Issue Type: Bug Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu The new Samza 1.1.0 version introduces a back-incompatible change which requires setting both app.id and job.id. We need to make sure setting both in SamzaRunner. Also add more logs to it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
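
As an illustration of the requirement described in BEAM-7059, the following is a minimal sketch of making sure both app.id and job.id are present in a configuration map before a job is launched. The class name, the method name, and the policy of mirroring one id into the other are assumptions made for this example only; they are not taken from SamzaRunner or from Samza itself. Only the two config keys come from the ticket.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Beam or Samza. */
public class JobIdConfigSketch {
  static final String APP_ID = "app.id";
  static final String JOB_ID = "job.id";

  /**
   * Returns a copy of the config in which both app.id and job.id are set.
   * Assumption: if only one of them is present, its value is mirrored into
   * the other; if neither is present, defaultId is used for both.
   */
  public static Map<String, String> withBothIds(Map<String, String> config, String defaultId) {
    Map<String, String> result = new HashMap<>(config);
    String appId = result.get(APP_ID);
    String jobId = result.get(JOB_ID);
    if (appId == null && jobId == null) {
      appId = defaultId;
      jobId = defaultId;
    } else if (appId == null) {
      appId = jobId;
    } else if (jobId == null) {
      jobId = appId;
    }
    result.put(APP_ID, appId);
    result.put(JOB_ID, jobId);
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(JOB_ID, "7");
    Map<String, String> fixed = withBothIds(config, "1");
    // Both keys are now present, which is what the ticket asks for.
    System.out.println(APP_ID + "=" + fixed.get(APP_ID) + ", " + JOB_ID + "=" + fixed.get(JOB_ID));
  }
}
```

Whether the two ids should actually be equal is a design decision the ticket does not state; the sketch only shows one way to guarantee that neither is missing.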
[jira] [Created] (BEAM-6994) SamzaRunner: further improvements for upgrading Samza
Xinyu Liu created BEAM-6994: --- Summary: SamzaRunner: further improvements for upgrading Samza Key: BEAM-6994 URL: https://issues.apache.org/jira/browse/BEAM-6994 Project: Beam Issue Type: Improvement Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu More improvements for SamzaRunner: - Life cycle methods for the pipeline runtime - Hook up Samza ExternalContext for LinkedIn use cases - Support metrics reporters in pipeline options - Some bug fixes for the state key in Samza -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6932) SamzaRunner: migrate to use new Samza 1.1.0 libraries
Xinyu Liu created BEAM-6932: --- Summary: SamzaRunner: migrate to use new Samza 1.1.0 libraries Key: BEAM-6932 URL: https://issues.apache.org/jira/browse/BEAM-6932 Project: Beam Issue Type: Improvement Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu Update SamzaRunner to use the latest Samza release libraries (1.1.0). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6866) SamzaRunner: support timers in ParDo
Xinyu Liu created BEAM-6866: --- Summary: SamzaRunner: support timers in ParDo Key: BEAM-6866 URL: https://issues.apache.org/jira/browse/BEAM-6866 Project: Beam Issue Type: Improvement Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu This patch adds support of timers in ParDo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6767) SamzaRunner: Add more parameters to SamzaPipelineOptions
Xinyu Liu created BEAM-6767: --- Summary: SamzaRunner: Add more parameters to SamzaPipelineOptions Key: BEAM-6767 URL: https://issues.apache.org/jira/browse/BEAM-6767 Project: Beam Issue Type: Improvement Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu Add the following parameter support: 1. Allow enable/disable beam metrics in SamzaPipelineOptions 2. Enable durable state with Samza pipeline options 3. Enable host-affinity for durable state Also fixed a bug to update watermark hold state only when it's changed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-6550) ParDo Async Java API
[ https://issues.apache.org/jira/browse/BEAM-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xinyu Liu reassigned BEAM-6550: --- Assignee: Bharath Kumarasubramanian (was: Xinyu Liu) > ParDo Async Java API > > > Key: BEAM-6550 > URL: https://issues.apache.org/jira/browse/BEAM-6550 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core > Reporter: Xinyu Liu > Assignee: Bharath Kumarasubramanian > Priority: Major > > This ticket is to track the work on adding the ParDo async API. The motivation for this is: > - Many users are experienced in asynchronous programming. With async frameworks such as Netty and ParSeq and libraries like the async Jersey client, they are able to make remote calls efficiently, and the libraries help manage the execution threads underneath. Async remote calls are very common in most of our streaming applications today. > - Many jobs run on a multi-tenant cluster. Async processing reduces resource usage and speeds up computation (less context switching). > This has become one of the Java APIs most requested by SamzaRunner users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-6550) ParDo Async Java API
[ https://issues.apache.org/jira/browse/BEAM-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16756377#comment-16756377 ]

Xinyu Liu commented on BEAM-6550:
-

Summary from email thread:

From [~kenn]:

If the input is a CompletionStage then the output should also be a CompletionStage, since all you should do is async chaining. We could enforce this by giving the DoFn an OutputReceiver(CompletionStage).

Another possibility that might be even more robust against poor future use could be process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). In this way, the process method itself will be async chained, rather than counting on the user to do the right thing.

When executed over the portable APIs, it will be primarily the Java SDK harness that makes all of these decisions. If we wanted runners to have some insight into it we would have to add it to the Beam model protos. I don't have any suggestions there, so I would leave it out of this discussion until there are good ideas. We could learn a lot by trying it out just in the SDK harness.

From [~SteveNiemitz]:

I'd love to see something like this as well. Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.

I feel like I've spent a bunch of time writing very similar "kick off a future in ProcessElement, join it in FinishBundle" code, and looking around Beam itself, a lot of built-in transforms do it as well. Scio provides a few AsyncDoFn implementations [1] but it'd be great to see this as a first-class concept in Beam itself. Doing error handling, concurrency, etc. correctly can be tricky.

[1] https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java

From [~swegner]:

A related question is how to make execution observable such that a runner can make proper scaling decisions. Runners decide how to schedule bundles within and across multiple worker instances, and can use information about execution to make dynamic scaling decisions. First-class async APIs seem like they would encourage DoFn authors to implement their own parallelization, rather than deferring to the runner that should be more capable of providing the right level of parallelism.

In the Dataflow worker harness, we estimate execution time of PTransform steps by sampling execution time on the execution thread and attributing it to the currently invoked method. This approach is fairly simple and possible because we assume that execution happens within the thread controlled by the runner. Some DoFns already implement their own async logic and break this assumption; I would expect more if we make async built into the DoFn APIs.

So: this isn't an argument against async APIs, but rather: does this break execution observability, and are there other lightweight mechanisms for attributing execution time of async work?

From [~robertwb]:

If I understand correctly, the end goal is to process input elements of a DoFn asynchronously. Were I to do this naively, I would implement DoFns that simply take and receive [Serializable?]CompletionStages as element types, followed by a DoFn that adds a callback to emit on completion (possibly via a queue to avoid being-on-the-wrong-thread issues) and whose finalize forces all completions. This would, of course, interact poorly with processing time tracking, fusion breaks, watermark tracking, counter attribution, window propagation, etc., so it is desirable to make it part of the system itself.

Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent API. The invoking of the downstream process could be chained onto this, with all the implicit tracking and tracing set up correctly. Taking a CompletionStage as input means a DoFn would not have to create its output CompletionStage ex nihilo and possibly allow for better chaining (depending on the asynchronous APIs used).

Even better might be to simply let the invocation of all DoFn.process() methods be asynchronous, but as Java doesn't offer an await primitive to relinquish control in the middle of a function body this might be hard.

I think for correctness, completion would have to be forced at the end of each bundle. If your bundles are large enough, this may not be that big of a deal. In this case you could also start executing subsequent bundles while waiting for prior ones to complete.

> ParDo Async Java API
>
>
> Key: BEAM-6550
> URL: https://issues.apache.org/jira/browse/BEAM-6550
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Xinyu Liu
> Assignee: Xinyu Liu
> Priority: Major
>
> This ticket is to track the work on adding the ParDo
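
The "kick off a future in ProcessElement, join it in FinishBundle" pattern mentioned in the summary above, together with the point that completion has to be forced at the end of each bundle, can be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions: the class AsyncBundleSketch and its methods are hypothetical stand-ins for a DoFn's lifecycle and do not use any Beam classes, and the sketch ignores error handling, windowing, and timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical plain-Java stand-in for a DoFn; no Beam classes are used. */
public class AsyncBundleSketch<InputT, OutputT> {
  private final Function<InputT, CompletableFuture<OutputT>> asyncCall;
  private final List<CompletableFuture<OutputT>> pending = new ArrayList<>();

  public AsyncBundleSketch(Function<InputT, CompletableFuture<OutputT>> asyncCall) {
    this.asyncCall = asyncCall;
  }

  /** Plays the role of ProcessElement: starts the async call without blocking. */
  public void processElement(InputT element) {
    pending.add(asyncCall.apply(element));
  }

  /** Plays the role of FinishBundle: forces every pending future to complete. */
  public List<OutputT> finishBundle() {
    List<OutputT> outputs = new ArrayList<>();
    for (CompletableFuture<OutputT> future : pending) {
      outputs.add(future.join()); // blocks until this element's result is ready
    }
    pending.clear();
    return outputs;
  }

  public static void main(String[] args) {
    AsyncBundleSketch<Integer, String> fn =
        new AsyncBundleSketch<>(i -> CompletableFuture.supplyAsync(() -> "value-" + i));
    fn.processElement(1);
    fn.processElement(2);
    System.out.println(fn.finishBundle()); // prints [value-1, value-2]
  }
}
```

Outputs are collected in input order because the futures are joined in the order they were started, regardless of which one completes first. The boilerplate in this sketch is the kind of code the proposed first-class async API is meant to absorb.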
[jira] [Created] (BEAM-6550) ParDo Async Java API
Xinyu Liu created BEAM-6550: --- Summary: ParDo Async Java API Key: BEAM-6550 URL: https://issues.apache.org/jira/browse/BEAM-6550 Project: Beam Issue Type: Improvement Components: sdk-java-core Reporter: Xinyu Liu Assignee: Kenneth Knowles This ticket is to track the work on adding the ParDo async API. The motivation for this is: - Many users are experienced in asynchronous programming. With async frameworks such as Netty and ParSeq and libraries like the async Jersey client, they are able to make remote calls efficiently, and the libraries help manage the execution threads underneath. Async remote calls are very common in most of our streaming applications today. - Many jobs run on a multi-tenant cluster. Async processing reduces resource usage and speeds up computation (less context switching). This has become one of the Java APIs most requested by SamzaRunner users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-6550) ParDo Async Java API
[ https://issues.apache.org/jira/browse/BEAM-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xinyu Liu reassigned BEAM-6550: --- Assignee: Xinyu Liu (was: Kenneth Knowles) > ParDo Async Java API > > > Key: BEAM-6550 > URL: https://issues.apache.org/jira/browse/BEAM-6550 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core > Reporter: Xinyu Liu > Assignee: Xinyu Liu > Priority: Major > > This ticket is to track the work on adding the ParDo async API. The motivation for this is: > - Many users are experienced in asynchronous programming. With async frameworks such as Netty and ParSeq and libraries like the async Jersey client, they are able to make remote calls efficiently, and the libraries help manage the execution threads underneath. Async remote calls are very common in most of our streaming applications today. > - Many jobs run on a multi-tenant cluster. Async processing reduces resource usage and speeds up computation (less context switching). > This has become one of the Java APIs most requested by SamzaRunner users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-5732) expose runner mode to user through samza pipeline option
[ https://issues.apache.org/jira/browse/BEAM-5732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xinyu Liu reassigned BEAM-5732: --- Assignee: Daniel Chen (was: Xinyu Liu) > expose runner mode to user through samza pipeline option > > > Key: BEAM-5732 > URL: https://issues.apache.org/jira/browse/BEAM-5732 > Project: Beam > Issue Type: Improvement > Components: runner-samza >Reporter: Hai Lu >Assignee: Daniel Chen >Priority: Major > > We should expose runner mode to user through samza pipeline option so that > user can decide whether to start samza job as local mode or remote mode. > This should work consistently in both Java runner and Portable runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6382) SamzaRunner: add an option to read configs using a user-defined factory
Xinyu Liu created BEAM-6382: --- Summary: SamzaRunner: add an option to read configs using a user-defined factory Key: BEAM-6382 URL: https://issues.apache.org/jira/browse/BEAM-6382 Project: Beam Issue Type: Improvement Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu We need an option to read configs from a factory, which is useful on YARN as well as for user-defined file formats. By default, this config factory reads a properties file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
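
The idea in BEAM-6382, a pluggable factory for reading configs with a properties-file reader as the default, could look roughly like the sketch below. All names here (SketchConfigFactory, PropertiesSketchFactory, readConfig) are hypothetical and chosen for this example; they are not the interfaces that Samza or SamzaRunner actually expose, and the real option name is not given in the ticket.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConfigFactorySketch {
  /** Hypothetical factory interface: turns some config source into key/value pairs. */
  public interface SketchConfigFactory {
    Map<String, String> load(Reader source);
  }

  /** Default behavior assumed from the ticket: parse the source as a Java properties file. */
  public static class PropertiesSketchFactory implements SketchConfigFactory {
    @Override
    public Map<String, String> load(Reader source) {
      Properties props = new Properties();
      try {
        props.load(source);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      Map<String, String> config = new HashMap<>();
      for (String name : props.stringPropertyNames()) {
        config.put(name, props.getProperty(name));
      }
      return config;
    }
  }

  /** Uses the user-supplied factory if there is one, otherwise the properties default. */
  public static Map<String, String> readConfig(SketchConfigFactory userFactory, Reader source) {
    SketchConfigFactory factory = (userFactory != null) ? userFactory : new PropertiesSketchFactory();
    return factory.load(source);
  }

  public static void main(String[] args) {
    Map<String, String> config = readConfig(null, new StringReader("job.name=demo\napp.id=1\n"));
    System.out.println(config.get("job.name")); // prints demo
  }
}
```

A user-defined file format would be supported by passing a different SketchConfigFactory implementation instead of null.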
[jira] [Created] (BEAM-6286) Add SamzaRunner profile to mvn archetype
Xinyu Liu created BEAM-6286: --- Summary: Add SamzaRunner profile to mvn archetype Key: BEAM-6286 URL: https://issues.apache.org/jira/browse/BEAM-6286 Project: Beam Issue Type: Improvement Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu Seems this was lost during the conversion to gradle. Add this back. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6264) SamzaRunner: support metrics enable/disable and set state durable in SamzaPipelineOptions
Xinyu Liu created BEAM-6264: --- Summary: SamzaRunner: support metrics enable/disable and set state durable in SamzaPipelineOptions Key: BEAM-6264 URL: https://issues.apache.org/jira/browse/BEAM-6264 Project: Beam Issue Type: Improvement Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu Add more options in SamzaPipelineOptions, including: - set/getEnableMetrics - set/getStateDurable These allow users to specify whether they need Beam metrics as well as durable state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6253) SamzaRunner: Add a few customized transforms for runner use cases
Xinyu Liu created BEAM-6253: --- Summary: SamzaRunner: Add a few customized transforms for runner use cases Key: BEAM-6253 URL: https://issues.apache.org/jira/browse/BEAM-6253 Project: Beam Issue Type: Improvement Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu A few customized transforms are needed in SamzaRunner to support the following cases: - allow GroupByKey/CoGroupByKey without repartitioning - allow updating accumulation after firing a window pane -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6252) SamzaRunner: Add a registrar to allow customized DoFnInvoker
Xinyu Liu created BEAM-6252: --- Summary: SamzaRunner: Add a registrar to allow customized DoFnInvoker Key: BEAM-6252 URL: https://issues.apache.org/jira/browse/BEAM-6252 Project: Beam Issue Type: Improvement Components: runner-samza Reporter: Xinyu Liu Assignee: Xinyu Liu Add a registrar to allow us to extend the open source DoFnInvoker for our use cases. Also fixed a bug in the dot renderer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-4763) Add postCommit scripts and perfkit dashboards for nexmark on Samza runner
[ https://issues.apache.org/jira/browse/BEAM-4763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687147#comment-16687147 ] Xinyu Liu commented on BEAM-4763: - [~kenn]: Of course it's interesting to Samza runner! I will try to follow other runners to set it up. It might take some time though as things have been pretty hectic here. > Add postCommit scripts and perfkit dashboards for nexmark on Samza runner > - > > Key: BEAM-4763 > URL: https://issues.apache.org/jira/browse/BEAM-4763 > Project: Beam > Issue Type: Test > Components: examples-nexmark >Reporter: Etienne Chauchot >Assignee: Xinyu Liu >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)