[GitHub] [beam] dmvk merged pull request #11750: [BEAM-9900] Fix polling behavior in UnboundedSourceWrapper

2020-05-19 Thread GitBox


dmvk merged pull request #11750:
URL: https://github.com/apache/beam/pull/11750


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] dmvk commented on pull request #11750: [BEAM-9900] Fix polling behavior in UnboundedSourceWrapper

2020-05-19 Thread GitBox


dmvk commented on pull request #11750:
URL: https://github.com/apache/beam/pull/11750#issuecomment-631269828


   Thanks for the fix. 👍 🎉 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf edited a comment on pull request #11339: [BEAM-9468] Fhir io

2020-05-19 Thread GitBox


jaketf edited a comment on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631265943


   I think that something about `TestPubsubSignal` does not play well with the 
way this integration test suite runs on Dataflow.
   
   Following suit with `PubsubReadIT` (which also uses `TestPubsubSignal`), 
`BigQueryIOReadIT`, and `BigQueryIOStorageReadTableRowIT`, I've added this test 
to the exclusions in this project's build file.
   
   This test can still be run (and passes) on DirectRunner, and the evidence 
in the [above 
comment](https://github.com/apache/beam/pull/11339#issuecomment-631203002) 
points to an issue with `TestPubsubSignal` on the Dataflow runner rather than 
with the behavior under test.
   
   Potentially related issue: 
[BEAM-6804](https://issues.apache.org/jira/browse/BEAM-6804?jql=text%20~%20%22TestPubsubSignal%22)
   
   I've filed an issue for this instance 
[BEAM-10040](https://issues.apache.org/jira/browse/BEAM-10040).
   
   @pabloem please let me know if this is acceptable.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-19 Thread GitBox


jaketf commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631265943


   I think that something about `TestPubsubSignal` does not play well with the 
way this integration test suite runs on Dataflow.
   
   Following suit with `PubsubReadIT` (which also uses `TestPubsubSignal`), 
`BigQueryIOReadIT`, and `BigQueryIOStorageReadTableRowIT`, I've added this test 
to the exclusions in this project's build file.
   
   This test can still be run (and passes) on DirectRunner, and the evidence 
in the [above 
comment](https://github.com/apache/beam/pull/11339#issuecomment-631203002) 
points to an issue with `TestPubsubSignal` on the Dataflow runner rather than 
with the behavior under test.
   
   Potentially related issue: 
[BEAM-6804](https://issues.apache.org/jira/browse/BEAM-6804?jql=text%20~%20%22TestPubsubSignal%22)
   
   I've filed an issue for this instance.
   
   @pabloem please let me know if this is acceptable.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] damondouglas commented on pull request #11734: [BEAM-9679] Add Core Transforms section / GroupByKey lesson to the Go SDK katas

2020-05-19 Thread GitBox


damondouglas commented on pull request #11734:
URL: https://github.com/apache/beam/pull/11734#issuecomment-631261638


   @lostluck and @henryken, I've updated the 
[stepik](https://stepik.org/course/70387) course and committed the 
`*-remote.yaml` files to this PR.  Thank you again for all your help and 
guidance.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf edited a comment on pull request #11339: [BEAM-9468] Fhir io

2020-05-19 Thread GitBox


jaketf edited a comment on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002


   The issue with FhirIOReadIT seems to be some misuse of 
[TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html)
   [Example 
Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing)
 clearly has the expected >2000 elements added to the "waitForAnyMessage" task
   but the success signal never gets published to the results topic.
   
   Notably there are job level warnings about metric descriptors and [warnings 
in shuffle 
logs](https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054×tamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z)
 which warns:
   ```
   "Update range task returned 'invalid argument'. Assuming lost lease for work 
with id 5061980071068333770 (expiration time: 1589940982000, now: 
1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more 
information, see https://cloud.google.com/dataflow/docs/guides/common-errors."; 
   ```
   The 
[docs](https://cloud.google.com/dataflow/docs/guides/common-errors#bad-request-shuffler-logs)
 say this can be ignored, but it smells suspicious here.
   
   
   This is orthogonal to the behavior being tested. Investigating other means 
of performing this test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427759019



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
   It probably makes sense to keep the `set` shortcut, since it is the most 
frequently used one.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427758607



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
   This is the interface required by TimerMap though: 
https://github.com/apache/beam/blob/9108832cf1cb57161997e16190dbc6eccdc10492/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerMap.java#L25
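
   For reference, a sketch of the interface at that link (paraphrased from the 
linked `TimerMap.java`; the file above is authoritative):
   
   ```java
   import org.joda.time.Instant;

   // Paraphrased sketch of org.apache.beam.sdk.state.TimerMap, as linked above.
   // Timer here is org.apache.beam.sdk.state.Timer.
   public interface TimerMap {
     // Sets the timer identified by the dynamic tag to fire at the given time.
     void set(String timerId, Instant absoluteTime);

     // Returns the Timer for the given dynamic tag, exposing the full Timer API.
     Timer get(String timerId);
   }
   ```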





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-19 Thread GitBox


chamikaramj commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631254015


   Have you run the linkage checker?
   https://cwiki.apache.org/confluence/display/BEAM/Dependency+Upgrades



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-19 Thread GitBox


chamikaramj commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631252638


   Run Python2_PVR_Flink PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-19 Thread GitBox


chamikaramj commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631252696


   Run Python2_PVR_Flink PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on a change in pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image

2020-05-19 Thread GitBox


chamikaramj commented on a change in pull request #11740:
URL: https://github.com/apache/beam/pull/11740#discussion_r427755500



##
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##
@@ -310,15 +312,15 @@ def __init__(
 environment_payload = proto_utils.parse_Bytes(
 environment.payload, beam_runner_api_pb2.DockerPayload)
 container_image_url = environment_payload.container_image
-if container_image_url == pipeline_sdk_container_image:
-  # This was already added
+if container_image_url in already_added_containers:
+  # Do not add the pipeline environment again.

Review comment:
   Sent https://github.com/apache/beam/pull/11757.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model

2020-05-19 Thread GitBox


chamikaramj commented on pull request #11757:
URL: https://github.com/apache/beam/pull/11757#issuecomment-631251258


   R: @robertwb @ihji 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj opened a new pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model

2020-05-19 Thread GitBox


chamikaramj opened a new pull request #11757:
URL: https://github.com/apache/beam/pull/11757


   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Buil

[GitHub] [beam] boyuanzz commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


boyuanzz commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427750709



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
   Thanks for the clarification! I still think we should have a better API (and 
docs) here, like `getTimer(timerId)`. I would also prefer not exposing `set()`, 
since `getTimer()` is the more recommended way. What do you think? You could 
also start a discussion thread on the dev list, since this is a user-facing API.
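
   To make the two styles concrete, a hypothetical user-code sketch (the DoFn, 
the family name `"timers"`, and the tag `"alarm"` are made up for illustration):
   
   ```java
   import org.apache.beam.sdk.state.TimeDomain;
   import org.apache.beam.sdk.state.TimerMap;
   import org.apache.beam.sdk.state.TimerSpec;
   import org.apache.beam.sdk.state.TimerSpecs;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.values.KV;
   import org.joda.time.Duration;
   import org.joda.time.Instant;

   // Hypothetical DoFn illustrating the two call styles under discussion.
   class TimerMapStyles extends DoFn<KV<String, String>, String> {
     @TimerFamily("timers")
     private final TimerSpec timerSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

     @ProcessElement
     public void process(ProcessContext c, @TimerFamily("timers") TimerMap timers) {
       Instant fireAt = Instant.now().plus(Duration.standardMinutes(1));
       timers.set("alarm", fireAt); // style 1: the set() shortcut on TimerMap
       timers.get("alarm").set(fireAt); // style 2: fetch the Timer, then use its API
     }
   }
   ```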





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427738041



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
   I believe we can always call `get()` first to access the FnApiTimer and then 
call its APIs. Probably that's sufficient? I feel adding more shortcuts only 
makes the API slightly more user-friendly.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427737289



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -962,16 +971,25 @@ private Progress getProgress() {
             .build());
   }
 
-  private <K> void processTimer(String timerId, TimeDomain timeDomain, Timer<K> timer) {
+  private <K> void processTimer(
+      String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
     currentTimer = timer;
     currentTimeDomain = timeDomain;
     onTimerContext = new OnTimerContext<>(timer.getUserKey());
+    String timerId =
+        timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)

Review comment:
   It will be ignored anyway; apparently only one of timerId and timerFamilyId 
takes effect.
   
https://github.com/apache/beam/blob/591de3473144de54beef0932131025e2a4d8504b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L223





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms

2020-05-19 Thread GitBox


tysonjh commented on a change in pull request #11566:
URL: https://github.com/apache/beam/pull/11566#discussion_r427471127



##
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better 
utilize the Cloud DLP

Review comment:
   It's good practice to start all Javadoc comments with a short summary 
fragment; Google's Java style guide [1] has more details. For example, I would 
phrase the summary fragment for this class as:
   
   'Batches input rows to reduce the number of requests sent to the Cloud DLP 
service.'
   
   Would you please go through this CL and add such comments to public classes 
and methods? I personally like to add them to all classes, non-trivial methods, 
and tricky blocks of code, regardless of access modifiers.
   
   [1] https://google.github.io/styleguide/javaguide.html#s7.2-summary-fragment
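
   As a concrete illustration of the summary-fragment style (a hypothetical 
snippet, not code from this PR):
   
   ```java
   import org.apache.beam.sdk.transforms.DoFn;

   /** Batches input rows to reduce the number of requests sent to the Cloud DLP service. */
   class BatchRequestForDLP extends DoFn<String, String> {
     // Type parameters simplified for illustration; the real class uses KV/Table types.

     /** Emits the buffered batch once it reaches the configured size. */
     private void maybeFlush() {}
   }
   ```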





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631224035


   Run Java PostCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


boyuanzz commented on pull request #11756:
URL: https://github.com/apache/beam/pull/11756#issuecomment-631223098


   Run Java Flink PortableValidatesRunner Streaming



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


boyuanzz commented on pull request #11756:
URL: https://github.com/apache/beam/pull/11756#issuecomment-631223003


   Run Java Flink PortableValidatesRunner Batch



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


boyuanzz commented on pull request #11756:
URL: https://github.com/apache/beam/pull/11756#issuecomment-631220097


   Run Java Flink PortableValidatesRunner Batch



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz closed pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


boyuanzz closed pull request #11756:
URL: https://github.com/apache/beam/pull/11756


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


boyuanzz commented on pull request #11756:
URL: https://github.com/apache/beam/pull/11756#issuecomment-631219846







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


boyuanzz commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427720451



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -962,16 +971,25 @@ private Progress getProgress() {
             .build());
   }
 
-  private <K> void processTimer(String timerId, TimeDomain timeDomain, Timer<K> timer) {
+  private <K> void processTimer(
+      String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
     currentTimer = timer;
     currentTimeDomain = timeDomain;
     onTimerContext = new OnTimerContext<>(timer.getUserKey());
+    String timerId =
+        timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)

Review comment:
   If the `timerIdOrTimerFamilyId` is for a timer family, should the timerId be 
the `timer.dynamicTimerTag`?

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =

Review comment:
   Similar to `FnApiTimer` above, we should have `timeDomain` from proto, 
right?

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -460,14 +461,22 @@ public void accept(WindowedValue<InputT> input) throws Exception {
   // Extract out relevant TimerFamilySpec information in preparation for execution.
   for (Map.Entry<String, RunnerApi.TimerFamilySpec> entry :
       parDoPayload.getTimerFamilySpecsMap().entrySet()) {
-    String timerFamilyId = entry.getKey();
-    TimeDomain timeDomain =
-        DoFnSignatures.getTimerSpecOrThrow(
-                doFnSignature.timerDeclarations().get(timerFamilyId), doFn)
-            .getTimeDomain();
+    String timerIdOrTimerFamilyId = entry.getKey();
+    TimeDomain timeDomain;
+    if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) {
+      timeDomain =

Review comment:
   The `TimerFamilySpec` should have a `time_domain` field. Maybe we could do 
something similar to 
https://github.com/apache/beam/blob/1de50c348706ed25af2bab9c9477d7d4f36ef8bf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java#L657-L668
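
   Roughly, a sketch of reading the domain from the portable proto instead of 
the `DoFnSignature` (adapted from the `ParDoTranslation` pattern linked above; 
treat the exact names as assumptions):
   
   ```java
   import org.apache.beam.model.pipeline.v1.RunnerApi;
   import org.apache.beam.sdk.state.TimeDomain;

   class TimeDomains {
     // Sketch: map the proto's time domain onto the SDK enum, instead of
     // consulting the DoFn signature. Names follow ParDoTranslation but are
     // assumptions here.
     static TimeDomain fromProto(RunnerApi.TimeDomain.Enum domain) {
       switch (domain) {
         case EVENT_TIME:
           return TimeDomain.EVENT_TIME;
         case PROCESSING_TIME:
           return TimeDomain.PROCESSING_TIME;
         default:
           throw new IllegalArgumentException("Unknown time domain: " + domain);
       }
     }
   }
   ```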

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -460,14 +461,22 @@ public void accept(WindowedValue<InputT> input) throws Exception {
   // Extract out relevant TimerFamilySpec information in preparation for execution.
   for (Map.Entry<String, RunnerApi.TimerFamilySpec> entry :
       parDoPayload.getTimerFamilySpecsMap().entrySet()) {
-    String timerFamilyId = entry.getKey();
-    TimeDomain timeDomain =
-        DoFnSignatures.getTimerSpecOrThrow(
-                doFnSignature.timerDeclarations().get(timerFamilyId), doFn)
-            .getTimeDomain();
+    String timerIdOrTimerFamilyId = entry.getKey();
+    TimeDomain timeDomain;
+    if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) {
+      timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(
+                  doFnSignature.timerFamilyDeclarations().get(timerIdOrTimerFamilyId), doFn)
+              .getTimeDomain();
+    } else {
+      timeDomain =
+          DoFnSignatures.getTimerSpecOrThrow(
+                  doFnSignature.timerDeclarations().get(timerIdOrTimerFamilyId), doFn)
+              .getTimeDomain();
+    }
     Coder<Timer<Object>> timerCoder =
         (Coder) rehydratedComponents.getCoder(entry.getValue().getTimerFamilyCoderId());
-    timerFamilyInfosBuilder.put(timerFamilyId, KV.of(timeDomain, timerCoder));
+    timerFamilyInfosBui

[GitHub] [beam] henryken commented on pull request #11734: [BEAM-9679] Add Core Transforms section / GroupByKey lesson to the Go SDK katas

2020-05-19 Thread GitBox


henryken commented on pull request #11734:
URL: https://github.com/apache/beam/pull/11734#issuecomment-631216779


   @damondouglas, please help to update the Stepik course.
   Afterwards, we can merge this PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf edited a comment on pull request #11339: [BEAM-9468] Fhir io

2020-05-19 Thread GitBox


jaketf edited a comment on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002


   The issue with FhirIOReadIT seems to be some misuse of 
[TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html)
   [Example 
Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing)
 clearly has elements added to the "waitForAnyMessage" task
   but the success signal never gets published to the results topic.
   
   Notably there are job level warnings about metric descriptors and [warnings 
in shuffle 
logs](https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054×tamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z)
 which warns:
   ```
   "Update range task returned 'invalid argument'. Assuming lost lease for work 
with id 5061980071068333770 (expiration time: 1589940982000, now: 
1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more 
information, see https://cloud.google.com/dataflow/docs/guides/common-errors."; 
   ```
   The 
[docs](https://cloud.google.com/dataflow/docs/guides/common-errors#bad-request-shuffler-logs)
 say this can be ignored, but it smells suspicious here.
   
   
   This is orthogonal to the behavior being tested. Investigating other means 
of performing this test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on a change in pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation

2020-05-19 Thread GitBox


chamikaramj commented on a change in pull request #11360:
URL: https://github.com/apache/beam/pull/11360#discussion_r427718773



##
File path: 
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import 
org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import 
org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import 
org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC driver</a> under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to a cloud bucket.
+ * Currently only Google Cloud Storage is supported.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials} might
+ * be created using {@link
+ * org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ *
+ * <p>There are also other options available to configure the connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify a custom port of the
+ *       Snowflake instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *

[GitHub] [beam] jaketf edited a comment on pull request #11339: [BEAM-9468] Fhir io

2020-05-19 Thread GitBox


jaketf edited a comment on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002


   The issue with FhirIOReadIT seems to be some misuse of 
[TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html)
   [Example 
Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing)
 clearly has elements added to the "waitForAnyMessage" task
   but the success signal never gets published to the results topic.
   
   Notably there are [warnings in shuffle 
logs](https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054×tamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z)
 which warns:
   ```
   "Update range task returned 'invalid argument'. Assuming lost lease for work 
with id 5061980071068333770 (expiration time: 1589940982000, now: 
1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more 
information, see https://cloud.google.com/dataflow/docs/guides/common-errors."; 
   ```
   The 
[docs](https://cloud.google.com/dataflow/docs/guides/common-errors#bad-request-shuffler-logs)
 say this can be ignored, but it smells suspicious here.
   
   
   This is orthogonal to the behavior being tested. Investigating other means 
of performing this test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker

2020-05-19 Thread GitBox


boyuanzz commented on pull request #11715:
URL: https://github.com/apache/beam/pull/11715#issuecomment-631204566


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf edited a comment on pull request #11339: [BEAM-9468] Fhir io

2020-05-19 Thread GitBox


jaketf edited a comment on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002


   The issue with FhirIOReadIT seems to be some misuse of 
[TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html)
   [Example 
Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing)
 clearly has elements added to the "waitForAnyMessage" task
   but the success signal never gets published to the results topic.
   
   This is orthogonal to the behavior being tested. Investigating other means 
of performing this test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-19 Thread GitBox


jaketf commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002


   The issue with FhirIOReadIT seems to be some misuse of 
[TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html)
   [Example 
Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing)
 clearly has elements added to the "waitForAnyMessage" task
   but the success signal never gets published to the results topic.
   
   This is tangential to the behavior being tested. Investigating other means 
of performing this test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] henryken commented on pull request #11736: Katas - Convert task description from HTML to Markdown

2020-05-19 Thread GitBox


henryken commented on pull request #11736:
URL: https://github.com/apache/beam/pull/11736#issuecomment-631201222


   Thanks @pabloem! 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631189470


   Run Java PostCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631189422


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] omarismail94 commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-19 Thread GitBox


omarismail94 commented on a change in pull request #11754:
URL: https://github.com/apache/beam/pull/11754#discussion_r427686703



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
##
@@ -66,38 +68,47 @@ public static void main(String[] args) {
 inputTable.apply(SqlTransform.query("select c1, c2, c3 from 
PCOLLECTION where c1 > 1"));
 
 // print the output record of case 1;
-outputStream.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  PCOLLECTION: [3, row, 3.0]
-//  PCOLLECTION: [2, row, 2.0]
-System.out.println("PCOLLECTION: " + input.getValues());
-return input;
-  }
-}));
+outputStream
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  PCOLLECTION: [3, row, 3.0]
+//  PCOLLECTION: [2, row, 2.0]
+System.out.println("PCOLLECTION: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(RowCoder.of(type));
 
 // Case 2. run the query with SqlTransform.query over result PCollection 
of case 1.
 PCollection<Row> outputStream2 =
 PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
 .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT 
group by c2"));
 
 // print the output record of case 2;
-outputStream2.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  CASE1_RESULT: [row, 5.0]
-System.out.println("CASE1_RESULT: " + input.getValues());
-return input;
-  }
-}));
+outputStream2
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  CASE1_RESULT: [row, 5.0]
+System.out.println("CASE1_RESULT: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(
+RowCoder.of(
+Schema.builder()
+.addStringField("stringField")
+.addDoubleField("doubleField")
+.build()));

Review comment:
   Actually, it is not due to the reduction in the number of fields, but the 
order in which the fields are selected in the SELECT statement. Here is the 
order it expects:
   
   * Int, String, Double
   
   and the fields that represent those types are: c1, c2, c3.
   
   If your results come out of order, it fails with a `ClassCastException`. I 
tried this query and it failed:
   `select c2, sum(c1), sum(c3) from CASE1_RESULT group by c2`,
   
   but if I do 
   `select sum(c1), c2, sum(c3) from CASE1_RESULT group by c2` 
   
   it works! In the one that failed, the positions of c1 and c2 have switched, 
so the encoder trips up. What's cool is that you can see the results correctly 
calculated in:
   ` System.out.println("CASE1_RESULT: " + input.getValues());`
   
   but it seems that when the result is encoded, the program throws an error 
because the results are out of order. I guess this is because it sees 
`.setRowSchema(type);`, and as the order of the schema is "Int, String, 
Double", the results have to abide by that rule. That's why it fails when we 
did:
   
   `c2, sum(c3) from CASE1_RESULT group by c2`
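
   In other words, the coder's schema has to be declared in the same order the 
SELECT emits its columns. A hedged sketch of the fix (field names and exact 
numeric types are assumptions):
   
   ```java
   // Sketch: for "select sum(c1), c2, sum(c3) ... group by c2" the result rows
   // come out as (Integer, String, Double), so the schema must follow that order.
   // Schema is org.apache.beam.sdk.schemas.Schema; RowCoder is
   // org.apache.beam.sdk.coders.RowCoder.
   Schema case2Schema =
       Schema.builder()
           .addInt32Field("sumC1")
           .addStringField("c2")
           .addDoubleField("sumC3")
           .build();
   outputStream2.setCoder(RowCoder.of(case2Schema));
   ```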
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] omarismail94 commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-19 Thread GitBox


omarismail94 commented on a change in pull request #11754:
URL: https://github.com/apache/beam/pull/11754#discussion_r427686703



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
##
@@ -66,38 +68,47 @@ public static void main(String[] args) {
 inputTable.apply(SqlTransform.query("select c1, c2, c3 from 
PCOLLECTION where c1 > 1"));
 
 // print the output record of case 1;
-outputStream.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  PCOLLECTION: [3, row, 3.0]
-//  PCOLLECTION: [2, row, 2.0]
-System.out.println("PCOLLECTION: " + input.getValues());
-return input;
-  }
-}));
+outputStream
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  PCOLLECTION: [3, row, 3.0]
+//  PCOLLECTION: [2, row, 2.0]
+System.out.println("PCOLLECTION: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(RowCoder.of(type));
 
 // Case 2. run the query with SqlTransform.query over result PCollection 
of case 1.
 PCollection<Row> outputStream2 =
 PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
 .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT 
group by c2"));
 
 // print the output record of case 2;
-outputStream2.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  CASE1_RESULT: [row, 5.0]
-System.out.println("CASE1_RESULT: " + input.getValues());
-return input;
-  }
-}));
+outputStream2
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  CASE1_RESULT: [row, 5.0]
+System.out.println("CASE1_RESULT: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(
+RowCoder.of(
+Schema.builder()
+.addStringField("stringField")
+.addDoubleField("doubleField")
+.build()));

Review comment:
   Actually, it is not due to the reduction in the number of field, but the 
order in which the fields are printed. Here is the order it expects
   
   * Int, String, Double
   
   and the fields that represent those types are: c1, c2, c3
   
   If your results print out of order, it fails due to the 
`ClassCastException`. I tried doing this query and it failed:
   `select  c2, sum(c1), sum(c3) from CASE1_RESULT group by c2`,
   
   but if I do 
   `select  sum(c1),c2, sum(c3) from CASE1_RESULT group by c2` 
   
   it works! You can see that in the one that failed, c1 and c2s positions have 
switched, so the encoder trips out. What's cool is that you can see the results 
correctly calculated in:
   ` System.out.println("CASE1_RESULT: " + input.getValues());`
   
   but it seems that when the result is encoded, the program throws an error 
due to the results being out of order. I guess this is because it sees 
`.setRowSchema(type);`, and as the order of the schema is "Int, String, 
Double", the results have to abide by that rule. That why it fails when we did:
   
   `c2, sum(c3) from CASE1_RESULT group by c2`
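   
   For reference, a minimal sketch of a schema that lines up with that select
   list (the field names are made up for illustration, not taken from this PR):
   
   ```
   import org.apache.beam.sdk.schemas.Schema;
   
   public class Case2SchemaSketch {
     // Sketch only: the schema attached to the case-2 output has to list its
     // fields in the same order as the SELECT clause, i.e. "c2, sum(c3)"
     // maps to (String, Double).
     public static Schema case2Schema() {
       return Schema.builder()
           .addStringField("c2")      // first select item: c2
           .addDoubleField("sumC3")   // second select item: sum(c3)
           .build();
     }
   }
   ```
   With a schema built in that order, `setRowSchema` should agree with what the
   encoder actually sees at runtime.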
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427686974



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
  * @param filter the filter
  */
ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) {
-  this.hl7v2Stores = hl7v2Stores.get();
-  this.filter = filter.get();
+  this.hl7v2Stores = hl7v2Stores;
+  this.filter = filter;
+}
+
+/**
+ * Instantiates a new ListHL7v2Messages.
+ *
+ * @param hl7v2Stores the HL7v2 stores
+ * @param filter the filter
+ * @param initialSplitDuration the initial split duration for sendTime 
dimension splits
+ */
+ListHL7v2Messages(
+ValueProvider<List<String>> hl7v2Stores,
+ValueProvider<String> filter,
+Duration initialSplitDuration) {
+  this.hl7v2Stores = hl7v2Stores;
+  this.filter = filter;
+  this.initialSplitDuration = initialSplitDuration;
 }
 
+/**
+ * Instantiates a new List hl7v2 messages.
+ *
+ * @param hl7v2Stores the hl7v2 stores
+ */
 ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) {
-  this.hl7v2Stores = hl7v2Stores.get();
+  this.hl7v2Stores = hl7v2Stores;
   this.filter = null;
 }
 
+/**
+ * Instantiates a new List hl7v2 messages.
+ *
+ * @param hl7v2Stores the hl7v2 stores
+ * @param initialSplitDuration the initial split duration
+ */
+ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration initialSplitDuration) {
+  this.hl7v2Stores = hl7v2Stores;
+  this.initialSplitDuration = initialSplitDuration;
+}
+
 @Override
 public PCollection<HL7v2Message> expand(PBegin input) {
   return input
-  .apply(Create.of(this.hl7v2Stores))
-  .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
+  .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of())))
+  .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x))
+  .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration)))
   .setCoder(new HL7v2MessageCoder())
   // Break fusion to encourage parallelization of downstream 
processing.
   .apply(Reshuffle.viaRandomKey());
 }
   }
 
+  /**
+   * Implemented as a Splittable DoFn that claims millisecond resolutions of offset
+   * restrictions in the Message.sendTime dimension.
+   */
+  @BoundedPerElement
   static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
 
-private final String filter;
+private static final Logger LOG = 
LoggerFactory.getLogger(ListHL7v2MessagesFn.class);
+private ValueProvider<String> filter;
+// These control the initial restriction split which means that the list 
of integer pairs
+// must comfortably fit in memory.
+private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = 
Duration.standardDays(1);
+private static final Duration DEFAULT_MIN_SPLIT_DURATION = 
Duration.standardHours(1);
+private Duration initialSplitDuration;
+private Instant from;
+private Instant to;

Review comment:
   I don't think so; they don't get set until we make the earliest/latest 
sendTime query in @GetInitialRestriction
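   
   (Rough sketch of that flow for anyone skimming the thread -- the client
   methods below are assumptions for illustration, not the PR's actual
   HealthcareApiClient API.)
   
   ```
   import java.io.IOException;
   import java.io.Serializable;
   import org.apache.beam.sdk.io.range.OffsetRange;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
   import org.joda.time.Instant;
   
   // from/to are resolved only when @GetInitialRestriction runs, so the DoFn
   // fields stay unset until the earliest/latest sendTime query happens.
   class SendTimeRestrictionSketch extends DoFn<String, String> {
     interface SendTimeClient extends Serializable {
       Instant earliestSendTime(String store) throws IOException; // assumed helper
       Instant latestSendTime(String store) throws IOException;   // assumed helper
     }
   
     private final SendTimeClient client;
   
     SendTimeRestrictionSketch(SendTimeClient client) {
       this.client = client;
     }
   
     @GetInitialRestriction
     public OffsetRange getInitialRestriction(@Element String hl7v2Store) throws IOException {
       Instant from = client.earliestSendTime(hl7v2Store);
       Instant to = client.latestSendTime(hl7v2Store);
       // Claimable offsets are millisecond timestamps in the sendTime dimension.
       return new OffsetRange(from.getMillis(), to.getMillis() + 1);
     }
   
     @ProcessElement
     public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
       long position = tracker.currentRestriction().getFrom();
       while (tracker.tryClaim(position)) {
         // Omitted: list the messages whose sendTime falls in the claimed span.
         position += 1;
       }
     }
   }
   ```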





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms

2020-05-19 Thread GitBox


santhh commented on a change in pull request #11566:
URL: https://github.com/apache/beam/pull/11566#discussion_r427684818



##
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text 
according to provided
+ * settings. The transform supports both CSV formatted input data and 
unstructured input.
+ *
+ * If the csvHeader property is set, csvDelimiter also should be, else the 
results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * Either inspectTemplateName (String) or inspectConfig {@link 
InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig 
({@link DeidentifyConfig}.
+ *
+ * Batch size defines how big are batches sent to DLP at once in bytes.
+ *
+ * The transform outputs {@link KV} of {@link String} (eg. filename) and 
{@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for 
the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+extends PTransform<
+PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeader();
+
+  @Nullable
+  public abstract String csvDelimiter();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+public abstract Builder setCsvHeader(PCollectionView<List<String>> csvHeader);
+
+public abstract Builder setCsvDelimiter(String delimiter);
+
+public abstract Builder setBatchSize(Integer batchSize);
+
+public abstract Builder setProjectId(String projectId);
+
+public abstract Builder setDeidentifyTemplateName(String 
deidentifyTemplateName);
+
+public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+public abstract Builder setDeidentifyConfig(DeidentifyConfig 
deidentifyConfig);
+
+public abstract DLPDeidentifyText build();
+  }
+
+  public static DLPDeidentifyText.Builder newBuilder() {
+return new AutoValue_DLPDeidentifyText.Builder();
+  }
+
+  /**
+   * The transform batches the contents of input PCollection and then calls 
Cloud DLP service to
+   * perform the deidentification.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, DeidentifyContentResponse>> expand(
+  PCollection<KV<String, String>> input) {
+return input
+.apply(ParDo.of(new MapStringToDlpRow(csvDelimiter())))
+.apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize(
+.apply(
+"DLPDeidentify",
+ParDo.of(
+new DeidentifyTex

[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms

2020-05-19 Thread GitBox


santhh commented on a change in pull request #11566:
URL: https://github.com/apache/beam/pull/11566#discussion_r427683846



##
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text 
according to provided
+ * settings. The transform supports both CSV formatted input data and 
unstructured input.
+ *
+ * If the csvHeader property is set, csvDelimiter also should be, else the 
results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * Either inspectTemplateName (String) or inspectConfig {@link 
InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig 
({@link DeidentifyConfig}.
+ *
+ * Batch size defines how big are batches sent to DLP at once in bytes.
+ *
+ * The transform outputs {@link KV} of {@link String} (eg. filename) and 
{@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for 
the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+extends PTransform<
+PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeader();
+
+  @Nullable
+  public abstract String csvDelimiter();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+public abstract Builder setCsvHeader(PCollectionView<List<String>> csvHeader);
+
+public abstract Builder setCsvDelimiter(String delimiter);
+
+public abstract Builder setBatchSize(Integer batchSize);
+
+public abstract Builder setProjectId(String projectId);
+
+public abstract Builder setDeidentifyTemplateName(String 
deidentifyTemplateName);
+
+public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+public abstract Builder setDeidentifyConfig(DeidentifyConfig 
deidentifyConfig);
+
+public abstract DLPDeidentifyText build();
+  }
+

Review comment:
   For de-id it's the same as re-id: the de-id template is required but 
inspect is optional.
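   
   So a minimal builder call would presumably look something like this (sketch
   only; the project id, template name and batch size are placeholders, not
   values from this PR):
   
   ```
   import org.apache.beam.sdk.extensions.ml.DLPDeidentifyText;
   
   public class DeidentifySketch {
     public static DLPDeidentifyText deidentify() {
       // Inspect template/config intentionally left unset: per the comment
       // above, only the deidentify template is required.
       return DLPDeidentifyText.newBuilder()
           .setProjectId("my-project")
           .setBatchSize(52400) // request size sent to DLP, in bytes
           .setDeidentifyTemplateName(
               "projects/my-project/deidentifyTemplates/my-template")
           .build();
     }
   }
   ```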





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms

2020-05-19 Thread GitBox


santhh commented on a change in pull request #11566:
URL: https://github.com/apache/beam/pull/11566#discussion_r427683019



##
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better 
utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = 
LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+  Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {

Review comment:
   +1
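   
   (For context, a simplified sketch of the batching pattern under review --
   not the PR's exact code. Elements are bagged per key and flushed when the
   window's event timer fires; the size-based flush and the DLP request
   assembly are left out.)
   
   ```
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.beam.sdk.state.BagState;
   import org.apache.beam.sdk.state.StateSpec;
   import org.apache.beam.sdk.state.StateSpecs;
   import org.apache.beam.sdk.state.TimeDomain;
   import org.apache.beam.sdk.state.Timer;
   import org.apache.beam.sdk.state.TimerSpec;
   import org.apache.beam.sdk.state.TimerSpecs;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
   import org.apache.beam.sdk.values.KV;
   
   class BatchByTimerSketch extends DoFn<KV<String, String>, KV<String, List<String>>> {
   
     @StateId("elementsBag")
     private final StateSpec<BagState<KV<String, String>>> elementsBag = StateSpecs.bag();
   
     @TimerId("eventTimer")
     private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
   
     @ProcessElement
     public void process(
         @Element KV<String, String> element,
         @StateId("elementsBag") BagState<KV<String, String>> bag,
         @TimerId("eventTimer") Timer timer,
         BoundedWindow window) {
       bag.add(element);
       // Arrange a flush at the end of the window.
       timer.set(window.maxTimestamp());
     }
   
     @OnTimer("eventTimer")
     public void onTimer(
         @StateId("elementsBag") BagState<KV<String, String>> bag,
         OutputReceiver<KV<String, List<String>>> out) {
       List<String> batch = new ArrayList<>();
       String key = null;
       for (KV<String, String> kv : bag.read()) {
         key = kv.getKey();
         batch.add(kv.getValue());
       }
       if (key != null) {
         out.output(KV.of(key, batch));
       }
       bag.clear();
     }
   }
   ```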





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi opened a new pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi opened a new pull request #11756:
URL: https://github.com/apache/beam/pull/11756


   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   

[GitHub] [beam] y1chi commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on pull request #11756:
URL: https://github.com/apache/beam/pull/11756#issuecomment-631169795


   R: @boyuanzz 
   This is ready for review.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms

2020-05-19 Thread GitBox


santhh commented on a change in pull request #11566:
URL: https://github.com/apache/beam/pull/11566#discussion_r427681778



##
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.ReidentifyContentRequest;
+import com.google.privacy.dlp.v2.ReidentifyContentResponse;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and inspecting text for 
identifying data according
+ * to provided settings.
+ *
+ * Either inspectTemplateName (String) or inspectConfig {@link 
InspectConfig} need to be set, the
+ * same goes for reidentifyTemplateName or reidentifyConfig.
+ *
+ * Batch size defines how big are batches sent to DLP at once in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+extends PTransform<
+PCollection<KV<String, String>>, PCollection<KV<String, ReidentifyContentResponse>>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DLPReidentifyText.class);
+
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String reidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig reidentifyConfig();
+
+  @Nullable
+  public abstract String csvDelimiter();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeaders();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+public abstract Builder setReidentifyConfig(DeidentifyConfig 
deidentifyConfig);
+
+public abstract Builder setReidentifyTemplateName(String 
deidentifyTemplateName);
+
+public abstract Builder setBatchSize(Integer batchSize);
+
+public abstract Builder setCsvHeaders(PCollectionView<List<String>> csvHeaders);
+
+public abstract Builder setCsvDelimiter(String delimiter);
+
+public abstract Builder setProjectId(String projectId);
+
+public abstract DLPReidentifyText build();
+  }
+
+  public static DLPReidentifyText.Builder newBuilder() {
+return new AutoValue_DLPReidentifyText.Builder();
+  }
+
+  @Override
+  public PCollection<KV<String, ReidentifyContentResponse>> expand(
+  PCollection<KV<String, String>> input) {
+return input
+.apply(ParDo.of(new MapStringToDlpRow(csvDelimiter())))
+.apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize(
+.apply(
+"DLPDeidentify",
+ParDo.of(
+new ReidentifyText(
+projectId(),
+inspectTemplateName(),
+reidentifyTemplateName(),
+inspectConfig(),
+reidentifyConfig(),
+csvHeaders())));
+  }
+
+  public static class ReidentifyText
+  extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, ReidentifyContentResponse>> {
+private final String projectId;
+private final String inspectTemplateName;
+private final String reidentifyTemplateName;
+private final InspectConfig inspectCon

[GitHub] [beam] TheNeuralBit commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-19 Thread GitBox


TheNeuralBit commented on a change in pull request #11754:
URL: https://github.com/apache/beam/pull/11754#discussion_r427681580



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
##
@@ -66,38 +68,47 @@ public static void main(String[] args) {
 inputTable.apply(SqlTransform.query("select c1, c2, c3 from 
PCOLLECTION where c1 > 1"));
 
 // print the output record of case 1;
-outputStream.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  PCOLLECTION: [3, row, 3.0]
-//  PCOLLECTION: [2, row, 2.0]
-System.out.println("PCOLLECTION: " + input.getValues());
-return input;
-  }
-}));
+outputStream
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  PCOLLECTION: [3, row, 3.0]
+//  PCOLLECTION: [2, row, 2.0]
+System.out.println("PCOLLECTION: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(RowCoder.of(type));
 
 // Case 2. run the query with SqlTransform.query over result PCollection 
of case 1.
 PCollection<Row> outputStream2 =
 PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
 .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT 
group by c2"));
 
 // print the output record of case 2;
-outputStream2.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  CASE1_RESULT: [row, 5.0]
-System.out.println("CASE1_RESULT: " + input.getValues());
-return input;
-  }
-}));
+outputStream2
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  CASE1_RESULT: [row, 5.0]
+System.out.println("CASE1_RESULT: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(
+RowCoder.of(
+Schema.builder()
+.addStringField("stringField")
+.addDoubleField("doubleField")
+.build()));

Review comment:
   oh for this call you will need to use
   ```
   Schema.builder()
   .addStringField("stringField")
   .addDoubleField("doubleField")
   .build()
   ```
   like you had in the setCoder call





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms

2020-05-19 Thread GitBox


santhh commented on a change in pull request #11566:
URL: https://github.com/apache/beam/pull/11566#discussion_r427681156



##
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.ReidentifyContentRequest;
+import com.google.privacy.dlp.v2.ReidentifyContentResponse;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and inspecting text for 
identifying data according
+ * to provided settings.
+ *
+ * Either inspectTemplateName (String) or inspectConfig {@link 
InspectConfig} need to be set, the
+ * same goes for reidentifyTemplateName or reidentifyConfig.
+ *
+ * Batch size defines how big are batches sent to DLP at once in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+extends PTransform<
+PCollection<KV<String, String>>, PCollection<KV<String, ReidentifyContentResponse>>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DLPReidentifyText.class);
+
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String reidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig reidentifyConfig();
+
+  @Nullable
+  public abstract String csvDelimiter();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeaders();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+public abstract Builder setReidentifyConfig(DeidentifyConfig 
deidentifyConfig);
+
+public abstract Builder setReidentifyTemplateName(String 
deidentifyTemplateName);
+
+public abstract Builder setBatchSize(Integer batchSize);
+
+public abstract Builder setCsvHeaders(PCollectionView<List<String>> csvHeaders);
+
+public abstract Builder setCsvDelimiter(String delimiter);
+
+public abstract Builder setProjectId(String projectId);
+
+public abstract DLPReidentifyText build();
+  }
+
+  public static DLPReidentifyText.Builder newBuilder() {
+return new AutoValue_DLPReidentifyText.Builder();
+  }
+
+  @Override
+  public PCollection<KV<String, ReidentifyContentResponse>> expand(
+  PCollection<KV<String, String>> input) {
+return input
+.apply(ParDo.of(new MapStringToDlpRow(csvDelimiter())))
+.apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize(
+.apply(
+"DLPDeidentify",

Review comment:
   should this be re-identify?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR

2020-05-19 Thread GitBox


TheNeuralBit commented on a change in pull request #11755:
URL: https://github.com/apache/beam/pull/11755#discussion_r427681057



##
File path: release/src/main/scripts/mass_comment.py
##
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Script for mass-commenting Jenkins test triggers on a Beam PR."""
+
+import itertools
+import os
+import socket
+import sys
+import time
+import traceback
+import re
+import requests
+from datetime import datetime
+
+
+COMMENTS_TO_ADD=[
+  "Run Go PostCommit",
+  "Run Java PostCommit",
+  "Run Java PortabilityApi PostCommit",
+  "Run Java Flink PortableValidatesRunner Batch",
+  "Run Java Flink PortableValidatesRunner Streaming",
+  "Run Apex ValidatesRunner",
+  "Run Dataflow ValidatesRunner",
+  "Run Flink ValidatesRunner",
+  "Run Gearpump ValidatesRunner",
+  "Run Dataflow PortabilityApi ValidatesRunner",
+  "Run Samza ValidatesRunner",
+  "Run Spark ValidatesRunner",
+  "Run Python Dataflow ValidatesContainer",
+  "Run Python Dataflow ValidatesRunner",
+  "Run Python 3.5 Flink ValidatesRunner",
+  "Run Python 2 PostCommit",
+  "Run Python 3.5 PostCommit",
+  "Run SQL PostCommit",
+  "Run Go PreCommit",
+  "Run Java PreCommit",
+  "Run Java_Examples_Dataflow PreCommit",
+  "Run JavaPortabilityApi PreCommit",
+  "Run Portable_Python PreCommit",
+  "Run PythonLint PreCommit",
+  "Run Python PreCommit",
+  "Run Python DockerBuild PreCommit"
+]

Review comment:
   Should we also remove the duplicate list from 
[verify_release_build.sh](https://github.com/apache/beam/blob/master/release/src/main/scripts/verify_release_build.sh#L43)
 as part of this PR?
   
   I'm not really clear on where this list comes from. Is the goal to launch 
every single jenkins job?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] omarismail94 commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-19 Thread GitBox


omarismail94 commented on a change in pull request #11754:
URL: https://github.com/apache/beam/pull/11754#discussion_r427678727



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
##
@@ -66,38 +68,47 @@ public static void main(String[] args) {
 inputTable.apply(SqlTransform.query("select c1, c2, c3 from 
PCOLLECTION where c1 > 1"));
 
 // print the output record of case 1;
-outputStream.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  PCOLLECTION: [3, row, 3.0]
-//  PCOLLECTION: [2, row, 2.0]
-System.out.println("PCOLLECTION: " + input.getValues());
-return input;
-  }
-}));
+outputStream
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  PCOLLECTION: [3, row, 3.0]
+//  PCOLLECTION: [2, row, 2.0]
+System.out.println("PCOLLECTION: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(RowCoder.of(type));
 
 // Case 2. run the query with SqlTransform.query over result PCollection 
of case 1.
 PCollection<Row> outputStream2 =
 PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
 .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT 
group by c2"));
 
 // print the output record of case 2;
-outputStream2.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  CASE1_RESULT: [row, 5.0]
-System.out.println("CASE1_RESULT: " + input.getValues());
-return input;
-  }
-}));
+outputStream2
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  CASE1_RESULT: [row, 5.0]
+System.out.println("CASE1_RESULT: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(
+RowCoder.of(
+Schema.builder()
+.addStringField("stringField")
+.addDoubleField("doubleField")
+.build()));

Review comment:
   This is the part of the stack trace that makes me think that:
   ```
   Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
java.lang.Integer
   at org.apache.beam.sdk.coders.VarIntCoder.encode(VarIntCoder.java:33)
   at 
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:270)
   at 
org.apache.beam.sdk.coders.Coder$ByteBuddy$E99UrF3W.encode(Unknown Source)
   at 
org.apache.beam.sdk.coders.Coder$ByteBuddy$E99UrF3W.encode(Unknown Source)
   at 
org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:115)
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi closed pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi closed pull request #11753:
URL: https://github.com/apache/beam/pull/11753


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] omarismail94 commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-19 Thread GitBox


omarismail94 commented on a change in pull request #11754:
URL: https://github.com/apache/beam/pull/11754#discussion_r427678033



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
##
@@ -66,38 +68,47 @@ public static void main(String[] args) {
 inputTable.apply(SqlTransform.query("select c1, c2, c3 from 
PCOLLECTION where c1 > 1"));
 
 // print the output record of case 1;
-outputStream.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  PCOLLECTION: [3, row, 3.0]
-//  PCOLLECTION: [2, row, 2.0]
-System.out.println("PCOLLECTION: " + input.getValues());
-return input;
-  }
-}));
+outputStream
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  PCOLLECTION: [3, row, 3.0]
+//  PCOLLECTION: [2, row, 2.0]
+System.out.println("PCOLLECTION: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(RowCoder.of(type));
 
 // Case 2. run the query with SqlTransform.query over result PCollection 
of case 1.
 PCollection<Row> outputStream2 =
 PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
 .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT 
group by c2"));
 
 // print the output record of case 2;
-outputStream2.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  CASE1_RESULT: [row, 5.0]
-System.out.println("CASE1_RESULT: " + input.getValues());
-return input;
-  }
-}));
+outputStream2
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  CASE1_RESULT: [row, 5.0]
+System.out.println("CASE1_RESULT: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(
+RowCoder.of(
+Schema.builder()
+.addStringField("stringField")
+.addDoubleField("doubleField")
+.build()));

Review comment:
   I tried `setRowSchema(type)` and it failed with 
`java.lang.ClassCastException: java.lang.String cannot be cast to 
java.lang.Integer`.
   
   I think it is inferring the schema as 3 fields, but the result only returns 
two fields, and that's why it throws the error





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] omarismail94 commented on pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-19 Thread GitBox


omarismail94 commented on pull request #11754:
URL: https://github.com/apache/beam/pull/11754#issuecomment-631165775


   > Thank you @omarismail94!
   > 
   > We should probably be running this continuously to make sure we don't 
break it again. Would you mind adding the gradle task for this to the SQL 
preCommit [here](https://github.com/apache/beam/blob/master/build.gradle#L154)? 
That way it will run before we merge any PR that affects SQL.
   
   I will add both this and runPojoExample to SQL preCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] omarismail94 commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-19 Thread GitBox


omarismail94 commented on a change in pull request #11754:
URL: https://github.com/apache/beam/pull/11754#discussion_r427677681



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
##
@@ -66,38 +68,47 @@ public static void main(String[] args) {
 inputTable.apply(SqlTransform.query("select c1, c2, c3 from 
PCOLLECTION where c1 > 1"));
 
 // print the output record of case 1;
-outputStream.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  PCOLLECTION: [3, row, 3.0]
-//  PCOLLECTION: [2, row, 2.0]
-System.out.println("PCOLLECTION: " + input.getValues());
-return input;
-  }
-}));
+outputStream
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  PCOLLECTION: [3, row, 3.0]
+//  PCOLLECTION: [2, row, 2.0]
+System.out.println("PCOLLECTION: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(RowCoder.of(type));

Review comment:
   I can do that. I did `setRowSchema(type)` and it worked!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib opened a new pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR

2020-05-19 Thread GitBox


ibzib opened a new pull request #11755:
URL: https://github.com/apache/beam/pull/11755


   @Ardagan wrote most of this script a while back; I just generalized it a bit.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   

[GitHub] [beam] TheNeuralBit commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-19 Thread GitBox


TheNeuralBit commented on a change in pull request #11754:
URL: https://github.com/apache/beam/pull/11754#discussion_r427674472



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
##
@@ -66,38 +68,47 @@ public static void main(String[] args) {
 inputTable.apply(SqlTransform.query("select c1, c2, c3 from 
PCOLLECTION where c1 > 1"));
 
 // print the output record of case 1;
-outputStream.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  PCOLLECTION: [3, row, 3.0]
-//  PCOLLECTION: [2, row, 2.0]
-System.out.println("PCOLLECTION: " + input.getValues());
-return input;
-  }
-}));
+outputStream
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  PCOLLECTION: [3, row, 3.0]
+//  PCOLLECTION: [2, row, 2.0]
+System.out.println("PCOLLECTION: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(RowCoder.of(type));
 
 // Case 2. run the query with SqlTransform.query over result PCollection 
of case 1.
 PCollection<Row> outputStream2 =
 PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
 .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT 
group by c2"));
 
 // print the output record of case 2;
-outputStream2.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  CASE1_RESULT: [row, 5.0]
-System.out.println("CASE1_RESULT: " + input.getValues());
-return input;
-  }
-}));
+outputStream2
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  CASE1_RESULT: [row, 5.0]
+System.out.println("CASE1_RESULT: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(
+RowCoder.of(
+Schema.builder()
+.addStringField("stringField")
+.addDoubleField("doubleField")
+.build()));

Review comment:
   Here as well

##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
##
@@ -66,38 +68,47 @@ public static void main(String[] args) {
 inputTable.apply(SqlTransform.query("select c1, c2, c3 from 
PCOLLECTION where c1 > 1"));
 
 // print the output record of case 1;
-outputStream.apply(
-"log_result",
-MapElements.via(
-new SimpleFunction<Row, Row>() {
-  @Override
-  public Row apply(Row input) {
-// expect output:
-//  PCOLLECTION: [3, row, 3.0]
-//  PCOLLECTION: [2, row, 2.0]
-System.out.println("PCOLLECTION: " + input.getValues());
-return input;
-  }
-}));
+outputStream
+.apply(
+"log_result",
+MapElements.via(
+new SimpleFunction<Row, Row>() {
+  @Override
+  public Row apply(Row input) {
+// expect output:
+//  PCOLLECTION: [3, row, 3.0]
+//  PCOLLECTION: [2, row, 2.0]
+System.out.println("PCOLLECTION: " + input.getValues());
+return input;
+  }
+}))
+.setCoder(RowCoder.of(type));

Review comment:
   could you change this to `withRowSchema(type)`? It does the same thing, 
but it's less verbose





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] omarismail94 opened a new pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-19 Thread GitBox


omarismail94 opened a new pull request #11754:
URL: https://github.com/apache/beam/pull/11754


   R: @TheNeuralBit 
   
   In the `BeamSqlExample.java` class, the instructions state that to run the 
example, use: 
   
   `./gradlew :sdks:java:extensions:sql:runBasicExample`. 
   
   I tried this and the build failed due to `java.lang.IllegalStateException: 
Unable to return a default Coder`
   
   I fixed this by setting the Coder for both anonymous transforms.
   
   
   
   
   

[GitHub] [beam] y1chi commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on pull request #11753:
URL: https://github.com/apache/beam/pull/11753#issuecomment-631157957


   Sorry, I should have marked the PR as a draft, as I'm still testing it. Expecting a 
couple more minor fixes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


boyuanzz commented on pull request #11753:
URL: https://github.com/apache/beam/pull/11753#issuecomment-631153847


   Run Java Flink PortableValidatesRunner Batch



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on pull request #11403: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch

2020-05-19 Thread GitBox


ibzib commented on pull request #11403:
URL: https://github.com/apache/beam/pull/11403#issuecomment-631152816


   test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on pull request #11403: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch

2020-05-19 Thread GitBox


ibzib commented on pull request #11403:
URL: https://github.com/apache/beam/pull/11403#issuecomment-631149865


   test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types

2020-05-19 Thread GitBox


TheNeuralBit commented on a change in pull request #11701:
URL: https://github.com/apache/beam/pull/11701#discussion_r427655332



##
File path: sdks/python/apache_beam/coders/row_coder.py
##
@@ -134,19 +134,18 @@ def __init__(self, schema, components):
   def encode_to_stream(self, value, out, nested):
 nvals = len(self.schema.fields)
 self.SIZE_CODER.encode_to_stream(nvals, out, True)
-attrs = [getattr(value, f.name) for f in self.schema.fields]

Review comment:
   Yeah that's right. Right now it should only be possible to get here with 
a NamedTuple instance, so it should be safe.
   
   Looking forward to the day when more types might go through this code... I 
kind of like the idea of using `tuple` as the "base" schema type in Python 
(i.e. the type we must be able to convert to/from for use in the row coder). 
Relying on attributes isn't great, since it limits us to field names that are 
valid Python identifiers.
   
   All that being said I'd be fine dropping this part of the change for now. 
Renaming the special `id` field also fixes the bug.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427654986



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
   this.client = new HttpHealthcareApiClient();
 }
 
+@GetInitialRestriction
+public OffsetRange getEarliestToLatestRestriction(@Element String 
hl7v2Store)
+throws IOException {
+  from = this.client.getEarliestHL7v2SendTime(hl7v2Store, 
this.filter.get());
+  // filters are [from, to) to match logic of OffsetRangeTracker but need 
latest element to be
+  // included in results set to add an extra ms to the upper bound.
+  to = this.client.getLatestHL7v2SendTime(hl7v2Store, 
this.filter.get()).plus(1);
+  return new OffsetRange(from.getMillis(), to.getMillis());
+}
+
+@NewTracker
+public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+  return timeRange.newTracker();
+}
+
+@SplitRestriction
+public void split(@Restriction OffsetRange timeRange, 
OutputReceiver<OffsetRange> out) {
+  List<OffsetRange> splits =
+  timeRange.split(initialSplitDuration.getMillis(), 
DEFAULT_MIN_SPLIT_DURATION.getMillis());
+  Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+  Instant to = Instant.ofEpochMilli(timeRange.getTo());
+  Duration totalDuration = new Duration(from, to);
+  LOG.info(
+  String.format(
+  "splitting initial sendTime restriction of [minSendTime, now): 
[%s,%s), "
+  + "or [%s, %s). \n"
+  + "total days: %s \n"
+  + "into %s splits. \n"
+  + "Last split: %s",
+  from,
+  to,
+  timeRange.getFrom(),
+  timeRange.getTo(),
+  totalDuration.getStandardDays(),
+  splits.size(),
+  splits.get(splits.size() - 1).toString()));
+
+  for (OffsetRange s : splits) {
+out.output(s);
+  }
+}
+
 /**
  * List messages.
  *
- * @param context the context
+ * @param hl7v2Store the HL7v2 store to list messages from
  * @throws IOException the io exception
  */
 @ProcessElement
-public void listMessages(ProcessContext context) throws IOException {
-  String hl7v2Store = context.element();
-  // Output all elements of all pages.
+public void listMessages(
+@Element String hl7v2Store,
+RestrictionTracker<OffsetRange, Long> tracker,
+OutputReceiver<HL7v2Message> outputReceiver)
+throws IOException {
+  OffsetRange currentRestriction = (OffsetRange) 
tracker.currentRestriction();
+  Instant startRestriction = 
Instant.ofEpochMilli(currentRestriction.getFrom());
+  Instant endRestriction = 
Instant.ofEpochMilli(currentRestriction.getTo());
   HttpHealthcareApiClient.HL7v2MessagePages pages =
-  new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, 
this.filter);
+  new HttpHealthcareApiClient.HL7v2MessagePages(
+  client, hl7v2Store, startRestriction, endRestriction, 
filter.get(), "sendTime");
   long reqestTime = Instant.now().getMillis();
-  for (Stream<HL7v2Message> page : pages) {
+  long lastClaimedMilliSecond;
+  Instant cursor;
+  boolean hangingClaim = false; // flag if the claimed ms spans spills 
over to the next page.
+  for (List<HL7v2Message> page : pages) { // loop over pages.
+int i = 0;
+HL7v2Message msg = page.get(i);
+while (i < page.size()) { // loop over messages in page
+  cursor = Instant.parse(msg.getSendTime());
+  lastClaimedMilliSecond = cursor.getMillis();
+  LOG.info(
+  String.format(
+  "initial claim for page %s lastClaimedMilliSecond = %s",
+  i, lastClaimedMilliSecond));
+  if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+// This means we have claimed an entire millisecond we need to 
make sure that we
+// process all messages for this millisecond because sendTime is 
allegedly nano second
+// resolution.
+// 
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+while (cursor.getMillis() == lastClaimedMilliSecond
+&& i < page.size()) { // loop over messages in millisecond.
+  outputReceiver.output(msg);

Review comment:
   Thanks for suggestion.
   In the interest of time, can I punt this to a future PR?
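   
   For readers skimming the thread, a stripped-down sketch of the splittable
DoFn shape being reviewed here (not the actual HL7v2IO code; the store and
message handling are elided, and restriction positions are epoch milliseconds
of `sendTime`):
   
   ```java
   import org.apache.beam.sdk.io.range.OffsetRange;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
   import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
   
   class ListMessagesFn extends DoFn<String, String> {
   
     @GetInitialRestriction
     public OffsetRange getInitialRestriction(@Element String hl7v2Store) {
       // The real transform queries the store for [min sendTime, max sendTime + 1ms).
       return new OffsetRange(0L, 1_000L); // placeholder bounds
     }
   
     @NewTracker
     public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
       return new OffsetRangeTracker(range);
     }
   
     @ProcessElement
     public void process(
         RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<String> out) {
       // Claim one millisecond at a time and emit everything with that sendTime.
       for (long ms = tracker.currentRestriction().getFrom(); tracker.tryClaim(ms); ms++) {
         out.output("messages with sendTime in millisecond " + ms);
       }
     }
   }
   ```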





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache

[GitHub] [beam] boyuanzz commented on a change in pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker

2020-05-19 Thread GitBox


boyuanzz commented on a change in pull request #11715:
URL: https://github.com/apache/beam/pull/11715#discussion_r427654115



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.range.OffsetRange;
+
+/**
+ * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code 
Long.MAX_VALUE} is
+ * used as end range to indicate the possibility of infinity.
+ *
+ * An offset range is considered growable when the end offset could grow 
(or change) during
+ * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ *
+ * The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class GrowableOffsetRangeTracker extends OffsetRangeTracker {
+  /**
+   * An interface that should be implemented to fetch estimated end offset of 
the range.
+   *
+   * {@code estimateRangeEnd} is called to give the end offset when {@code 
trySplit} or {@code
+   * getProgress} is invoked. The end offset is exclusive for the range. The 
estimated end is not
+   * necessary to increase monotonically as it will only be taken into 
computation when the estimate
+   * end is larger than the current position. When returning {@code 
Long.MAX_VALUE} as estimate, it
+   * means the largest possible position for the range is {@code 
Long.MAX_VALUE - 1}. If there is
+   * not an estimate yet, {@code Long.MIN_VALUE} should be returned, where 
estimated end will not
+   * effect progress and split.
+   *
+   * Having a good estimate is important for providing a good signal of 
progress and splitting at
+   * a proper position.
+   *
+   * If {@code estimate()} is expensive to call, please consider wrapping 
the implementation with
+   * {@code Suppliers.memoizeWithExpiration} as an optimization.
+   */
+  @FunctionalInterface
+  public interface RangeEndEstimator {
+long estimate();
+  }
+
+  private final RangeEndEstimator rangeEndEstimator;
+
+  public GrowableOffsetRangeTracker(long start, RangeEndEstimator 
rangeEndEstimator) {
+super(new OffsetRange(start, Long.MAX_VALUE));
+this.rangeEndEstimator = checkNotNull(rangeEndEstimator);
+  }
+
+  @Override
+  public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+// If current tracking range is no longer growable, split it as a normal 
range.
+if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+  return super.trySplit(fractionOfRemainder);
+}
+// If current range has been done, there is no more space to split.
+if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) {
+  return null;
+}
+double cur =

Review comment:
   Using `BigDecimal` in the latest revision. Thanks for your help!
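   
   For anyone wiring this up later, a small sketch of supplying a
`RangeEndEstimator` (the `fetchLatestEndOffset` backend call is hypothetical),
including the `Suppliers.memoizeWithExpiration` wrapping the javadoc
recommends; the overflow-safe `BigDecimal` split arithmetic itself lives
inside the tracker:
   
   ```java
   import java.util.concurrent.TimeUnit;
   import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
   import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
   import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
   
   class TrackerFactory {
     // Hypothetical backend lookup, e.g. the current end offset of a Kafka partition.
     static long fetchLatestEndOffset() {
       return 42L;
     }
   
     static GrowableOffsetRangeTracker newTracker(long startOffset) {
       // Cache the expensive estimate for one second instead of hitting the
       // backend on every trySplit()/getProgress() call.
       Supplier<Long> cachedEnd =
           Suppliers.memoizeWithExpiration(
               TrackerFactory::fetchLatestEndOffset, 1, TimeUnit.SECONDS);
       return new GrowableOffsetRangeTracker(startOffset, () -> cachedEnd.get());
     }
   }
   ```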





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


boyuanzz commented on pull request #11753:
URL: https://github.com/apache/beam/pull/11753#issuecomment-631136771


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11642: Replace call to .checkpoint() in SDF direct runner to .try_claim(0)

2020-05-19 Thread GitBox


boyuanzz commented on pull request #11642:
URL: https://github.com/apache/beam/pull/11642#issuecomment-631136611


   Hi Ashwin, do you want me to start reviewing now?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


lukecwik commented on pull request #11753:
URL: https://github.com/apache/beam/pull/11753#issuecomment-631133371


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem merged pull request #11736: Katas - Convert task description from HTML to Markdown

2020-05-19 Thread GitBox


pabloem merged pull request #11736:
URL: https://github.com/apache/beam/pull/11736


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11736: Katas - Convert task description from HTML to Markdown

2020-05-19 Thread GitBox


pabloem commented on pull request #11736:
URL: https://github.com/apache/beam/pull/11736#issuecomment-63117


   Alright, RAT is passing. Merging...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on a change in pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types

2020-05-19 Thread GitBox


robertwb commented on a change in pull request #11701:
URL: https://github.com/apache/beam/pull/11701#discussion_r427648180



##
File path: sdks/python/apache_beam/coders/row_coder.py
##
@@ -134,19 +134,18 @@ def __init__(self, schema, components):
   def encode_to_stream(self, value, out, nested):
 nvals = len(self.schema.fields)
 self.SIZE_CODER.encode_to_stream(nvals, out, True)
-attrs = [getattr(value, f.name) for f in self.schema.fields]

Review comment:
   This forces the value to be an iterable, rather than just having the 
right fields, right? Are we sure we want to do that? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-19 Thread GitBox


chamikaramj commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631132405


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on a change in pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image

2020-05-19 Thread GitBox


robertwb commented on a change in pull request #11740:
URL: https://github.com/apache/beam/pull/11740#discussion_r427646588



##
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##
@@ -310,15 +312,15 @@ def __init__(
 environment_payload = proto_utils.parse_Bytes(
 environment.payload, beam_runner_api_pb2.DockerPayload)
 container_image_url = environment_payload.container_image
-if container_image_url == pipeline_sdk_container_image:
-  # This was already added
+if container_image_url in already_added_containers:
+  # Do not add the pipeline environment again.

Review comment:
   Perhaps also add a comment that currently dataflow stages all 
dependencies to all environments?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay commented on a change in pull request #11744: [BEAM-10007] Handle ValueProvider pipeline options in PortableRunner

2020-05-19 Thread GitBox


aaltay commented on a change in pull request #11744:
URL: https://github.com/apache/beam/pull/11744#discussion_r427645484



##
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##
@@ -164,10 +165,19 @@ def add_runner_options(parser):
 all_options = self.options.get_all_options(
 add_extra_args_fn=add_runner_options,
 retain_unknown_options=self._retain_unknown_options)
+
 # TODO: Define URNs for options.
 # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
+def convert_pipeline_option_value(v):
+  if type(v) == int:

Review comment:
   Interesting. For this PR, could you move that comment closer to the `if 
type(v) == int` line?
   
   > I can look into fixing that upstream so we can get rid of the special case 
here.
   
   If you have time, maybe file a bug. I do not think this is a very high 
priority for us.
   
   > I don't think it would be good to call str(v) on everything
   
   I agree. I was trying to understand why are we doing this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427643888



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
 }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.

Review comment:
   note to self: remove reference to "dynamically rebalance" as this is not 
yet supported by dataflow runner.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms

2020-05-19 Thread GitBox


tysonjh commented on a change in pull request #11566:
URL: https://github.com/apache/beam/pull/11566#discussion_r427403454



##
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text 
according to provided
+ * settings. The transform supports both CSV formatted input data and 
unstructured input.
+ *
+ * If the csvHeader property is set, csvDelimiter also should be, else the 
results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * Either inspectTemplateName (String) or inspectConfig {@link 
InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig 
({@link DeidentifyConfig}.
+ *
+ * Batch size defines how big are batches sent to DLP at once in bytes.

Review comment:
   Are you agreeing that the comments should be moved to the methods, or 
that the comments are also useful here for the template inspection? (I'm 
unfamiliar with what the 'inspect contents' and 'inspect template' actions do.)

##
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.value

[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427643387



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
 }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * This transform is optimized for dynamic splitting of message.list 
calls for large batches of

Review comment:
   ```suggestion
  * This transform is optimized for splitting of message.list calls for 
large batches of
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


lukecwik commented on pull request #11753:
URL: https://github.com/apache/beam/pull/11753#issuecomment-631126898


   R: @boyuanzz 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


lukecwik commented on pull request #11753:
URL: https://github.com/apache/beam/pull/11753#issuecomment-631126805


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427641350



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
 }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * Implementation includes overhead for: 1. two api calls to determine 
the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into 
non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list 
calls.
+   *
+   * This will make more queries than necessary when used with very small 
data sets. (or very
+   * sparse data sets in the sendTime dimension).
+   *
+   * If you have large but sparse data (e.g. hours between consecutive 
message sendTimes) and
+   * know something about the time ranges where you have no data, consider 
using multiple instances
+   * of this transform specifying sendTime filters to omit the ranges where 
there is no data.

Review comment:
   That's great to know! Will remove this guidance, as it would lead to 
unnecessary complexity.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427640873



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
 }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * Implementation includes overhead for: 1. two api calls to determine 
the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into 
non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list 
calls.

Review comment:
   I originally included this for users who might try to benchmark this 
against a tiny / sparse result set and be surprised that it is slow / makes so 
many API calls.
   
   I see your point; will remove.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427640111



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
 }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.

Review comment:
   correct.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427618071



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
 }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * This transform is optimized for dynamic splitting of message.list 
calls for large batches of

Review comment:
   Consider using `{@code ...}` when referring to code and `{@link ...}` 
for things you can directly link against.
   
   ```suggestion
  * This transform is optimized for dynamic splitting of {@code 
message.list} calls for large batches of
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

2020-05-19 Thread GitBox


lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427616856



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
 }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.

Review comment:
   wouldn't this just be a small amount of waste, since we would effectively 
get an empty response?

##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
 }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * Implementation includes overhead for: 1. two api calls to determine 
the min/max sendTime of

Review comment:
   consider using `<ol>` and `<li>` tags in the javadoc for your ordered list
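   
   For illustration, the overhead note with those tags might look like this (a
sketch, not final wording):
   
   ```java
   /**
    * <p>Implementation includes overhead for:
    *
    * <ol>
    *   <li>two API calls to determine the min/max sendTime of the HL7v2 store at invocation time;
    *   <li>initial splitting into non-overlapping time ranges (default daily) to achieve
    *       parallelization in separate messages.list calls.
    * </ol>
    */
   ```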

##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
 }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * This transform is optimized for dynamic splitting of message.list 
calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It 
will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste 
resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * Implementation includes overhead for: 1. two api calls to determine 
the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into 
non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list 
calls.
+   *
+   * This will make more queries than necessary when used with very small 
data sets. (or very
+   * sparse data sets in the sendTime dimension).
+   *
+   * If you have large but sparse data (e.g. hours between consecutive 
message sendTimes) and
+   * know something about the time ranges where you have no data, consider 
using multiple instances
+   * of this transform specifying sendTime filters to omit the ranges where 
there is no data.
+   */
   public static class ListHL7v2Messages extends PTransform<PBegin, PCollection<HL7v2Message>> {
-private final List<String> hl7v2Stores;
-private final String filter;
+private final ValueProvider<List<String>> hl7v2Stores;
+private ValueProvider<String> filter;
+private Duration initialSplitDuration;

Review comment:
   even if a member variable is null, it should still be final since it 
doesn't look like we mutate it locally. Same reason for other places I suggest 
to change this.
   ```suggestion
   private final ValueProvider<String> filter;
   private final Duration initialSplitDuration;
   ```

##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##
@@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, 
String msgId)
  * @param filter the filter
  */
 ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) {
-  this.hl7v2Stores = hl7v2Stores.get();
-  this.filter = filter.get();
+  this.hl7v2Stores = hl7v2Stores;
+  this.filter = filter;
+}
+
+/**
+ * Instantiates a new List hl 7 v 2 messages.
+ *
+ * @param hl7v2Stores the hl 7 v 2 stores
+ * @param filter the filter
+ * @param initialSplitDuration the initial split duration for sendTime 
dimension splits
+ */
+ListHL7v2Messages(
+ValueProvider<List<String>> hl7v2Stores,
+ValueProvider<String> filter,
+Duration initialSplitDuration) {
+  this.hl7v2Stores = hl7v2Stores;
+  this.filter = filter;
+  this.initialSplitDuration = initial

[GitHub] [beam] pabloem commented on pull request #11736: Katas - Convert task description from HTML to Markdown

2020-05-19 Thread GitBox


pabloem commented on pull request #11736:
URL: https://github.com/apache/beam/pull/11736#issuecomment-631112295


   I don't know why the SQL precommit is running : ) All I care about is the 
RAT precommit, which should pass...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11736: Katas - Convert task description from HTML to Markdown

2020-05-19 Thread GitBox


pabloem commented on pull request #11736:
URL: https://github.com/apache/beam/pull/11736#issuecomment-63401


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-19 Thread GitBox


chamikaramj commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631105651


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-19 Thread GitBox


chamikaramj commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631105532


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] veblush commented on a change in pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-19 Thread GitBox


veblush commented on a change in pull request #11651:
URL: https://github.com/apache/beam/pull/11651#discussion_r427623777



##
File path: 
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
##
@@ -489,6 +490,7 @@ class BeamModulePlugin implements Plugin {
 grpc_protobuf   : 
"io.grpc:grpc-protobuf:$grpc_version",
 grpc_protobuf_lite  : 
"io.grpc:grpc-protobuf-lite:$grpc_version",
 grpc_netty  : 
"io.grpc:grpc-netty:$grpc_version",
+grpc_netty_shaded   : 
"io.grpc:grpc-netty-shaded:$grpc_version",

Review comment:
   Note that current Beam already has it transitively from 
[gax-grpc](https://mvnrepository.com/artifact/com.google.api/gax-grpc/1.54.0). 
Declaring it explicitly makes sure that all these components work with the 
same version.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi opened a new pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi opened a new pull request #11753:
URL: https://github.com/apache/beam/pull/11753


   Implemented the missing pieces in FnApiDoFnRunner to support timer families.
   Also refactored a few function signatures to avoid confusion.
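   
   For context, a minimal sketch of the user-facing timer-family API this PR
adds runner support for (the family name "actions" and the timer ids are
illustrative):
   
   ```java
   import org.apache.beam.sdk.state.TimeDomain;
   import org.apache.beam.sdk.state.TimerMap;
   import org.apache.beam.sdk.state.TimerSpec;
   import org.apache.beam.sdk.state.TimerSpecs;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.values.KV;
   import org.joda.time.Instant;
   
   class TimerFamilyFn extends DoFn<KV<String, String>, String> {
   
     @TimerFamily("actions")
     private final TimerSpec actions = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
   
     @ProcessElement
     public void process(
         @Element KV<String, String> element,
         @TimerFamily("actions") TimerMap timers,
         @Timestamp Instant ts) {
       // Timers in a family are addressed by dynamically chosen string ids.
       timers.set("expiry-" + element.getKey(), ts.plus(60_000));
     }
   
     @OnTimerFamily("actions")
     public void onTimer(@TimerId String timerId, OutputReceiver<String> out) {
       out.output("fired: " + timerId);
     }
   }
   ```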
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] amaliujia merged pull request #11737: [BEAM-9984] Support BIT_OR aggregation function in Beam SQL

2020-05-19 Thread GitBox


amaliujia merged pull request #11737:
URL: https://github.com/apache/beam/pull/11737
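   
   For background, `BIT_OR` aggregates a group by folding bitwise OR over its values, e.g. `SELECT key, BIT_OR(flags) FROM t GROUP BY key`. A minimal Python sketch of the semantics (illustration only, not the Beam SQL implementation):
   
```python
import functools
import operator


def bit_or(values):
  # Fold bitwise OR across the group; 0 is the identity for OR.
  # (SQL aggregates typically return NULL for an empty group instead.)
  return functools.reduce(operator.or_, values, 0)


assert bit_or([0b001, 0b100, 0b100]) == 0b101
```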


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-19 Thread GitBox


pabloem commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631081683


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj merged pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image

2020-05-19 Thread GitBox


chamikaramj merged pull request #11740:
URL: https://github.com/apache/beam/pull/11740


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #11744: [BEAM-10007] Handle ValueProvider pipeline options in PortableRunner

2020-05-19 Thread GitBox


TheNeuralBit commented on a change in pull request #11744:
URL: https://github.com/apache/beam/pull/11744#discussion_r427592587



##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -164,10 +165,19 @@ def add_runner_options(parser):
     all_options = self.options.get_all_options(
         add_extra_args_fn=add_runner_options,
         retain_unknown_options=self._retain_unknown_options)
+
     # TODO: Define URNs for options.
     # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
+    def convert_pipeline_option_value(v):
+      if type(v) == int:

Review comment:
   Oh, the relevant JIRA is linked right there. It seems this is really a 
workaround for a bug in json_format, where ints and floats aren't 
distinguished. I can look into fixing that upstream so we can get rid of the 
special case here.
   
   I don't think it would be good to call `str(v)` on everything, though.
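   
   To make the trade-off concrete, a sketch of the kind of conversion under discussion. The int-to-str branch mirrors the existing BEAM-5509 workaround; the `ValueProvider` branches are hypothetical illustrations, not the PR's actual change:
   
```python
from apache_beam.options.value_provider import (
    RuntimeValueProvider, StaticValueProvider)


def convert_pipeline_option_value(v):
  # BEAM-5509 workaround: the proto Struct produced by json_format stores
  # every number as a double, so ints are stringified to survive the trip.
  if type(v) == int:
    return str(v)
  # Hypothetical ValueProvider handling: unwrap a static value, and only
  # read a runtime value when it is already accessible.
  if isinstance(v, StaticValueProvider):
    return convert_pipeline_option_value(v.value)
  if isinstance(v, RuntimeValueProvider):
    return v.get() if v.is_accessible() else None
  return v
```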





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image

2020-05-19 Thread GitBox


chamikaramj commented on pull request #11740:
URL: https://github.com/apache/beam/pull/11740#issuecomment-631075979


   Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #11744: [BEAM-10007] Handle ValueProvider pipeline options in PortableRunner

2020-05-19 Thread GitBox


TheNeuralBit commented on a change in pull request #11744:
URL: https://github.com/apache/beam/pull/11744#discussion_r427586062



##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -164,10 +165,19 @@ def add_runner_options(parser):
     all_options = self.options.get_all_options(
         add_extra_args_fn=add_runner_options,
         retain_unknown_options=self._retain_unknown_options)
+
     # TODO: Define URNs for options.
     # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
+    def convert_pipeline_option_value(v):
+      if type(v) == int:

Review comment:
   Good question. I was just maintaining the status quo and adding a 
special case for `ValueProvider`. Let me see if I can dig up why the `str(v)` 
is there for ints.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tvalentyn commented on pull request #11707: [BEAM-9810] Add a Tox (precommit) suite for Python 3.8

2020-05-19 Thread GitBox


tvalentyn commented on pull request #11707:
URL: https://github.com/apache/beam/pull/11707#issuecomment-631067325


   The code change LGTM, thank you. We need to address BEAM-9994 before we can 
merge this. Would you have time to investigate and recommend a solution for 
that issue, @kamilwu?




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] veblush commented on a change in pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-19 Thread GitBox


veblush commented on a change in pull request #11651:
URL: https://github.com/apache/beam/pull/11651#discussion_r427581972



##########
File path: sdks/java/io/google-cloud-platform/build.gradle
##########
@@ -56,11 +56,13 @@ dependencies {
   compile library.java.google_http_client
   compile library.java.google_http_client_jackson2
   compile library.java.grpc_all
+  compile library.java.grpc_alts
   compile library.java.grpc_auth
   compile library.java.grpc_core
   compile library.java.grpc_context
   compile library.java.grpc_grpclb
   compile library.java.grpc_netty
+  compile library.java.grpc_netty_shaded

Review comment:
   Same as above.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] veblush commented on a change in pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-19 Thread GitBox


veblush commented on a change in pull request #11651:
URL: https://github.com/apache/beam/pull/11651#discussion_r427581456



##########
File path: buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
##########
@@ -489,6 +490,7 @@ class BeamModulePlugin implements Plugin<Project> {
         grpc_protobuf               : "io.grpc:grpc-protobuf:$grpc_version",
         grpc_protobuf_lite          : "io.grpc:grpc-protobuf-lite:$grpc_version",
         grpc_netty                  : "io.grpc:grpc-netty:$grpc_version",
+        grpc_netty_shaded           : "io.grpc:grpc-netty-shaded:$grpc_version",

Review comment:
   This comes from grpc-alts. (grpc-alts is needed for DirectPath.)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



