Mutability checking might catch that.

I meant to suggest not putting the check in the pipeline, but offering a
testing discipline that will catch such issues. One thing that's been on
the back burner for a long time is making CoderProperties into a
CoderTester like Guava's EqualsTester. Then it can run through all the
properties without a user setting up test suites. The downside is that the
test failure signal gets aggregated.

Kenn

On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette <bhule...@google.com> wrote:

> Could the DirectRunner just do an equality check whenever it does an
> encode/decode? It sounds like it's already effectively performing
> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
> the equality check.
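
The idea of checking equality at the moment a runner round-trips an element can be sketched in plain Java. This is only a toy model, not Beam code: the class `RoundTripCheckSketch`, the helper `checkedRoundTrip`, and the "name=value" string encoding are all hypothetical stand-ins chosen for illustration. The encoded form keeps only the payload, and decoding re-attaches the authoritative name, so an in-memory value carrying a stale name fails the check.

```java
import java.util.Objects;
import java.util.function.Function;

public class RoundTripCheckSketch {

    /**
     * Encodes and decodes the value, then compares the result with the
     * in-memory original. Throws if the two disagree.
     */
    static <T, E> T checkedRoundTrip(T value, Function<T, E> encode, Function<E, T> decode) {
        T decoded = decode.apply(encode.apply(value));
        if (!Objects.equals(value, decoded)) {
            throw new IllegalStateException(
                "in-memory value '" + value + "' is inconsistent with its encoded form;"
                    + " it decodes as '" + decoded + "'");
        }
        return decoded;
    }

    /** Toy encoding (an assumption of this sketch): keep only the payload after '='. */
    static String encode(String inMemory) {
        return inMemory.substring(inMemory.indexOf('=') + 1);
    }

    /** Toy decoding: rebuild the value from the payload and the authoritative name. */
    static String decode(String payload) {
        return "renamed=" + payload;
    }

    public static void main(String[] args) {
        // A consistent value survives the round trip unchanged.
        System.out.println(checkedRoundTrip(
            "renamed=value", RoundTripCheckSketch::encode, RoundTripCheckSketch::decode));

        // A value that still carries the old name is silently "fixed" by a plain
        // round trip; the equality check turns that into a visible failure.
        try {
            checkedRoundTrip(
                "field2=value", RoundTripCheckSketch::encode, RoundTripCheckSketch::decode);
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The check has to happen on the first round trip, because a check placed after it only ever sees the already-repaired value.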
>
> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax <re...@google.com> wrote:
>
>> There is no bug in the Coder itself, so that wouldn't catch it. We could
>> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
>> the Direct runner already does an encode/decode before that ParDo, then
>> that would have fixed the problem before we could see it.
>>
>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Would it be caught by CoderProperties?
>>>
>>> Kenn
>>>
>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I don't think this bug is schema specific - we created a Java object
>>>> that is inconsistent with its encoded form, which could happen to any
>>>> transform.
>>>>
>>>> This does seem to be a gap in DirectRunner testing though. It also
>>>> makes it hard to test using PAssert, as I believe that puts everything in a
>>>> side input, forcing an encoding/decoding.
>>>>
>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com>
>>>> wrote:
>>>>
>>>>> +dev <d...@beam.apache.org>
>>>>>
>>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>>> fixes the object.
>>>>>
>>>>> Do we need better testing of schema-aware (and potentially other
>>>>> built-in) transforms in the face of fusion to root out issues like this?
>>>>>
>>>>> Brian
>>>>>
>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>
>>>>>> I have some other work-related things I need to do this week, so I
>>>>>> will likely report back on this over the weekend.  Thank you for the
>>>>>> explanation.  It makes perfect sense now.
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Some more context - the problem is that RenameFields outputs (in
>>>>>>> this case) Java Row objects that are inconsistent with the actual
>>>>>>> schema. For example if you have the following schema:
>>>>>>>
>>>>>>> Row {
>>>>>>>    field1: Row {
>>>>>>>       field2: string
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>>>>>
>>>>>>> Row {
>>>>>>>   field1: Row {
>>>>>>>      renamed: string
>>>>>>>    }
>>>>>>> }
>>>>>>>
>>>>>>> However the Java object for the _nested_ row will return the old
>>>>>>> schema if getSchema() is called on it. This is because we only update
>>>>>>> the schema on the top-level row.
>>>>>>>
>>>>>>> I think this explains why your test works in the direct runner. If
>>>>>>> the row ever goes through an encode/decode path, it will come back
>>>>>>> correct. The original incorrect Java objects are no longer around, and
>>>>>>> new (consistent) objects are constructed from the raw data and the
>>>>>>> PCollection schema. Dataflow tends to fuse ParDos together, so the
>>>>>>> following ParDo will see the incorrect Row object. I bet the
>>>>>>> DirectRunner is encoding and decoding in between, which fixes the
>>>>>>> object.
>>>>>>>
>>>>>>> You can validate this theory by forcing a shuffle after RenameFields
>>>>>>> using Reshuffle. It should fix the issue. If it does, let me know and
>>>>>>> I'll work on a fix to RenameFields.
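
The mechanism described above can be modelled in a few lines of plain Java. This is a hedged sketch, not Beam's implementation: `ToySchema`, `ToyRow`, `shallowAttach`, `encode`, and `decode` are hypothetical names invented here. The toy row carries its own schema reference, a shallow rename swaps only the top-level schema, and a round trip through raw values rebuilds every nested row from the authoritative schema.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StaleNestedSchemaSketch {

    /** Toy schema: one field name plus a nested schema for it (null for a string leaf). */
    static final class ToySchema {
        final List<String> names;
        final List<ToySchema> nested;

        ToySchema(List<String> names, List<ToySchema> nested) {
            this.names = names;
            this.nested = nested;
        }

        static ToySchema ofString(String name) {
            return new ToySchema(
                Collections.singletonList(name), Collections.<ToySchema>singletonList(null));
        }

        static ToySchema ofRow(String name, ToySchema child) {
            return new ToySchema(
                Collections.singletonList(name), Collections.singletonList(child));
        }
    }

    /** Toy row: positional values plus the schema object it was built with. */
    static final class ToyRow {
        final ToySchema schema;
        final List<Object> values;

        ToyRow(ToySchema schema, List<Object> values) {
            this.schema = schema;
            this.values = values;
        }
    }

    /** Shallow rename: new top-level schema, same value objects (nested rows untouched). */
    static ToyRow shallowAttach(ToySchema newSchema, ToyRow row) {
        return new ToyRow(newSchema, row.values);
    }

    /** Encoding drops every schema reference and keeps only the raw values. */
    static List<Object> encode(ToyRow row) {
        List<Object> raw = new ArrayList<>();
        for (Object v : row.values) {
            raw.add(v instanceof ToyRow ? encode((ToyRow) v) : v);
        }
        return raw;
    }

    /** Decoding rebuilds rows, nested ones included, from the authoritative schema. */
    @SuppressWarnings("unchecked")
    static ToyRow decode(ToySchema schema, List<Object> raw) {
        List<Object> values = new ArrayList<>();
        for (int i = 0; i < raw.size(); i++) {
            ToySchema child = schema.nested.get(i);
            values.add(child == null ? raw.get(i) : decode(child, (List<Object>) raw.get(i)));
        }
        return new ToyRow(schema, values);
    }

    static ToySchema renamedSchema() {
        return ToySchema.ofRow("field1", ToySchema.ofString("renamed"));
    }

    static ToyRow shallowlyRenamedRow() {
        ToySchema oldInner = ToySchema.ofString("field2");
        ToyRow inner = new ToyRow(oldInner, Collections.<Object>singletonList("value"));
        ToyRow outer = new ToyRow(
            ToySchema.ofRow("field1", oldInner), Collections.<Object>singletonList(inner));
        return shallowAttach(renamedSchema(), outer);
    }

    /** The field name reported by the nested row's own schema. */
    static String nestedFieldName(ToyRow row) {
        return ((ToyRow) row.values.get(0)).schema.names.get(0);
    }

    public static void main(String[] args) {
        ToyRow fused = shallowlyRenamedRow();
        ToyRow roundTripped = decode(renamedSchema(), encode(fused));
        System.out.println("fused consumer sees: " + nestedFieldName(fused));        // field2
        System.out.println("after encode/decode: " + nestedFieldName(roundTripped)); // renamed
    }
}
```

In this model a consumer fused directly after the rename reads the stale nested name, while any consumer behind an encode/decode boundary reads the renamed one.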
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Aha, yes, this is indeed another bug in the transform. The schema is
>>>>>>>> set on the top-level Row but not on any nested rows.
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you everyone for your input.  I believe it will be easiest
>>>>>>>>> to respond to all feedback in a single message rather than messages
>>>>>>>>> per person.
>>>>>>>>>
>>>>>>>>>    - NeedsRunner - The tests are run eventually, so obviously all
>>>>>>>>>    good on my end.  I was trying to run the smallest subset of test
>>>>>>>>>    cases possible and didn't venture beyond `gradle test`.
>>>>>>>>>    - Stack Trace - There wasn't any, unfortunately, because no
>>>>>>>>>    exception was thrown in the code.  The Beam Row was translated
>>>>>>>>>    into a BQ TableRow and an insertion was attempted.  The error
>>>>>>>>>    "message" was part of the response JSON that came back as a
>>>>>>>>>    result of a request against the BQ API.
>>>>>>>>>    - Desired Behaviour - (field0_1.field1_0, nestedStringField)
>>>>>>>>>    -> field0_1.nestedStringField is what I am looking for.
>>>>>>>>>    - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>>>>       - The Beam Schema was as expected with all renames applied.
>>>>>>>>>       - The example I provided was heavily stripped down in order
>>>>>>>>>       to isolate the problem.  My work example, which is a bit
>>>>>>>>>       impractical because it's part of some generic tooling, has 4
>>>>>>>>>       levels of nesting and also produces the correct output.
>>>>>>>>>       - BigQueryUtils.toTableRow(Row) returns the expected
>>>>>>>>>       TableRow in DirectRunner.  In DataflowRunner, however, only the
>>>>>>>>>       top-level renames were reflected in the TableRow and none of
>>>>>>>>>       the renames in the nested fields were.
>>>>>>>>>       - BigQueryUtils.toTableRow(Row) recurses on the Row values
>>>>>>>>>       and uses the Row.schema to get the field names.  This makes
>>>>>>>>>       sense to me, but if a value is actually a Row then its schema
>>>>>>>>>       appears to be inconsistent with the top-level schema.
>>>>>>>>>    - My Current Workaround - I forked RenameFields and replaced
>>>>>>>>>    the attachValues in the expand method with a "deep" rename.  This
>>>>>>>>>    is obviously inefficient and I will not be submitting a PR for
>>>>>>>>>    that.
>>>>>>>>>    - JIRA ticket -
>>>>>>>>>    https://issues.apache.org/jira/browse/BEAM-12442
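
The "deep" rename workaround mentioned in the list above can be illustrated with nested maps standing in for rows. This is a minimal sketch under that assumption, not the forked RenameFields code: `DeepRenameSketch` and `deepRename` are hypothetical names, and dotted paths always refer to the original field names. Every nested object is rebuilt, which is also why the approach is inefficient.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DeepRenameSketch {

    /**
     * Rebuilds the row recursively, renaming each field whose dotted path
     * (expressed in the ORIGINAL names) appears in the renames map.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> deepRename(
            Map<String, Object> row, Map<String, String> renames, String prefix) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            String newName = renames.getOrDefault(path, e.getKey());
            Object value = e.getValue();
            if (value instanceof Map) {
                // Nested row: rebuild it as well so its names match the new schema.
                value = deepRename((Map<String, Object>) value, renames, path);
            }
            out.put(newName, value);
        }
        return out;
    }

    /** Mirrors the field names used in the example pipeline from this thread. */
    static Map<String, Object> example() {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("field1_0", "value1_0");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("field0_0", "value0_0");
        row.put("field0_1", nested);

        Map<String, String> renames = new LinkedHashMap<>();
        renames.put("field0_0", "stringField");
        renames.put("field0_1", "nestedField");
        renames.put("field0_1.field1_0", "nestedStringField");
        return deepRename(row, renames, "");
    }

    public static void main(String[] args) {
        // Prints {stringField=value0_0, nestedField={nestedStringField=value1_0}}
        System.out.println(example());
    }
}
```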
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> This transform is the same across all runners. A few comments on
>>>>>>>>>> the test:
>>>>>>>>>>
>>>>>>>>>>   - Using attachValues directly is error prone (per the comment
>>>>>>>>>> on the method). I recommend using the withFieldValue builders
>>>>>>>>>> instead.
>>>>>>>>>>   - I recommend capturing the RenameFields PCollection into a
>>>>>>>>>> local variable of type PCollection<Row> and printing out the schema
>>>>>>>>>> (which you can get using the PCollection.getSchema method) to ensure
>>>>>>>>>> that the output schema looks like you expect.
>>>>>>>>>>   - RenameFields doesn't flatten. So renaming field0_1.field1_0
>>>>>>>>>> -> nestedStringField results in field0_1.nestedStringField; if you
>>>>>>>>>> wanted to flatten, then the better transform would be
>>>>>>>>>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>>>>>>> This all being said, eyeballing the implementation of
>>>>>>>>>> RenameFields makes me think that it is buggy in the case where you
>>>>>>>>>> specify a top-level field multiple times like you do. I think it is
>>>>>>>>>> simply adding the top-level field into the output schema multiple
>>>>>>>>>> times, and the second time is with the field0_1 base name; I have no
>>>>>>>>>> idea why your test doesn't catch this in the DirectRunner, as it's
>>>>>>>>>> equally broken there. Could you file a JIRA about this issue and
>>>>>>>>>> assign it to me?
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <
>>>>>>>>>>> bhule...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Matthew,
>>>>>>>>>>>>
>>>>>>>>>>>> > The unit tests also seem to be disabled for this as well and
>>>>>>>>>>>> so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>
>>>>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our
>>>>>>>>>>>> testing framework. NeedsRunner indicates that a test suite can't be
>>>>>>>>>>>> executed with the SDK alone; it needs a runner. So that exclusion
>>>>>>>>>>>> just makes sure we don't run the test when we're verifying the SDK
>>>>>>>>>>>> by itself in the :sdks:java:core:test task. The test is still run
>>>>>>>>>>>> in other tasks where we have a runner, most notably in the Java
>>>>>>>>>>>> PreCommit [1], where we run it as part of the
>>>>>>>>>>>> :runners:direct-java:test task.
>>>>>>>>>>>>
>>>>>>>>>>>> That being said, we may only run these tests continuously with
>>>>>>>>>>>> the DirectRunner; I'm not sure if we test them on all the runners
>>>>>>>>>>>> like we do with ValidatesRunner tests.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That is correct. The tests are tests _of the transform_ so they
>>>>>>>>>>> run only on the DirectRunner. They are not tests of the runner,
>>>>>>>>>>> which is only responsible for correctly implementing Beam's
>>>>>>>>>>> primitives. The transform should not behave differently on
>>>>>>>>>>> different runners, except for fundamental differences in how they
>>>>>>>>>>> schedule work and checkpoint.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> > The error message I’m receiving, : Error while reading data,
>>>>>>>>>>>> error message: JSON parsing error in row starting at position 0:
>>>>>>>>>>>> No such field: nestedField.field1_0, suggests the BigQuery is
>>>>>>>>>>>> trying to use the original name for the nested field and not the
>>>>>>>>>>>> substitute name.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a stacktrace associated with this error? It would be
>>>>>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>>>>>
>>>>>>>>>>>> Brian
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I’m trying to use the RenameFields transform prior to
>>>>>>>>>>>>> inserting into BigQuery on nested fields.  Insertion into
>>>>>>>>>>>>> BigQuery is successful with DirectRunner, but DataflowRunner has
>>>>>>>>>>>>> an issue with renamed nested fields.  The error message I’m
>>>>>>>>>>>>> receiving, "Error while reading data, error message: JSON parsing
>>>>>>>>>>>>> error in row starting at position 0: No such field:
>>>>>>>>>>>>> nestedField.field1_0", suggests that BigQuery is trying to use the
>>>>>>>>>>>>> original name for the nested field and not the substitute name.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The code for RenameFields seems simple enough but does it
>>>>>>>>>>>>> behave differently in different runners?  Will a deep
>>>>>>>>>>>>> attachValues be necessary in order to get the nested renames to
>>>>>>>>>>>>> work across all runners? Is there something wrong in my code?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>>>>
>>>>>>>>>>>>> The unit tests also seem to be disabled for this as well and
>>>>>>>>>>>>> so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>>>>
>>>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> import java.io.File;
>>>>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>>>>>                 PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>>>>                 Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>>>>>>>>>                         .map(entry -> (new File(entry)).toString())
>>>>>>>>>>>>>>                         .collect(Collectors.toList()));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>>>>>>>>>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>>>>>>>>                 "value0_0",
>>>>>>>>>>>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>>>>>>                 options.getRunner().toString());
>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>>>>>                 .apply(RenameFields.<Row>create()
>>>>>>>>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>>>>>>>>                         .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>>>>>>>>                 .apply(BigQueryIO.<Row>write()
>>>>>>>>>>>>>>                         .to(new TableReference()
>>>>>>>>>>>>>>                                 .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>>>>>>                                 .setDatasetId("prototypes")
>>>>>>>>>>>>>>                                 .setTableId("matto_renameFields"))
>>>>>>>>>>>>>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>>>>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>>>>>>>                                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>>>>>>                         .useBeamSchema());
>>>>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
