[GitHub] [beam] rionmonster commented on pull request #11944: [BEAM-10210] Beam Katas for Kotlin Blog Post

2020-06-09 Thread GitBox


rionmonster commented on pull request #11944:
URL: https://github.com/apache/beam/pull/11944#issuecomment-641711018


   @henryken, 
   
   Is there anything else needed on this front? I wasn't sure how some of the 
website-oriented changes like the blog post work, or whether I'd need to tag 
someone for the actual merge.
   
   Thanks,
   
   Rion



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] darshanj closed pull request #11965: [BEAM-9992] | use Sets transform in BeamSQL

2020-06-09 Thread GitBox


darshanj closed pull request #11965:
URL: https://github.com/apache/beam/pull/11965


   







[GitHub] [beam] reubenvanammers commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json

2020-06-09 Thread GitBox


reubenvanammers commented on pull request #11955:
URL: https://github.com/apache/beam/pull/11955#issuecomment-641689222


   Updated PR in response to comments. 







[GitHub] [beam] robertwb merged pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.

2020-06-09 Thread GitBox


robertwb merged pull request #11932:
URL: https://github.com/apache/beam/pull/11932


   







[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects

2020-06-09 Thread GitBox


pabloem commented on pull request #11959:
URL: https://github.com/apache/beam/pull/11959#issuecomment-641677744











[GitHub] [beam] darshanj opened a new pull request #11965: [BEAM-9992] | use Sets transform in BeamSQL

2020-06-09 Thread GitBox


darshanj opened a new pull request #11965:
URL: https://github.com/apache/beam/pull/11965


   Remove Sets transform in BeamSQL code.
   
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 

[GitHub] [beam] darshanj commented on pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-06-09 Thread GitBox


darshanj commented on pull request #11610:
URL: https://github.com/apache/beam/pull/11610#issuecomment-641674257


   Thanks all for patiently reviewing this PR and providing valuable feedback.







[GitHub] [beam] reubenvanammers commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json

2020-06-09 Thread GitBox


reubenvanammers commented on pull request #11955:
URL: https://github.com/apache/beam/pull/11955#issuecomment-641671697


   Thanks for the comments @TheNeuralBit. 
   
   Yeah, I was planning on working on BEAM-7624 after this one. 







[GitHub] [beam] reubenvanammers commented on a change in pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json

2020-06-09 Thread GitBox


reubenvanammers commented on a change in pull request #11955:
URL: https://github.com/apache/beam/pull/11955#discussion_r437814700



##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
##
@@ -185,18 +186,25 @@ private RowJsonDeserializer(Schema schema) {
   this.schema = schema;
 }
 
+public RowJsonDeserializer allowMissingFields(Boolean allowMissing){
+  this.allowMissingFields = allowMissing;
+  return this;
+  }
+
 @Override
 public Row deserialize(JsonParser jsonParser, DeserializationContext 
deserializationContext)
 throws IOException {
 
   // Parse and convert the root object to Row as if it's a nested field 
with name 'root'
   return (Row)
   extractJsonNodeValue(
-  FieldValue.of("root", FieldType.row(schema), 
jsonParser.readValueAsTree()));
+  FieldValue.of("root", FieldType.row(schema), 
jsonParser.readValueAsTree(), allowMissingFields));
 }
 
+  
+
 private static Object extractJsonNodeValue(FieldValue fieldValue) {
-  if (!fieldValue.isJsonValuePresent()) {

Review comment:
   I initially thought about this, then changed to putting it into the 
FieldValue because extractJsonNodeValue was static. Now that you mention it, I 
believe it is probably neater to just check `this.allowMissingFields` and change 
the method to non-static.









[GitHub] [beam] amaliujia commented on pull request #11960: [BEAM-9999] Remove Gearpump runner.

2020-06-09 Thread GitBox


amaliujia commented on pull request #11960:
URL: https://github.com/apache/beam/pull/11960#issuecomment-641667621


   Run Java PreCommit







[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-09 Thread GitBox


rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437807053



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  /**
+   * Enable dead letter support. If this value is set, errors in the parsing
+   * layer are returned as Row objects within a {@link ParseResult}.
+   *
+   * You can access the results by using:
+   *
+   * {@code ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));}
+   *
+   * {@link ParseResult#getResults()}
+   *
+   * {@code PCollection<Row> personRows = results.getResults()}
+   *
+   * {@link ParseResult#getFailedToParseLines()}
+   *
+   * {@code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * To access the reason for the failure you will need to first enable extended error reporting:
+   * {@code ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());}
+   *
+   * {@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * {@code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLinesWithErr()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {

Review comment:
   Changed to withExceptionReporting.









[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-09 Thread GitBox


rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437806981



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+  extends PTransform<PCollection<String>, PCollectionTuple> {
+private transient volatile @Nullable ObjectMapper objectMapper;
+private Schema schema;
+private static final String METRIC_NAMESPACE = "JsonToRowFn";
+private static final String DEAD_LETTER_METRIC_NAME = 
"JsonToRowFn_ParseFailure";
+
+private Distribution jsonConversionErrors =
+Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+public static final TupleTag<String> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+PCollection<String> deadLetterCollection;
+
+static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+  // Throw exception if this schema is not supported by RowJson
+  RowJson.verifySchemaSupported(rowSchema);
+  return new JsonToRowWithFailureCaptureFn(rowSchema);
+}
+
+private JsonToRowWithFailureCaptureFn(Schema schema) {
+  this.schema = schema;
+}
+
+@Override
+public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+  return jsonStrings.apply(
+  ParDo.of(
+  new DoFn<String, Row>() {
+@ProcessElement
+public void processElement(ProcessContext context) {

Review comment:
   Done.









[GitHub] [beam] aaltay merged pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices

2020-06-09 Thread GitBox


aaltay merged pull request #11851:
URL: https://github.com/apache/beam/pull/11851


   







[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-06-09 Thread GitBox


pabloem commented on pull request #11086:
URL: https://github.com/apache/beam/pull/11086#issuecomment-641652552


   Run Python 3.7 PostCommit







[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects

2020-06-09 Thread GitBox


pabloem commented on pull request #11959:
URL: https://github.com/apache/beam/pull/11959#issuecomment-641652307


   Run Java PostCommit







[GitHub] [beam] JustineKoa commented on pull request #11964: [BEAM-9987] added instruction for converting maven project to gradle

2020-06-09 Thread GitBox


JustineKoa commented on pull request #11964:
URL: https://github.com/apache/beam/pull/11964#issuecomment-641651721


   Review: @angoenka 







[GitHub] [beam] JustineKoa opened a new pull request #11964: [BEAM-9987] added instruction for converting maven project to gradle

2020-06-09 Thread GitBox


JustineKoa opened a new pull request #11964:
URL: https://github.com/apache/beam/pull/11964


   Added instructions for converting the example WordCount Maven project 
generated via archetype:generate into a Gradle project. I've attached 
images of what it looks like when I run it locally. I don't have much 
experience writing instructions like this, so please let me know if there are 
any suggestions on how the phrasing/formatting could be better!
   
   Added to setting up development environment:
   
![gradleinstructions1](https://user-images.githubusercontent.com/6096872/84213475-49f54c00-aab0-11ea-927d-f1870610d4e4.png)
   
   Instructions:
   
![gradleinstructions2](https://user-images.githubusercontent.com/6096872/84213486-4f529680-aab0-11ea-9fa0-2f16eef58ae7.png)
   
   
   

[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.

2020-06-09 Thread GitBox


robertwb commented on pull request #11932:
URL: https://github.com/apache/beam/pull/11932#issuecomment-641650830


   Run Java PreCommit







[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.

2020-06-09 Thread GitBox


robertwb commented on pull request #11932:
URL: https://github.com/apache/beam/pull/11932#issuecomment-641650721


   sdks:java:io:rabbitmq:test timed out







[GitHub] [beam] tvalentyn commented on pull request #11946: Fix VideoIntelligence IT tests

2020-06-09 Thread GitBox


tvalentyn commented on pull request #11946:
URL: https://github.com/apache/beam/pull/11946#issuecomment-641650596


   I filed https://issues.apache.org/jira/browse/BEAM-10229 - is it already 
fixed by this change?







[GitHub] [beam] TheNeuralBit commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json

2020-06-09 Thread GitBox


TheNeuralBit commented on pull request #11955:
URL: https://github.com/apache/beam/pull/11955#issuecomment-641644986


   FYI you can run `./gradlew spotlessApply` locally to apply required 
formatting changes.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json

2020-06-09 Thread GitBox


TheNeuralBit commented on a change in pull request #11955:
URL: https://github.com/apache/beam/pull/11955#discussion_r437784412



##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
##
@@ -362,6 +382,11 @@ private RowJsonSerializer(Schema schema) {
   super(Row.class);
   this.schema = schema;
 }
+  
+public RowJsonSerializer ignoreNullsOnWrite(Boolean ignoreNullsOnWrite) {

Review comment:
   Similar comment here, `withIgnoreNullsOnWrite` and add a docstring. (I 
think checkstyle will complain without the docstring anyway).

##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
##
@@ -185,18 +186,25 @@ private RowJsonDeserializer(Schema schema) {
   this.schema = schema;
 }
 
+public RowJsonDeserializer allowMissingFields(Boolean allowMissing){
+  this.allowMissingFields = allowMissing;
+  return this;
+  }
+
 @Override
 public Row deserialize(JsonParser jsonParser, DeserializationContext 
deserializationContext)
 throws IOException {
 
   // Parse and convert the root object to Row as if it's a nested field 
with name 'root'
   return (Row)
   extractJsonNodeValue(
-  FieldValue.of("root", FieldType.row(schema), 
jsonParser.readValueAsTree()));
+  FieldValue.of("root", FieldType.row(schema), 
jsonParser.readValueAsTree(), allowMissingFields));
 }
 
+  
+
 private static Object extractJsonNodeValue(FieldValue fieldValue) {
-  if (!fieldValue.isJsonValuePresent()) {

Review comment:
   I think you could just check `this.allowMissingFields` here rather than 
passing it into all the `FieldValue` instances, no?

##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
##
@@ -185,18 +186,25 @@ private RowJsonDeserializer(Schema schema) {
   this.schema = schema;
 }
 
+public RowJsonDeserializer allowMissingFields(Boolean allowMissing){

Review comment:
   nit: could you change this to `withAllowMissingFields`? 
   
   Also please add a docstring
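
   A builder-style setter of the kind suggested might look like the following 
sketch. Note this is an illustration only: `RowJsonDeserializerSketch` and its 
field are hypothetical stand-ins, not the actual Beam RowJson internals.

```java
// Sketch of the suggested builder-style naming (withAllowMissingFields).
// The class and field here are invented stand-ins for RowJsonDeserializer.
public class RowJsonDeserializerSketch {
  private boolean allowMissingFields = false;

  /**
   * Returns this deserializer configured to treat fields absent from the
   * JSON as nulls rather than raising an error.
   */
  public RowJsonDeserializerSketch withAllowMissingFields(boolean allowMissing) {
    this.allowMissingFields = allowMissing;
    return this;
  }

  public boolean getAllowMissingFields() {
    return allowMissingFields;
  }
}
```

   Returning `this` lets calls chain naturally, e.g. 
`new RowJsonDeserializerSketch().withAllowMissingFields(true)`.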

##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
##
@@ -362,6 +382,11 @@ private RowJsonSerializer(Schema schema) {
   super(Row.class);
   this.schema = schema;
 }
+  
+public RowJsonSerializer ignoreNullsOnWrite(Boolean ignoreNullsOnWrite) {

Review comment:
   I might call it "DropNullsOnWrite" instead of ignore, but I don't feel 
strongly about it 

##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
##
@@ -375,6 +400,9 @@ private void writeRow(Row row, Schema schema, JsonGenerator 
gen) throws IOExcept
   for (int i = 0; i < schema.getFieldCount(); ++i) {
 Field field = schema.getField(i);
 Object value = row.getValue(i);
+if (ignoreNullsOnWrite && value == null){
+  continue;
+}

Review comment:
   This should also check `field.getType().getNullable()` like the other 
conditional. If we get a null for a non-nullable field we should fail loudly 
rather than silently dropping it.
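
   The fail-loudly behavior described above can be sketched outside Beam with a 
stand-in field type (`FieldSketch` and `fieldsToWrite` are invented for 
illustration; they are not Beam API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the review point: drop nulls only for nullable fields, and fail
// loudly on a null in a non-nullable field. FieldSketch stands in for
// Beam's Schema.Field.
class NullDropSketch {
  record FieldSketch(String name, boolean nullable) {}

  // Returns the names of the fields that should be serialized.
  static List<String> fieldsToWrite(Map<FieldSketch, Object> row, boolean dropNullsOnWrite) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<FieldSketch, Object> e : row.entrySet()) {
      if (e.getValue() == null) {
        if (!e.getKey().nullable()) {
          // Fail loudly instead of silently dropping a required field.
          throw new IllegalArgumentException(
              "Null value for non-nullable field " + e.getKey().name());
        }
        if (dropNullsOnWrite) {
          continue; // silently omit only nullable nulls
        }
      }
      out.add(e.getKey().name());
    }
    return out;
  }
}
```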

##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
##
@@ -185,18 +186,25 @@ private RowJsonDeserializer(Schema schema) {
   this.schema = schema;
 }
 
+public RowJsonDeserializer allowMissingFields(Boolean allowMissing){

Review comment:
   We might want to make this an enum so in the future there could be a 
third mode where nulls _must_ be encoded with a missing field, and having a 
null field value would be considered an error. The mode you've added here is a 
permissive middle ground where we allow either one.
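
   One way the enum floated above could look; the mode names are invented for 
illustration and are not part of the PR:

```java
// Hypothetical enum for the three null-encoding modes discussed above.
enum NullBehavior {
  // Permissive mode added in the PR: a null may appear as an explicit
  // JSON null or as an absent field.
  ACCEPT_MISSING_OR_NULL,
  // Strict existing behavior: nulls must be explicit JSON nulls; an
  // absent field is an error.
  REQUIRE_NULL,
  // Possible future mode: nulls must be encoded by omitting the field;
  // an explicit null value is an error.
  REQUIRE_MISSING;

  boolean allowsExplicitNull() {
    return this != REQUIRE_MISSING;
  }

  boolean allowsAbsentField() {
    return this != REQUIRE_NULL;
  }
}
```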









[GitHub] [beam] robertwb opened a new pull request #11963: Add relational GroupBy transform to Python.

2020-06-09 Thread GitBox


robertwb opened a new pull request #11963:
URL: https://github.com/apache/beam/pull/11963


   
   
   

[GitHub] [beam] robertwb commented on a change in pull request #11838: [BEAM-9322] Modify the TestStream to output a dict when no output_tags are specified

2020-06-09 Thread GitBox


robertwb commented on a change in pull request #11838:
URL: https://github.com/apache/beam/pull/11838#discussion_r437787906



##
File path: sdks/python/apache_beam/testing/test_stream.py
##
@@ -291,10 +291,10 @@ def expand(self, pbegin):
 assert isinstance(pbegin, pvalue.PBegin)
 self.pipeline = pbegin.pipeline
 if not self.output_tags:
-  self.output_tags = set([None])
+  self.output_tags = {None}

Review comment:
   OK, in that case I'm fine with this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] rohdesamuel commented on a change in pull request #11838: [BEAM-9322] Modify the TestStream to output a dict when no output_tags are specified

2020-06-09 Thread GitBox


rohdesamuel commented on a change in pull request #11838:
URL: https://github.com/apache/beam/pull/11838#discussion_r437784983



##
File path: sdks/python/apache_beam/testing/test_stream.py
##
@@ -291,10 +291,10 @@ def expand(self, pbegin):
 assert isinstance(pbegin, pvalue.PBegin)
 self.pipeline = pbegin.pipeline
 if not self.output_tags:
-  self.output_tags = set([None])
+  self.output_tags = {None}

Review comment:
   This is a little harder to implement, mainly because the TestStream 
retrieves its output_tags from the keys of the PTransform payload holding it. 
This means that output_tags = None and output_tags = {None} look the same: the 
PTransform payload stores outputs as a map with a single key, None. When a 
TestStream is reconstructed, even if the original output_tags was unset, it 
will be constructed with output_tags = {None}.
   
   I think the best we can do is to treat {None} and None the same way.
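
   The equivalence being discussed can be checked directly in plain Python (a 
standalone sketch of the quoted expand() logic, not Beam code):

```python
# {None} is just the set-literal spelling of set([None]); the two are equal.
assert set([None]) == {None}

# The quoted expand() logic: an unset/empty output_tags defaults to {None}.
output_tags = set()
if not output_tags:
    output_tags = {None}
assert output_tags == {None}
```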





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json

2020-06-09 Thread GitBox


TheNeuralBit commented on pull request #11955:
URL: https://github.com/apache/beam/pull/11955#issuecomment-641639457


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json

2020-06-09 Thread GitBox


TheNeuralBit commented on pull request #11955:
URL: https://github.com/apache/beam/pull/11955#issuecomment-641639519


   Unfortunately only committers can trigger jenkins now :/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb merged pull request #11901: Prototype schema-inferring Row constructor.

2020-06-09 Thread GitBox


robertwb merged pull request #11901:
URL: https://github.com/apache/beam/pull/11901


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] apilloud commented on pull request #11957: [BEAM-10033] zetaSqlValueToJavaObject uses value.getType()

2020-06-09 Thread GitBox


apilloud commented on pull request #11957:
URL: https://github.com/apache/beam/pull/11957#issuecomment-641631947


   Thanks! We have a bunch of nullness issues; investigating some of them is 
how I discovered this one. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] reubenvanammers edited a comment on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json

2020-06-09 Thread GitBox


reubenvanammers edited a comment on pull request #11955:
URL: https://github.com/apache/beam/pull/11955#issuecomment-641097493


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robinyqiu commented on pull request #11957: [BEAM-10033] zetaSqlValueToJavaObject uses value.getType()

2020-06-09 Thread GitBox


robinyqiu commented on pull request #11957:
URL: https://github.com/apache/beam/pull/11957#issuecomment-641627057


   Well, some tests are failing, and I realized this turns out not to be such 
a simple change. There are subtle differences between Beam schemas and ZetaSQL 
structs that prevent it (e.g. ZetaSQL types are nullable by default, whereas 
nullability is a label on Beam field types; and a ZetaSQL struct allows fields 
with the same name, but Beam does not).
   
   I will take more time to figure out the correct thing to do here. In the 
meantime, I will send out some other small cleanup PRs for review. Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay commented on pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices

2020-06-09 Thread GitBox


aaltay commented on pull request #11851:
URL: https://github.com/apache/beam/pull/11851#issuecomment-641621183


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-09 Thread GitBox


rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437764105



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing 
layer are returned as
+   * Row objects within a {@link ParseResult}
+   *
+   * You can access the results by using:
+   *
+   * ParseResult results = 
jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+   *
+   * {@link ParseResult#getResults()}
+   *
+   * {@Code PCollection<Row> personRows = results.getResults()}
+   *
+   * {@link ParseResult#getFailedToParseLines()}
+   *
+   * {@Code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * To access the reason for the failure you will need to first enable 
extended error reporting.
+   * {@Code ParseResult results =
+   * 
jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());
 }
+   *
+   * {@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * {@Code PCollection<Row> errorsLinesWithErrMsg = 
results.getFailedToParseLinesWithErr()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends 
PTransform<PCollection<String>, ParseResult> {
+
+private Pipeline pipeline;
+
+private PCollection parsedLine;
+private PCollection failedParse;
+private PCollection failedParseWithErr;
+
+private static final String LINE_FIELD_NAME = "line";
+private static final String ERROR_FIELD_NAME = "err";
+
+public static final Schema ERROR_ROW_SCHEMA =
+Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+Schema.of(
+Field.of(LINE_FIELD_NAME, FieldType.STRING),
+Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() 
{};
+
+public abstract Schema getSchema();
+
+public abstract String getLineFieldName();
+
+public abstract String getErrorFieldName();
+
+public abstract boolean getExtendedErrorInfo();
+
+PCollection deadLetterCollection;
+
+public abstract Builder toBuilder();
+
+@AutoValue.Builder
+public abstract static class Builder {
+  public abstract Builder setSchema(Schema value);
+
+  public abstract Builder setLineFieldName(String value);
+
+  public abstract Builder setErrorFieldName(String value);
+
+  public abstract Builder setExtendedErrorInfo(boolean value);
+
+  public abstract JsonToRowWithErrFn build();
+}
+
+public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+  // Throw exception if this schema is not supported by RowJson
+  RowJson.verifySchemaSupported(rowSchema);
+  return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+  .setSchema(rowSchema)
+  .setExtendedErrorInfo(false)
+  .setLineFieldName(LINE_FIELD_NAME)
+  .setErrorFieldName(ERROR_FIELD_NAME)
+  .build();
+}
+
+/**
+ * Adds the error message to the returned error Row.
+ *
+ * @return {@link JsonToRow}
+ */
+public JsonToRowWithErrFn withExtendedErrorInfo() {
+  return this.toBuilder().setExtendedErrorInfo(true).build();
+}
+
+/**
+ * Sets the field name for the line field in the returned Row.
+ *
+ * @return {@link JsonToRow}
+ */
+public JsonToRowWithErrFn setLineField(String lineField) {
+  return this.toBuilder().setLineFieldName(lineField).build();
+}
+
+/**
+ * Adds the error message to the returned error Row.
+ *
+ * @return {@link JsonToRow}
+ */
+public JsonToRowWithErrFn setErrorField(String errorField) {
+  if (!this.getExtendedErrorInfo()) {
+throw new IllegalArgumentException(
+"This option is only available with Extended Error Info.");
+  }
+  return this.toBuilder().setErrorFieldName(errorField).build();
+}
+
+@Override
+public ParseResult expand(PCollection<String> jsonStrings) {
+
+  PCollectionTuple result =
+  jsonStrings.apply(
+  ParDo.of(new ParseWithError(this.getSchema(), 
getExtendedErrorInfo()))
+  .withOutputTags(
+  PARSED_LINE,
+  
TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+  this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+  this.failedParse =
+  
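
   The dead-letter pattern in the diff above (parsed rows on one output, 
failures carrying the raw line plus the error message on another) can be 
sketched in plain Python, independent of Beam; `parse_with_dead_letter` and 
its field names are illustrative only, not part of the PR:

```python
import json

def parse_with_dead_letter(lines, required_fields):
    """Plain-Python sketch of the dead-letter parsing pattern: successes go
    to one collection, failures (raw line + error message, i.e. the
    "extended error info" variant) to another."""
    parsed, failed = [], []
    for line in lines:
        try:
            obj = json.loads(line)
            missing = [f for f in required_fields if f not in obj]
            if missing:
                raise ValueError("missing fields: %s" % missing)
            parsed.append(obj)
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            failed.append({"line": line, "err": str(e)})
    return parsed, failed

ok, bad = parse_with_dead_letter(['{"name": "a"}', 'not json'], ["name"])
```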

[GitHub] [beam] aaltay commented on pull request #11960: [BEAM-9999] Remove Gearpump runner.

2020-06-09 Thread GitBox


aaltay commented on pull request #11960:
URL: https://github.com/apache/beam/pull/11960#issuecomment-641619571


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-09 Thread GitBox


rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437762683



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing 
layer are returned as
+   * Row objects within a {@link ParseResult}
+   *
+   * You can access the results by using:
+   *
+   * ParseResult results = 
jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+   *
+   * {@link ParseResult#getResults()}
+   *
+   * {@Code PCollection<Row> personRows = results.getResults()}
+   *
+   * {@link ParseResult#getFailedToParseLines()}
+   *
+   * {@Code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * To access the reason for the failure you will need to first enable 
extended error reporting.
+   * {@Code ParseResult results =
+   * 
jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());
 }
+   *
+   * {@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * {@Code PCollection<Row> errorsLinesWithErrMsg = 
results.getFailedToParseLinesWithErr()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends 
PTransform<PCollection<String>, ParseResult> {
+
+private Pipeline pipeline;
+
+private PCollection parsedLine;
+private PCollection failedParse;
+private PCollection failedParseWithErr;
+
+private static final String LINE_FIELD_NAME = "line";
+private static final String ERROR_FIELD_NAME = "err";
+
+public static final Schema ERROR_ROW_SCHEMA =
+Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+Schema.of(
+Field.of(LINE_FIELD_NAME, FieldType.STRING),
+Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() 
{};
+
+public abstract Schema getSchema();
+
+public abstract String getLineFieldName();
+
+public abstract String getErrorFieldName();
+
+public abstract boolean getExtendedErrorInfo();
+
+PCollection deadLetterCollection;
+
+public abstract Builder toBuilder();
+
+@AutoValue.Builder
+public abstract static class Builder {
+  public abstract Builder setSchema(Schema value);
+
+  public abstract Builder setLineFieldName(String value);
+
+  public abstract Builder setErrorFieldName(String value);
+
+  public abstract Builder setExtendedErrorInfo(boolean value);
+
+  public abstract JsonToRowWithErrFn build();
+}
+
+public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+  // Throw exception if this schema is not supported by RowJson
+  RowJson.verifySchemaSupported(rowSchema);
+  return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+  .setSchema(rowSchema)
+  .setExtendedErrorInfo(false)
+  .setLineFieldName(LINE_FIELD_NAME)
+  .setErrorFieldName(ERROR_FIELD_NAME)
+  .build();
+}
+
+/**
+ * Adds the error message to the returned error Row.
+ *
+ * @return {@link JsonToRow}
+ */
+public JsonToRowWithErrFn withExtendedErrorInfo() {
+  return this.toBuilder().setExtendedErrorInfo(true).build();
+}
+
+/**
+ * Sets the field name for the line field in the returned Row.
+ *
+ * @return {@link JsonToRow}
+ */
+public JsonToRowWithErrFn setLineField(String lineField) {
+  return this.toBuilder().setLineFieldName(lineField).build();
+}
+
+/**
+ * Adds the error message to the returned error Row.
+ *
+ * @return {@link JsonToRow}
+ */
+public JsonToRowWithErrFn setErrorField(String errorField) {
+  if (!this.getExtendedErrorInfo()) {
+throw new IllegalArgumentException(
+"This option is only available with Extended Error Info.");
+  }
+  return this.toBuilder().setErrorFieldName(errorField).build();
+}
+
+@Override
+public ParseResult expand(PCollection<String> jsonStrings) {
+
+  PCollectionTuple result =
+  jsonStrings.apply(
+  ParDo.of(new ParseWithError(this.getSchema(), 
getExtendedErrorInfo()))
+  .withOutputTags(
+  PARSED_LINE,
+  
TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+  this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+  this.failedParse =
+  

[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-09 Thread GitBox


rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437762165



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing 
layer are returned as
+   * Row objects within a {@link ParseResult}
+   *
+   * You can access the results by using:
+   *
+   * ParseResult results = 
jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+   *
+   * {@link ParseResult#getResults()}
+   *
+   * {@Code PCollection<Row> personRows = results.getResults()}
+   *
+   * {@link ParseResult#getFailedToParseLines()}
+   *
+   * {@Code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * To access the reason for the failure you will need to first enable 
extended error reporting.
+   * {@Code ParseResult results =
+   * 
jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());
 }
+   *
+   * {@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * {@Code PCollection<Row> errorsLinesWithErrMsg = 
results.getFailedToParseLinesWithErr()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends 
PTransform<PCollection<String>, ParseResult> {
+
+private Pipeline pipeline;
+
+private PCollection parsedLine;
+private PCollection failedParse;
+private PCollection failedParseWithErr;
+
+private static final String LINE_FIELD_NAME = "line";
+private static final String ERROR_FIELD_NAME = "err";
+
+public static final Schema ERROR_ROW_SCHEMA =
+Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+Schema.of(
+Field.of(LINE_FIELD_NAME, FieldType.STRING),
+Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() 
{};
+
+public abstract Schema getSchema();
+
+public abstract String getLineFieldName();
+
+public abstract String getErrorFieldName();
+
+public abstract boolean getExtendedErrorInfo();
+
+PCollection deadLetterCollection;
+
+public abstract Builder toBuilder();
+
+@AutoValue.Builder
+public abstract static class Builder {
+  public abstract Builder setSchema(Schema value);
+
+  public abstract Builder setLineFieldName(String value);
+
+  public abstract Builder setErrorFieldName(String value);
+
+  public abstract Builder setExtendedErrorInfo(boolean value);
+
+  public abstract JsonToRowWithErrFn build();
+}
+
+public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+  // Throw exception if this schema is not supported by RowJson
+  RowJson.verifySchemaSupported(rowSchema);
+  return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+  .setSchema(rowSchema)
+  .setExtendedErrorInfo(false)
+  .setLineFieldName(LINE_FIELD_NAME)
+  .setErrorFieldName(ERROR_FIELD_NAME)
+  .build();
+}
+
+/**
+ * Adds the error message to the returned error Row.
+ *
+ * @return {@link JsonToRow}
+ */
+public JsonToRowWithErrFn withExtendedErrorInfo() {
+  return this.toBuilder().setExtendedErrorInfo(true).build();
+}
+
+/**
+ * Sets the field name for the line field in the returned Row.
+ *
+ * @return {@link JsonToRow}
+ */
+public JsonToRowWithErrFn setLineField(String lineField) {
+  return this.toBuilder().setLineFieldName(lineField).build();
+}
+
+/**
+ * Adds the error message to the returned error Row.
+ *
+ * @return {@link JsonToRow}
+ */
+public JsonToRowWithErrFn setErrorField(String errorField) {
+  if (!this.getExtendedErrorInfo()) {
+throw new IllegalArgumentException(
+"This option is only available with Extended Error Info.");
+  }
+  return this.toBuilder().setErrorFieldName(errorField).build();
+}
+
+@Override
+public ParseResult expand(PCollection<String> jsonStrings) {
+
+  PCollectionTuple result =
+  jsonStrings.apply(
+  ParDo.of(new ParseWithError(this.getSchema(), 
getExtendedErrorInfo()))
+  .withOutputTags(
+  PARSED_LINE,
+  
TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+  this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+  this.failedParse =
+  

[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-09 Thread GitBox


rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437760550



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
   return this.objectMapper;
 }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+  extends PTransform<PCollection<String>, PCollectionTuple> {
+private transient volatile @Nullable ObjectMapper objectMapper;
+private Schema schema;
+private static final String METRIC_NAMESPACE = "JsonToRowFn";
+private static final String DEAD_LETTER_METRIC_NAME = 
"JsonToRowFn_ParseFailure";
+
+private Distribution jsonConversionErrors =
+Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+PCollection deadLetterCollection;
+
+static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+  // Throw exception if this schema is not supported by RowJson
+  RowJson.verifySchemaSupported(rowSchema);
+  return new JsonToRowWithFailureCaptureFn(rowSchema);
+}
+
+private JsonToRowWithFailureCaptureFn(Schema schema) {
+  this.schema = schema;
+}
+
+@Override
+public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+  return jsonStrings.apply(
+  ParDo.of(
+  new DoFn<String, Row>() {
+@ProcessElement
+public void processElement(ProcessContext context) {
+  try {
+context.output(jsonToRow(objectMapper(), 
context.element()));
+  } catch (Exception ex) {
+context.output(
+deadLetter,
+Row.withSchema(ERROR_ROW_SCHEMA)
+.addValue(context.element())
+.addValue(ex.getMessage())
+.build());
+  }
+}
+  })
+  .withOutputTags(main, TupleTagList.of(deadLetter)));

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem closed pull request #11850: [BEAM-1438] Allow 0 shards on WriteFiles streaming

2020-06-09 Thread GitBox


pabloem closed pull request #11850:
URL: https://github.com/apache/beam/pull/11850


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-06-09 Thread GitBox


pabloem commented on pull request #11086:
URL: https://github.com/apache/beam/pull/11086#issuecomment-641607885







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

2020-06-09 Thread GitBox


robertwb commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r437749540



##
File path: sdks/python/apache_beam/transforms/sql.py
##
@@ -74,3 +77,8 @@ def __init__(self, query, dialect=None):
 SqlTransformSchema(query=query, dialect=dialect)),
 BeamJarExpansionService(
 ':sdks:java:extensions:sql:expansion-service:shadowJar'))
+
+
+class Row(object):
+  def __init__(self, **kwargs):
+self.__dict__.update(kwargs)

Review comment:
   Moved to pvalue (imported to the top level), like TaggedOutput. 
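
   The quoted Row prototype is plain Python and can be exercised as-is (a 
sketch of the snippet in the diff; per the comment above, the real class now 
lives in `pvalue`):

```python
# The prototype from the diff: every keyword argument becomes an attribute.
class Row(object):
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

r = Row(name="a", age=3)
```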





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

2020-06-09 Thread GitBox


robertwb commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r437749425



##
File path: sdks/python/apache_beam/coders/row_coder.py
##
@@ -82,8 +86,19 @@ def from_runner_api_parameter(schema, components, 
unused_context):
 return RowCoder(schema)
 
   @staticmethod
-  def from_type_hint(named_tuple_type, registry):
-return RowCoder(named_tuple_to_schema(named_tuple_type))
+  def from_type_hint(type_hint, registry):
+if isinstance(type_hint, row_type.RowTypeConstraint):
+  schema = schema_pb2.Schema(
+  fields=[
+  schema_pb2.Field(
+  name=name,
+  type=typing_to_runner_api(type))
+  for (name, type) in type_hint._fields
+  ],
+  id=str(uuid.uuid4()))

Review comment:
   Done.
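
   The (name, type) pairs that `from_type_hint` iterates over in the diff can 
be recovered from an ordinary `NamedTuple` with the standard library alone (a 
sketch; `Person` is an illustrative class, not part of the PR):

```python
from typing import NamedTuple, get_type_hints

class Person(NamedTuple):
    name: str
    age: int

# from_type_hint builds one schema field per (name, type) pair; the pairs
# come from the class's annotations, preserving declaration order.
fields = list(get_type_hints(Person).items())
```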





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on pull request #11961: [BEAM-10225] Add log message when starting job server

2020-06-09 Thread GitBox


ibzib commented on pull request #11961:
URL: https://github.com/apache/beam/pull/11961#issuecomment-641605323


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects

2020-06-09 Thread GitBox


pabloem commented on pull request #11959:
URL: https://github.com/apache/beam/pull/11959#issuecomment-641604325


   LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11950: [BEAM-8596]: Add SplunkIO transform to write messages to Splunk

2020-06-09 Thread GitBox


pabloem commented on pull request #11950:
URL: https://github.com/apache/beam/pull/11950#issuecomment-641604419


   thanks. Looking once more...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects

2020-06-09 Thread GitBox


pabloem commented on pull request #11959:
URL: https://github.com/apache/beam/pull/11959#issuecomment-641604201


   Run Java PostCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-06-09 Thread GitBox


pabloem commented on pull request #11086:
URL: https://github.com/apache/beam/pull/11086#issuecomment-641603355


   Run Python PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tvalentyn opened a new pull request #11962: [BEAM-10227] Switches typing version modifier to python_full_version.

2020-06-09 Thread GitBox


tvalentyn opened a new pull request #11962:
URL: https://github.com/apache/beam/pull/11962


   Post-Commit Tests Status (on master branch)
   

   
    [Badge table of Go/Java post-commit build status across the Apex, 
Dataflow, Flink, Gearpump, Samza, and Spark runners omitted; truncated in the 
archive]
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)[![Build
 

[GitHub] [beam] pabloem commented on pull request #11824: [BEAM-10101] Add HttpIO / HttpFileSystem (Python)

2020-06-09 Thread GitBox


pabloem commented on pull request #11824:
URL: https://github.com/apache/beam/pull/11824#issuecomment-641602134


   retest this please







[GitHub] [beam] TheNeuralBit merged pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration

2020-06-09 Thread GitBox


TheNeuralBit merged pull request #11951:
URL: https://github.com/apache/beam/pull/11951


   







[GitHub] [beam] TheNeuralBit commented on pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration

2020-06-09 Thread GitBox


TheNeuralBit commented on pull request #11951:
URL: https://github.com/apache/beam/pull/11951#issuecomment-641598227


   Looks like SQL PostCommit finished successfully [including the new 
tests](https://builds.apache.org/job/beam_PostCommit_SQL_PR/321/testReport/org.apache.beam.sdk.extensions.sql.meta.provider.bigquery/BigQueryReadWriteIT/testSQLWriteAndRead_WithWriteDispositionAppend/),
 just hasn't been updated here yet. Merging now.
   
   







[GitHub] [beam] annaqin418 opened a new pull request #11961: [BEAM-10225] Add log message when starting job server

2020-06-09 Thread GitBox


annaqin418 opened a new pull request #11961:
URL: https://github.com/apache/beam/pull/11961


   R: @ibzib @robertwb 
   
   Adds a log output message that clarifies when a newly-started job server is 
ready to receive jobs:
   `INFO org.apache.beam.runners.jobsubmission.JobServerDriver: Job server now 
running, terminate with Ctrl+C`
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [x] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] tysonjh opened a new pull request #11960: [BEAM-9999] Remove Gearpump runner.

2020-06-09 Thread GitBox


tysonjh opened a new pull request #11960:
URL: https://github.com/apache/beam/pull/11960


   Remove Gearpump runner.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-06-09 Thread GitBox


pabloem commented on pull request #11086:
URL: https://github.com/apache/beam/pull/11086#issuecomment-641588460


   So, I think the last remaining issue was whether to have the 
`use_json_exports` flag or individual flags per column. Are there other topics 
to discuss?







[GitHub] [beam] chamikaramj merged pull request #11928: [ BEAM-3788] Updates kafka.py pydocs

2020-06-09 Thread GitBox


chamikaramj merged pull request #11928:
URL: https://github.com/apache/beam/pull/11928


   







[GitHub] [beam] chamikaramj commented on pull request #11928: [ BEAM-3788] Updates kafka.py pydocs

2020-06-09 Thread GitBox


chamikaramj commented on pull request #11928:
URL: https://github.com/apache/beam/pull/11928#issuecomment-641587073


   Thanks.







[GitHub] [beam] ihji commented on a change in pull request #11930: [BEAM-10202] make cross-language testing scripts OSX-compatible

2020-06-09 Thread GitBox


ihji commented on a change in pull request #11930:
URL: https://github.com/apache/beam/pull/11930#discussion_r437722946



##
File path: sdks/python/scripts/run_expansion_services.sh
##
@@ -133,4 +136,7 @@ case $STARTSTOP in
 fi
 ;;
 esac
-flock -u 200
+
+if [[ $CHECK_FLOCK -eq 0 ]]; then

Review comment:
   Done.
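
   For context, the macOS-compatibility guard under review presumably amounts
to something like the sketch below. Only `CHECK_FLOCK` and the `flock -u 200`
call appear in the quoted diff; the detection logic and lock-file handling
here are assumptions for illustration:

   ```shell
   #!/usr/bin/env bash
   set -e
   LOCKFILE="$(mktemp)"

   # Detect whether flock(1) exists; it ships with util-linux on Linux
   # but is not available on a stock macOS install.
   if command -v flock >/dev/null 2>&1; then
     CHECK_FLOCK=0
   else
     CHECK_FLOCK=1
   fi

   if [[ $CHECK_FLOCK -eq 0 ]]; then
     exec 200>"$LOCKFILE"   # open file descriptor 200 on the lock file
     flock 200              # take the lock
   fi

   # ... critical section: start/stop the expansion services ...

   if [[ $CHECK_FLOCK -eq 0 ]]; then
     flock -u 200           # release the lock, mirroring the diff
   fi
   rm -f "$LOCKFILE"
   echo "done"
   ```

   On Linux the script still serializes via `flock`; on macOS it simply runs
unlocked rather than failing on a missing command.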









[GitHub] [beam] pabloem commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO

2020-06-09 Thread GitBox


pabloem commented on a change in pull request #11702:
URL: https://github.com/apache/beam/pull/11702#discussion_r437721434



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  * output.apply("Execute FHIR Bundles", 
FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources", 
FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
 * PCollection&lt;HealthcareIOError&lt;String&gt;&gt; failedBundles = writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  * BigQueryIO
  * .write()
  * .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  * .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ *         .withFormatBodyFunction(HealthcareIOError::getDataResource)
+ *         .withTypeFunction((HealthcareIOError&lt;String&gt; err) -> {
+ *           String body = err.getDataResource();
+ *           // TODO(user) insert logic to extract type.
+ *           return params;
+ *         })
+ *         .withSearchParametersFunction((HealthcareIOError&lt;String&gt; err) -> {

Review comment:
   I think that makes total sense. Thanks!









[GitHub] [beam] pabloem commented on pull request #11850: [BEAM-1438] Allow 0 shards on WriteFiles streaming

2020-06-09 Thread GitBox


pabloem commented on pull request #11850:
URL: https://github.com/apache/beam/pull/11850#issuecomment-641570009


   Run Java PreCommit







[GitHub] [beam] pabloem merged pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types

2020-06-09 Thread GitBox


pabloem merged pull request #11923:
URL: https://github.com/apache/beam/pull/11923


   







[GitHub] [beam] pabloem commented on pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types

2020-06-09 Thread GitBox


pabloem commented on pull request #11923:
URL: https://github.com/apache/beam/pull/11923#issuecomment-641568293


   thanks @chunyang 







[GitHub] [beam] pabloem commented on a change in pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-06-09 Thread GitBox


pabloem commented on a change in pull request #11086:
URL: https://github.com/apache/beam/pull/11086#discussion_r437719751



##
File path: sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
##
@@ -254,11 +256,36 @@ def test_big_query_new_types(self):
 'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
 'use_standard_sql': False,
 'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
+'use_json_exports': True,
 'on_success_matcher': all_of(*pipeline_verifiers)
 }
 options = self.test_pipeline.get_full_options_as_args(**extra_opts)
 big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
+  @attr('IT')
+  def test_big_query_new_types(self):

Review comment:
   Thanks for pointing that out. I've renamed the test cases to keep 
coverage for the feature on the same test.









[GitHub] [beam] amaliujia commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive m

2020-06-09 Thread GitBox


amaliujia commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-641558615


   Thanks for adding me as a reviewer. I tried to go through this PR and 
couldn't come up with valuable comments. (in fact, it was a good learning 
process for me to read these code changes).







[GitHub] [beam] robertwb commented on a change in pull request #11838: [BEAM-9322] Modify the TestStream to output a dict when no output_tags are specified

2020-06-09 Thread GitBox


robertwb commented on a change in pull request #11838:
URL: https://github.com/apache/beam/pull/11838#discussion_r437702523



##
File path: sdks/python/apache_beam/testing/test_stream.py
##
@@ -291,10 +291,10 @@ def expand(self, pbegin):
 assert isinstance(pbegin, pvalue.PBegin)
 self.pipeline = pbegin.pipeline
 if not self.output_tags:
-  self.output_tags = set([None])
+  self.output_tags = {None}

Review comment:
   If the user explicitly sets the output tags to {None}, they might be 
expecting a dict. (Specifically, they might get a set from elsewhere, set the 
output tags from that set, and it would be awkward to have to check that set 
to determine how to interpret the result.) So in this case I would do:
   
   ```
   if not self.output_tags:
 return pvalue.PCollection(self.pipeline, is_bounded=False)
   else:
 return { ... for tag in self.output_tags}
   ```
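
   To make the suggested dispatch concrete, here is a toy model of it in plain
Python (the strings stand in for real `pvalue.PCollection` objects, and
`expand_outputs` is a made-up illustrative name, not part of Beam):

   ```python
   def expand_outputs(output_tags):
       """Toy model of the suggested TestStream.expand() dispatch."""
       if not output_tags:
           # No tags at all: keep returning a single output, as before.
           return "main-pcollection"
       # Any explicit tags -- even the set {None} -- produce a dict keyed by tag.
       return {tag: "pcollection-for-%s" % tag for tag in output_tags}

   # An empty set of tags yields the single-output form...
   print(expand_outputs(set()))   # -> main-pcollection
   # ...while explicitly passing {None} yields a dict, as suggested above.
   print(expand_outputs({None}))  # -> {None: 'pcollection-for-None'}
   ```

   This way a caller who builds the tag set programmatically always gets a
dict back whenever any tags were supplied, without having to inspect the set.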









[GitHub] [beam] robertwb commented on pull request #11838: [BEAM-9322] Modify the TestStream to output a dict when no output_tags are specified

2020-06-09 Thread GitBox


robertwb commented on pull request #11838:
URL: https://github.com/apache/beam/pull/11838#issuecomment-641555415


   R: @robertwb 







[GitHub] [beam] stale[bot] closed pull request #10958: [BEAM] Submitting final communication strategy

2020-06-09 Thread GitBox


stale[bot] closed pull request #10958:
URL: https://github.com/apache/beam/pull/10958


   







[GitHub] [beam] stale[bot] commented on pull request #10958: [BEAM] Submitting final communication strategy

2020-06-09 Thread GitBox


stale[bot] commented on pull request #10958:
URL: https://github.com/apache/beam/pull/10958#issuecomment-641555161


   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   







[GitHub] [beam] udim commented on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset

2020-06-09 Thread GitBox


udim commented on pull request #11939:
URL: https://github.com/apache/beam/pull/11939#issuecomment-641543855


   retest this please







[GitHub] [beam] TheNeuralBit commented on pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration

2020-06-09 Thread GitBox


TheNeuralBit commented on pull request #11951:
URL: https://github.com/apache/beam/pull/11951#issuecomment-641543407


   Run SQL PostCommit







[GitHub] [beam] TheNeuralBit commented on pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration

2020-06-09 Thread GitBox


TheNeuralBit commented on pull request #11951:
URL: https://github.com/apache/beam/pull/11951#issuecomment-641540376


   Run SQL Postcommit







[GitHub] [beam] TheNeuralBit commented on pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration

2020-06-09 Thread GitBox


TheNeuralBit commented on pull request #11951:
URL: https://github.com/apache/beam/pull/11951#issuecomment-641540296


   Retest this please







[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.

2020-06-09 Thread GitBox


robertwb commented on pull request #11932:
URL: https://github.com/apache/beam/pull/11932#issuecomment-641534358


   Run Java PreCommit







[GitHub] [beam] amaliujia merged pull request #11958: [BEAM-10198] Roll back changes

2020-06-09 Thread GitBox


amaliujia merged pull request #11958:
URL: https://github.com/apache/beam/pull/11958


   







[GitHub] [beam] amaliujia commented on pull request #11958: [BEAM-10198] Roll back changes

2020-06-09 Thread GitBox


amaliujia commented on pull request #11958:
URL: https://github.com/apache/beam/pull/11958#issuecomment-641521023


   @ihji 







[GitHub] [beam] sabhyankar commented on pull request #11950: [BEAM-8596]: Add SplunkIO transform to write messages to Splunk

2020-06-09 Thread GitBox


sabhyankar commented on pull request #11950:
URL: https://github.com/apache/beam/pull/11950#issuecomment-641500443


   @pabloem Thanks for the quick reviews! I have pushed a couple of commits 
with the mods you requested.







[GitHub] [beam] sabhyankar commented on a change in pull request #11950: [BEAM-8596]: Add SplunkIO transform to write messages to Splunk

2020-06-09 Thread GitBox


sabhyankar commented on a change in pull request #11950:
URL: https://github.com/apache/beam/pull/11950#discussion_r437640982



##
File path: 
sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.splunk;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An unbounded sink for Splunk's Http Event Collector (HEC).
+ *
+ * For more information, see the online documentation at
+ * <a href="https://dev.splunk.com/enterprise/docs/dataapps/httpeventcollector/">Splunk HEC</a>.
+ *
+ * Writing to Splunk's HEC
+ *
+ * The {@link SplunkIO} class provides a {@link PTransform} that allows 
writing {@link
+ * SplunkEvent} messages into a Splunk HEC end point.
+ *
+ * It takes as an input a {@link PCollection PCollection<SplunkEvent>}, where each {@link
+ * SplunkEvent} represents an event to be published to HEC.
+ *
+ * To configure a {@link SplunkIO}, you must provide at a minimum:
+ *
+ * 
+ *   url - HEC endpoint URL.
+ *   token - HEC endpoint token.
+ * 
+ *
+ * The {@link SplunkIO} transform can be customized further by optionally 
specifying:
+ *
+ * 
+ *   parallelism - Number of parallel requests to the HEC.
+ *   batchCount - Number of events in a single batch.
+ *   disableCertificateValidation - Whether to disable SSL validation 
(useful for self-signed
+ *   certificates)
+ * 
+ *
+ * This transform will return any non-transient write failures via a {@link PCollection
+ * PCollection<SplunkWriteError>}, where each {@link SplunkWriteError} captures the error that
+ * occurred while attempting to write to HEC. These can be published to a 
dead-letter sink or
+ * reprocessed.
+ *
+ * For example:
+ *
+ * {@code
+ * PCollection<SplunkEvent> events = ...;
+ *
+ * PCollection<SplunkWriteError> errors =
+ * events.apply("WriteToSplunk",
+ *  SplunkIO.writeBuilder()

Review comment:
   Thanks for that input! Switched to a fluent factory pattern for the 
SplunkIO transform.
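The fluent factory pattern adopted here can be sketched generically in Python (hypothetical names and fields — this is an illustration of the pattern, not the actual SplunkIO API):

```python
class SplunkWrite:
    """Hypothetical stand-in for a fluent factory-style transform config."""

    def __init__(self, url, token):
        self.url = url
        self.token = token
        self.batch_count = 1
        self.parallelism = 1

    # Each with* method mutates the config and returns self,
    # so calls can be chained fluently.
    def with_batch_count(self, n):
        self.batch_count = n
        return self

    def with_parallelism(self, n):
        self.parallelism = n
        return self


def write(url, token):
    # Static factory entry point, mirroring the write(url, token) style.
    return SplunkWrite(url, token)


w = write("https://hec.example.com:8088", "token") \
    .with_batch_count(10) \
    .with_parallelism(4)
```

The advantage over a plain builder is that required parameters (url, token) are forced at the factory call, while the optional ones stay chainable.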









[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects

2020-06-09 Thread GitBox


pabloem commented on pull request #11959:
URL: https://github.com/apache/beam/pull/11959#issuecomment-641496977


   retest this please







[GitHub] [beam] tvalentyn commented on pull request #11954: [Do not merge] Add a unit test exposing BEAM-10217

2020-06-09 Thread GitBox


tvalentyn commented on pull request #11954:
URL: https://github.com/apache/beam/pull/11954#issuecomment-641495342


   The tests failed on Py3.6.







[GitHub] [beam] aaltay edited a comment on pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS

2020-06-09 Thread GitBox


aaltay edited a comment on pull request #11877:
URL: https://github.com/apache/beam/pull/11877#issuecomment-641482606


   This looks nice. I have a few clarifying questions:
   
   on pull_request: This is good. Would it trigger on every pull request? That 
may not be needed; I am not sure what GH resources we will have or what kind of 
queue there will be, and we would not want to add test load to all PRs. 
   
   Questions:
   - Can we trigger this with a phrase? And/Or limit it to python changes?
   - What is gh-action artifacts? How does one access this to get the artifacts?
   
   on push:
   Question:
   - What are the release-candidate branches? Is it release-*? (If yes, this looks 
good.)
   
   on schedule: this looks good.
   
   Other questions:
   - How do we use cancel previous runs workflow? Does it work automatically? 
(If yes, this is great.)
   
   > Nightly is addressed by on schedule trigger.
   
   +1.
   
   > I investigated triggering builds manually by using repository_dispatch and 
curl however build triggered by it is executed on master branch and I think 
there is no convenient way to run it on different branches. If your question is 
related to "Build Release Candidate" phase it may be solved here as well since 
build will be triggered during pushing to master/release/release-candidate 
branches (and related to it by commit hash).
   
   Yes, I think this works. My question was how a release manager could build 
wheels. The release manager will push some commits, but they will not do this 
using a PR. Would it still trigger builds? And if the release manager wants to 
build a release at a specific commit on the release branch, could they do that 
by opening a PR?











[GitHub] [beam] aaltay commented on pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices

2020-06-09 Thread GitBox


aaltay commented on pull request #11851:
URL: https://github.com/apache/beam/pull/11851#issuecomment-641482781


   retest this please







[GitHub] [beam] davidcavazos commented on pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices

2020-06-09 Thread GitBox


davidcavazos commented on pull request #11851:
URL: https://github.com/apache/beam/pull/11851#issuecomment-641474784


   retest this please







[GitHub] [beam] davidcavazos commented on pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices

2020-06-09 Thread GitBox


davidcavazos commented on pull request #11851:
URL: https://github.com/apache/beam/pull/11851#issuecomment-641474131


   Forgot to use `argv` to parse the pipeline options in 
`pipeline_options_local`. Tests should pass now.
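The fix described above — parsing options from an explicit `argv` rather than implicitly from `sys.argv` — can be sketched with plain `argparse` (a generic sketch, not the actual snippet code):

```python
import argparse


def parse_pipeline_options(argv=None):
    # Parse known pipeline flags from an explicit argv list; any flags the
    # parser does not know about are passed through untouched, which is how
    # pipeline-specific and runner-specific options can coexist.
    parser = argparse.ArgumentParser()
    parser.add_argument("--runner", default="DirectRunner")
    known, passthrough = parser.parse_known_args(argv)
    return known, passthrough


known, rest = parse_pipeline_options(["--runner", "DirectRunner", "--project", "p"])
```

Passing `argv` explicitly also makes the snippet unit-testable without patching `sys.argv`.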







[GitHub] [beam] sabhyankar commented on a change in pull request #11950: [BEAM-8596]: Add SplunkIO transform to write messages to Splunk

2020-06-09 Thread GitBox


sabhyankar commented on a change in pull request #11950:
URL: https://github.com/apache/beam/pull/11950#discussion_r437606598



##
File path: 
sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.splunk;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.gson.annotations.SerializedName;
+import javax.annotation.Nullable;
+
+/**
+ * A {@link SplunkEvent} describes a single payload sent to Splunk's Http 
Event Collector (HEC)
+ * endpoint.
+ *
+ * Each object represents a single event and related metadata elements such 
as:
+ *
+ * 
+ *   time
+ *   host
+ *   source
+ *   sourceType
+ *   index
+ * 
+ */
+@AutoValue
+public abstract class SplunkEvent {

Review comment:
   Thanks for the pointer, @pabloem 
   
   I have switched from using custom coders to DefaultSchema with 
AutoValueSchema.
   I noticed in 
[AutoValueUtils](https://github.com/apache/beam/blob/c3a2dd89616faea5a2171ae6d8e39a77f6e39422/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java#L187-L191)
 that the build method must literally be called 'build'. This required me to 
change some of my method names.
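The naming constraint described above — the finishing method must literally be named `build` — can be illustrated with a generic builder sketch (a hypothetical event class, not the actual AutoValue-generated code):

```python
class SplunkEventBuilder:
    # Hypothetical builder: setters chain, and the finishing method is
    # literally named `build`, matching the convention the schema
    # inference expects when it looks up the builder's build method.
    def __init__(self):
        self._fields = {}

    def with_host(self, host):
        self._fields["host"] = host
        return self

    def with_source(self, source):
        self._fields["source"] = source
        return self

    def build(self):
        # Renaming this to e.g. `create` would break inference that
        # searches for a method named exactly `build`.
        return dict(self._fields)


event = SplunkEventBuilder().with_host("localhost").with_source("syslog").build()
```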









[GitHub] [beam] amaliujia opened a new pull request #11958: fixup! roll back changes

2020-06-09 Thread GitBox


amaliujia opened a new pull request #11958:
URL: https://github.com/apache/beam/pull/11958


   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 

[GitHub] [beam] jaketf commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO

2020-06-09 Thread GitBox


jaketf commented on a change in pull request #11702:
URL: https://github.com/apache/beam/pull/11702#discussion_r437590913



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  * output.apply("Execute FHIR Bundles", 
FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources", 
FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
  * PCollection> failedBundles = 
writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  * BigQueryIO
  * .write()
  * .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  * .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ * 
.withFormatBodyFunction(HealthcareIOError::getDataResource)
+ * .withTypeFunction((HealthcareIOError err) -> {
+ *   String body = err.getDataResource();
+ *   // TODO(user) insert logic to extract type.
+ *   return params;
+ * })
+ * .withSearchParametersFunction((HealthcareIOError err) 
-> {

Review comment:
   Hmm, on second thought, the best way to do this is for the user to use the 
HAPI library's search builder, which will be better maintained. 
   I think we can add this as a suggestion in the docs rather than making this 
a beam dependency or duplicating the functionality. Thoughts?
   https://hapifhir.io/hapi-fhir/docs/server_plain/rest_operations_search.html
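As a rough illustration of what assembling FHIR search parameters for a conditional operation involves (a naive sketch with a hypothetical helper; in practice the HAPI fluent search builder linked above would do this properly):

```python
from urllib.parse import urlencode


def build_search_query(resource_type, params):
    # Naive sketch: turn a resource type and search parameters into a
    # FHIR-style conditional search string, e.g. "Patient?identifier=abc".
    # Sorting keeps the output deterministic for comparison/testing.
    return resource_type + "?" + urlencode(sorted(params.items()))


query = build_search_query("Patient", {"identifier": "abc"})
```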









[GitHub] [beam] robinyqiu opened a new pull request #11957: [BEAM-10033] zetaSqlValueToJavaObject uses value.getType()

2020-06-09 Thread GitBox


robinyqiu opened a new pull request #11957:
URL: https://github.com/apache/beam/pull/11957


   R: @apilloud 
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)[![Build
 

[GitHub] [beam] robertwb merged pull request #11940: [BEAM-6215] Additional tests for FlatMap label.

2020-06-09 Thread GitBox


robertwb merged pull request #11940:
URL: https://github.com/apache/beam/pull/11940


   







[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.

2020-06-09 Thread GitBox


robertwb commented on pull request #11932:
URL: https://github.com/apache/beam/pull/11932#issuecomment-641449939


   Run Java PreCommit







[GitHub] [beam] aaltay commented on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset

2020-06-09 Thread GitBox


aaltay commented on pull request #11939:
URL: https://github.com/apache/beam/pull/11939#issuecomment-641409860











[GitHub] [beam] lukecwik merged pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.

2020-06-09 Thread GitBox


lukecwik merged pull request #11941:
URL: https://github.com/apache/beam/pull/11941


   







[GitHub] [beam] lukecwik commented on a change in pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.

2020-06-09 Thread GitBox


lukecwik commented on a change in pull request #11941:
URL: https://github.com/apache/beam/pull/11941#discussion_r436891532



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -193,7 +193,21 @@
   bundleFinalizer);
 
   // Register the appropriate handlers.
-  startFunctionRegistry.register(pTransformId, runner::startBundle);
+  switch (pTransform.getSpec().getUrn()) {
+case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
+case 
PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+  startFunctionRegistry.register(pTransformId, runner::startBundle);
+  break;
+case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:

Review comment:
   We can't re-use the existing `startBundle`/`finishBundle` methods since 
it would be confusing to the user as to which context they are executing in 
(e.g. finishBundle can produce output) so this would require adding 
`startBundleForGetInitialRestriction`, `finishBundleForGetInitialRestriction`, 
`startBundleForPairWithRestriction` and `finishBundleForPairWithRestriction`. I 
could see value in this for per bundle object lifetime management but any such 
change should likely happen outside of the scope of this PR.
   
   Any reason not to use `setup`/`teardown` for your object cache?
   

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -784,9 +812,8 @@ private ByteString encodeProgress(double value) throws 
IOException {
   default:
 // no-op
 }
-  }
 
-  private void startBundle() {
+// TODO: Support caching state data across bundle boundaries.

Review comment:
   Done









[GitHub] [beam] TheNeuralBit commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

2020-06-09 Thread GitBox


TheNeuralBit commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r437536115



##
File path: sdks/python/apache_beam/transforms/sql.py
##
@@ -74,3 +77,8 @@ def __init__(self, query, dialect=None):
 SqlTransformSchema(query=query, dialect=dialect)),
 BeamJarExpansionService(
 ':sdks:java:extensions:sql:expansion-service:shadowJar'))
+
+
+class Row(object):
+  def __init__(self, **kwargs):
+self.__dict__.update(kwargs)

Review comment:
   Mm nothing comes to mind. I suppose it could just be `apache_beam.Row` 
for now, and we can alias it if we add a schema package with other top-level 
schema stuff later.
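The prototype `Row` quoted in the diff behaves like a plain attribute bag; a self-contained sketch of the same idea (the `__repr__` is an addition for readability, not part of the prototype):

```python
class Row(object):
    # Mirrors the prototype: every keyword argument becomes an
    # attribute on the instance.
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __repr__(self):
        items = ", ".join("%s=%r" % kv for kv in sorted(self.__dict__.items()))
        return "Row(%s)" % items


r = Row(word="beam", count=2)
```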

##
File path: sdks/python/apache_beam/coders/row_coder.py
##
@@ -82,8 +86,19 @@ def from_runner_api_parameter(schema, components, 
unused_context):
 return RowCoder(schema)
 
   @staticmethod
-  def from_type_hint(named_tuple_type, registry):
-return RowCoder(named_tuple_to_schema(named_tuple_type))
+  def from_type_hint(type_hint, registry):
+if isinstance(type_hint, row_type.RowTypeConstraint):
+  schema = schema_pb2.Schema(
+  fields=[
+  schema_pb2.Field(
+  name=name,
+  type=typing_to_runner_api(type))
+  for (name, type) in type_hint._fields
+  ],
+  id=str(uuid.uuid4()))

Review comment:
   Could you move this inference to `typehints.schemas` alongside 
`named_tuple_to_schema`? I have a WIP PR for batching schema'd PCollections 
that are inputs to Dataframes and I should re-use this logic there.
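The inference being discussed — mapping the `(name, type)` pairs of a `RowTypeConstraint` onto a schema — can be sketched without the Beam protos (a hypothetical plain-dict schema standing in for `schema_pb2.Schema`):

```python
import uuid


def fields_to_schema(fields):
    # Sketch: map (name, Python type) pairs to a schema-like dict,
    # loosely mirroring the from_type_hint logic quoted above, which
    # builds a schema_pb2.Schema with a fresh uuid as its id.
    type_names = {int: "INT64", float: "DOUBLE", str: "STRING"}
    return {
        "id": str(uuid.uuid4()),
        "fields": [{"name": n, "type": type_names[t]} for n, t in fields],
    }


schema = fields_to_schema([("word", str), ("count", int)])
```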









[GitHub] [beam] iemejia commented on pull request #11909: [BEAM-8134] Grafana dashboards for Nexmark tests

2020-06-09 Thread GitBox


iemejia commented on pull request #11909:
URL: https://github.com/apache/beam/pull/11909#issuecomment-641060650











[GitHub] [beam] aromanenko-dev commented on pull request #11396: [BEAM-9742] Add Configurable FluentBackoff to JdbcIO Write

2020-06-09 Thread GitBox


aromanenko-dev commented on pull request #11396:
URL: https://github.com/apache/beam/pull/11396#issuecomment-640562379











[GitHub] [beam] amaliujia commented on a change in pull request #11948: [BEAM-10213] @Ignore: fix the test for testCastToDateWithCase.

2020-06-09 Thread GitBox


amaliujia commented on a change in pull request #11948:
URL: https://github.com/apache/beam/pull/11948#discussion_r436967724



##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -3565,19 +3566,16 @@ public void testCaseWithValueNoElseNoMatch() {
   }
 
   @Test
-  @Ignore(
-  "Codegen generates code that Janino cannot compile, need further 
investigation on root"
-  + " cause.")
   public void testCastToDateWithCase() {
 String sql =
 "SELECT f_int, \n"
 + "CASE WHEN CHAR_LENGTH(TRIM(f_string)) = 8 \n"
 + "THEN CAST (CONCAT(\n"
-+ "   SUBSTR(TRIM(f_string), 0, 4) \n"
++ "   SUBSTR(TRIM(f_string), 1, 4) \n"

Review comment:
   SUBSTR counts the first char from position 1.
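   To make the off-by-one concrete, a small Python sketch with a hypothetical `substr_1based` helper that mimics SQL's 1-based SUBSTR on the 8-character date string from this test:

```python
def substr_1based(s, pos, length):
    """SUBSTR(s, pos, length): the first character sits at position 1."""
    return s[pos - 1:pos - 1 + length]

# Slicing "20181018" into year/month/day parts requires starting at 1, not 0.
print(substr_1based("20181018", 1, 4))  # 2018
print(substr_1based("20181018", 5, 2))  # 10
print(substr_1based("20181018", 7, 2))  # 18
```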


##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -3587,11 +3585,14 @@ public void testCastToDateWithCase() {
 PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
 
 Schema resultType =
-Schema.builder().addInt32Field("f_int").addNullableField("f_date", 
DATETIME).build();
+Schema.builder()
+.addInt64Field("f_long")
+.addNullableField("f_date", FieldType.logicalType(SqlTypes.DATE))
+.build();
 
 PAssert.that(stream)
 .containsInAnyOrder(
-Row.withSchema(resultType).addValues(1, 
parseDate("2018-10-18")).build());
+Row.withSchema(resultType).addValues(1L, 
parseDateToLocalDate("2018-10-18")).build());

Review comment:
   Thanks!









[GitHub] [beam] chamikaramj commented on a change in pull request #11834: [BEAM-10117] Correct erroneous Job Failed message

2020-06-09 Thread GitBox


chamikaramj commented on a change in pull request #11834:
URL: https://github.com/apache/beam/pull/11834#discussion_r436891884



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
##
@@ -137,8 +137,8 @@ void waitForDone() throws Exception {
   throw e;
 }
   } else {
-// Job failed, schedule it again.
-LOG.info("Job {} failed. retrying.", 
jobInfo.pendingJob.currentJobId);
+// Job not yet complete, schedule it again.

Review comment:
   Hmm, it seems like the pollJob() call usually returns false if the job failed.
   
https://github.com/apache/beam/blob/2a4092dfb8c46408818402b1c4a09a8cd44e907a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java#L260
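   Since `pollJob()` returning false covers both "still running" and "failed", the retry loop's log wording is exactly what this change corrects; a hedged Python sketch of that loop's shape (illustrative names, not the actual Beam `BigQueryHelpers` API):

```python
def wait_for_done(poll_job, schedule_retry, max_attempts=5):
    """Poll until poll_job() reports completion, rescheduling in between.

    A False return only means the job is not yet complete, so it is
    handled as a retry rather than logged as a failure.
    """
    for attempt in range(max_attempts):
        if poll_job():
            return True
        schedule_retry(attempt)  # job not yet complete: schedule it again
    return False

# Simulate a job that completes on the third poll.
polls = iter([False, False, True])
retries = []
done = wait_for_done(lambda: next(polls), retries.append)
print(done, retries)  # True [0, 1]
```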









[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-09 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r435359001



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -906,19 +955,110 @@ public void setValueDeserializer(String 
valueDeserializer) {
   Coder<K> keyCoder = getKeyCoder(coderRegistry);
   Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-  // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
-  Unbounded<KafkaRecord<K, V>> unbounded =
-  org.apache.beam.sdk.io.Read.from(
-  
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+  if (!isUseSDFTransform()
+  || 
!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), 
"beam_fn_api")

Review comment:
   In this case, please add a comment about that.









[GitHub] [beam] tvalentyn opened a new pull request #11954: [Do not merge] Add a unit test exposing BEAM-10217

2020-06-09 Thread GitBox


tvalentyn opened a new pull request #11954:
URL: https://github.com/apache/beam/pull/11954


   
   Post-Commit Tests Status (on master branch): badge table of Go, Java, and 
Python post-commit build statuses across the Apex, Dataflow, Flink, Gearpump, 
Samza, and Spark runners.

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-09 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436922850



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and 
design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link 
KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same 
as {@link
+ *   

[GitHub] [beam] robinyqiu commented on a change in pull request #11948: [BEAM-10213] @Ignore: fix the test for testCastToDateWithCase.

2020-06-09 Thread GitBox


robinyqiu commented on a change in pull request #11948:
URL: https://github.com/apache/beam/pull/11948#discussion_r436981137



##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -3587,11 +3585,14 @@ public void testCastToDateWithCase() {
 PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
 
 Schema resultType =
-Schema.builder().addInt32Field("f_int").addNullableField("f_date", 
DATETIME).build();
+Schema.builder()
+.addInt64Field("f_long")
+.addNullableField("f_date", FieldType.logicalType(SqlTypes.DATE))
+.build();
 
 PAssert.that(stream)
 .containsInAnyOrder(
-Row.withSchema(resultType).addValues(1, 
parseDate("2018-10-18")).build());
+Row.withSchema(resultType).addValues(1L, 
parseDateToLocalDate("2018-10-18")).build());

Review comment:
   I think you don't have to implement a `parseDateToLocalDate` function 
yourself. You can use `LocalDate.of(y, m, d)` or 
`LocalDate.parse("yyyy-mm-dd")` directly.
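   For a runnable analog of the suggestion, the same two construction routes in Python's `datetime.date` (a loose stand-in for Java's `LocalDate`, using the date from this test):

```python
from datetime import date

d_of = date(2018, 10, 18)                    # like LocalDate.of(y, m, d)
d_parsed = date.fromisoformat("2018-10-18")  # like LocalDate.parse(...)
assert d_of == d_parsed
print(d_parsed.isoformat())  # 2018-10-18
```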









