[GitHub] [beam] jainsourabh2 commented on pull request #11804: Corrected the input to execute the Java JAR file

2020-06-05 Thread GitBox


jainsourabh2 commented on pull request #11804:
URL: https://github.com/apache/beam/pull/11804#issuecomment-639997856


   tempLocation gives an error.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ihji commented on pull request #11942: [BEAM-10208] add cross-language KafkaIO integration test

2020-06-05 Thread GitBox


ihji commented on pull request #11942:
URL: https://github.com/apache/beam/pull/11942#issuecomment-639983521


   CC: @robertwb 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ihji commented on pull request #11942: [BEAM-10208] add cross-language KafkaIO integration test

2020-06-05 Thread GitBox


ihji commented on pull request #11942:
URL: https://github.com/apache/beam/pull/11942#issuecomment-639966084


   Please also run "Run Python 2 PostCommit" and "Run Python 3.7 PostCommit"



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ihji commented on pull request #11942: [BEAM-10208] add cross-language KafkaIO integration test

2020-06-05 Thread GitBox


ihji commented on pull request #11942:
URL: https://github.com/apache/beam/pull/11942#issuecomment-639965704


   R: @chamikaramj 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ihji opened a new pull request #11942: [BEAM-10208] add cross-language KafkaIO integration test

2020-06-05 Thread GitBox


ihji opened a new pull request #11942:
URL: https://github.com/apache/beam/pull/11942


   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   (Per-runner post-commit build status badge table omitted.)

[GitHub] [beam] lukecwik commented on pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.

2020-06-05 Thread GitBox


lukecwik commented on pull request #11941:
URL: https://github.com/apache/beam/pull/11941#issuecomment-639964771


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types

2020-06-05 Thread GitBox


pabloem commented on pull request #11923:
URL: https://github.com/apache/beam/pull/11923#issuecomment-639949346


   Run Python 3.7 PostCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] epicfaace commented on pull request #11824: [BEAM-10101] Add HttpIO / HttpFileSystem (Python)

2020-06-05 Thread GitBox


epicfaace commented on pull request #11824:
URL: https://github.com/apache/beam/pull/11824#issuecomment-639946797


   I'll be making changes soon.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] reuvenlax commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-05 Thread GitBox


reuvenlax commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436220739



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+  Schema.of(
+  Field.of(LINE_FIELD_NAME, FieldType.STRING),
+  Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+
   public static PTransform<PCollection<String>, PCollection<Row>> withSchema(Schema rowSchema) {
     return JsonToRowFn.forSchema(rowSchema);
   }
 
+  /**
+   * Enable dead-letter support. If this is set, errors in the parsing layer are returned as
+   * Row objects of the form {@link JsonToRow#ERROR_ROW_SCHEMA}: line = the original JSON
+   * string, err = the error message from the parsing function.
+   *
+   * <p>You can access the results by using {@link JsonToRow#MAIN_TUPLE_TAG}:
+   *
+   * <p>{@code PCollection<Row> personRows =
+   * results.get(JsonToRow.MAIN_TUPLE_TAG).setRowSchema(personSchema)}

Review comment:
   +1. If you output a Row, you should be setting the schema in your transform.

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;

Review comment:
   make final

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+  Schema.of(
+  Field.of(LINE_FIELD_NAME, FieldType.STRING),
+  Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+

Review comment:
   It would be nicer to make these field names configurable, though with defaults.
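
   A minimal sketch of what configurable field names with defaults could look like, purely to illustrate the suggestion; the withDeadLetter/withLineFieldName/withErrorFieldName methods and their names are assumptions, not part of this PR:

   // Hypothetical builder-style configuration; defaults stay "line" and "err".
   PCollectionTuple results =
       jsonStrings.apply(
           JsonToRow.withSchema(personSchema)
               .withDeadLetter()                // assumed entry point for dead-letter mode
               .withLineFieldName("raw_json")   // default: "line"
               .withErrorFieldName("error"));   // default: "err"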

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {

Review comment:
   I think it would be cleaner to wrap this in a custom result class and not expose the TupleTags to users. Look at org.apache.beam.sdk.io.gcp.bigquery.WriteResult for an example.
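
   A minimal sketch of such a result wrapper, loosely modeled on the WriteResult class mentioned above; the class and method names are illustrative assumptions, not part of this PR:

   public static class ParseResult {
     private final PCollection<Row> results;
     private final PCollection<Row> failures;

     ParseResult(PCollection<Row> results, PCollection<Row> failures) {
       this.results = results;
       this.failures = failures;
     }

     /** Successfully parsed rows, already carrying the user-supplied schema. */
     public PCollection<Row> getResults() {
       return results;
     }

     /** Failed input lines and error messages, using ERROR_ROW_SCHEMA. */
     public PCollection<Row> getFailedParses() {
       return failures;
     }
   }

   The transform's expand() would then build the PCollectionTuple internally and hand back a ParseResult, keeping the TupleTags private.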

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+              new DoFn<String, Row>() {
+                @ProcessElement
+                public void processElement(ProcessContext context) {

Review comment:
   Why not use injected parameters instead of ProcessContext?
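
   A sketch of the same processElement written with injected parameters instead of ProcessContext; it assumes main and deadLetter are TupleTag<Row>, as in the snippet above:

   new DoFn<String, Row>() {
     @ProcessElement
     public void processElement(@Element String json, MultiOutputReceiver out) {
       try {
         // Successful parses go to the main output.
         out.get(main).output(jsonToRow(objectMapper(), json));
       } catch (Exception ex) {
         // Failures go to the dead-letter output with the offending line and the error message.
         out.get(deadLetter)
             .output(
                 Row.withSchema(ERROR_ROW_SCHEMA)
                     .addValue(json)
                     .addValue(ex.getMessage())
                     .build());
       }
     }
   }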

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+  Schema.of(
+  Field.of(LINE_FIELD_NAME, FieldType.STRING),
+

[GitHub] [beam] pabloem commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-05 Thread GitBox


pabloem commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436215155



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+  extends PTransform, PCollectionTuple> {
+private transient volatile @Nullable ObjectMapper objectMapper;
+private Schema schema;
+private static final String METRIC_NAMESPACE = "JsonToRowFn";
+private static final String DEAD_LETTER_METRIC_NAME = 
"JsonToRowFn_ParseFailure";
+
+private Distribution jsonConversionErrors =
+Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+public static final TupleTag main = MAIN_TUPLE_TAG;
+public static final TupleTag deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+PCollection deadLetterCollection;
+
+static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+  // Throw exception if this schema is not supported by RowJson
+  RowJson.verifySchemaSupported(rowSchema);
+  return new JsonToRowWithFailureCaptureFn(rowSchema);
+}
+
+private JsonToRowWithFailureCaptureFn(Schema schema) {
+  this.schema = schema;
+}
+
+@Override
+public PCollectionTuple expand(PCollection jsonStrings) {
+
+  return jsonStrings.apply(
+  ParDo.of(
+  new DoFn() {
+@ProcessElement
+public void processElement(ProcessContext context) {
+  try {
+context.output(jsonToRow(objectMapper(), 
context.element()));
+  } catch (Exception ex) {
+context.output(
+deadLetter,
+Row.withSchema(ERROR_ROW_SCHEMA)
+.addValue(context.element())
+.addValue(ex.getMessage())
+.build());

Review comment:
   (I am mostly leaning towards not doing this, but lmk what you think)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-05 Thread GitBox


pabloem commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436214157



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+  extends PTransform, PCollectionTuple> {
+private transient volatile @Nullable ObjectMapper objectMapper;
+private Schema schema;
+private static final String METRIC_NAMESPACE = "JsonToRowFn";
+private static final String DEAD_LETTER_METRIC_NAME = 
"JsonToRowFn_ParseFailure";
+
+private Distribution jsonConversionErrors =
+Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+public static final TupleTag main = MAIN_TUPLE_TAG;
+public static final TupleTag deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+PCollection deadLetterCollection;
+
+static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+  // Throw exception if this schema is not supported by RowJson
+  RowJson.verifySchemaSupported(rowSchema);
+  return new JsonToRowWithFailureCaptureFn(rowSchema);
+}
+
+private JsonToRowWithFailureCaptureFn(Schema schema) {
+  this.schema = schema;
+}
+
+@Override
+public PCollectionTuple expand(PCollection jsonStrings) {
+
+  return jsonStrings.apply(
+  ParDo.of(
+  new DoFn() {
+@ProcessElement
+public void processElement(ProcessContext context) {
+  try {
+context.output(jsonToRow(objectMapper(), 
context.element()));
+  } catch (Exception ex) {
+context.output(
+deadLetter,
+Row.withSchema(ERROR_ROW_SCHEMA)
+.addValue(context.element())
+.addValue(ex.getMessage())
+.build());

Review comment:
   I guess this doesn't make sense, but would it help to include the Row Schema that we tried (and failed) to use for this JSON string? Some users may not need it, and others can add it themselves in the downstream ParDo, but it's possible it may help. Thoughts?
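
   A hypothetical sketch of what carrying the target schema in the error row could look like; the extra "schema" field and its string encoding are assumptions made only to illustrate the question:

   public static final Schema ERROR_ROW_WITH_SCHEMA =
       Schema.of(
           Field.of("line", FieldType.STRING),
           Field.of("err", FieldType.STRING),
           Field.of("schema", FieldType.STRING));

   // Inside the catch block, the schema we tried (and failed) to apply would then travel with the failure:
   Row failure =
       Row.withSchema(ERROR_ROW_WITH_SCHEMA)
           .addValue(context.element())
           .addValue(ex.getMessage())
           .addValue(schema.toString())
           .build();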

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+  extends PTransform, PCollectionTuple> {
+private transient volatile @Nullable ObjectMapper objectMapper;
+private Schema schema;
+private static final String METRIC_NAMESPACE = "JsonToRowFn";
+private static final String DEAD_LETTER_METRIC_NAME = 
"JsonToRowFn_ParseFailure";
+
+private Distribution jsonConversionErrors =
+Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+public static final TupleTag main = MAIN_TUPLE_TAG;
+public static final TupleTag deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+PCollection deadLetterCollection;
+
+static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+  // Throw exception if this schema is not supported by RowJson
+  RowJson.verifySchemaSupported(rowSchema);
+  return new JsonToRowWithFailureCaptureFn(rowSchema);
+}
+
+private JsonToRowWithFailureCaptureFn(Schema schema) {
+  this.schema = schema;
+}
+
+@Override
+public PCollectionTuple expand(PCollection jsonStrings) {
+
+  return jsonStrings.apply(
+  ParDo.of(
+  new DoFn() {
+@ProcessElement
+public void processElement(ProcessContext context) {
+  try {
+context.output(jsonToRow(objectMapper(), 
context.element()));
+  } catch (Exception ex) {
+context.output(
+deadLetter,
+Row.withSchema(ERROR_ROW_SCHEMA)
+.addValue(context.element())
+.addValue(ex.getMessage())
+.build());
+  }
+}
+  })
+  .withOutputTags(main, TupleTagList.of(deadLetter)));

Review comment:
   You can add the schema for the outputs here, so that users do not need to add it themselves?
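
   A sketch of attaching the schemas right here in expand(), so that callers would not need to call setRowSchema() themselves; parseDoFn stands in for the anonymous DoFn above, and the exact placement is an assumption:

   PCollectionTuple result =
       jsonStrings.apply(
           ParDo.of(parseDoFn).withOutputTags(main, TupleTagList.of(deadLetter)));
   // With the schemas set here, both outputs are schema-aware PCollections out of the box.
   result.get(main).setRowSchema(schema);
   result.get(deadLetter).setRowSchema(ERROR_ROW_SCHEMA);
   return result;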

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+  extends PTransform, PCollectionTuple> {
+private transient volatile @Nullable ObjectMapper objectMapper;
+private S

[GitHub] [beam] aaltay commented on pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python

2020-06-05 Thread GitBox


aaltay commented on pull request #8457:
URL: https://github.com/apache/beam/pull/8457#issuecomment-639912763


   > There are still failing tests on #11295. @mf2199 - What is the next step 
for this PR?
   
   Ping on this? What is our plan for this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types

2020-06-05 Thread GitBox


pabloem commented on pull request #11923:
URL: https://github.com/apache/beam/pull/11923#issuecomment-639911150


   Run Python 3.7 PostCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO

2020-06-05 Thread GitBox


pabloem commented on a change in pull request #11702:
URL: https://github.com/apache/beam/pull/11702#discussion_r436210077



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##
@@ -1173,4 +1276,339 @@ public void executeBundles(ProcessContext context) {
   }
 }
   }
+
+  /**
+   * Create resources fhir io . create resources.
+   *
+   * @param  the type parameter
+   * @param fhirStore the fhir store
+   * @return the fhir io . create resources
+   */
+  public static  FhirIO.CreateResources 
createResources(ValueProvider fhirStore) {
+return new CreateResources(fhirStore);
+  }
+
+  /**
+   * Create resources fhir io . create resources.
+   *
+   * @param  the type parameter
+   * @param fhirStore the fhir store
+   * @return the fhir io . create resources
+   */
+  public static  FhirIO.CreateResources createResources(String 
fhirStore) {
+return new CreateResources(fhirStore);
+  }
+  /**
+   * {@link PTransform} for Creating FHIR resources.
+   *
+   * 
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+   */
+  public static class CreateResources extends PTransform, 
Write.Result> {
+private final String fhirStore;
+private SerializableFunction ifNoneExistFunction;
+private SerializableFunction formatBodyFunction;
+private SerializableFunction typeFunction;
+private static final Logger LOG = 
LoggerFactory.getLogger(CreateResources.class);
+
+/**
+ * Instantiates a new Create resources transform.
+ *
+ * @param fhirStore the fhir store
+ */
+CreateResources(ValueProvider fhirStore) {
+  this.fhirStore = fhirStore.get();
+}
+
+/**
+ * Instantiates a new Create resources.
+ *
+ * @param fhirStore the fhir store
+ */
+CreateResources(String fhirStore) {
+  this.fhirStore = fhirStore;
+}
+
+/**
+ * This adds a {@link SerializableFunction} that reads a resource string and extracts an
+ * If-None-Exists query for conditional create. Typically this will just 
be extracting an ID to
+ * look for.
+ *
+ * 
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param ifNoneExistFunction the if none exist function
+ * @return the create resources
+ */
+public CreateResources withIfNotExistFunction(
+SerializableFunction ifNoneExistFunction) {
+  this.ifNoneExistFunction = ifNoneExistFunction;
+  return this;
+}
+
+/**
+ * This adds a {@link SerializableFunction} that reads a resource string and extracts a
+ * resource type.
+ *
+ * 
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param typeFunction for extracting type from a resource.
+ * @return the create resources
+ */
+public CreateResources withTypeFunction(SerializableFunction 
typeFunction) {
+  this.typeFunction = typeFunction;
+  return this;
+}
+/**
+ * With format body function create resources.
+ *
+ * @param formatBodyFunction the format body function
+ * @return the create resources
+ */
+public CreateResources withFormatBodyFunction(

Review comment:
   I don't think I understand this function very well. It seems like a function to format a resource properly in case its formatting is not correct? Could you expand the documentation for it?
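
   For reference, an illustrative reading of this function based only on the usage shown in the class Javadoc above (withFormatBodyFunction(HealthcareIOError::getDataResource)): it maps each input element to the JSON body sent to the FHIR create call. The pipeline fragment and the type-function lambda below are assumptions:

   failedBundles.apply(
       "Create FHIR Resources",
       FhirIO.<HealthcareIOError<String>>createResources(fhirStore)
           .withFormatBodyFunction(HealthcareIOError::getDataResource)
           .withTypeFunction(err -> "Patient")); // hypothetical: derive the resource type per element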

##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  * output.apply("Execute FHIR Bundles", 
FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources", 
FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
  * PCollection> failedBundles = 
writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  * BigQueryIO
  * .write()
  * .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  * .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ * 
.withFormatBodyFunction(HealthcareIOError::getDataResource)
+ * .withTypeFunction((HealthcareIOError err) -> {
+ *   String body = err.getDataResource();
+ * 

[GitHub] [beam] lukecwik commented on pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.

2020-06-05 Thread GitBox


lukecwik commented on pull request #11941:
URL: https://github.com/apache/beam/pull/11941#issuecomment-639897770


   R: @boyuanzz @youngoli 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik opened a new pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.

2020-06-05 Thread GitBox


lukecwik opened a new pull request #11941:
URL: https://github.com/apache/beam/pull/11941


   getInitialRestriction/splitAndSize should not be wrapped with startBundle/finishBundle invocations.
   Instead of copying the stateAccessor initialization (used for side inputs), I made it so that it is initialized only once, and the caches/references are cleaned up in the finalizeState call.
   
   Tested within Google using runner_v2.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   (Per-runner post-commit build status badge table omitted.)

[GitHub] [beam] pabloem commented on pull request #11824: [BEAM-10101] Add HttpIO / HttpFileSystem (Python)

2020-06-05 Thread GitBox


pabloem commented on pull request #11824:
URL: https://github.com/apache/beam/pull/11824#issuecomment-639885325


   @epicfaace lmk what are your plans for this PR



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11940: [BEAM-6215] Additional tests for FlatMap label.

2020-06-05 Thread GitBox


pabloem commented on pull request #11940:
URL: https://github.com/apache/beam/pull/11940#issuecomment-639884926


   Run Python PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-05 Thread GitBox


pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-639885056


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types

2020-06-05 Thread GitBox


pabloem commented on pull request #11923:
URL: https://github.com/apache/beam/pull/11923#issuecomment-639882291


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] udim commented on a change in pull request #11939: [BEAM-10197] Support typehints for Python's frozenset

2020-06-05 Thread GitBox


udim commented on a change in pull request #11939:
URL: https://github.com/apache/beam/pull/11939#discussion_r436187871



##
File path: sdks/python/apache_beam/typehints/typehints_test.py
##
@@ -612,54 +613,70 @@ def test_match_type_variables(self):
  hint.match_type_variables(typehints.Dict[int, str]))
 
 
-class SetHintTestCase(TypeHintTestCase):
+class BaseSetHintTest:
+  def __init__(self, string_type, py_type, beam_type, *args, **kwargs):

Review comment:
   You should omit `*args, **kwargs` if there aren't any more args allowed. 
This is the style we follow in this codebase. Here and below as well.
   
   For example: if some code called `BaseSetHintTest('Set', set, typehints.Set, typehints.FrozenSet)`, would you rather the extra argument be silently ignored or raised as an incorrect number of arguments?

##
File path: sdks/python/apache_beam/typehints/typehints_test.py
##
@@ -612,54 +613,70 @@ def test_match_type_variables(self):
  hint.match_type_variables(typehints.Dict[int, str]))
 
 
-class SetHintTestCase(TypeHintTestCase):
+class BaseSetHintTest:

Review comment:
   If `BaseSetHintTest` inherited from `TypeHintTestCase` you could avoid 
multiple inheritance in the sub-classes below. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem merged pull request #11931: [BEAM-10145] Delete persistent disks after every KafkaIO performance test run

2020-06-05 Thread GitBox


pabloem merged pull request #11931:
URL: https://github.com/apache/beam/pull/11931


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] apilloud commented on a change in pull request #11820: [BEAM-10093] ZetaSql Nexmark variant

2020-06-05 Thread GitBox


apilloud commented on a change in pull request #11820:
URL: https://github.com/apache/beam/pull/11820#discussion_r436191652



##
File path: 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlBoundedSideInputJoinTest.java
##
@@ -48,166 +47,182 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Test the various NEXMark queries yield results coherent with their models. 
*/
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)
 public class SqlBoundedSideInputJoinTest {
 
-  @Rule public TestPipeline p = TestPipeline.create();
+  private abstract static class SqlBoundedSideInputJoinTestCases {
 
-  @Before
-  public void setupPipeline() {
-NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
-  }
+protected abstract SqlBoundedSideInputJoin getQuery(NexmarkConfiguration 
configuration);
+
+@Rule public TestPipeline p = TestPipeline.create();
+
+@Before
+public void setupPipeline() {
+  NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
+}
 
-  /** Test {@code query} matches {@code model}. */
-  private  void queryMatchesModel(
-  String name,
-  NexmarkConfiguration config,
-  NexmarkQueryTransform query,
-  NexmarkQueryModel model,
-  boolean streamingMode)
-  throws Exception {
-
-ResourceId sideInputResourceId =
-FileSystems.matchNewResource(
-String.format(
-"%s/JoinToFiles-%s", p.getOptions().getTempLocation(), new 
Random().nextInt()),
-false);
-config.sideInputUrl = sideInputResourceId.toString();
-
-try {
+/** Test {@code query} matches {@code model}. */
+private  void queryMatchesModel(
+String name,
+NexmarkConfiguration config,
+NexmarkQueryTransform query,
+NexmarkQueryModel model,
+boolean streamingMode)
+throws Exception {
+
+  ResourceId sideInputResourceId =
+  FileSystems.matchNewResource(
+  String.format(
+  "%s/JoinToFiles-%s", p.getOptions().getTempLocation(), new 
Random().nextInt()),
+  false);
+  config.sideInputUrl = sideInputResourceId.toString();
+
+  try {
+PCollection> sideInput = 
NexmarkUtils.prepareSideInput(p, config);
+query.setSideInput(sideInput);
+
+PCollection events =
+p.apply(
+name + ".Read",
+streamingMode
+? NexmarkUtils.streamEventsSource(config)
+: NexmarkUtils.batchEventsSource(config));
+
+PCollection> results =
+(PCollection>) events.apply(new 
NexmarkQuery<>(config, query));
+PAssert.that(results).satisfies(model.assertionFor());
+PipelineResult result = p.run();
+result.waitUntilFinish();
+  } finally {
+NexmarkUtils.cleanUpSideInput(config);
+  }
+}
+
+/**
+ * A smoke test that the count of input bids and outputs are the same, to 
help diagnose
+ * flakiness in more complex tests.
+ */
+@Test
+public void inputOutputSameEvents() throws Exception {
+  NexmarkConfiguration config = NexmarkConfiguration.DEFAULT.copy();
+  config.sideInputType = NexmarkUtils.SideInputType.DIRECT;
+  config.numEventGenerators = 1;
+  config.numEvents = 5000;
+  config.sideInputRowCount = 10;
+  config.sideInputNumShards = 3;
   PCollection> sideInput = 
NexmarkUtils.prepareSideInput(p, config);
-  query.setSideInput(sideInput);
-
-  PCollection events =
-  p.apply(
-  name + ".Read",
-  streamingMode
-  ? NexmarkUtils.streamEventsSource(config)
-  : NexmarkUtils.batchEventsSource(config));
-
-  PCollection> results =
-  (PCollection>) events.apply(new 
NexmarkQuery<>(config, query));
-  PAssert.that(results).satisfies(model.assertionFor());
-  PipelineResult result = p.run();
-  result.waitUntilFinish();
-} finally {
-  NexmarkUtils.cleanUpSideInput(config);
+
+  try {
+PCollection input = 
p.apply(NexmarkUtils.batchEventsSource(config));
+PCollection justBids = input.apply(NexmarkQueryUtil.JUST_BIDS);
+PCollection bidCount = justBids.apply("Count Bids", 
Count.globally());
+
+NexmarkQueryTransform query = getQuery(config);
+query.setSideInput(sideInput);
+
+PCollection> output =
+(PCollection>) input.apply(new 
NexmarkQuery(config, query));
+PCollection outputCount = output.apply("Count outputs", 
Count.globally());
+
+
PAssert.that(PCollectionList.of(bidCount).and(outputCount).apply(Flatten.pCollections()))
+.satisfies(
+counts -> {
+  assertTha

[GitHub] [beam] udim commented on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset

2020-06-05 Thread GitBox


udim commented on pull request #11939:
URL: https://github.com/apache/beam/pull/11939#issuecomment-639864241


   random comment to trigger tests



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] udim edited a comment on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset

2020-06-05 Thread GitBox


udim edited a comment on pull request #11939:
URL: https://github.com/apache/beam/pull/11939#issuecomment-639864241


   arbitrary comment to trigger tests



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] amaliujia commented on a change in pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

2020-06-05 Thread GitBox


amaliujia commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r436188166



##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() {
 
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test

Review comment:
   Good point. It would be worthwhile to move all streaming tests to another place.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436187245



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and 
design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link 
KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same 
as {@link
+ *   KafkaIO.Read#getKeyDeserializer

[GitHub] [beam] robinyqiu commented on a change in pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

2020-06-05 Thread GitBox


robinyqiu commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r436178923



##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() {
 
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test

Review comment:
   +1. IIRC there are some other tests in this file that are testing our streaming extension. It makes sense to separate them into another file.

##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFToPTransform.java
##
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
+
+/** Provides a function that produces a PCollection based on TVF and upstream 
PCollection. */
+public interface TVFToPTransform {

Review comment:
   +1





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on a change in pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.

2020-06-05 Thread GitBox


ibzib commented on a change in pull request #11932:
URL: https://github.com/apache/beam/pull/11932#discussion_r436178918



##
File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactRetrievalService.java
##
@@ -89,34 +92,41 @@ public void resolveArtifacts(
   public void getArtifact(
       ArtifactApi.GetArtifactRequest request,
       StreamObserver<ArtifactApi.GetArtifactResponse> responseObserver) {
-switch (request.getArtifact().getTypeUrn()) {
+try {
+  InputStream inputStream = getArtifact(request.getArtifact());
+  byte[] buffer = new byte[bufferSize];
+  int bytesRead;
+  while ((bytesRead = inputStream.read(buffer)) > 0) {
+responseObserver.onNext(
+ArtifactApi.GetArtifactResponse.newBuilder()
+.setData(ByteString.copyFrom(buffer, 0, bytesRead))
+.build());
+  }
+  responseObserver.onCompleted();
+} catch (IOException exn) {
+  exn.printStackTrace();
+  responseObserver.onError(exn);

Review comment:
   Should we have wrapped this exception in a StatusException as well?
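
   A possible shape for that, assuming io.grpc.Status is available here and that INTERNAL is an appropriate status code (both assumptions):

   catch (IOException exn) {
     responseObserver.onError(
         Status.INTERNAL
             .withDescription("Failed to read artifact: " + exn.getMessage())
             .withCause(exn)
             .asException());
   }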





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on pull request #11626: Cleanup ToString transforms.

2020-06-05 Thread GitBox


robertwb commented on pull request #11626:
URL: https://github.com/apache/beam/pull/11626#issuecomment-639845882


   R: @y1chi



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on pull request #11940: [BEAM-6215] Additional tests for FlatMap label.

2020-06-05 Thread GitBox


robertwb commented on pull request #11940:
URL: https://github.com/apache/beam/pull/11940#issuecomment-639844043


   R: @pabloem 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb opened a new pull request #11940: [BEAM-6215] Additional tests for FlatMap label.

2020-06-05 Thread GitBox


robertwb opened a new pull request #11940:
URL: https://github.com/apache/beam/pull/11940


   This does not appear to be broken, but additional tests are good. 
   
   Also cleaned up unneeded use of raw strings.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   (Per-runner post-commit build status badge table omitted.)

[GitHub] [beam] amaliujia merged pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.

2020-06-05 Thread GitBox


amaliujia merged pull request #11933:
URL: https://github.com/apache/beam/pull/11933


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436179356



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -1055,29 +1144,6 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
 
-  /**

Review comment:
   This common part is moved to the KafkaIOUtil.java





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436177315



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -949,45 +1077,6 @@ public void setValueDeserializer(String 
valueDeserializer) {
 final SerializableFunction, OutT> fn) {
   return record -> fn.apply(record.getKV());
 }
-
///

Review comment:
   This common part is moved to the KafkaIOUtil.java.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kennknowles commented on a change in pull request #11924: [BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle

2020-06-05 Thread GitBox


kennknowles commented on a change in pull request #11924:
URL: https://github.com/apache/beam/pull/11924#discussion_r436173779



##
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##
@@ -3938,10 +3939,11 @@ public void testEventTimeTimerOrdering() throws 
Exception {
   ValidatesRunner.class,
   UsesTimersInParDo.class,
   UsesStatefulParDo.class,
+  UsesUnboundedPCollections.class,
   UsesStrictTimerOrdering.class
 })
 public void testEventTimeTimerOrderingWithCreate() throws Exception {
-  final int numTestElements = 100;
+  final int numTestElements = 5;

Review comment:
   Why shrink it? Does the test get really slow? Is this going to be a perf 
problem overall?

##
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##
@@ -577,12 +583,21 @@ public void flushState() {
 WindmillTimerInternals.windmillTimerToTimerData(
 WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, 
timer, windowCoder))
 .iterator();
+
+cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add);
+  }
+
+  Instant currentInputWatermark = 
userTimerInternals.currentInputWatermarkTime();
+  if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
+while (!toBeFiredTimersOrdered.isEmpty()) {
+  userTimerInternals.setTimer(toBeFiredTimersOrdered.poll());
+}
   }

Review comment:
   Yeah, I don't actually understand what this block is for.
   
   FWIW, to do timer deletion/reset cheaply without building a bespoke data structure, just keep a map from id to firing time or a tombstone. This way, whenever a timer comes up in the priority queue you pull the actual time for it out of the map. If it is actually set for another time, don't fire it. If it is obsolete, don't fire it.
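   A generic sketch of the map-plus-tombstone idea described above; the class and names are illustrative, not the actual Dataflow worker code:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.PriorityQueue;
   import java.util.function.Consumer;
   
   /** Sketch: lazy timer deletion/reset with a priority queue plus an id -> firing-time map. */
   class LazyTimerQueue {
     private static class Entry {
       final String id;
       final long firingTime;
       Entry(String id, long firingTime) {
         this.id = id;
         this.firingTime = firingTime;
       }
     }
   
     private final PriorityQueue<Entry> queue =
         new PriorityQueue<>((a, b) -> Long.compare(a.firingTime, b.firingTime));
     // Latest known firing time per timer id; a null value is a tombstone for a deleted timer.
     private final Map<String, Long> latest = new HashMap<>();
   
     void set(String id, long firingTime) {
       latest.put(id, firingTime);
       queue.add(new Entry(id, firingTime)); // stale entries are filtered out when polled
     }
   
     void delete(String id) {
       latest.put(id, null); // tombstone; the queued entry is ignored when it surfaces
     }
   
     /** Fires all timers due at or before the watermark, skipping obsolete or reset entries. */
     void fireUpTo(long watermark, Consumer<String> fire) {
       while (!queue.isEmpty() && queue.peek().firingTime <= watermark) {
         Entry e = queue.poll();
         Long actual = latest.get(e.id);
         if (actual == null || actual != e.firingTime) {
           continue; // deleted, or reset to a different time; don't fire this stale entry
         }
         latest.remove(e.id);
         fire.accept(e.id);
       }
     }
   }
   ```
   With this shape, set/delete stay O(log n) and obsolete entries are simply skipped when they reach the head of the queue, so no bespoke ordered structure with random deletion is needed.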

##
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##
@@ -577,12 +583,21 @@ public void flushState() {
 WindmillTimerInternals.windmillTimerToTimerData(
 WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, 
timer, windowCoder))
 .iterator();
+
+cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add);

Review comment:
   Do we even need `cachedFiredUserTimers`? It seems obsolete if we 
populate the priority queue. The name is also wrong - even before this PR it 
wasn't a cache. It is a lazily initialized iterator. Instead, we should have a 
lazily initialized priority queue (like you do) and just a flag to say whether 
the incoming timers have been loaded yet.

##
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##
@@ -4040,7 +4043,8 @@ public void onTimer(
 }
   };
 
-  PCollection output = 
pipeline.apply(transform).apply(ParDo.of(fn));
+  PCollection output =
+  
pipeline.apply(transform).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(ParDo.of(fn));

Review comment:
   Should not be calling `setIsBoundedInternal` here. Is this just to force 
streaming mode? We need to just create a separate run of ValidatesRunner that 
forces streaming mode.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] saavan-google-intern commented on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset

2020-06-05 Thread GitBox


saavan-google-intern commented on pull request #11939:
URL: https://github.com/apache/beam/pull/11939#issuecomment-639824517


   @udim 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] saavan-google-intern opened a new pull request #11939: [BEAM-10197] Support typehints for Python's frozenset

2020-06-05 Thread GitBox


saavan-google-intern opened a new pull request #11939:
URL: https://github.com/apache/beam/pull/11939


   This PR improves Beam's parameterized type hint coverage by adding support 
for Python's frozenset container. Type annotations should now work with 
frozenset, typing.FrozenSet, and typehints.FrozenSet. 
   
   The same level of unit test coverage that exists for set has been added for 
frozenset. When possible, similar tests were refactored via inheritance to 
support both set and frozenset to make the code extensible and reduce 
duplication.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   

[GitHub] [beam] lukecwik merged pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn

2020-06-05 Thread GitBox


lukecwik merged pull request #11922:
URL: https://github.com/apache/beam/pull/11922


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

2020-06-05 Thread GitBox


y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-639789507


   > Yes, the plan was to consider changing Java too, though that's harder due 
to backwards compatibility issues.
   
   Renamed to ReadModifyWriteState.
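   For reference, a minimal sketch of the Java-side equivalent (Beam's existing `ValueState`), which has the read-modify-write semantics the new Python name spells out; the DoFn and state id below are illustrative:
   
   ```java
   import org.apache.beam.sdk.coders.VarIntCoder;
   import org.apache.beam.sdk.state.StateSpec;
   import org.apache.beam.sdk.state.StateSpecs;
   import org.apache.beam.sdk.state.ValueState;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.values.KV;
   
   /** Illustrative stateful DoFn keeping a running per-key count in a read-modify-write cell. */
   class CountPerKeyFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {
     @StateId("count")
     private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());
   
     @ProcessElement
     public void process(
         @Element KV<String, Integer> element,
         @StateId("count") ValueState<Integer> count,
         OutputReceiver<KV<String, Integer>> out) {
       Integer current = count.read();                    // read
       int updated = (current == null ? 0 : current) + 1; // modify
       count.write(updated);                              // write
       out.output(KV.of(element.getKey(), updated));
     }
   }
   ```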



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] reuvenlax commented on a change in pull request #11924: [BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle

2020-06-05 Thread GitBox


reuvenlax commented on a change in pull request #11924:
URL: https://github.com/apache/beam/pull/11924#discussion_r436147265



##
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##
@@ -577,12 +583,21 @@ public void flushState() {
 WindmillTimerInternals.windmillTimerToTimerData(
 WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, 
timer, windowCoder))
 .iterator();
+
+cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add);
+  }
+
+  Instant currentInputWatermark = 
userTimerInternals.currentInputWatermarkTime();
+  if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
+while (!toBeFiredTimersOrdered.isEmpty()) {
+  userTimerInternals.setTimer(toBeFiredTimersOrdered.poll());
+}
   }

Review comment:
   @kennknowles for comment. This doesn't look right to me, as I don't think we should be modifying the WindmillTimerInternals here. I think we just want to merge the timer modifications from processing the work item into this priority queue; note that if timers are deleted, we need to detect that as well and remove them from the priority queue.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn

2020-06-05 Thread GitBox


lukecwik commented on pull request #11922:
URL: https://github.com/apache/beam/pull/11922#issuecomment-639757431


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436123330



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and 
design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link 
KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same 
as {@link
+ *   KafkaIO.Read#getKeyDeserializer

[GitHub] [beam] lostluck merged pull request #11937: [BEAM-9615] Remove LP indicator on string methods.

2020-06-05 Thread GitBox


lostluck merged pull request #11937:
URL: https://github.com/apache/beam/pull/11937


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11922:
URL: https://github.com/apache/beam/pull/11922#discussion_r436116972



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) {
 switch (pTransform.getSpec().getUrn()) {
   case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
 this.convertSplitResultToWindowedSplitResult =
-(splitResult, watermarkEstimatorState) ->
-WindowedSplitResult.forRoots(
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getPrimary(), 
currentWatermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()),
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getResidual(), 
watermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()));
+(splitResult, watermarkEstimatorState) -> {
+  List primaryFullyProcessedWindows =
+  ImmutableList.copyOf(
+  Iterables.limit(
+  currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+  // Advances the iterator consuming the remaining windows.
+  List residualUnprocessedWindows =
+  ImmutableList.copyOf(currentWindowIterator);

Review comment:
   Ah I see. It's different from what I know about an `Iterator`. Thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb opened a new pull request #11938: [BEAM-9577] Remove uses of legacy artifact service in Java.

2020-06-05 Thread GitBox


robertwb opened a new pull request #11938:
URL: https://github.com/apache/beam/pull/11938


   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   

[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn

2020-06-05 Thread GitBox


lukecwik commented on a change in pull request #11922:
URL: https://github.com/apache/beam/pull/11922#discussion_r436113097



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -296,6 +299,12 @@ public void accept(WindowedValue input) throws Exception {
   /** Only valid during {@code processElement...} methods, null otherwise. */
   private WindowedValue currentElement;
 
+  /**
+   * Only valid during {@link #processElementForSizedElementAndRestriction} 
and {@link
+   * #processElementForSizedElementAndRestriction}.

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj merged pull request #11846: [BEAM-9869] adding self-contained Kafka service jar for testing

2020-06-05 Thread GitBox


chamikaramj merged pull request #11846:
URL: https://github.com/apache/beam/pull/11846


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #11846: [BEAM-9869] adding self-contained Kafka service jar for testing

2020-06-05 Thread GitBox


chamikaramj commented on pull request #11846:
URL: https://github.com/apache/beam/pull/11846#issuecomment-639725186


   LGTM. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn

2020-06-05 Thread GitBox


lukecwik commented on a change in pull request #11922:
URL: https://github.com/apache/beam/pull/11922#discussion_r436112241



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -941,20 +1047,59 @@ private Progress getProgress() {
   convertSplitResultToWindowedSplitResult.apply(result, 
watermarkAndState.getValue());
 }
 
+List primaryRoots = new ArrayList<>();

Review comment:
   Yes. As shown in the test the self checkpoint will checkpoint for the 
"remaining" windows as well.
   
   I would like to leave it as is until we can build consensus around what we 
should do in these use cases.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn

2020-06-05 Thread GitBox


lukecwik commented on a change in pull request #11922:
URL: https://github.com/apache/beam/pull/11922#discussion_r436111838



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -632,6 +698,17 @@ public Object restriction() {
 }
   });
   return WindowedSplitResult.forRoots(
+  primaryFullyProcessedWindows.isEmpty()
+  ? null
+  : WindowedValue.of(
+  KV.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+  fullSize),

Review comment:
   I don't think so, but this warrants a larger discussion about what an element+restriction in multiple windows means and how it impacts splitting/sizing.
   
   I would like to leave it as is until we can build that consensus.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn

2020-06-05 Thread GitBox


lukecwik commented on a change in pull request #11922:
URL: https://github.com/apache/beam/pull/11922#discussion_r43686



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) {
 switch (pTransform.getSpec().getUrn()) {
   case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
 this.convertSplitResultToWindowedSplitResult =
-(splitResult, watermarkEstimatorState) ->
-WindowedSplitResult.forRoots(
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getPrimary(), 
currentWatermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()),
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getResidual(), 
watermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()));
+(splitResult, watermarkEstimatorState) -> {
+  List primaryFullyProcessedWindows =
+  ImmutableList.copyOf(
+  Iterables.limit(
+  currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+  // Advances the iterator consuming the remaining windows.
+  List residualUnprocessedWindows =
+  ImmutableList.copyOf(currentWindowIterator);

Review comment:
   ImmutableList.copyOf(iterator) drains all the remaining elements from 
iterator. It is effectively:
   ```
   while (iterator.hasNext()) {
 list.add(iterator.next());
   }
   ```
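   A tiny standalone illustration of the pattern being discussed here — splitting an element's windows into "fully processed" and "remaining" around the current one (window values are plain strings; assumes Guava's `Iterables`/`ImmutableList`):
   
   ```java
   import com.google.common.collect.ImmutableList;
   import com.google.common.collect.Iterables;
   import java.util.List;
   import java.util.ListIterator;
   
   public class WindowPartitionDemo {
     public static void main(String[] args) {
       List<String> windows = ImmutableList.of("w0", "w1", "w2", "w3");
       ListIterator<String> it = windows.listIterator();
       it.next(); // "w0" has been fully processed
       it.next(); // processing is currently inside "w1"
   
       // Windows fully processed before the current one: everything up to previousIndex().
       List<String> fullyProcessed =
           ImmutableList.copyOf(Iterables.limit(windows, it.previousIndex()));
       // copyOf(iterator) drains the iterator, consuming all windows after the current one.
       List<String> remaining = ImmutableList.copyOf(it);
   
       System.out.println(fullyProcessed); // [w0]
       System.out.println(remaining);      // [w2, w3]
     }
   }
   ```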





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn

2020-06-05 Thread GitBox


lukecwik commented on a change in pull request #11922:
URL: https://github.com/apache/beam/pull/11922#discussion_r436111296



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) {
 switch (pTransform.getSpec().getUrn()) {
   case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
 this.convertSplitResultToWindowedSplitResult =
-(splitResult, watermarkEstimatorState) ->
-WindowedSplitResult.forRoots(
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getPrimary(), 
currentWatermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()),
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getResidual(), 
watermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()));
+(splitResult, watermarkEstimatorState) -> {
+  List primaryFullyProcessedWindows =
+  ImmutableList.copyOf(
+  Iterables.limit(
+  currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+  // Advances the iterator consuming the remaining windows.
+  List residualUnprocessedWindows =
+  ImmutableList.copyOf(currentWindowIterator);
+  return WindowedSplitResult.forRoots(
+  primaryFullyProcessedWindows.isEmpty()
+  ? null
+  : WindowedValue.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+  currentElement.getTimestamp(),
+  primaryFullyProcessedWindows,
+  currentElement.getPane()),
+  WindowedValue.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(splitResult.getPrimary(), 
currentWatermarkEstimatorState)),
+  currentElement.getTimestamp(),
+  currentWindow,
+  currentElement.getPane()),
+  WindowedValue.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(splitResult.getResidual(), 
watermarkEstimatorState)),
+  currentElement.getTimestamp(),
+  currentWindow,
+  currentElement.getPane()),
+  residualUnprocessedWindows.isEmpty()
+  ? null
+  : WindowedValue.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+  currentElement.getTimestamp(),
+  residualUnprocessedWindows,
+  currentElement.getPane()));
+};
 break;
   case 
PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
 this.convertSplitResultToWindowedSplitResult =
 (splitResult, watermarkEstimatorState) -> {
+  List primaryFullyProcessedWindows =
+  ImmutableList.copyOf(
+  Iterables.limit(
+  currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+  // Advances the iterator consuming the remaining windows.
+  List residualUnprocessedWindows =
+  ImmutableList.copyOf(currentWindowIterator);

Review comment:
   ImmutableList.copyOf(iterator) drains all the remaining elements from 
iterator. It is effectively:
   ```
   while (iterator.hasNext()) {
 list.add(iterator.next());
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on pull request #11937: [BEAM-9615] Remove LP indicator on string methods.

2020-06-05 Thread GitBox


lostluck commented on pull request #11937:
URL: https://github.com/apache/beam/pull/11937#issuecomment-639718856


   R: @tysonjh 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck opened a new pull request #11937: [BEAM-9615] Remove LP indicator on string methods.

2020-06-05 Thread GitBox


lostluck opened a new pull request #11937:
URL: https://github.com/apache/beam/pull/11937


   Missed commit from PR 11925.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   

[GitHub] [beam] amaliujia commented on a change in pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.

2020-06-05 Thread GitBox


amaliujia commented on a change in pull request #11933:
URL: https://github.com/apache/beam/pull/11933#discussion_r436101952



##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -837,7 +834,6 @@ public void testLikeAllowsEscapingBackslash() {
   }
 
   @Test
-  @Ignore("Currently non UTF-8 values are coerced to UTF-8")
   public void testThrowsErrorForNonUTF8() {

Review comment:
   PR updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] amaliujia commented on a change in pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.

2020-06-05 Thread GitBox


amaliujia commented on a change in pull request #11933:
URL: https://github.com/apache/beam/pull/11933#discussion_r436094165



##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -837,7 +834,6 @@ public void testLikeAllowsEscapingBackslash() {
   }
 
   @Test
-  @Ignore("Currently non UTF-8 values are coerced to UTF-8")
   public void testThrowsErrorForNonUTF8() {

Review comment:
   This is a good point. I will choose to remove this test. 
   
   I tried to dig into ZetaSQL's documentation and internal test suite, and there was no clear explanation of how the LIKE operator should handle non-UTF-8 characters (there are code-generated non-UTF-8 test cases, but they don't have clear comments explaining their purpose). So I will leave this part to be covered by the internal test suite.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


lostluck commented on pull request #11925:
URL: https://github.com/apache/beam/pull/11925#issuecomment-639698549


   Ah dang it. I thought I had pushed the commit with the rename, but it was 
waiting on a password. I'll have that as another PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck merged pull request #11927: [BEAM-9615] finish standardizing proto import names

2020-06-05 Thread GitBox


lostluck merged pull request #11927:
URL: https://github.com/apache/beam/pull/11927


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck edited a comment on pull request #11927: [BEAM-9615] finish standardizing proto import names

2020-06-05 Thread GitBox


lostluck edited a comment on pull request #11927:
URL: https://github.com/apache/beam/pull/11927#issuecomment-639695778


   @lukecwik  I'm 100% certain that the Java PreCommit failure here is 
unrelated to the change.
   
1. The short names are for programmer convenience and shouldn't change the 
compiled binary.
   
   2. [That 
test](https://builds.apache.org/job/beam_PreCommit_Java_Phrase/2291/testReport/org.apache.beam.sdk.extensions.ml/VideoIntelligenceIT/annotateVideoFromURINoContext/)
 doesn't even run the boot binary. 
   
   @tysonjh Filed it as https://issues.apache.org/jira/browse/BEAM-10206



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11922:
URL: https://github.com/apache/beam/pull/11922#discussion_r436072548



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -296,6 +299,12 @@ public void accept(WindowedValue input) throws Exception {
   /** Only valid during {@code processElement...} methods, null otherwise. */
   private WindowedValue currentElement;
 
+  /**
+   * Only valid during {@link #processElementForSizedElementAndRestriction} 
and {@link
+   * #processElementForSizedElementAndRestriction}.

Review comment:
   Duplicated {@link #processElementForSizedElementAndRestriction}?

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) {
 switch (pTransform.getSpec().getUrn()) {
   case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
 this.convertSplitResultToWindowedSplitResult =
-(splitResult, watermarkEstimatorState) ->
-WindowedSplitResult.forRoots(
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getPrimary(), 
currentWatermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()),
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getResidual(), 
watermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()));
+(splitResult, watermarkEstimatorState) -> {
+  List primaryFullyProcessedWindows =
+  ImmutableList.copyOf(
+  Iterables.limit(
+  currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+  // Advances the iterator consuming the remaining windows.
+  List residualUnprocessedWindows =
+  ImmutableList.copyOf(currentWindowIterator);

Review comment:
   `currentWindowIterator.next()`?

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -941,20 +1047,59 @@ private Progress getProgress() {
   convertSplitResultToWindowedSplitResult.apply(result, 
watermarkAndState.getValue());
 }
 
+List primaryRoots = new ArrayList<>();

Review comment:
   This also takes care of self checkpoint, right?

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) {
 switch (pTransform.getSpec().getUrn()) {
   case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
 this.convertSplitResultToWindowedSplitResult =
-(splitResult, watermarkEstimatorState) ->
-WindowedSplitResult.forRoots(
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getPrimary(), 
currentWatermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()),
-WindowedValue.of(
-KV.of(
-currentElement.getValue(),
-KV.of(splitResult.getResidual(), 
watermarkEstimatorState)),
-currentElement.getTimestamp(),
-currentWindow,
-currentElement.getPane()));
+(splitResult, watermarkEstimatorState) -> {
+  List primaryFullyProcessedWindows =
+  ImmutableList.copyOf(
+  Iterables.limit(
+  currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+  // Advances the iterator consuming the remaining windows.
+  List residualUnprocessedWindows =
+  ImmutableList.copyOf(currentWindowIterator);
+  return WindowedSplitResult.forRoots(
+  primaryFullyProcessedWindows.isEmpty()
+  ? null
+  : WindowedValue.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+  currentElement.getTimestamp(),
+  primaryFullyProcessedWindows,
+  currentElement.getPane()),
+ 

[GitHub] [beam] amaliujia commented on a change in pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.

2020-06-05 Thread GitBox


amaliujia commented on a change in pull request #11933:
URL: https://github.com/apache/beam/pull/11933#discussion_r436094165



##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -837,7 +834,6 @@ public void testLikeAllowsEscapingBackslash() {
   }
 
   @Test
-  @Ignore("Currently non UTF-8 values are coerced to UTF-8")
   public void testThrowsErrorForNonUTF8() {

Review comment:
   This is a good point. I will choose to remove this test. I tried to dig into ZetaSQL's documentation and internal test suite, and there was no clear explanation of how the LIKE operator should handle non-UTF-8 characters (there are code-generated non-UTF-8 test cases, but they don't have clear comments explaining their purpose).
   
   So I will leave this part to be covered by the internal test suite.

##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -837,7 +834,6 @@ public void testLikeAllowsEscapingBackslash() {
   }
 
   @Test
-  @Ignore("Currently non UTF-8 values are coerced to UTF-8")
   public void testThrowsErrorForNonUTF8() {

Review comment:
   This is a good point. I will choose to remove this test. I tried to dig into ZetaSQL's documentation and internal test suite, and there was no clear explanation of how the LIKE operator should handle non-UTF-8 characters (there are code-generated non-UTF-8 test cases, but they don't have clear comments explaining their purpose).
   
   So I will leave this part to be covered by the internal test suite.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on pull request #11927: [BEAM-9615] finish standardizing proto import names

2020-06-05 Thread GitBox


lostluck commented on pull request #11927:
URL: https://github.com/apache/beam/pull/11927#issuecomment-639695778


   @lukecwik  I'm 100% certain that the Java PreCommit failure here is 
unrelated to the change. 1. The short names are for programmer convenience and 
don't change the boot binary at all.
[That 
test](https://builds.apache.org/job/beam_PreCommit_Java_Phrase/2291/testReport/org.apache.beam.sdk.extensions.ml/VideoIntelligenceIT/annotateVideoFromURINoContext/)
 doesn't even run the boot binary. 
   
   @tysonjh Filed it as https://issues.apache.org/jira/browse/BEAM-10206



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


lostluck commented on a change in pull request #11925:
URL: https://github.com/apache/beam/pull/11925#discussion_r436095065



##
File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go
##
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+   "bytes"
+   "encoding/base64"
+   "io"
+   "strings"
+   "testing"
+   "unicode/utf8"
+)
+
+var testValues = []string{
+   "",
+   "a",
+   "13",
+   "hello",
+   "a longer string with spaces and all that",
+   "a string with a \n newline",
+   "スタリング",
+   "I am the very model of a modern major general.\nI've information 
animal, vegetable, and mineral",
+}
+
+// Base64 encoded versions of the above strings, without the length prefix.
+var testEncodings = []string{
+   "",
+   "YQ",
+   "MTM",
+   "aGVsbG8",
+   "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA",
+   "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ",
+   "44K544K_44Oq44Oz44Kw",
+   
"SSBhbSB0aGUgdmVyeSBtb2RlbCBvZiBhIG1vZGVybiBtYWpvciBnZW5lcmFsLgpJJ3ZlIGluZm9ybWF0aW9uIGFuaW1hbCwgdmVnZXRhYmxlLCBhbmQgbWluZXJhbA",
+}
+
+// TestLen serves as a verification that string lengths
+// match their equivalent byte lengths, and not their rune
+// representation.
+func TestLen(t *testing.T) {
+   runeCount := []int{0, 1, 2, 5, 40, 25, 5, 94}
+   for i, s := range testValues {
+   if got, want := len(s), len([]byte(s)); got != want {
+   t.Errorf("string and []byte len do not match. got %v, 
want %v", got, want)
+   }
+   if got, want := utf8.RuneCountInString(s), runeCount[i]; got != 
want {
+   t.Errorf("Rune count of %q change len do not match. got 
%v, want %v", s, got, want)
+   }
+   }
+}
+
+func TestEncodeStringUTF8(t *testing.T) {
+   for i, s := range testValues {
+   s := s
+   want := testEncodings[i]
+   t.Run(s, func(t *testing.T) {
+   var b strings.Builder
+   base64enc := base64.NewEncoder(base64.RawURLEncoding, 
&b)
+
+   if err := encodeStringUTF8(s, base64enc); err != nil {
+   t.Fatal(err)
+   }
+   base64enc.Close()
+   got := b.String()
+   if got != want {
+   t.Errorf("encodeStringUTF8(%q) = %q, want %q", 
s, got, want)
+   }
+   })
+   }
+}
+
+func TestDecodeStringUTF8(t *testing.T) {
+   for i, s := range testEncodings {
+   s := s
+   want := testValues[i]
+   t.Run(want, func(t *testing.T) {
+   b := bytes.NewBufferString(s)
+   base64dec := base64.NewDecoder(base64.RawURLEncoding, b)
+
+   got, err := decodeStringUTF8(int64(len(want)), 
base64dec)
+   if err != nil && err != io.EOF {
+   t.Fatal(err)
+   }
+   if got != want {
+   t.Errorf("decodeStringUTF8(%q) = %q, want %q", 
s, got, want)
+   }
+   })
+   }
+}
+
+func TestEncodeDecodeStringUTF8LP(t *testing.T) {
+   for _, s := range testValues {
+   want := s
+   t.Run(want, func(t *testing.T) {
+   var build strings.Builder
+   if err := EncodeStringUTF8LP(want, &build); err != nil {

Review comment:
   That did occur to me as well, but it would fail the moment we run a wordcount, since nothing could get decoded properly: the runner would end up with very strange under/over-reads of the data when it tries to interpret the initial character bytes as varints. In practice, this encoding is not going to change, and certainly not in a way that removes both length prefixes at the same time.
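   A rough sketch of that failure mode (illustrative only, not the SDK's coder implementation; the helper names `encodeLP`/`decodeLP` are hypothetical), assuming the usual "varint length + raw UTF-8 bytes" layout:
   
   ```
   package main
   
   import (
       "bufio"
       "bytes"
       "encoding/binary"
       "fmt"
       "io"
   )
   
   // encodeLP writes len(s) as a varint, then the raw UTF-8 bytes of s.
   func encodeLP(s string, w *bytes.Buffer) {
       var buf [binary.MaxVarintLen64]byte
       n := binary.PutUvarint(buf[:], uint64(len(s)))
       w.Write(buf[:n])
       w.WriteString(s)
   }
   
   // decodeLP reads the varint length, then exactly that many bytes.
   func decodeLP(r *bufio.Reader) (string, error) {
       n, err := binary.ReadUvarint(r)
       if err != nil {
           return "", err
       }
       b := make([]byte, n)
       if _, err := io.ReadFull(r, b); err != nil {
           return "", err
       }
       return string(b), nil
   }
   
   func main() {
       var good, bad bytes.Buffer
       encodeLP("hello", &good) // 0x05 'h' 'e' 'l' 'l' 'o'
       bad.WriteString("hello") // no prefix: 'h' (0x68) gets read as a length of 104
   
       fmt.Println(decodeLP(bufio.NewReader(&good))) // hello <nil>
       fmt.Println(decodeLP(bufio.NewReader(&bad)))  // "" unexpected EOF (over-read)
   }
   ```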





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [beam] lostluck merged pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


lostluck merged pull request #11925:
URL: https://github.com/apache/beam/pull/11925


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.

2020-06-05 Thread GitBox


robertwb commented on pull request #11932:
URL: https://github.com/apache/beam/pull/11932#issuecomment-639691021


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] amaliujia commented on a change in pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.

2020-06-05 Thread GitBox


amaliujia commented on a change in pull request #11933:
URL: https://github.com/apache/beam/pull/11933#discussion_r436094165



##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -837,7 +834,6 @@ public void testLikeAllowsEscapingBackslash() {
   }
 
   @Test
-  @Ignore("Currently non UTF-8 values are coerced to UTF-8")
   public void testThrowsErrorForNonUTF8() {

Review comment:
   This is a good point. I will choose to remove this test. I tried to dig into ZetaSQL's documentation and internal test suite, and there was no clear explanation of how the LIKE operator should handle non-UTF-8 characters (there are code-generated non-UTF-8 test cases, but they don't have clear comments demonstrating their purpose).
   
   So I will leave this part to be tested by the internal test suite.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kennknowles commented on pull request #11820: [BEAM-10093] ZetaSql Nexmark variant

2020-06-05 Thread GitBox


kennknowles commented on pull request #11820:
URL: https://github.com/apache/beam/pull/11820#issuecomment-639689286


   PTAL. The first commit is unchanged, the second is a total rewrite (or 
unwrite, if you like).
   
- Modified each `SqlQuery2`, etc., class to have just the amount of variation necessary
- Made each test class run as `@Enclosed` with an inner class for each 
dialect. I don't love this use of inheritance, but it seems to be how JUnit 
wants things. It has a clearer test signal than using `@Parameterized` across 
the dialects, and allows separate `@Ignore` for broken/unsupported features.
   
   Did not do:
   
- Jenkins job to publish benchmarks; planning that as a followup
- Any other refactor to make suites more manageable



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


tysonjh commented on a change in pull request #11925:
URL: https://github.com/apache/beam/pull/11925#discussion_r436092337



##
File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go
##
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+   "bytes"
+   "encoding/base64"
+   "io"
+   "strings"
+   "testing"
+   "unicode/utf8"
+)
+
+var testValues = []string{
+   "",
+   "a",
+   "13",
+   "hello",
+   "a longer string with spaces and all that",
+   "a string with a \n newline",
+   "スタリング",
+   "I am the very model of a modern major general.\nI've information 
animal, vegetable, and mineral",
+}
+
+// Base64 encoded versions of the above strings, without the length prefix.
+var testEncodings = []string{
+   "",
+   "YQ",
+   "MTM",
+   "aGVsbG8",
+   "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA",
+   "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ",
+   "44K544K_44Oq44Oz44Kw",
+   
"SSBhbSB0aGUgdmVyeSBtb2RlbCBvZiBhIG1vZGVybiBtYWpvciBnZW5lcmFsLgpJJ3ZlIGluZm9ybWF0aW9uIGFuaW1hbCwgdmVnZXRhYmxlLCBhbmQgbWluZXJhbA",
+}
+
+// TestLen serves as a verification that string lengths
+// match their equivalent byte lengths, and not their rune
+// representation.
+func TestLen(t *testing.T) {
+   runeCount := []int{0, 1, 2, 5, 40, 25, 5, 94}
+   for i, s := range testValues {
+   if got, want := len(s), len([]byte(s)); got != want {
+   t.Errorf("string and []byte len do not match. got %v, 
want %v", got, want)
+   }
+   if got, want := utf8.RuneCountInString(s), runeCount[i]; got != 
want {
+   t.Errorf("Rune count of %q change len do not match. got 
%v, want %v", s, got, want)
+   }
+   }
+}
+
+func TestEncodeStringUTF8(t *testing.T) {
+   for i, s := range testValues {
+   s := s
+   want := testEncodings[i]
+   t.Run(s, func(t *testing.T) {
+   var b strings.Builder
+   base64enc := base64.NewEncoder(base64.RawURLEncoding, 
&b)
+
+   if err := encodeStringUTF8(s, base64enc); err != nil {
+   t.Fatal(err)
+   }
+   base64enc.Close()
+   got := b.String()
+   if got != want {
+   t.Errorf("encodeStringUTF8(%q) = %q, want %q", 
s, got, want)
+   }
+   })
+   }
+}
+
+func TestDecodeStringUTF8(t *testing.T) {
+   for i, s := range testEncodings {
+   s := s
+   want := testValues[i]
+   t.Run(want, func(t *testing.T) {
+   b := bytes.NewBufferString(s)
+   base64dec := base64.NewDecoder(base64.RawURLEncoding, b)
+
+   got, err := decodeStringUTF8(int64(len(want)), 
base64dec)
+   if err != nil && err != io.EOF {
+   t.Fatal(err)
+   }
+   if got != want {
+   t.Errorf("decodeStringUTF8(%q) = %q, want %q", 
s, got, want)
+   }
+   })
+   }
+}
+
+func TestEncodeDecodeStringUTF8LP(t *testing.T) {
+   for _, s := range testValues {
+   want := s
+   t.Run(want, func(t *testing.T) {
+   var build strings.Builder
+   if err := EncodeStringUTF8LP(want, &build); err != nil {

Review comment:
   If the LP part were removed, or somehow went unused, in both encode and decode (a stretch, to be sure), then this test would pass despite there being no LP. My thought was that since the exposed methods suggest the LP as part of the abstraction, it should be explicitly verified to avoid any surprises.
   
   I'll leave it up to your discretion.
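   For what it's worth, a sketch of that explicit check (hypothetical; not what the PR settled on) could reuse `testValues` and `EncodeStringUTF8LP` from the quoted diff, assuming every test value is shorter than 128 bytes so the varint prefix is a single byte:
   
   ```
   func TestEncodeStringUTF8LP_Prefix(t *testing.T) {
       for _, s := range testValues {
           var b strings.Builder
           if err := EncodeStringUTF8LP(s, &b); err != nil {
               t.Fatal(err)
           }
           got := b.String()
           if len(got) == 0 {
               t.Fatalf("EncodeStringUTF8LP(%q) wrote no output", s)
           }
           // All test values are < 128 bytes, so the length prefix is one varint byte.
           if got[0] != byte(len(s)) {
               t.Errorf("length prefix of %q = %#x, want %#x", s, got[0], byte(len(s)))
           }
           if got[1:] != s {
               t.Errorf("payload of %q = %q, want %q", s, got[1:], s)
           }
       }
   }
   ```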





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


lostluck commented on a change in pull request #11925:
URL: https://github.com/apache/beam/pull/11925#discussion_r436091546



##
File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go
##
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+   "bytes"
+   "encoding/base64"
+   "io"
+   "strings"
+   "testing"
+   "unicode/utf8"
+)
+
+var testValues = []string{
+   "",
+   "a",
+   "13",
+   "hello",
+   "a longer string with spaces and all that",
+   "a string with a \n newline",
+   "スタリング",
+   "I am the very model of a modern major general.\nI've information 
animal, vegetable, and mineral",
+}
+
+// Base64 encoded versions of the above strings, without the length prefix.
+var testEncodings = []string{
+   "",
+   "YQ",
+   "MTM",
+   "aGVsbG8",
+   "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA",
+   "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ",
+   "44K544K_44Oq44Oz44Kw",
+   
"SSBhbSB0aGUgdmVyeSBtb2RlbCBvZiBhIG1vZGVybiBtYWpvciBnZW5lcmFsLgpJJ3ZlIGluZm9ybWF0aW9uIGFuaW1hbCwgdmVnZXRhYmxlLCBhbmQgbWluZXJhbA",
+}
+
+// TestLen serves as a verification that string lengths
+// match their equivalent byte lengths, and not their rune
+// representation.
+func TestLen(t *testing.T) {
+   runeCount := []int{0, 1, 2, 5, 40, 25, 5, 94}
+   for i, s := range testValues {
+   if got, want := len(s), len([]byte(s)); got != want {
+   t.Errorf("string and []byte len do not match. got %v, 
want %v", got, want)
+   }
+   if got, want := utf8.RuneCountInString(s), runeCount[i]; got != 
want {
+   t.Errorf("Rune count of %q change len do not match. got 
%v, want %v", s, got, want)
+   }
+   }
+}
+
+func TestEncodeStringUTF8(t *testing.T) {
+   for i, s := range testValues {
+   s := s
+   want := testEncodings[i]
+   t.Run(s, func(t *testing.T) {
+   var b strings.Builder
+   base64enc := base64.NewEncoder(base64.RawURLEncoding, 
&b)
+
+   if err := encodeStringUTF8(s, base64enc); err != nil {
+   t.Fatal(err)
+   }
+   base64enc.Close()
+   got := b.String()
+   if got != want {
+   t.Errorf("encodeStringUTF8(%q) = %q, want %q", 
s, got, want)
+   }
+   })
+   }
+}
+
+func TestDecodeStringUTF8(t *testing.T) {
+   for i, s := range testEncodings {
+   s := s
+   want := testValues[i]
+   t.Run(want, func(t *testing.T) {
+   b := bytes.NewBufferString(s)
+   base64dec := base64.NewDecoder(base64.RawURLEncoding, b)
+
+   got, err := decodeStringUTF8(int64(len(want)), 
base64dec)
+   if err != nil && err != io.EOF {
+   t.Fatal(err)
+   }
+   if got != want {
+   t.Errorf("decodeStringUTF8(%q) = %q, want %q", 
s, got, want)
+   }
+   })
+   }
+}
+
+func TestEncodeDecodeStringUTF8LP(t *testing.T) {
+   for _, s := range testValues {
+   want := s
+   t.Run(want, func(t *testing.T) {
+   var build strings.Builder
+   if err := EncodeStringUTF8LP(want, &build); err != nil {

Review comment:
   With the explicit callout for the LP removed, it becomes an implementation detail. E.g. if you expect the next value to be a StringUTF8, then it must be length prefixed. This code would only work in that situation.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] damondouglas opened a new pull request #11936: [BEAM-9679] Add CombinePerKey to Core Transforms Go Katas

2020-06-05 Thread GitBox


damondouglas opened a new pull request #11936:
URL: https://github.com/apache/beam/pull/11936


   This pull request adds a Combine/CombinePerKey lesson to the Go SDK katas. I would like to request the following reviewers:
   
   (R: @lostluck )
   (R: @henryken )
   
   If accepted by both reviewers, please wait until the [Stepik 
course](https://stepik.org/course/70387) is updated before finally merging this 
PR.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [x] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_J

[GitHub] [beam] tysonjh commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


tysonjh commented on a change in pull request #11925:
URL: https://github.com/apache/beam/pull/11925#discussion_r436090874



##
File path: sdks/go/pkg/beam/core/graph/coder/stringutf8.go
##
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+   "io"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const bufCap = 64
+
+// EncodeStringUTF8LP encodes a UTF string with a length prefix.

Review comment:
   Sounds good.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


lostluck commented on a change in pull request #11925:
URL: https://github.com/apache/beam/pull/11925#discussion_r436090722



##
File path: sdks/go/pkg/beam/core/graph/coder/stringutf8.go
##
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+   "io"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const bufCap = 64
+
+// EncodeStringUTF8LP encodes a UTF string with a length prefix.

Review comment:
   I removed it. It's part of the Beam spec for an encoded UTF8 string, so the callout is not necessary.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay merged pull request #11882: [BEAM-10112] Add state and timer python examples to website

2020-06-05 Thread GitBox


aaltay merged pull request #11882:
URL: https://github.com/apache/beam/pull/11882


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay merged pull request #11805: Add documentation for python apache-beam[aws] installation

2020-06-05 Thread GitBox


aaltay merged pull request #11805:
URL: https://github.com/apache/beam/pull/11805


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] KevinGG commented on pull request #11884: [BEAM-7923] Initialize an empty Jupyter labextension with README

2020-06-05 Thread GitBox


KevinGG commented on pull request #11884:
URL: https://github.com/apache/beam/pull/11884#issuecomment-639670746


   > There's a lot of files there that don't seem relevant; I think we should 
go through and figure out what's needed for the actual plugin vs. what's 
"extras" that just got copied from a template. We also need to figure out the 
distribution story. Will this be released with beam? As another pypi pacakge 
(and npm package)?
   
   I can go through these files and explain their usages and the distribution 
story. If this is not enough, we can talk offline.
   
   [ESLint](https://eslint.org/docs/user-guide/getting-started)
   ```
   .eslintignore
   .eslintrc.js
   ```
   
   [Github 
workflow](https://help.github.com/en/actions/configuring-and-managing-workflows/configuring-a-workflow)
   
   ```
   # This is later used in the README to display a status badge.
   .github/workflows/build.yml
   ```
   
   [Prettier](https://prettier.io/docs/en/install.html)
   
   ```
   .prettierignore
   .prettierrc.json
   ```
   
   [Python Packaging](https://packaging.python.org/guides/using-manifest-in/)
   
   ```
   MANIFEST.in
   ```
   
   JupyterLab server extension
   
   ```
   interactive_beam_side_panel/*.py
   ```
   
   
   JupyterLab frontend extension
   
   ```
   src/*.ts[x]
   ```
   
   JupyterLab frontend extension styles
   
   ```
   style/*.css
   ```
   
   JupyterLab configuration
   
   ```
   jupyter-config/*.json
   ```
   
   [TypeScript 
configuration](https://www.typescriptlang.org/docs/handbook/tsconfig-json.html)
   
   ```
   tsconfig.json
   ```
   
   [PEP 518](https://www.python.org/dev/peps/pep-0518/)
   
   ```
   pyproject.toml
   ```
   
   [npm 
package](https://nodejs.org/en/knowledge/getting-started/npm/what-is-the-file-package-json/)
   
   ```
   package.json
   ```
   
   The distribution story is 
[here](https://jupyterlab.readthedocs.io/en/stable/developer/extension_dev.html#shipping-packages).
   The plan is to ship the server extension (PYPI) and the frontend extension 
(NPM) separately.
   Temporarily, the server extension will just be a placeholder. We only need 
to release it once.
   The frontend extension can be shipped regularly: `The general idea is to 
pack the Jupyterlab extension using npm pack, and then use the data_files logic 
in setup.py to ensure the file ends up in the 
/share/jupyter/lab/extensions directory.`
   It doesn't have to be released with Beam releases.
   
   If this `sdks/python/apache_beam/runners/interactive` is not a suitable 
place to put the code base, we can move it to some other directory in this repo 
outside of `sdks`.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


lostluck commented on a change in pull request #11925:
URL: https://github.com/apache/beam/pull/11925#discussion_r436076913



##
File path: sdks/go/pkg/beam/core/graph/coder/stringutf8.go
##
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+   "io"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const bufCap = 64
+
+// EncodeStringUTF8LP encodes a UTF string with a length prefix.

Review comment:
   It's part of the StringUTF8 encoding, so I could remove the explicit callout. However, the Java and Python tests for the string encodings use common golden encodings (which is why we have separate encoding/decoding tests against the golden values), and those encodings *do not* include the length prefix.
   
   I'd rather have the positive inclusion of a length prefix in the name than have the helper method be named "withoutLP" instead.
   
   Some of that comes from the archaic concept of "nested" and "unnested" coders, which is largely phased out. In practice, if you have a variable amount of data, a length prefix is required.
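   A tiny, standalone illustration of that distinction (a hedged sketch, not part of the PR): the golden vectors are the bare UTF-8 bytes base64-encoded, while the length-prefixed wire form puts a varint byte count in front of the same bytes.
   
   ```
   package main
   
   import (
       "encoding/base64"
       "fmt"
   )
   
   func main() {
       s := "hello"
       // Golden-value form, no length prefix - matches "aGVsbG8" in testEncodings.
       fmt.Println(base64.RawURLEncoding.EncodeToString([]byte(s)))
       // Length-prefixed form: a one-byte varint (0x05) followed by the same bytes.
       lp := append([]byte{byte(len(s))}, s...)
       fmt.Printf("% x\n", lp) // 05 68 65 6c 6c 6f
   }
   ```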





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


lostluck commented on a change in pull request #11925:
URL: https://github.com/apache/beam/pull/11925#discussion_r436077202



##
File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go
##
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+   "bytes"
+   "encoding/base64"
+   "io"
+   "strings"
+   "testing"
+   "unicode/utf8"
+)
+
+var testValues = []string{
+   "",
+   "a",
+   "13",
+   "hello",
+   "a longer string with spaces and all that",
+   "a string with a \n newline",
+   "スタリング",
+   "I am the very model of a modern major general.\nI've information 
animal, vegetable, and mineral",
+}
+
+// Base64 encoded versions of the above strings, without the length prefix.
+var testEncodings = []string{
+   "",
+   "YQ",
+   "MTM",
+   "aGVsbG8",
+   "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA",
+   "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ",
+   "44K544K_44Oq44Oz44Kw",
+   
"SSBhbSB0aGUgdmVyeSBtb2RlbCBvZiBhIG1vZGVybiBtYWpvciBnZW5lcmFsLgpJJ3ZlIGluZm9ybWF0aW9uIGFuaW1hbCwgdmVnZXRhYmxlLCBhbmQgbWluZXJhbA",
+}
+
+// TestLen serves as a verification that string lengths
+// match their equivalent byte lengths, and not their rune
+// representation.
+func TestLen(t *testing.T) {
+   runeCount := []int{0, 1, 2, 5, 40, 25, 5, 94}
+   for i, s := range testValues {
+   if got, want := len(s), len([]byte(s)); got != want {
+   t.Errorf("string and []byte len do not match. got %v, 
want %v", got, want)
+   }
+   if got, want := utf8.RuneCountInString(s), runeCount[i]; got != 
want {
+   t.Errorf("Rune count of %q change len do not match. got 
%v, want %v", s, got, want)
+   }
+   }
+}
+
+func TestEncodeStringUTF8(t *testing.T) {
+   for i, s := range testValues {
+   s := s
+   want := testEncodings[i]
+   t.Run(s, func(t *testing.T) {
+   var b strings.Builder
+   base64enc := base64.NewEncoder(base64.RawURLEncoding, 
&b)
+
+   if err := encodeStringUTF8(s, base64enc); err != nil {
+   t.Fatal(err)
+   }
+   base64enc.Close()
+   got := b.String()
+   if got != want {
+   t.Errorf("encodeStringUTF8(%q) = %q, want %q", 
s, got, want)
+   }
+   })
+   }
+}
+
+func TestDecodeStringUTF8(t *testing.T) {
+   for i, s := range testEncodings {
+   s := s
+   want := testValues[i]
+   t.Run(want, func(t *testing.T) {
+   b := bytes.NewBufferString(s)
+   base64dec := base64.NewDecoder(base64.RawURLEncoding, b)
+
+   got, err := decodeStringUTF8(int64(len(want)), 
base64dec)
+   if err != nil && err != io.EOF {
+   t.Fatal(err)
+   }
+   if got != want {
+   t.Errorf("decodeStringUTF8(%q) = %q, want %q", 
s, got, want)
+   }
+   })
+   }
+}
+
+func TestEncodeDecodeStringUTF8LP(t *testing.T) {
+   for _, s := range testValues {
+   want := s
+   t.Run(want, func(t *testing.T) {
+   var build strings.Builder
+   if err := EncodeStringUTF8LP(want, &build); err != nil {

Review comment:
   Technically, this test has the LP verification. If the LP weren't present, it wouldn't be possible to get the result back again on decode, since the decoder wouldn't know how much data to read.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on pull request #11927: [BEAM-9615] finish standardizing proto import names

2020-06-05 Thread GitBox


tysonjh commented on pull request #11927:
URL: https://github.com/apache/beam/pull/11927#issuecomment-639668226


   > @lukecwik There probably is, but I'm not shaving that yak today. The 
gradle commands don't meaningfully work for Go, and not when I'm ripping them 
apart in the next two months to be replaced with something better.
   > In practice this is only a problem for very few packages, and only when 
those packages get new files.
   
   Please consider cutting a JIRA so we don't lose track of this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS

2020-06-05 Thread GitBox


TobKed commented on pull request #11877:
URL: https://github.com/apache/beam/pull/11877#issuecomment-639667163


   cc @potiuk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on pull request #11927: [BEAM-9615] finish standardizing proto import names

2020-06-05 Thread GitBox


lostluck commented on pull request #11927:
URL: https://github.com/apache/beam/pull/11927#issuecomment-639664788


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] amaliujia merged pull request #11934: [BEAM-10205] @Ignore:BYTES can work with UNION ALL

2020-06-05 Thread GitBox


amaliujia merged pull request #11934:
URL: https://github.com/apache/beam/pull/11934


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on a change in pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.

2020-06-05 Thread GitBox


tysonjh commented on a change in pull request #11933:
URL: https://github.com/apache/beam/pull/11933#discussion_r436071355



##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -837,7 +834,6 @@ public void testLikeAllowsEscapingBackslash() {
   }
 
   @Test
-  @Ignore("Currently non UTF-8 values are coerced to UTF-8")
   public void testThrowsErrorForNonUTF8() {

Review comment:
   This test name doesn't seem correct after these changes. What is it 
testing now? What about the previous assertions on throwing an exception for 
invalid utf8?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb opened a new pull request #11935: [BEAM-9577] Remove use of legacy artifact service in Python.

2020-06-05 Thread GitBox


robertwb opened a new pull request #11935:
URL: https://github.com/apache/beam/pull/11935


   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python35/

[GitHub] [beam] TobKed commented on a change in pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS

2020-06-05 Thread GitBox


TobKed commented on a change in pull request #11877:
URL: https://github.com/apache/beam/pull/11877#discussion_r436068238



##
File path: .github/workflows/build_wheels.yml
##
@@ -0,0 +1,141 @@
+name: Build python wheels
+
+on:
+  push:
+branches:
+  - master
+  - release-*
+tags:
+  - v*
+
+jobs:
+
+  build_source:
+runs-on: ubuntu-18.04
+steps:
+  - name: Checkout code
+uses: actions/checkout@v2
+  - name: Install python
+uses: actions/setup-python@v2
+with:
+  python-version: 3.7
+  - name: Get build dependencies
+working-directory: ./sdks/python
+run: python3 -m pip install cython && python3 -m pip install -r 
build-requirements.txt
+  - name: Install wheels
+run: python3 -m pip install wheel
+  - name: Buld source
+working-directory: ./sdks/python
+run: python3 setup.py sdist --formats=gztar,zip
+  - name: Unzip source
+working-directory: ./sdks/python
+run: unzip dist/$(ls dist | grep .zip | head -n 1)
+  - name: Rename source directory
+working-directory: ./sdks/python
+run: mv $(ls | grep apache-beam) apache-beam-source
+  - name: Upload source
+uses: actions/upload-artifact@v2
+with:
+  name: source
+  path: sdks/python/apache-beam-source
+  - name: Upload compressed sources
+uses: actions/upload-artifact@v2
+with:
+  name: source_gztar_zip
+  path: sdks/python/dist
+
+  prepare_gcs:
+name: Prepare GCS
+needs: build_source
+runs-on: ubuntu-18.04
+steps:
+  - name: Authenticate on GCP
+uses: GoogleCloudPlatform/github-actions/setup-gcloud@master
+with:
+  service_account_email: ${{ secrets.CCP_SA_EMAIL }}
+  service_account_key: ${{ secrets.CCP_SA_KEY }}
+  - name: Remove existing files on GCS bucket
+run: gsutil rm -r "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/" 
|| true
+
+  upload_source_to_gcs:
+name: Upload source to GCS bucket
+needs: prepare_gcs
+runs-on: ubuntu-18.04
+steps:
+  - name: Download wheels
+uses: actions/download-artifact@v2
+with:
+  name: source_gztar_zip
+  path: source/
+  - name: Authenticate on GCP
+uses: GoogleCloudPlatform/github-actions/setup-gcloud@master
+with:
+  service_account_email: ${{ secrets.CCP_SA_EMAIL }}
+  service_account_key: ${{ secrets.CCP_SA_KEY }}
+  - name: Copy sources to GCS bucket
+run: gsutil cp -r -a public-read source/* gs://${{ secrets.CCP_BUCKET 
}}/${GITHUB_REF##*/}/
+  - name: List sources on GCS bucket
+run: |
+  gsutil ls "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/*.tar.gz"
+  gsutil ls "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/*.zip"
+
+  build_wheels:
+name: Build wheels on ${{ matrix.os }}
+needs: prepare_gcs
+runs-on: ${{ matrix.os }}
+strategy:
+  matrix:
+os : [ubuntu-18.04, macos-10.15]

Review comment:
   Hopefully yes 😄 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on a change in pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS

2020-06-05 Thread GitBox


TobKed commented on a change in pull request #11877:
URL: https://github.com/apache/beam/pull/11877#discussion_r436068410



##
File path: .github/workflows/build_wheels.yml
##
@@ -0,0 +1,141 @@
+name: Build python wheels
+
+on:
+  push:
+branches:
+  - master
+  - release-*
+tags:
+  - v*
+
+jobs:
+
+  build_source:
+runs-on: ubuntu-18.04
+steps:
+  - name: Checkout code
+uses: actions/checkout@v2
+  - name: Install python
+uses: actions/setup-python@v2
+with:
+  python-version: 3.7
+  - name: Get build dependencies
+working-directory: ./sdks/python
+run: python3 -m pip install cython && python3 -m pip install -r 
build-requirements.txt
+  - name: Install wheels
+run: python3 -m pip install wheel
+  - name: Buld source

Review comment:
   Thanks! Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on a change in pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS

2020-06-05 Thread GitBox


TobKed commented on a change in pull request #11877:
URL: https://github.com/apache/beam/pull/11877#discussion_r436068604



##
File path: .github/workflows/build_wheels.yml
##
@@ -0,0 +1,141 @@
+name: Build python wheels
+
+on:
+  push:
+branches:
+  - master
+  - release-*
+tags:
+  - v*
+
+jobs:
+
+  build_source:
+runs-on: ubuntu-18.04
+steps:
+  - name: Checkout code
+uses: actions/checkout@v2
+  - name: Install python
+uses: actions/setup-python@v2
+with:
+  python-version: 3.7
+  - name: Get build dependencies
+working-directory: ./sdks/python
+run: python3 -m pip install cython && python3 -m pip install -r 
build-requirements.txt
+  - name: Install wheels
+run: python3 -m pip install wheel
+  - name: Buld source
+working-directory: ./sdks/python
+run: python3 setup.py sdist --formats=gztar,zip
+  - name: Unzip source
+working-directory: ./sdks/python
+run: unzip dist/$(ls dist | grep .zip | head -n 1)
+  - name: Rename source directory
+working-directory: ./sdks/python
+run: mv $(ls | grep apache-beam) apache-beam-source
+  - name: Upload source
+uses: actions/upload-artifact@v2
+with:
+  name: source
+  path: sdks/python/apache-beam-source
+  - name: Upload compressed sources
+uses: actions/upload-artifact@v2
+with:
+  name: source_gztar_zip
+  path: sdks/python/dist
+
+  prepare_gcs:
+name: Prepare GCS
+needs: build_source
+runs-on: ubuntu-18.04
+steps:
+  - name: Authenticate on GCP
+uses: GoogleCloudPlatform/github-actions/setup-gcloud@master
+with:
+  service_account_email: ${{ secrets.CCP_SA_EMAIL }}
+  service_account_key: ${{ secrets.CCP_SA_KEY }}
+  - name: Remove existing files on GCS bucket
+run: gsutil rm -r "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/" 
|| true
+
+  upload_source_to_gcs:
+name: Upload source to GCS bucket
+needs: prepare_gcs
+runs-on: ubuntu-18.04
+steps:
+  - name: Download wheels
+uses: actions/download-artifact@v2
+with:
+  name: source_gztar_zip
+  path: source/
+  - name: Authenticate on GCP
+uses: GoogleCloudPlatform/github-actions/setup-gcloud@master
+with:
+  service_account_email: ${{ secrets.CCP_SA_EMAIL }}
+  service_account_key: ${{ secrets.CCP_SA_KEY }}
+  - name: Copy sources to GCS bucket
+run: gsutil cp -r -a public-read source/* gs://${{ secrets.CCP_BUCKET 
}}/${GITHUB_REF##*/}/
+  - name: List sources on GCS bucket
+run: |
+  gsutil ls "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/*.tar.gz"
+  gsutil ls "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/*.zip"
+
+  build_wheels:
+name: Build wheels on ${{ matrix.os }}
+needs: prepare_gcs
+runs-on: ${{ matrix.os }}
+strategy:
+  matrix:
+os : [ubuntu-18.04, macos-10.15]
+steps:
+- name: Download source
+  uses: actions/download-artifact@v2
+  with:
+name: source
+path: apache-beam-source
+- name: Install Python
+  uses: actions/setup-python@v2
+  with:
+python-version: 3.7
+- name: Install packages on Mac
+  if: startsWith(matrix.os, 'macos')
+  run: |
+brew update
+brew install pkg-config
+- name: Install cibuildwheel
+  run: pip install cibuildwheel==1.4.2
+- name: Build wheel
+  working-directory: apache-beam-source
+  env:
+CIBW_BUILD: cp27-* cp35-* cp36-* cp37-*
+CIBW_BUILD_VERBOSITY: 3

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.

2020-06-05 Thread GitBox


tysonjh commented on a change in pull request #11925:
URL: https://github.com/apache/beam/pull/11925#discussion_r436016818



##
File path: sdks/go/pkg/beam/core/graph/coder/stringutf8.go
##
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+   "io"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const bufCap = 64
+
+// EncodeStringUTF8LP encodes a UTF string with a length prefix.

Review comment:
   Is the length prefix more than an implementation detail or should this 
just be named EncodeStringUTF8?

##
File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go
##
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+   "bytes"
+   "encoding/base64"
+   "io"
+   "strings"
+   "testing"
+   "unicode/utf8"
+)
+
+var testValues = []string{
+   "",
+   "a",
+   "13",
+   "hello",
+   "a longer string with spaces and all that",
+   "a string with a \n newline",
+   "スタリング",
+   "I am the very model of a modern major general.\nI've information 
animal, vegetable, and mineral",
+}
+
+// Base64 encoded versions of the above strings, without the length prefix.
+var testEncodings = []string{
+   "",
+   "YQ",
+   "MTM",
+   "aGVsbG8",
+   "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA",
+   "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ",
+   "44K544K_44Oq44Oz44Kw",
+   
"SSBhbSB0aGUgdmVyeSBtb2RlbCBvZiBhIG1vZGVybiBtYWpvciBnZW5lcmFsLgpJJ3ZlIGluZm9ybWF0aW9uIGFuaW1hbCwgdmVnZXRhYmxlLCBhbmQgbWluZXJhbA",
+}
+
+// TestLen serves as a verification that string lengths
+// match their equivalent byte lengths, and not their rune
+// representation.
+func TestLen(t *testing.T) {
+   runeCount := []int{0, 1, 2, 5, 40, 25, 5, 94}
+   for i, s := range testValues {
+   if got, want := len(s), len([]byte(s)); got != want {
+   t.Errorf("string and []byte len do not match. got %v, 
want %v", got, want)
+   }
+   if got, want := utf8.RuneCountInString(s), runeCount[i]; got != 
want {
+   t.Errorf("Rune count of %q change len do not match. got 
%v, want %v", s, got, want)
+   }
+   }
+}
+
+func TestEncodeStringUTF8(t *testing.T) {
+   for i, s := range testValues {
+   s := s
+   want := testEncodings[i]
+   t.Run(s, func(t *testing.T) {
+   var b strings.Builder
+   base64enc := base64.NewEncoder(base64.RawURLEncoding, 
&b)
+
+   if err := encodeStringUTF8(s, base64enc); err != nil {
+   t.Fatal(err)
+   }
+   base64enc.Close()
+   got := b.String()
+   if got != want {
+   t.Errorf("encodeStringUTF8(%q) = %q, want %q", 
s, got, want)
+   }
+   })
+   }
+}
+
+func TestDecodeStringUTF8(t *testing.T) {
+   for i, s := range testEncodings {
+   s := s
+   want := testValues[i]
+   t.Run(want, func(t *testing.T) {
+   b := bytes.NewBufferString(s)
+   base64dec := base64.NewDecoder(base64.RawURLEncoding, b)
+
+   got, err

[GitHub] [beam] boyuanzz commented on a change in pull request #11894: [BEAM-7074] Enable test_pardo_timers_clear for fn_runner

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11894:
URL: https://github.com/apache/beam/pull/11894#discussion_r436068037



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
##
@@ -377,11 +377,6 @@ def process_timer(
   assert_that(actual, equal_to(expected))
 
   def test_pardo_timers_clear(self):
-if type(self).__name__ != 'FlinkRunnerTest':

Review comment:
   We have problems if we go the route of overriding in a subclass: `PortableRunnerTest` doesn't support clearing timers, but its subclasses `FlinkRunnerTest` and `SparkRunnerTest` do support it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on a change in pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS

2020-06-05 Thread GitBox


TobKed commented on a change in pull request #11877:
URL: https://github.com/apache/beam/pull/11877#discussion_r436067638



##
File path: .github/workflows/build_wheels.yml
##
@@ -0,0 +1,141 @@
+name: Build python wheels
+
+on:
+  push:
+branches:
+  - master
+  - release-*
+tags:
+  - v*
+
+jobs:
+
+  build_source:
+runs-on: ubuntu-18.04
+steps:
+  - name: Checkout code
+uses: actions/checkout@v2
+  - name: Install python
+uses: actions/setup-python@v2
+with:
+  python-version: 3.7
+  - name: Get build dependencies
+working-directory: ./sdks/python
+run: python3 -m pip install cython && python3 -m pip install -r 
build-requirements.txt
+  - name: Install wheels
+run: python3 -m pip install wheel
+  - name: Buld source
+working-directory: ./sdks/python
+run: python3 setup.py sdist --formats=gztar,zip
+  - name: Unzip source
+working-directory: ./sdks/python
+run: unzip dist/$(ls dist | grep .zip | head -n 1)
+  - name: Rename source directory
+working-directory: ./sdks/python
+run: mv $(ls | grep apache-beam) apache-beam-source
+  - name: Upload source
+uses: actions/upload-artifact@v2
+with:
+  name: source
+  path: sdks/python/apache-beam-source
+  - name: Upload compressed sources
+uses: actions/upload-artifact@v2
+with:
+  name: source_gztar_zip

Review comment:
   1. Currently two steps, `List sources on GCS bucket` and `Copy wheels to 
GCS bucket`, list files of specific types. Instead of these two separate steps 
I could create a job that lists all files in a specific GCS folder. I think 
that would be much cleaner and more explicit. Did I understand your idea 
correctly?
   
   For cleaning up these GCS locations I am considering two options:
   - setting lifecycle management on the bucket, which deletes files older 
than some arbitrary age, e.g. 365 days (a rough sketch of this is included 
below). I think the advantage of this approach is that it is maintenance free.
   - creating another scheduled workflow on GitHub Actions that deletes GCS 
folders whose corresponding branch no longer exists. It could be scheduled to 
run e.g. once per week.
   
   Which option makes more sense to you?
   
   2. The "Upload" steps upload files as artifacts so that they can be passed 
between jobs and remain available for download for 90 days (unless deleted 
earlier). These artifacts are picked up later by the "Upload to GCS" jobs. 
What do you think about renaming these steps, e.g. "Upload wheels" -> "Upload 
wheels as artifacts"?
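
   A rough sketch of the first option, using the `google-cloud-storage` Python client, is below. The bucket name `beam-wheels-staging` and the 365-day retention are placeholders assumed here for illustration; the real bucket and age would need to be confirmed.

```python
from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket('beam-wheels-staging')  # hypothetical bucket name

# Add a server-side rule that deletes any object older than 365 days,
# so no scheduled cleanup workflow is needed once the rule is in place.
bucket.add_lifecycle_delete_rule(age=365)
bucket.patch()
```

   The same rule could also be applied once from the command line with `gsutil lifecycle set` instead of code; either way the cleanup runs inside GCS rather than in GitHub Actions.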





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] amaliujia commented on pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.

2020-06-05 Thread GitBox


amaliujia commented on pull request #11933:
URL: https://github.com/apache/beam/pull/11933#issuecomment-639656091


   Run SQL PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] amaliujia opened a new pull request #11934: [BEAM-10205] @Ignore:BYTES can work with UNION ALL

2020-06-05 Thread GitBox


amaliujia opened a new pull request #11934:
URL: https://github.com/apache/beam/pull/11934


   See: https://jira.apache.org/jira/browse/BEAM-10205
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)

[GitHub] [beam] amaliujia opened a new pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.

2020-06-05 Thread GitBox


amaliujia opened a new pull request #11933:
URL: https://github.com/apache/beam/pull/11933


   See  https://issues.apache.org/jira/browse/BEAM-10204
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)

[GitHub] [beam] apilloud merged pull request #11758: Old Fastjson has a serious security problem

2020-06-05 Thread GitBox


apilloud merged pull request #11758:
URL: https://github.com/apache/beam/pull/11758


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck merged pull request #11926: [BEAM-9615] Additional coder unit tests

2020-06-05 Thread GitBox


lostluck merged pull request #11926:
URL: https://github.com/apache/beam/pull/11926


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #11926: [BEAM-9615] Additional coder unit tests

2020-06-05 Thread GitBox


lostluck commented on a change in pull request #11926:
URL: https://github.com/apache/beam/pull/11926#discussion_r436032240



##
File path: sdks/go/pkg/beam/coder_test.go
##
@@ -63,3 +64,57 @@ func TestJSONCoder(t *testing.T) {
}
}
 }
+
+func TestCoders(t *testing.T) {
+   ptrString := "test *string"
+   tests := []interface{}{
+   43,
+   12431235,
+   -2,
+   0,
+   1,
+   true,
+   "a string",
+   map[int64]string{1: "one", 11: "oneone", 21: "twoone", 1211: "onetwooneone"},
+   struct {
+   A int
+   B *string
+   C bool
+   }{4, &ptrString, false},
+   [...]int64{1, 2, 3, 4, 5}, // array
+   []int64{1, 2, 3, 4, 5},// slice
+   struct {
+   A []int
+   B [3]int
+   }{A: []int{1, 2, 3}, B: [...]int{4, 5, 6}},
+   }
+
+   for _, test := range tests {
+   var results []string
+   rt := reflect.TypeOf(test)
+   enc := NewElementEncoder(rt)
+   for i := 0; i < 10; i++ {
+   var buf bytes.Buffer
+   if err := enc.Encode(test, &buf); err != nil {
+   t.Fatalf("Failed to encode %v: %v", tests, err)
+   }
+   results = append(results, string(buf.Bytes()))
+   }
+   for i, data := range results {
+   if data != results[0] {
+   t.Errorf("coder not deterministic: data[%d]: %v != %v ", i, data, results[0])

Review comment:
   Good catch! Fixed to make it clearer what the test is checking for.
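
   The property the Go test above exercises (encoding the same value repeatedly must produce identical bytes) can be sketched analogously against the Python SDK's coder registry. This is only an illustrative aside, not part of the change under review; the helper name `assert_deterministic_encoding` is made up here.

```python
import apache_beam as beam


def assert_deterministic_encoding(value, rounds=10):
  """Encode the same value several times and check the bytes never change."""
  coder = beam.coders.registry.get_coder(type(value))
  encodings = [coder.encode(value) for _ in range(rounds)]
  assert all(e == encodings[0] for e in encodings), (
      'coder not deterministic for %r' % value)


assert_deterministic_encoding({'a': 1, 'b': 2})
assert_deterministic_encoding([1, 2, 3])
```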





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb merged pull request #11835: Various fixes to allow Java PAssert to run on Python

2020-06-05 Thread GitBox


robertwb merged pull request #11835:
URL: https://github.com/apache/beam/pull/11835


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb merged pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

2020-06-05 Thread GitBox


robertwb merged pull request #11766:
URL: https://github.com/apache/beam/pull/11766


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.

2020-06-05 Thread GitBox


robertwb commented on pull request #11932:
URL: https://github.com/apache/beam/pull/11932#issuecomment-639605068


   R: @ibzib 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb opened a new pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.

2020-06-05 Thread GitBox


robertwb opened a new pull request #11932:
URL: https://github.com/apache/beam/pull/11932


   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
