This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit efa1c20a9328177a6cd6ec4d744853417fd0ef48
Author: Bill Neubauer <w...@google.com>
AuthorDate: Tue Jan 16 15:39:20 2018 -0800

    Add additional comments about concurrency invariants.
    
    Explain how gRPC-specific goroutines are managed.
    Explain how plans are managed in the active and plans maps.
---
 sdks/go/pkg/beam/core/runtime/harness/harness.go | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 335e2b3..8822c62 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -90,6 +90,11 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
        }
 
        var cpuProfBuf bytes.Buffer
+
+       // gRPC requires all readers of a stream be the same goroutine, so this goroutine
+       // is responsible for managing the network data. All it does is pull data from
+       // the stream, and hand off the message to a goroutine to actually be handled,
+       // so as to avoid blocking the underlying network channel.
        for {
                req, err := client.Recv()
                if err != nil {
@@ -132,10 +137,14 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 }
 
 type control struct {
-       plans  map[string]*exec.Plan // protected by mu
+       // plans that are candidates for execution.
+       plans map[string]*exec.Plan // protected by mu
+       // plans that are actively being executed.
+       // a plan can only be in one of these maps at any time.
        active map[string]*exec.Plan // protected by mu
        mu     sync.Mutex
-       data   *DataManager
+
+       data *DataManager
 }
 
 func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.

Reply via email to