So I guess I need to switch to Map<String, Object> instead of TableRow?

On Thu, 9 Jul 2020 at 17:13, Jeff Klukas <[email protected]> wrote:

> It looks like the fact that your pipeline in production produces nested
> TableRows is an artifact of the following decision within BigQueryIO logic:
>
>
> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>
> The convertGenericRecordToTableRow function is used recursively for
> RECORD-type fields, so you end up with nested TableRows in the PCollection
> returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
> Jackson ObjectMapper, which makes different decisions as to what map type
> to use.
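
A stdlib-only sketch of why that matters (this is illustrative, not Beam code; `FakeTableRow` is a stand-in for TableRow, which is itself a Map subclass): a generic JSON-style decode rebuilds every object node as a plain LinkedHashMap, so the concrete class of nested records is lost after a coder round trip.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only (not Beam code): a generic JSON-style decode
// rebuilds every object node as a plain LinkedHashMap, so the concrete
// TableRow type of nested records is lost after a coder round trip.
class CoderRoundTrip {

    // Stand-in for TableRow, which is itself a Map subclass.
    static class FakeTableRow extends LinkedHashMap<String, Object> {}

    // Mimics what a JSON decoder such as Jackson's ObjectMapper does: it only
    // sees "JSON object" and "JSON array", never the original Java classes.
    static Object roundTrip(Object value) {
        if (value instanceof Map) {
            Map<String, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                copy.put(String.valueOf(e.getKey()), roundTrip(e.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object o : (List<?>) value) {
                copy.add(roundTrip(o));
            }
            return copy;
        }
        return value;
    }
}
```

After such a round trip, a downcast of a nested element to the Map subclass throws the ClassCastException seen below in the thread.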
>
> > Thanks for explaining. Is it documented somewhere that TableRow contains
> Map<String, Object>?
>
> I don't see that explicitly spelled out anywhere. If you follow the trail
> of links from TableRow, you'll get to these docs about Google's JSON
> handling in Java, which may or may not be relevant to this question:
>
> https://googleapis.github.io/google-http-java-client/json.html
>
>
>
> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich <[email protected]>
> wrote:
>
>> Thanks for explaining. Is it documented somewhere that TableRow contains
>> Map<String, Object>?
>> I don't construct it, I fetch from Google Analytics export to BigQuery
>> table.
>>
>> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas <[email protected]> wrote:
>>
>>> I would expect the following line to fail:
>>>
>>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>
>>> The top-level bigQueryRow will be a TableRow, but
>>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>>> class that implements `Map`. So that line needs to become:
>>>
>>>         List<Map<String, Object>> h = ((List<Map<String, Object>>)
>>> bigQueryRow.get("hits"));
>>>
>>> And then your constructor for Hit must accept a Map<String, Object>
>>> rather than a TableRow.
>>>
>>> I imagine that TableRow is only intended to be used as a top-level
>>> object. Each row you get from a BQ result is a TableRow, but objects nested
>>> inside it are not logically table rows; they're BQ structs that are modeled
>>> as maps in JSON and Map<String, Object> in Java.
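
A minimal stdlib-only sketch of that access pattern (the "hits" field name comes from the thread; the "type" field and helper names are assumptions):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: read nested BigQuery structs as Map<String, Object> rather than
// casting them to TableRow. Field names ("hits", "type") are assumptions
// based on the thread's GA-export example.
class HitAccess {
    @SuppressWarnings("unchecked")
    static List<String> hitTypes(Map<String, Object> bigQueryRow) {
        // Only assume the Map interface; the concrete class may be
        // TableRow, LinkedHashMap, or anything else.
        List<Map<String, Object>> hits =
                (List<Map<String, Object>>) bigQueryRow.get("hits");
        List<String> types = new ArrayList<>();
        for (Map<String, Object> hit : hits) {
            types.add((String) hit.get("type"));
        }
        return types;
    }
}
```

Because TableRow implements Map<String, Object>, code written this way works whether the nested values are TableRows or LinkedHashMaps.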
>>>
>>> Are you manually constructing TableRow objects with nested TableRows? I
>>> would expect that a result from BigQueryIO.read() would give a TableRow
>>> with some other map type for nested structs, so I'm surprised that this
>>> cast works in some contexts.
>>>
>>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <[email protected]>
>>> wrote:
>>>
>>>>    I changed the code a little bit to avoid lambdas.
>>>>
>>>>    Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>         List<Hit> hits = new ArrayList<>();
>>>>
>>>>         for (TableRow tableRow : h) { <-- breaks here
>>>>             hits.add(new Hit(tableRow));
>>>>         }
>>>>         ...
>>>>     }
>>>>
>>>> Stack trace
>>>>
>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>>> cast to class com.google.api.services.bigquery.model.TableRow
>>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>>> loader 'app')
>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
>>>> to class com.google.api.services.bigquery.model.TableRow
>>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>>> loader 'app')
>>>> at
>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>>>> at
>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>>>> at
>>>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>>>> at
>>>> com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>> Method)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>> at
>>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>>>> at
>>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>> at
>>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>>> at
>>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>> at
>>>> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>> at
>>>> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>>>> at
>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>>>> at
>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>>>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>> Method)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>> at
>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>> at
>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>> at
>>>> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>>>> at
>>>> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>>>> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>> Method)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>> at
>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>> at
>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>> at
>>>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
>>>> at
>>>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
>>>> at
>>>> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>>>> at
>>>> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>>>> at
>>>> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
>>>> at
>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> at
>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> at
>>>> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
>>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>>> Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap
>>>> cannot be cast to class com.google.api.services.bigquery.model.TableRow
>>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>>> loader 'app')
>>>> at com.ikea.search.ab.bootstrap.Session.<init>(Session.java:35)
>>>> at
>>>> com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82)
>>>>
>>>> On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <[email protected]> wrote:
>>>>
>>>>> Does the stack trace tell you where specifically in the code the cast
>>>>> is happening? I'm guessing there may be assumptions inside your
>>>>> CreateSessionMetrics class if that's where you're manipulating the 
>>>>> TableRow
>>>>> objects.
>>>>>
>>>>> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Interesting. All my code does is the following:
>>>>>>
>>>>>> public static void main(String[] args) {
>>>>>>     PCollection<TableRow> bqResult =
>>>>>> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>>>>>     PCollection<SomeClass> result = runJob(bqResult, boolean and
>>>>>> string params);
>>>>>>     // store results
>>>>>> }
>>>>>>
>>>>>> and
>>>>>>
>>>>>> private static PCollection<SomeClass> runJob(PCollection<TableRow>
>>>>>> bqResult,
>>>>>>
>>>>>>       ...) {
>>>>>>         return bqResult
>>>>>>                 // In this step I convert TableRow into my custom
>>>>>> class object
>>>>>>                 .apply("Create metrics based on sessions",
>>>>>>                         ParDo.of(new CreateSessionMetrics(boolean and
>>>>>> string params)))
>>>>>>                // few more transformations
>>>>>>
>>>>>> }
>>>>>>
>>>>>> This is basically similar to examples you can find here
>>>>>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>>>>>
>>>>>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <[email protected]> wrote:
>>>>>>
>>>>>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> So from what I understand, it works like this by design and it's
>>>>>>>> not possible to test my code with the current coder implementation. Is 
>>>>>>>> that
>>>>>>>> correct?
>>>>>>>>
>>>>>>>
>>>>>>> I would argue that this test failure is indicating an area of
>>>>>>> potential failure in your code that should be addressed. It may be that
>>>>>>> your current production pipeline relies on fusion, which is not
>>>>>>> guaranteed by the Beam model, and so the pipeline could fail if the
>>>>>>> runner makes an internal change that affects fusion (in practice this
>>>>>>> is unlikely).
>>>>>>>
>>>>>>> Is it possible to update your code such that it does not need to
>>>>>>> make assumptions about the concrete Map type returned by TableRow 
>>>>>>> objects?
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Kirill
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Kirill
>>>>
>>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill
