This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch go-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit efa1c20a9328177a6cd6ec4d744853417fd0ef48
Author: Bill Neubauer <w...@google.com>
AuthorDate: Tue Jan 16 15:39:20 2018 -0800

    Add additional comments about concurrency invariants.

    Explain how gRPC-specific goroutines are managed.
    Explain how plans are managed in the active and plans maps.
---
 sdks/go/pkg/beam/core/runtime/harness/harness.go | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 335e2b3..8822c62 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -90,6 +90,11 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 	}
 
 	var cpuProfBuf bytes.Buffer
+
+	// gRPC requires all readers of a stream be the same goroutine, so this goroutine
+	// is responsible for managing the network data. All it does is pull data from
+	// the stream, and hand off the message to a goroutine to actually be handled,
+	// so as to avoid blocking the underlying network channel.
 	for {
 		req, err := client.Recv()
 		if err != nil {
@@ -132,10 +137,14 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 }
 
 type control struct {
-	plans  map[string]*exec.Plan // protected by mu
+	// plans that are candidates for execution.
+	plans map[string]*exec.Plan // protected by mu
+	// plans that are actively being executed.
+	// a plan can only be in one of these maps at any time.
 	active map[string]*exec.Plan // protected by mu
 	mu     sync.Mutex
-	data   *DataManager
+
+	data *DataManager
 }
 
 func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.