[GitHub] [beam] rionmonster commented on pull request #11944: [BEAM-10210] Beam Katas for Kotlin Blog Post
rionmonster commented on pull request #11944: URL: https://github.com/apache/beam/pull/11944#issuecomment-641711018 @henryken, is there anything else needed on this front? I didn't know how some of the website-oriented changes, like the blog, worked, or whether I'd need to tag someone for the actual merge. Thanks, Rion This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] darshanj closed pull request #11965: [BEAM-9992] | use Sets transform in BeamSQL
darshanj closed pull request #11965: URL: https://github.com/apache/beam/pull/11965
[GitHub] [beam] reubenvanammers commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json
reubenvanammers commented on pull request #11955: URL: https://github.com/apache/beam/pull/11955#issuecomment-641689222 Updated PR in response to comments.
[GitHub] [beam] robertwb merged pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.
robertwb merged pull request #11932: URL: https://github.com/apache/beam/pull/11932
[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects
pabloem commented on pull request #11959: URL: https://github.com/apache/beam/pull/11959#issuecomment-641677744
[GitHub] [beam] darshanj opened a new pull request #11965: [BEAM-9992] | use Sets transform in BeamSQL
darshanj opened a new pull request #11965: URL: https://github.com/apache/beam/pull/11965 Remove Sets transform in BeamSQL code. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
[GitHub] [beam] darshanj commented on pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on pull request #11610: URL: https://github.com/apache/beam/pull/11610#issuecomment-641674257 Thanks all for patiently reviewing this PR and providing valuable feedback.
[GitHub] [beam] reubenvanammers commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json
reubenvanammers commented on pull request #11955: URL: https://github.com/apache/beam/pull/11955#issuecomment-641671697 Thanks for the comments @TheNeuralBit. Yeah, I was planning on working on BEAM-7624 after this one.
[GitHub] [beam] reubenvanammers commented on a change in pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json
reubenvanammers commented on a change in pull request #11955: URL: https://github.com/apache/beam/pull/11955#discussion_r437814700

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
## @@ -185,18 +186,25 @@ private RowJsonDeserializer(Schema schema) {
this.schema = schema;
}
+public RowJsonDeserializer allowMissingFields(Boolean allowMissing){
+ this.allowMissingFields = allowMissing;
+ return this;
+ }
+
@Override
public Row deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
// Parse and convert the root object to Row as if it's a nested field with name 'root'
return (Row)
extractJsonNodeValue(
- FieldValue.of("root", FieldType.row(schema), jsonParser.readValueAsTree()));
+ FieldValue.of("root", FieldType.row(schema), jsonParser.readValueAsTree(), allowMissingFields));
}
+
+
private static Object extractJsonNodeValue(FieldValue fieldValue) {
- if (!fieldValue.isJsonValuePresent()) {

Review comment: I initially thought about this, but then put it into the FieldValue instead because extractJsonNodeValue was static. Now that you mention it, I believe it is probably neater to just check `this.allowMissingFields` and make the method non-static.
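The trade-off discussed here (threading the flag through every `FieldValue` so a static method can see it, versus making the extraction method non-static and reading an instance field) can be sketched in plain Java. This is a hypothetical, simplified model, not Beam's RowJson API: a `Map` stands in for the parsed JSON tree, a list of names stands in for the `Schema`, and `Deserializer` and `withAllowMissingFields` are illustrative names (the latter borrowed from the reviewer's naming suggestion).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the review discussion; NOT Beam's RowJson API.
// A Map stands in for the parsed JSON object and a list of names for the Schema.
public class MissingFieldsSketch {

  static final class Deserializer {
    private final List<String> fieldNames;
    // Configuration lives on the instance rather than on every field value.
    private boolean allowMissingFields = false;

    Deserializer(List<String> fieldNames) {
      this.fieldNames = fieldNames;
    }

    // Builder-style setter, named after the reviewer's `withAllowMissingFields` suggestion.
    Deserializer withAllowMissingFields(boolean allowMissing) {
      this.allowMissingFields = allowMissing;
      return this;
    }

    // Non-static, so it can read this.allowMissingFields directly instead of
    // having the flag passed into each FieldValue.
    Map<String, Object> deserialize(Map<String, Object> json) {
      Map<String, Object> row = new LinkedHashMap<>();
      for (String name : fieldNames) {
        if (json.containsKey(name)) {
          row.put(name, json.get(name));
        } else if (allowMissingFields) {
          row.put(name, null); // a missing field becomes an implicit null
        } else {
          throw new IllegalArgumentException("Missing field: " + name);
        }
      }
      return row;
    }
  }

  public static void main(String[] args) {
    Deserializer d = new Deserializer(List.of("name", "age")).withAllowMissingFields(true);
    System.out.println(d.deserialize(Map.of("name", "Ann"))); // {name=Ann, age=null}
  }
}
```

Keeping the flag on the deserializer means it is configured once, and the per-field values stay plain data.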
[GitHub] [beam] amaliujia commented on pull request #11960: [BEAM-9999] Remove Gearpump runner.
amaliujia commented on pull request #11960: URL: https://github.com/apache/beam/pull/11960#issuecomment-641667621 Run Java PreCommit
[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
rezarokni commented on a change in pull request #11929: URL: https://github.com/apache/beam/pull/11929#discussion_r437807053

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
## @@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
return this.objectMapper;
}
}
+
+ /**
+ * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+ * Row objects within a {@link ParseResult}
+ *
+ * You can access the results by using:
+ *
+ * ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+ *
+ * {@link ParseResult#getResults()}
+ *
+ * {@Code PCollection personRows = results.getResults()}
+ *
+ * {@link ParseResult#getFailedToParseLines()}
+ *
+ * {@Code PCollection errorsLines = results.getFailedToParseLines()}
+ *
+ * To access the reason for the failure you will need to first enable extended error reporting.
+ * {@Code ParseResult results =
+ * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo()); }
+ *
+ * {@link ParseResult#getFailedToParseLinesWithErr()}
+ *
+ * {@Code PCollection errorsLinesWithErrMsg = results.getFailedToParseLines()}
+ *
+ * @return {@link JsonToRowWithErrFn}
+ */
+ @Experimental(Kind.SCHEMAS)
+ public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {

Review comment: Changed to withExceptionReporting.
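The dead-letter idea described in this Javadoc (bad input is returned in a separate "failed" output, optionally with the error message, instead of failing the job) can be sketched in plain Java. This is an illustration only, not Beam's JsonToRow API: integer parsing stands in for JSON-to-Row conversion, and `ParseResult`, `Failure`, and `parseAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the dead-letter idea; NOT Beam's JsonToRow API.
// Integer parsing stands in for JSON-to-Row conversion, and ParseResult,
// Failure and parseAll are hypothetical names.
public class DeadLetterSketch {

  static final class Failure {
    final String line;  // the original input that could not be parsed
    final String error; // the reason, like the "extended error info" option
    Failure(String line, String error) {
      this.line = line;
      this.error = error;
    }
  }

  static final class ParseResult {
    final List<Integer> results = new ArrayList<>();
    final List<Failure> failures = new ArrayList<>();
  }

  // Route every input to exactly one output instead of failing the whole batch.
  static ParseResult parseAll(List<String> lines) {
    ParseResult out = new ParseResult();
    for (String line : lines) {
      try {
        out.results.add(Integer.parseInt(line.trim()));
      } catch (NumberFormatException e) {
        out.failures.add(new Failure(line, e.getMessage()));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    ParseResult r = parseAll(List.of("1", "two", "3"));
    System.out.println(r.results);              // [1, 3]
    System.out.println(r.failures.get(0).line); // two
  }
}
```

In a Beam pipeline the same routing is usually expressed as a multi-output `ParDo` with one `TupleTag` per output, which is the shape of the `JsonToRowWithFailureCaptureFn` diff in this thread.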
[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
rezarokni commented on a change in pull request #11929: URL: https://github.com/apache/beam/pull/11929#discussion_r437806981

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
## @@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
return this.objectMapper;
}
}
+
+ static class JsonToRowWithFailureCaptureFn
+ extends PTransform, PCollectionTuple> {
+private transient volatile @Nullable ObjectMapper objectMapper;
+private Schema schema;
+private static final String METRIC_NAMESPACE = "JsonToRowFn";
+private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+private Distribution jsonConversionErrors =
+Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+public static final TupleTag main = MAIN_TUPLE_TAG;
+public static final TupleTag deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+PCollection deadLetterCollection;
+
+static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+ // Throw exception if this schema is not supported by RowJson
+ RowJson.verifySchemaSupported(rowSchema);
+ return new JsonToRowWithFailureCaptureFn(rowSchema);
+}
+
+private JsonToRowWithFailureCaptureFn(Schema schema) {
+ this.schema = schema;
+}
+
+@Override
+public PCollectionTuple expand(PCollection jsonStrings) {
+
+ return jsonStrings.apply(
+ ParDo.of(
+ new DoFn() {
+@ProcessElement
+public void processElement(ProcessContext context) {

Review comment: Done.
[GitHub] [beam] aaltay merged pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices
aaltay merged pull request #11851: URL: https://github.com/apache/beam/pull/11851
[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
pabloem commented on pull request #11086: URL: https://github.com/apache/beam/pull/11086#issuecomment-641652552 Run Python 3.7 PostCommit
[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects
pabloem commented on pull request #11959: URL: https://github.com/apache/beam/pull/11959#issuecomment-641652307 Run Java PostCommit
[GitHub] [beam] JustineKoa commented on pull request #11964: [BEAM-9987] added instruction for converting maven project to gradle
JustineKoa commented on pull request #11964: URL: https://github.com/apache/beam/pull/11964#issuecomment-641651721 Review: @angoenka
[GitHub] [beam] JustineKoa opened a new pull request #11964: [BEAM-9987] added instruction for converting maven project to gradle
JustineKoa opened a new pull request #11964: URL: https://github.com/apache/beam/pull/11964 Added instructions for converting the example WordCount Maven project generated with `archetype:generate` into a Gradle project. I've attached images of what it looks like when I run it locally. I don't have much experience writing instructions like this, so please let me know if you have any suggestions on how the phrasing or formatting could be better! Added to the development environment setup section: ![gradleinstructions1](https://user-images.githubusercontent.com/6096872/84213475-49f54c00-aab0-11ea-927d-f1870610d4e4.png) Instructions: ![gradleinstructions2](https://user-images.githubusercontent.com/6096872/84213486-4f529680-aab0-11ea-9fa0-2f16eef58ae7.png)
[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.
robertwb commented on pull request #11932: URL: https://github.com/apache/beam/pull/11932#issuecomment-641650830 Run Java PreCommit
[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.
robertwb commented on pull request #11932: URL: https://github.com/apache/beam/pull/11932#issuecomment-641650721 sdks:java:io:rabbitmq:test timed out
[GitHub] [beam] tvalentyn commented on pull request #11946: Fix VideoIntelligence IT tests
tvalentyn commented on pull request #11946: URL: https://github.com/apache/beam/pull/11946#issuecomment-641650596 I filed https://issues.apache.org/jira/browse/BEAM-10229 - is it already fixed by this change?
[GitHub] [beam] TheNeuralBit commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json
TheNeuralBit commented on pull request #11955: URL: https://github.com/apache/beam/pull/11955#issuecomment-641644986 FYI you can run `./gradlew spotlessApply` locally to apply required formatting changes.
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json
TheNeuralBit commented on a change in pull request #11955: URL: https://github.com/apache/beam/pull/11955#discussion_r437784412

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
## @@ -362,6 +382,11 @@ private RowJsonSerializer(Schema schema) {
super(Row.class);
this.schema = schema;
}
+
+public RowJsonSerializer ignoreNullsOnWrite(Boolean ignoreNullsOnWrite) {

Review comment: Similar comment here, `withIgnoreNullsOnWrite` and add a docstring. (I think checkstyle will complain without the docstring anyway).

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
## @@ -185,18 +186,25 @@ private RowJsonDeserializer(Schema schema) {
this.schema = schema;
}
+public RowJsonDeserializer allowMissingFields(Boolean allowMissing){
+ this.allowMissingFields = allowMissing;
+ return this;
+ }
+
@Override
public Row deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
// Parse and convert the root object to Row as if it's a nested field with name 'root'
return (Row)
extractJsonNodeValue(
- FieldValue.of("root", FieldType.row(schema), jsonParser.readValueAsTree()));
+ FieldValue.of("root", FieldType.row(schema), jsonParser.readValueAsTree(), allowMissingFields));
}
+
+
private static Object extractJsonNodeValue(FieldValue fieldValue) {
- if (!fieldValue.isJsonValuePresent()) {

Review comment: I think you could just check `this.allowMissingFields` here rather than passing it into all the `FieldValue` instances, no?

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
## @@ -185,18 +186,25 @@ private RowJsonDeserializer(Schema schema) {
this.schema = schema;
}
+public RowJsonDeserializer allowMissingFields(Boolean allowMissing){

Review comment: nit: could you change this to `withAllowMissingFields`? Also please add a docstring

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
## @@ -362,6 +382,11 @@ private RowJsonSerializer(Schema schema) {
super(Row.class);
this.schema = schema;
}
+
+public RowJsonSerializer ignoreNullsOnWrite(Boolean ignoreNullsOnWrite) {

Review comment: I might call it "DropNullsOnWrite" instead of ignore, but I don't feel strongly about it

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
## @@ -375,6 +400,9 @@ private void writeRow(Row row, Schema schema, JsonGenerator gen) throws IOExcept
for (int i = 0; i < schema.getFieldCount(); ++i) {
Field field = schema.getField(i);
Object value = row.getValue(i);
+if (ignoreNullsOnWrite && value == null){
+ continue;
+}

Review comment: This should also check `field.getType().getNullable()` like the other conditional. If we get a null for a non-nullable field we should fail loudly rather than silently dropping it.

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
## @@ -185,18 +186,25 @@ private RowJsonDeserializer(Schema schema) {
this.schema = schema;
}
+public RowJsonDeserializer allowMissingFields(Boolean allowMissing){

Review comment: We might want to make this an enum so in the future there could be a third mode where nulls _must_ be encoded with a missing field, and having a null field value would be considered an error. The mode you've added here is a permissive middle ground where we allow either one.
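Two of the suggestions in this review (check nullability before dropping a null on write, and replace the `Boolean` with an enum of null-handling modes) can be sketched in plain Java. This is a hypothetical model, not Beam's RowJson API: `NullBehavior`, its constant names, `Field`, `writeRow`, and `isAcceptable` are illustrative names that do not come from the PR.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of two review suggestions; NOT Beam's RowJson API.
public class NullHandlingSketch {

  // Suggestion 1: an enum instead of a Boolean, leaving room for a stricter mode.
  enum NullBehavior {
    REQUIRE_NULL,           // a null must be written as an explicit JSON null
    ACCEPT_MISSING_OR_NULL, // permissive middle ground: either encoding is accepted
    REQUIRE_MISSING         // a null must be encoded by omitting the field
  }

  static final class Field {
    final String name;
    final boolean nullable;
    Field(String name, boolean nullable) {
      this.name = name;
      this.nullable = nullable;
    }
  }

  // Reader side, for a nullable field: is this encoding allowed under `mode`?
  static boolean isAcceptable(NullBehavior mode, boolean present, boolean isNull) {
    switch (mode) {
      case REQUIRE_NULL:
        return present;              // a missing field is an error
      case REQUIRE_MISSING:
        return !(present && isNull); // an explicit null is an error
      default:
        return true;                 // ACCEPT_MISSING_OR_NULL
    }
  }

  // Suggestion 2 (writer side): drop a null only for nullable fields; a null in a
  // non-nullable field fails loudly instead of being silently dropped.
  static Map<String, Object> writeRow(List<Field> schema, List<Object> values, boolean dropNullsOnWrite) {
    Map<String, Object> json = new LinkedHashMap<>();
    for (int i = 0; i < schema.size(); ++i) {
      Field field = schema.get(i);
      Object value = values.get(i);
      if (value == null && !field.nullable) {
        throw new IllegalArgumentException("Null value for non-nullable field: " + field.name);
      }
      if (value == null && dropNullsOnWrite) {
        continue;
      }
      json.put(field.name, value);
    }
    return json;
  }

  public static void main(String[] args) {
    List<Field> schema = List.of(new Field("name", false), new Field("nickname", true));
    System.out.println(writeRow(schema, Arrays.asList("Ann", null), true)); // {name=Ann}
    System.out.println(isAcceptable(NullBehavior.REQUIRE_MISSING, true, true)); // false
  }
}
```

With an enum, a stricter mode such as `REQUIRE_MISSING` can be added later without changing the method signature.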
[GitHub] [beam] robertwb opened a new pull request #11963: Add relational GroupBy transform to Python.
robertwb opened a new pull request #11963: URL: https://github.com/apache/beam/pull/11963
[GitHub] [beam] robertwb commented on a change in pull request #11838: [BEAM-9322] Modify the TestStream to output a dict when no output_tags are specified
robertwb commented on a change in pull request #11838: URL: https://github.com/apache/beam/pull/11838#discussion_r437787906 ## File path: sdks/python/apache_beam/testing/test_stream.py ## @@ -291,10 +291,10 @@ def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline if not self.output_tags: - self.output_tags = set([None]) + self.output_tags = {None} Review comment: OK, in that case I'm fine with this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] rohdesamuel commented on a change in pull request #11838: [BEAM-9322] Modify the TestStream to output a dict when no output_tags are specified
rohdesamuel commented on a change in pull request #11838: URL: https://github.com/apache/beam/pull/11838#discussion_r437784983 ## File path: sdks/python/apache_beam/testing/test_stream.py ## @@ -291,10 +291,10 @@ def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline if not self.output_tags: - self.output_tags = set([None]) + self.output_tags = {None} Review comment: This is a little harder to implement, mainly because the TestStream retrieves its output_tags from the keys of the PTransform payload holding it. This means that output_tags = None and output_tags = {None} both appear in the PTransform payload's outputs as a map with a single key, None. When a TestStream is reconstructed, even if the original output_tags was unset, it will be constructed with output_tags = {None}. I think the best we can do is to treat {None} and None the same way.
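The round-trip ambiguity described in the review comment can be sketched in plain Python. This is a minimal illustration, not Beam's TestStream code: `to_payload_outputs`, `from_payload_outputs` and `is_default_output` are hypothetical helpers, and they assume (as the comment says) that the payload stores outputs as a map keyed by tag.

```python
from typing import Dict, Iterable, Optional, Set

Tag = Optional[str]


def to_payload_outputs(output_tags: Optional[Iterable[Tag]]) -> Dict[Tag, str]:
    """Hypothetical serializer: the tags become the keys of an outputs map.

    An unset (or empty) tag collection is stored as a single None key,
    mirroring `if not self.output_tags: self.output_tags = {None}`.
    """
    tags = set(output_tags) if output_tags else {None}
    return {tag: "pcollection_for_%s" % tag for tag in tags}


def from_payload_outputs(outputs: Dict[Tag, str]) -> Set[Tag]:
    """Hypothetical deserializer: only the map keys survive the round trip."""
    return set(outputs)


def is_default_output(output_tags: Optional[Iterable[Tag]]) -> bool:
    """Treat None, an empty collection and {None} as the same 'untagged' case."""
    return not output_tags or set(output_tags) == {None}
```

Because `None` and `{None}` serialize to the same one-key map, the reconstructed side cannot tell them apart, so treating them identically (as `is_default_output` does) is the only consistent option. Note also that `set([None])` and `{None}` build the same set; the diff only changes the spelling.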
[GitHub] [beam] TheNeuralBit commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json
TheNeuralBit commented on pull request #11955: URL: https://github.com/apache/beam/pull/11955#issuecomment-641639457 Retest this please
[GitHub] [beam] TheNeuralBit commented on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json
TheNeuralBit commented on pull request #11955: URL: https://github.com/apache/beam/pull/11955#issuecomment-641639519 Unfortunately only committers can trigger jenkins now :/
[GitHub] [beam] robertwb merged pull request #11901: Prototype schema-inferring Row constructor.
robertwb merged pull request #11901: URL: https://github.com/apache/beam/pull/11901
[GitHub] [beam] apilloud commented on pull request #11957: [BEAM-10033] zetaSqlValueToJavaObject uses value.getType()
apilloud commented on pull request #11957: URL: https://github.com/apache/beam/pull/11957#issuecomment-641631947 Thanks! We have a bunch of nullness issues; investigating some of those is how I discovered this issue.
[GitHub] [beam] reubenvanammers edited a comment on pull request #11955: [BEAM-10220] add support for implicit nulls for converting between beam rows and json
reubenvanammers edited a comment on pull request #11955: URL: https://github.com/apache/beam/pull/11955#issuecomment-641097493 Run Java PreCommit
[GitHub] [beam] robinyqiu commented on pull request #11957: [BEAM-10033] zetaSqlValueToJavaObject uses value.getType()
robinyqiu commented on pull request #11957: URL: https://github.com/apache/beam/pull/11957#issuecomment-641627057 Well, some tests are failing, and I realized this turns out not to be that simple a change. There are subtle differences between Beam schemas and ZetaSQL structs that prevent this change (e.g. ZetaSQL types are nullable by default, whereas nullability is a label on the Beam field type; and a ZetaSQL struct allows fields with the same name, but a Beam schema does not). I will take more time to figure out the correct thing to do here. In the meantime, I will send out some other small cleanup PRs for review. Thanks!
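The two mismatches named in the comment (default nullability and duplicate field names) can be illustrated with a small conversion check. This is a hypothetical sketch, not Beam's ZetaSQL translation code: `BeamField` and `struct_to_beam_fields` are invented names, and a struct is reduced to a list of `(name, type_name)` pairs.

```python
from collections import Counter
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class BeamField:
    """Hypothetical stand-in for a Beam schema field: nullability is an explicit label."""
    name: str
    type_name: str
    nullable: bool


def struct_to_beam_fields(struct_fields: List[Tuple[str, str]]) -> List[BeamField]:
    """Convert (name, type) pairs from a ZetaSQL-like struct into Beam-like fields.

    Assumptions for illustration: every struct field is marked nullable,
    because ZetaSQL values are nullable by default, and duplicate names are
    rejected, because a Beam schema does not allow them.
    """
    counts = Counter(name for name, _ in struct_fields)
    duplicates = sorted(name for name, n in counts.items() if n > 1)
    if duplicates:
        raise ValueError("Duplicate field names are not allowed in a Beam schema: %s" % duplicates)
    return [BeamField(name, type_name, nullable=True) for name, type_name in struct_fields]
```

Raising on duplicates is only one possible policy; renaming or rejecting the query are other options, which is presumably part of what "the correct thing to do here" still has to settle.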
[GitHub] [beam] aaltay commented on pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices
aaltay commented on pull request #11851: URL: https://github.com/apache/beam/pull/11851#issuecomment-641621183 retest this please
[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
rezarokni commented on a change in pull request #11929: URL: https://github.com/apache/beam/pull/11929#discussion_r437764105 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -116,4 +131,267 @@ private ObjectMapper objectMapper() { return this.objectMapper; } } + + /** + * Enable Dead letter support. If this value is set errors in the parsing layer are returned as + * Row objects within a {@link ParseResult} + * + * You can access the results by using: + * + * ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA)); + * + * {@link ParseResult#getResults()} + * + * {@Code PCollection personRows = results.getResults()} + * + * {@link ParseResult#getFailedToParseLines()} + * + * {@Code PCollection errorsLines = results.getFailedToParseLines()} + * + * To access the reason for the failure you will need to first enable extended error reporting. + * {@Code ParseResult results = + * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo()); } + * + * {@link ParseResult#getFailedToParseLinesWithErr()} + * + * {@Code PCollection errorsLinesWithErrMsg = results.getFailedToParseLines()} + * + * @return {@link JsonToRowWithErrFn} + */ + @Experimental(Kind.SCHEMAS) + public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) { +return JsonToRowWithErrFn.forSchema(rowSchema); + } + + @AutoValue + abstract static class JsonToRowWithErrFn extends PTransform, ParseResult> { + +private Pipeline pipeline; + +private PCollection parsedLine; +private PCollection failedParse; +private PCollection failedParseWithErr; + +private static final String LINE_FIELD_NAME = "line"; +private static final String ERROR_FIELD_NAME = "err"; + +public static final Schema ERROR_ROW_SCHEMA = +Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING)); + +public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA = +Schema.of( +Field.of(LINE_FIELD_NAME, FieldType.STRING), +Field.of(ERROR_FIELD_NAME, 
FieldType.STRING)); + +static final TupleTag PARSED_LINE = new TupleTag() {}; +static final TupleTag PARSE_ERROR_LINE = new TupleTag() {}; +static final TupleTag PARSE_ERROR_LINE_WITH_MSG = new TupleTag() {}; + +public abstract Schema getSchema(); + +public abstract String getLineFieldName(); + +public abstract String getErrorFieldName(); + +public abstract boolean getExtendedErrorInfo(); + +PCollection deadLetterCollection; + +public abstract Builder toBuilder(); + +@AutoValue.Builder +public abstract static class Builder { + public abstract Builder setSchema(Schema value); + + public abstract Builder setLineFieldName(String value); + + public abstract Builder setErrorFieldName(String value); + + public abstract Builder setExtendedErrorInfo(boolean value); + + public abstract JsonToRowWithErrFn build(); +} + +public static JsonToRowWithErrFn forSchema(Schema rowSchema) { + // Throw exception if this schema is not supported by RowJson + RowJson.verifySchemaSupported(rowSchema); + return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder() + .setSchema(rowSchema) + .setExtendedErrorInfo(false) + .setLineFieldName(LINE_FIELD_NAME) + .setErrorFieldName(ERROR_FIELD_NAME) + .build(); +} + +/** + * Adds the error message to the returned error Row. + * + * @return {@link JsonToRow} + */ +public JsonToRowWithErrFn withExtendedErrorInfo() { + return this.toBuilder().setExtendedErrorInfo(true).build(); +} + +/** + * Sets the field name for the line field in the returned Row. + * + * @return {@link JsonToRow} + */ +public JsonToRowWithErrFn setLineField(String lineField) { + return this.toBuilder().setLineFieldName(lineField).build(); +} + +/** + * Adds the error message to the returned error Row. 
+ * + * @return {@link JsonToRow} + */ +public JsonToRowWithErrFn setErrorField(String errorField) { + if (!this.getExtendedErrorInfo()) { +throw new IllegalArgumentException( +"This option is only available with Extended Error Info."); + } + return this.toBuilder().setErrorFieldName(errorField).build(); +} + +@Override +public ParseResult expand(PCollection jsonStrings) { + + PCollectionTuple result = + jsonStrings.apply( + ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo())) + .withOutputTags( + PARSED_LINE, + TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG))); + + this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema()); + this.failedParse = +
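The dead-letter behaviour described in the Javadoc above (failed lines kept as rows, with the error message added only when extended error info is enabled) can be sketched outside Beam. This is a plain-Python illustration, not the `JsonToRow` API: `parse_with_dead_letter` is a hypothetical function, and a "schema" is reduced to a tuple of required field names. The defaults `line` and `err` mirror `LINE_FIELD_NAME` and `ERROR_FIELD_NAME` in the diff.

```python
import json
from typing import Any, Dict, Iterable, List, Tuple


def parse_with_dead_letter(
    lines: Iterable[str],
    required_fields: Tuple[str, ...],
    extended_error_info: bool = False,
    line_field: str = "line",
    error_field: str = "err",
) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]]]:
    """Split input lines into parsed rows and dead-letter rows.

    A failed line becomes {line_field: line}; when extended_error_info is
    True, the exception message is also stored under error_field.
    """
    parsed: List[Dict[str, Any]] = []
    failed: List[Dict[str, str]] = []
    for line in lines:
        try:
            obj = json.loads(line)
            if not isinstance(obj, dict):
                raise ValueError("expected a JSON object")
            missing = [f for f in required_fields if f not in obj]
            if missing:
                raise ValueError("missing fields: %s" % missing)
            parsed.append({f: obj[f] for f in required_fields})
        except ValueError as ex:  # json.JSONDecodeError is a subclass of ValueError
            error_row = {line_field: line}
            if extended_error_info:
                error_row[error_field] = str(ex)
            failed.append(error_row)
    return parsed, failed
```

Returning both collections from one pass is the same idea as the multi-output `ParDo` in the diff (`PARSED_LINE` plus the error tags): a bad record is routed aside instead of failing the whole pipeline.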
[GitHub] [beam] aaltay commented on pull request #11960: [BEAM-9999] Remove Gearpump runner.
aaltay commented on pull request #11960: URL: https://github.com/apache/beam/pull/11960#issuecomment-641619571 retest this please
[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
rezarokni commented on a change in pull request #11929: URL: https://github.com/apache/beam/pull/11929#discussion_r437762683 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
rezarokni commented on a change in pull request #11929: URL: https://github.com/apache/beam/pull/11929#discussion_r437762165 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
rezarokni commented on a change in pull request #11929: URL: https://github.com/apache/beam/pull/11929#discussion_r437760550 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -116,4 +157,65 @@ private ObjectMapper objectMapper() { return this.objectMapper; } } + + static class JsonToRowWithFailureCaptureFn + extends PTransform, PCollectionTuple> { +private transient volatile @Nullable ObjectMapper objectMapper; +private Schema schema; +private static final String METRIC_NAMESPACE = "JsonToRowFn"; +private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure"; + +private Distribution jsonConversionErrors = +Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME); + +public static final TupleTag main = MAIN_TUPLE_TAG; +public static final TupleTag deadLetter = DEAD_LETTER_TUPLE_TAG; + +PCollection deadLetterCollection; + +static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) { + // Throw exception if this schema is not supported by RowJson + RowJson.verifySchemaSupported(rowSchema); + return new JsonToRowWithFailureCaptureFn(rowSchema); +} + +private JsonToRowWithFailureCaptureFn(Schema schema) { + this.schema = schema; +} + +@Override +public PCollectionTuple expand(PCollection jsonStrings) { + + return jsonStrings.apply( + ParDo.of( + new DoFn() { +@ProcessElement +public void processElement(ProcessContext context) { + try { +context.output(jsonToRow(objectMapper(), context.element())); + } catch (Exception ex) { +context.output( +deadLetter, +Row.withSchema(ERROR_ROW_SCHEMA) +.addValue(context.element()) +.addValue(ex.getMessage()) +.build()); + } +} + }) + .withOutputTags(main, TupleTagList.of(deadLetter))); Review comment: Fixed.
[GitHub] [beam] pabloem closed pull request #11850: [BEAM-1438] Allow 0 shards on WriteFiles streaming
pabloem closed pull request #11850: URL: https://github.com/apache/beam/pull/11850
[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
pabloem commented on pull request #11086: URL: https://github.com/apache/beam/pull/11086#issuecomment-641607885
[GitHub] [beam] robertwb commented on a change in pull request #11901: Prototype schema-inferring Row constructor.
robertwb commented on a change in pull request #11901: URL: https://github.com/apache/beam/pull/11901#discussion_r437749540 ## File path: sdks/python/apache_beam/transforms/sql.py ## @@ -74,3 +77,8 @@ def __init__(self, query, dialect=None): SqlTransformSchema(query=query, dialect=dialect)), BeamJarExpansionService( ':sdks:java:extensions:sql:expansion-service:shadowJar')) + + +class Row(object): + def __init__(self, **kwargs): +self.__dict__.update(kwargs) Review comment: Moved to pvalue (imported to the top level), like TaggedOutput.
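The prototype `Row` class in the diff stores its keyword arguments as attributes, which is enough information to infer field names and types from the values. The sketch below reuses the class exactly as shown and adds a hypothetical `infer_fields` helper; it maps to plain Python types, whereas the PR's actual inference (not shown here) targets Beam schema types.

```python
from typing import List, Tuple


class Row(object):
    """Same shape as the prototype in the diff: fields come from keyword arguments."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


def infer_fields(row: Row) -> List[Tuple[str, type]]:
    """Hypothetical helper: infer (name, Python type) pairs from a Row's values.

    Keyword arguments keep their call order (Python 3.6+), so the fields come
    back in the order they were passed to the constructor.
    """
    return [(name, type(value)) for name, value in vars(row).items()]
```

For example, `Row(name="x", count=3)` exposes `.name` and `.count` as attributes, and `infer_fields` yields `[("name", str), ("count", int)]`.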
[GitHub] [beam] robertwb commented on a change in pull request #11901: Prototype schema-inferring Row constructor.
robertwb commented on a change in pull request #11901: URL: https://github.com/apache/beam/pull/11901#discussion_r437749425 ## File path: sdks/python/apache_beam/coders/row_coder.py ## @@ -82,8 +86,19 @@ def from_runner_api_parameter(schema, components, unused_context): return RowCoder(schema) @staticmethod - def from_type_hint(named_tuple_type, registry): -return RowCoder(named_tuple_to_schema(named_tuple_type)) + def from_type_hint(type_hint, registry): +if isinstance(type_hint, row_type.RowTypeConstraint): + schema = schema_pb2.Schema( + fields=[ + schema_pb2.Field( + name=name, + type=typing_to_runner_api(type)) + for (name, type) in type_hint._fields + ], + id=str(uuid.uuid4())) Review comment: Done.
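The new `from_type_hint` branch turns the `(name, type)` pairs in `type_hint._fields` into schema fields and stamps the schema with a fresh UUID. The sketch below mimics that shape with plain dicts instead of `schema_pb2` messages; `fields_to_schema` and the `_TYPE_NAMES` table are hypothetical stand-ins for the protobuf construction and for `typing_to_runner_api`.

```python
import uuid
from typing import Dict, List, Tuple

# Hypothetical mapping from Python types to type names; the diff calls
# typing_to_runner_api() for this step instead.
_TYPE_NAMES = {int: "INT64", float: "DOUBLE", str: "STRING", bool: "BOOLEAN", bytes: "BYTES"}


def fields_to_schema(fields: List[Tuple[str, type]]) -> Dict[str, object]:
    """Build a schema-like dict from (name, type) pairs and tag it with a random UUID.

    Unknown types raise KeyError in this sketch.
    """
    return {
        "fields": [{"name": name, "type": _TYPE_NAMES[typ]} for name, typ in fields],
        "id": str(uuid.uuid4()),
    }
```

Because `uuid4()` is random, two calls with identical fields still produce schemas with different ids, matching the diff's `id=str(uuid.uuid4())`.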
[GitHub] [beam] ibzib commented on pull request #11961: [BEAM-10225] Add log message when starting job server
ibzib commented on pull request #11961: URL: https://github.com/apache/beam/pull/11961#issuecomment-641605323 retest this please
[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects
pabloem commented on pull request #11959: URL: https://github.com/apache/beam/pull/11959#issuecomment-641604325 LGTM
[GitHub] [beam] pabloem commented on pull request #11950: [BEAM-8596]: Add SplunkIO transform to write messages to Splunk
pabloem commented on pull request #11950: URL: https://github.com/apache/beam/pull/11950#issuecomment-641604419 thanks. Looking once more...
[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects
pabloem commented on pull request #11959: URL: https://github.com/apache/beam/pull/11959#issuecomment-641604201 Run Java PostCommit
[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
pabloem commented on pull request #11086: URL: https://github.com/apache/beam/pull/11086#issuecomment-641603355 Run Python PreCommit
[GitHub] [beam] tvalentyn opened a new pull request #11962: [BEAM-10227] Switches typing version modifier to python_full_version.
tvalentyn opened a new pull request #11962: URL: https://github.com/apache/beam/pull/11962
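The PR description is not reproduced here, but the difference between the two PEP 508 environment markers named in the title can be sketched: `python_version` carries only major.minor, while `python_full_version` carries the full release. Assumptions: the bound `3.5.3` below is purely illustrative, and `release_tuple` is a simplified stand-in for PEP 440 comparison that handles plain numeric releases only.

```python
import platform
from typing import Tuple


def marker_values() -> Tuple[str, str]:
    """Return (python_version, python_full_version) as PEP 508 defines them."""
    short = ".".join(platform.python_version_tuple()[:2])
    return short, platform.python_version()


def release_tuple(version: str, width: int = 3) -> Tuple[int, ...]:
    """Parse a plain 'X.Y[.Z]' string, zero-padding missing parts as PEP 440 does.

    Simplification: pre-release or local suffixes are not handled.
    """
    parts = [int(p) for p in version.split(".")]
    return tuple(parts + [0] * (width - len(parts)))


def marker_less_than(marker_value: str, bound: str) -> bool:
    """Evaluate `<marker> < "<bound>"` for plain numeric release strings."""
    return release_tuple(marker_value) < release_tuple(bound)
```

On a hypothetical 3.5.4 interpreter, `python_version` is `"3.5"`, so `marker_less_than("3.5", "3.5.3")` is true and a patch-level bound still matches; with `python_full_version` (`"3.5.4"`) the same check is false. A patch-level bound is only meaningful against the full version.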
[GitHub] [beam] pabloem commented on pull request #11824: [BEAM-10101] Add HttpIO / HttpFileSystem (Python)
pabloem commented on pull request #11824: URL: https://github.com/apache/beam/pull/11824#issuecomment-641602134 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit merged pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration
TheNeuralBit merged pull request #11951: URL: https://github.com/apache/beam/pull/11951
[GitHub] [beam] TheNeuralBit commented on pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration
TheNeuralBit commented on pull request #11951: URL: https://github.com/apache/beam/pull/11951#issuecomment-641598227 Looks like SQL PostCommit finished successfully [including the new tests](https://builds.apache.org/job/beam_PostCommit_SQL_PR/321/testReport/org.apache.beam.sdk.extensions.sql.meta.provider.bigquery/BigQueryReadWriteIT/testSQLWriteAndRead_WithWriteDispositionAppend/), just hasn't been updated here yet. Merging now.
[GitHub] [beam] annaqin418 opened a new pull request #11961: [BEAM-10225] Add log message when starting job server
annaqin418 opened a new pull request #11961: URL: https://github.com/apache/beam/pull/11961 R: @ibzib @robertwb

Adds a log output message that clarifies when a newly-started job server is ready to receive jobs: `INFO org.apache.beam.runners.jobsubmission.JobServerDriver: Job server now running, terminate with Ctrl+C`

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
[GitHub] [beam] tysonjh opened a new pull request #11960: [BEAM-9999] Remove Gearpump runner.
tysonjh opened a new pull request #11960: URL: https://github.com/apache/beam/pull/11960 Remove Gearpump runner.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
pabloem commented on pull request #11086: URL: https://github.com/apache/beam/pull/11086#issuecomment-641588460 So, I think the last remaining issue was whether to have the `use_json_exports` flag or individual flags per column. Are there other topics to discuss?
[GitHub] [beam] chamikaramj merged pull request #11928: [ BEAM-3788] Updates kafka.py pydocs
chamikaramj merged pull request #11928: URL: https://github.com/apache/beam/pull/11928
[GitHub] [beam] chamikaramj commented on pull request #11928: [ BEAM-3788] Updates kafka.py pydocs
chamikaramj commented on pull request #11928: URL: https://github.com/apache/beam/pull/11928#issuecomment-641587073 Thanks.
[GitHub] [beam] ihji commented on a change in pull request #11930: [BEAM-10202] make cross-language testing scripts OSX-compatible
ihji commented on a change in pull request #11930: URL: https://github.com/apache/beam/pull/11930#discussion_r437722946 ## File path: sdks/python/scripts/run_expansion_services.sh ## @@ -133,4 +136,7 @@ case $STARTSTOP in fi ;; esac -flock -u 200 + +if [[ $CHECK_FLOCK -eq 0 ]]; then Review comment: Done.
[GitHub] [beam] pabloem commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO
pabloem commented on a change in pull request #11702: URL: https://github.com/apache/beam/pull/11702#discussion_r437721434 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java ## @@ -155,17 +168,53 @@ * FhirIO.Write.Result writeResult = * output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore())); * + * // Alternatively you could use import for high throughput to a new store. + * FhirIO.Write.Result writeResult = + * output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore())); + * // [End Writing ] + * * PCollection> failedBundles = writeResult.getFailedInsertsWithErr(); * + * // [Begin Writing to Dead Letter Queue] * failedBundles.apply("Write failed bundles to BigQuery", * BigQueryIO * .write() * .to(option.getBQFhirExecuteBundlesDeadLetterTable()) * .withFormatFunction(new HealthcareIOErrorToTableRow())); + * // [End Writing to Dead Letter Queue] + * + * // Alternatively you may want to handle DeadLetter with conditional update + * // [Begin Reconciliation with Conditional Update] + * failedBundles + * .apply("Reconcile with Conditional Update", + * FhirIO.ConditionalUpdate(fhirStore) + * .withFormatBodyFunction(HealthcareIOError::getDataResource) + * .withTypeFunction((HealthcareIOError err) -> { + * String body = err.getDataResource(); + * // TODO(user) insert logic to extract type. + * return params; + * }) + * .withSearchParametersFunction((HealthcareIOError err) -> { Review comment: I think that makes total sense. Thanks!
[GitHub] [beam] pabloem commented on pull request #11850: [BEAM-1438] Allow 0 shards on WriteFiles streaming
pabloem commented on pull request #11850: URL: https://github.com/apache/beam/pull/11850#issuecomment-641570009 Run Java PreCommit
[GitHub] [beam] pabloem merged pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types
pabloem merged pull request #11923: URL: https://github.com/apache/beam/pull/11923
[GitHub] [beam] pabloem commented on pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types
pabloem commented on pull request #11923: URL: https://github.com/apache/beam/pull/11923#issuecomment-641568293 thanks @chunyang
[GitHub] [beam] pabloem commented on a change in pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
pabloem commented on a change in pull request #11086: URL: https://github.com/apache/beam/pull/11086#discussion_r437719751 ## File path: sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py ## @@ -254,11 +256,36 @@ def test_big_query_new_types(self): 'output_schema': NEW_TYPES_OUTPUT_SCHEMA, 'use_standard_sql': False, 'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS, +'use_json_exports': True, 'on_success_matcher': all_of(*pipeline_verifiers) } options = self.test_pipeline.get_full_options_as_args(**extra_opts) big_query_query_to_table_pipeline.run_bq_pipeline(options) + @attr('IT') + def test_big_query_new_types(self): Review comment: Thanks for pointing that out. I've renamed the test cases to keep coverage for the feature on the same test.
[GitHub] [beam] amaliujia commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive m
amaliujia commented on pull request #11821: URL: https://github.com/apache/beam/pull/11821#issuecomment-641558615 Thanks for adding me as a reviewer. I tried to go through this PR and couldn't come up with any valuable comments. (In fact, reading these code changes was a good learning process for me.)
[GitHub] [beam] robertwb commented on a change in pull request #11838: [BEAM-9322] Modify the TestStream to output a dict when no output_tags are specified
robertwb commented on a change in pull request #11838: URL: https://github.com/apache/beam/pull/11838#discussion_r437702523 ## File path: sdks/python/apache_beam/testing/test_stream.py ## @@ -291,10 +291,10 @@ def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline if not self.output_tags: - self.output_tags = set([None]) + self.output_tags = {None} Review comment: If the user explicitly sets the output tags to {None}, they might be expecting a dict. (Specifically, they might get a set from elsewhere and set the output tags from that set, and it would be awkward to have to inspect that set to determine how to interpret the result.) So in this case I would do:
```
if not self.output_tags:
  return pvalue.PCollection(self.pipeline, is_bounded=False)
else:
  return { ... for tag in self.output_tags}
```
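The return-shape rule suggested in the review above can be sketched as a small standalone Python function. This is not Beam's `TestStream.expand` code: `build_outputs` and `make_output` are hypothetical names, and `make_output` stands in for constructing a `pvalue.PCollection`. The point it illustrates is that only "no tags at all" yields a single output, while any explicit tag set, including `{None}`, always yields a dict.

```python
from typing import Callable, Dict, Hashable, Iterable, Optional, TypeVar, Union

T = TypeVar("T")


def build_outputs(
    output_tags: Optional[Iterable[Hashable]],
    make_output: Callable[[Optional[Hashable]], T],
) -> Union[T, Dict[Optional[Hashable], T]]:
    """Sketch of the suggested rule; make_output is a hypothetical stand-in
    for building a pvalue.PCollection for the given tag (or None)."""
    if not output_tags:
        # No tags at all (None or an empty set): return one untagged output.
        return make_output(None)
    # Any explicit tag set, even {None}, yields a dict keyed by tag, so the
    # caller never has to inspect the set to know which shape comes back.
    return {tag: make_output(tag) for tag in output_tags}
```

For example, `build_outputs(None, str)` returns the single value `'None'`, whereas `build_outputs({None}, str)` returns the dict `{None: 'None'}`.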
[GitHub] [beam] robertwb commented on pull request #11838: [BEAM-9322] Modify the TestStream to output a dict when no output_tags are specified
robertwb commented on pull request #11838: URL: https://github.com/apache/beam/pull/11838#issuecomment-641555415 R: @robertwb
[GitHub] [beam] stale[bot] closed pull request #10958: [BEAM] Submitting final communication strategy
stale[bot] closed pull request #10958: URL: https://github.com/apache/beam/pull/10958
[GitHub] [beam] stale[bot] commented on pull request #10958: [BEAM] Submitting final communication strategy
stale[bot] commented on pull request #10958: URL: https://github.com/apache/beam/pull/10958#issuecomment-641555161 This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
[GitHub] [beam] udim commented on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset
udim commented on pull request #11939: URL: https://github.com/apache/beam/pull/11939#issuecomment-641543855 retest this please
[GitHub] [beam] TheNeuralBit commented on pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration
TheNeuralBit commented on pull request #11951: URL: https://github.com/apache/beam/pull/11951#issuecomment-641543407 Run SQL PostCommit
[GitHub] [beam] TheNeuralBit commented on pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration
TheNeuralBit commented on pull request #11951: URL: https://github.com/apache/beam/pull/11951#issuecomment-641540376 Run SQL Postcommit
[GitHub] [beam] TheNeuralBit commented on pull request #11951: [BEAM-8828] Added BigQueryTableProvider WriteDisposition configuration
TheNeuralBit commented on pull request #11951: URL: https://github.com/apache/beam/pull/11951#issuecomment-641540296 Retest this please
[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.
robertwb commented on pull request #11932: URL: https://github.com/apache/beam/pull/11932#issuecomment-641534358 Run Java PreCommit
[GitHub] [beam] amaliujia merged pull request #11958: [BEAM-10198] Roll back changes
amaliujia merged pull request #11958: URL: https://github.com/apache/beam/pull/11958
[GitHub] [beam] amaliujia commented on pull request #11958: [BEAM-10198] Roll back changes
amaliujia commented on pull request #11958: URL: https://github.com/apache/beam/pull/11958#issuecomment-641521023 @ihji
[GitHub] [beam] sabhyankar commented on pull request #11950: [BEAM-8596]: Add SplunkIO transform to write messages to Splunk
sabhyankar commented on pull request #11950: URL: https://github.com/apache/beam/pull/11950#issuecomment-641500443 @pabloem Thanks for the quick reviews! I have pushed a couple of commits with the mods you requested.
[GitHub] [beam] sabhyankar commented on a change in pull request #11950: [BEAM-8596]: Add SplunkIO transform to write messages to Splunk
sabhyankar commented on a change in pull request #11950: URL: https://github.com/apache/beam/pull/11950#discussion_r437640982 ## File path: sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java ## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.splunk; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An unbounded sink for Splunk's Http Event Collector (HEC). + * + * For more information, see the online documentation at Splunk HEC (https://dev.splunk.com/enterprise/docs/dataapps/httpeventcollector/). + * + * Writing to Splunk's HEC + * + * The {@link SplunkIO} class provides a {@link PTransform} that allows writing {@link + * SplunkEvent} messages into a Splunk HEC end point. + * + * It takes as an input a {@link PCollection PCollection<SplunkEvent>}, where each {@link + * SplunkEvent} represents an event to be published to HEC. + * + * To configure a {@link SplunkIO}, you must provide at a minimum: + * + * + * url - HEC endpoint URL. + * token - HEC endpoint token. + * + * + * The {@link SplunkIO} transform can be customized further by optionally specifying: + * + * + * parallelism - Number of parallel requests to the HEC. + * batchCount - Number of events in a single batch.
+ * disableCertificateValidation - Whether to disable ssl validation (useful for self-signed + * certificates) + * + * + * This transform will return any non-transient write failures via a {@link PCollection + * PCollection<SplunkWriteError>}, where each {@link SplunkWriteError} captures the error that + * occurred while attempting to write to HEC. These can be published to a dead-letter sink or + * reprocessed. + * + * For example: + * + * {@code + * PCollection events = ...; + * + * PCollection errors = + * events.apply("WriteToSplunk", + * SplunkIO.writeBuilder() Review comment: Thanks for that input! Switched to a fluent factory pattern for the SplunkIO transform.
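The `parallelism` and `batchCount` options in the Javadoc above can be illustrated with a small Python sketch. The `ThreadLocalRandom`, `KvCoder` and `BigEndianIntegerCoder` imports in the diff hint that events may be keyed by a random integer bounded by `parallelism` before batching, but that is an assumption: the thread does not confirm it, and this is not SplunkIO's implementation. `shard_and_batch` is a hypothetical helper. It first spreads events over at most `parallelism` random keys, then cuts each key's events into batches of at most `batch_count`, where each batch would correspond to one HEC request.

```python
import random
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, TypeVar

E = TypeVar("E")


def shard_and_batch(
    events: Iterable[E],
    parallelism: int,
    batch_count: int,
    rng: Optional[random.Random] = None,
) -> Dict[int, List[List[E]]]:
    """Hypothetical helper: random keys in [0, parallelism), then batching."""
    if parallelism < 1 or batch_count < 1:
        raise ValueError("parallelism and batch_count must both be >= 1")
    if rng is None:
        rng = random.Random()

    # Step 1: give each event a random integer key, so there are at most
    # `parallelism` groups (and therefore concurrent request streams).
    shards: Dict[int, List[E]] = defaultdict(list)
    for event in events:
        shards[rng.randrange(parallelism)].append(event)

    # Step 2: within each key, cut events into batches of at most batch_count.
    return {
        key: [items[i:i + batch_count] for i in range(0, len(items), batch_count)]
        for key, items in shards.items()
    }
```

With `parallelism=1`, every event lands on key 0, so `shard_and_batch(range(5), 1, 2)` yields three batches of sizes 2, 2 and 1. Random keys keep the load roughly even without coordination, at the cost of some batches being smaller than `batch_count`.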
[GitHub] [beam] pabloem commented on pull request #11959: refactor HCLS IO ITs to support stores in other projects
pabloem commented on pull request #11959: URL: https://github.com/apache/beam/pull/11959#issuecomment-641496977 retest this please
[GitHub] [beam] tvalentyn commented on pull request #11954: [Do not merge] Add a unit test exposing BEAM-10217
tvalentyn commented on pull request #11954: URL: https://github.com/apache/beam/pull/11954#issuecomment-641495342 The tests failed on Py3.6.
[GitHub] [beam] aaltay commented on pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices
aaltay commented on pull request #11851: URL: https://github.com/apache/beam/pull/11851#issuecomment-641482781 retest this please
[GitHub] [beam] davidcavazos commented on pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices
davidcavazos commented on pull request #11851: URL: https://github.com/apache/beam/pull/11851#issuecomment-641474784 retest this please
[GitHub] [beam] davidcavazos commented on pull request #11851: [BEAM-10144] Update PipelineOptions snippets for best practices
davidcavazos commented on pull request #11851: URL: https://github.com/apache/beam/pull/11851#issuecomment-641474131 Forgot to use `argv` to parse the pipeline options in `pipeline_options_local`. Tests should pass now.
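A minimal sketch of the pattern behind that fix, using only `argparse`: the snippet parses the `argv` it is given instead of implicitly reading `sys.argv`, so a test can drive it with explicit flags. The function name `parse_local_options` and the `--input`/`--output` flags are hypothetical; in a real Beam snippet the leftover arguments would typically be handed to `PipelineOptions`.

```python
import argparse
from typing import List, Optional, Tuple


def parse_local_options(
        argv: Optional[List[str]] = None) -> Tuple[argparse.Namespace, List[str]]:
    """Parse snippet-specific flags from argv; return the leftovers for the runner.

    Passing argv explicitly (instead of letting argparse fall back to
    sys.argv[1:]) is what lets a test supply its own arguments.
    """
    parser = argparse.ArgumentParser()
    # Hypothetical snippet-specific flags, for illustration only.
    parser.add_argument('--input', default='input.txt')
    parser.add_argument('--output', default='output.txt')
    # parse_known_args passes unrecognized flags (e.g. --runner) through untouched.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args
```

Because `parse_known_args` passes unrecognized flags through, runner flags such as `--runner` end up in the second return value instead of raising an error.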
[GitHub] [beam] sabhyankar commented on a change in pull request #11950: [BEAM-8596]: Add SplunkIO transform to write messages to Splunk
sabhyankar commented on a change in pull request #11950: URL: https://github.com/apache/beam/pull/11950#discussion_r437606598 ## File path: sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.splunk; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import com.google.gson.annotations.SerializedName; +import javax.annotation.Nullable; + +/** + * A {@link SplunkEvent} describes a single payload sent to Splunk's Http Event Collector (HEC) + * endpoint. + * + * Each object represents a single event and related metadata elements such as: + * + * + * time + * host + * source + * sourceType + * index + * + */ +@AutoValue +public abstract class SplunkEvent { Review comment: Thanks for the pointer, @pabloem. I have switched from using custom coders to DefaultSchema with AutoValueSchema. 
I noticed in [AutoValueUtils](https://github.com/apache/beam/blob/c3a2dd89616faea5a2171ae6d8e39a77f6e39422/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java#L187-L191) that the builder's build method must literally be named `build`. This required me to change some of my method names.
[GitHub] [beam] amaliujia opened a new pull request #11958: fixup! roll back changes
amaliujia opened a new pull request #11958: URL: https://github.com/apache/beam/pull/11958 Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
[GitHub] [beam] jaketf commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO
jaketf commented on a change in pull request #11702: URL: https://github.com/apache/beam/pull/11702#discussion_r437590913 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java ## @@ -155,17 +168,53 @@ * FhirIO.Write.Result writeResult = * output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore())); * + * // Alternatively you could use import for high throughput to a new store. + * FhirIO.Write.Result writeResult = + * output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore())); + * // [End Writing ] + * * PCollection> failedBundles = writeResult.getFailedInsertsWithErr(); * + * // [Begin Writing to Dead Letter Queue] * failedBundles.apply("Write failed bundles to BigQuery", * BigQueryIO * .write() * .to(option.getBQFhirExecuteBundlesDeadLetterTable()) * .withFormatFunction(new HealthcareIOErrorToTableRow())); + * // [End Writing to Dead Letter Queue] + * + * // Alternatively you may want to handle DeadLetter with conditional update + * // [Begin Reconciliation with Conditional Update] + * failedBundles + * .apply("Reconcile with Conditional Update", + * FhirIO.ConditionalUpdate(fhirStore) + * .withFormatBodyFunction(HealthcareIOError::getDataResource) + * .withTypeFunction((HealthcareIOError err) -> { + * String body = err.getDataResource(); + * // TODO(user) insert logic to exctract type. + * return params; + * }) + * .withSearchParametersFunction((HealthcareIOError err) -> { Review comment: Hmm, on second thought, the best way to do this is for the user to use the HAPI library's search builder, which will be better maintained. I think we can add this as a suggestion in the docs rather than making this a Beam dependency or duplicating the functionality. Thoughts? https://hapifhir.io/hapi-fhir/docs/server_plain/rest_operations_search.html
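For context on what such a search-parameters function has to produce: a FHIR conditional update is a `PUT [base]/[type]?[search parameters]` request. The sketch below builds that relative path from a resource's `resourceType` and first `identifier`, using FHIR's `system|value` token syntax. Matching on the first identifier is an assumption for illustration, and `conditional_update_path` is a hypothetical helper, not part of FhirIO or HAPI.

```python
import json
from urllib.parse import urlencode


def conditional_update_path(resource_json: str) -> str:
    """Build the relative path for a FHIR conditional update of one resource."""
    resource = json.loads(resource_json)
    resource_type = resource["resourceType"]
    # Assumption: match on the first identifier only.
    ident = resource["identifier"][0]
    system = ident.get("system")
    # FHIR token search syntax: "system|value" when a system is known.
    token = f"{system}|{ident['value']}" if system else ident["value"]
    # urlencode percent-escapes ':', '/' and '|' in the token.
    return f"{resource_type}?{urlencode({'identifier': token})}"
```

In practice the reviewer's suggestion is to let the HAPI search builder produce these parameters; the sketch only shows the shape of the result.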
[GitHub] [beam] robinyqiu opened a new pull request #11957: [BEAM-10033] zetaSqlValueToJavaObject uses value.getType()
robinyqiu opened a new pull request #11957: URL: https://github.com/apache/beam/pull/11957 R: @apilloud
[GitHub] [beam] robertwb merged pull request #11940: [BEAM-6215] Additional tests for FlatMap label.
robertwb merged pull request #11940: URL: https://github.com/apache/beam/pull/11940
[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.
robertwb commented on pull request #11932: URL: https://github.com/apache/beam/pull/11932#issuecomment-641449939 Run Java PreCommit
[GitHub] [beam] aaltay commented on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset
aaltay commented on pull request #11939: URL: https://github.com/apache/beam/pull/11939#issuecomment-641409860
[GitHub] [beam] lukecwik merged pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.
lukecwik merged pull request #11941: URL: https://github.com/apache/beam/pull/11941
[GitHub] [beam] lukecwik commented on a change in pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.
lukecwik commented on a change in pull request #11941: URL: https://github.com/apache/beam/pull/11941#discussion_r436891532 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -193,7 +193,21 @@ bundleFinalizer); // Register the appropriate handlers. - startFunctionRegistry.register(pTransformId, runner::startBundle); + switch (pTransform.getSpec().getUrn()) { +case PTransformTranslation.PAR_DO_TRANSFORM_URN: +case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN: +case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN: + startFunctionRegistry.register(pTransformId, runner::startBundle); + break; +case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN: Review comment: We can't reuse the existing `startBundle`/`finishBundle` methods, since it would be confusing to the user as to which context they are executing in (e.g. finishBundle can produce output), so this would require adding `startBundleForGetInitialRestriction`, `finishBundleForGetInitialRestriction`, `startBundleForPairWithRestriction` and `finishBundleForPairWithRestriction`. I could see value in this for per-bundle object lifetime management, but any such change should likely happen outside the scope of this PR. Any reason not to use `setup`/`teardown` for your object cache? ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -784,9 +812,8 @@ private ByteString encodeProgress(double value) throws IOException { default: // no-op } - } - private void startBundle() { +// TODO: Support caching state data across bundle boundaries. Review comment: Done
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11901: Prototype schema-inferring Row constructor.
TheNeuralBit commented on a change in pull request #11901: URL: https://github.com/apache/beam/pull/11901#discussion_r437536115 ## File path: sdks/python/apache_beam/transforms/sql.py ## @@ -74,3 +77,8 @@ def __init__(self, query, dialect=None): SqlTransformSchema(query=query, dialect=dialect)), BeamJarExpansionService( ':sdks:java:extensions:sql:expansion-service:shadowJar')) + + +class Row(object): + def __init__(self, **kwargs): +self.__dict__.update(kwargs) Review comment: Mm, nothing comes to mind. I suppose it could just be `apache_beam.Row` for now, and we can alias it if we add a schema package with other top-level schema stuff later. ## File path: sdks/python/apache_beam/coders/row_coder.py ## @@ -82,8 +86,19 @@ def from_runner_api_parameter(schema, components, unused_context): return RowCoder(schema) @staticmethod - def from_type_hint(named_tuple_type, registry): -return RowCoder(named_tuple_to_schema(named_tuple_type)) + def from_type_hint(type_hint, registry): +if isinstance(type_hint, row_type.RowTypeConstraint): + schema = schema_pb2.Schema( + fields=[ + schema_pb2.Field( + name=name, + type=typing_to_runner_api(type)) + for (name, type) in type_hint._fields + ], + id=str(uuid.uuid4())) Review comment: Could you move this inference to `typehints.schemas` alongside `named_tuple_to_schema`? I have a WIP PR for batching schema'd PCollections that are inputs to Dataframes, and I should reuse this logic there.
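The prototype `Row` simply stores its keyword arguments as attributes. One plausible shape for the kind of inference discussed here is sketched below in plain Python: it derives ordered `(name, type)` pairs from a `Row` instance, the same pair shape the coder iterates over via `type_hint._fields`. `infer_fields` is a hypothetical helper; it looks only at runtime types (no nullability, no nested types) and does not build `schema_pb2` messages.

```python
from typing import List, Tuple


class Row(object):
    """Mirrors the prototype: keyword arguments become attributes."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


def infer_fields(row: Row) -> List[Tuple[str, type]]:
    """Hypothetical helper: derive (name, type) pairs from a Row instance.

    Field order follows keyword-argument order, which Python preserves
    (PEP 468), so the inferred field order is deterministic.
    """
    return [(name, type(value)) for name, value in vars(row).items()]
```

A `Row(id=1, name="a", score=2.5)` would yield `[("id", int), ("name", str), ("score", float)]`; mapping those Python types to runner API types would be a separate step.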
[GitHub] [beam] iemejia commented on pull request #11909: [BEAM-8134] Grafana dashboards for Nexmark tests
iemejia commented on pull request #11909: URL: https://github.com/apache/beam/pull/11909#issuecomment-641060650
[GitHub] [beam] aromanenko-dev commented on pull request #11396: [BEAM-9742] Add Configurable FluentBackoff to JdbcIO Write
aromanenko-dev commented on pull request #11396: URL: https://github.com/apache/beam/pull/11396#issuecomment-640562379
[GitHub] [beam] amaliujia commented on a change in pull request #11948: [BEAM-10213] @Ignore: fix the test for testCastToDateWithCase.
amaliujia commented on a change in pull request #11948: URL: https://github.com/apache/beam/pull/11948#discussion_r436967724 ## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -3565,19 +3566,16 @@ public void testCaseWithValueNoElseNoMatch() { } @Test - @Ignore( - "Codegen generates code that Janino cannot compile, need further investigation on root" - + " cause.") public void testCastToDateWithCase() { String sql = "SELECT f_int, \n" + "CASE WHEN CHAR_LENGTH(TRIM(f_string)) = 8 \n" + "THEN CAST (CONCAT(\n" -+ " SUBSTR(TRIM(f_string), 0, 4) \n" ++ " SUBSTR(TRIM(f_string), 1, 4) \n" Review comment: SUBSTR counts the first char from position 1. 
## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -3587,11 +3585,14 @@ public void testCastToDateWithCase() { PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); Schema resultType = -Schema.builder().addInt32Field("f_int").addNullableField("f_date", DATETIME).build(); +Schema.builder() +.addInt64Field("f_long") +.addNullableField("f_date", FieldType.logicalType(SqlTypes.DATE)) +.build(); PAssert.that(stream) .containsInAnyOrder( -Row.withSchema(resultType).addValues(1, parseDate("2018-10-18")).build()); +Row.withSchema(resultType).addValues(1L, parseDateToLocalDate("2018-10-18")).build()); Review comment: Thanks!
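A plain-Python sketch of the 1-based `SUBSTR` semantics described in the review comment, applied to the kind of `YYYYMMDD` string the test turns into `2018-10-18`. Only the first `SUBSTR(TRIM(f_string), 1, 4)` call appears in the diff; the month and day offsets (5 and 7) are assumptions about the elided rest of the query, `TRIM` is approximated with `str.strip()`, and positions below 1 are rejected rather than modeled.

```python
def substr(value: str, position: int, length: int) -> str:
    """1-based SUBSTR: position 1 refers to the first character."""
    if position < 1:
        # ZetaSQL defines its own rules for 0 and negative positions; not modeled here.
        raise ValueError("this sketch only supports position >= 1")
    start = position - 1  # convert to Python's 0-based slicing
    return value[start:start + length]


def yyyymmdd_to_iso_date(f_string: str) -> str:
    """Mirror the CASE WHEN CHAR_LENGTH(TRIM(f_string)) = 8 branch of the test query."""
    s = f_string.strip()
    if len(s) != 8:
        raise ValueError("expected an 8-character YYYYMMDD string")
    # Assumed offsets for month (5) and day (7); only the year call is in the diff.
    return "-".join([substr(s, 1, 4), substr(s, 5, 2), substr(s, 7, 2)])
```

With 0-based thinking, `substr(s, 0, 4)` would be the natural first call; the sketch makes the off-by-one explicit by rejecting it.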
[GitHub] [beam] chamikaramj commented on a change in pull request #11834: [BEAM-10117] Correct erroneous Job Failed message
chamikaramj commented on a change in pull request #11834: URL: https://github.com/apache/beam/pull/11834#discussion_r436891884 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ## @@ -137,8 +137,8 @@ void waitForDone() throws Exception { throw e; } } else { -// Job failed, schedule it again. -LOG.info("Job {} failed. retrying.", jobInfo.pendingJob.currentJobId); +// Job not yet complete, schedule it again. Review comment: Hmm, it seems the pollJob() call usually returns false if the job failed. https://github.com/apache/beam/blob/2a4092dfb8c46408818402b1c4a09a8cd44e907a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java#L260
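The concern in this thread is that when a poll result is a plain boolean, `false` may cover both "failed" and "not done yet", so a single log message can be wrong for one of the cases. A minimal sketch of how a three-state result keeps the two apart, assuming a hypothetical `JobState` enum and `wait_for_done` helper (these are not BigQueryHelpers APIs):

```python
from enum import Enum
from typing import Callable, List


class JobState(Enum):
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    PENDING = "pending"


def wait_for_done(poll: Callable[[], JobState], max_attempts: int) -> List[str]:
    """Poll until success, logging failed and pending states with distinct messages."""
    log: List[str] = []
    for attempt in range(1, max_attempts + 1):
        state = poll()
        if state is JobState.SUCCEEDED:
            log.append(f"attempt {attempt}: succeeded")
            return log
        if state is JobState.FAILED:
            log.append(f"attempt {attempt}: failed, retrying")
        else:
            log.append(f"attempt {attempt}: not yet complete, retrying")
    raise RuntimeError(f"job did not succeed after {max_attempts} attempts")
```

With a distinct state per outcome, the retry branch no longer has to guess which message to log.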
[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
aromanenko-dev commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r435359001 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -906,19 +955,110 @@ public void setValueDeserializer(String valueDeserializer) { Coder keyCoder = getKeyCoder(coderRegistry); Coder valueCoder = getValueCoder(coderRegistry); - // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. - Unbounded> unbounded = - org.apache.beam.sdk.io.Read.from( - toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource()); + if (!isUseSDFTransform() + || !ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api") Review comment: In this case, please add a comment about that.
[GitHub] [beam] tvalentyn opened a new pull request #11954: [Do not merge] Add a unit test exposing BEAM-10217
tvalentyn opened a new pull request #11954: URL: https://github.com/apache/beam/pull/11954
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436922850

## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ *
+ * {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ * KafkaIO.Read#getConsumerConfig()}.
+ * {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ * KafkaIO.Read#getConsumerFactoryFn()}.
+ * {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ * KafkaIO.Read#getOffsetConsumerConfig()}.
+ * {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ * KafkaIO.Read#getKeyCoder()}.
+ * {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ * KafkaIO.Read#getValueCoder()}.
+ * {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *
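The javadoc in this diff says the transform is built on a SplittableDoFn, and the imports pull in `OffsetRange`, `OffsetRangeTracker` and `GrowableOffsetRangeTracker`: each input element presumably describes one Kafka topic partition, and the work for it is an offset range that can be claimed record by record and split while it is in progress. As a rough, dependency-free illustration of that claim-then-split idea, here is a sketch using a hypothetical `SimpleOffsetRangeTracker` class. It is not Beam's `OffsetRangeTracker`, whose real `RestrictionTracker` API (`tryClaim`, `trySplit`, and so on) differs in detail.

```java
/**
 * Hypothetical, simplified offset-range tracker illustrating the claim/split protocol of a
 * splittable DoFn. NOT Beam's OffsetRangeTracker; names and behavior are illustrative only.
 */
final class SimpleOffsetRangeTracker {
  private final long from; // inclusive start of the range
  private long to; // exclusive end; shrinks when the range is split
  private long lastAttempted; // highest offset passed to tryClaim so far

  SimpleOffsetRangeTracker(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from must be <= to");
    }
    this.from = from;
    this.to = to;
    this.lastAttempted = from - 1; // nothing attempted yet
  }

  /** Claims an offset for processing; returns false once it falls outside the current range. */
  boolean tryClaim(long offset) {
    if (offset <= lastAttempted) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    lastAttempted = offset;
    return offset < to;
  }

  /**
   * Splits off the unclaimed tail at splitPos: this tracker keeps [from, splitPos) and the
   * returned residual {splitPos, oldTo} can be processed elsewhere. Returns null when
   * splitPos is not strictly inside the unclaimed part of the range.
   */
  long[] trySplitAt(long splitPos) {
    if (splitPos <= lastAttempted || splitPos >= to) {
      return null;
    }
    long[] residual = {splitPos, to};
    to = splitPos;
    return residual;
  }

  long from() {
    return from;
  }

  long to() {
    return to;
  }
}
```

For example, a tracker over `[0, 5)` that has claimed offsets 0 and 1 can be split at 3: it keeps `[0, 3)`, the residual `[3, 5)` goes elsewhere, and a later `tryClaim(3)` returns false, signalling that this worker should stop.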
[GitHub] [beam] robinyqiu commented on a change in pull request #11948: [BEAM-10213] @Ignore: fix the test for testCastToDateWithCase.
robinyqiu commented on a change in pull request #11948: URL: https://github.com/apache/beam/pull/11948#discussion_r436981137

## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ##
@@ -3587,11 +3585,14 @@ public void testCastToDateWithCase() {
     PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
     Schema resultType =
-        Schema.builder().addInt32Field("f_int").addNullableField("f_date", DATETIME).build();
+        Schema.builder()
+            .addInt64Field("f_long")
+            .addNullableField("f_date", FieldType.logicalType(SqlTypes.DATE))
+            .build();
 
     PAssert.that(stream)
         .containsInAnyOrder(
-            Row.withSchema(resultType).addValues(1, parseDate("2018-10-18")).build());
+            Row.withSchema(resultType).addValues(1L, parseDateToLocalDate("2018-10-18")).build());

Review comment: I think you don't have to implement a `parseDateToLocalDate` function yourself. You can use `LocalDate.of(y, m, d)` or `LocalDate.parse("-mm-dd")` directly.
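The reviewer's suggestion can be checked with the JDK alone: `LocalDate.parse` reads the ISO `yyyy-MM-dd` form by default and `LocalDate.of` takes the year, month (1-12) and day directly, so both yield the same value without a custom helper. The sketch below uses a hypothetical `ExpectedDates` class. It also shows why the expected row now uses `1L`: the field is declared with `addInt64Field`, and in plain Java a boxed `Integer` never equals a boxed `Long`.

```java
import java.time.LocalDate;

/** Hypothetical helper showing JDK date factories that can replace a hand-written parser. */
final class ExpectedDates {
  // LocalDate.parse uses the ISO-8601 local date format (yyyy-MM-dd) by default.
  static LocalDate parsed() {
    return LocalDate.parse("2018-10-18");
  }

  // LocalDate.of takes year, month (1-12) and day-of-month.
  static LocalDate built() {
    return LocalDate.of(2018, 10, 18);
  }

  // Boxed Integer and Long are never equal, so an INT64 value should be written as 1L.
  static boolean intEqualsLong() {
    Object asInt = 1; // autoboxed to Integer
    Object asLong = 1L; // autoboxed to Long
    return asInt.equals(asLong);
  }
}
```

Either factory could be passed straight to `addValues(...)` in place of `parseDateToLocalDate("2018-10-18")`; `LocalDate.of` avoids string parsing altogether.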
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org