[2/3] incubator-beam git commit: [BEAM-716] Fix javadoc on with* methods [BEAM-959] Improve check preconditions in JmsIO
[BEAM-716] Fix javadoc on with* methods [BEAM-959] Improve check preconditions in JmsIO Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30e14cfa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30e14cfa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30e14cfa Branch: refs/heads/master Commit: 30e14cfa63db50d567185599ea049c96229b48e2 Parents: caf1c72 Author: Jean-Baptiste OnofréAuthored: Tue Dec 13 21:55:46 2016 +0100 Committer: Jean-Baptiste Onofré Committed: Mon Dec 19 07:24:05 2016 +0100 -- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 45 +--- 1 file changed, 30 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30e14cfa/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java -- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 76dee67..b6de26a 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.io.jms; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -145,7 +145,7 @@ public class JmsIO { } /** - * Specify the JMS connection factory to connect to the JMS broker. + * Specify the JMS connection factory to connect to the JMS broker. * * For instance: * @@ -159,11 +159,13 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. 
*/ public Read withConnectionFactory(ConnectionFactory connectionFactory) { + checkArgument(connectionFactory != null, "withConnectionFactory(connectionFactory) called" + + " with null connectionFactory"); return builder().setConnectionFactory(connectionFactory).build(); } /** - * Specify the JMS queue destination name where to read messages from. The + * Specify the JMS queue destination name where to read messages from. The * {@link JmsIO.Read} acts as a consumer on the queue. * * This method is exclusive with {@link JmsIO.Read#withTopic(String)}. The user has to @@ -181,11 +183,12 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Read withQueue(String queue) { + checkArgument(queue != null, "withQueue(queue) called with null queue"); return builder().setQueue(queue).build(); } /** - * Specify the JMS topic destination name where to receive messages from. The + * Specify the JMS topic destination name where to receive messages from. The * {@link JmsIO.Read} acts as a subscriber on the topic. * * This method is exclusive with {@link JmsIO.Read#withQueue(String)}. The user has to @@ -203,11 +206,12 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Read withTopic(String topic) { + checkArgument(topic != null, "withTopic(topic) called with null topic"); return builder().setTopic(topic).build(); } /** - * Define the max number of records that the source will read. Using a max number of records + * Define the max number of records that the source will read. Using a max number of records * different from {@code Long.MAX_VALUE} means the source will be {@code Bounded}, and will * stop once the max number of records read is reached. * @@ -223,11 +227,13 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. 
*/ public Read withMaxNumRecords(long maxNumRecords) { + checkArgument(maxNumRecords >= 0, "withMaxNumRecords(maxNumRecords) called with invalid " + + "maxNumRecords"); return builder().setMaxNumRecords(maxNumRecords).build(); } /** - * Define the max read time that the source will read. Using a non null max read time + * Define the max read time that the source will read. Using a non null max read time * duration means the source will be {@code Bounded}, and will stop once the max read time is * reached. * @@ -243,6 +249,8 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Read withMaxReadTime(Duration maxReadTime) { + checkArgument(maxReadTime != null,
[1/3] incubator-beam git commit: [BEAM-716] Use AutoValue in JmsIO
Repository: incubator-beam Updated Branches: refs/heads/master 1c9bf8d66 -> 1e148cd7d [BEAM-716] Use AutoValue in JmsIO Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/caf1c720 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/caf1c720 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/caf1c720 Branch: refs/heads/master Commit: caf1c720f66de4d502f79b6c11c64b49c53329b0 Parents: 1c9bf8d Author: Jean-Baptiste OnofréAuthored: Sun Dec 11 07:43:41 2016 +0100 Committer: Jean-Baptiste Onofré Committed: Mon Dec 19 07:24:00 2016 +0100 -- sdks/java/io/jms/pom.xml| 7 + .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 321 +-- 2 files changed, 228 insertions(+), 100 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/pom.xml -- diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml index bca0152..b88254e 100644 --- a/sdks/java/io/jms/pom.xml +++ b/sdks/java/io/jms/pom.xml @@ -81,6 +81,13 @@ jsr305 + + + com.google.auto.value + auto-value + provided + + org.apache.activemq http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java -- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 24fa67d..76dee67 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.jms; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; @@ -101,37 +102,148 @@ public class 
JmsIO { private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class); public static Read read() { -return new Read(null, null, null, Long.MAX_VALUE, null); +return new AutoValue_JmsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build(); } public static Write write() { -return new Write(null, null, null); +return new AutoValue_JmsIO_Write.Builder().build(); } /** * A {@link PTransform} to read from a JMS destination. See {@link JmsIO} for more * information on usage and configuration. */ - public static class Read extends PTransform { + @AutoValue + public abstract static class Read extends PTransform { +/** + * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html + * "It is expected that JMS providers will provide the tools an administrator needs to create + * and configure administered objects in a JNDI namespace. JMS provider implementations of + * administered objects should be both javax.jndi.Referenceable and java.io.Serializable so + * that they can be stored in all JNDI naming contexts. In addition, it is recommended that + * these implementations follow the JavaBeansTM design patterns." + * + * So, a {@link ConnectionFactory} implementation is serializable. + */ +@Nullable abstract ConnectionFactory getConnectionFactory(); +@Nullable abstract String getQueue(); +@Nullable abstract String getTopic(); +abstract long getMaxNumRecords(); +@Nullable abstract Duration getMaxReadTime(); + +abstract Builder builder(); + +@AutoValue.Builder +abstract static class Builder { + abstract Builder setConnectionFactory(ConnectionFactory connectionFactory); + abstract Builder setQueue(String queue); + abstract Builder setTopic(String topic); + abstract Builder setMaxNumRecords(long maxNumRecords); + abstract Builder setMaxReadTime(Duration maxReadTime); + abstract Read build(); +} + +/** + * Specify the JMS connection factory to connect to the JMS broker. 
+ * + * For instance: + * + * + * {@code + * pipeline.apply(JmsIO.read().withConnectionFactory(myConnectionFactory) + * } + * + * + * @param connectionFactory The JMS {@link ConnectionFactory}. + * @return The corresponding {@link JmsIO.Read}. + */ public Read withConnectionFactory(ConnectionFactory connectionFactory) { - return new
[3/3] incubator-beam git commit: [BEAM-716] This closes #1577
[BEAM-716] This closes #1577 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e148cd7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e148cd7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e148cd7 Branch: refs/heads/master Commit: 1e148cd7d5f12e6742ac57440bf0731460d11b80 Parents: 1c9bf8d 30e14cf Author: Jean-Baptiste OnofréAuthored: Mon Dec 19 07:40:39 2016 +0100 Committer: Jean-Baptiste Onofré Committed: Mon Dec 19 07:40:39 2016 +0100 -- sdks/java/io/jms/pom.xml| 7 + .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 338 +-- 2 files changed, 244 insertions(+), 101 deletions(-) --
[jira] [Commented] (BEAM-841) Releases should produce proper MD5/SHA1 checksums
[ https://issues.apache.org/jira/browse/BEAM-841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15760315#comment-15760315 ] Jean-Baptiste Onofré commented on BEAM-841: --- We use the gpg sign from the Apache POM: http://repo.maven.apache.org/maven2/org/apache/apache/18/apache-18.pom {code} org.apache.maven.plugins maven-gpg-plugin sign-release-artifacts sign {code} So, I would optionally create this PR on the Apache POM, as it's not directly relevant to Beam. > Releases should produce proper MD5/SHA1 checksums > - > > Key: BEAM-841 > URL: https://issues.apache.org/jira/browse/BEAM-841 > Project: Beam > Issue Type: Improvement > Components: build-system >Affects Versions: 0.3.0-incubating >Reporter: Sergio Fernández >Priority: Trivial > > Currently {{09 7B 6A 0A C9 3E 71 C1 05 0C 71 65 E9 0C 4F AE}} is used, while > most of the tools use the simpler format {{097b6a0ac93e71c1050c7165e90c4fae}} > to allow automatically checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (BEAM-1162) Create RedisIO
[ https://issues.apache.org/jira/browse/BEAM-1162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jean-Baptiste Onofré resolved BEAM-1162. Resolution: Duplicate Fix Version/s: Not applicable > Create RedisIO > -- > > Key: BEAM-1162 > URL: https://issues.apache.org/jira/browse/BEAM-1162 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Affects Versions: Not applicable >Reporter: Ismaël Mejía >Assignee: Jean-Baptiste Onofré >Priority: Minor > Fix For: Not applicable > > > As discussed in the mailing list there is interest in this IO. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-440) Create.values() returns a type-unsafe Coder
[ https://issues.apache.org/jira/browse/BEAM-440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-440: - Assignee: Jason White > Create.values() returns a type-unsafe Coder > --- > > Key: BEAM-440 > URL: https://issues.apache.org/jira/browse/BEAM-440 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Daniel Halperin >Assignee: Jason White > Labels: newbie, starter > > {{Create.values()}} with no arguments will default to a {{VoidCoder}}, unless > one is set later with {{setCoder(Coder)}}. > Although it will encode its input correctly, this seems like a bad choice in > many cases. E.g., with {{Flatten}}: > {code} > PCollection> initial = p.apply("First", > Create. >of()); > PCollection > second = > p.apply("Second", Create.of("a", "b")).apply(ParDo.of(new > MyAvroDoFn())); > PCollectionList > .of(initial).and(second) > .apply(Flatten. >pCollections()); > {code} > This crashes trying to cast a KV from "Second" to a Void.class. > 1. Suggest throwing a warning in {{getDefaultOutputCoder}} when defaulting to > {{VoidCoder}} for an empty elements list. Should this be an error? > 2. Suggest adding something like {{Create.empty(TypeDescriptor)}} to handle > this case properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BEAM-440) Create.values() returns a type-unsafe Coder
[ https://issues.apache.org/jira/browse/BEAM-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15760233#comment-15760233 ] Kenneth Knowles commented on BEAM-440: -- The root issue here is actually worse. There are two facets. 1. {{Create}} based on the value is unsafe in general. (full disclosure: I implemented this unsafe "feature"). It adds a coder that is safe for _those values only_ but not for their static type. {code} PCollection allStrings = pipeline.apply("Create strings", Create.of("hello")); {code} The coder inference infrastructure reasonably assumes that {{allStrings.getCoder()}} returns a coder that can handle all values of type {{Object}}, which is as false in this example as it is for an empty {{Create}}. Transforms like {{MapElements
[jira] [Updated] (BEAM-440) Create.values() returns a type-unsafe Coder
[ https://issues.apache.org/jira/browse/BEAM-440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-440: - Description: {{Create.values()}} with no arguments will default to a {{VoidCoder}}, unless one is set later with {{setCoder(Coder)}}. Although it will encode its input correctly, this seems like a bad choice in many cases. E.g., with {{Flatten}}: {code} PCollection> initial = p.apply("First", Create. >of()); PCollection > second = p.apply("Second", Create.of("a", "b")).apply(ParDo.of(new MyAvroDoFn())); PCollectionList .of(initial).and(second) .apply(Flatten. >pCollections()); {code} This crashes trying to cast a KV from "Second" to a Void.class. 1. Suggest throwing a warning in {{getDefaultOutputCoder}} when defaulting to {{VoidCoder}} for an empty elements list. Should this be an error? 2. Suggest adding something like {{Create.empty(TypeDescriptor)}} to handle this case properly. was: Create.values() with no arguments will default to a VoidCoder, unless one is set later with #setCoder(Coder). Although it will encode its input correctly, this seems like a bad choice in many cases. E.g., with Flatten: PCollection > initial = p.apply("First", Create. >of()); PCollection > second = p.apply("Second", Create.of("a", "b")).apply(ParDo.of(new MyAvroDoFn())); PCollectionList .of(initial).and(second) .apply(Flatten. >pCollections()); This crashes trying to cast a KV from "Second" to a Void.class. 1. Suggest throwing a warning in #getDefaultOutputCoder when defaulting to VoidCoder for an empty elements list. Should this be an error? 2. Suggest adding something like Create.empty(TypeDescriptor) to handle this case properly. 
> Create.values() returns a type-unsafe Coder > --- > > Key: BEAM-440 > URL: https://issues.apache.org/jira/browse/BEAM-440 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Daniel Halperin > Labels: newbie, starter > > {{Create.values()}} with no arguments will default to a {{VoidCoder}}, unless > one is set later with {{setCoder(Coder)}}. > Although it will encode its input correctly, this seems like a bad choice in > many cases. E.g., with {{Flatten}}: > {code} > PCollection > initial = p.apply("First", > Create. >of()); > PCollection > second = > p.apply("Second", Create.of("a", "b")).apply(ParDo.of(new > MyAvroDoFn())); > PCollectionList > .of(initial).and(second) > .apply(Flatten. >pCollections()); > {code} > This crashes trying to cast a KV from "Second" to a Void.class. > 1. Suggest throwing a warning in {{getDefaultOutputCoder}} when defaulting to > {{VoidCoder}} for an empty elements list. Should this be an error? > 2. Suggest adding something like {{Create.empty(TypeDescriptor)}} to handle > this case properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-210) Allow control of empty ON_TIME panes analogous to final panes
[ https://issues.apache.org/jira/browse/BEAM-210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-210: - Component/s: (was: runner-core) sdk-java-core beam-model-runner-api beam-model > Allow control of empty ON_TIME panes analogous to final panes > - > > Key: BEAM-210 > URL: https://issues.apache.org/jira/browse/BEAM-210 > Project: Beam > Issue Type: Bug > Components: beam-model, beam-model-runner-api, sdk-java-core >Reporter: Mark Shields >Assignee: Thomas Groh > > Today, ON_TIME panes are emitted whether or not they are empty. We had > decided that for final panes the user would want to control this behavior, to > control data volume. But for ON_TIME panes no such control exists. The > rationale is perhaps that the ON_TIME pane is a fundamental result that > should not be elided. To be considered: whether this is what we want. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-885) Move PipelineOptions from Pipeline.create() to Pipeline.run()
[ https://issues.apache.org/jira/browse/BEAM-885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-885: - Component/s: beam-model-runner-api > Move PipelineOptions from Pipeline.create() to Pipeline.run() > - > > Key: BEAM-885 > URL: https://issues.apache.org/jira/browse/BEAM-885 > Project: Beam > Issue Type: New Feature > Components: beam-model-runner-api, sdk-java-core >Reporter: Thomas Groh >Assignee: Thomas Groh > Labels: backward-incompatible > > The specification of a Pipeline should be independent of its PipelineOptions. > This delays specification of the options, including choices like Pipeline > Runner. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-646) Get runners out of the apply()
[ https://issues.apache.org/jira/browse/BEAM-646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-646: - Component/s: beam-model-runner-api > Get runners out of the apply() > -- > > Key: BEAM-646 > URL: https://issues.apache.org/jira/browse/BEAM-646 > Project: Beam > Issue Type: Improvement > Components: beam-model-runner-api, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Thomas Groh > > Right now, the runner intercepts calls to apply() and replaces transforms as > we go. This means that there is no "original" user graph. For portability and > misc architectural benefits, we would like to build the original graph first, > and have the runner override later. > Some runners already work in this manner, but we could integrate it more > smoothly, with more validation, via some handy APIs on e.g. the Pipeline > object. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-260) Know the getSideInputWindow upper bound so can gc side input state
[ https://issues.apache.org/jira/browse/BEAM-260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-260: - Component/s: beam-model-runner-api > Know the getSideInputWindow upper bound so can gc side input state > -- > > Key: BEAM-260 > URL: https://issues.apache.org/jira/browse/BEAM-260 > Project: Beam > Issue Type: Bug > Components: beam-model, beam-model-runner-api >Reporter: Mark Shields >Assignee: Kenneth Knowles > > We currently have no static knowledge about the getSideInputWindow function, > and runners are thus forced to hold on to all side input state / elements in > case a future element reaches back into an earlier side input element. > Maybe we need an upper bound on lag from current to result of > getSideInputWindow so we can have a progressing gc horizon as we do for GKB > window state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-742) Move trigger state machines to runners-core, convert triggers to AST
[ https://issues.apache.org/jira/browse/BEAM-742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-742: - Component/s: beam-model-runner-api > Move trigger state machines to runners-core, convert triggers to AST > > > Key: BEAM-742 > URL: https://issues.apache.org/jira/browse/BEAM-742 > Project: Beam > Issue Type: Sub-task > Components: beam-model-runner-api, runner-core, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles > Fix For: 0.4.0-incubating > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-846) Decouple side input window mapping from WindowFn
[ https://issues.apache.org/jira/browse/BEAM-846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-846: - Component/s: beam-model-runner-api > Decouple side input window mapping from WindowFn > > > Key: BEAM-846 > URL: https://issues.apache.org/jira/browse/BEAM-846 > Project: Beam > Issue Type: New Feature > Components: beam-model, beam-model-runner-api, sdk-java-core >Reporter: Robert Bradshaw >Assignee: Kenneth Knowles > Labels: backward-incompatible > > Currently the main WindowFn provides as getSideInputWindow method. Instead, > this mapping should be specified per-side-input (thought the default mapping > would remain the same). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BEAM-1159) DoFnSignature should have info on the fn's side inputs and outputs
[ https://issues.apache.org/jira/browse/BEAM-1159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15760080#comment-15760080 ] Kenneth Knowles commented on BEAM-1159: --- The expected resolution of this ticket was actually designed long ago, and is included as an appendix to the new DoFn design, here: https://s.apache.org/a-new-dofn#heading=h.1budnm7l01ko It hasn't been a priority yet, but it isn't difficult. It can probably be done in a backwards-compatible manner, though it would be cleaner if we have time to add the new form of support and remove the old way. > DoFnSignature should have info on the fn's side inputs and outputs > -- > > Key: BEAM-1159 > URL: https://issues.apache.org/jira/browse/BEAM-1159 > Project: Beam > Issue Type: Improvement >Reporter: Eugene Kirpichov > > This is logically part of the fn itself, rather than its enclosing transform. > Example where this would have been important: > signature.processElement().observesWindow() should return true for a DoFn > that has any side inputs, since side inputs are windowed. See BEAM-1149. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (BEAM-1149) Side input access fails in direct runner (possibly others too) when input element in multiple windows
[ https://issues.apache.org/jira/browse/BEAM-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles resolved BEAM-1149. --- Resolution: Fixed > Side input access fails in direct runner (possibly others too) when input > element in multiple windows > - > > Key: BEAM-1149 > URL: https://issues.apache.org/jira/browse/BEAM-1149 > Project: Beam > Issue Type: Bug >Reporter: Eugene Kirpichov >Assignee: Kenneth Knowles >Priority: Blocker > Fix For: 0.4.0-incubating > > > {code:java} > private static class FnWithSideInputs extends DoFn{ > private final PCollectionView view; > private FnWithSideInputs(PCollectionView view) { > this.view = view; > } > @ProcessElement > public void processElement(ProcessContext c) { > c.output(c.element() + ":" + c.sideInput(view)); > } > } > @Test > public void testSideInputsWithMultipleWindows() { > Pipeline p = TestPipeline.create(); > MutableDateTime mutableNow = Instant.now().toMutableDateTime(); > mutableNow.setMillisOfSecond(0); > Instant now = mutableNow.toInstant(); > SlidingWindows windowFn = > > SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)); > PCollectionView view = > p.apply(Create.of(1)).apply(View.asSingleton()); > PCollection res = > p.apply(Create.timestamped(TimestampedValue.of("a", now))) > .apply(Window.into(windowFn)) > .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view)); > PAssert.that(res).containsInAnyOrder("a:1"); > p.run(); > } > {code} > This fails with the following exception: > {code} > org.apache.beam.sdk.Pipeline$PipelineExecutionException: > java.lang.IllegalStateException: sideInput called when main input element is > in multiple windows > at > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343) > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176) > at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112) > at > Caused 
by: java.lang.IllegalStateException: sideInput called when main input > element is in multiple windows > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514) > at > org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BEAM-1164) Allow a DoFn to opt in to mutating it's input
[ https://issues.apache.org/jira/browse/BEAM-1164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15760074#comment-15760074 ] Kenneth Knowles commented on BEAM-1164: --- I think with the new {{DoFn}} there is a fairly elegant solution here. Today we write: {code} new DoFn() { @ProcessElement public void processElem(ProcessContext ctx) { ... ctx.element() ... } } {code} We'd like to allow the user to request only the element, both for clarity and for potential optimization, as in {code} new DoFn () { @ProcessElement public void processElem(Element elem) { ... elem.get() ... } } {code} where {{Element}} is a distinguished inner class, to avoid repeating verbose input types. >From here, it is a short step to saying that you want a mutable element: {code} new DoFn () { @ProcessElement public void processElem(MutableElement elem) { ... elem.get().setBizzle(...) ... } } {code} At the level of the "JSON" runner API, we will need to tag the user-defined function with the fact that it intends to mutate its input. The Java SDK will analyze the method signature, as usual, to discern this automatically. A runner will then be free to decide between disabling optimizations or cloning elements when necessary. > Allow a DoFn to opt in to mutating it's input > - > > Key: BEAM-1164 > URL: https://issues.apache.org/jira/browse/BEAM-1164 > Project: Beam > Issue Type: Bug > Components: beam-model >Reporter: Frances Perry >Priority: Minor > > Runners generally can't tell if a DoFn is mutating inputs, but assuming so by > default leads to significant performance implications from unnecessary > copying (around sibling fusion, etc). So instead the model prevents mutating > inputs, and the Direct Runner validates this behavior. 
(See: > http://beam.incubator.apache.org/contribute/design-principles/#make-efficient-things-easy-rather-than-make-easy-things-efficient) > > However, if users are processing a small number of large records by making > incremental changes (for example, genomics use cases), the cost of > immutability requirement can be very large. As a workaround, users sometimes > do suboptimal things (fusing ParDos by hand) or undefined things when they > expect the immutability requirement is unnecessarily strict (adding no-op > coders in places they hope the runner won't be materializing things, mutating > things anyway when they don't expect sibling fusion to happen, etc). > We should consider adding a signal (MutatingDoFn?) that users explicitly opt > in to to say their code may mutate inputs. The runner can then use this > assumption to either prevent optimizations that would break in the face of > this or insert additional copies as needed to allow optimizations to preserve > semantics. > See this related user@ discussion: > https://lists.apache.org/thread.html/f39689f54147117f3fc54c498eff1a20fa73f1be5b5cad5b6f816fd3@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (BEAM-1179) Update assertions of source_test_utils from camelcase to underscore-separated
Chamikara Jayalath created BEAM-1179: Summary: Update assertions of source_test_utils from camelcase to underscore-separated Key: BEAM-1179 URL: https://issues.apache.org/jira/browse/BEAM-1179 Project: Beam Issue Type: Improvement Components: sdk-py Reporter: Chamikara Jayalath Assignee: Chamikara Jayalath Assertions in source_test_utils module [1] currently have method names in camel case. This was originally done so that it's similar to assertions in the Python unittest module. But it's better to deprecate these and add underscore-separated variations so that code in Python SDK is consistent. [1] https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/source_test_utils.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BEAM-1178) Make naming of logger objects consistent
[ https://issues.apache.org/jira/browse/BEAM-1178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15759358#comment-15759358 ] ASF GitHub Bot commented on BEAM-1178: -- GitHub user iemejia opened a pull request: https://github.com/apache/incubator-beam/pull/1655 [BEAM-1178] Make naming of logger objects consistent Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [X] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [X] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [X] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/iemejia/incubator-beam BEAM-1178-logger Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1655.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1655 commit fbfea5953000bd77b07b6a1f1c7a192e24b88021 Author: Ismaël MejíaDate: 2016-12-18T15:02:41Z Fix grammar error (repeated for) commit 11ba4d3638da2859727206d8ea887298efcad34a Author: Ismaël Mejía Date: 2016-12-18T20:01:13Z [BEAM-1178] Make naming of logger objects consistent > Make naming of logger objects consistent > > > Key: BEAM-1178 > URL: https://issues.apache.org/jira/browse/BEAM-1178 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core, sdk-java-extensions >Affects Versions: 0.5.0-incubating >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Trivial > > Logger objects are used in different instances in Beam, around 90% of the > current classes that use loggers use 
the convention name 'LOG', however there > are instances that use 'logger' and others that use 'LOGGER', this issue is > to make the logger naming consistent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (BEAM-1178) Make naming of logger objects consistent
Ismaël Mejía created BEAM-1178: -- Summary: Make naming of logger objects consistent Key: BEAM-1178 URL: https://issues.apache.org/jira/browse/BEAM-1178 Project: Beam Issue Type: Improvement Components: sdk-java-core, sdk-java-extensions Affects Versions: 0.5.0-incubating Reporter: Ismaël Mejía Assignee: Ismaël Mejía Priority: Trivial Logger objects are used in different instances in Beam, around 90% of the current classes that use loggers use the convention name 'LOG', however there are instances that use 'logger' and others that use 'LOGGER', this issue is to make the logger naming consistent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-beam pull request #1654: [BEAM-1177] Input DStream "bundles" shoul...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1654 [BEAM-1177] Input DStream "bundles" should be in serialized form and include relevant metadata. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam read-unbounded-bytes Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1654.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1654 commit 975dec257364d68b5ada3bced7f139e88853722a Author: SelaDate: 2016-12-18T12:36:53Z SparkUnboundedSource mapWithStateDStream input data should be in serialized form for shuffle and checkpointing. Emit read count and watermark per microbatch. commit 53bd915b8ccacf18b71da16a0a434013ef41 Author: Sela Date: 2016-12-18T13:16:23Z Report the input global watermark for batch to the UI. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (BEAM-1177) Input DStream "bundles" should be in serialized form and include relevant metadata.
[ https://issues.apache.org/jira/browse/BEAM-1177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15758852#comment-15758852 ] Amit Sela commented on BEAM-1177: - Instead of simply emitting {{Iterable}} per partition, I'll emit {{Tuple2 , Metadata>}} to be able to report read count and watermark per batch. {code} class Metadata { private final long numRecords; private final Instant watermark; public Metadata(long numRecords, Instant watermark) { this.numRecords = numRecords; this.watermark = watermark; } public long getNumRecords() { return numRecords; } public Instant getWatermark() { return watermark; } } {code} > Input DStream "bundles" should be in serialized form and include relevant > metadata. > --- > > Key: BEAM-1177 > URL: https://issues.apache.org/jira/browse/BEAM-1177 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > Currently, the input partitions hold "bundles" of read elements within the > {{mapWithStateDStream}} used for the read. > Since this is automatically shuffled, user-data (the read elements) should be > serialized using coders to avoid breaking (if user-data is not {{Kryo}} > serializable). > Even after BEAM-848 would complete, the resulting {{MapWithStateDStream}} > would be checkpointed periodically and so it would still have to remain in > serialized form. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-1177) Input DStream "bundles" should be in serialized form and include relevant metadata.
[ https://issues.apache.org/jira/browse/BEAM-1177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated BEAM-1177: Description: Currently, the input partitions hold "bundles" of read elements within the {{mapWithStateDStream}} used for the read. Since this is automatically shuffled, user-data (the read elements) should be serialized using coders to avoid breaking (if user-data is not {{Kryo}} serializable). Even after BEAM-848 would complete, the resulting {{MapWithStateDStream}} would be checkpointed periodically and so it would still have to remain in serialized form. was: Currently, the input partitions hold "bundles" of read elements within the {{mapWithStateDStream}} used for the read. Since this is automatically shuffled, user-data (the read elements) should be serialized using coders to avoid breaking (if user-data is not {{Kryo}} serializable). Even after BEAM-848 would complete, this {{MapWithStateDStream}} would be checkpointed periodically and so it would still have to remain in serialized form. > Input DStream "bundles" should be in serialized form and include relevant > metadata. > --- > > Key: BEAM-1177 > URL: https://issues.apache.org/jira/browse/BEAM-1177 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > Currently, the input partitions hold "bundles" of read elements within the > {{mapWithStateDStream}} used for the read. > Since this is automatically shuffled, user-data (the read elements) should be > serialized using coders to avoid breaking (if user-data is not {{Kryo}} > serializable). > Even after BEAM-848 would complete, the resulting {{MapWithStateDStream}} > would be checkpointed periodically and so it would still have to remain in > serialized form. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (BEAM-1177) Input DStream "bundles" should be in serialized form and include relevant metadata.
Amit Sela created BEAM-1177: --- Summary: Input DStream "bundles" should be in serialized form and include relevant metadata. Key: BEAM-1177 URL: https://issues.apache.org/jira/browse/BEAM-1177 Project: Beam Issue Type: Bug Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela Currently, the input partitions hold "bundles" of read elements within the {{mapWithStateDStream}} used for the read. Since this is automatically shuffled, user-data (the read elements) should be serialized using coders to avoid breaking (if user-data is not {{Kryo}} serializable). Even after BEAM-848 would complete, this {{MapWithStateDStream}} would be checkpointed periodically and so it would still have to remain in serialized form. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-848) A better shuffle after reading from within mapWithState.
[ https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated BEAM-848: --- Summary: A better shuffle after reading from within mapWithState. (was: Make post-read (unbounded) shuffle use coders instead of Kryo.) > A better shuffle after reading from within mapWithState. > > > Key: BEAM-848 > URL: https://issues.apache.org/jira/browse/BEAM-848 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and > this stateful operation will be followed by a shuffle: > https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159 > Since the stateful read maps "splitSource" -> "partition of a list of read > values", the following shuffle won't benefit in any way (the list of read > values has not been flatMapped yet). In order to avoid shuffle we need to set > the input RDD ({{SourceRDD.Unbounded}}) partitioner to be a default > {{HashPartitioner}} since {{mapWithState}} would use the same partitioner and > will skip shuffle if the partitioners match. > It would be wise to shuffle the read values _after_ flatmap. > I will break this into two tasks: > # Set default-partitioner to the input RDD. > # Shuffle (using Coders) the input. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BEAM-1074) Set default-partitioner in SourceRDD.Unbounded.
[ https://issues.apache.org/jira/browse/BEAM-1074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15758766#comment-15758766 ] ASF GitHub Bot commented on BEAM-1074: -- Github user amitsela closed the pull request at: https://github.com/apache/incubator-beam/pull/1500 > Set default-partitioner in SourceRDD.Unbounded. > --- > > Key: BEAM-1074 > URL: https://issues.apache.org/jira/browse/BEAM-1074 > Project: Beam > Issue Type: Sub-task > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > This will make sure the following stateful read within {{mapWithState}} won't > shuffle the read values as long as they are grouped in a {{List}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-beam pull request #1500: [BEAM-1074] Set default-partitioner in So...
Github user amitsela closed the pull request at: https://github.com/apache/incubator-beam/pull/1500 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (BEAM-1126) Expose UnboundedSource split backlog in number of events
[ https://issues.apache.org/jira/browse/BEAM-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15758431#comment-15758431 ] ASF GitHub Bot commented on BEAM-1126: -- Github user aviemzur closed the pull request at: https://github.com/apache/incubator-beam/pull/1574 > Expose UnboundedSource split backlog in number of events > > > Key: BEAM-1126 > URL: https://issues.apache.org/jira/browse/BEAM-1126 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Aviem Zur >Assignee: Daniel Halperin >Priority: Minor > > Today {{UnboundedSource}} exposes split backlog in bytes via > {{getSplitBacklogBytes()}} > There is value in exposing backlog in number of events as well, since this > number can be more human comprehensible than bytes. something like > {{getSplitBacklogEvents()}} or {{getSplitBacklogCount()}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-beam pull request #1574: [BEAM-1126] Expose UnboundedSource split ...
Github user aviemzur closed the pull request at: https://github.com/apache/incubator-beam/pull/1574 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---