[GitHub] [beam] lostluck commented on a change in pull request #12350: [BEAM-10289] Dynamic splitting implementation.

2020-08-03 Thread GitBox


lostluck commented on a change in pull request #12350:
URL: https://github.com/apache/beam/pull/12350#discussion_r464552611



##
File path: sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
##
@@ -18,6 +18,7 @@ package exec
 import (
"context"
"fmt"
+   "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"

Review comment:
   Please move this to the other beam imports below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12350: [BEAM-10289] Dynamic splitting implementation.

2020-07-30 Thread GitBox


lostluck commented on a change in pull request #12350:
URL: https://github.com/apache/beam/pull/12350#discussion_r462445798



##
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##
@@ -302,31 +308,76 @@ func (n *DataSource) Split(splits []int64, frac float64, 
bufSize int64) (int64,
}
 
n.mu.Lock()
+   defer n.mu.Unlock()
+
var currProg float64 // Current element progress.
-   if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
+   var su SplittableUnit
+   if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
currProg = 1.0
-   } else if n.rt == nil { // If this isn't sub-element splittable, 
estimate some progress.
+   } else if n.su == nil { // If this isn't sub-element splittable, 
estimate some progress.
currProg = 0.5
} else { // If this is sub-element splittable, get progress of the 
current element.
-   rt := <-n.rt
-   d, r := rt.GetProgress()
-   currProg = d / (d + r)
-   n.rt <- rt
+   // If splittable, hold this tracker for the rest of the 
function so the element
+   // doesn't finish processing during a split.
+   su = <-n.su
+   if su == nil {
+   return SplitResult{}, fmt.Errorf("failed to split: 
splittable unit was nil")
+   }
+   defer func() {
+   n.su <- su
+   }()
+   currProg = su.GetProgress()
}
// Size to split within is the minimum of bufSize or splitIdx so we 
avoid
// including elements we already know won't be processed.
if bufSize <= 0 || n.splitIdx < bufSize {
bufSize = n.splitIdx
}
-   s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, 
false)
+   s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su 
!= nil)
if err != nil {
-   n.mu.Unlock()
-   return 0, err
+   return SplitResult{}, err
+   }
+
+   // No fraction returned, perform channel split.
+   if f < 0 {
+   n.splitIdx = s
+   return SplitResult{PI: s - 1, RI: s}, nil
+   }
+   // Otherwise, perform a sub-element split.
+   fr := f / (1.0 - currProg)
+   p, r, err := su.Split(fr)
+   if err != nil {
+   return SplitResult{}, err
+   }
+
+   if p != nil && r != nil { // Successful split.

Review comment:
   Consider the reversal technique I mentioned earlier, so that the short case 
is indented and returns early, while the long case stays unindented.
   ```
   if p == nil || r == nil  {
   // Fallback to channel split, so split at next elm, not current.
n.splitIdx = s + 1
return SplitResult{PI: s, RI: s + 1}, nil
   }  // no need for an else.
   // .. the original contents of the if block ...
   ```

##
File path: sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
##
@@ -412,17 +423,120 @@ func TestDataSource_Split(t *testing.T) {
 
// SDK never splits on 0, so check that every test.
sp := SplitPoints{Splits: test.splitPts, Frac: test.frac, 
BufSize: test.bufSize}
-   if splitIdx, err := p.Split(sp); err != nil {
+   if splitRes, err := p.Split(sp); err != nil {
t.Fatalf("error in Split: %v", err)
-   } else if got, want := splitIdx, test.splitIdx; got != want {
-   t.Fatalf("error in Split: got splitIdx = %v, want %v ", 
got, want)
+   } else {

Review comment:
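   And here as well: since `t.Fatalf` stops the test, the `else` is unnecessary and the success checks can be unindented.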

##
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##
@@ -215,14 +215,25 @@ func (n *SplitAndSizeRestrictions) String() string {
 // changes to support the SDF's method signatures and the expected structure
 // of the FullValue being received.
 type ProcessSizedElementsAndRestrictions struct {
-   PDo *ParDo
-
-   inv *ctInvoker
-
-   // Rt allows this unit to send out restriction trackers being processed.
-   // Receivers of the tracker do not own it, and must send it back 
through the
-   // same channel once finished with it.
-   Rt chan sdf.RTracker
+   PDo *ParDo
+   TfIdstring // Transform ID. Needed for splitting.
+   ctInv   *ctInvoker
+   sizeInv *rsInvoker
+
+   // SU is a buffered channel for indicating when this unit is splittable.
+   // When this unit is processing an element, it sends a SplittableUnit
+   // interface through the channel. That interface can be received on 
other
+   // threads and used to perform splitting or other related operation.
+   //
+   // Receiving the SplittableUnit prevents the current element from 
finishing
   // processing, so the element does not unexpectedly 

[GitHub] [beam] lostluck commented on a change in pull request #12350: [BEAM-10289] Dynamic splitting implementation.

2020-07-23 Thread GitBox


lostluck commented on a change in pull request #12350:
URL: https://github.com/apache/beam/pull/12350#discussion_r459721826



##
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##
@@ -302,31 +302,67 @@ func (n *DataSource) Split(splits []int64, frac float64, 
bufSize int64) (int64,
}
 
n.mu.Lock()
+   defer n.mu.Unlock()
+
var currProg float64 // Current element progress.
-   if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
+   var su SplittableUnit
+   if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
currProg = 1.0
-   } else if n.rt == nil { // If this isn't sub-element splittable, 
estimate some progress.
+   } else if n.su == nil { // If this isn't sub-element splittable, 
estimate some progress.
currProg = 0.5
} else { // If this is sub-element splittable, get progress of the 
current element.
-   rt := <-n.rt
-   d, r := rt.GetProgress()
-   currProg = d / (d + r)
-   n.rt <- rt
+   // If splittable, hold this tracker for the rest of the 
function so the element
+   // doesn't finish processing during a split.
+   su = <-n.su
+   if su == nil {
+   return SplitResult{}, fmt.Errorf("failed to split: 
splittable unit was nil")
+   }
+   defer func() {
+   n.su <- su
+   }()
+   currProg = su.GetProgress()
}
// Size to split within is the minimum of bufSize or splitIdx so we 
avoid
// including elements we already know won't be processed.
if bufSize <= 0 || n.splitIdx < bufSize {
bufSize = n.splitIdx
}
-   s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, 
false)
+   s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su 
!= nil)
if err != nil {
-   n.mu.Unlock()
-   return 0, err
+   return SplitResult{}, err
+   }
+   if f > 0.0 {

Review comment:
   Consider inverting the if statement. There are only 2 lines after this if 
statement and 30 inside it. Although that means duplicating the n.splitIdx 
assignment and the SplitResult construction, it will be easier to read the other 
way.

##
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##
@@ -302,31 +302,67 @@ func (n *DataSource) Split(splits []int64, frac float64, 
bufSize int64) (int64,
}
 
n.mu.Lock()
+   defer n.mu.Unlock()
+
var currProg float64 // Current element progress.
-   if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
+   var su SplittableUnit
+   if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
currProg = 1.0
-   } else if n.rt == nil { // If this isn't sub-element splittable, 
estimate some progress.
+   } else if n.su == nil { // If this isn't sub-element splittable, 
estimate some progress.
currProg = 0.5
} else { // If this is sub-element splittable, get progress of the 
current element.
-   rt := <-n.rt
-   d, r := rt.GetProgress()
-   currProg = d / (d + r)
-   n.rt <- rt
+   // If splittable, hold this tracker for the rest of the 
function so the element
+   // doesn't finish processing during a split.
+   su = <-n.su
+   if su == nil {
+   return SplitResult{}, fmt.Errorf("failed to split: 
splittable unit was nil")
+   }
+   defer func() {
+   n.su <- su
+   }()
+   currProg = su.GetProgress()
}
// Size to split within is the minimum of bufSize or splitIdx so we 
avoid
// including elements we already know won't be processed.
if bufSize <= 0 || n.splitIdx < bufSize {
bufSize = n.splitIdx
}
-   s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, 
false)
+   s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su 
!= nil)
if err != nil {
-   n.mu.Unlock()
-   return 0, err
+   return SplitResult{}, err
+   }
+   if f > 0.0 {
+   fr := f / (1.0 - currProg)
+   p, r, err := su.Split(fr)
+   if err != nil {
+   return SplitResult{}, err
+   }
+
+   if p != nil && r != nil { // Successful split.
+   pEnc, err := encodeElm(p, n.Coder)
+   if err != nil {
+   return SplitResult{}, err
+   }
+   rEnc, err := encodeElm(r, n.Coder)

Review