[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=344024&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-344024
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 15/Nov/19 07:10
Start Date: 15/Nov/19 07:10
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 344024)
Time Spent: 4h  (was: 3h 50m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Steve Koonce
>Priority: Minor
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> +Background:+
> When a WorkItemCommitRequest is generated that's bigger than the permitted 
> size (> ~180 MB), a KeyCommitTooLargeException is logged (_not thrown_) and 
> the request is still sent to the service.  The service rejects the commit, 
> but breaks up input messages that were bundled together and adds them to new, 
> smaller work items that will later be pulled and re-tried - likely without 
> generating another commit that is too large.
> When a WorkItemCommitRequest is generated that's too large to be sent back to 
> the service (> 2 GB), a KeyCommitTooLargeException is thrown and nothing is 
> sent back to the service.
>  
> +Proposed Improvement+
> In both cases, prevent the doomed, large commit item from being sent back to 
> the service.  Instead send flags in the commit request signaling that the 
> current work item led to a commit that is too large and the work item should 
> be broken up.  
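
The size check described in the Background above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the worker's actual code: the class `CommitSizeCheck`, the `Outcome` enum and the ~180 MB constant in `main` are hypothetical names and values chosen for the example. Only the overflow clamp mirrors the expression that appears in the pull request diff.

```java
/** Hypothetical sketch; the names here are illustrative and not part of the Beam worker. */
final class CommitSizeCheck {
  enum Outcome { SEND_AS_IS, REQUEST_TRUNCATION }

  // The serialized size is an int, so a commit larger than 2 GB can overflow
  // to a negative value. Treat a negative size as "at least Integer.MAX_VALUE".
  static int estimateCommitSize(int serializedSize) {
    return serializedSize < 0 ? Integer.MAX_VALUE : serializedSize;
  }

  // Both the "over the limit" case and the "overflowed" case take the same path:
  // do not send the doomed commit, ask for the work item to be broken up instead.
  static Outcome decide(int serializedSize, int byteLimit) {
    return estimateCommitSize(serializedSize) > byteLimit
        ? Outcome.REQUEST_TRUNCATION
        : Outcome.SEND_AS_IS;
  }

  public static void main(String[] args) {
    int byteLimit = 180 << 20; // assumed limit of roughly 180 MB
    System.out.println(decide(1024, byteLimit));      // small commit
    System.out.println(decide(200 << 20, byteLimit)); // over the limit
    System.out.println(decide(-5, byteLimit));        // overflowed size
  }
}
```

Folding the overflow case into a single clamped value is what lets one comparison against the byte limit cover both situations from the issue description.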



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=343935&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343935
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 15/Nov/19 01:13
Start Date: 15/Nov/19 01:13
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #10013: [BEAM-8554] Use 
WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#issuecomment-554163794
 
 
   Run Dataflow ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 343935)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=343611&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343611
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 14/Nov/19 18:11
Start Date: 14/Nov/19 18:11
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r346467235
 
 

 ##
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 ##
 @@ -1330,20 +1334,19 @@ private void process(
       WorkItemCommitRequest commitRequest = outputBuilder.build();
       int byteLimit = maxWorkItemCommitBytes;
       int commitSize = commitRequest.getSerializedSize();
+      int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
+
       // Detect overflow of integer serialized size or if the byte limit was exceeded.
-      windmillMaxObservedWorkItemCommitBytes.addValue(
-          commitSize < 0 ? Integer.MAX_VALUE : commitSize);
-      if (commitSize < 0) {
-        throw KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
-      } else if (commitSize > byteLimit) {
-        // Once supported, we should communicate the desired truncation for the commit to the
-        // streaming engine. For now we report the error but attempt the commit so that it will be
-        // truncated by the streaming engine backend.
+      windmillMaxObservedWorkItemCommitBytes.addValue(estimatedCommitSize);
+      if (estimatedCommitSize > byteLimit) {
         KeyCommitTooLargeException e =
             KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
         reportFailure(computationId, workItem, e);
         LOG.error(e.toString());
+
+        commitRequest = buildWorkItemTruncationRequest(key, workItem, estimatedCommitSize);
 
 Review comment:
   Sure - just added it and re-squashed the commits
 



Issue Time Tracking
---

Worklog Id: (was: 343611)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=343043&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343043
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 14/Nov/19 01:01
Start Date: 14/Nov/19 01:01
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r346081816
 
 

 ##
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 ##
 @@ -1330,20 +1334,19 @@ private void process(
       WorkItemCommitRequest commitRequest = outputBuilder.build();
       int byteLimit = maxWorkItemCommitBytes;
       int commitSize = commitRequest.getSerializedSize();
+      int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
+
       // Detect overflow of integer serialized size or if the byte limit was exceeded.
-      windmillMaxObservedWorkItemCommitBytes.addValue(
-          commitSize < 0 ? Integer.MAX_VALUE : commitSize);
-      if (commitSize < 0) {
-        throw KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
-      } else if (commitSize > byteLimit) {
-        // Once supported, we should communicate the desired truncation for the commit to the
-        // streaming engine. For now we report the error but attempt the commit so that it will be
-        // truncated by the streaming engine backend.
+      windmillMaxObservedWorkItemCommitBytes.addValue(estimatedCommitSize);
+      if (estimatedCommitSize > byteLimit) {
         KeyCommitTooLargeException e =
             KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
         reportFailure(computationId, workItem, e);
         LOG.error(e.toString());
+
+        commitRequest = buildWorkItemTruncationRequest(key, workItem, estimatedCommitSize);
 
 Review comment:
   Could you add a comment to this effect so that it's clear in the code why 
you're doing it this way?
 



Issue Time Tracking
---

Worklog Id: (was: 343043)
Time Spent: 3.5h  (was: 3h 20m)



[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=342964&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-342964
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 13/Nov/19 22:13
Start Date: 13/Nov/19 22:13
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r346030994
 
 

 ##
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 ##
 @@ -1330,20 +1334,19 @@ private void process(
       WorkItemCommitRequest commitRequest = outputBuilder.build();
       int byteLimit = maxWorkItemCommitBytes;
       int commitSize = commitRequest.getSerializedSize();
+      int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
+
       // Detect overflow of integer serialized size or if the byte limit was exceeded.
-      windmillMaxObservedWorkItemCommitBytes.addValue(
-          commitSize < 0 ? Integer.MAX_VALUE : commitSize);
-      if (commitSize < 0) {
-        throw KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
-      } else if (commitSize > byteLimit) {
-        // Once supported, we should communicate the desired truncation for the commit to the
-        // streaming engine. For now we report the error but attempt the commit so that it will be
-        // truncated by the streaming engine backend.
+      windmillMaxObservedWorkItemCommitBytes.addValue(estimatedCommitSize);
+      if (estimatedCommitSize > byteLimit) {
         KeyCommitTooLargeException e =
             KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
         reportFailure(computationId, workItem, e);
         LOG.error(e.toString());
+
+        commitRequest = buildWorkItemTruncationRequest(key, workItem, estimatedCommitSize);
 
 Review comment:
   All of the 'doomed' messages, timers, counters, etc. are being cleared as 
well.  It would be a little more efficient to clear them explicitly rather than 
start over, but that would be less future-proof: any new fields added to the 
proto would also need to be cleared out here.
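
   The "start over" approach described in this comment might look roughly like the sketch below. `TruncationSketch`, its `Commit` class and every field name are hypothetical stand-ins, not the real `WorkItemCommitRequest` proto, and `payloadBytes` is an assumed placeholder for the messages, timers and counters. The routing fields are the ones named elsewhere in this thread (key, work token, cache token).

```java
/** Hypothetical sketch of a "start over" truncation request; not the real Windmill proto. */
final class TruncationSketch {
  static final class Commit {
    final String key;
    final long workToken;
    final long cacheToken;
    final int payloadBytes;        // placeholder for messages, timers, counters, ...
    final boolean exceedsMaxBytes; // flag: the original commit was too large
    final int estimatedBytes;      // clamped size estimate of the original commit

    Commit(String key, long workToken, long cacheToken, int payloadBytes,
        boolean exceedsMaxBytes, int estimatedBytes) {
      this.key = key;
      this.workToken = workToken;
      this.cacheToken = cacheToken;
      this.payloadBytes = payloadBytes;
      this.exceedsMaxBytes = exceedsMaxBytes;
      this.estimatedBytes = estimatedBytes;
    }
  }

  // Build a fresh request that carries only routing information plus the flags.
  // Nothing from the doomed payload is copied, so there is nothing to clear.
  static Commit buildTruncationRequest(
      String key, long workToken, long cacheToken, int estimatedBytes) {
    return new Commit(key, workToken, cacheToken, 0, true, estimatedBytes);
  }

  public static void main(String[] args) {
    Commit doomed = new Commit("large_key", 1L, 7L, 200 << 20, false, 0);
    Commit truncation = buildTruncationRequest(
        doomed.key, doomed.workToken, doomed.cacheToken, doomed.payloadBytes);
    System.out.println(truncation.key + " payload=" + truncation.payloadBytes
        + " flagged=" + truncation.exceedsMaxBytes);
  }
}
```

   With a generated protobuf builder the same idea means only the routing setters are ever called, so fields added to the message later stay at their defaults without any change here.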
 



Issue Time Tracking
---

Worklog Id: (was: 342964)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=342959&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-342959
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 13/Nov/19 22:11
Start Date: 13/Nov/19 22:11
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on issue #10013: [BEAM-8554] Use 
WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#issuecomment-553627998
 
 
   We want to set only the necessary routing information (i.e. key, work_token,
   cache_token, etc.) and not keep all the other data, which takes up many bytes.
   
   
   On Wed, Nov 13, 2019 at 2:09 PM reuvenlax  wrote:
   
   > *@reuvenlax* commented on this pull request.
   > --
   >
   > In runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
   >
   > >  KeyCommitTooLargeException e =
   >  KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
   >  reportFailure(computationId, workItem, e);
   >  LOG.error(e.toString());
   > +
   > +commitRequest = buildWorkItemTruncationRequest(key, workItem, estimatedCommitSize);
   >
   > Instead of reinitializing everything from scratch, can't you just reuse
   > the existing commitRequest? Something like:
   >
   > commitRequest = commitRequest.toBuilder()
   >     .setExceedsMaxWorkItemCommitBytes(true)
   >     .setEstimatedWorkItemCommitBytes(estimatedCommitSize)
   >     .build()
   >
   
 



Issue Time Tracking
---

Worklog Id: (was: 342959)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=342957&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-342957
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 13/Nov/19 22:09
Start Date: 13/Nov/19 22:09
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r346028789
 
 

 ##
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 ##
 @@ -1330,20 +1334,19 @@ private void process(
       WorkItemCommitRequest commitRequest = outputBuilder.build();
       int byteLimit = maxWorkItemCommitBytes;
       int commitSize = commitRequest.getSerializedSize();
+      int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
+
       // Detect overflow of integer serialized size or if the byte limit was exceeded.
-      windmillMaxObservedWorkItemCommitBytes.addValue(
-          commitSize < 0 ? Integer.MAX_VALUE : commitSize);
-      if (commitSize < 0) {
-        throw KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
-      } else if (commitSize > byteLimit) {
-        // Once supported, we should communicate the desired truncation for the commit to the
-        // streaming engine. For now we report the error but attempt the commit so that it will be
-        // truncated by the streaming engine backend.
+      windmillMaxObservedWorkItemCommitBytes.addValue(estimatedCommitSize);
+      if (estimatedCommitSize > byteLimit) {
         KeyCommitTooLargeException e =
             KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
         reportFailure(computationId, workItem, e);
         LOG.error(e.toString());
+
+        commitRequest = buildWorkItemTruncationRequest(key, workItem, estimatedCommitSize);
 
 Review comment:
   Instead of reinitializing everything from scratch, can't you just reuse the 
existing commitRequest? Something like:
   
   commitRequest = commitRequest.toBuilder()
       .setExceedsMaxWorkItemCommitBytes(true)
       .setEstimatedWorkItemCommitBytes(estimatedCommitSize)
       .build()
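
   One thing to keep in mind with this suggestion: a protobuf `toBuilder()` returns a builder pre-populated with the message's current fields, so setting only the two flags would leave the oversized payload in place. The sketch below illustrates that with a hand-rolled, protobuf-style builder. `ReuseSketch`, `Commit`, `Builder` and the `outputs` field are all hypothetical and exist only for this example; they are not the generated Windmill classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical hand-rolled message with a protobuf-style builder; illustration only. */
final class ReuseSketch {
  static final class Commit {
    final String key;
    final List<String> outputs; // stands in for the bulky payload fields
    final boolean exceedsMaxBytes;

    private Commit(Builder b) {
      this.key = b.key;
      this.outputs = new ArrayList<>(b.outputs);
      this.exceedsMaxBytes = b.exceedsMaxBytes;
    }

    // Like a generated toBuilder(): the new builder starts with every field copied.
    Builder toBuilder() {
      Builder b = new Builder();
      b.key = key;
      b.outputs = new ArrayList<>(outputs);
      b.exceedsMaxBytes = exceedsMaxBytes;
      return b;
    }
  }

  static final class Builder {
    String key = "";
    List<String> outputs = new ArrayList<>();
    boolean exceedsMaxBytes;

    Builder setKey(String k) { key = k; return this; }
    Builder addOutput(String o) { outputs.add(o); return this; }
    Builder clearOutputs() { outputs.clear(); return this; }
    Builder setExceedsMaxBytes(boolean v) { exceedsMaxBytes = v; return this; }
    Commit build() { return new Commit(this); }
  }

  public static void main(String[] args) {
    Commit big = new Builder().setKey("large_key").addOutput("a").addOutput("b").build();

    // Reuse without clearing: the flag is set, but the payload is still there.
    Commit flaggedOnly = big.toBuilder().setExceedsMaxBytes(true).build();

    // Reuse with an explicit clear: works, but every payload field must be listed.
    Commit flaggedAndCleared =
        big.toBuilder().clearOutputs().setExceedsMaxBytes(true).build();

    System.out.println(flaggedOnly.outputs.size());       // 2
    System.out.println(flaggedAndCleared.outputs.size()); // 0
  }
}
```

   Reusing the request is therefore only safe if each payload field is also cleared explicitly, which is the trade-off discussed in the replies to this comment.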
 



Issue Time Tracking
---

Worklog Id: (was: 342957)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=342928&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-342928
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 13/Nov/19 21:42
Start Date: 13/Nov/19 21:42
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #10013: [BEAM-8554] Use 
WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#issuecomment-553616965
 
 
   Run Dataflow ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 342928)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=342080&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-342080
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 12/Nov/19 19:14
Start Date: 12/Nov/19 19:14
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on issue #10013: [BEAM-8554] Use 
WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#issuecomment-553070068
 
 
   R: @reuvenlax
 



Issue Time Tracking
---

Worklog Id: (was: 342080)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=340774&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340774
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 08/Nov/19 22:16
Start Date: 08/Nov/19 22:16
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on issue #10013: [BEAM-8554] Use 
WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#issuecomment-552010960
 
 
   Squashed the commits.  Passing to a committer for review
   
   R: @reuvenlax
 



Issue Time Tracking
---

Worklog Id: (was: 340774)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=340747&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340747
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 08/Nov/19 20:50
Start Date: 08/Nov/19 20:50
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r344361066
 
 

 ##
 File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 ##
 @@ -949,63 +949,14 @@ public void testKeyCommitTooLargeException() throws Exception {
     assertEquals(2, result.size());
     assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), result.get(2L));
     assertTrue(result.containsKey(1L));
-    assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-    assertTrue(result.get(1L).getSerializedSize() > 1000);
 
-    // Spam worker updates a few times.
-    int maxTries = 10;
-    while (--maxTries > 0) {
-      worker.reportPeriodicWorkerUpdates();
-      Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
-    }
+    WorkItemCommitRequest largeCommit = result.get(1L);
+    assertEquals("large_key", largeCommit.getKey().toStringUtf8());
 
-    // We should see an exception reported for the large commit but not the small one.
-    ArgumentCaptor<WorkItemStatus> workItemStatusCaptor =
-        ArgumentCaptor.forClass(WorkItemStatus.class);
-    verify(mockWorkUnitClient, atLeast(2)).reportWorkItemStatus(workItemStatusCaptor.capture());
-    List<WorkItemStatus> capturedStatuses = workItemStatusCaptor.getAllValues();
-    boolean foundErrors = false;
-    for (WorkItemStatus status : capturedStatuses) {
-      if (!status.getErrors().isEmpty()) {
-        assertFalse(foundErrors);
-        foundErrors = true;
-        String errorMessage = status.getErrors().get(0).getMessage();
-        assertThat(errorMessage, Matchers.containsString("KeyCommitTooLargeException"));
-      }
-    }
-    assertTrue(foundErrors);
-  }
-
-  @Test
-  public void testKeyCommitTooLargeException_StreamingEngine() throws Exception {
-    KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
-
-    List<ParallelInstruction> instructions =
-        Arrays.asList(
-            makeSourceInstruction(kvCoder),
-            makeDoFnInstruction(new LargeCommitFn(), 0, kvCoder),
-            makeSinkInstruction(kvCoder, 1));
-
-    FakeWindmillServer server = new FakeWindmillServer(errorCollector);
-    server.setExpectedExceptionCount(1);
-
-    StreamingDataflowWorkerOptions options =
-        createTestingPipelineOptions(server, "--experiments=enable_streaming_engine");
-StreamingDataflowWorker worker = makeWorker(instructions, options, true /* 
publishCounters */);
-worker.setMaxWorkItemCommitBytes(1000);
-worker.start();
-
-server.addWorkToOffer(makeInput(1, 0, "large_key"));
-server.addWorkToOffer(makeInput(2, 0, "key"));
-server.waitForEmptyWorkQueue();
-
-Map result = 
server.waitForAndGetCommits(1);
-
-assertEquals(2, result.size());
-assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
-assertTrue(result.containsKey(1L));
-assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-assertTrue(result.get(1L).getSerializedSize() > 1000);
+// The large commit should have its flags set marking it for truncation
+assertTrue(largeCommit.getExceedsMaxWorkItemCommitBytes());
+assertTrue(largeCommit.getSerializedSize() < 100);
 
 Review comment:
   Yes.  I needed to keep the estimated bytes field check given the method for 
generating the expected truncated commit, but should have cleaned up the 
others.  Doing that now.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 340747)
Time Spent: 2h 20m  (was: 2h 10m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h

[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=340746&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340746
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 08/Nov/19 20:46
Start Date: 08/Nov/19 20:46
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r344359785
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 ##
 @@ -293,19 +293,16 @@ message WorkItemCommitRequest {
   optional SourceState source_state_updates = 12;
   optional int64 source_watermark = 13 [default=-0x8000000000000000];
   optional int64 source_backlog_bytes = 17 [default=-1];
-  optional int64 source_bytes_processed = 22 [default = 0];
+  optional int64 source_bytes_processed = 22;
 
   repeated WatermarkHold watermark_holds = 14;
 
-  repeated int64 finalize_ids = 19 [packed = true];
-
-  optional int64 testonly_fake_clock_time_usec = 23;
-
   // DEPRECATED
   repeated GlobalDataId global_data_id_requests = 9;
 
   reserved 6;
 
 Review comment:
   Ok, I'll make the change
 



Issue Time Tracking
---

Worklog Id: (was: 340746)
Time Spent: 2h 10m  (was: 2h)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=340701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340701
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 08/Nov/19 19:27
Start Date: 08/Nov/19 19:27
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r344331362
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 ##
 @@ -293,19 +293,16 @@ message WorkItemCommitRequest {
   optional SourceState source_state_updates = 12;
   optional int64 source_watermark = 13 [default=-0x8000000000000000];
   optional int64 source_backlog_bytes = 17 [default=-1];
-  optional int64 source_bytes_processed = 22 [default = 0];
+  optional int64 source_bytes_processed = 22;
 
   repeated WatermarkHold watermark_holds = 14;
 
-  repeated int64 finalize_ids = 19 [packed = true];
-
-  optional int64 testonly_fake_clock_time_usec = 23;
-
   // DEPRECATED
   repeated GlobalDataId global_data_id_requests = 9;
 
   reserved 6;
 
 Review comment:
   Nit, but I think you can write this as a single statement:
   reserved 6, 19, 23;
 



Issue Time Tracking
---

Worklog Id: (was: 340701)
Time Spent: 1h 50m  (was: 1h 40m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=340702&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340702
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 08/Nov/19 19:27
Start Date: 08/Nov/19 19:27
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r344331200
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 ##
 @@ -949,63 +949,14 @@ public void testKeyCommitTooLargeException() throws 
Exception {
 assertEquals(2, result.size());
 assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
 assertTrue(result.containsKey(1L));
-assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-assertTrue(result.get(1L).getSerializedSize() > 1000);
 
-// Spam worker updates a few times.
-int maxTries = 10;
-while (--maxTries > 0) {
-  worker.reportPeriodicWorkerUpdates();
-  Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
-}
+WorkItemCommitRequest largeCommit = result.get(1L);
+assertEquals("large_key", largeCommit.getKey().toStringUtf8());
 
-// We should see an exception reported for the large commit but not the 
small one.
-ArgumentCaptor workItemStatusCaptor =
-ArgumentCaptor.forClass(WorkItemStatus.class);
-verify(mockWorkUnitClient, 
atLeast(2)).reportWorkItemStatus(workItemStatusCaptor.capture());
-List capturedStatuses = 
workItemStatusCaptor.getAllValues();
-boolean foundErrors = false;
-for (WorkItemStatus status : capturedStatuses) {
-  if (!status.getErrors().isEmpty()) {
-assertFalse(foundErrors);
-foundErrors = true;
-String errorMessage = status.getErrors().get(0).getMessage();
-assertThat(errorMessage, 
Matchers.containsString("KeyCommitTooLargeException"));
-  }
-}
-assertTrue(foundErrors);
-  }
-
-  @Test
-  public void testKeyCommitTooLargeException_StreamingEngine() throws 
Exception {
-KvCoder kvCoder = KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of());
-
-List instructions =
-Arrays.asList(
-makeSourceInstruction(kvCoder),
-makeDoFnInstruction(new LargeCommitFn(), 0, kvCoder),
-makeSinkInstruction(kvCoder, 1));
-
-FakeWindmillServer server = new FakeWindmillServer(errorCollector);
-server.setExpectedExceptionCount(1);
-
-StreamingDataflowWorkerOptions options =
-createTestingPipelineOptions(server, 
"--experiments=enable_streaming_engine");
-StreamingDataflowWorker worker = makeWorker(instructions, options, true /* 
publishCounters */);
-worker.setMaxWorkItemCommitBytes(1000);
-worker.start();
-
-server.addWorkToOffer(makeInput(1, 0, "large_key"));
-server.addWorkToOffer(makeInput(2, 0, "key"));
-server.waitForEmptyWorkQueue();
-
-Map result = 
server.waitForAndGetCommits(1);
-
-assertEquals(2, result.size());
-assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
-assertTrue(result.containsKey(1L));
-assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-assertTrue(result.get(1L).getSerializedSize() > 1000);
+// The large commit should have its flags set marking it for truncation
+assertTrue(largeCommit.getExceedsMaxWorkItemCommitBytes());
+assertTrue(largeCommit.getSerializedSize() < 100);
 
 Review comment:
   Now that you are building the complete expected truncated commit, you could 
remove the field-specific checks, as they are redundant.
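
The point about redundancy can be sketched as follows. TruncatedCommitSketch is a hypothetical value type, not the protobuf class, and its field names are assumptions; generated protobuf messages in Java compare by field values in equals(), which is what makes a single whole-message comparison sufficient.

```java
import java.util.Objects;

/** Hypothetical value type standing in for a truncated commit; NOT the protobuf class. */
final class TruncatedCommitSketch {
  final String key;
  final boolean exceedsMax;
  final long estimatedBytes;

  TruncatedCommitSketch(String key, boolean exceedsMax, long estimatedBytes) {
    this.key = key;
    this.exceedsMax = exceedsMax;
    this.estimatedBytes = estimatedBytes;
  }

  // Value-based equality: two instances are equal only if every field matches,
  // so one equality check against the expected value covers each per-field check.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TruncatedCommitSketch)) {
      return false;
    }
    TruncatedCommitSketch other = (TruncatedCommitSketch) o;
    return key.equals(other.key)
        && exceedsMax == other.exceedsMax
        && estimatedBytes == other.estimatedBytes;
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, exceedsMax, estimatedBytes);
  }
}
```

Comparing the actual commit against a fully built expected instance fails if any single field differs, so separate assertions on the key or the flag add nothing beyond a more specific failure message.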
 



Issue Time Tracking
---

Worklog Id: (was: 340702)
Time Spent: 2h  (was: 1h 50m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
>  Time Spent: 2h
>  Remaining Estimate: 0h

[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339618&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339618
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 22:32
Start Date: 06/Nov/19 22:32
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343363529
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 ##
 @@ -949,63 +949,14 @@ public void testKeyCommitTooLargeException() throws 
Exception {
 assertEquals(2, result.size());
 assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
 assertTrue(result.containsKey(1L));
-assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-assertTrue(result.get(1L).getSerializedSize() > 1000);
 
-// Spam worker updates a few times.
-int maxTries = 10;
-while (--maxTries > 0) {
-  worker.reportPeriodicWorkerUpdates();
-  Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
-}
+WorkItemCommitRequest largeCommit = result.get(1L);
+assertEquals("large_key", largeCommit.getKey().toStringUtf8());
 
 Review comment:
   Done
 



Issue Time Tracking
---

Worklog Id: (was: 339618)
Time Spent: 1h 40m  (was: 1.5h)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339617&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339617
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 22:32
Start Date: 06/Nov/19 22:32
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343363496
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 ##
 @@ -949,63 +949,14 @@ public void testKeyCommitTooLargeException() throws 
Exception {
 assertEquals(2, result.size());
 assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
 assertTrue(result.containsKey(1L));
-assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-assertTrue(result.get(1L).getSerializedSize() > 1000);
 
-// Spam worker updates a few times.
-int maxTries = 10;
-while (--maxTries > 0) {
-  worker.reportPeriodicWorkerUpdates();
-  Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
-}
+WorkItemCommitRequest largeCommit = result.get(1L);
+assertEquals("large_key", largeCommit.getKey().toStringUtf8());
 
-// We should see an exception reported for the large commit but not the 
small one.
-ArgumentCaptor workItemStatusCaptor =
-ArgumentCaptor.forClass(WorkItemStatus.class);
-verify(mockWorkUnitClient, 
atLeast(2)).reportWorkItemStatus(workItemStatusCaptor.capture());
-List capturedStatuses = 
workItemStatusCaptor.getAllValues();
-boolean foundErrors = false;
-for (WorkItemStatus status : capturedStatuses) {
-  if (!status.getErrors().isEmpty()) {
-assertFalse(foundErrors);
-foundErrors = true;
-String errorMessage = status.getErrors().get(0).getMessage();
-assertThat(errorMessage, 
Matchers.containsString("KeyCommitTooLargeException"));
-  }
-}
-assertTrue(foundErrors);
-  }
-
-  @Test
-  public void testKeyCommitTooLargeException_StreamingEngine() throws 
Exception {
-KvCoder kvCoder = KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of());
-
-List instructions =
-Arrays.asList(
-makeSourceInstruction(kvCoder),
-makeDoFnInstruction(new LargeCommitFn(), 0, kvCoder),
-makeSinkInstruction(kvCoder, 1));
-
-FakeWindmillServer server = new FakeWindmillServer(errorCollector);
-server.setExpectedExceptionCount(1);
-
-StreamingDataflowWorkerOptions options =
-createTestingPipelineOptions(server, 
"--experiments=enable_streaming_engine");
-StreamingDataflowWorker worker = makeWorker(instructions, options, true /* 
publishCounters */);
-worker.setMaxWorkItemCommitBytes(1000);
-worker.start();
-
-server.addWorkToOffer(makeInput(1, 0, "large_key"));
-server.addWorkToOffer(makeInput(2, 0, "key"));
-server.waitForEmptyWorkQueue();
-
-Map result = 
server.waitForAndGetCommits(1);
-
-assertEquals(2, result.size());
-assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
-assertTrue(result.containsKey(1L));
-assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-assertTrue(result.get(1L).getSerializedSize() > 1000);
+// The large commit should have its flags set marking it for truncation
+assertTrue(largeCommit.getExceedsMaxWorkItemCommitBytes());
+assertTrue(largeCommit.getSerializedSize() < 100);
 
 Review comment:
   Done
 



Issue Time Tracking
---

Worklog Id: (was: 339617)
Time Spent: 1.5h  (was: 1h 20m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
>  Time Spent: 1.5h
>  Remaining Estimate: 0h

[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339614&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339614
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 22:23
Start Date: 06/Nov/19 22:23
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343360175
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 ##
 @@ -290,12 +293,19 @@ message WorkItemCommitRequest {
   optional SourceState source_state_updates = 12;
   optional int64 source_watermark = 13 [default=-0x8000000000000000];
   optional int64 source_backlog_bytes = 17 [default=-1];
+  optional int64 source_bytes_processed = 22 [default = 0];
+
   repeated WatermarkHold watermark_holds = 14;
 
+  repeated int64 finalize_ids = 19 [packed = true];
+
+  optional int64 testonly_fake_clock_time_usec = 23;
+
   // DEPRECATED
   repeated GlobalDataId global_data_id_requests = 9;
 
   reserved 6;
+
 
 Review comment:
   Done
 



Issue Time Tracking
---

Worklog Id: (was: 339614)
Time Spent: 1h 20m  (was: 1h 10m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339612&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339612
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 22:23
Start Date: 06/Nov/19 22:23
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343360096
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 ##
 @@ -290,12 +293,19 @@ message WorkItemCommitRequest {
   optional SourceState source_state_updates = 12;
   optional int64 source_watermark = 13 [default=-0x8000000000000000];
   optional int64 source_backlog_bytes = 17 [default=-1];
+  optional int64 source_bytes_processed = 22 [default = 0];
 
 Review comment:
   Done
 



Issue Time Tracking
---

Worklog Id: (was: 339612)
Time Spent: 1h  (was: 50m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
>  Time Spent: 1h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339613&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339613
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 22:23
Start Date: 06/Nov/19 22:23
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343360142
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 ##
 @@ -290,12 +293,19 @@ message WorkItemCommitRequest {
   optional SourceState source_state_updates = 12;
   optional int64 source_watermark = 13 [default=-0x8000000000000000];
   optional int64 source_backlog_bytes = 17 [default=-1];
+  optional int64 source_bytes_processed = 22 [default = 0];
+
   repeated WatermarkHold watermark_holds = 14;
 
+  repeated int64 finalize_ids = 19 [packed = true];
+
+  optional int64 testonly_fake_clock_time_usec = 23;
 
 Review comment:
   Done
 



Issue Time Tracking
---

Worklog Id: (was: 339613)
Time Spent: 1h 10m  (was: 1h)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339593&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339593
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 21:12
Start Date: 06/Nov/19 21:12
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343328612
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 ##
 @@ -949,63 +949,14 @@ public void testKeyCommitTooLargeException() throws 
Exception {
 assertEquals(2, result.size());
 assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
 assertTrue(result.containsKey(1L));
-assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-assertTrue(result.get(1L).getSerializedSize() > 1000);
 
-// Spam worker updates a few times.
-int maxTries = 10;
-while (--maxTries > 0) {
-  worker.reportPeriodicWorkerUpdates();
-  Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
-}
+WorkItemCommitRequest largeCommit = result.get(1L);
+assertEquals("large_key", largeCommit.getKey().toStringUtf8());
 
-// We should see an exception reported for the large commit but not the 
small one.
-ArgumentCaptor<WorkItemStatus> workItemStatusCaptor =
-ArgumentCaptor.forClass(WorkItemStatus.class);
-verify(mockWorkUnitClient, 
atLeast(2)).reportWorkItemStatus(workItemStatusCaptor.capture());
-List<WorkItemStatus> capturedStatuses = 
workItemStatusCaptor.getAllValues();
-boolean foundErrors = false;
-for (WorkItemStatus status : capturedStatuses) {
-  if (!status.getErrors().isEmpty()) {
-assertFalse(foundErrors);
-foundErrors = true;
-String errorMessage = status.getErrors().get(0).getMessage();
-assertThat(errorMessage, 
Matchers.containsString("KeyCommitTooLargeException"));
-  }
-}
-assertTrue(foundErrors);
-  }
-
-  @Test
-  public void testKeyCommitTooLargeException_StreamingEngine() throws 
Exception {
-KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of());
-
-List<ParallelInstruction> instructions =
-Arrays.asList(
-makeSourceInstruction(kvCoder),
-makeDoFnInstruction(new LargeCommitFn(), 0, kvCoder),
-makeSinkInstruction(kvCoder, 1));
-
-FakeWindmillServer server = new FakeWindmillServer(errorCollector);
-server.setExpectedExceptionCount(1);
-
-StreamingDataflowWorkerOptions options =
-createTestingPipelineOptions(server, 
"--experiments=enable_streaming_engine");
-StreamingDataflowWorker worker = makeWorker(instructions, options, true /* 
publishCounters */);
-worker.setMaxWorkItemCommitBytes(1000);
-worker.start();
-
-server.addWorkToOffer(makeInput(1, 0, "large_key"));
-server.addWorkToOffer(makeInput(2, 0, "key"));
-server.waitForEmptyWorkQueue();
-
-Map<Long, WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
-
-assertEquals(2, result.size());
-assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
-assertTrue(result.containsKey(1L));
-assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-assertTrue(result.get(1L).getSerializedSize() > 1000);
+// The large commit should have its flags set marking it for truncation
+assertTrue(largeCommit.getExceedsMaxWorkItemCommitBytes());
+assertTrue(largeCommit.getSerializedSize() < 100);
 
 Review comment:
   verify the timers and output messages repeated fields are empty
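
One way the requested check could look, sketched against a hypothetical stand-in type instead of the generated proto. The class TruncationCheckSketch, its nested Commit type and the field names are assumptions for illustration; the real test would use whatever accessors the generated WorkItemCommitRequest exposes.

```java
final class TruncationCheckSketch {
  /** Hypothetical stand-in for the commit fields the test reads. */
  static final class Commit {
    final String key;
    final boolean exceedsMaxWorkItemCommitBytes;
    final java.util.List<String> outputMessages;
    final java.util.List<String> timers;

    Commit(String key, boolean exceedsMaxWorkItemCommitBytes,
        java.util.List<String> outputMessages, java.util.List<String> timers) {
      this.key = key;
      this.exceedsMaxWorkItemCommitBytes = exceedsMaxWorkItemCommitBytes;
      this.outputMessages = outputMessages;
      this.timers = timers;
    }
  }

  /** Throws AssertionError unless the commit is a bare truncation request. */
  static void checkTruncationRequest(Commit commit, String expectedKey) {
    if (!commit.key.equals(expectedKey)) {
      throw new AssertionError("unexpected key: " + commit.key);
    }
    if (!commit.exceedsMaxWorkItemCommitBytes) {
      throw new AssertionError("truncation flag not set");
    }
    // The repeated fields must be empty: a truncation request carries no output.
    if (!commit.outputMessages.isEmpty()) {
      throw new AssertionError("output messages should be empty");
    }
    if (!commit.timers.isEmpty()) {
      throw new AssertionError("timers should be empty");
    }
  }
}
```

Checking the repeated fields directly is stricter than the size bound alone, since a serialized size under 100 bytes could still hide a small stray message or timer.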
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 339593)
Time Spent: 50m  (was: 40m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Steve Koonce
>Priority: Minor
>  Time Spent: 50m
>  Remaining Estimate: 0h

[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339590&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339590
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 21:12
Start Date: 06/Nov/19 21:12
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343329744
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 ##
 @@ -949,63 +949,14 @@ public void testKeyCommitTooLargeException() throws 
Exception {
 assertEquals(2, result.size());
 assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), 
result.get(2L));
 assertTrue(result.containsKey(1L));
-assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
-assertTrue(result.get(1L).getSerializedSize() > 1000);
 
-// Spam worker updates a few times.
-int maxTries = 10;
-while (--maxTries > 0) {
-  worker.reportPeriodicWorkerUpdates();
-  Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
-}
+WorkItemCommitRequest largeCommit = result.get(1L);
+assertEquals("large_key", largeCommit.getKey().toStringUtf8());
 
 Review comment:
   verify sharding_key, work_token, cache_token also (verified in other cases 
with makeExpectedOutput)
 



Issue Time Tracking
---

Worklog Id: (was: 339590)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Steve Koonce
>Priority: Minor
>  Time Spent: 40m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339592&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339592
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 21:12
Start Date: 06/Nov/19 21:12
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343327998
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 ##
 @@ -290,12 +293,19 @@ message WorkItemCommitRequest {
   optional SourceState source_state_updates = 12;
   optional int64 source_watermark = 13 [default=-0x8000];
   optional int64 source_backlog_bytes = 17 [default=-1];
+  optional int64 source_bytes_processed = 22 [default = 0];
+
   repeated WatermarkHold watermark_holds = 14;
 
+  repeated int64 finalize_ids = 19 [packed = true];
+
+  optional int64 testonly_fake_clock_time_usec = 23;
 
 Review comment:
   rm and instead have in reserved field list below
 



Issue Time Tracking
---

Worklog Id: (was: 339592)
Time Spent: 50m  (was: 40m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Steve Koonce
>Priority: Minor
>  Time Spent: 50m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339591&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339591
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 21:12
Start Date: 06/Nov/19 21:12
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343328116
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 ##
 @@ -290,12 +293,19 @@ message WorkItemCommitRequest {
   optional SourceState source_state_updates = 12;
   optional int64 source_watermark = 13 [default=-0x8000];
   optional int64 source_backlog_bytes = 17 [default=-1];
+  optional int64 source_bytes_processed = 22 [default = 0];
+
   repeated WatermarkHold watermark_holds = 14;
 
+  repeated int64 finalize_ids = 19 [packed = true];
+
+  optional int64 testonly_fake_clock_time_usec = 23;
+
   // DEPRECATED
   repeated GlobalDataId global_data_id_requests = 9;
 
   reserved 6;
+
 
 Review comment:
   rm blank line
 



Issue Time Tracking
---

Worklog Id: (was: 339591)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Steve Koonce
>Priority: Minor
>  Time Spent: 40m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339589&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339589
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 21:12
Start Date: 06/Nov/19 21:12
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on pull request #10013: [BEAM-8554] 
Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r343327746
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 ##
 @@ -290,12 +293,19 @@ message WorkItemCommitRequest {
   optional SourceState source_state_updates = 12;
   optional int64 source_watermark = 13 [default=-0x8000];
   optional int64 source_backlog_bytes = 17 [default=-1];
+  optional int64 source_bytes_processed = 22 [default = 0];
 
 Review comment:
   rm default=0, it's the default :)
 



Issue Time Tracking
---

Worklog Id: (was: 339589)
Time Spent: 40m  (was: 0.5h)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Steve Koonce
>Priority: Minor
>  Time Spent: 40m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339584&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339584
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 20:58
Start Date: 06/Nov/19 20:58
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on issue #10013: [BEAM-8554] Use 
WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#issuecomment-550497575
 
 
   R: @scwhittle 
 



Issue Time Tracking
---

Worklog Id: (was: 339584)
Time Spent: 0.5h  (was: 20m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Steve Koonce
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339583&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339583
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 20:58
Start Date: 06/Nov/19 20:58
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on issue #10013: [BEAM-8554] Use 
WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#issuecomment-550497470
 
 
   retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 339583)
Time Spent: 20m  (was: 10m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to 
> be broken up
> -
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Steve Koonce
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-8554) Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up

2019-11-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339570&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339570
 ]

ASF GitHub Bot logged work on BEAM-8554:


Author: ASF GitHub Bot
Created on: 06/Nov/19 20:07
Start Date: 06/Nov/19 20:07
Worklog Time Spent: 10m 
  Work Description: stevekoonce commented on pull request #10013: 
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013
 
 
   …a WorkItem needs to be broken up
   
   This implements the improvement described in 
[BEAM-8554](https://issues.apache.org/jira/browse/BEAM-8554): when the 
serialized size of a WorkItemCommitRequest proto exceeds the maximum size, 
the commit request is replaced by a request for a server-side 'truncation'. 
The service then breaks up the WorkItem itself, and reprocessing yields 
multiple WorkItemCommitRequests that are each small enough to be submitted 
successfully.
   
   I updated an existing unit test and removed a redundant one - the 
StreamingDataflowWorkerTest is already configured to run all tests with and 
without StreamingEngine and Windmill, so separate, otherwise-identical tests 
are not necessary.
   
   
   