[GitHub] [beam] jainsourabh2 commented on pull request #11804: Corrected the input to execute the Java JAR file
jainsourabh2 commented on pull request #11804: URL: https://github.com/apache/beam/pull/11804#issuecomment-639997856 tempLocation gives error. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ihji commented on pull request #11942: [BEAM-10208] add cross-language KafkaIO integration test
ihji commented on pull request #11942: URL: https://github.com/apache/beam/pull/11942#issuecomment-639983521 CC: @robertwb
[GitHub] [beam] ihji commented on pull request #11942: [BEAM-10208] add cross-language KafkaIO integration test
ihji commented on pull request #11942: URL: https://github.com/apache/beam/pull/11942#issuecomment-639966084 Please also run "Run Python 2 PostCommit" and "Run Python 3.7 PostCommit"
[GitHub] [beam] ihji commented on pull request #11942: [BEAM-10208] add cross-language KafkaIO integration test
ihji commented on pull request #11942: URL: https://github.com/apache/beam/pull/11942#issuecomment-639965704 R: @chamikaramj
[GitHub] [beam] ihji opened a new pull request #11942: [BEAM-10208] add cross-language KafkaIO integration test
ihji opened a new pull request #11942: URL: https://github.com/apache/beam/pull/11942 Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). Post-Commit Tests Status (on master branch): badge table linking the builds.apache.org Jenkins jobs for the Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners.
[GitHub] [beam] lukecwik commented on pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.
lukecwik commented on pull request #11941: URL: https://github.com/apache/beam/pull/11941#issuecomment-639964771 Run Java PreCommit
[GitHub] [beam] pabloem commented on pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types
pabloem commented on pull request #11923: URL: https://github.com/apache/beam/pull/11923#issuecomment-639949346 Run Python 3.7 PostCommit
[GitHub] [beam] epicfaace commented on pull request #11824: [BEAM-10101] Add HttpIO / HttpFileSystem (Python)
epicfaace commented on pull request #11824: URL: https://github.com/apache/beam/pull/11824#issuecomment-639946797 I'll be making changes soon.
[GitHub] [beam] reuvenlax commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
reuvenlax commented on a change in pull request #11929: URL: https://github.com/apache/beam/pull/11929#discussion_r436220739 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -72,10 +79,44 @@ @Experimental(Kind.SCHEMAS) public class JsonToRow { + private static final String LINE_FIELD_NAME = "line"; + private static final String ERROR_FIELD_NAME = "err"; + + public static final Schema ERROR_ROW_SCHEMA = + Schema.of( + Field.of(LINE_FIELD_NAME, FieldType.STRING), + Field.of(ERROR_FIELD_NAME, FieldType.STRING)); + + public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {}; + public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {}; + public static PTransform<PCollection<String>, PCollection<Row>> withSchema(Schema rowSchema) { return JsonToRowFn.forSchema(rowSchema); } + /** + * Enable dead-letter support. If this value is set, errors in the parsing layer are returned as + * Row objects of the form {@link JsonToRow#ERROR_ROW_SCHEMA}: line : the original JSON string; err : + * the error message from the parsing function. + * + * You can access the results by using: + * + * {@link JsonToRow#MAIN_TUPLE_TAG} + * + * {@code PCollection<Row> personRows = + * results.get(JsonToRow.MAIN_TUPLE_TAG).setRowSchema(personSchema)} Review comment: +1. If you output a Row, you should be setting the schema in your transform.
## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -116,4 +157,65 @@ private ObjectMapper objectMapper() { return this.objectMapper; } } + + static class JsonToRowWithFailureCaptureFn + extends PTransform<PCollection<String>, PCollectionTuple> { +private transient volatile @Nullable ObjectMapper objectMapper; +private Schema schema; Review comment: make final ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -72,10 +79,44 @@ @Experimental(Kind.SCHEMAS) public class JsonToRow { + private static final String LINE_FIELD_NAME = "line"; + private static final String ERROR_FIELD_NAME = "err"; + + public static final Schema ERROR_ROW_SCHEMA = + Schema.of( + Field.of(LINE_FIELD_NAME, FieldType.STRING), + Field.of(ERROR_FIELD_NAME, FieldType.STRING)); + Review comment: would be nicer to make these field names configurable, though with defaults. ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -116,4 +157,65 @@ private ObjectMapper objectMapper() { return this.objectMapper; } } + + static class JsonToRowWithFailureCaptureFn + extends PTransform<PCollection<String>, PCollectionTuple> { Review comment: I think it would be cleaner to wrap this in a custom result class and not expose the TupleTags to users. Look at org.apache.beam.sdk.io.gcp.bigquery.WriteResult for an example.
## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -116,4 +157,65 @@ private ObjectMapper objectMapper() { return this.objectMapper; } } + + static class JsonToRowWithFailureCaptureFn + extends PTransform<PCollection<String>, PCollectionTuple> { +private transient volatile @Nullable ObjectMapper objectMapper; +private Schema schema; +private static final String METRIC_NAMESPACE = "JsonToRowFn"; +private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure"; + +private Distribution jsonConversionErrors = +Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME); + +public static final TupleTag<Row> main = MAIN_TUPLE_TAG; +public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG; + +PCollection<Row> deadLetterCollection; + +static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) { + // Throw exception if this schema is not supported by RowJson + RowJson.verifySchemaSupported(rowSchema); + return new JsonToRowWithFailureCaptureFn(rowSchema); +} + +private JsonToRowWithFailureCaptureFn(Schema schema) { + this.schema = schema; +} + +@Override +public PCollectionTuple expand(PCollection<String> jsonStrings) { + + return jsonStrings.apply( + ParDo.of( + new DoFn<String, Row>() { +@ProcessElement +public void processElement(ProcessContext context) { Review comment: Why not use injected parameters instead of ProcessContext? ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -72,10 +79,44 @@ @Experimental(Kind.SCHEMAS) public class JsonToRow { + private static final String LINE_FIELD_NAME = "line"; + private static final String ERROR_FIELD_NAME = "err"; + + public static final Schema ERROR_ROW_SCHEMA = + Schema.of( + Field.of(LINE_FIELD_NAME, FieldType.STRING), +
[GitHub] [beam] pabloem commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
pabloem commented on a change in pull request #11929: URL: https://github.com/apache/beam/pull/11929#discussion_r436215155 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -116,4 +157,65 @@ private ObjectMapper objectMapper() { return this.objectMapper; } } + + static class JsonToRowWithFailureCaptureFn + extends PTransform<PCollection<String>, PCollectionTuple> { +private transient volatile @Nullable ObjectMapper objectMapper; +private Schema schema; +private static final String METRIC_NAMESPACE = "JsonToRowFn"; +private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure"; + +private Distribution jsonConversionErrors = +Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME); + +public static final TupleTag<Row> main = MAIN_TUPLE_TAG; +public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG; + +PCollection<Row> deadLetterCollection; + +static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) { + // Throw exception if this schema is not supported by RowJson + RowJson.verifySchemaSupported(rowSchema); + return new JsonToRowWithFailureCaptureFn(rowSchema); +} + +private JsonToRowWithFailureCaptureFn(Schema schema) { + this.schema = schema; +} + +@Override +public PCollectionTuple expand(PCollection<String> jsonStrings) { + + return jsonStrings.apply( + ParDo.of( + new DoFn<String, Row>() { +@ProcessElement +public void processElement(ProcessContext context) { + try { +context.output(jsonToRow(objectMapper(), context.element())); + } catch (Exception ex) { +context.output( +deadLetter, +Row.withSchema(ERROR_ROW_SCHEMA) +.addValue(context.element()) +.addValue(ex.getMessage()) +.build()); Review comment: (I am mostly leaning towards not doing this, but lmk what you think)
[GitHub] [beam] pabloem commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
pabloem commented on a change in pull request #11929: URL: https://github.com/apache/beam/pull/11929#discussion_r436214157 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -116,4 +157,65 @@ private ObjectMapper objectMapper() { return this.objectMapper; } } + + static class JsonToRowWithFailureCaptureFn + extends PTransform<PCollection<String>, PCollectionTuple> { +private transient volatile @Nullable ObjectMapper objectMapper; +private Schema schema; +private static final String METRIC_NAMESPACE = "JsonToRowFn"; +private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure"; + +private Distribution jsonConversionErrors = +Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME); + +public static final TupleTag<Row> main = MAIN_TUPLE_TAG; +public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG; + +PCollection<Row> deadLetterCollection; + +static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) { + // Throw exception if this schema is not supported by RowJson + RowJson.verifySchemaSupported(rowSchema); + return new JsonToRowWithFailureCaptureFn(rowSchema); +} + +private JsonToRowWithFailureCaptureFn(Schema schema) { + this.schema = schema; +} + +@Override +public PCollectionTuple expand(PCollection<String> jsonStrings) { + + return jsonStrings.apply( + ParDo.of( + new DoFn<String, Row>() { +@ProcessElement +public void processElement(ProcessContext context) { + try { +context.output(jsonToRow(objectMapper(), context.element())); + } catch (Exception ex) { +context.output( +deadLetter, +Row.withSchema(ERROR_ROW_SCHEMA) +.addValue(context.element()) +.addValue(ex.getMessage()) +.build()); Review comment: I guess this doesn't make sense, but - would it help to include the Row Schema that we tried (and failed) to use for this JSON string? Some users may not need it, and others can add it themselves in the downstream ParDo - but it's possible it may help. Thoughts?
## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -116,4 +157,65 @@ private ObjectMapper objectMapper() { return this.objectMapper; } } + + static class JsonToRowWithFailureCaptureFn + extends PTransform<PCollection<String>, PCollectionTuple> { +private transient volatile @Nullable ObjectMapper objectMapper; +private Schema schema; +private static final String METRIC_NAMESPACE = "JsonToRowFn"; +private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure"; + +private Distribution jsonConversionErrors = +Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME); + +public static final TupleTag<Row> main = MAIN_TUPLE_TAG; +public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG; + +PCollection<Row> deadLetterCollection; + +static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) { + // Throw exception if this schema is not supported by RowJson + RowJson.verifySchemaSupported(rowSchema); + return new JsonToRowWithFailureCaptureFn(rowSchema); +} + +private JsonToRowWithFailureCaptureFn(Schema schema) { + this.schema = schema; +} + +@Override +public PCollectionTuple expand(PCollection<String> jsonStrings) { + + return jsonStrings.apply( + ParDo.of( + new DoFn<String, Row>() { +@ProcessElement +public void processElement(ProcessContext context) { + try { +context.output(jsonToRow(objectMapper(), context.element())); + } catch (Exception ex) { +context.output( +deadLetter, +Row.withSchema(ERROR_ROW_SCHEMA) +.addValue(context.element()) +.addValue(ex.getMessage()) +.build()); + } +} + }) + .withOutputTags(main, TupleTagList.of(deadLetter))); Review comment: you can add the schema for the outputs here, so that users do not need to add them themselves?
## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java ## @@ -116,4 +157,65 @@ private ObjectMapper objectMapper() { return this.objectMapper; } } + + static class JsonToRowWithFailureCaptureFn + extends PTransform<PCollection<String>, PCollectionTuple> { +private transient volatile @Nullable ObjectMapper objectMapper; +private S
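The dead-letter pattern under review in this thread can be sketched outside Beam as plain Python: parse each JSON line, send successes to a main output and failures to a dead-letter output shaped like the proposed ERROR_ROW_SCHEMA (a "line" field with the original string and an "err" field with the parse error). This is a simplified illustration, not the Beam Java API; the function name and the use of lists in place of PCollections are assumptions for the sketch.

```python
import json

def parse_with_dead_letter(json_lines):
    """Route each JSON string to a main or dead-letter output."""
    main, dead_letter = [], []
    for line in json_lines:
        try:
            main.append(json.loads(line))
        except ValueError as ex:  # json.JSONDecodeError subclasses ValueError
            # Mirror the proposed ERROR_ROW_SCHEMA: original line + error message.
            dead_letter.append({"line": line, "err": str(ex)})
    return main, dead_letter

main, dead = parse_with_dead_letter(['{"name": "ada"}', 'not json'])
```

In the Beam version, the two lists correspond to tagged outputs of a single ParDo, which is why the reviewers discuss wrapping the TupleTags in a result class rather than exposing them directly.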
[GitHub] [beam] aaltay commented on pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
aaltay commented on pull request #8457: URL: https://github.com/apache/beam/pull/8457#issuecomment-639912763 > There are still failing tests on #11295. @mf2199 - What is the next step for this PR? Ping on this? What is our plan for this PR?
[GitHub] [beam] pabloem commented on pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types
pabloem commented on pull request #11923: URL: https://github.com/apache/beam/pull/11923#issuecomment-639911150 Run Python 3.7 PostCommit
[GitHub] [beam] pabloem commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO
pabloem commented on a change in pull request #11702: URL: https://github.com/apache/beam/pull/11702#discussion_r436210077 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java ## @@ -1173,4 +1276,339 @@ public void executeBundles(ProcessContext context) { } } } + + /** + * Create resources fhir io . create resources. + * + * @param fhirStore the fhir store + * @return the fhir io . create resources + */ + public static FhirIO.CreateResources createResources(ValueProvider<String> fhirStore) { +return new CreateResources(fhirStore); + } + + /** + * Create resources fhir io . create resources. + * + * @param fhirStore the fhir store + * @return the fhir io . create resources + */ + public static FhirIO.CreateResources createResources(String fhirStore) { +return new CreateResources(fhirStore); + } + /** + * {@link PTransform} for Creating FHIR resources. + * + * https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create + */ + public static class CreateResources extends PTransform<PCollection<String>, Write.Result> { +private final String fhirStore; +private SerializableFunction<String, String> ifNoneExistFunction; +private SerializableFunction<String, String> formatBodyFunction; +private SerializableFunction<String, String> typeFunction; +private static final Logger LOG = LoggerFactory.getLogger(CreateResources.class); + +/** + * Instantiates a new Create resources transform. + * + * @param fhirStore the fhir store + */ +CreateResources(ValueProvider<String> fhirStore) { + this.fhirStore = fhirStore.get(); +} + +/** + * Instantiates a new Create resources. + * + * @param fhirStore the fhir store + */ +CreateResources(String fhirStore) { + this.fhirStore = fhirStore; +} + +/** + * This adds a {@link SerializableFunction} that reads a resource string and extracts an + * If-None-Exist query for conditional create. Typically this will just be extracting an ID to + * look for. + * + * https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create + * + * @param ifNoneExistFunction the if none exist function + * @return the create resources + */ +public CreateResources withIfNotExistFunction( +SerializableFunction<String, String> ifNoneExistFunction) { + this.ifNoneExistFunction = ifNoneExistFunction; + return this; +} + +/** + * This adds a {@link SerializableFunction} that reads a resource string and extracts a + * resource type. + * + * https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create + * + * @param typeFunction for extracting type from a resource. + * @return the create resources + */ +public CreateResources withTypeFunction(SerializableFunction<String, String> typeFunction) { + this.typeFunction = typeFunction; + return this; +} +/** + * With format body function create resources. + * + * @param formatBodyFunction the format body function + * @return the create resources + */ +public CreateResources withFormatBodyFunction( Review comment: I don't think I understand this function very well. It seems like a fn to format a resource properly in case its formatting is not correct? Could you detail the documentation for it? ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java ## @@ -155,17 +168,53 @@ * FhirIO.Write.Result writeResult = * output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore())); * + * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult = + * output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore())); + * // [End Writing ] + * * PCollection> failedBundles = writeResult.getFailedInsertsWithErr(); * + * // [Begin Writing to Dead Letter Queue] * failedBundles.apply("Write failed bundles to BigQuery", * BigQueryIO * .write() * .to(option.getBQFhirExecuteBundlesDeadLetterTable()) * .withFormatFunction(new HealthcareIOErrorToTableRow())); + * // [End Writing to Dead Letter Queue] + * + * // Alternatively you may want to handle DeadLetter with conditional update + * // [Begin Reconciliation with Conditional Update] + * failedBundles + * .apply("Reconcile with Conditional Update", + * FhirIO.ConditionalUpdate(fhirStore) + * .withFormatBodyFunction(HealthcareIOError::getDataResource) + * .withTypeFunction((HealthcareIOError err) -> { + * String body = err.getDataResource(); + *
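The conditional-create flow reviewed above hinges on a user-supplied callback (withIfNotExistFunction) that turns a resource string into an If-None-Exist search query, so the server only creates the resource when no match exists. A minimal Python sketch of such an extractor, assuming the resource carries a FHIR identifier list; the function name and query format here are illustrative, not the FhirIO API:

```python
import json

def if_none_exist_query(resource_json):
    """Build an If-None-Exist search query from a resource's first identifier."""
    resource = json.loads(resource_json)
    ident = resource["identifier"][0]
    # Conditional create: the server skips creation if a resource matching
    # this search already exists.
    return "identifier={system}|{value}".format(**ident)

patient = json.dumps({
    "resourceType": "Patient",
    "identifier": [{"system": "urn:mrn", "value": "12345"}],
})
query = if_none_exist_query(patient)
```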
[GitHub] [beam] lukecwik commented on pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.
lukecwik commented on pull request #11941: URL: https://github.com/apache/beam/pull/11941#issuecomment-639897770 R: @boyuanzz @youngoli
[GitHub] [beam] lukecwik opened a new pull request #11941: [BEAM-2939] Fix splittable DoFn lifecycle.
lukecwik opened a new pull request #11941: URL: https://github.com/apache/beam/pull/11941 getInitialRestriction/splitAndSize should not be wrapped with startBundle/finishBundle invocations. Instead of copying the stateAccessor initialization (used for side inputs), I made it so that it is initialized only once, and the caches/references are cleaned up in the finalizeState call. Tested within Google using runner_v2.
[GitHub] [beam] pabloem commented on pull request #11824: [BEAM-10101] Add HttpIO / HttpFileSystem (Python)
pabloem commented on pull request #11824: URL: https://github.com/apache/beam/pull/11824#issuecomment-639885325 @epicfaace lmk what are your plans for this PR
[GitHub] [beam] pabloem commented on pull request #11940: [BEAM-6215] Additional tests for FlatMap label.
pabloem commented on pull request #11940: URL: https://github.com/apache/beam/pull/11940#issuecomment-639884926 Run Python PreCommit
[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
pabloem commented on pull request #11929: URL: https://github.com/apache/beam/pull/11929#issuecomment-639885056 retest this please
[GitHub] [beam] pabloem commented on pull request #11923: [BEAM-10176] Support STRUCT, FLOAT64, INT64 BigQuery types
pabloem commented on pull request #11923: URL: https://github.com/apache/beam/pull/11923#issuecomment-639882291 retest this please
[GitHub] [beam] udim commented on a change in pull request #11939: [BEAM-10197] Support typehints for Python's frozenset
udim commented on a change in pull request #11939: URL: https://github.com/apache/beam/pull/11939#discussion_r436187871 ## File path: sdks/python/apache_beam/typehints/typehints_test.py ## @@ -612,54 +613,70 @@ def test_match_type_variables(self): hint.match_type_variables(typehints.Dict[int, str])) -class SetHintTestCase(TypeHintTestCase): +class BaseSetHintTest: + def __init__(self, string_type, py_type, beam_type, *args, **kwargs): Review comment: You should omit `*args, **kwargs` if no further arguments are allowed; this is the style we follow in this codebase, here and below as well. For example: if some code called `BaseSetHintTest('Set', set, typehints.Set, typehints.FrozenSet)`, would you rather the extra argument be silently ignored, or raise an error for an incorrect number of arguments? ## File path: sdks/python/apache_beam/typehints/typehints_test.py ## @@ -612,54 +613,70 @@ def test_match_type_variables(self): hint.match_type_variables(typehints.Dict[int, str])) -class SetHintTestCase(TypeHintTestCase): +class BaseSetHintTest: Review comment: If `BaseSetHintTest` inherited from `TypeHintTestCase` you could avoid multiple inheritance in the sub-classes below.
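The failure mode udim describes can be sketched in plain Python. The class names below are illustrative stand-ins, not the actual test classes from the PR: a `*args, **kwargs` signature swallows a mistaken extra argument silently, while a strict signature raises immediately.

```python
class StrictBase:
    """Accepts exactly the arguments it uses."""
    def __init__(self, string_type, py_type, beam_type):
        self.string_type = string_type
        self.py_type = py_type
        self.beam_type = beam_type


class LenientBase:
    """Swallows any extra arguments via *args/**kwargs."""
    def __init__(self, string_type, py_type, beam_type, *args, **kwargs):
        self.string_type = string_type
        self.py_type = py_type
        self.beam_type = beam_type


# The stray fourth argument is silently ignored here:
lenient = LenientBase('Set', set, 'SetHint', 'FrozenSetHint')

# ...but the strict signature surfaces the bug as a TypeError:
try:
    StrictBase('Set', set, 'SetHint', 'FrozenSetHint')
    raised = False
except TypeError:
    raised = True
```

The strict version turns a silent test-configuration bug into an immediate, loud failure, which is the behavior the review comment is arguing for.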
[GitHub] [beam] pabloem merged pull request #11931: [BEAM-10145] Delete persistent disks after every KafkaIO performance test run
pabloem merged pull request #11931: URL: https://github.com/apache/beam/pull/11931
[GitHub] [beam] apilloud commented on a change in pull request #11820: [BEAM-10093] ZetaSql Nexmark variant
apilloud commented on a change in pull request #11820: URL: https://github.com/apache/beam/pull/11820#discussion_r436191652 ## File path: sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlBoundedSideInputJoinTest.java ## @@ -48,166 +47,182 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Test the various NEXMark queries yield results coherent with their models. */ -@RunWith(JUnit4.class) +@RunWith(Enclosed.class) public class SqlBoundedSideInputJoinTest { - @Rule public TestPipeline p = TestPipeline.create(); + private abstract static class SqlBoundedSideInputJoinTestCases { - @Before - public void setupPipeline() { -NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p); - } +protected abstract SqlBoundedSideInputJoin getQuery(NexmarkConfiguration configuration); + +@Rule public TestPipeline p = TestPipeline.create(); + +@Before +public void setupPipeline() { + NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p); +} - /** Test {@code query} matches {@code model}. */ - private void queryMatchesModel( - String name, - NexmarkConfiguration config, - NexmarkQueryTransform query, - NexmarkQueryModel model, - boolean streamingMode) - throws Exception { - -ResourceId sideInputResourceId = -FileSystems.matchNewResource( -String.format( -"%s/JoinToFiles-%s", p.getOptions().getTempLocation(), new Random().nextInt()), -false); -config.sideInputUrl = sideInputResourceId.toString(); - -try { +/** Test {@code query} matches {@code model}. 
*/ +private void queryMatchesModel( +String name, +NexmarkConfiguration config, +NexmarkQueryTransform query, +NexmarkQueryModel model, +boolean streamingMode) +throws Exception { + + ResourceId sideInputResourceId = + FileSystems.matchNewResource( + String.format( + "%s/JoinToFiles-%s", p.getOptions().getTempLocation(), new Random().nextInt()), + false); + config.sideInputUrl = sideInputResourceId.toString(); + + try { +PCollection> sideInput = NexmarkUtils.prepareSideInput(p, config); +query.setSideInput(sideInput); + +PCollection events = +p.apply( +name + ".Read", +streamingMode +? NexmarkUtils.streamEventsSource(config) +: NexmarkUtils.batchEventsSource(config)); + +PCollection> results = +(PCollection>) events.apply(new NexmarkQuery<>(config, query)); +PAssert.that(results).satisfies(model.assertionFor()); +PipelineResult result = p.run(); +result.waitUntilFinish(); + } finally { +NexmarkUtils.cleanUpSideInput(config); + } +} + +/** + * A smoke test that the count of input bids and outputs are the same, to help diagnose + * flakiness in more complex tests. + */ +@Test +public void inputOutputSameEvents() throws Exception { + NexmarkConfiguration config = NexmarkConfiguration.DEFAULT.copy(); + config.sideInputType = NexmarkUtils.SideInputType.DIRECT; + config.numEventGenerators = 1; + config.numEvents = 5000; + config.sideInputRowCount = 10; + config.sideInputNumShards = 3; PCollection> sideInput = NexmarkUtils.prepareSideInput(p, config); - query.setSideInput(sideInput); - - PCollection events = - p.apply( - name + ".Read", - streamingMode - ? 
NexmarkUtils.streamEventsSource(config) - : NexmarkUtils.batchEventsSource(config)); - - PCollection> results = - (PCollection>) events.apply(new NexmarkQuery<>(config, query)); - PAssert.that(results).satisfies(model.assertionFor()); - PipelineResult result = p.run(); - result.waitUntilFinish(); -} finally { - NexmarkUtils.cleanUpSideInput(config); + + try { +PCollection input = p.apply(NexmarkUtils.batchEventsSource(config)); +PCollection justBids = input.apply(NexmarkQueryUtil.JUST_BIDS); +PCollection bidCount = justBids.apply("Count Bids", Count.globally()); + +NexmarkQueryTransform query = getQuery(config); +query.setSideInput(sideInput); + +PCollection> output = +(PCollection>) input.apply(new NexmarkQuery(config, query)); +PCollection outputCount = output.apply("Count outputs", Count.globally()); + + PAssert.that(PCollectionList.of(bidCount).and(outputCount).apply(Flatten.pCollections())) +.satisfies( +counts -> { + assertTha
[GitHub] [beam] udim commented on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset
udim commented on pull request #11939: URL: https://github.com/apache/beam/pull/11939#issuecomment-639864241 random comment to trigger tests
[GitHub] [beam] udim edited a comment on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset
udim edited a comment on pull request #11939: URL: https://github.com/apache/beam/pull/11939#issuecomment-639864241 arbitrary comment to trigger tests
[GitHub] [beam] amaliujia commented on a change in pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF
amaliujia commented on a change in pull request #11868: URL: https://github.com/apache/beam/pull/11868#discussion_r436188166 ## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + @Test Review comment: Good point. It would be worthwhile to move all streaming tests to another place.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436187245 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.AppInfoParser; +import 
org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and + * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code + * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to + * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and design + * doc:https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is, {@link + * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the + * pipeline can populate these source descriptions during runtime. For example, the pipeline can + * query Kafka topics from BigQuery table and read these topics via {@link ReadFromKafkaViaSDF}. + * + * Common Kafka Consumer Configurations + * + * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}: + * + * + * {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link + * KafkaIO.Read#getConsumerConfig()}. + * {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link + * KafkaIO.Read#getConsumerFactoryFn()}. + * {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link + * KafkaIO.Read#getOffsetConsumerConfig()}. + * {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link + * KafkaIO.Read#getKeyCoder()}. + * {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link + * KafkaIO.Read#getValueCoder()}. + * {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link + * KafkaIO.Read#getKeyDeserializer
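The SplittableDoFn reader described above is driven by a claim-based restriction tracker (`OffsetRangeTracker` / `GrowableOffsetRangeTracker`). The following is a toy Python sketch of just the claim protocol — not Beam's actual tracker, and much simpler than the real Kafka read loop — showing how a `@ProcessElement` loop stops cleanly at the restriction boundary:

```python
class ToyOffsetRangeTracker:
    """Minimal illustration of the claim-based restriction protocol
    that SplittableDoFn readers use. Offsets must be claimed in
    increasing order; claiming past the end of the range fails."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop  # exclusive
        self.last_claimed = None

    def try_claim(self, offset):
        if offset >= self.stop:
            return False  # outside the restriction: stop processing
        if self.last_claimed is not None and offset <= self.last_claimed:
            raise ValueError('offsets must be claimed in increasing order')
        self.last_claimed = offset
        return True


def read_partition(tracker, records):
    """Emit records whose offsets the tracker allows, stopping at the
    restriction boundary the way an SDF process loop does."""
    out = []
    for offset, value in records:
        if not tracker.try_claim(offset):
            break
        out.append(value)
    return out


tracker = ToyOffsetRangeTracker(0, 3)
consumed = read_partition(tracker, [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')])
# consumed == ['a', 'b', 'c']; offset 3 falls outside [0, 3) and is left
# for whatever restriction owns it after a split.
```

The point of the protocol is that the runner can split or checkpoint the range at any time, and the loop above will naturally stop at the new boundary without the reader needing any extra coordination.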
[GitHub] [beam] robinyqiu commented on a change in pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF
robinyqiu commented on a change in pull request #11868: URL: https://github.com/apache/beam/pull/11868#discussion_r436178923 ## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + @Test Review comment: +1. IIRC there are some other tests in this file that are testing our streaming extension. It makes sense to separate them into another file. ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFToPTransform.java ## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall; + +/** Provides a function that produces a PCollection based on TVF and upstream PCollection. */ +public interface TVFToPTransform { Review comment: +1
[GitHub] [beam] ibzib commented on a change in pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.
ibzib commented on a change in pull request #11932: URL: https://github.com/apache/beam/pull/11932#discussion_r436178918 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactRetrievalService.java ## @@ -89,34 +92,41 @@ public void resolveArtifacts( public void getArtifact( ArtifactApi.GetArtifactRequest request, StreamObserver responseObserver) { -switch (request.getArtifact().getTypeUrn()) { +try { + InputStream inputStream = getArtifact(request.getArtifact()); + byte[] buffer = new byte[bufferSize]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) > 0) { +responseObserver.onNext( +ArtifactApi.GetArtifactResponse.newBuilder() +.setData(ByteString.copyFrom(buffer, 0, bytesRead)) +.build()); + } + responseObserver.onCompleted(); +} catch (IOException exn) { + exn.printStackTrace(); + responseObserver.onError(exn); Review comment: Should we have wrapped this exception in a StatusException as well?
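The read loop under review streams the artifact back in fixed-size chunks until EOF. A minimal Python sketch of the same chunking logic (illustrative only — the real code wraps each chunk in a gRPC `GetArtifactResponse` message rather than yielding raw bytes):

```python
import io


def stream_in_chunks(stream, buffer_size):
    """Yield fixed-size chunks from a binary stream until EOF,
    mirroring the while-read loop in getArtifact. The final chunk
    may be shorter than buffer_size; an empty read signals EOF."""
    while True:
        chunk = stream.read(buffer_size)
        if not chunk:
            break
        yield chunk


payload = b'x' * 10
chunks = list(stream_in_chunks(io.BytesIO(payload), 4))
# chunks == [b'xxxx', b'xxxx', b'xx']: two full buffers plus the remainder.
```

The loop shape matters for correctness: checking for an empty read (rather than a short one) is what lets the final partial chunk through before terminating.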
[GitHub] [beam] robertwb commented on pull request #11626: Cleanup ToString transforms.
robertwb commented on pull request #11626: URL: https://github.com/apache/beam/pull/11626#issuecomment-639845882 R: @y1chi
[GitHub] [beam] robertwb commented on pull request #11940: [BEAM-6215] Additional tests for FlatMap label.
robertwb commented on pull request #11940: URL: https://github.com/apache/beam/pull/11940#issuecomment-639844043 R: @pabloem
[GitHub] [beam] robertwb opened a new pull request #11940: [BEAM-6215] Additional tests for FlatMap label.
robertwb opened a new pull request #11940: URL: https://github.com/apache/beam/pull/11940 This does not appear to be broken, but additional tests are good. Also cleaned up unneeded use of raw strings. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/
[GitHub] [beam] amaliujia merged pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.
amaliujia merged pull request #11933: URL: https://github.com/apache/beam/pull/11933
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436179356 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -1055,29 +1144,6 @@ public void populateDisplayData(DisplayData.Builder builder) { private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class); - /** Review comment: This common part has been moved to KafkaIOUtil.java.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436177315 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -949,45 +1077,6 @@ public void setValueDeserializer(String valueDeserializer) { final SerializableFunction, OutT> fn) { return record -> fn.apply(record.getKV()); } - /// Review comment: This common part has been moved to KafkaIOUtil.java.
[GitHub] [beam] kennknowles commented on a change in pull request #11924: [BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle
kennknowles commented on a change in pull request #11924: URL: https://github.com/apache/beam/pull/11924#discussion_r436173779 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -3938,10 +3939,11 @@ public void testEventTimeTimerOrdering() throws Exception { ValidatesRunner.class, UsesTimersInParDo.class, UsesStatefulParDo.class, + UsesUnboundedPCollections.class, UsesStrictTimerOrdering.class }) public void testEventTimeTimerOrderingWithCreate() throws Exception { - final int numTestElements = 100; + final int numTestElements = 5; Review comment: Why shrink it? Does the test get really slow? Is this going to be a perf problem overall? ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java ## @@ -577,12 +583,21 @@ public void flushState() { WindmillTimerInternals.windmillTimerToTimerData( WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) .iterator(); + +cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add); + } + + Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime(); + if (userTimerInternals.hasTimerBefore(currentInputWatermark)) { +while (!toBeFiredTimersOrdered.isEmpty()) { + userTimerInternals.setTimer(toBeFiredTimersOrdered.poll()); +} } Review comment: Yea I don't actually understand what this block is for. FWIW to do timer deletion/reset cheaply without building a bespoke data structure just keep a map from id to firing time or tombstone. This way, whenever a timer comes up in the prio queue you pull out the actual time for it from the map. If it is actually set for another time, don't fire it. If it is obsolete, don't fire it. 
## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java ## @@ -577,12 +583,21 @@ public void flushState() { WindmillTimerInternals.windmillTimerToTimerData( WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) .iterator(); + +cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add); Review comment: Do we even need `cachedFiredUserTimers`? It seems obsolete if we populate the priority queue. The name is also wrong - even before this PR it wasn't a cache. It is a lazily initialized iterator. Instead, we should have a lazily initialized priority queue (like you do) and just a flag to say whether the incoming timers have been loaded yet. ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -4040,7 +4043,8 @@ public void onTimer( } }; - PCollection output = pipeline.apply(transform).apply(ParDo.of(fn)); + PCollection output = + pipeline.apply(transform).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(ParDo.of(fn)); Review comment: Should not be calling `setIsBoundedInternal` here. Is this just to force streaming mode? We need to just create a separate run of ValidatesRunner that forces streaming mode.
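The map-plus-priority-queue scheme kennknowles suggests for cheap timer deletion and reset can be sketched in Python. Names and structure here are illustrative, not Dataflow worker code: a heap holds every (time, id) entry ever set, and a map from id to the authoritative firing time decides, at pop time, whether a heap entry is still live.

```python
import heapq


class TimerQueue:
    """Lazy-deletion timer queue: resets and deletions never touch the
    heap; stale heap entries are simply skipped when they surface."""

    def __init__(self):
        self._heap = []      # (fire_time, timer_id) entries, possibly stale
        self._current = {}   # timer_id -> authoritative fire time (None = deleted)

    def set_timer(self, timer_id, fire_time):
        # Setting (or resetting) just records the new authoritative time
        # and pushes a fresh heap entry; the old entry becomes a dud.
        self._current[timer_id] = fire_time
        heapq.heappush(self._heap, (fire_time, timer_id))

    def delete_timer(self, timer_id):
        self._current[timer_id] = None  # tombstone; heap entry stays behind

    def fire_up_to(self, watermark):
        fired = []
        while self._heap and self._heap[0][0] <= watermark:
            fire_time, timer_id = heapq.heappop(self._heap)
            # Fire only if this entry still matches the authoritative time;
            # entries invalidated by a reset or delete are dropped silently.
            if self._current.get(timer_id) == fire_time:
                fired.append(timer_id)
                self._current[timer_id] = None
        return fired


q = TimerQueue()
q.set_timer('a', 5)
q.set_timer('b', 7)
q.set_timer('a', 3)   # reset 'a' earlier; the (5, 'a') heap entry goes stale
q.delete_timer('b')
fired = q.fire_up_to(10)
# fired == ['a']: it fires once at its reset time 3; the stale (5, 'a')
# entry and the tombstoned 'b' are skipped.
```

The appeal of this design is exactly what the review comment notes: no bespoke data structure is needed, and both reset and delete are O(log n) pushes or O(1) map writes instead of heap removals.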
[GitHub] [beam] saavan-google-intern commented on pull request #11939: [BEAM-10197] Support typehints for Python's frozenset
saavan-google-intern commented on pull request #11939: URL: https://github.com/apache/beam/pull/11939#issuecomment-639824517 @udim
[GitHub] [beam] saavan-google-intern opened a new pull request #11939: [BEAM-10197] Support typehints for Python's frozenset
saavan-google-intern opened a new pull request #11939: URL: https://github.com/apache/beam/pull/11939 This PR improves Beam's parameterized type hint coverage by adding support for Python's frozenset container. Type annotations should now work with frozenset, typing.FrozenSet, and typehints.FrozenSet. The same level of unit test coverage that exists for set has been added for frozenset. When possible, similar tests were refactored via inheritance to support both set and frozenset to make the code extensible and reduce duplication.
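As a plain-Python illustration of the annotation form the PR teaches Beam to understand, here is a function annotated with `typing.FrozenSet`. This uses only the standard `typing` module, not Beam's typehint machinery, and the function itself is a made-up example:

```python
import typing


def distinct_keys(
    pairs: typing.List[typing.Tuple[str, int]]
) -> typing.FrozenSet[str]:
    """Return the distinct first elements as an immutable frozenset.

    The typing.FrozenSet[str] return annotation is one of the three
    spellings (frozenset, typing.FrozenSet, typehints.FrozenSet) the
    PR description says Beam now accepts.
    """
    return frozenset(k for k, _ in pairs)


keys = distinct_keys([('a', 1), ('b', 2), ('a', 3)])
# keys == frozenset({'a', 'b'}): duplicates collapse, and the result
# is hashable/immutable, unlike a plain set.
```

Returning a `frozenset` rather than a `set` is useful precisely because it is hashable, so it can itself be a dict key or a member of another set.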
Post-Commit Tests Status (on master branch): build-status badges for the Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners (links to builds.apache.org Jenkins jobs).
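The frozenset type-hint support described in #11939 can be illustrated with plain `typing` annotations. This is an illustrative sketch only, not Beam's `typehints` API; the function name and values are invented for the example:

```python
from typing import FrozenSet

# Illustrative only: a function annotated with parameterized frozenset
# types, the kind of hint the PR teaches Beam's typehints module to accept.
def distinct_lengths(words: FrozenSet[str]) -> FrozenSet[int]:
    # frozenset is immutable and hashable, so it can itself be a dict key
    # or a member of another set, unlike a plain set.
    return frozenset(len(w) for w in words)

result = distinct_lengths(frozenset({"a", "bb", "cc"}))
print(sorted(result))  # the distinct word lengths
```

With the PR applied, the same annotation should be expressible as `frozenset`, `typing.FrozenSet`, or Beam's `typehints.FrozenSet`.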
[GitHub] [beam] lukecwik merged pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn
lukecwik merged pull request #11922: URL: https://github.com/apache/beam/pull/11922 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ValueState user state to python sdk
y1chi commented on pull request #11916: URL: https://github.com/apache/beam/pull/11916#issuecomment-639789507 > Yes, the plan was to consider changing Java too, though that's harder due to backwards compatibility issues. Renamed to ReadModifyWriteState.
[GitHub] [beam] reuvenlax commented on a change in pull request #11924: [BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle
reuvenlax commented on a change in pull request #11924: URL: https://github.com/apache/beam/pull/11924#discussion_r436147265 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java ## @@ -577,12 +583,21 @@ public void flushState() { WindmillTimerInternals.windmillTimerToTimerData( WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) .iterator(); + +cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add); + } + + Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime(); + if (userTimerInternals.hasTimerBefore(currentInputWatermark)) { +while (!toBeFiredTimersOrdered.isEmpty()) { + userTimerInternals.setTimer(toBeFiredTimersOrdered.poll()); +} } Review comment: @kennknowles for comment. This doesn't look right to me, as I don't think we should be modifying the WindmillTimerInternals here. I think we just want to merge the timer modifications from processing the workitem into this priority queue; note that if timers are deleted, we need to detect that as well and remove from the priority queue.
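The merge reuvenlax describes — keeping pending timers in a priority queue ordered by firing time, folding in modifications from the work item, and honoring mid-bundle deletions — can be sketched with a heap plus lazy deletion. This is a hypothetical illustration of the general technique, not the Dataflow worker's actual implementation; the class and method names are invented:

```python
import heapq

class OrderedTimerQueue:
    """Sketch: time-ordered timer queue with lazy deletion of cancelled timers."""

    def __init__(self):
        self._heap = []        # (fire_time, timer_id) pairs, a min-heap by time
        self._deleted = set()  # ids of timers deleted mid-bundle

    def set_timer(self, timer_id, fire_time):
        # Re-setting a timer revives it if it was previously deleted.
        self._deleted.discard(timer_id)
        heapq.heappush(self._heap, (fire_time, timer_id))

    def delete_timer(self, timer_id):
        # Lazy removal: mark the id and skip it when it surfaces on pop.
        self._deleted.add(timer_id)

    def fire_before(self, watermark):
        """Pop timers in strict time order up to the watermark, skipping deleted ones."""
        fired = []
        while self._heap and self._heap[0][0] <= watermark:
            fire_time, timer_id = heapq.heappop(self._heap)
            if timer_id in self._deleted:
                continue
            fired.append((fire_time, timer_id))
        return fired
```

For example, setting timers "a" at 5 and "b" at 3, then deleting "b", means `fire_before(10)` fires only "a" — the deletion is detected when the stale heap entry is popped, which is cheaper than removing it from the middle of the queue.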
[GitHub] [beam] lukecwik commented on pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn
lukecwik commented on pull request #11922: URL: https://github.com/apache/beam/pull/11922#issuecomment-639757431 Run Java PreCommit
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436123330 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.AppInfoParser; +import 
org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and + * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code + * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to + * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and design + * doc:https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is, {@link + * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the + * pipeline can populate these source descriptions during runtime. For example, the pipeline can + * query Kafka topics from BigQuery table and read these topics via {@link ReadFromKafkaViaSDF}. + * + * Common Kafka Consumer Configurations + * + * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}: + * + * + * {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link + * KafkaIO.Read#getConsumerConfig()}. + * {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link + * KafkaIO.Read#getConsumerFactoryFn()}. + * {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link + * KafkaIO.Read#getOffsetConsumerConfig()}. + * {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link + * KafkaIO.Read#getKeyCoder()}. + * {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link + * KafkaIO.Read#getValueCoder()}. + * {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link + * KafkaIO.Read#getKeyDeserializer
[GitHub] [beam] lostluck merged pull request #11937: [BEAM-9615] Remove LP indicator on string methods.
lostluck merged pull request #11937: URL: https://github.com/apache/beam/pull/11937
[GitHub] [beam] boyuanzz commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn
boyuanzz commented on a change in pull request #11922: URL: https://github.com/apache/beam/pull/11922#discussion_r436116972 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) { switch (pTransform.getSpec().getUrn()) { case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN: this.convertSplitResultToWindowedSplitResult = -(splitResult, watermarkEstimatorState) -> -WindowedSplitResult.forRoots( -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane()), -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getResidual(), watermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane())); +(splitResult, watermarkEstimatorState) -> { + List primaryFullyProcessedWindows = + ImmutableList.copyOf( + Iterables.limit( + currentElement.getWindows(), currentWindowIterator.previousIndex())); + // Advances the iterator consuming the remaining windows. + List residualUnprocessedWindows = + ImmutableList.copyOf(currentWindowIterator); Review comment: Ah I see. It's different from what I know about an `Iterator`. Thanks!
[GitHub] [beam] robertwb opened a new pull request #11938: [BEAM-9577] Remove uses of legacy artifact service in Java.
robertwb opened a new pull request #11938: URL: https://github.com/apache/beam/pull/11938
[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn
lukecwik commented on a change in pull request #11922: URL: https://github.com/apache/beam/pull/11922#discussion_r436113097 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -296,6 +299,12 @@ public void accept(WindowedValue input) throws Exception { /** Only valid during {@code processElement...} methods, null otherwise. */ private WindowedValue currentElement; + /** + * Only valid during {@link #processElementForSizedElementAndRestriction} and {@link + * #processElementForSizedElementAndRestriction}. Review comment: Fixed.
[GitHub] [beam] chamikaramj merged pull request #11846: [BEAM-9869] adding self-contained Kafka service jar for testing
chamikaramj merged pull request #11846: URL: https://github.com/apache/beam/pull/11846
[GitHub] [beam] chamikaramj commented on pull request #11846: [BEAM-9869] adding self-contained Kafka service jar for testing
chamikaramj commented on pull request #11846: URL: https://github.com/apache/beam/pull/11846#issuecomment-639725186 LGTM. Thanks.
[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn
lukecwik commented on a change in pull request #11922: URL: https://github.com/apache/beam/pull/11922#discussion_r436112241 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -941,20 +1047,59 @@ private Progress getProgress() { convertSplitResultToWindowedSplitResult.apply(result, watermarkAndState.getValue()); } +List primaryRoots = new ArrayList<>(); Review comment: Yes. As shown in the test the self checkpoint will checkpoint for the "remaining" windows as well. I would like to leave it as is until we can build consensus around what we should do in these use cases.
[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn
lukecwik commented on a change in pull request #11922: URL: https://github.com/apache/beam/pull/11922#discussion_r436111838 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -632,6 +698,17 @@ public Object restriction() { } }); return WindowedSplitResult.forRoots( + primaryFullyProcessedWindows.isEmpty() + ? null + : WindowedValue.of( + KV.of( + KV.of( + currentElement.getValue(), + KV.of(currentRestriction, currentWatermarkEstimatorState)), + fullSize), Review comment: I don't think so but this warrants a larger discussion about what does an element+restriction in multiple windows mean and how that it impacts splitting/sizing. I would like to leave it as is until we can build that consensus.
[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn
lukecwik commented on a change in pull request #11922: URL: https://github.com/apache/beam/pull/11922#discussion_r43686 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) { switch (pTransform.getSpec().getUrn()) { case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN: this.convertSplitResultToWindowedSplitResult = -(splitResult, watermarkEstimatorState) -> -WindowedSplitResult.forRoots( -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane()), -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getResidual(), watermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane())); +(splitResult, watermarkEstimatorState) -> { + List primaryFullyProcessedWindows = + ImmutableList.copyOf( + Iterables.limit( + currentElement.getWindows(), currentWindowIterator.previousIndex())); + // Advances the iterator consuming the remaining windows. + List residualUnprocessedWindows = + ImmutableList.copyOf(currentWindowIterator); Review comment: ImmutableList.copyOf(iterator) drains all the remaining elements from iterator. It is effectively:
```
while (iterator.hasNext()) {
  list.add(iterator.next());
}
```
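The iterator-draining behavior lukecwik describes has a direct Python analogue: materializing an iterator into a list consumes it, so a later copy sees only the elements not yet taken. An illustrative sketch (the variable names are invented):

```python
from itertools import islice

# An iterator over windows, mirroring currentWindowIterator in the Java code.
windows = iter(["w0", "w1", "w2", "w3"])

# Take the first two windows (analogous to Iterables.limit + copyOf)...
processed = list(islice(windows, 2))

# ...then copying the iterator drains everything that remains
# (analogous to ImmutableList.copyOf(currentWindowIterator)).
remaining = list(windows)

print(processed)  # ['w0', 'w1']
print(remaining)  # ['w2', 'w3']
```

This is why the Java comment notes that the second `copyOf` "advances the iterator": the split into fully-processed and unprocessed windows comes for free from the single pass.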
[GitHub] [beam] lukecwik commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn
lukecwik commented on a change in pull request #11922: URL: https://github.com/apache/beam/pull/11922#discussion_r436111296 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) { switch (pTransform.getSpec().getUrn()) { case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN: this.convertSplitResultToWindowedSplitResult = -(splitResult, watermarkEstimatorState) -> -WindowedSplitResult.forRoots( -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane()), -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getResidual(), watermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane())); +(splitResult, watermarkEstimatorState) -> { + List primaryFullyProcessedWindows = + ImmutableList.copyOf( + Iterables.limit( + currentElement.getWindows(), currentWindowIterator.previousIndex())); + // Advances the iterator consuming the remaining windows. + List residualUnprocessedWindows = + ImmutableList.copyOf(currentWindowIterator); + return WindowedSplitResult.forRoots( + primaryFullyProcessedWindows.isEmpty() + ? null + : WindowedValue.of( + KV.of( + currentElement.getValue(), + KV.of(currentRestriction, currentWatermarkEstimatorState)), + currentElement.getTimestamp(), + primaryFullyProcessedWindows, + currentElement.getPane()), + WindowedValue.of( + KV.of( + currentElement.getValue(), + KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)), + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane()), + WindowedValue.of( + KV.of( + currentElement.getValue(), + KV.of(splitResult.getResidual(), watermarkEstimatorState)), + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane()), + residualUnprocessedWindows.isEmpty() + ? 
null + : WindowedValue.of( + KV.of( + currentElement.getValue(), + KV.of(currentRestriction, currentWatermarkEstimatorState)), + currentElement.getTimestamp(), + residualUnprocessedWindows, + currentElement.getPane())); +}; break; case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN: this.convertSplitResultToWindowedSplitResult = (splitResult, watermarkEstimatorState) -> { + List primaryFullyProcessedWindows = + ImmutableList.copyOf( + Iterables.limit( + currentElement.getWindows(), currentWindowIterator.previousIndex())); + // Advances the iterator consuming the remaining windows. + List residualUnprocessedWindows = + ImmutableList.copyOf(currentWindowIterator); Review comment: ImmutableList.copyOf(iterator) drains all the remaining elements from iterator. It is effectively:
```
while (iterator.hasNext()) {
  list.add(iterator.next());
}
```
[GitHub] [beam] lostluck commented on pull request #11937: [BEAM-9615] Remove LP indicator on string methods.
lostluck commented on pull request #11937: URL: https://github.com/apache/beam/pull/11937#issuecomment-639718856 R: @tysonjh
[GitHub] [beam] lostluck opened a new pull request #11937: [BEAM-9615] Remove LP indicator on string methods.
lostluck opened a new pull request #11937: URL: https://github.com/apache/beam/pull/11937 Missed commit from PR 11925.
[GitHub] [beam] amaliujia commented on a change in pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.
amaliujia commented on a change in pull request #11933: URL: https://github.com/apache/beam/pull/11933#discussion_r436101952 ## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -837,7 +834,6 @@ public void testLikeAllowsEscapingBackslash() { } @Test - @Ignore("Currently non UTF-8 values are coerced to UTF-8") public void testThrowsErrorForNonUTF8() { Review comment: PR updated.
[GitHub] [beam] amaliujia commented on a change in pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.
amaliujia commented on a change in pull request #11933: URL: https://github.com/apache/beam/pull/11933#discussion_r436094165 ## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -837,7 +834,6 @@ public void testLikeAllowsEscapingBackslash() { } @Test - @Ignore("Currently non UTF-8 values are coerced to UTF-8") public void testThrowsErrorForNonUTF8() { Review comment: This is a good point. I will choose to remove this test. I tried to dig into ZetaSQL's documentation and internal test suite, and there was no clear explanation of how the LIKE operator should handle non-UTF-8 chars (there are code-generated non-UTF-8 test cases, but they don't have clear comments demonstrating their purpose). So I will leave this part to be tested by the internal test suite.
[GitHub] [beam] lostluck commented on pull request #11925: [BEAM-9615] Add string coder utility functions.
lostluck commented on pull request #11925: URL: https://github.com/apache/beam/pull/11925#issuecomment-639698549 Ah dang it. I thought I had pushed the commit with the rename, but it was waiting on a password. I'll have that as another PR.
[GitHub] [beam] lostluck merged pull request #11927: [BEAM-9615] finish standardizing proto import names
lostluck merged pull request #11927: URL: https://github.com/apache/beam/pull/11927
[GitHub] [beam] lostluck edited a comment on pull request #11927: [BEAM-9615] finish standardizing proto import names
lostluck edited a comment on pull request #11927: URL: https://github.com/apache/beam/pull/11927#issuecomment-639695778 @lukecwik I'm 100% certain that the Java PreCommit failure here is unrelated to the change. 1. The short names are for programmer convenience and shouldn't change the compiled binary. 2. [That test](https://builds.apache.org/job/beam_PreCommit_Java_Phrase/2291/testReport/org.apache.beam.sdk.extensions.ml/VideoIntelligenceIT/annotateVideoFromURINoContext/) doesn't even run the boot binary. @tysonjh Filed it as https://issues.apache.org/jira/browse/BEAM-10206
[GitHub] [beam] boyuanzz commented on a change in pull request #11922: [BEAM-2939] Fix FnApiDoFnRunner to ensure that we output within the correct window when processing a splittable dofn
boyuanzz commented on a change in pull request #11922: URL: https://github.com/apache/beam/pull/11922#discussion_r436072548 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -296,6 +299,12 @@ public void accept(WindowedValue input) throws Exception { /** Only valid during {@code processElement...} methods, null otherwise. */ private WindowedValue currentElement; + /** + * Only valid during {@link #processElementForSizedElementAndRestriction} and {@link + * #processElementForSizedElementAndRestriction}. Review comment: Duplicated {@link #processElementForSizedElementAndRestriction}? ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) { switch (pTransform.getSpec().getUrn()) { case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN: this.convertSplitResultToWindowedSplitResult = -(splitResult, watermarkEstimatorState) -> -WindowedSplitResult.forRoots( -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane()), -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getResidual(), watermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane())); +(splitResult, watermarkEstimatorState) -> { + List primaryFullyProcessedWindows = + ImmutableList.copyOf( + Iterables.limit( + currentElement.getWindows(), currentWindowIterator.previousIndex())); + // Advances the iterator consuming the remaining windows. + List residualUnprocessedWindows = + ImmutableList.copyOf(currentWindowIterator); Review comment: `currentWindowIterator .next()`? 
## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -941,20 +1047,59 @@ private Progress getProgress() { convertSplitResultToWindowedSplitResult.apply(result, watermarkAndState.getValue()); } +List primaryRoots = new ArrayList<>(); Review comment: This also takes care of self checkpoint, right? ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -577,26 +586,83 @@ public Instant timestamp(DoFn doFn) { switch (pTransform.getSpec().getUrn()) { case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN: this.convertSplitResultToWindowedSplitResult = -(splitResult, watermarkEstimatorState) -> -WindowedSplitResult.forRoots( -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane()), -WindowedValue.of( -KV.of( -currentElement.getValue(), -KV.of(splitResult.getResidual(), watermarkEstimatorState)), -currentElement.getTimestamp(), -currentWindow, -currentElement.getPane())); +(splitResult, watermarkEstimatorState) -> { + List primaryFullyProcessedWindows = + ImmutableList.copyOf( + Iterables.limit( + currentElement.getWindows(), currentWindowIterator.previousIndex())); + // Advances the iterator consuming the remaining windows. + List residualUnprocessedWindows = + ImmutableList.copyOf(currentWindowIterator); + return WindowedSplitResult.forRoots( + primaryFullyProcessedWindows.isEmpty() + ? null + : WindowedValue.of( + KV.of( + currentElement.getValue(), + KV.of(currentRestriction, currentWatermarkEstimatorState)), + currentElement.getTimestamp(), + primaryFullyProcessedWindows, + currentElement.getPane()), +
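The split bookkeeping being reviewed here (fully processed windows go to the primary, untouched windows to the residual, and the window currently being processed is itself split via the restriction) can be sketched roughly as follows. This is a simplified illustration, not the actual `FnApiDoFnRunner` logic; the `splitWindows` helper and the string window labels are hypothetical.

```go
package main

import "fmt"

// splitWindows divides an element's windows at the index of the window
// currently being processed. Fully processed windows stay with the
// primary, not-yet-started windows go to the residual, and the current
// window is returned separately so the restriction split can be applied
// to it on its own.
func splitWindows(windows []string, cur int) (primary, residual []string, current string) {
	primary = append([]string{}, windows[:cur]...)
	residual = append([]string{}, windows[cur+1:]...)
	return primary, residual, windows[cur]
}

func main() {
	windows := []string{"w0", "w1", "w2", "w3"}
	primary, residual, current := splitWindows(windows, 2)
	fmt.Println(primary, residual, current) // prints: [w0 w1] [w3] w2
}
```

In the review above, the primary side corresponds to `Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex())` and the residual side to consuming the remainder of the iterator.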
[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.
lostluck commented on a change in pull request #11925: URL: https://github.com/apache/beam/pull/11925#discussion_r436095065 ## File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go ## @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "bytes" + "encoding/base64" + "io" + "strings" + "testing" + "unicode/utf8" +) + +var testValues = []string{ + "", + "a", + "13", + "hello", + "a longer string with spaces and all that", + "a string with a \n newline", + "スタリング", + "I am the very model of a modern major general.\nI've information animal, vegetable, and mineral", +} + +// Base64 encoded versions of the above strings, without the length prefix. +var testEncodings = []string{ + "", + "YQ", + "MTM", + "aGVsbG8", + "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA", + "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ", + "44K544K_44Oq44Oz44Kw", + "SSBhbSB0aGUgdmVyeSBtb2RlbCBvZiBhIG1vZGVybiBtYWpvciBnZW5lcmFsLgpJJ3ZlIGluZm9ybWF0aW9uIGFuaW1hbCwgdmVnZXRhYmxlLCBhbmQgbWluZXJhbA", +} + +// TestLen serves as a verification that string lengths +// match their equivalent byte lengths, and not their rune +// representation. 
+func TestLen(t *testing.T) { + runeCount := []int{0, 1, 2, 5, 40, 25, 5, 94} + for i, s := range testValues { + if got, want := len(s), len([]byte(s)); got != want { + t.Errorf("string and []byte len do not match. got %v, want %v", got, want) + } + if got, want := utf8.RuneCountInString(s), runeCount[i]; got != want { + t.Errorf("Rune count of %q change len do not match. got %v, want %v", s, got, want) + } + } +} + +func TestEncodeStringUTF8(t *testing.T) { + for i, s := range testValues { + s := s + want := testEncodings[i] + t.Run(s, func(t *testing.T) { + var b strings.Builder + base64enc := base64.NewEncoder(base64.RawURLEncoding, &b) + + if err := encodeStringUTF8(s, base64enc); err != nil { + t.Fatal(err) + } + base64enc.Close() + got := b.String() + if got != want { + t.Errorf("encodeStringUTF8(%q) = %q, want %q", s, got, want) + } + }) + } +} + +func TestDecodeStringUTF8(t *testing.T) { + for i, s := range testEncodings { + s := s + want := testValues[i] + t.Run(want, func(t *testing.T) { + b := bytes.NewBufferString(s) + base64dec := base64.NewDecoder(base64.RawURLEncoding, b) + + got, err := decodeStringUTF8(int64(len(want)), base64dec) + if err != nil && err != io.EOF { + t.Fatal(err) + } + if got != want { + t.Errorf("decodeStringUTF8(%q) = %q, want %q", s, got, want) + } + }) + } +} + +func TestEncodeDecodeStringUTF8LP(t *testing.T) { + for _, s := range testValues { + want := s + t.Run(want, func(t *testing.T) { + var build strings.Builder + if err := EncodeStringUTF8LP(want, &build); err != nil { Review comment: That did occur to me as well, but it would fail the moment we run a wordcount since nothing could get decoded properly, and the runner would end up with very strange data under/over reads when it tries to use the initial character encodings as varints. In practice, this is not going to change, and certainly not to remove both length prefixes at the same time. This is an automated message from the Apache Git Service. 
[GitHub] [beam] lostluck merged pull request #11925: [BEAM-9615] Add string coder utility functions.
lostluck merged pull request #11925: URL: https://github.com/apache/beam/pull/11925
[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.
robertwb commented on pull request #11932: URL: https://github.com/apache/beam/pull/11932#issuecomment-639691021 Run Java PreCommit
[GitHub] [beam] kennknowles commented on pull request #11820: [BEAM-10093] ZetaSql Nexmark variant
kennknowles commented on pull request #11820: URL: https://github.com/apache/beam/pull/11820#issuecomment-639689286 PTAL. The first commit is unchanged, the second is a total rewrite (or unwrite, if you like). - Modified each `SqlQuery2`, etc., class to have just the amount of variation necessary - Made each test class run as `@Enclosed` with an inner class for each dialect. I don't love this use of inheritance, but it seems to be how JUnit wants things. It has a clearer test signal than using `@Parameterized` across the dialects, and allows separate `@Ignore` for broken/unsupported features. Did not do: - Jenkins job to publish benchmarks; planning that as a followup - Any other refactor to make suites more manageable
[GitHub] [beam] tysonjh commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.
tysonjh commented on a change in pull request #11925: URL: https://github.com/apache/beam/pull/11925#discussion_r436092337 ## File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go ## (quotes the same `TestEncodeDecodeStringUTF8LP` test shown above) Review comment: If the LP part got removed, or unused somehow, in both encode/decode (a stretch to be sure), then this test would pass despite there being no LP. My thought was that since the exposed methods suggest LP as part of the abstraction, it should be explicitly verified to avoid any surprises. I'll leave it up to your discretion.
[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.
lostluck commented on a change in pull request #11925: URL: https://github.com/apache/beam/pull/11925#discussion_r436091546 ## File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go ## (quotes the same `TestEncodeDecodeStringUTF8LP` test shown above) Review comment: By removing the callout for the LP, it's part of the implementation details. E.g., if you expect the next value is a StringUTF8, then it must be length prefixed. This code would only work in that situation.
[GitHub] [beam] damondouglas opened a new pull request #11936: [BEAM-9679] Add CombinePerKey to Core Transforms Go Katas
damondouglas opened a new pull request #11936: URL: https://github.com/apache/beam/pull/11936 This pull request adds a Combine/CombinePerKey lesson to the Go SDK katas. I would like to request the following reviewers: (R: @lostluck ) (R: @henryken ) If accepted by both reviewers, please wait until the [Stepik course](https://stepik.org/course/70387) is updated before finally merging this PR. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
Post-Commit Tests Status (on master branch): [truncated table of Jenkins build-status badges for the Go, Java, and Python post-commit jobs omitted.]
[GitHub] [beam] tysonjh commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.
tysonjh commented on a change in pull request #11925: URL: https://github.com/apache/beam/pull/11925#discussion_r436090874 ## File path: sdks/go/pkg/beam/core/graph/coder/stringutf8.go ## @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "io" + "strings" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx" +) + +const bufCap = 64 + +// EncodeStringUTF8LP encodes a UTF string with a length prefix. Review comment: Sounds good. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.
lostluck commented on a change in pull request #11925: URL: https://github.com/apache/beam/pull/11925#discussion_r436090722 ## File path: sdks/go/pkg/beam/core/graph/coder/stringutf8.go ## (quotes the same `EncodeStringUTF8LP` doc comment shown above) Review comment: I removed it. It's part of the Beam spec for an encoded UTF-8 string, so the call-out is not necessary.
[GitHub] [beam] aaltay merged pull request #11882: [BEAM-10112] Add state and timer python examples to website
aaltay merged pull request #11882: URL: https://github.com/apache/beam/pull/11882
[GitHub] [beam] aaltay merged pull request #11805: Add documentation for python apache-beam[aws] installation
aaltay merged pull request #11805: URL: https://github.com/apache/beam/pull/11805
[GitHub] [beam] KevinGG commented on pull request #11884: [BEAM-7923] Initialize an empty Jupyter labextension with README
KevinGG commented on pull request #11884: URL: https://github.com/apache/beam/pull/11884#issuecomment-639670746

> There's a lot of files there that don't seem relevant; I think we should go through and figure out what's needed for the actual plugin vs. what's "extras" that just got copied from a template. We also need to figure out the distribution story. Will this be released with beam? As another pypi package (and npm package)?

I can go through these files and explain their usage and the distribution story. If this is not enough, we can talk offline.

[ESLint](https://eslint.org/docs/user-guide/getting-started)
```
.eslintignore
.eslintrc.js
```
[Github workflow](https://help.github.com/en/actions/configuring-and-managing-workflows/configuring-a-workflow)
```
# This is later used in the README to display a status badge.
.github/workflows/build.yml
```
[Prettier](https://prettier.io/docs/en/install.html)
```
.prettierignore
.prettierrc.json
```
[Python Packaging](https://packaging.python.org/guides/using-manifest-in/)
```
MANIFEST.in
```
JupyterLab server extension
```
interactive_beam_side_panel/*.py
```
JupyterLab frontend extension
```
src/*.ts[x]
```
JupyterLab frontend extension styles
```
style/*.css
```
JupyterLab configuration
```
jupyter-config/*.json
```
[TypeScript configuration](https://www.typescriptlang.org/docs/handbook/tsconfig-json.html)
```
tsconfig.json
```
[PEP 518](https://www.python.org/dev/peps/pep-0518/)
```
pyproject.toml
```
[npm package](https://nodejs.org/en/knowledge/getting-started/npm/what-is-the-file-package-json/)
```
package.json
```
The distribution story is [here](https://jupyterlab.readthedocs.io/en/stable/developer/extension_dev.html#shipping-packages). The plan is to ship the server extension (PyPI) and the frontend extension (NPM) separately. Temporarily, the server extension will just be a placeholder. We only need to release it once.
The frontend extension can be shipped regularly: `The general idea is to pack the Jupyterlab extension using npm pack, and then use the data_files logic in setup.py to ensure the file ends up in the /share/jupyter/lab/extensions directory.` It doesn't have to be released with Beam releases. If this `sdks/python/apache_beam/runners/interactive` is not a suitable place to put the code base, we can move it to some other directory in this repo outside of `sdks`.
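The `data_files` approach described above could be sketched roughly as follows. This is only an illustration, not Beam's actual packaging: the package name, version, and tarball glob are placeholders, and the real layout would follow the JupyterLab shipping guide linked above.

```python
# Hypothetical setup.py sketch: the server-extension package also ships the
# `npm pack`-ed frontend tarball into share/jupyter/lab/extensions, so a
# single `pip install` delivers both halves of the extension.
import glob

from setuptools import find_packages, setup

setup(
    name="interactive-beam-side-panel",  # placeholder name
    version="0.0.1",                     # placeholder version
    packages=find_packages(),
    include_package_data=True,
    data_files=[
        # Tarballs produced by `npm pack` in the frontend extension directory;
        # JupyterLab picks extensions up from this share/ location.
        ("share/jupyter/lab/extensions", glob.glob("*.tgz")),
    ],
)
```

With this, the frontend tarball rides along in the wheel/sdist without requiring a coordinated npm release on every install.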
[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.
lostluck commented on a change in pull request #11925: URL: https://github.com/apache/beam/pull/11925#discussion_r436076913 ## File path: sdks/go/pkg/beam/core/graph/coder/stringutf8.go ## @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "io" + "strings" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx" +) + +const bufCap = 64 + +// EncodeStringUTF8LP encodes a UTF string with a length prefix. Review comment: It's part of the StringUTF8 encoding, so I could remove the explicit call out. However, the Java and Python tests for the string encodings use those common encodings (which is why we have separate encoding/decoding tests with the golden values), but those encodings *do not* include the length prefix. I'd rather have the positive inclusion of a length prefix in the name than have the helper method be "withoutLP". Some of that comes from the archaic concept of "nested" and "unnested" coders, which is largely phased out. In practice, if you have a variable amount of data, a length prefix is required.
[GitHub] [beam] lostluck commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.
lostluck commented on a change in pull request #11925: URL: https://github.com/apache/beam/pull/11925#discussion_r436077202 ## File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go ## @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "bytes" + "encoding/base64" + "io" + "strings" + "testing" + "unicode/utf8" +) + +var testValues = []string{ + "", + "a", + "13", + "hello", + "a longer string with spaces and all that", + "a string with a \n newline", + "スタリング", + "I am the very model of a modern major general.\nI've information animal, vegetable, and mineral", +} + +// Base64 encoded versions of the above strings, without the length prefix. +var testEncodings = []string{ + "", + "YQ", + "MTM", + "aGVsbG8", + "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA", + "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ", + "44K544K_44Oq44Oz44Kw", + "SSBhbSB0aGUgdmVyeSBtb2RlbCBvZiBhIG1vZGVybiBtYWpvciBnZW5lcmFsLgpJJ3ZlIGluZm9ybWF0aW9uIGFuaW1hbCwgdmVnZXRhYmxlLCBhbmQgbWluZXJhbA", +} + +// TestLen serves as a verification that string lengths +// match their equivalent byte lengths, and not their rune +// representation. 
+func TestLen(t *testing.T) { + runeCount := []int{0, 1, 2, 5, 40, 25, 5, 94} + for i, s := range testValues { + if got, want := len(s), len([]byte(s)); got != want { + t.Errorf("string and []byte len do not match. got %v, want %v", got, want) + } + if got, want := utf8.RuneCountInString(s), runeCount[i]; got != want { + t.Errorf("Rune count of %q change len do not match. got %v, want %v", s, got, want) + } + } +} + +func TestEncodeStringUTF8(t *testing.T) { + for i, s := range testValues { + s := s + want := testEncodings[i] + t.Run(s, func(t *testing.T) { + var b strings.Builder + base64enc := base64.NewEncoder(base64.RawURLEncoding, &b) + + if err := encodeStringUTF8(s, base64enc); err != nil { + t.Fatal(err) + } + base64enc.Close() + got := b.String() + if got != want { + t.Errorf("encodeStringUTF8(%q) = %q, want %q", s, got, want) + } + }) + } +} + +func TestDecodeStringUTF8(t *testing.T) { + for i, s := range testEncodings { + s := s + want := testValues[i] + t.Run(want, func(t *testing.T) { + b := bytes.NewBufferString(s) + base64dec := base64.NewDecoder(base64.RawURLEncoding, b) + + got, err := decodeStringUTF8(int64(len(want)), base64dec) + if err != nil && err != io.EOF { + t.Fatal(err) + } + if got != want { + t.Errorf("decodeStringUTF8(%q) = %q, want %q", s, got, want) + } + }) + } +} + +func TestEncodeDecodeStringUTF8LP(t *testing.T) { + for _, s := range testValues { + want := s + t.Run(want, func(t *testing.T) { + var build strings.Builder + if err := EncodeStringUTF8LP(want, &build); err != nil { Review comment: Technically, this test has the LP verification. If the LP weren't present, it wouldn't be possible to get the result back again on decode since the decoder won't know how much data to read. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
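The two points made above — that the prefix records the *byte* length (not the rune count), and that without it a decoder cannot find string boundaries in a concatenated stream — can be illustrated with a small self-contained sketch. Note this is not Beam's Go implementation (which uses its own varint helpers and `ioutilx`); it is a Python stand-in using an unsigned base-128 varint for the length prefix.

```python
import io

def encode_varint(n: int, out: io.BytesIO) -> None:
    # Unsigned base-128 varint, low 7 bits first; high bit marks continuation.
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.write(bytes([b | 0x80]))
        else:
            out.write(bytes([b]))
            return

def decode_varint(buf: io.BytesIO) -> int:
    shift, result = 0, 0
    while True:
        b = buf.read(1)[0]
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result
        shift += 7

def encode_string_utf8_lp(s: str, out: io.BytesIO) -> None:
    data = s.encode("utf-8")
    encode_varint(len(data), out)  # prefix is the BYTE length, not rune count
    out.write(data)

def decode_string_utf8_lp(buf: io.BytesIO) -> str:
    # The prefix is what tells us where this string ends in the stream.
    n = decode_varint(buf)
    return buf.read(n).decode("utf-8")

values = ["", "hello", "スタリング"]
# "スタリング" is 5 runes but 15 UTF-8 bytes, so the prefix must be 15.
byte_len = len("スタリング".encode("utf-8"))

stream = io.BytesIO()
for s in values:
    encode_string_utf8_lp(s, stream)

stream.seek(0)
decoded = [decode_string_utf8_lp(stream) for _ in values]
print(byte_len, decoded)  # 15 ['', 'hello', 'スタリング']
```

Dropping the prefix would make the concatenated round trip above impossible, which is exactly the verification the `TestEncodeDecodeStringUTF8LP` test relies on.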
[GitHub] [beam] tysonjh commented on pull request #11927: [BEAM-9615] finish standardizing proto import names
tysonjh commented on pull request #11927: URL: https://github.com/apache/beam/pull/11927#issuecomment-639668226 > @lukecwik There probably is, but I'm not shaving that yak today. The gradle commands don't meaningfully work for Go, and not when I'm ripping them apart in the next two months to be replaced with something better. > In practice this is only a problem for very few packages, and only when those packages get new files. Please consider filing a JIRA so we don't lose track of this.
[GitHub] [beam] TobKed commented on pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS
TobKed commented on pull request #11877: URL: https://github.com/apache/beam/pull/11877#issuecomment-639667163 cc @potiuk
[GitHub] [beam] lostluck commented on pull request #11927: [BEAM-9615] finish standardizing proto import names
lostluck commented on pull request #11927: URL: https://github.com/apache/beam/pull/11927#issuecomment-639664788 Run Java PreCommit
[GitHub] [beam] amaliujia merged pull request #11934: [BEAM-10205] @Ignore:BYTES can work with UNION ALL
amaliujia merged pull request #11934: URL: https://github.com/apache/beam/pull/11934
[GitHub] [beam] tysonjh commented on a change in pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.
tysonjh commented on a change in pull request #11933: URL: https://github.com/apache/beam/pull/11933#discussion_r436071355 ## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -837,7 +834,6 @@ public void testLikeAllowsEscapingBackslash() { } @Test - @Ignore("Currently non UTF-8 values are coerced to UTF-8") public void testThrowsErrorForNonUTF8() { Review comment: This test name doesn't seem correct after these changes. What is it testing now? What about the previous assertions on throwing an exception for invalid UTF-8?
[GitHub] [beam] robertwb opened a new pull request #11935: [BEAM-9577] Remove use of legacy artifact service in Python.
robertwb opened a new pull request #11935: URL: https://github.com/apache/beam/pull/11935 Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).

Post-Commit Tests Status (on master branch): [table of Jenkins build-status badges for the Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners; truncated in the source]
[GitHub] [beam] TobKed commented on a change in pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS
TobKed commented on a change in pull request #11877: URL: https://github.com/apache/beam/pull/11877#discussion_r436068238 ## File path: .github/workflows/build_wheels.yml ## @@ -0,0 +1,141 @@ +name: Build python wheels + +on: + push: +branches: + - master + - release-* +tags: + - v* + +jobs: + + build_source: +runs-on: ubuntu-18.04 +steps: + - name: Checkout code +uses: actions/checkout@v2 + - name: Install python +uses: actions/setup-python@v2 +with: + python-version: 3.7 + - name: Get build dependencies +working-directory: ./sdks/python +run: python3 -m pip install cython && python3 -m pip install -r build-requirements.txt + - name: Install wheels +run: python3 -m pip install wheel + - name: Buld source +working-directory: ./sdks/python +run: python3 setup.py sdist --formats=gztar,zip + - name: Unzip source +working-directory: ./sdks/python +run: unzip dist/$(ls dist | grep .zip | head -n 1) + - name: Rename source directory +working-directory: ./sdks/python +run: mv $(ls | grep apache-beam) apache-beam-source + - name: Upload source +uses: actions/upload-artifact@v2 +with: + name: source + path: sdks/python/apache-beam-source + - name: Upload compressed sources +uses: actions/upload-artifact@v2 +with: + name: source_gztar_zip + path: sdks/python/dist + + prepare_gcs: +name: Prepare GCS +needs: build_source +runs-on: ubuntu-18.04 +steps: + - name: Authenticate on GCP +uses: GoogleCloudPlatform/github-actions/setup-gcloud@master +with: + service_account_email: ${{ secrets.CCP_SA_EMAIL }} + service_account_key: ${{ secrets.CCP_SA_KEY }} + - name: Remove existing files on GCS bucket +run: gsutil rm -r "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/" || true + + upload_source_to_gcs: +name: Upload source to GCS bucket +needs: prepare_gcs +runs-on: ubuntu-18.04 +steps: + - name: Download wheels +uses: actions/download-artifact@v2 +with: + name: source_gztar_zip + path: source/ + - name: Authenticate on GCP +uses: 
GoogleCloudPlatform/github-actions/setup-gcloud@master +with: + service_account_email: ${{ secrets.CCP_SA_EMAIL }} + service_account_key: ${{ secrets.CCP_SA_KEY }} + - name: Copy sources to GCS bucket +run: gsutil cp -r -a public-read source/* gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/ + - name: List sources on GCS bucket +run: | + gsutil ls "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/*.tar.gz" + gsutil ls "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/*.zip" + + build_wheels: +name: Build wheels on ${{ matrix.os }} +needs: prepare_gcs +runs-on: ${{ matrix.os }} +strategy: + matrix: +os : [ubuntu-18.04, macos-10.15] Review comment: Hopefully yes 😄 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TobKed commented on a change in pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS
TobKed commented on a change in pull request #11877: URL: https://github.com/apache/beam/pull/11877#discussion_r436068410 ## File path: .github/workflows/build_wheels.yml ## @@ -0,0 +1,141 @@ +name: Build python wheels + +on: + push: +branches: + - master + - release-* +tags: + - v* + +jobs: + + build_source: +runs-on: ubuntu-18.04 +steps: + - name: Checkout code +uses: actions/checkout@v2 + - name: Install python +uses: actions/setup-python@v2 +with: + python-version: 3.7 + - name: Get build dependencies +working-directory: ./sdks/python +run: python3 -m pip install cython && python3 -m pip install -r build-requirements.txt + - name: Install wheels +run: python3 -m pip install wheel + - name: Buld source Review comment: Thanks!. Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TobKed commented on a change in pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS
TobKed commented on a change in pull request #11877: URL: https://github.com/apache/beam/pull/11877#discussion_r436068604 ## File path: .github/workflows/build_wheels.yml ## @@ -0,0 +1,141 @@ +name: Build python wheels + +on: + push: +branches: + - master + - release-* +tags: + - v* + +jobs: + + build_source: +runs-on: ubuntu-18.04 +steps: + - name: Checkout code +uses: actions/checkout@v2 + - name: Install python +uses: actions/setup-python@v2 +with: + python-version: 3.7 + - name: Get build dependencies +working-directory: ./sdks/python +run: python3 -m pip install cython && python3 -m pip install -r build-requirements.txt + - name: Install wheels +run: python3 -m pip install wheel + - name: Buld source +working-directory: ./sdks/python +run: python3 setup.py sdist --formats=gztar,zip + - name: Unzip source +working-directory: ./sdks/python +run: unzip dist/$(ls dist | grep .zip | head -n 1) + - name: Rename source directory +working-directory: ./sdks/python +run: mv $(ls | grep apache-beam) apache-beam-source + - name: Upload source +uses: actions/upload-artifact@v2 +with: + name: source + path: sdks/python/apache-beam-source + - name: Upload compressed sources +uses: actions/upload-artifact@v2 +with: + name: source_gztar_zip + path: sdks/python/dist + + prepare_gcs: +name: Prepare GCS +needs: build_source +runs-on: ubuntu-18.04 +steps: + - name: Authenticate on GCP +uses: GoogleCloudPlatform/github-actions/setup-gcloud@master +with: + service_account_email: ${{ secrets.CCP_SA_EMAIL }} + service_account_key: ${{ secrets.CCP_SA_KEY }} + - name: Remove existing files on GCS bucket +run: gsutil rm -r "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/" || true + + upload_source_to_gcs: +name: Upload source to GCS bucket +needs: prepare_gcs +runs-on: ubuntu-18.04 +steps: + - name: Download wheels +uses: actions/download-artifact@v2 +with: + name: source_gztar_zip + path: source/ + - name: Authenticate on GCP +uses: 
GoogleCloudPlatform/github-actions/setup-gcloud@master +with: + service_account_email: ${{ secrets.CCP_SA_EMAIL }} + service_account_key: ${{ secrets.CCP_SA_KEY }} + - name: Copy sources to GCS bucket +run: gsutil cp -r -a public-read source/* gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/ + - name: List sources on GCS bucket +run: | + gsutil ls "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/*.tar.gz" + gsutil ls "gs://${{ secrets.CCP_BUCKET }}/${GITHUB_REF##*/}/*.zip" + + build_wheels: +name: Build wheels on ${{ matrix.os }} +needs: prepare_gcs +runs-on: ${{ matrix.os }} +strategy: + matrix: +os : [ubuntu-18.04, macos-10.15] +steps: +- name: Download source + uses: actions/download-artifact@v2 + with: +name: source +path: apache-beam-source +- name: Install Python + uses: actions/setup-python@v2 + with: +python-version: 3.7 +- name: Install packages on Mac + if: startsWith(matrix.os, 'macos') + run: | +brew update +brew install pkg-config +- name: Install cibuildwheel + run: pip install cibuildwheel==1.4.2 +- name: Build wheel + working-directory: apache-beam-source + env: +CIBW_BUILD: cp27-* cp35-* cp36-* cp37-* +CIBW_BUILD_VERBOSITY: 3 Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tysonjh commented on a change in pull request #11925: [BEAM-9615] Add string coder utility functions.
tysonjh commented on a change in pull request #11925: URL: https://github.com/apache/beam/pull/11925#discussion_r436016818 ## File path: sdks/go/pkg/beam/core/graph/coder/stringutf8.go ## @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "io" + "strings" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx" +) + +const bufCap = 64 + +// EncodeStringUTF8LP encodes a UTF string with a length prefix. Review comment: Is the length prefix more than an implementation detail or should this just be named EncodeStringUTF8? ## File path: sdks/go/pkg/beam/core/graph/coder/stringutf8_test.go ## @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "bytes" + "encoding/base64" + "io" + "strings" + "testing" + "unicode/utf8" +) + +var testValues = []string{ + "", + "a", + "13", + "hello", + "a longer string with spaces and all that", + "a string with a \n newline", + "スタリング", + "I am the very model of a modern major general.\nI've information animal, vegetable, and mineral", +} + +// Base64 encoded versions of the above strings, without the length prefix. +var testEncodings = []string{ + "", + "YQ", + "MTM", + "aGVsbG8", + "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA", + "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ", + "44K544K_44Oq44Oz44Kw", + "SSBhbSB0aGUgdmVyeSBtb2RlbCBvZiBhIG1vZGVybiBtYWpvciBnZW5lcmFsLgpJJ3ZlIGluZm9ybWF0aW9uIGFuaW1hbCwgdmVnZXRhYmxlLCBhbmQgbWluZXJhbA", +} + +// TestLen serves as a verification that string lengths +// match their equivalent byte lengths, and not their rune +// representation. +func TestLen(t *testing.T) { + runeCount := []int{0, 1, 2, 5, 40, 25, 5, 94} + for i, s := range testValues { + if got, want := len(s), len([]byte(s)); got != want { + t.Errorf("string and []byte len do not match. got %v, want %v", got, want) + } + if got, want := utf8.RuneCountInString(s), runeCount[i]; got != want { + t.Errorf("Rune count of %q change len do not match. 
got %v, want %v", s, got, want) + } + } +} + +func TestEncodeStringUTF8(t *testing.T) { + for i, s := range testValues { + s := s + want := testEncodings[i] + t.Run(s, func(t *testing.T) { + var b strings.Builder + base64enc := base64.NewEncoder(base64.RawURLEncoding, &b) + + if err := encodeStringUTF8(s, base64enc); err != nil { + t.Fatal(err) + } + base64enc.Close() + got := b.String() + if got != want { + t.Errorf("encodeStringUTF8(%q) = %q, want %q", s, got, want) + } + }) + } +} + +func TestDecodeStringUTF8(t *testing.T) { + for i, s := range testEncodings { + s := s + want := testValues[i] + t.Run(want, func(t *testing.T) { + b := bytes.NewBufferString(s) + base64dec := base64.NewDecoder(base64.RawURLEncoding, b) + + got, err
[GitHub] [beam] boyuanzz commented on a change in pull request #11894: [BEAM-7074] Enable test_pardo_timers_clear for fn_runner
boyuanzz commented on a change in pull request #11894: URL: https://github.com/apache/beam/pull/11894#discussion_r436068037 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py ## @@ -377,11 +377,6 @@ def process_timer( assert_that(actual, equal_to(expected)) def test_pardo_timers_clear(self): -if type(self).__name__ != 'FlinkRunnerTest': Review comment: We run into problems if we override this in a subclass: `PortableRunnerTest` doesn't support clearing timers, but its subclasses `FlinkRunnerTest` and `SparkRunnerTest` do.
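One way around the `type(self).__name__ != 'FlinkRunnerTest'` check discussed above is a capability flag that subclasses override. This is a hedged sketch, not Beam's actual test code; the class and flag names are illustrative.

```python
import unittest

class PortableRunnerTest(unittest.TestCase):
    # Base portable runner does not support clearing timers.
    supports_clearing_timers = False

    def test_pardo_timers_clear(self):
        if not self.supports_clearing_timers:
            raise unittest.SkipTest("runner does not support clearing timers")
        # A real test would run a pipeline that sets and then clears a timer.


class FlinkRunnerTest(PortableRunnerTest):
    # Flink (like Spark) supports clearing timers, so the inherited test runs.
    supports_clearing_timers = True


# Demonstrate: the base class skips the test, the subclass runs it.
loader = unittest.TestLoader()
skip_counts = {}
for cls in (PortableRunnerTest, FlinkRunnerTest):
    result = unittest.TextTestRunner(verbosity=0).run(
        loader.loadTestsFromTestCase(cls))
    skip_counts[cls.__name__] = len(result.skipped)

print(skip_counts)  # {'PortableRunnerTest': 1, 'FlinkRunnerTest': 0}
```

Checking a flag instead of the subclass name keeps the capability declaration next to the runner it describes, so new runner test subclasses opt in explicitly.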
[GitHub] [beam] TobKed commented on a change in pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS
TobKed commented on a change in pull request #11877: URL: https://github.com/apache/beam/pull/11877#discussion_r436067638

## File path: .github/workflows/build_wheels.yml

```diff
@@ -0,0 +1,141 @@
+name: Build python wheels
+
+on:
+  push:
+    branches:
+      - master
+      - release-*
+    tags:
+      - v*
+
+jobs:
+
+  build_source:
+    runs-on: ubuntu-18.04
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v2
+      - name: Install python
+        uses: actions/setup-python@v2
+        with:
+          python-version: 3.7
+      - name: Get build dependencies
+        working-directory: ./sdks/python
+        run: python3 -m pip install cython && python3 -m pip install -r build-requirements.txt
+      - name: Install wheels
+        run: python3 -m pip install wheel
+      - name: Build source
+        working-directory: ./sdks/python
+        run: python3 setup.py sdist --formats=gztar,zip
+      - name: Unzip source
+        working-directory: ./sdks/python
+        run: unzip dist/$(ls dist | grep .zip | head -n 1)
+      - name: Rename source directory
+        working-directory: ./sdks/python
+        run: mv $(ls | grep apache-beam) apache-beam-source
+      - name: Upload source
+        uses: actions/upload-artifact@v2
+        with:
+          name: source
+          path: sdks/python/apache-beam-source
+      - name: Upload compressed sources
+        uses: actions/upload-artifact@v2
+        with:
+          name: source_gztar_zip
```

Review comment:

1. Currently two steps, `List sources on GCS bucket` and `Copy wheels to GCS bucket`, list files of specific types. Instead of these two separate steps I could create a job that lists all files in the specific GCS folder. I think that would be much cleaner and more explicit. Did I understand your idea correctly?

   For cleaning up these GCS locations I am considering two options:
   - setting lifecycle management on the bucket, which will delete files older than some arbitrary age, e.g. 365 days. The advantage of this is that it is maintenance free.
   - creating another scheduled workflow on GitHub Actions, which will delete GCS folders whose corresponding branch no longer exists. It could be scheduled to run, e.g., once per week.

   Which option makes more sense to you?

2. The "Upload" steps upload files as artifacts so they can be passed between jobs and remain available for download for 90 days (if not deleted earlier). These artifacts are picked up later by the "Upload to GCS" jobs. What do you think about renaming these steps, e.g. "Upload wheels" -> "Upload wheels as artifacts"?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
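The first cleanup option discussed above, bucket lifecycle management, can be expressed as a GCS lifecycle configuration document. A minimal sketch, assuming a 365-day retention (the age value and bucket name are placeholders, not anything decided in this thread):

```json
{
  "rule": [
    {
      "action": {"type": "Delete"},
      "condition": {"age": 365}
    }
  ]
}
```

A document of this shape can be applied with `gsutil lifecycle set lifecycle.json gs://<bucket>`; GCS then deletes objects older than the configured age automatically, which is what makes this option maintenance free.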
[GitHub] [beam] amaliujia commented on pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.
amaliujia commented on pull request #11933: URL: https://github.com/apache/beam/pull/11933#issuecomment-639656091 Run SQL PreCommit
[GitHub] [beam] amaliujia opened a new pull request #11934: [BEAM-10205] @Ignore:BYTES can work with UNION ALL
amaliujia opened a new pull request #11934: URL: https://github.com/apache/beam/pull/11934 See: https://jira.apache.org/jira/browse/BEAM-10205

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).

Post-Commit Tests Status (on master branch): [table of Jenkins build-status badges for the Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners]
[GitHub] [beam] amaliujia opened a new pull request #11933: [BEAM-10204] @Ignore: re-enable LIKE operator related unit tests.
amaliujia opened a new pull request #11933: URL: https://github.com/apache/beam/pull/11933 See https://issues.apache.org/jira/browse/BEAM-10204

[Standard contribution checklist and Post-Commit Tests Status badge table omitted.]
[GitHub] [beam] apilloud merged pull request #11758: Old Fastjson has a serious security problem
apilloud merged pull request #11758: URL: https://github.com/apache/beam/pull/11758
[GitHub] [beam] lostluck merged pull request #11926: [BEAM-9615] Additional coder unit tests
lostluck merged pull request #11926: URL: https://github.com/apache/beam/pull/11926
[GitHub] [beam] lostluck commented on a change in pull request #11926: [BEAM-9615] Additional coder unit tests
lostluck commented on a change in pull request #11926: URL: https://github.com/apache/beam/pull/11926#discussion_r436032240

## File path: sdks/go/pkg/beam/coder_test.go

```diff
@@ -63,3 +64,57 @@ func TestJSONCoder(t *testing.T) {
 		}
 	}
 }
+
+func TestCoders(t *testing.T) {
+	ptrString := "test *string"
+	tests := []interface{}{
+		43,
+		12431235,
+		-2,
+		0,
+		1,
+		true,
+		"a string",
+		map[int64]string{1: "one", 11: "oneone", 21: "twoone", 1211: "onetwooneone"},
+		struct {
+			A int
+			B *string
+			C bool
+		}{4, &ptrString, false},
+		[...]int64{1, 2, 3, 4, 5}, // array
+		[]int64{1, 2, 3, 4, 5},    // slice
+		struct {
+			A []int
+			B [3]int
+		}{A: []int{1, 2, 3}, B: [...]int{4, 5, 6}},
+	}
+
+	for _, test := range tests {
+		var results []string
+		rt := reflect.TypeOf(test)
+		enc := NewElementEncoder(rt)
+		for i := 0; i < 10; i++ {
+			var buf bytes.Buffer
+			if err := enc.Encode(test, &buf); err != nil {
+				t.Fatalf("Failed to encode %v: %v", tests, err)
+			}
+			results = append(results, string(buf.Bytes()))
+		}
+		for i, data := range results {
+			if data != results[0] {
+				t.Errorf("coder not deterministic: data[%d]: %v != %v", i, data, results[0])
+			}
+		}
+	}
+}
```

Review comment: Good catch! Fixed to make it clearer what the test is checking for.
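The Go test above checks coder determinism: encoding the same value repeatedly must produce byte-identical output every time. Purely as an illustration of that pattern (not Beam's actual Go `NewElementEncoder`), a minimal Python sketch with `json.dumps` standing in for the encoder:

```python
import json


def check_deterministic(value, encode, trials=10):
    """Encode `value` repeatedly; return True iff every encoding matches the first."""
    results = [encode(value) for _ in range(trials)]
    return all(r == results[0] for r in results)


# Sorting keys fixes the field order, so dict encoding is deterministic.
assert check_deterministic({"b": 2, "a": 1},
                           lambda v: json.dumps(v, sort_keys=True))
```

The key design point, which the revised Go test also relies on, is comparing every encoding against the first rather than against a hard-coded expected value: the test then asserts stability without depending on the coder's exact wire format.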
[GitHub] [beam] robertwb merged pull request #11835: Various fixes to allow Java PAssert to run on Python
robertwb merged pull request #11835: URL: https://github.com/apache/beam/pull/11835
[GitHub] [beam] robertwb merged pull request #11766: [BEAM-10036] More flexible dataframes partitioning.
robertwb merged pull request #11766: URL: https://github.com/apache/beam/pull/11766
[GitHub] [beam] robertwb commented on pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.
robertwb commented on pull request #11932: URL: https://github.com/apache/beam/pull/11932#issuecomment-639605068 R: @ibzib
[GitHub] [beam] robertwb opened a new pull request #11932: [BEAM-9577] Migrate PortablePipelineJarCreator to new artifact service.
robertwb opened a new pull request #11932: URL: https://github.com/apache/beam/pull/11932

[Standard contribution checklist and Post-Commit Tests Status badge table omitted.]