[GitHub] [beam] tvalentyn commented on pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.

2020-10-28 Thread GitBox


tvalentyn commented on pull request #13219:
URL: https://github.com/apache/beam/pull/13219#issuecomment-718394925







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tvalentyn removed a comment on pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.

2020-10-28 Thread GitBox


tvalentyn removed a comment on pull request #13219:
URL: https://github.com/apache/beam/pull/13219#issuecomment-718394306











[GitHub] [beam] tvalentyn commented on pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.

2020-10-28 Thread GitBox


tvalentyn commented on pull request #13219:
URL: https://github.com/apache/beam/pull/13219#issuecomment-718394558


   Run Python Dataflow ValidatesContainer







[GitHub] [beam] tvalentyn commented on pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.

2020-10-28 Thread GitBox


tvalentyn commented on pull request #13219:
URL: https://github.com/apache/beam/pull/13219#issuecomment-718394306


   Run Python Dataflow ValidatesContainer
   
   







[GitHub] [beam] tvalentyn opened a new pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.

2020-10-28 Thread GitBox


tvalentyn opened a new pull request #13219:
URL: https://github.com/apache/beam/pull/13219


   We could reduce the size of Python container images by switching to the slim 
version of the base image. We have been building Dataflow Python containers 
from the 'slim' base images for a while and haven't encountered any issues. 
   
   The slim version shaves roughly 750 MB of unpacked image size, as reported by `docker images`:
   ```
   python   3.7-slim-buster   217e85391449   8 days ago   112MB
   python   3.7-buster        b0dee8d708b9   8 days ago   876MB
   ```
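   As a rough sanity check, the two `docker images` rows above can be parsed and differenced; the result (~764 MB) is consistent with the ~750 MB figure. This is an illustrative Python sketch only; the `parse_size` helper is hypothetical, not part of Beam:

   ```python
   def parse_size(size: str) -> float:
       """Convert a Docker size string like '112MB' or '1.5GB' to megabytes."""
       units = {"KB": 1e-3, "MB": 1.0, "GB": 1e3}
       for suffix, factor in units.items():
           if size.endswith(suffix):
               return float(size[: -len(suffix)]) * factor
       raise ValueError(f"unrecognized size: {size}")

   # SIZE column values from the `docker images` output quoted above.
   rows = {
       "python:3.7-slim-buster": "112MB",
       "python:3.7-buster": "876MB",
   }

   saved = parse_size(rows["python:3.7-buster"]) - parse_size(rows["python:3.7-slim-buster"])
   print(f"slim base saves ~{saved:.0f} MB of unpacked image size")
   ```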
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   
   (Table of post-commit build-status badge links for each SDK and runner, 
pointing at ci-beam.apache.org Jenkins jobs; truncated in this archive.)

[GitHub] [beam] youngoli merged pull request #13188: [BEAM-11108] Add a version of TextIO implemented via SDF.

2020-10-28 Thread GitBox


youngoli merged pull request #13188:
URL: https://github.com/apache/beam/pull/13188


   







[GitHub] [beam] youngoli commented on a change in pull request #13209: [BEAM-9615] Add schema coders and tests.

2020-10-28 Thread GitBox


youngoli commented on a change in pull request #13209:
URL: https://github.com/apache/beam/pull/13209#discussion_r514010115



##
File path: sdks/go/pkg/beam/coder_test.go
##
@@ -75,46 +134,51 @@ func TestCoders(t *testing.T) {
 		1,
 		true,
 		"a string",
-		map[int64]string{1: "one", 11: "oneone", 21: "twoone", 1211: "onetwooneone"},
+		map[int64]string{1: "one", 11: "oneone", 21: "twoone", 1211: "onetwooneone"}, // (not supported by custom type registration)
 		struct {
 			A int
 			B *string
 			C bool
 		}{4, &ptrString, false},
-		[...]int64{1, 2, 3, 4, 5}, // array
+		[...]int64{1, 2, 3, 4, 5}, // array (not supported by custom type registration)
 		[]int64{1, 2, 3, 4, 5},    // slice
 		struct {
 			A []int
 			B [3]int
 		}{A: []int{1, 2, 3}, B: [...]int{4, 5, 6}},
+		[...]struct{ A int }{{1}, {2}, {3}, {4}, {5}},
+		[]struct{ B int }{{1}, {2}, {3}, {4}, {5}},
+		regTestType{[4]int{4, 2, 4, 2}},
 	}
 
 	for _, test := range tests {
-		var results []string
-		rt := reflect.TypeOf(test)
-		enc := NewElementEncoder(rt)
-		for i := 0; i < 10; i++ {
-			var buf bytes.Buffer
-			if err := enc.Encode(test, &buf); err != nil {
-				t.Fatalf("Failed to encode %v: %v", tests, err)
+		t.Run(fmt.Sprintf("%T", test), func(t *testing.T) {
+			var results []string
+			rt := reflect.TypeOf(test)
+			enc := NewElementEncoder(rt)
+			for i := 0; i < 10; i++ {
+				var buf bytes.Buffer
+				if err := enc.Encode(test, &buf); err != nil {
+					t.Fatalf("Failed to encode %v: %v", test, err)
+				}
+				results = append(results, string(buf.Bytes()))
 			}
-			results = append(results, string(buf.Bytes()))
-		}
-		for i, d := range results {
-			if d != results[0] {
-				t.Errorf("coder not deterministic: encoding %d not the same as the first encoding: %v != %v ", i, d, results[0])
+			for i, d := range results {
+				if d != results[0] {
+					t.Errorf("coder not deterministic: encoding %d not the same as the first encoding: %v != %v ", i, d, results[0])
+				}
 			}
-		}
 
-		dec := NewElementDecoder(rt)
-		buf := bytes.NewBuffer([]byte(results[0]))
-		decoded, err := dec.Decode(buf)
-		if err != nil {
-			t.Fatalf("Failed to decode: %v", err)
-		}
+			dec := NewElementDecoder(rt)
+			buf := bytes.NewBuffer([]byte(results[0]))
+			decoded, err := dec.Decode(buf)
+			if err != nil {
+				t.Fatalf("Failed to decode: %q, into %v", results[0], err)

Review comment:
   Is the `%v` in the format string supposed to be the error? It reads like 
it's meant to output the expected decoded value (i.e. `test`).
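The determinism check in the diff above — encode the same value several times, require byte-identical output, then round-trip decode the first encoding — translates naturally to other settings. A minimal Python sketch of the same pattern, using `pickle` as a stand-in encoder (illustrative only, not Beam's coder API):

```python
import pickle


def check_coder_deterministic(value, rounds: int = 10) -> bytes:
    """Encode `value` repeatedly; fail if any encoding differs from the first,
    then round-trip the first encoding and fail if it does not decode back."""
    encodings = [pickle.dumps(value) for _ in range(rounds)]
    for i, enc in enumerate(encodings):
        if enc != encodings[0]:
            raise AssertionError(
                f"coder not deterministic: encoding {i} differs from the first")
    decoded = pickle.loads(encodings[0])
    if decoded != value:
        raise AssertionError(f"round trip failed: {decoded!r} != {value!r}")
    return encodings[0]


check_coder_deterministic({"a": [1, 2, 3], "b": "a string"})
```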









[GitHub] [beam] ajamato commented on pull request #13217: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics

2020-10-28 Thread GitBox


ajamato commented on pull request #13217:
URL: https://github.com/apache/beam/pull/13217#issuecomment-718382171


   Run Python PreCommit







[GitHub] [beam] ibzib opened a new pull request #13218: [BEAM-10974] Skip GroupByKeyTest.testLargeKeys10MB.

2020-10-28 Thread GitBox


ibzib opened a new pull request #13218:
URL: https://github.com/apache/beam/pull/13218


   I tried changing the taskmanager memory size, but couldn't get it to work.
   
   R: @mxm 
   cc: @tysonjh 
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   
   (Table of post-commit build-status badge links for each SDK and runner, 
pointing at ci-beam.apache.org Jenkins jobs; truncated in this archive.)

[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally

2020-10-28 Thread GitBox


kennknowles commented on pull request #13134:
URL: https://github.com/apache/beam/pull/13134#issuecomment-718335567


   Run Java PreCommit







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

2020-10-28 Thread GitBox


boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r513912395



##
File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility methods for creating {@link BundleCheckpointHandler}s. */
+public class BundleCheckpointHandlers {
+
+  /**
+   * A {@link BundleCheckpointHandler} which uses {@link
+   * org.apache.beam.runners.core.TimerInternals.TimerData} and {@link
+   * org.apache.beam.sdk.state.ValueState} to reschedule {@link DelayedBundleApplication}.
+   */
+  public static class StateAndTimerBundleCheckpointHandler<T> implements BundleCheckpointHandler {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(StateAndTimerBundleCheckpointHandler.class);
+    private final TimerInternalsFactory<T> timerInternalsFactory;
+    private final StateInternalsFactory<T> stateInternalsFactory;
+    private final Coder<WindowedValue<T>> residualCoder;
+    private final Coder<BoundedWindow> windowCoder;
+    private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+    public static final String SDF_PREFIX = "sdf_checkpoint";
+
+    public StateAndTimerBundleCheckpointHandler(
+        TimerInternalsFactory<T> timerInternalsFactory,
+        StateInternalsFactory<T> stateInternalsFactory,
+        Coder<WindowedValue<T>> residualCoder,
+        Coder<BoundedWindow> windowCoder) {
+      this.residualCoder = residualCoder;
+      this.windowCoder = windowCoder;
+      this.timerInternalsFactory = timerInternalsFactory;
+      this.stateInternalsFactory = stateInternalsFactory;
+    }
+
+    /**
+     * A helper function to help check whether the given timer is the timer which is set for
+     * rescheduling {@link DelayedBundleApplication}.
+     */
+    public static boolean isSdfTimer(String timerId) {
+      return timerId.startsWith(SDF_PREFIX);
+    }
+
+    private static String constructSdfCheckpointId(String id, int index) {
+      return SDF_PREFIX + ":" + id + ":" + index;
+    }
+
+    @Override
+    public void onCheckpoint(ProcessBundleResponse response) {
+      String id = idGenerator.getId();
+      for (int index = 0; index < response.getResidualRootsCount(); index++) {
+        DelayedBundleApplication residual = response.getResidualRoots(index);
+        if (!residual.hasApplication()) {
+          continue;
+        }
+        String tag = constructSdfCheckpointId(id, index);
+        try {
+          WindowedValue<T> stateValue =
+              CoderUtils.decodeFromByteArray(
+                  residualCoder, residual.getApplication().getElement().toByteArray());
+          TimerInternals timerInternals =
+              timerInternalsFactory.timerInternalsForKey(stateValue.getValue());
+          StateInternals stateInternals =
+              stateInternalsFactory.stateInternalsForKey(stateValue.getValue());
+          // Calculate the timestamp for the timer.
+          Instant timestamp = Instant.now();
+          if (residual.hasRequestedTimeDelay()) {
+            timestamp = timestamp.plus(res
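The `constructSdfCheckpointId`/`isSdfTimer` pair quoted above defines a simple `prefix:id:index` naming scheme that distinguishes rescheduling timers from user timers. A hedged Python mirror of that scheme (illustrative helpers, not part of Beam):

```python
# Mirrors StateAndTimerBundleCheckpointHandler.SDF_PREFIX in the Java code above.
SDF_PREFIX = "sdf_checkpoint"


def construct_sdf_checkpoint_id(bundle_id: str, index: int) -> str:
    """Mirror of constructSdfCheckpointId: one timer/state tag per residual root."""
    return f"{SDF_PREFIX}:{bundle_id}:{index}"


def is_sdf_timer(timer_id: str) -> bool:
    """Mirror of isSdfTimer: a prefix check identifies checkpoint-rescheduling timers."""
    return timer_id.startswith(SDF_PREFIX)


tag = construct_sdf_checkpoint_id("42", 0)
assert is_sdf_timer(tag)
assert not is_sdf_timer("user_timer:42:0")
```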

[GitHub] [beam] boyuanzz commented on a change in pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2

2020-10-28 Thread GitBox


boyuanzz commented on a change in pull request #12806:
URL: https://github.com/apache/beam/pull/12806#discussion_r513895196



##
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##
@@ -418,10 +439,11 @@ def __init__(self,
       id_label,  # type: Optional[str]
       with_attributes,  # type: bool
       timestamp_attribute  # type: Optional[str]
-  ):
+   ):
     self.coder = coders.BytesCoder()
     self.full_topic = topic
     self.id_label = id_label
+    # TODO(BEAM-10869): Remove with_attributes since we will never look at it.

Review comment:
   We can remove `with_attributes` from `PubSubSink` in the cleanup PR. 
@chamikaramj 









[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally

2020-10-28 Thread GitBox


kennknowles commented on pull request #13134:
URL: https://github.com/apache/beam/pull/13134#issuecomment-718315067


   run java precommit







[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally

2020-10-28 Thread GitBox


kennknowles commented on pull request #13134:
URL: https://github.com/apache/beam/pull/13134#issuecomment-718315024


   I'm doing something like that on my laptop. Certainly the core SDK compile 
is slower, but even with a huge slowdown in compile the test execution should 
dominate. So I do think it may be caching that is the difference. Running it a 
few times to warm the build cache might also give a sense of realistic CI times.







[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally

2020-10-28 Thread GitBox


kennknowles commented on pull request #13134:
URL: https://github.com/apache/beam/pull/13134#issuecomment-718303211


   Java precommit seems to have degraded in runtime.
   
- Before: https://scans.gradle.com/s/gohimd5ekkcmo/timeline
- After: https://scans.gradle.com/s/z4brjsxfpftxy/timeline
   
   So it went from roughly 40 minutes to 90 minutes. It isn't an apples-to-apples 
comparison, since both runs happened on Jenkins and caching effects differ a lot:
   
- Before: `2375 tasks executed in 116 projects in 34m 18.746s, with 162 
avoided tasks saving 1h 19.326s`
- After: `2463 tasks executed in 116 projects in 1h 20m 11.508s, with 38 
avoided tasks saving 14m 48.735s`
   
   I think zero-cache builds in some semi-controlled environment might give a 
better sense whether this needs to be treated specially.
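   The quoted scan summaries reduce to a rough wall-clock slowdown factor; a 
small sketch of the arithmetic (times copied from the Gradle scans quoted above):

   ```python
   def to_seconds(h: int = 0, m: int = 0, s: float = 0.0) -> float:
       """Convert an h/m/s breakdown to total seconds."""
       return h * 3600 + m * 60 + s

   before = to_seconds(m=34, s=18.746)      # 2375 tasks, 162 avoided
   after = to_seconds(h=1, m=20, s=11.508)  # 2463 tasks, only 38 avoided
   print(f"wall-clock slowdown: {after / before:.2f}x")
   ```

   The raw ratio is roughly 2.3x, though as the comment notes, the difference in 
avoided (cached) tasks means this overstates the compiler's own cost.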







[GitHub] [beam] ajamato commented on pull request #13217: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics

2020-10-28 Thread GitBox


ajamato commented on pull request #13217:
URL: https://github.com/apache/beam/pull/13217#issuecomment-718301011


   R: @chamikaramj 







[GitHub] [beam] ajamato commented on pull request #13217: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics

2020-10-28 Thread GitBox


ajamato commented on pull request #13217:
URL: https://github.com/apache/beam/pull/13217#issuecomment-718300927


   R: @pabloem 







[GitHub] [beam] ajamato opened a new pull request #13217: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics

2020-10-28 Thread GitBox


ajamato opened a new pull request #13217:
URL: https://github.com/apache/beam/pull/13217


   [BEAM-11092] Add bigquery io request count metric, implementing 
HarnessMonitoringInfos and process_wide metrics
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   
   (Table of post-commit build-status badge links for each SDK and runner, 
pointing at ci-beam.apache.org Jenkins jobs; truncated in this archive.)

[GitHub] [beam] codecov[bot] edited a comment on pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2

2020-10-28 Thread GitBox


codecov[bot] edited a comment on pull request #12806:
URL: https://github.com/apache/beam/pull/12806#issuecomment-692876818


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=h1) Report
   > Merging 
[#12806](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=desc) into 
[master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc)
 will **decrease** coverage by `42.05%`.
   > The diff coverage is `16.00%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/beam/pull/12806/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##           master   #12806       +/-   ##
   ===========================================
   - Coverage   82.48%   40.42%   -42.06%
   ===========================================
     Files         455      449        -6
     Lines       54876    53539     -1337
   ===========================================
   - Hits        45266    21645    -23621
   - Misses       9610    31894    +22284
   ```
   
   
   | [Impacted 
Files](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree) | Coverage 
Δ | |
   |---|---|---|
   | 
[...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=)
 | `19.08% <0.00%> (-57.85%)` | :arrow_down: |
   | 
[...python/apache\_beam/runners/direct/direct\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZGlyZWN0X3J1bm5lci5weQ==)
 | `32.55% <11.11%> (-61.16%)` | :arrow_down: |
   | 
[sdks/python/apache\_beam/io/gcp/pubsub.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL3B1YnN1Yi5weQ==)
 | `40.78% <20.00%> (-51.53%)` | :arrow_down: |
   | 
[...python/apache\_beam/examples/complete/distribopt.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvY29tcGxldGUvZGlzdHJpYm9wdC5weQ==)
 | `0.00% <0.00%> (-98.59%)` | :arrow_down: |
   | 
[...dks/python/apache\_beam/transforms/create\_source.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jcmVhdGVfc291cmNlLnB5)
 | `0.00% <0.00%> (-98.19%)` | :arrow_down: |
   | 
[...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=)
 | `0.00% <0.00%> (-98.15%)` | :arrow_down: |
   | 
[...e\_beam/runners/interactive/testing/mock\_ipython.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS90ZXN0aW5nL21vY2tfaXB5dGhvbi5weQ==)
 | `7.14% <0.00%> (-92.86%)` | :arrow_down: |
   | 
[.../examples/snippets/transforms/elementwise/pardo.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9lbGVtZW50d2lzZS9wYXJkby5weQ==)
 | `11.36% <0.00%> (-88.64%)` | :arrow_down: |
   | 
[sdks/python/apache\_beam/typehints/opcodes.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL29wY29kZXMucHk=)
 | `0.00% <0.00%> (-87.92%)` | :arrow_down: |
   | 
[...s/snippets/transforms/aggregation/combineperkey.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9hZ2dyZWdhdGlvbi9jb21iaW5lcGVya2V5LnB5)
 | `11.95% <0.00%> (-86.96%)` | :arrow_down: |
   | ... and [290 
more](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree-more) | |
   
   --
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=footer). Last 
update 
[376d2a6...7a309c2](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on pull request #13216: Update beam-2.25.0.md

2020-10-28 Thread GitBox


y1chi commented on pull request #13216:
URL: https://github.com/apache/beam/pull/13216#issuecomment-718284437


   R: @robinyqiu 







[GitHub] [beam] codecov[bot] edited a comment on pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2

2020-10-28 Thread GitBox


codecov[bot] edited a comment on pull request #12806:
URL: https://github.com/apache/beam/pull/12806#issuecomment-692876818


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=h1) Report
   > Merging 
[#12806](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=desc) into 
[master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc)
 will **decrease** coverage by `42.05%`.
   > The diff coverage is `16.00%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/beam/pull/12806/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##           master   #12806       +/-   ##
   ===========================================
   - Coverage   82.48%   40.42%   -42.06%
   ===========================================
     Files         455      449        -6
     Lines       54876    53539     -1337
   ===========================================
   - Hits        45266    21645    -23621
   - Misses       9610    31894    +22284
   ```
   
   
   | [Impacted 
Files](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree) | Coverage 
Δ | |
   |---|---|---|
   | 
[...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=)
 | `19.08% <0.00%> (-57.85%)` | :arrow_down: |
   | 
[...python/apache\_beam/runners/direct/direct\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZGlyZWN0X3J1bm5lci5weQ==)
 | `32.55% <11.11%> (-61.16%)` | :arrow_down: |
   | 
[sdks/python/apache\_beam/io/gcp/pubsub.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL3B1YnN1Yi5weQ==)
 | `40.78% <20.00%> (-51.53%)` | :arrow_down: |
   | 
[...python/apache\_beam/examples/complete/distribopt.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvY29tcGxldGUvZGlzdHJpYm9wdC5weQ==)
 | `0.00% <0.00%> (-98.59%)` | :arrow_down: |
   | 
[...dks/python/apache\_beam/transforms/create\_source.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jcmVhdGVfc291cmNlLnB5)
 | `0.00% <0.00%> (-98.19%)` | :arrow_down: |
   | 
[...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=)
 | `0.00% <0.00%> (-98.15%)` | :arrow_down: |
   | 
[...e\_beam/runners/interactive/testing/mock\_ipython.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS90ZXN0aW5nL21vY2tfaXB5dGhvbi5weQ==)
 | `7.14% <0.00%> (-92.86%)` | :arrow_down: |
   | 
[.../examples/snippets/transforms/elementwise/pardo.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9lbGVtZW50d2lzZS9wYXJkby5weQ==)
 | `11.36% <0.00%> (-88.64%)` | :arrow_down: |
   | 
[sdks/python/apache\_beam/typehints/opcodes.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL29wY29kZXMucHk=)
 | `0.00% <0.00%> (-87.92%)` | :arrow_down: |
   | 
[...s/snippets/transforms/aggregation/combineperkey.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9hZ2dyZWdhdGlvbi9jb21iaW5lcGVya2V5LnB5)
 | `11.95% <0.00%> (-86.96%)` | :arrow_down: |
   | ... and [290 
more](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree-more) | |
   
   --
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=footer). Last 
update 
[376d2a6...2d38a8b](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   







[GitHub] [beam] y1chi opened a new pull request #13216: Update beam-2.25.0.md

2020-10-28 Thread GitBox


y1chi opened a new pull request #13216:
URL: https://github.com/apache/beam/pull/13216


   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
 | ---
   Java | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)[![Build
 
Status](https://

[GitHub] [beam] rohdesamuel opened a new pull request #13215: [BEAM-11151] Adds the ToStringFnRunner to Java

2020-10-28 Thread GitBox


rohdesamuel opened a new pull request #13215:
URL: https://github.com/apache/beam/pull/13215


   This adds the ToString implementation to the Java SDK Harness.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
 | ---
   Java | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)[![Build
 
Status](

[GitHub] [beam] amaliujia commented on pull request #13094: [BEAM-5570] Update javacc dependency

2020-10-28 Thread GitBox


amaliujia commented on pull request #13094:
URL: https://github.com/apache/beam/pull/13094#issuecomment-718281226


   @iemejia do you have any other comment on this PR? 
   
I am planning to merge this PR early next week.







[GitHub] [beam] amaliujia edited a comment on pull request #13094: [BEAM-5570] Update javacc dependency

2020-10-28 Thread GitBox


amaliujia edited a comment on pull request #13094:
URL: https://github.com/apache/beam/pull/13094#issuecomment-718281226


   @iemejia do you have any other comment on this PR? 
   
I am planning to merge this PR early next week if there are no further 
comments.







[GitHub] [beam] codecov[bot] edited a comment on pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2

2020-10-28 Thread GitBox


codecov[bot] edited a comment on pull request #12806:
URL: https://github.com/apache/beam/pull/12806#issuecomment-692876818


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=h1) Report
   > Merging 
[#12806](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=desc) into 
[master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc)
 will **decrease** coverage by `42.05%`.
   > The diff coverage is `16.00%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/beam/pull/12806/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##           master   #12806       +/-   ##
   ===========================================
   - Coverage   82.48%   40.42%   -42.06%
   ===========================================
     Files         455      449        -6
     Lines       54876    53539     -1337
   ===========================================
   - Hits        45266    21645    -23621
   - Misses       9610    31894    +22284
   ```
   
   
   | [Impacted 
Files](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree) | Coverage 
Δ | |
   |---|---|---|
   | 
[...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=)
 | `19.08% <0.00%> (-57.85%)` | :arrow_down: |
   | 
[...python/apache\_beam/runners/direct/direct\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZGlyZWN0X3J1bm5lci5weQ==)
 | `32.55% <11.11%> (-61.16%)` | :arrow_down: |
   | 
[sdks/python/apache\_beam/io/gcp/pubsub.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL3B1YnN1Yi5weQ==)
 | `40.78% <20.00%> (-51.53%)` | :arrow_down: |
   | 
[...python/apache\_beam/examples/complete/distribopt.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvY29tcGxldGUvZGlzdHJpYm9wdC5weQ==)
 | `0.00% <0.00%> (-98.59%)` | :arrow_down: |
   | 
[...dks/python/apache\_beam/transforms/create\_source.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jcmVhdGVfc291cmNlLnB5)
 | `0.00% <0.00%> (-98.19%)` | :arrow_down: |
   | 
[...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=)
 | `0.00% <0.00%> (-98.15%)` | :arrow_down: |
   | 
[...e\_beam/runners/interactive/testing/mock\_ipython.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS90ZXN0aW5nL21vY2tfaXB5dGhvbi5weQ==)
 | `7.14% <0.00%> (-92.86%)` | :arrow_down: |
   | 
[.../examples/snippets/transforms/elementwise/pardo.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9lbGVtZW50d2lzZS9wYXJkby5weQ==)
 | `11.36% <0.00%> (-88.64%)` | :arrow_down: |
   | 
[sdks/python/apache\_beam/typehints/opcodes.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL29wY29kZXMucHk=)
 | `0.00% <0.00%> (-87.92%)` | :arrow_down: |
   | 
[...s/snippets/transforms/aggregation/combineperkey.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9hZ2dyZWdhdGlvbi9jb21iaW5lcGVya2V5LnB5)
 | `11.95% <0.00%> (-86.96%)` | :arrow_down: |
   | ... and [290 
more](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree-more) | |
   
   --
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=footer). Last 
update 
[376d2a6...fe09155](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   







[GitHub] [beam] tvalentyn merged pull request #13193: [BEAM-7372][BEAM-8371][BEAM-9372] drop python 2.7 and 3.5 support from python container

2020-10-28 Thread GitBox


tvalentyn merged pull request #13193:
URL: https://github.com/apache/beam/pull/13193


   







[GitHub] [beam] tvalentyn commented on a change in pull request #13187: Updated shared.py comments

2020-10-28 Thread GitBox


tvalentyn commented on a change in pull request #13187:
URL: https://github.com/apache/beam/pull/13187#discussion_r513823318



##
File path: sdks/python/apache_beam/utils/shared.py
##
@@ -26,22 +26,31 @@
 
 To share a very large list across all threads of each worker in a DoFn::
 
-  class GetNthStringFn(beam.DoFn):
-    def __init__(self, shared_handle):
-      self._shared_handle = shared_handle
-
-    def process(self, element):
-      def initialize_list():
-        # Build the giant initial list.
-        return [str(i) for i in range(100)]
-
-      giant_list = self._shared_handle.acquire(initialize_list)
-      yield giant_list[element]
-
-  p = beam.Pipeline()
-  shared_handle = shared.Shared()
-  (p | beam.Create([2, 4, 6, 8])
-     | beam.ParDo(GetNthStringFn(shared_handle)))
+  # Several built-in types such as list and dict do not directly support weak

Review comment:
   Consider the following wording for line 29, to prepare the reader for why 
weak references are discussed:
   
   Shared is a helper class for managing a single instance of an object shared 
by multiple threads within the same process. Instances of Shared are 
serializable objects that can be shared by all threads of each worker process. 
A Shared object encapsulates a weak reference to a singleton instance of the 
shared resource. The singleton is lazily initialized by calls to 
Shared.acquire().
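
   The weak-reference caveat in the docstring under review can be demonstrated 
without Beam at all. Below is a minimal, Beam-independent sketch of why a 
built-in container has to be wrapped before a weak reference (which 
`shared.Shared` holds internally) can point at it; the `WeakRefList` name is 
purely illustrative:

```python
import weakref

# Built-in list/dict instances cannot be weakly referenced directly,
# which matters because Shared keeps only a weak reference to the
# lazily initialized singleton.
try:
    weakref.ref([1, 2, 3])
    list_supports_weakref = True
except TypeError:
    list_supports_weakref = False

# A user-defined class (including a subclass of a built-in) carries the
# machinery weak references need, so wrapping the payload fixes this.
class WeakRefList(list):
    """Illustrative wrapper; any user-defined class works."""

giant_list = WeakRefList(str(i) for i in range(100))
ref = weakref.ref(giant_list)

assert not list_supports_weakref
assert ref() is giant_list
```

   Once no strong references to `giant_list` remain, `ref()` returns `None`, 
which is exactly why `Shared.acquire()` re-runs the initializer on demand.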
   









[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

2020-10-28 Thread GitBox


boyuanzz commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-718267176


   I manually tested this transform with Dataflow runner v2 and can confirm 
that the transform works as expected. But it's not easy to write an E2E test, 
since the transform doesn't output values and it interacts with an external 
system.







[GitHub] [beam] yifanmai commented on a change in pull request #13177: Sort stages according to data edges as well as must-follows.

2020-10-28 Thread GitBox


yifanmai commented on a change in pull request #13177:
URL: https://github.com/apache/beam/pull/13177#discussion_r511169718



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##
@@ -1702,13 +1702,26 @@ def sort_stages(stages, pipeline_context):
   seen = set()  # type: Set[Stage]
   ordered = []
 
+  producers = {
+      pcoll: stage
+      for stage in all_stages for t in stage.transforms

Review comment:
   Do we need to traverse into sub-transforms to fetch their inputs, or are 
we guaranteed that all transforms are leaf transforms?
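
   The ordering the diff is building — emit a stage only after the stages that 
produce its input PCollections — can be sketched with stand-in data 
structures. The `Stage`/`Transform` shapes below are simplified stand-ins for 
Beam's, and must-follow edges and cycle handling are omitted:

```python
from collections import namedtuple

# Hypothetical, simplified stand-ins for Beam's pipeline structures.
Transform = namedtuple("Transform", ["inputs", "outputs"])
Stage = namedtuple("Stage", ["name", "transforms"])

def sort_stages(stages):
    # Map each output PCollection to the stage that produces it.
    producers = {
        pcoll: stage
        for stage in stages
        for t in stage.transforms
        for pcoll in t.outputs
    }
    ordered, seen = [], set()

    def visit(stage):
        if stage.name in seen:
            return
        seen.add(stage.name)
        # Recurse into producers first so they land earlier in `ordered`.
        for t in stage.transforms:
            for pcoll in t.inputs:
                if pcoll in producers:
                    visit(producers[pcoll])
        ordered.append(stage)

    for stage in stages:
        visit(stage)
    return ordered

a = Stage("A", [Transform([], ["p1"])])
b = Stage("B", [Transform(["p1"], ["p2"])])
c = Stage("C", [Transform(["p2"], [])])
assert [s.name for s in sort_stages([c, b, a])] == ["A", "B", "C"]
```

   The reviewer's question — whether sub-transforms must be traversed — maps 
to whether `stage.transforms` here is guaranteed to contain only leaves.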









[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

2020-10-28 Thread GitBox


boyuanzz commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-718258631


   > > Yeah I also want to have some tests around this but it may not be 
possible to involve actual Kafka consumer to do so. The way I can come up is to 
use mock but I'm not sure whether it's feasible.
   > 
   > Can we add "SDF Read"-related tests to `KafkaIOIT` in this case?
   
Sorry for the delay, I was on something urgent. KafkaIOIT will use `SDF Read` 
automatically if the runner is using `beam_fn_api` and `use_sdf_kafka_read`. 
In the near future, we will make `SDF Read` the default for Kafka.







[GitHub] [beam] pabloem commented on pull request #13201: Updating BigQuery client for Python

2020-10-28 Thread GitBox


pabloem commented on pull request #13201:
URL: https://github.com/apache/beam/pull/13201#issuecomment-718228953


   Run Python_PVR_Flink PreCommit







[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

2020-10-28 Thread GitBox


amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-718252195


   Run Java PreCommit







[GitHub] [beam] TheNeuralBit commented on pull request #13066: [BEAM-11052] Memoize to_pcollection

2020-10-28 Thread GitBox


TheNeuralBit commented on pull request #13066:
URL: https://github.com/apache/beam/pull/13066#issuecomment-718251075


   Rebased to resolve merge conflicts







[GitHub] [beam] tvalentyn commented on a change in pull request #13142: [BEAM-5939] - Deduplicate constants

2020-10-28 Thread GitBox


tvalentyn commented on a change in pull request #13142:
URL: https://github.com/apache/beam/pull/13142#discussion_r513802306



##
File path: sdks/python/apache_beam/runners/dataflow/internal/names.py
##
@@ -27,7 +27,13 @@
 # Standard file names used for staging files.
 from builtins import object
 
-DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
+# pylint: disable=unused-import
+from apache_beam.runners.internal.names import DATAFLOW_SDK_TARBALL_FILE
+from apache_beam.runners.internal.names import PICKLED_MAIN_SESSION_FILE

Review comment:
   I think we could avoid importing all of these except for PICKLED_MAIN_SESSION_FILE, and update all references in the Beam codebase to use apache_beam.runners.internal.names instead, similar to what we do in apiclient.py.
   
   ```
   # pylint: disable=unused-import
   # Used by Dataflow legacy worker. 
   from apache_beam.runners.internal.names import PICKLED_MAIN_SESSION_FILE
   ```

##
File path: sdks/python/apache_beam/runners/internal/names.py
##
@@ -20,8 +20,11 @@
 # All constants are for internal use only; no backwards-compatibility
 # guarantees.
 
+DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'

Review comment:
   Let's rename this to:

   `STAGED_SDK_SOURCES_FILENAME = 'dataflow_python_sdk.tar'  # Current value is 
hardcoded in Dataflow internal infrastructure; please don't change without a 
review from Dataflow maintainers.`
   









[GitHub] [beam] tvalentyn commented on pull request #13193: [BEAM-7372][BEAM-8371][BEAM-9372] drop python 2.7 and 3.5 support from python container

2020-10-28 Thread GitBox


tvalentyn commented on pull request #13193:
URL: https://github.com/apache/beam/pull/13193#issuecomment-718224258


   Run PythonDocker PreCommit







[GitHub] [beam] pabloem commented on pull request #13201: Updating BigQuery client for Python

2020-10-28 Thread GitBox


pabloem commented on pull request #13201:
URL: https://github.com/apache/beam/pull/13201#issuecomment-718237863


   Run Python_PVR_Flink PreCommit







[GitHub] [beam] nehsyc commented on a change in pull request #13208: [BEAM-10703] Add an option to GroupIntoBatches to output ShardedKeys. Update Dataflow pipeline translation accordingly.

2020-10-28 Thread GitBox


nehsyc commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r513774255



##
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##
@@ -103,43 +156,76 @@ public void process(ProcessContext c) {
 }
 
 @Override
-public PTransformReplacement>, PCollection>>>
+public PTransformReplacement>, 
PCollection, Iterable>>>
 getReplacementTransform(
 AppliedPTransform<
-PCollection>, PCollection>>, 
GroupIntoBatches>
+PCollection>,
+PCollection, Iterable>>,
+GroupIntoBatches.WithShardedKey>
 transform) {
   return PTransformReplacement.of(
   PTransformReplacements.getSingletonMainInput(transform),
-  new StreamingGroupIntoBatches(runner, transform.getTransform()));
+  new StreamingGroupIntoBatchesWithShardedKey<>(runner, 
transform.getTransform()));
 }
 
 @Override
 public Map, ReplacementOutput> mapOutputs(
-Map, PCollection> outputs, PCollection>> newOutput) {
+Map, PCollection> outputs,
+PCollection, Iterable>> newOutput) {
   return ReplacementOutputs.singleton(outputs, newOutput);
 }
   }
 
   /**
-   * Specialized implementation of {@link GroupIntoBatches} for unbounded 
Dataflow pipelines. The
-   * override does the same thing as the original transform but additionally 
record the input to add
-   * corresponding properties during the graph translation.
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for 
unbounded Dataflow
+   * pipelines. The override does the same thing as the original transform but 
additionally records
+   * the input of {@code GroupIntoBatchesDoFn} in order to append relevant 
step properties during
+   * the graph translation.
*/
-  static class StreamingGroupIntoBatches
-  extends PTransform>, PCollection>>> {
+  static class StreamingGroupIntoBatchesWithShardedKey
+  extends PTransform>, PCollection, 
Iterable>>> {
 
 private final transient DataflowRunner runner;
-private final GroupIntoBatches original;
+private final GroupIntoBatches.WithShardedKey original;
 
-public StreamingGroupIntoBatches(DataflowRunner runner, 
GroupIntoBatches original) {
+public StreamingGroupIntoBatchesWithShardedKey(
+DataflowRunner runner, GroupIntoBatches.WithShardedKey original) 
{
   this.runner = runner;
   this.original = original;
 }
 
 @Override
-public PCollection>> expand(PCollection> input) 
{
-  runner.maybeRecordPCollectionWithAutoSharding(input);
-  return input.apply(original);
+public PCollection, Iterable>> 
expand(PCollection> input) {
+  PCollection, V>> intermediate_input = ShardKeys(input);
+
+  runner.maybeRecordPCollectionWithAutoSharding(intermediate_input);
+
+  if (original.getMaxBufferingDuration() != null) {

Review comment:
   We need to recognize the GroupIntoBatchesDoFn, which is a private member of the transform. Here we record the input PCollection of GroupIntoBatchesDoFn, which would be the output of the key-sharding DoFn, so that during translation we can append autosharding properties to the steps whose input was recorded. This indeed doesn't scale. Any suggestions?









[GitHub] [beam] rohdesamuel opened a new pull request #13214: Adds the ToString well-known transform URN

2020-10-28 Thread GitBox


rohdesamuel opened a new pull request #13214:
URL: https://github.com/apache/beam/pull/13214


   This adds the ToString URN to the list of well-known transforms. The ToString transform allows runners to translate a given element into a human-readable string. This is useful for debugging, for building tools, or for runner-side features that generalize across SDKs and require element introspection.
   
   Design-doc: 
https://docs.google.com/document/d/1v7iWj0LIum04mYwRM_Cvze915tATwmEzLrqj_uVBkCE/edit#
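   Conceptually, the semantics the ToString URN names can be sketched without any Beam machinery (this is an illustration of the behavior, not the Beam API itself): each element is mapped to its human-readable string representation.

   ```python
   # Framework-free sketch of a ToString-style transform: every element,
   # whatever its type, becomes a human-readable string.
   def to_string_transform(elements):
       return [str(e) for e in elements]

   readable = to_string_transform([1, (2, 'a'), {'k': 3}])
   ```

   A runner implementing the URN natively would apply the same per-element conversion without round-tripping through an SDK harness.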
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
[GitHub] [beam] tvalentyn commented on pull request #13193: [BEAM-7372][BEAM-8371][BEAM-9372] drop python 2.7 and 3.5 support from python container

2020-10-28 Thread GitBox


tvalentyn commented on pull request #13193:
URL: https://github.com/apache/beam/pull/13193#issuecomment-718220319


   LGTM, thank you!







[GitHub] [beam] ibzib commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

2020-10-28 Thread GitBox


ibzib commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-718234442


   @amaliujia Rules are fixed, PTAL







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode

2020-10-28 Thread GitBox


TheNeuralBit commented on a change in pull request #13139:
URL: https://github.com/apache/beam/pull/13139#discussion_r513799533



##
File path: sdks/python/apache_beam/dataframe/frames.py
##
@@ -619,21 +619,75 @@ def assign(self, **kwargs):
 "instances are supported.")
 return frame_base._elementwise_method('assign')(self, **kwargs)
 
-
   apply = frame_base.not_implemented_method('apply')
-  explode = frame_base.not_implemented_method('explode')
   isin = frame_base.not_implemented_method('isin')
   append = frame_base.not_implemented_method('append')
   combine = frame_base.not_implemented_method('combine')
   combine_first = frame_base.not_implemented_method('combine_first')
   count = frame_base.not_implemented_method('count')
-  drop = frame_base.not_implemented_method('drop')
   eval = frame_base.not_implemented_method('eval')
   reindex = frame_base.not_implemented_method('reindex')
   melt = frame_base.not_implemented_method('melt')
   pivot = frame_base.not_implemented_method('pivot')
   pivot_table = frame_base.not_implemented_method('pivot_table')
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def explode(self, column, ignore_index):
+    # ignoring the index will not preserve it
+    preserves = (partitionings.Nothing() if ignore_index
+                 else partitionings.Singleton())
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'explode',
+            lambda df: df.explode(column, ignore_index),
+            [self._expr],
+            preserves_partition_by=preserves,
+            requires_partition_by=partitionings.Nothing()))
+
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  @frame_base.maybe_inplace
+  def drop(self, **kwargs):
+    labels = kwargs.get('labels', None)

Review comment:
   Ah whoops. I switched it over to use args directly
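   For context on the `explode` change discussed above, the pandas behavior it defers to is straightforward: with the default `ignore_index=False` the original index is preserved (duplicated per exploded element), while `ignore_index=True` renumbers it, which is why index-based partitioning cannot be preserved in that case.

   ```python
   import pandas as pd

   # One row per list element; the index treatment depends on ignore_index.
   df = pd.DataFrame({'A': [[1, 2], [3]], 'B': ['x', 'y']})

   kept = df.explode('A')                           # index 0, 0, 1
   renumbered = df.explode('A', ignore_index=True)  # index 0, 1, 2
   ```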









[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

2020-10-28 Thread GitBox


amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-718242175


   Overall LGTM. @kennknowles do you have other comments besides the interface discussion?







[GitHub] [beam] codecov[bot] edited a comment on pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2

2020-10-28 Thread GitBox


codecov[bot] edited a comment on pull request #12806:
URL: https://github.com/apache/beam/pull/12806#issuecomment-692876818


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=h1) Report
   > Merging 
[#12806](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=desc) into 
[master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc)
 will **decrease** coverage by `42.05%`.
   > The diff coverage is `16.00%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/beam/pull/12806/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##           master   #12806        +/-   ##
   ============================================
   - Coverage   82.48%   40.42%    -42.06%
   ============================================
     Files         455      449         -6
     Lines       54876    53539      -1337
   ============================================
   - Hits        45266    21645     -23621
   - Misses       9610    31894     +22284
   ```
   
   
   | [Impacted 
Files](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree) | Coverage 
Δ | |
   |---|---|---|
   | 
[...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=)
 | `19.08% <0.00%> (-57.85%)` | :arrow_down: |
   | 
[...python/apache\_beam/runners/direct/direct\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZGlyZWN0X3J1bm5lci5weQ==)
 | `32.55% <11.11%> (-61.16%)` | :arrow_down: |
   | 
[sdks/python/apache\_beam/io/gcp/pubsub.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL3B1YnN1Yi5weQ==)
 | `40.78% <20.00%> (-51.53%)` | :arrow_down: |
   | 
[...python/apache\_beam/examples/complete/distribopt.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvY29tcGxldGUvZGlzdHJpYm9wdC5weQ==)
 | `0.00% <0.00%> (-98.59%)` | :arrow_down: |
   | 
[...dks/python/apache\_beam/transforms/create\_source.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jcmVhdGVfc291cmNlLnB5)
 | `0.00% <0.00%> (-98.19%)` | :arrow_down: |
   | 
[...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=)
 | `0.00% <0.00%> (-98.15%)` | :arrow_down: |
   | 
[...e\_beam/runners/interactive/testing/mock\_ipython.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS90ZXN0aW5nL21vY2tfaXB5dGhvbi5weQ==)
 | `7.14% <0.00%> (-92.86%)` | :arrow_down: |
   | 
[.../examples/snippets/transforms/elementwise/pardo.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9lbGVtZW50d2lzZS9wYXJkby5weQ==)
 | `11.36% <0.00%> (-88.64%)` | :arrow_down: |
   | 
[sdks/python/apache\_beam/typehints/opcodes.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL29wY29kZXMucHk=)
 | `0.00% <0.00%> (-87.92%)` | :arrow_down: |
   | 
[...s/snippets/transforms/aggregation/combineperkey.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9hZ2dyZWdhdGlvbi9jb21iaW5lcGVya2V5LnB5)
 | `11.95% <0.00%> (-86.96%)` | :arrow_down: |
   | ... and [290 
more](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree-more) | |
   
   --
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute  (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=footer). Last 
update 
[376d2a6...e4095ad](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   







[GitHub] [beam] nehsyc commented on a change in pull request #13208: [BEAM-10703] Add an option to GroupIntoBatches to output ShardedKeys. Update Dataflow pipeline translation accordingly.

2020-10-28 Thread GitBox


nehsyc commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r513784255



##
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##
@@ -92,9 +96,58 @@ public void process(ProcessContext c) {
 }
   }
 
+  static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory

Review comment:
   Two things that made me add the override for `withShardedKeys`
   - We insert an explicit GBK for batch stateful dofns (although I feel this 
should be done in the backend).
   
   
https://github.com/apache/beam/blob/562041956efeae2a186b1d815ea6bcd7d54682ae/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L228
   
   - Currently the batch GIB doesn't use keyed states but instead partitions 
the iterables after GBK:
   
   
https://github.com/apache/beam/blob/562041956efeae2a186b1d815ea6bcd7d54682ae/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java#L84









[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

2020-10-28 Thread GitBox


amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513761419



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should 
be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
   Per our offline chat, we can continue discussing what a good interface 
should be and this PR should not be blocked. We will mark this interface as 
experimental and refine when necessary.









[GitHub] [beam] boyuanzz merged pull request #13144: [BEAM-10475] Add max buffering duration option for GroupIntoBatches transform in Python

2020-10-28 Thread GitBox


boyuanzz merged pull request #13144:
URL: https://github.com/apache/beam/pull/13144


   







[GitHub] [beam] robertwb commented on pull request #13141: [BEAM-9547] Dataframe corrwith.

2020-10-28 Thread GitBox


robertwb commented on pull request #13141:
URL: https://github.com/apache/beam/pull/13141#issuecomment-718200546


   Run PythonLint PreCommit







[GitHub] [beam] rohdesamuel commented on pull request #13210: [BEAM-10994] Update hot key detection log message

2020-10-28 Thread GitBox


rohdesamuel commented on pull request #13210:
URL: https://github.com/apache/beam/pull/13210#issuecomment-718194658


   R: @pabloem 







[GitHub] [beam] udim merged pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.

2020-10-28 Thread GitBox


udim merged pull request #13213:
URL: https://github.com/apache/beam/pull/13213


   







[GitHub] [beam] rohdesamuel commented on a change in pull request #13210: [BEAM-10994] Update hot key detection log message

2020-10-28 Thread GitBox


rohdesamuel commented on a change in pull request #13210:
URL: https://github.com/apache/beam/pull/13210#discussion_r513741622



##
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java
##
@@ -52,7 +52,9 @@ public void logHotKeyDetection(String userStepName, Duration 
hotKeyAge) {
 LOG.warn(
 "A hot key was detected in step '{}' with age of '{}'. This is "
 + "a symptom of key distribution being skewed. To fix, please 
inspect your data and "
-+ "pipeline to ensure that elements are evenly distributed across 
your key space.",
++ "pipeline to ensure that elements are evenly distributed across 
your key space. If "
++ "you want to log the plain-text key to Cloud Logging please 
re-run with the "
++ "`hotKeyLoggingEnabled` pipeline option.",

Review comment:
   Sure, I added a link to the Cloud docs about specifying pipeline 
options. This generalizes across different languages and is a good how-to guide.









[GitHub] [beam] udim commented on pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.

2020-10-28 Thread GitBox


udim commented on pull request #13213:
URL: https://github.com/apache/beam/pull/13213#issuecomment-718184130


   I believe this is a job setting, so the seed job needs to run to update it







[GitHub] [beam] udim commented on pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.

2020-10-28 Thread GitBox


udim commented on pull request #13213:
URL: https://github.com/apache/beam/pull/13213#issuecomment-718183902


   run seed job







[GitHub] [beam] reuvenlax commented on pull request #12915: [BEAM-7386] Introduce temporal inner join.

2020-10-28 Thread GitBox


reuvenlax commented on pull request #12915:
URL: https://github.com/apache/beam/pull/12915#issuecomment-718179094


   Sorry for the delay.
   
   AFAIK both this and the schema library are limited today to equijoins. The schema API is designed so that we can extend it later with non-equijoins; however, doing arbitrary join conditions in a distributed manner can be a hard problem.
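   A minimal, framework-free sketch of the distinction (not Beam's schema Join API): an equijoin can hash one side by its join key and stream the other side against the index, while an arbitrary ("theta") join condition has no key to hash on and in general degenerates to comparing all pairs, which is what makes it hard to distribute efficiently.

   ```python
   from collections import defaultdict

   def equijoin(left, right, key):
       # Build a hash index on the join key, then probe it: O(|L| + |R| + matches).
       index = defaultdict(list)
       for r in right:
           index[r[key]].append(r)
       return [(l, r) for l in left for r in index[l[key]]]

   def theta_join(left, right, predicate):
       # Arbitrary condition: nothing to hash on, so compare every pair.
       return [(l, r) for l in left for r in right if predicate(l, r)]

   users = [{'id': 1}, {'id': 2}]
   orders = [{'id': 1, 'total': 10}, {'id': 1, 'total': 5}]

   eq = equijoin(users, orders, 'id')
   theta = theta_join(users, orders, lambda l, r: l['id'] < r['total'])
   ```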







[GitHub] [beam] tysonjh commented on pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.

2020-10-28 Thread GitBox


tysonjh commented on pull request #13213:
URL: https://github.com/apache/beam/pull/13213#issuecomment-718174027


   R: @udim 
   
   The change didn't apply to the precommit run; it still shows the "No JDK named 'JDK 1.8 (latest)' found" error. Maybe it needs to be committed to take effect?







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode

2020-10-28 Thread GitBox


TheNeuralBit commented on a change in pull request #13139:
URL: https://github.com/apache/beam/pull/13139#discussion_r513711027



##
File path: sdks/python/apache_beam/dataframe/frames.py
##
@@ -1513,6 +1584,8 @@ def repeat(self, repeats):
   'repeat',
   lambda series: series.str.repeat(repeats),
   [self._expr],
+  # Output will also be a str Series

Review comment:
   Yeah when I added the verification of the proxy in `TransformTest` I 
found that this proxy was incorrectly inferred as `bool` for the zipping case 
tested there.
   
   ```py
   In [10]: proxy.dtypes
   Out[10]: 
   str        object
   repeats     int64
   dtype: object
   
   In [11]: proxy.str.str.repeat(proxy.repeats)
   Out[11]: Series([], Name: str, dtype: bool)
   ```
   
   The actual operation does produce `object` though:
   ```py
   In [13]: df.str.str.repeat(df.repeats)
   Out[13]: 
   0    AAA
   1      B
   2
   3      D
   4     EE
   Name: str, dtype: object
   ```
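   For reference, the elementwise "zipping" case under discussion is a documented pandas behavior: `str.repeat` accepts a sequence of per-element repeat counts, and on real data the result dtype is `object` (the `bool` dtype was only an inference artifact on an empty proxy Series).

   ```python
   import pandas as pd

   # Each string is repeated by the count at the same position.
   s = pd.Series(['A', 'B', 'C'], name='str')
   repeats = pd.Series([3, 1, 0])
   out = s.str.repeat(repeats)
   ```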
   









[GitHub] [beam] TheNeuralBit commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode

2020-10-28 Thread GitBox


TheNeuralBit commented on a change in pull request #13139:
URL: https://github.com/apache/beam/pull/13139#discussion_r513711378



##
File path: sdks/python/apache_beam/dataframe/frames.py
##
@@ -1513,6 +1584,8 @@ def repeat(self, repeats):
   'repeat',
   lambda series: series.str.repeat(repeats),
   [self._expr],
+  # Output will also be a str Series

Review comment:
   Maybe something we should fix upstream and patch around here in the 
meantime?









[GitHub] [beam] TheNeuralBit commented on a change in pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider

2020-10-28 Thread GitBox


TheNeuralBit commented on a change in pull request #12780:
URL: https://github.com/apache/beam/pull/12780#discussion_r513699837



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##
@@ -57,14 +73,43 @@ public static RowToPubsubMessage 
withTimestampAttribute(boolean useTimestampAttr
 ? input.apply(WithTimestamps.of((row) -> 
row.getDateTime(TIMESTAMP_FIELD).toInstant()))
 : input;
 
-return withTimestamp
-.apply(DropFields.fields(TIMESTAMP_FIELD))
-.apply(ToJson.of())
-.apply(
-MapElements.into(TypeDescriptor.of(PubsubMessage.class))
-.via(
-(String json) ->
-new PubsubMessage(
-json.getBytes(StandardCharsets.ISO_8859_1), 
ImmutableMap.of(;
+withTimestamp = withTimestamp.apply(DropFields.fields(TIMESTAMP_FIELD));
+switch (payloadFormat) {
+  case JSON:
+return withTimestamp
+.apply("MapRowToJsonString", ToJson.of())
+.apply("MapToJsonBytes", MapElements.via(new StringToBytes()))
+.apply("MapToPubsubMessage", MapElements.via(new 
ToPubsubMessage()));
+  case AVRO:
+return withTimestamp
+.apply(
+"MapRowToAvroBytes",
+
MapElements.via(AvroUtils.getRowToAvroBytesFunction(payloadSchema)))
+.apply("MapToPubsubMessage", MapElements.via(new 
ToPubsubMessage()));
+  default:
+throw new IllegalArgumentException("Unsupported payload format: " + 
payloadFormat);
+}
+  }
+
+  private static class StringToBytes extends SimpleFunction<String, byte[]> {
+@Override
+public byte[] apply(String s) {
+  return s.getBytes(ISO_8859_1);

Review comment:
   I'm not sure why I opted to get ISO_8859_1 encoded bytes here... Could 
you change this to use UTF_8?
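
   The choice matters once the JSON payload contains non-ASCII characters, 
since the two encodings then produce different raw bytes. A quick 
illustration (in Python for brevity):

   ```python
   s = 'pubsub-é'

   # 'é' is a single byte in ISO-8859-1 but two bytes in UTF-8, so the
   # serialized payload differs between the two encodings.
   utf8 = s.encode('utf-8')
   latin1 = s.encode('iso-8859-1')

   assert len(latin1) == len(s)      # one byte per character
   assert len(utf8) == len(s) + 1    # 'é' takes two bytes
   assert utf8 != latin1
   ```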

##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow.java
##
@@ -175,11 +213,21 @@ public void processElement(
 
 private final boolean useDlq;
 
+private final PayloadFormat payloadFormat;
+
 private transient volatile @Nullable ObjectMapper objectMapper;
 
-protected NestedSchemaPubsubMessageToRow(Schema messageSchema, boolean 
useDlq) {
+private final SimpleFunction<byte[], Row> avroBytesToRowFn;
+
+private final Schema payloadSchema;
+
+protected NestedSchemaPubsubMessageToRow(
+Schema messageSchema, boolean useDlq, PayloadFormat payloadFormat) {
   this.messageSchema = messageSchema;
   this.useDlq = useDlq;
+  this.payloadFormat = payloadFormat;
+  this.payloadSchema = 
messageSchema.getField(PAYLOAD_FIELD).getType().getRowSchema();
+  this.avroBytesToRowFn = 
AvroUtils.getAvroBytesToRowFunction(payloadSchema);

Review comment:
   Instead of eagerly generating the avroBytesToRowFn (even if we won't 
need it) and then branching on the payloadFormat for every element in 
parsePayload, we should instead generate a parsePayload function when it's 
needed. This could work similarly to what the current version does with 
`objectMapper`, except we would create and store a `Function<byte[], Row>`. In the JSON case we'd generate the ObjectMapper and wrap that in a 
function, and in the Avro case we'd call getAvroBytesToRowFunction and wrap it.

##
File path: 
website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
##
@@ -251,6 +253,8 @@ TBLPROPERTIES '{"timestampAttributeKey": "key", 
"deadLetterQueue": "projects/[PR
 *   `deadLetterQueue`: The topic into which messages are written if the
 payload was not parsed. If not specified, an exception is thrown for
 parsing failures.
+*   `format`: Optional. Allows you to specify the Pubsub payload format.
+Possible values are {`json`, `avro`}. Defaults to `json`.

Review comment:
   :+1: thanks

##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubAvroIT.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+

[GitHub] [beam] youngoli commented on a change in pull request #13188: [BEAM-11108] Add a version of TextIO implemented via SDF.

2020-10-28 Thread GitBox


youngoli commented on a change in pull request #13188:
URL: https://github.com/apache/beam/pull/13188#discussion_r513702465



##
File path: sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
##
@@ -63,10 +63,87 @@ func TestRestriction_EvenSplits(t *testing.T) {
t.Errorf("split restriction [%v, %v] 
has unexpected size. got: %v, want: %v or %v",
split.Start, split.End, size, 
min, min+1)
}
-   // Check: All elements are still in a split 
restrictions. This
-   // logic assumes that the splits are returned 
in order which
-   // isn't guaranteed by EvenSplits, but this 
check is way easier
-   // with the assumption.
+   // Check: All elements are still in a split 
restriction and
+   // the restrictions are in the appropriate 
ascending order.
+   if split.Start != prevEnd {
+   t.Errorf("restriction range [%v, %v] 
missing after splits.",
+   prevEnd, split.Start)
+   } else {
+   prevEnd = split.End
+   }
+   }
+   if prevEnd != r.End {
+   t.Errorf("restriction range [%v, %v] missing 
after splits.",
+   prevEnd, r.End)
+   }
+   })
+   }
+}
+
+// TestRestriction_SizedSplits tests various splits and checks that they all
+// follow the contract for SizedSplits. This means that all restrictions match
+// the given size unless it is a remainder, and that each element is present
+// in the split restrictions.
+func TestRestriction_SizedSplits(t *testing.T) {
+   tests := []struct {
+   rest Restriction
+   size int64
+   want []Restriction
+   }{
+   {
+   rest: Restriction{Start: 0, End: 11},
+   size: 5,
+   want: []Restriction{{0, 5}, {5, 10}, {10, 11}},
+   },
+   {

Review comment:
   Yeah, that seems like a good idea. I'll add one more case before merging.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

2020-10-28 Thread GitBox


robertwb commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r513689796



##
File path: sdks/python/apache_beam/transforms/stats.py
##
@@ -368,82 +383,129 @@ class PerKey(PTransform):
   weighted: (optional) if set to True, the transform returns weighted
 quantiles. The input PCollection is then expected to contain tuples of
 input values with the corresponding weight.
+  batch_input: (optional) if set to True, the transform expects each 
element
+of input PCollection to be a batch. Provides a way to accumulate
+multiple elements at a time more efficiently.
 """
-def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+def __init__(
+self,
+num_quantiles,
+key=None,
+reverse=False,
+weighted=False,
+batch_input=False):
   self._num_quantiles = num_quantiles
   self._key = key
   self._reverse = reverse
   self._weighted = weighted
+  self._batch_input = batch_input
 
 def expand(self, pcoll):
   return pcoll | CombinePerKey(
   ApproximateQuantilesCombineFn.create(
   num_quantiles=self._num_quantiles,
   key=self._key,
   reverse=self._reverse,
-  weighted=self._weighted))
+  weighted=self._weighted,
+  batch_input=self._batch_input))
 
 def display_data(self):
   return ApproximateQuantiles._display_data(
   num_quantiles=self._num_quantiles,
   key=self._key,
   reverse=self._reverse,
-  weighted=self._weighted)
+  weighted=self._weighted,
+  batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+# type: (int, int, bool, Any, bool) -> None
+self.buffer_size = buffer_size
+self.num_buffers = num_buffers
+self.weighted = weighted
+self.key = key
+self.reverse = reverse
+
+# Used to sort tuples of values and weights.
+self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+# Used to compare values.
+if key is None and not reverse:

Review comment:
   Nit: it'd be easier to read `if reverse and key is None` rather than 
having the extra negation in there. 

##
File path: sdks/python/apache_beam/transforms/stats.py
##
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+# mmh3.hash64 returns two 64-bit unsigned integers
+return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-def _mmh3_hash(value):
-  # mmh3.hash64 returns two 64-bit unsigned integers
-  return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+# md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+# hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+# integer fingerprint.
+return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
 logging.warning(
 'Couldn\'t find murmurhash. Install mmh3 for a faster implementation 
of'

Review comment:
   Are there downsides to just making this a dependency? 
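
   For reference, the md5 fallback in the diff above derives a 64-bit 
fingerprint by truncating the 128-bit digest to its first 16 hexadecimal 
digits:

   ```python
   import hashlib

   def md5_fingerprint_64(value):
       # md5 is 128 bits; the first 16 hex digits give a deterministic
       # 64-bit integer fingerprint, matching the fallback above.
       return int(hashlib.md5(value).hexdigest()[:16], 16)

   fp = md5_fingerprint_64(b'apache beam')
   assert 0 <= fp < 2**64
   assert fp == md5_fingerprint_64(b'apache beam')  # deterministic
   ```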

##
File path: sdks/python/apache_beam/transforms/stats.py
##
@@ -368,82 +383,129 @@ class PerKey(PTransform):
   weighted: (optional) if set to True, the transform returns weighted
 quantiles. The input PCollection is then expected to contain tuples of
 input values with the corresponding weight.
+  batch_input: (optional) if set to True, the transform expects each 
element
+of input PCollection to be a batch. Provides a way to accumulate
+multiple elements at a time more efficiently.
 """
-def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+def __init__(
+self,
+num_quantiles,
+key=None,
+reverse=False,
+weighted=False,
+batch_input=False):
   self._num_quantiles = num_quantiles
   self._key = key
   self._reverse = reverse
   self._weighted = weighted
+  self._batch_input = batch_input
 
 def expand(self, pcoll):
   return pcoll | CombinePerKey(
   ApproximateQuantilesCombineFn.create(
   num_quantiles=self._num_quantiles,
   

[GitHub] [beam] robertwb commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode

2020-10-28 Thread GitBox


robertwb commented on a change in pull request #13139:
URL: https://github.com/apache/beam/pull/13139#discussion_r513682982



##
File path: sdks/python/apache_beam/dataframe/frames.py
##
@@ -619,21 +619,75 @@ def assign(self, **kwargs):
 "instances are supported.")
 return frame_base._elementwise_method('assign')(self, **kwargs)
 
-
   apply = frame_base.not_implemented_method('apply')
-  explode = frame_base.not_implemented_method('explode')
   isin = frame_base.not_implemented_method('isin')
   append = frame_base.not_implemented_method('append')
   combine = frame_base.not_implemented_method('combine')
   combine_first = frame_base.not_implemented_method('combine_first')
   count = frame_base.not_implemented_method('count')
-  drop = frame_base.not_implemented_method('drop')
   eval = frame_base.not_implemented_method('eval')
   reindex = frame_base.not_implemented_method('reindex')
   melt = frame_base.not_implemented_method('melt')
   pivot = frame_base.not_implemented_method('pivot')
   pivot_table = frame_base.not_implemented_method('pivot_table')
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def explode(self, column, ignore_index):
+# ignoring the index will not preserve it
+preserves = (partitionings.Nothing() if ignore_index
+ else partitionings.Singleton())
+return frame_base.DeferredFrame.wrap(
+expressions.ComputedExpression(
+'explode',
+lambda df: df.explode(column, ignore_index),
+[self._expr],
+preserves_partition_by=preserves,
+requires_partition_by=partitionings.Nothing()))
+
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  @frame_base.maybe_inplace
+  def drop(self, **kwargs):
+labels = kwargs.get('labels', None)

Review comment:
   One danger here of using kwargs.get rather than letting it be a 
parameter is that you're hard-coding what all the defaults are (rather than 
using populate_defaults). 
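
   The hazard can be seen in a minimal sketch (hypothetical wrapper, not the 
Beam code): a default hard-coded at the `kwargs.get` call can silently 
disagree with the wrapped function's real default.

   ```python
   import pandas as pd

   def drop_sketch(df, **kwargs):
       # Hard-coded default: pandas' DataFrame.drop actually defaults to
       # errors='raise', so this wrapper silently diverges from pandas.
       errors = kwargs.get('errors', 'ignore')
       return df.drop(columns=kwargs.get('columns'), errors=errors)

   df = pd.DataFrame({'a': [1]})

   # pandas' own default raises on a missing column...
   try:
       df.drop(columns=['missing'])
       raised = False
   except KeyError:
       raised = True
   assert raised

   # ...but the wrapper's hard-coded 'ignore' passes silently.
   result = drop_sketch(df, columns=['missing'])
   assert list(result.columns) == ['a']
   ```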

##
File path: sdks/python/apache_beam/dataframe/frames.py
##
@@ -1513,6 +1584,8 @@ def repeat(self, repeats):
   'repeat',
   lambda series: series.str.repeat(repeats),
   [self._expr],
+  # Output will also be a str Series

Review comment:
   Is there a drawback to this being automatically inferred? (Or was it 
not?)
   
   Same below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

2020-10-28 Thread GitBox


boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-718139268


   Run Java Flink PortableValidatesRunner Streaming



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #12960: [BEAM-9804] Allow user configuration of BigQuery temporary dataset

2020-10-28 Thread GitBox


pabloem commented on pull request #12960:
URL: https://github.com/apache/beam/pull/12960#issuecomment-718135801


   Run Python_PVR_Flink PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on a change in pull request #13208: [BEAM-10703] Add an option to GroupIntoBatches to output ShardedKeys. Update Dataflow pipeline translation accordingly.

2020-10-28 Thread GitBox


robertwb commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r513671483



##
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##
@@ -103,43 +156,76 @@ public void process(ProcessContext c) {
 }
 
 @Override
-public PTransformReplacement>, PCollection>>>
+public PTransformReplacement>, 
PCollection, Iterable>>>
 getReplacementTransform(
 AppliedPTransform<
-PCollection>, PCollection>>, 
GroupIntoBatches>
+PCollection>,
+PCollection, Iterable>>,
+GroupIntoBatches.WithShardedKey>
 transform) {
   return PTransformReplacement.of(
   PTransformReplacements.getSingletonMainInput(transform),
-  new StreamingGroupIntoBatches(runner, transform.getTransform()));
+  new StreamingGroupIntoBatchesWithShardedKey<>(runner, 
transform.getTransform()));
 }
 
 @Override
 public Map, ReplacementOutput> mapOutputs(
-Map, PCollection> outputs, PCollection>> newOutput) {
+Map, PCollection> outputs,
+PCollection, Iterable>> newOutput) {
   return ReplacementOutputs.singleton(outputs, newOutput);
 }
   }
 
   /**
-   * Specialized implementation of {@link GroupIntoBatches} for unbounded 
Dataflow pipelines. The
-   * override does the same thing as the original transform but additionally 
record the input to add
-   * corresponding properties during the graph translation.
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for 
unbounded Dataflow
+   * pipelines. The override does the same thing as the original transform but 
additionally records
+   * the input of {@code GroupIntoBatchesDoFn} in order to append relevant 
step properties during
+   * the graph translation.
*/
-  static class StreamingGroupIntoBatches
-  extends PTransform>, PCollection>>> {
+  static class StreamingGroupIntoBatchesWithShardedKey
+  extends PTransform>, PCollection, 
Iterable>>> {
 
 private final transient DataflowRunner runner;
-private final GroupIntoBatches original;
+private final GroupIntoBatches.WithShardedKey original;
 
-public StreamingGroupIntoBatches(DataflowRunner runner, 
GroupIntoBatches original) {
+public StreamingGroupIntoBatchesWithShardedKey(
+DataflowRunner runner, GroupIntoBatches.WithShardedKey original) 
{
   this.runner = runner;
   this.original = original;
 }
 
 @Override
-public PCollection>> expand(PCollection> input) 
{
-  runner.maybeRecordPCollectionWithAutoSharding(input);
-  return input.apply(original);
+public PCollection, Iterable>> 
expand(PCollection> input) {
+  PCollection, V>> intermediate_input = ShardKeys(input);
+
+  runner.maybeRecordPCollectionWithAutoSharding(intermediate_input);
+
+  if (original.getMaxBufferingDuration() != null) {

Review comment:
   This doesn't look like it'll scale if more options are used. Why not 
just apply original? 

##
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##
@@ -103,43 +156,76 @@ public void process(ProcessContext c) {
 }
 
 @Override
-public PTransformReplacement>, PCollection>>>
+public PTransformReplacement>, 
PCollection, Iterable>>>
 getReplacementTransform(
 AppliedPTransform<
-PCollection>, PCollection>>, 
GroupIntoBatches>
+PCollection>,
+PCollection, Iterable>>,
+GroupIntoBatches.WithShardedKey>
 transform) {
   return PTransformReplacement.of(
   PTransformReplacements.getSingletonMainInput(transform),
-  new StreamingGroupIntoBatches(runner, transform.getTransform()));
+  new StreamingGroupIntoBatchesWithShardedKey<>(runner, 
transform.getTransform()));
 }
 
 @Override
 public Map, ReplacementOutput> mapOutputs(
-Map, PCollection> outputs, PCollection>> newOutput) {
+Map, PCollection> outputs,
+PCollection, Iterable>> newOutput) {
   return ReplacementOutputs.singleton(outputs, newOutput);
 }
   }
 
   /**
-   * Specialized implementation of {@link GroupIntoBatches} for unbounded 
Dataflow pipelines. The
-   * override does the same thing as the original transform but additionally 
record the input to add
-   * corresponding properties during the graph translation.
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for 
unbounded Dataflow
+   * pipelines. The override does the same thing as the original transform but 
additionally records
+   * the input of {@code GroupIntoBatchesDoFn} in order to append relevant 
step properties during
+   * the graph translation.
*/
-  static 

[GitHub] [beam] tysonjh commented on pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.

2020-10-28 Thread GitBox


tysonjh commented on pull request #13213:
URL: https://github.com/apache/beam/pull/13213#issuecomment-718134478


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #13188: [BEAM-11108] Add a version of TextIO implemented via SDF.

2020-10-28 Thread GitBox


lostluck commented on a change in pull request #13188:
URL: https://github.com/apache/beam/pull/13188#discussion_r513678887



##
File path: sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
##
@@ -63,10 +63,87 @@ func TestRestriction_EvenSplits(t *testing.T) {
t.Errorf("split restriction [%v, %v] 
has unexpected size. got: %v, want: %v or %v",
split.Start, split.End, size, 
min, min+1)
}
-   // Check: All elements are still in a split 
restrictions. This
-   // logic assumes that the splits are returned 
in order which
-   // isn't guaranteed by EvenSplits, but this 
check is way easier
-   // with the assumption.
+   // Check: All elements are still in a split 
restriction and
+   // the restrictions are in the appropriate 
ascending order.
+   if split.Start != prevEnd {
+   t.Errorf("restriction range [%v, %v] 
missing after splits.",
+   prevEnd, split.Start)
+   } else {
+   prevEnd = split.End
+   }
+   }
+   if prevEnd != r.End {
+   t.Errorf("restriction range [%v, %v] missing 
after splits.",
+   prevEnd, r.End)
+   }
+   })
+   }
+}
+
+// TestRestriction_SizedSplits tests various splits and checks that they all
+// follow the contract for SizedSplits. This means that all restrictions match
+// the given size unless it is a remainder, and that each element is present
+// in the split restrictions.
+func TestRestriction_SizedSplits(t *testing.T) {
+   tests := []struct {
+   rest Restriction
+   size int64
+   want []Restriction
+   }{
+   {
+   rest: Restriction{Start: 0, End: 11},
+   size: 5,
+   want: []Restriction{{0, 5}, {5, 10}, {10, 11}},
+   },
+   {

Review comment:
   Consider adding an exact-division case too, e.g.
   Start 7, End 17
   size 5
   {7, 12}, {12, 17}
   if I understand the implementation correctly.
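
   The splitting arithmetic being discussed can be checked with a small 
sketch (a hypothetical Python mirror of the Go logic, not the Beam code):

   ```python
   def sized_splits(start, end, size):
       # Split [start, end) into chunks of `size`; the last chunk may be
       # a smaller remainder.
       splits = []
       while start + size < end:
           splits.append((start, start + size))
           start += size
       splits.append((start, end))
       return splits

   # Remainder case from the test table above.
   assert sized_splits(0, 11, 5) == [(0, 5), (5, 10), (10, 11)]
   # Exact-division case suggested in the review.
   assert sized_splits(7, 17, 5) == [(7, 12), (12, 17)]
   ```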





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally

2020-10-28 Thread GitBox


kennknowles commented on pull request #13134:
URL: https://github.com/apache/beam/pull/13134#issuecomment-718132606


   Run Portable_Python PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh opened a new pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.

2020-10-28 Thread GitBox


tysonjh opened a new pull request #13213:
URL: https://github.com/apache/beam/pull/13213


   See:
https://issues.apache.org/jira/browse/INFRA-20858

https://lists.apache.org/thread.html/rb4c2834b9874b9f4a74c528de9055958483d2bc6e62c3464bc5c053f%40%3Cbuilds.apache.org%3E
   
   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

2020-10-28 Thread GitBox


pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-718126946


   Run Python 3.8 PostCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

2020-10-28 Thread GitBox


pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-718126843


   > I'm curious do we have a plan to build actual SDF for BQ instead of still 
relying on BoundedSource implementation?
   
   In this case, we will have a simple DoFn that _starts_ the read from BQ, but 
it eventually returns multiple Avro file sources that can be read individually. 
This is different from what we had before, where all of the BQ reading logic 
was part of a BoundedSource. In fact, the _CustomBigQuerySource will be removed 
eventually.
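
   A schematic of the two-stage read described above (names and paths are 
illustrative, not the Beam API): a first step starts the export and emits 
per-file sources, and each file is then read independently, so the heavy 
lifting no longer lives inside a single BoundedSource.

   ```python
   def start_bq_export(query):
       # ...would run the query and export the results to Avro files;
       # the returned paths here are purely illustrative.
       return ['export-0.avro', 'export-1.avro']

   def read_avro_source(path):
       # ...would yield the records of one exported file.
       yield {'source': path}

   # The two stages compose into a fan-out read: one export, many
   # independently readable file sources.
   records = [r for path in start_bq_export('SELECT ...')
              for r in read_avro_source(path)]
   assert len(records) == 2
   ```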



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on a change in pull request #13154: Implementing Python Bounded Source Reader DoFn

2020-10-28 Thread GitBox


pabloem commented on a change in pull request #13154:
URL: https://github.com/apache/beam/pull/13154#discussion_r513670976



##
File path: sdks/python/apache_beam/io/iobase.py
##
@@ -1618,3 +1628,48 @@ def display_data(self):
 'source': DisplayDataItem(self.source.__class__, label='Read Source'),
 'source_dd': self.source
 }
+
+
+class SDFBoundedSourceReader(PTransform):

Review comment:
   I've done this, but I've still allowed the source to come in via the 
constructor in addition to arriving as an input. The intention is to keep the 
display data for simple Read transforms where the source is known at 
construction time.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] yifanmai commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

2020-10-28 Thread GitBox


yifanmai commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r513623504



##
File path: CHANGES.md
##
@@ -62,6 +62,7 @@
 
 ## New Features / Improvements
 * Added support for avro payload format in Beam SQL Kafka Table 
([BEAM-10885](https://issues.apache.org/jira/browse/BEAM-10885))
+* Added CombineFn.setup and CombineFn.teardown to Python SDK. These methods 
let you initialize a state before any of the other methods of the CombineFn is 
executed and clean that state up later on. 
([BEAM-3736](https://issues.apache.org/jira/browse/BEAM-3736))

Review comment:
   nit: 'a state' -> 'the CombineFn's state'

##
File path: sdks/python/apache_beam/transforms/core.py
##
@@ -1975,10 +1990,14 @@ def add_input_types(transform):
   return combined
 
 if self.has_defaults:
-  combine_fn = (
-  self.fn if isinstance(self.fn, CombineFn) else
-  CombineFn.from_callable(self.fn))
-  default_value = combine_fn.apply([], *self.args, **self.kwargs)
+  combine_fn = copy.deepcopy(
+  self.fn if isinstance(self.fn, CombineFn) else CombineFn.

Review comment:
   nit: this can be `copy.deepcopy(self.fn) if...` i.e. copy is only needed 
in the first branch
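   The suggested change can be illustrated with a small standalone sketch
(plain Python with a toy `CombineFn` stand-in; `get_combine_fn_original` and
`get_combine_fn_suggested` are hypothetical names, not Beam code):

```python
import copy


class CombineFn:
    """Toy stand-in for beam.CombineFn, for illustration only."""

    @staticmethod
    def from_callable(fn):
        # Builds a brand-new wrapper, so the caller already gets a fresh object.
        wrapper = CombineFn()
        wrapper.fn = fn
        return wrapper


def get_combine_fn_original(fn):
    # Before the nit: deepcopy wraps both branches, even though the
    # from_callable branch just constructed a new object.
    return copy.deepcopy(
        fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn))


def get_combine_fn_suggested(fn):
    # After the nit: only a user-provided CombineFn instance is copied.
    return (copy.deepcopy(fn)
            if isinstance(fn, CombineFn) else CombineFn.from_callable(fn))


user_fn = CombineFn()
assert get_combine_fn_suggested(user_fn) is not user_fn  # defensive copy
assert get_combine_fn_suggested(len).fn is len           # fresh wrapper, no copy
```

   Both versions behave the same; the suggested form just avoids a redundant
deepcopy of an object that was created one expression earlier.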

##
File path: sdks/python/apache_beam/transforms/core.py
##
@@ -877,17 +877,19 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   combining process proceeds as follows:
 
   1. Input values are partitioned into one or more batches.
-  2. For each batch, the create_accumulator method is invoked to create a fresh
+  2. For each batch, the setup method is invoked.
+  3. For each batch, the create_accumulator method is invoked to create a fresh
  initial "accumulator" value representing the combination of zero values.
-  3. For each input value in the batch, the add_input method is invoked to
+  4. For each input value in the batch, the add_input method is invoked to
  combine more values with the accumulator for that batch.
-  4. The merge_accumulators method is invoked to combine accumulators from
+  5. The merge_accumulators method is invoked to combine accumulators from
  separate batches into a single combined output accumulator value, once all
  of the accumulators have had all the input value in their batches added to
  them. This operation is invoked repeatedly, until there is only one
  accumulator value left.
-  5. The extract_output operation is invoked on the final accumulator to get
+  6. The extract_output operation is invoked on the final accumulator to get
  the output value.
+  7. The teardown method is invoked.

Review comment:
   Question: What is the expected behavior if setup throws an exception? 
Should teardown still be called?
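   For context, the seven documented steps can be sketched as a standalone
driver (this is an illustrative pseudo-runner, not Beam's implementation;
`run_batches` and `SumFn` are made-up names). Note where the answer to the
setup/teardown question has to be decided:

```python
class SumFn:
    """Toy CombineFn-style class following the 7-step lifecycle."""

    def setup(self):                      # step 2
        self.ready = True                 # e.g. open a connection

    def create_accumulator(self):         # step 3
        return 0

    def add_input(self, acc, value):      # step 4
        return acc + value

    def merge_accumulators(self, accs):   # step 5
        return sum(accs)

    def extract_output(self, acc):        # step 6
        return acc

    def teardown(self):                   # step 7
        self.ready = False                # e.g. close the connection


def run_batches(fn, batches):
    """Hypothetical driver for the lifecycle described above."""
    fn.setup()
    try:
        accs = []
        for batch in batches:
            acc = fn.create_accumulator()
            for value in batch:
                acc = fn.add_input(acc, value)
            accs.append(acc)
        return fn.extract_output(fn.merge_accumulators(accs))
    finally:
        # teardown fires even if a later step raised; whether it should
        # also fire when setup itself raises is the open question above.
        fn.teardown()


result = run_batches(SumFn(), [[1, 2], [3, 4]])  # 3 + 7 = 10
```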

##
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##
@@ -411,6 +411,33 @@ def visit_transform(self, transform_node):
 
 return FlattenInputVisitor()
 
+  @staticmethod
+  def combinefn_visitor():
+    # Imported here to avoid circular dependencies.
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam import core
+
+    class CombineFnVisitor(PipelineVisitor):
+      """Checks if `CombineFn` has non-default setup or teardown methods.
+      If yes, raises `ValueError`.
+      """
+      def visit_transform(self, applied_transform):
+        transform = applied_transform.transform
+        if isinstance(transform, core.ParDo) and isinstance(
+            transform.fn, core.CombineValuesDoFn):
+          if self._overrides_setup_or_teardown(transform.fn.combinefn):
+            raise ValueError(
+                'CombineFn.setup and CombineFn.teardown are '
+                'not supported with non-portable Dataflow '
+                'runner. Please use Dataflow Runner V2 instead.')

Review comment:
   Question: Is there any plan to support this in non-portable Dataflow 
Runner, or will this be a V2 feature only?
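   One plausible way a helper like `_overrides_setup_or_teardown` can work
(its implementation is not shown in this hunk, so this is an assumption) is
to compare the class's resolved methods against the base-class attributes:

```python
class CombineFn:
    """Stand-in base class with default no-op lifecycle hooks."""

    def setup(self, *args, **kwargs):
        pass

    def teardown(self, *args, **kwargs):
        pass


def overrides_setup_or_teardown(combinefn):
    # If the instance's class resolves setup/teardown to anything other
    # than the base-class function, the user has overridden the hook.
    cls = type(combinefn)
    return (cls.setup is not CombineFn.setup
            or cls.teardown is not CombineFn.teardown)


class PlainFn(CombineFn):
    pass


class ConnectedFn(CombineFn):
    def setup(self):
        self.conn = object()  # pretend to open a resource
```

   With a check like this, `PlainFn()` would pass the visitor silently while
`ConnectedFn()` would trigger the `ValueError` above.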

##
File path: sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py
##
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Set

[GitHub] [beam] robertwb commented on a change in pull request #13141: [BEAM-9547] Dataframe corrwith.

2020-10-28 Thread GitBox


robertwb commented on a change in pull request #13141:
URL: https://github.com/apache/beam/pull/13141#discussion_r513666115



##
File path: sdks/python/apache_beam/dataframe/frames.py
##
@@ -575,6 +575,8 @@ def __setitem__(self, key, value):
 else:
   raise NotImplementedError(key)
 
+  align = frame_base._elementwise_method('align')

Review comment:
   Good call. Done.

##
File path: sdks/python/apache_beam/dataframe/frames.py
##
@@ -771,6 +773,62 @@ def fill_matrix(*args):
   requires_partition_by=partitionings.Singleton(),
   proxy=proxy))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(proxy.index, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+            expressions.ComputedExpression(
+                'fill_dataframe',
+                fill_dataframe,
+                [corr._expr for corr in corrs],
+                requires_partition_by=partitionings.Singleton(),
+                proxy=proxy))
+
+    elif isinstance(other, DeferredDataFrame):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      valid_cols = list(
+          set(self.columns)
+          .intersection(other.columns)
+          .intersection(proxy.index))
+      corrs = [self[col].corr(other[col], **kwargs) for col in valid_cols]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(valid_cols, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+            expressions.ComputedExpression(
+                'fill_dataframe',
+                fill_dataframe,
+                [corr._expr for corr in corrs],
+                requires_partition_by=partitionings.Singleton(),
+                proxy=proxy))

Review comment:
   I've consolidated them now.

##
File path: sdks/python/apache_beam/dataframe/frames.py
##
@@ -771,6 +773,62 @@ def fill_matrix(*args):
   requires_partition_by=partitionings.Singleton(),
   proxy=proxy))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]

Review comment:
   Resolved. 
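   The consolidated shape of the two branches — align, choose the columns,
compute one correlation per column, fill the result — can be sketched without
Beam or pandas on dict-of-list "frames" (`pearson` and `corrwith_like` are
illustrative helpers, not the actual implementation):

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation of two equal-length numeric sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    vx = sum((x - mx) ** 2 for x in xs)
    vy = sum((y - my) ** 2 for y in ys)
    return cov / sqrt(vx * vy)


def corrwith_like(frame, other):
    """If `other` is a single column (list), correlate every column of
    `frame` with it; if it is a frame (dict of lists), correlate matching
    columns only -- mirroring the Series/DataFrame branches above."""
    if isinstance(other, list):
        return {col: pearson(values, other) for col, values in frame.items()}
    valid_cols = set(frame) & set(other)
    return {col: pearson(frame[col], other[col]) for col in valid_cols}


frame = {'a': [1, 2, 3], 'b': [3, 2, 1]}
corrs = corrwith_like(frame, [1, 2, 3])
# 'a' is perfectly correlated with the series, 'b' perfectly anti-correlated
```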









[GitHub] [beam] boyuanzz commented on pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2

2020-10-28 Thread GitBox


boyuanzz commented on pull request #12806:
URL: https://github.com/apache/beam/pull/12806#issuecomment-718116388


   Run Python PreCommit







[GitHub] [beam] pabloem commented on pull request #12960: [BEAM-9804] Allow user configuration of BigQuery temporary dataset

2020-10-28 Thread GitBox


pabloem commented on pull request #12960:
URL: https://github.com/apache/beam/pull/12960#issuecomment-718112424


   Run Python_PVR_Flink PreCommit







[GitHub] [beam] lostluck commented on pull request #13209: [BEAM-9615] Add schema coders and tests.

2020-10-28 Thread GitBox


lostluck commented on pull request #13209:
URL: https://github.com/apache/beam/pull/13209#issuecomment-718112331


   Run Go PostCommit







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider

2020-10-28 Thread GitBox


TheNeuralBit commented on a change in pull request #12780:
URL: https://github.com/apache/beam/pull/12780#discussion_r513654019



##
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubAvroIT.java
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasProperty;
+
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.hamcrest.Matcher;
+import org.joda.time.Instant;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for querying Pubsub AVRO messages with SQL. */
+@RunWith(JUnit4.class)
+public class PubsubAvroIT extends PubsubTableProviderIT {
+  private static final Schema NAME_HEIGHT_KNOWS_JS_SCHEMA =
+      Schema.builder()
+          .addNullableField("name", Schema.FieldType.STRING)
+          .addNullableField("height", Schema.FieldType.INT32)
+          .addNullableField("knowsJavascript", Schema.FieldType.BOOLEAN)
+          .build();
+
+  private static final Schema NAME_HEIGHT_SCHEMA =
+      Schema.builder()
+          .addNullableField("name", Schema.FieldType.STRING)
+          .addNullableField("height", Schema.FieldType.INT32)
+          .build();
+
+  @Override
+  protected String getPayloadFormat() {
+    return "avro";
+  }
+
+  @Override
+  protected PCollection<String> applyRowsToStrings(PCollection<Row> rows) {
+    return rows.apply(
+        MapElements.into(TypeDescriptors.strings())
+            .via(
+                (Row row) ->
+                    new String(
+                        AvroUtils.getRowToAvroBytesFunction(row.getSchema()).apply(row), UTF_8)));
+  }
+
+  @Override
+  protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
+    Row row = row(PAYLOAD_SCHEMA, id, name);
+    return message(timestamp, AvroUtils.getRowToAvroBytesFunction(PAYLOAD_SCHEMA).apply(row));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNames(String name) {
+    Schema schema = Schema.builder().addStringField("name").build();
+    Row row = row(schema, name);
+    return hasProperty("payload", equalTo(AvroUtils.getRowToAvroBytesFunction(schema).apply(row)));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) {
+    Row row = row(NAME_HEIGHT_SCHEMA, name, height);
+    return hasProperty(
+        "payload", equalTo(AvroUtils.getRowToAvroBytesFunction(NAME_HEIGHT_SCHEMA).apply(row)));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
+      String name, int height, boolean knowsJS) {
+    Row row = row(NAME_HEIGHT_KNOWS_JS_SCHEMA, name, height, knowsJS);
+    return hasProperty(
+        "payload",
+        equalTo(AvroUtils.getRowToAvroBytesFunction(NAME_HEIGHT_KNOWS_JS_SCHEMA).apply(row)));

Review comment:
   We should do this for the other `matcher*` methods as well.









[GitHub] [beam] boyuanzz commented on pull request #13144: [BEAM-10475] Add max buffering duration option for GroupIntoBatches transform in Python

2020-10-28 Thread GitBox


boyuanzz commented on pull request #13144:
URL: https://github.com/apache/beam/pull/13144#issuecomment-718106117


   Per https://issues.apache.org/jira/browse/BEAM-10921, we can ignore the 
windows-latest test suite.







[GitHub] [beam] boyuanzz commented on pull request #13144: [BEAM-10475] Add max buffering duration option for GroupIntoBatches transform in Python

2020-10-28 Thread GitBox


boyuanzz commented on pull request #13144:
URL: https://github.com/apache/beam/pull/13144#issuecomment-718092303


   Run Python_PVR_Flink PreCommit







[GitHub] [beam] TheNeuralBit merged pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider

2020-10-28 Thread GitBox


TheNeuralBit merged pull request #12839:
URL: https://github.com/apache/beam/pull/12839


   







[GitHub] [beam] robertwb merged pull request #13077: [BEAM-9561] Dataframe test infrastructure improvements.

2020-10-28 Thread GitBox


robertwb merged pull request #13077:
URL: https://github.com/apache/beam/pull/13077


   







[GitHub] [beam] yifanmai commented on pull request #13202: WIP: Set parent of fused stages to the lowest common ancestor

2020-10-28 Thread GitBox


yifanmai commented on pull request #13202:
URL: https://github.com/apache/beam/pull/13202#issuecomment-718087661


   Flink is still unhappy with this change. I will investigate further.







[GitHub] [beam] dhruvesh09 closed pull request #13171: Added check for alter_label_if_ipython when input PCollection is an empty tuple or dictionary.

2020-10-28 Thread GitBox


dhruvesh09 closed pull request #13171:
URL: https://github.com/apache/beam/pull/13171


   







[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

2020-10-28 Thread GitBox


iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-717978836


   retest this please







[GitHub] [beam] tszerszen edited a comment on pull request #13142: [BEAM-5939] - Deduplicate constants

2020-10-28 Thread GitBox


tszerszen edited a comment on pull request #13142:
URL: https://github.com/apache/beam/pull/13142#issuecomment-717849131


   @tvalentyn thank you for your review, could you please take a look after 
recent changes?







[GitHub] [beam] tszerszen edited a comment on pull request #13142: [BEAM-5939] - Deduplicate constants

2020-10-28 Thread GitBox


tszerszen edited a comment on pull request #13142:
URL: https://github.com/apache/beam/pull/13142#issuecomment-717849131


   @tvalentyn could you please take a look after recent changes?







[GitHub] [beam] tszerszen commented on pull request #13142: [BEAM-5939] - Deduplicate constants

2020-10-28 Thread GitBox


tszerszen commented on pull request #13142:
URL: https://github.com/apache/beam/pull/13142#issuecomment-717849131


   @tvalentyn could you please take a look now?







[GitHub] [beam] piotr-szuberski removed a comment on pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider

2020-10-28 Thread GitBox


piotr-szuberski removed a comment on pull request #12780:
URL: https://github.com/apache/beam/pull/12780#issuecomment-717797979


   Run SQL PostCommit







[GitHub] [beam] piotr-szuberski commented on pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider

2020-10-28 Thread GitBox


piotr-szuberski commented on pull request #12780:
URL: https://github.com/apache/beam/pull/12780#issuecomment-717822443


   Run SQL PostCommit







[GitHub] [beam] piotr-szuberski commented on pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider

2020-10-28 Thread GitBox


piotr-szuberski commented on pull request #12780:
URL: https://github.com/apache/beam/pull/12780#issuecomment-717797979


   Run SQL PostCommit







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider

2020-10-28 Thread GitBox


piotr-szuberski commented on a change in pull request #12780:
URL: https://github.com/apache/beam/pull/12780#discussion_r513281422



##
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubAvroIT.java
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasProperty;
+
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.hamcrest.Matcher;
+import org.joda.time.Instant;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for querying Pubsub AVRO messages with SQL. */
+@RunWith(JUnit4.class)
+public class PubsubAvroIT extends PubsubTableProviderIT {
+  private static final Schema NAME_HEIGHT_KNOWS_JS_SCHEMA =
+      Schema.builder()
+          .addNullableField("name", Schema.FieldType.STRING)
+          .addNullableField("height", Schema.FieldType.INT32)
+          .addNullableField("knowsJavascript", Schema.FieldType.BOOLEAN)
+          .build();
+
+  private static final Schema NAME_HEIGHT_SCHEMA =
+      Schema.builder()
+          .addNullableField("name", Schema.FieldType.STRING)
+          .addNullableField("height", Schema.FieldType.INT32)
+          .build();
+
+  @Override
+  protected String getPayloadFormat() {
+    return "avro";
+  }
+
+  @Override
+  protected PCollection<String> applyRowsToStrings(PCollection<Row> rows) {
+    return rows.apply(
+        MapElements.into(TypeDescriptors.strings())
+            .via(
+                (Row row) ->
+                    new String(
+                        AvroUtils.getRowToAvroBytesFunction(row.getSchema()).apply(row), UTF_8)));
+  }
+
+  @Override
+  protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
+    Row row = row(PAYLOAD_SCHEMA, id, name);
+    return message(timestamp, AvroUtils.getRowToAvroBytesFunction(PAYLOAD_SCHEMA).apply(row));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNames(String name) {
+    Schema schema = Schema.builder().addStringField("name").build();
+    Row row = row(schema, name);
+    return hasProperty("payload", equalTo(AvroUtils.getRowToAvroBytesFunction(schema).apply(row)));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) {
+    Row row = row(NAME_HEIGHT_SCHEMA, name, height);
+    return hasProperty(
+        "payload", equalTo(AvroUtils.getRowToAvroBytesFunction(NAME_HEIGHT_SCHEMA).apply(row)));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
+      String name, int height, boolean knowsJS) {
+    Row row = row(NAME_HEIGHT_KNOWS_JS_SCHEMA, name, height, knowsJS);
+    return hasProperty(
+        "payload",
+        equalTo(AvroUtils.getRowToAvroBytesFunction(NAME_HEIGHT_KNOWS_JS_SCHEMA).apply(row)));

Review comment:
   Done.









[GitHub] [beam] piotr-szuberski commented on pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider

2020-10-28 Thread GitBox


piotr-szuberski commented on pull request #12839:
URL: https://github.com/apache/beam/pull/12839#issuecomment-717789503


   Run SQL PostCommit







[GitHub] [beam] piotr-szuberski removed a comment on pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider

2020-10-28 Thread GitBox


piotr-szuberski removed a comment on pull request #12839:
URL: https://github.com/apache/beam/pull/12839#issuecomment-717748971


   Run SQL PostCommit







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider

2020-10-28 Thread GitBox


piotr-szuberski commented on a change in pull request #12780:
URL: https://github.com/apache/beam/pull/12780#discussion_r513235208



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow.java
##
@@ -226,11 +260,11 @@ public void processElement(
 field, timestamp, element.getAttributeMap(), 
payload))
 .collect(toList());
 
o.get(MAIN_TAG).output(Row.withSchema(messageSchema).addValues(values).build());
-  } catch (UnsupportedRowJsonException jsonException) {
+  } catch (UnsupportedRowJsonException | AvroRuntimeException exception) {

Review comment:
   Done

##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PayloadFormat.java
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+public enum PayloadFormat {
+  JSON,
+  AVRO
+}

Review comment:
   Done.

##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow.java
##
@@ -226,11 +260,11 @@ public void processElement(
 field, timestamp, element.getAttributeMap(), 
payload))
 .collect(toList());
 
o.get(MAIN_TAG).output(Row.withSchema(messageSchema).addValues(values).build());
-  } catch (UnsupportedRowJsonException jsonException) {
+  } catch (UnsupportedRowJsonException | AvroRuntimeException exception) {

Review comment:
   Done.









[GitHub] [beam] piotr-szuberski commented on pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider

2020-10-28 Thread GitBox


piotr-szuberski commented on pull request #12839:
URL: https://github.com/apache/beam/pull/12839#issuecomment-717748922











[GitHub] [beam] piotr-szuberski commented on a change in pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider

2020-10-28 Thread GitBox


piotr-szuberski commented on a change in pull request #12839:
URL: https://github.com/apache/beam/pull/12839#discussion_r513222917



##
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaJsonTable.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.util.RowJsonUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+public class BeamKafkaJsonTable extends BeamKafkaTable {
+  public BeamKafkaJsonTable(Schema beamSchema, String bootstrapServers, List<String> topics) {
+    super(beamSchema, bootstrapServers, topics);
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    ObjectMapper objectMapper =
+        RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(schema));
+    return new BeamKafkaJsonTable.JsonRecorderDecoder(schema, objectMapper);
+  }
+
+  @Override
+  public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
+    ObjectMapper objectMapper =
+        RowJsonUtils.newObjectMapperWith(RowJson.RowJsonSerializer.forSchema(schema));
+    return new BeamKafkaJsonTable.JsonRecorderEncoder(objectMapper);
+  }
+
+  /** A PTransform to convert {@code KV<byte[], byte[]>} to {@link Row}. */
+  public static class JsonRecorderDecoder

Review comment:
   Done
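   The decode path this table implements — value bytes off Kafka, parsed as
JSON, checked against the table schema's fields — can be sketched in Python
(`decode_json_record` and its flat field-name schema are illustrative, not
Beam's API):

```python
import json


def decode_json_record(payload, field_names):
    """Parse a Kafka value as JSON and keep it only if it matches the schema.

    payload: raw bytes from the Kafka record value.
    field_names: the schema's field names, in order.
    """
    record = json.loads(payload.decode('utf-8'))
    missing = [f for f in field_names if f not in record]
    if missing:
        # The real decoder drops records that do not match the row schema;
        # here we just raise to make the failure visible.
        raise ValueError('missing fields: %s' % missing)
    return {f: record[f] for f in field_names}


row = decode_json_record(b'{"id": 1, "name": "beam"}', ['id', 'name'])
```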




