Re: BEAM-3304 - Go Trigger RuntimeError question

2021-03-25 Thread Robert Burke
From the outset, I agree with you that there's something the Go SDK didn't
expect with the stream of bytes it received. The non-global Window handling
in the Go SDK is grossly undertested to begin with, other than the
windowed_wordcount example (which I've run maybe once or twice).  As an
aside, this is something we should change now that we have an improved
integration test set up to validate things against the Python Portable ULR.

This one is rightly tricky, so I'm going to type out my debugging process.
Search for *tl;dr;* to skip to the end for the result of the debugging.

The error messages can always be improved, but it's been a while since I've
seen an encoding error like this.
First, let's rule out the varint coder doing something wrong.

The important part is the bit after the plan:
```
caused by:
stream value decode failed
caused by:
invalid varintz encoding for: []
```

So I search the GitHub Beam repo for that error message, "invalid varintz
encoding" [1], finding one location: the varintz decoder [2].
The '[]' at the end of the error message is the byte array it was trying to
decode, which means apparently it received a 0 length []byte
after reading, and couldn't handle it.

It put the []byte into `binary.Varint`, which, if we look at the imports,
comes from the "encoding/binary" package [3]. As per the Varint
documentation [4], when it returns a byte count of 0, it means the []byte
buffer was too small (in this case 0 length), so the error is legitimate,
and the returned value is invalid.
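
To make that concrete, here is a minimal sketch of the behavior. It is not
the SDK's decoder; the helper name `decodeVarint` and its error text are
made up for illustration, and only `binary.Varint` itself comes from the
standard library:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeVarint is a hypothetical helper, not the Go SDK's code. It wraps
// binary.Varint and turns a non-positive byte count into an error.
func decodeVarint(buf []byte) (int64, error) {
	v, n := binary.Varint(buf)
	if n <= 0 {
		// n == 0: the buffer was too small to hold a varint.
		// n < 0: the value overflows 64 bits.
		return 0, fmt.Errorf("invalid varint encoding for: %v (n=%d)", buf, n)
	}
	return v, nil
}

func main() {
	// An empty buffer, like the one in the error message above.
	_, err := decodeVarint([]byte{})
	fmt.Println(err)

	// A single well-formed byte decodes without error.
	v, err := decodeVarint([]byte{0x02})
	fmt.Println(v, err)
}
```

The empty buffer takes the error path because `n` is 0, which is the same
condition the documentation describes.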

Because I know this varint should have been encoded by Beam (knowing how
the windowed_wordcount.go example works), I look to the encoding function
and see that it uses `binary.PutVarint` [5] to encode the value. Looking at
the documentation for PutVarint [6], there's an example which we can modify
to also print out the "n" length written to the buffer, confirming that
even the natural candidate for a 0 length byte buffer, a 0 value, always
encodes to at least 1 byte.
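
A sketch of that check, loosely following the shape of the PutVarint
documentation example rather than copying it. The helper name `encodedLen`
and the sample values are my own choices for illustration:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodedLen reports how many bytes binary.PutVarint writes for x.
// The helper name is made up for this illustration.
func encodedLen(x int64) int {
	// MaxVarintLen64 is large enough for any int64, so PutVarint
	// cannot panic on a too-small buffer here.
	buf := make([]byte, binary.MaxVarintLen64)
	return binary.PutVarint(buf, x)
}

func main() {
	for _, x := range []int64{0, 1, -1, 63, 64} {
		fmt.Printf("%d encodes to %d byte(s)\n", x, encodedLen(x))
	}
}
```

No input produces a length of 0, so a correctly encoded varint can never
show up as an empty []byte on the decoding side.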

OK, the varint coder did everything expected of it. So now the question
becomes "Why is there a 0 length prefix where the integer value is
expected?" What's different here?
The printed out plan shows the hydrated ProcessBundleDescriptor as the Go
SDK interpreted it. The ordering is reversed, but the first and last lines
are the DataSource and DataSink, and those are the only places where a
Coder is going to be used. They're the boundaries for the data entering and
leaving the Go SDK respectively, so that's where decoding and encoding
happen. Let's look at the plan line for DataSource.

1: DataSource[S[CombinePerKey/Group/Read@localhost:63851], out]
Coder:W;c8_windowed>!IWC Out:7

The first part is the index (a 1), which is just where the transform sits
in the plan listing. It matters for more complex graphs, where the Out
fields use it to refer to the PTransform(s) a transform outputs to.
The next part says it's a DataSource. This line comes from the
DataSource's String method [7]. It's outputting the Stream ID
"S[CombinePerKey/Group/Read@localhost:63851]" the runner gave it to access
the right data over the data channel, and the name "out", which is the key
used to identify this bundle in particular (used in progress reporting).
This is followed by the Coder, and then what the DataSource passes its
output to. In this case, it's passing it to "7:
MergeAccumulators[stats.sumIntFn]".

The Coder is the important part for this bug hunt. I'm not 100% satisfied
with the format we've ended up with here, but it is what it is. It's
a result of the general coder String() method [8].
What this coder is saying is that it's receiving CoGBK values,
wrapped in an Interval Window Coder.
Loosely, the ;c# (and that c8_windowed) values are the ids into the job's
pipeline proto coders map. It prints out its kind, the coder id, and any
component coders it might have.

This seems all in order, as it's a windowed job. I'd expect this for
unmodified wordcounts as well. It does give us the next place to look
though, Coder handling! The Beam standard coders are defined in the
beam_runner_api.proto [9]. We're about to become good friends with the
coding formats listed there.

The DataSource creates the window and value coders separately at the top of
its Process method [10], and then does special handling if the component
of the window is a CoGBK coder. Then the per element processing begins. The
Beam Datachannel at this point is represented as a stream of bytes (using
Go's io.Reader interface). Each element for this DataSource represents a
windowed value. Each windowed value consists of the key and all the values
for that key that were passed to the GBK.

Aside: The special CoGBK handling is because Beam doesn't have a first
class notion of a CoGBK coder or a GBK coder; they're represented with KVs
and Iterables.
Simple GBKs are represented by a KV<K, Iterable<V>> coder, and it's
the Iterable that's the tricky portion, as the Go SDK doesn't provide a
first class notion of an unbounded iterable for users to pass around.

BEAM-3304 - Go Trigger RuntimeError question

2021-03-24 Thread Eduardo Barrera
Hello Team,

We're testing an initial and very basic implementation of a trigger. While
testing different triggers like *Always* or *Never* using the
windowed_wordcount.go example and graphx/translate.go, we get this error
message:

RuntimeError: process bundle failed for instruction bundle_147 using plan 1
: while executing Process for Plan[1]:
2: DataSink[S[CoGBK/Write@localhost:63851]]
Coder:W;c9_windowed>!GWC
3: ParDo[beam.addFixedKeyFn] Out:[2]
4: WindowInto[GLO]. Out:3
5: ParDo[main.formatFn] Out:[4]
6: ExtractOutput[stats.sumIntFn] Keyed:false Out:5
7: MergeAccumulators[stats.sumIntFn] Keyed:false Out:6
1: DataSource[S[CombinePerKey/Group/Read@localhost:63851], out]
Coder:W;c8_windowed>!IWC Out:7
caused by:
stream value decode failed
caused by:
invalid varintz encoding for: []
Remote logging shutting down.exit status 1

We believe that this error is caused by the lack of a trigger
coder/encoder. However, it works with the Default trigger (which is set by
default in graphx/translate.go).

Any guidance on this would be appreciated.

Thanks!
Eduardo
