Re: KinesisIO checkpointing

2020-07-09 Thread Mani Kolbe
Is it required to set JobName and checkpointDir options for checkpointing
to work?



On Thu, 9 Jul, 2020, 9:25 PM Luke Cwik,  wrote:

> The BoundedReadFromUnboundedReader does checkpoint the underlying
> UnboundedSource, is that checkpoint logic not working?
> Do you have KinesisIO configured to always read from a specific point?
>
> On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe  wrote:
>
>> We did the same and started using maxReadTime, putting the application on
>> a recurring schedule of 5 minutes. It works fine end to end without any
>> errors.
>>
>>
>>
>> But the problem is that it always starts reading from the beginning of
>> the Kinesis stream when it stops and restarts.
>>
>>
>>
>> When I investigated, I found that when you set maxReadTime, the pipeline
>> runs in BoundedReadFromUnboundedSource mode. That essentially converts the
>> source into a bounded one, which means checkpointing and watermarks are no
>> longer supported. The reader just reads for a fixed amount of time and
>> exits.
>>
>>
>>
>> Is there any recommended way to resume reading from the position where it
>> finished, either using maxReadTime or in unbounded-source mode?
>>
>>
>>
>> Could someone point me to a sample pipeline that uses Kinesis as a source?
>>
>>
>>
>> Regards,
>>
>> Mani
>>
>>
>>
>> *From:* Lars Almgren Schwartz 
>> *Sent:* Thursday, June 25, 2020 7:53 AM
>> *To:* user@beam.apache.org
>> *Subject:* Re: KinesisIO checkpointing
>>
>>
>>
>>
>>
>>
>> We had the exact same problem, but have not spent any time trying to
>> solve it, we just skipped checkpointing for now.
>>
>> When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18
>> and 2.19.
>>
>>
>>
>> On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe  wrote:
>>
>> We are on spark 2.4 and Beam 2.22.0
>>
>>
>>
>> *From:* Alexey Romanenko 
>> *Sent:* Wednesday, June 24, 2020 5:15 PM
>> *To:* user@beam.apache.org
>> *Subject:* Re: KinesisIO checkpointing
>>
>>
>>
>>
>>
>>
>> Yes, KinesisIO supports restart from checkpoints and it’s based on runner
>> checkpoints support [1].
>>
>>
>>
>> Could you specify which version of Spark and Beam you use?
>>
>>
>>
>> [1]
>> https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
>> 
>>
>>
>>
>> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe  wrote:
>>
>>
>>
>> Hello,
>>
>>
>>
>> We are developing a Beam pipeline which runs on the SparkRunner in
>> streaming mode. This pipeline reads from Kinesis, does some translation and
>> filtering, and finally writes output to S3 using the AvroIO writer. We are
>> using fixed windows with triggers based on element count and
>> processing-time intervals. The output path is partitioned by window start
>> timestamp, with allowedLateness=0sec.
>>
>>
>>
>> This is working fine, but I have noticed that whenever we restart
>> streaming, the application starts reading from the Kinesis TRIM_HORIZON.
>> That is, it is not resuming from the last checkpoint position. Then I found
>> that the checkpoint directory is based on the --jobName and --checkpointDir
>> properties, so I tried running as below:
>>
>>
>>
>> *spark-submit --master yarn --deploy-mode cluster --conf
>> spark.dynamicAllocation.enabled=false \*
>>
>> *--driver-memory 1g --executor-memory 1g --num-executors 1
>> --executor-cores 1 \*
>>
>> *--class com.dnb.optimus.prime.processor.PrimeStreamProcessor \*
>>
>> *--conf spark.executor.extraClassPath=/etc/hbase/conf \*
>>
>> */tmp/stream-processor-0.0.0.8-spark.jar \*
>>
>> *--runner=SparkRunner \*
>>
>> *--jobName=PrimeStreamProcessor \*
>>
>> *--checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \*
>>
>> *--useWindow=true \*
>>
>> *--windowDuration=60s --windowLateness=0s --windowElementCount=1 \*
>>
>> *--maxReadTime=-1 \*
>>
>> *--streaming=true*
>>
>>
>>
>> I can see that it is able to fetch checkpoint data from the *checkpointDir*
>> path provided. But when the driver tries to broadcast this information to
>> the executors, it fails with the exception below.
>>
>>
>>
>>
>>
>>
>>
>>
>> *20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message:
>> User class threw exception:
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.UnsupportedOperationException: Accumulator must be registered
>> before send to executor   

Re: KinesisIO checkpointing

2020-07-09 Thread Luke Cwik
The BoundedReadFromUnboundedReader does checkpoint the underlying
UnboundedSource, is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?
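
For illustration, a read whose starting point is pinned looks roughly like the
sketch below (InitialPositionInStream is the Kinesis client library enum used
by KinesisIO; the stream name is a placeholder):

KinesisIO.Read readFromStart =
    KinesisIO.read()
        .withStreamName("my-stream")  // placeholder
        // TRIM_HORIZON always starts from the oldest available record when the
        // runner has no checkpoint to restore; LATEST would start at the tip.
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);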

On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe  wrote:

> We did the same and started using maxReadTime, putting the application on a
> recurring schedule of 5 minutes. It works fine end to end without any
> errors.
>
>
>
> But the problem is that it always starts reading from the beginning of the
> Kinesis stream when it stops and restarts.
>
>
>
> When I investigated, I found that when you set maxReadTime, the pipeline
> runs in BoundedReadFromUnboundedSource mode. That essentially converts the
> source into a bounded one, which means checkpointing and watermarks are no
> longer supported. The reader just reads for a fixed amount of time and
> exits.
>
>
>
> Is there any recommended way to resume reading from the position where it
> finished, either using maxReadTime or in unbounded-source mode?
>
>
>
> Could someone point me to a sample pipeline that uses Kinesis as a source?
>
>
>
> Regards,
>
> Mani
>
>
>
> *From:* Lars Almgren Schwartz 
> *Sent:* Thursday, June 25, 2020 7:53 AM
> *To:* user@beam.apache.org
> *Subject:* Re: KinesisIO checkpointing
>
>
>
>
>
>
> We had the exact same problem, but have not spent any time trying to solve
> it, we just skipped checkpointing for now.
>
> When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18
> and 2.19.
>
>
>
> On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe  wrote:
>
> We are on spark 2.4 and Beam 2.22.0
>
>
>
> *From:* Alexey Romanenko 
> *Sent:* Wednesday, June 24, 2020 5:15 PM
> *To:* user@beam.apache.org
> *Subject:* Re: KinesisIO checkpointing
>
>
>
>
>
>
> Yes, KinesisIO supports restart from checkpoints and it’s based on runner
> checkpoints support [1].
>
>
>
> Could you specify which version of Spark and Beam you use?
>
>
>
> [1]
> https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
> 
>
>
>
> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe  wrote:
>
>
>
> Hello,
>
>
>
> We are developing a Beam pipeline which runs on the SparkRunner in streaming
> mode. This pipeline reads from Kinesis, does some translation and filtering,
> and finally writes output to S3 using the AvroIO writer. We are using fixed
> windows with triggers based on element count and processing-time intervals.
> The output path is partitioned by window start timestamp, with
> allowedLateness=0sec.
>
>
>
> This is working fine, but I have noticed that whenever we restart streaming,
> the application starts reading from the Kinesis TRIM_HORIZON. That is, it is
> not resuming from the last checkpoint position. Then I found that the
> checkpoint directory is based on the --jobName and --checkpointDir
> properties, so I tried running as below:
>
>
>
> *spark-submit --master yarn --deploy-mode cluster --conf
> spark.dynamicAllocation.enabled=false \*
>
> *--driver-memory 1g --executor-memory 1g --num-executors 1
> --executor-cores 1 \*
>
> *--class com.dnb.optimus.prime.processor.PrimeStreamProcessor \*
>
> *--conf spark.executor.extraClassPath=/etc/hbase/conf \*
>
> */tmp/stream-processor-0.0.0.8-spark.jar \*
>
> *--runner=SparkRunner \*
>
> *--jobName=PrimeStreamProcessor \*
>
> *--checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \*
>
> *--useWindow=true \*
>
> *--windowDuration=60s --windowLateness=0s --windowElementCount=1 \*
>
> *--maxReadTime=-1 \*
>
> *--streaming=true*
>
>
>
> I can see that it is able to fetch checkpoint data from the *checkpointDir*
> path provided. But when the driver tries to broadcast this information to
> the executors, it fails with the exception below.
>
>
>
>
>
>
>
>
> *20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message:
> User class threw exception:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.UnsupportedOperationException: Accumulator must be registered
> before send to executor at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>  at
> 

Re: Unable to read value from state/Unable to fetch data due to token mismatch for key

2020-07-09 Thread Mohil Khare
Thanks Reuven for your reply.
Good to know that it is benign.

Regards
Mohil

On Wed, Jul 8, 2020 at 10:19 PM Reuven Lax  wrote:

> This error should be benign. It often means that ownership of the work
> item was moved to a different worker (possibly caused by autoscaling or
> other source of work rebalancing), so the in-progress work item on that
> worker failed. However the work item will be processed successfully on the
> new worker that owns it. This should not cause a persistent failure.
>
> On Wed, Jul 8, 2020 at 9:53 PM Mohil Khare  wrote:
>
>> Hello,
>>
>> I am using the Beam Java SDK 2.19.0 (with enableStreamingEngine set to
>> true) and make heavy use of the stateful Beam processing model.
>> However, I sometimes see the following exception while reading a
>> value from state for a key (please note: my key is a POJO whose
>> fields form a kind of composite key, and I am using AvroCoder for this
>> key):
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException: Unable to
>> fetch data due to token mismatch for key
>> 0ggadot_static@prosimo.ioHaa552bec-25f2-11ea-8705-267acc424a25H9219bdd5-335f-11ea-bd4f-de07a30b09ca
>> @ OC-AU sydney
>>
>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:531)
>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:492)
>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:62)
>>   at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:316)
>>   at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillValue.read(WindmillStateInternals.java:385)
>>
>> Caused by: org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException:
>> Unable to fetch data due to token mismatch for key 
>>
>>   at org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeResponse(WindmillStateReader.java:482)
>>   at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:420)
>>   at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:313)
>>
>>
>> Any help to fix this issue would be greatly appreciated.
>>
>> Thanks and Regards
>> Mohil
>>
>


RE: KinesisIO checkpointing

2020-07-09 Thread Sunny, Mani Kolbe
We did the same and started using maxReadTime, putting the application on a
recurring schedule of 5 minutes. It works fine end to end without any errors.

But the problem is that it always starts reading from the beginning of the
Kinesis stream when it stops and restarts.

When I investigated, I found that when you set maxReadTime, the pipeline runs
in BoundedReadFromUnboundedSource mode. That essentially converts the source
into a bounded one, which means checkpointing and watermarks are no longer
supported. The reader just reads for a fixed amount of time and exits.

Is there any recommended way to resume reading from the position where it
finished, either using maxReadTime or in unbounded-source mode?

Could someone point me to a sample pipeline that uses Kinesis as a source?
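
A minimal sketch of such a pipeline, for reference. It assumes the
org.apache.beam.sdk.io.kinesis connector; the stream name, region and
credentials are placeholders, and the windowing plus the S3 write are omitted:

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KinesisReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("ReadKinesis", KinesisIO.read()
            .withStreamName("my-stream")                       // placeholder
            .withAWSClientsProvider("accessKey", "secretKey",  // placeholder credentials
                Regions.EU_WEST_1)
            // Used only when there is no checkpoint to restore; resuming from a
            // saved position depends on the runner restoring its checkpoint.
            .withInitialPositionInStream(InitialPositionInStream.LATEST))
        .apply("AsText", MapElements.into(TypeDescriptors.strings())
            .via((KinesisRecord r) -> new String(r.getDataAsBytes(), StandardCharsets.UTF_8)));
    // ... windowing and the AvroIO/S3 write would follow here.

    p.run().waitUntilFinish();
  }
}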

Regards,
Mani

From: Lars Almgren Schwartz 
Sent: Thursday, June 25, 2020 7:53 AM
To: user@beam.apache.org
Subject: Re: KinesisIO checkpointing


We had the exact same problem, but have not spent any time trying to solve it, 
we just skipped checkpointing for now.
When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18 and 
2.19.

On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe 
mailto:sun...@dnb.com>> wrote:
We are on spark 2.4 and Beam 2.22.0

From: Alexey Romanenko 
mailto:aromanenko@gmail.com>>
Sent: Wednesday, June 24, 2020 5:15 PM
To: user@beam.apache.org
Subject: Re: KinesisIO checkpointing


Yes, KinesisIO supports restart from checkpoints and it’s based on runner 
checkpoints support [1].

Could you specify which version of Spark and Beam you use?

[1] 
https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838

On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe 
mailto:sun...@dnb.com>> wrote:

Hello,

We are developing a Beam pipeline which runs on the SparkRunner in streaming
mode. This pipeline reads from Kinesis, does some translation and filtering,
and finally writes output to S3 using the AvroIO writer. We are using fixed
windows with triggers based on element count and processing-time intervals.
The output path is partitioned by window start timestamp, with
allowedLateness=0sec.

This is working fine, but I have noticed that whenever we restart streaming,
the application starts reading from the Kinesis TRIM_HORIZON. That is, it is
not resuming from the last checkpoint position. Then I found that the
checkpoint directory is based on the --jobName and --checkpointDir properties,
so I tried running as below:

spark-submit --master yarn --deploy-mode cluster --conf 
spark.dynamicAllocation.enabled=false \
--driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 
1 \
--class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
--conf spark.executor.extraClassPath=/etc/hbase/conf \
/tmp/stream-processor-0.0.0.8-spark.jar \
--runner=SparkRunner \
--jobName=PrimeStreamProcessor \
--checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
--useWindow=true \
--windowDuration=60s --windowLateness=0s --windowElementCount=1 \
--maxReadTime=-1 \
--streaming=true

I can see that it is able to fetch checkpoint data from the checkpointDir path
provided. But when the driver tries to broadcast this information to the
executors, it fails with the exception below.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User 
class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.UnsupportedOperationException: Accumulator must be registered before 
send to executor
at 
org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
at 
org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)

at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be 
registered before send to executor
at 
org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)


Any idea? Is resuming from the checkpoint position on application restart
actually supported with KinesisIO?

Regards,
Mani



Re: ReadFromKafka: UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler

2020-07-09 Thread Piotr Filipiuk
Thank you for looking into this.

I upvoted BEAM-6868.
Is there anything else I can do to have that feature prioritized, other
than trying to contribute myself?

Regarding DirectRunner, as I mentioned above I can see in the
worker_handlers.py logs that some data is being consumed from Kafka, since
the offsets logged are consistent with what is being published. Is using
DirectRunner known to be working for this use case? I do not see any errors
(I attached all the logs). The apache/beam_java_sdk:2.22.0 is running (see
logs attached).

On Thu, Jul 9, 2020 at 7:45 AM Maximilian Michels  wrote:

> This used to be working but it appears @FinalizeBundle (which KafkaIO
> requires) was simply ignored for portable (Python) pipelines. It looks
> relatively easy to fix.
>
> -Max
>
> On 07.07.20 03:37, Luke Cwik wrote:
> > The KafkaIO implementation relies on checkpointing to be able to update
> > the last committed offset. This is currently unsupported in the portable
> > Flink runner. BEAM-6868[1] is the associated JIRA. Please vote on it
> > and/or offer to provide an implementation.
> >
> > 1: https://issues.apache.org/jira/browse/BEAM-6868
> >
> > On Mon, Jul 6, 2020 at 1:42 PM Piotr Filipiuk  > > wrote:
> >
> > Hi,
> >
> > I am trying to run a simple example that uses Python API to
> > ReadFromKafka, however I am getting the following error when using
> > Flink Runner:
> >
> > java.lang.UnsupportedOperationException: The ActiveBundle does not
> > have a registered bundle checkpoint handler.
> >
> > See full log in read_from_kafka_flink.log
> >
> > I am using:
> > Kafka 2.5.0
> > Beam 2.22.0
> > Flink 1.10
> >
> > When using Direct runner, the pipeline does not fail but does not
> > seem to be consuming any data (see read_from_kafka.log) even though
> > the updated offsets are being logged:
> >
> > [2020-07-06 13:36:01,342] {worker_handlers.py:398} INFO - severity:
> INFO
> > timestamp {
> >seconds: 1594067761
> >nanos: 34000
> > }
> > message: "Reader-0: reading from test-topic-0 starting at offset 165"
> > log_location: "org.apache.beam.sdk.io.kafka.KafkaUnboundedSource"
> > thread: "23"
> >
> > I am running both Kafka and Flink locally. I would appreciate your
> > help understanding and fixing the issue.
> >
> > --
> > Best regards,
> > Piotr
> >
>


-- 
Best regards,
Piotr


beam_java_sdk.log
Description: Binary data


Re: ReadFromKafka: UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler

2020-07-09 Thread Maximilian Michels
This used to be working but it appears @FinalizeBundle (which KafkaIO 
requires) was simply ignored for portable (Python) pipelines. It looks 
relatively easy to fix.


-Max

On 07.07.20 03:37, Luke Cwik wrote:
The KafkaIO implementation relies on checkpointing to be able to update 
the last committed offset. This is currently unsupported in the portable 
Flink runner. BEAM-6868[1] is the associated JIRA. Please vote on it 
and/or offer to provide an implementation.


1: https://issues.apache.org/jira/browse/BEAM-6868

On Mon, Jul 6, 2020 at 1:42 PM Piotr Filipiuk > wrote:


Hi,

I am trying to run a simple example that uses Python API to
ReadFromKafka, however I am getting the following error when using
Flink Runner:

java.lang.UnsupportedOperationException: The ActiveBundle does not
have a registered bundle checkpoint handler.

See full log in read_from_kafka_flink.log

I am using:
Kafka 2.5.0
Beam 2.22.0
Flink 1.10

When using Direct runner, the pipeline does not fail but does not
seem to be consuming any data (see read_from_kafka.log) even though
the updated offsets are being logged:

[2020-07-06 13:36:01,342] {worker_handlers.py:398} INFO - severity: INFO
timestamp {
   seconds: 1594067761
   nanos: 34000
}
message: "Reader-0: reading from test-topic-0 starting at offset 165"
log_location: "org.apache.beam.sdk.io.kafka.KafkaUnboundedSource"
thread: "23"

I am running both Kafka and Flink locally. I would appreciate your
help understanding and fixing the issue.

-- 
Best regards,

Piotr



Re: TableRow class is not the same after serialization

2020-07-09 Thread Kirill Zhdanovich
Cool! Thanks a lot for your explanation and your time, Jeff, very much
appreciated.

On Thu, 9 Jul 2020 at 17:27, Jeff Klukas  wrote:

> On Thu, Jul 9, 2020 at 10:18 AM Kirill Zhdanovich 
> wrote:
>
>> So I guess I need to switch to Map instead of TableRow?
>>
>
> Yes, I would definitely recommend that you switch to Map.
> That's the most basic interface, and every deserialization of a top-level
> TableRow object must provide objects matching that interface wherever the
> BQ schema has a nested STRUCT/RECORD.
>
> Note that the latest docs for BigQueryIO do include a table that maps BQ
> types to Java types, but unfortunately that table lists STRUCTs as mapping
> to avro GenericRecord, which doesn't give you the info you need to
> understand the Map interface inside TableRows:
>
>
> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html
>
> You may want to file a JIRA ticket requesting more explicit documentation
> about how to parse structs out of TableRow objects.
>
>
>> On Thu, 9 Jul 2020 at 17:13, Jeff Klukas  wrote:
>>
>>> It looks like the fact that your pipeline in production produces nested
>>> TableRows is an artifact of the following decision within BigQueryIO logic:
>>>
>>>
>>> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>>>
>>> The convertGenericRecordToTableRow function is used recursively for
>>> RECORD-type fields, so you end up with nested TableRows in the PCollection
>>> returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
>>> Jackson ObjectMapper, which makes different decisions as to what map type
>>> to use.
>>>
>>> > Thanks for explaining. Is it documented somewhere that TableRow
>>> contains Map?
>>>
>>> I don't see that explicitly spelled out anywhere. If you follow the
>>> trail of links from TableRow, you'll get to these docs about Google's JSON
>>> handling in Java, which may or may not be relevant to this question:
>>>
>>> https://googleapis.github.io/google-http-java-client/json.html
>>>
>>>
>>>
>>> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich 
>>> wrote:
>>>
 Thanks for explaining. Is it documented somewhere that TableRow
 contains Map?
 I don't construct it, I fetch from Google Analytics export to BigQuery
 table.

 On Thu, 9 Jul 2020 at 16:40, Jeff Klukas  wrote:

> I would expect the following line to fail:
>
> List h = ((List) bigQueryRow.get("hits"));
>
> The top-level bigQueryRow will be a TableRow, but
> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
> class that implements `Map`. So that line needs to become:
>
> List h = ((List)
> bigQueryRow.get("hits"));
>
> And then your constructor for Hit must accept a Map
> rather than a TableRow.
>
> I imagine that TableRow is only intended to be used as a top-level
> object. Each row you get from a BQ result is a TableRow, but objects 
> nested
> inside it are not logically table rows; they're BQ structs that are 
> modeled
> as maps in JSON and Map in Java.
>
> Are you manually constructing TableRow objects with nested TableRows?
> I would expect that a result from BigQueryIO.read() would give a TableRow
> with some other map type for nested structs, so I'm surprised that this
> cast works in some contexts.
>
> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <
> kzhdanov...@gmail.com> wrote:
>
>>I changed code a little bit not to use lambdas.
>>
>>Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>> List h = ((List) bigQueryRow.get("hits"));
>> List hits = new ArrayList<>();
>>
>> for (TableRow tableRow : h) { <-- breaks here
>> hits.add(new Hit(tableRow));
>> }
>> ...
>> }
>>
>> Stack trace
>>
>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>> cast to class com.google.api.services.bigquery.model.TableRow
>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>> loader 'app')
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be 
>> cast
>> to class com.google.api.services.bigquery.model.TableRow
>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>> loader 'app')
>> at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>> at
>> 

Re: TableRow class is not the same after serialization

2020-07-09 Thread Jeff Klukas
On Thu, Jul 9, 2020 at 10:18 AM Kirill Zhdanovich 
wrote:

> So I guess I need to switch to Map instead of TableRow?
>

Yes, I would definitely recommend that you switch to Map<String, Object>.
That's the most basic interface, and every deserialization of a top-level
TableRow object must provide objects matching that interface wherever the
BQ schema has a nested STRUCT/RECORD.

Note that the latest docs for BigQueryIO do include a table that maps BQ
types to Java types, but unfortunately that table lists STRUCTs as mapping
to avro GenericRecord, which doesn't give you the info you need to
understand the Map interface inside TableRows:

https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html

You may want to file a JIRA ticket requesting more explicit documentation
about how to parse structs out of TableRow objects.


> On Thu, 9 Jul 2020 at 17:13, Jeff Klukas  wrote:
>
>> It looks like the fact that your pipeline in production produces nested
>> TableRows is an artifact of the following decision within BigQueryIO logic:
>>
>>
>> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>>
>> The convertGenericRecordToTableRow function is used recursively for
>> RECORD-type fields, so you end up with nested TableRows in the PCollection
>> returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
>> Jackson ObjectMapper, which makes different decisions as to what map type
>> to use.
>>
>> > Thanks for explaining. Is it documented somewhere that TableRow
>> contains Map?
>>
>> I don't see that explicitly spelled out anywhere. If you follow the trail
>> of links from TableRow, you'll get to these docs about Google's JSON
>> handling in Java, which may or may not be relevant to this question:
>>
>> https://googleapis.github.io/google-http-java-client/json.html
>>
>>
>>
>> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich 
>> wrote:
>>
>>> Thanks for explaining. Is it documented somewhere that TableRow contains
>>> Map?
>>> I don't construct it, I fetch from Google Analytics export to BigQuery
>>> table.
>>>
>>> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas  wrote:
>>>
 I would expect the following line to fail:

 List h = ((List) bigQueryRow.get("hits"));

 The top-level bigQueryRow will be a TableRow, but
 `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
 class that implements `Map`. So that line needs to become:

 List h = ((List)
 bigQueryRow.get("hits"));

 And then your constructor for Hit must accept a Map
 rather than a TableRow.

 I imagine that TableRow is only intended to be used as a top-level
 object. Each row you get from a BQ result is a TableRow, but objects nested
 inside it are not logically table rows; they're BQ structs that are modeled
 as maps in JSON and Map in Java.

 Are you manually constructing TableRow objects with nested TableRows? I
 would expect that a result from BigQueryIO.read() would give a TableRow
 with some other map type for nested structs, so I'm surprised that this
 cast works in some contexts.

 On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich 
 wrote:

>I changed code a little bit not to use lambdas.
>
>Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
> List h = ((List) bigQueryRow.get("hits"));
> List hits = new ArrayList<>();
>
> for (TableRow tableRow : h) { <-- breaks here
> hits.add(new Hit(tableRow));
> }
> ...
> }
>
> Stack trace
>
> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
> cast to class com.google.api.services.bigquery.model.TableRow
> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
> com.google.api.services.bigquery.model.TableRow is in unnamed module of
> loader 'app')
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
> to class com.google.api.services.bigquery.model.TableRow
> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
> com.google.api.services.bigquery.model.TableRow is in unnamed module of
> loader 'app')
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
> at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
> at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> at 

Re: TableRow class is not the same after serialization

2020-07-09 Thread Kirill Zhdanovich
So I guess I need to switch to Map instead of TableRow?

On Thu, 9 Jul 2020 at 17:13, Jeff Klukas  wrote:

> It looks like the fact that your pipeline in production produces nested
> TableRows is an artifact of the following decision within BigQueryIO logic:
>
>
> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>
> The convertGenericRecordToTableRow function is used recursively for
> RECORD-type fields, so you end up with nested TableRows in the PCollection
> returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
> Jackson ObjectMapper, which makes different decisions as to what map type
> to use.
>
> > Thanks for explaining. Is it documented somewhere that TableRow contains
> Map?
>
> I don't see that explicitly spelled out anywhere. If you follow the trail
> of links from TableRow, you'll get to these docs about Google's JSON
> handling in Java, which may or may not be relevant to this question:
>
> https://googleapis.github.io/google-http-java-client/json.html
>
>
>
> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich 
> wrote:
>
>> Thanks for explaining. Is it documented somewhere that TableRow contains
>> Map?
>> I don't construct it, I fetch from Google Analytics export to BigQuery
>> table.
>>
>> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas  wrote:
>>
>>> I would expect the following line to fail:
>>>
>>> List h = ((List) bigQueryRow.get("hits"));
>>>
>>> The top-level bigQueryRow will be a TableRow, but
>>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>>> class that implements `Map`. So that line needs to become:
>>>
>>> List h = ((List)
>>> bigQueryRow.get("hits"));
>>>
>>> And then your constructor for Hit must accept a Map
>>> rather than a TableRow.
>>>
>>> I imagine that TableRow is only intended to be used as a top-level
>>> object. Each row you get from a BQ result is a TableRow, but objects nested
>>> inside it are not logically table rows; they're BQ structs that are modeled
>>> as maps in JSON and Map in Java.
>>>
>>> Are you manually constructing TableRow objects with nested TableRows? I
>>> would expect that a result from BigQueryIO.read() would give a TableRow
>>> with some other map type for nested structs, so I'm surprised that this
>>> cast works in some contexts.
>>>
>>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich 
>>> wrote:
>>>
I changed code a little bit not to use lambdas.

Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
 List h = ((List) bigQueryRow.get("hits"));
 List hits = new ArrayList<>();

 for (TableRow tableRow : h) { <-- breaks here
 hits.add(new Hit(tableRow));
 }
 ...
 }

 Stack trace

 java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
 cast to class com.google.api.services.bigquery.model.TableRow
 (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
 com.google.api.services.bigquery.model.TableRow is in unnamed module of
 loader 'app')
 org.apache.beam.sdk.Pipeline$PipelineExecutionException:
 java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
 to class com.google.api.services.bigquery.model.TableRow
 (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
 com.google.api.services.bigquery.model.TableRow is in unnamed module of
 loader 'app')
 at
 org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
 at
 org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
 at
 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
 at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
 at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
 at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
 at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
 at
 com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
 at
 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
 at
 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566)
 at
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at

Re: TableRow class is not the same after serialization

2020-07-09 Thread Jeff Klukas
It looks like the fact that your pipeline in production produces nested
TableRows is an artifact of the following decision within BigQueryIO logic:

https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350

The convertGenericRecordToTableRow function is used recursively for
RECORD-type fields, so you end up with nested TableRows in the PCollection
returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
Jackson ObjectMapper, which makes different decisions as to what map type
to use.
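
A minimal sketch of that round trip, just to illustrate the coder behaviour
(the nested field name and values below are made up):

import com.google.api.services.bigquery.model.TableRow;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.util.CoderUtils;

class TableRowRoundTrip {
  static void show() throws CoderException {
    TableRow row = new TableRow()
        .set("hits", Collections.singletonList(new TableRow().set("hitNumber", 1)));

    // Encode/decode the way a PCollection<TableRow> coder would.
    byte[] bytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), row);
    TableRow decoded = CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes);

    Object firstHit = ((List<?>) decoded.get("hits")).get(0);
    // Before encoding, firstHit was a TableRow; after decoding it is a plain
    // Map (a LinkedHashMap in the reporter's stack trace), so a (TableRow)
    // cast that works on the direct read output fails in the test.
  }
}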

> Thanks for explaining. Is it documented somewhere that TableRow contains
Map?

I don't see that explicitly spelled out anywhere. If you follow the trail
of links from TableRow, you'll get to these docs about Google's JSON
handling in Java, which may or may not be relevant to this question:

https://googleapis.github.io/google-http-java-client/json.html



On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich 
wrote:

> Thanks for explaining. Is it documented somewhere that TableRow contains
> Map?
> I don't construct it, I fetch from Google Analytics export to BigQuery
> table.
>
> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas  wrote:
>
>> I would expect the following line to fail:
>>
>> List h = ((List) bigQueryRow.get("hits"));
>>
>> The top-level bigQueryRow will be a TableRow, but
>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>> class that implements `Map`. So that line needs to become:
>>
>> List h = ((List)
>> bigQueryRow.get("hits"));
>>
>> And then your constructor for Hit must accept a Map
>> rather than a TableRow.
>>
>> I imagine that TableRow is only intended to be used as a top-level
>> object. Each row you get from a BQ result is a TableRow, but objects nested
>> inside it are not logically table rows; they're BQ structs that are modeled
>> as maps in JSON and Map in Java.
>>
>> Are you manually constructing TableRow objects with nested TableRows? I
>> would expect that a result from BigQueryIO.read() would give a TableRow
>> with some other map type for nested structs, so I'm surprised that this
>> cast works in some contexts.
>>
>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich 
>> wrote:
>>
>>>I changed code a little bit not to use lambdas.
>>>
>>>Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>> List h = ((List) bigQueryRow.get("hits"));
>>> List hits = new ArrayList<>();
>>>
>>> for (TableRow tableRow : h) { <-- breaks here
>>> hits.add(new Hit(tableRow));
>>> }
>>> ...
>>> }
>>>
>>> Stack trace
>>>
>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>> cast to class com.google.api.services.bigquery.model.TableRow
>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>> loader 'app')
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
>>> to class com.google.api.services.bigquery.model.TableRow
>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>> loader 'app')
>>> at
>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>>> at
>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>>> at
>>> com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at
>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>>> at
>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at
>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>> at
>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> at
>>> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>>> at 

Re: TableRow class is not the same after serialization

2020-07-09 Thread Kirill Zhdanovich
Thanks for explaining. Is it documented somewhere that TableRow contains
Map?
I don't construct it, I fetch from Google Analytics export to BigQuery
table.

On Thu, 9 Jul 2020 at 16:40, Jeff Klukas  wrote:

> I would expect the following line to fail:
>
> List h = ((List) bigQueryRow.get("hits"));
>
> The top-level bigQueryRow will be a TableRow, but
> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
> class that implements `Map`. So that line needs to become:
>
> List h = ((List)
> bigQueryRow.get("hits"));
>
> And then your constructor for Hit must accept a Map rather
> than a TableRow.
>
> I imagine that TableRow is only intended to be used as a top-level object.
> Each row you get from a BQ result is a TableRow, but objects nested inside
> it are not logically table rows; they're BQ structs that are modeled as
> maps in JSON and Map in Java.
>
> Are you manually constructing TableRow objects with nested TableRows? I
> would expect that a result from BigQueryIO.read() would give a TableRow
> with some other map type for nested structs, so I'm surprised that this
> cast works in some contexts.
>
> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich 
> wrote:
>
>>I changed code a little bit not to use lambdas.
>>
>>Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>> List h = ((List) bigQueryRow.get("hits"));
>> List hits = new ArrayList<>();
>>
>> for (TableRow tableRow : h) { <-- breaks here
>> hits.add(new Hit(tableRow));
>> }
>> ...
>> }
>>
>> Stack trace
>>
>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>> cast to class com.google.api.services.bigquery.model.TableRow
>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>> loader 'app')
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
>> to class com.google.api.services.bigquery.model.TableRow
>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>> loader 'app')
>> at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>> at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>> at
>> com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>> at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at
>> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>> at
>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>> at
>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>> at
>> 

Re: TableRow class is not the same after serialization

2020-07-09 Thread Jeff Klukas
I would expect the following line to fail:

List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));

The top-level bigQueryRow will be a TableRow, but `bigQueryRow.get("hits")`
is only guaranteed to be an instance of some class that implements `Map`.
So that line needs to become:

List<Map<String, Object>> h = ((List<Map<String, Object>>)
bigQueryRow.get("hits"));

And then your constructor for Hit must accept a Map<String, Object> rather
than a TableRow.

I imagine that TableRow is only intended to be used as a top-level object.
Each row you get from a BQ result is a TableRow, but objects nested inside
it are not logically table rows; they're BQ structs that are modeled as
maps in JSON and Map<String, Object> in Java.

Are you manually constructing TableRow objects with nested TableRows? I
would expect that a result from BigQueryIO.read() would give a TableRow
with some other map type for nested structs, so I'm surprised that this
cast works in some contexts.
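
For example, the constructor in question could be reshaped along these lines
(Session, Hit and ProductCatalog are the reporter's own classes, so this is
only a sketch of the change, not code from the actual pipeline):

Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> rawHits =
        (List<Map<String, Object>>) bigQueryRow.get("hits");
    List<Hit> hits = new ArrayList<>();
    for (Map<String, Object> hit : rawHits) {
        hits.add(new Hit(hit));  // Hit's constructor now takes Map<String, Object>
    }
    // ... rest of the constructor unchanged
}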

On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich 
wrote:

>I changed code a little bit not to use lambdas.
>
>Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
> List h = ((List) bigQueryRow.get("hits"));
> List hits = new ArrayList<>();
>
> for (TableRow tableRow : h) { <-- breaks here
> hits.add(new Hit(tableRow));
> }
> ...
> }
>
> Stack trace
>
> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
> to class com.google.api.services.bigquery.model.TableRow
> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
> com.google.api.services.bigquery.model.TableRow is in unnamed module of
> loader 'app')
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
> to class com.google.api.services.bigquery.model.TableRow
> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
> com.google.api.services.bigquery.model.TableRow is in unnamed module of
> loader 'app')
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
> at
> com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> at
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
> at
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at 

Re: Beam supports Flink Async IO operator

2020-07-09 Thread Kaymak, Tobias
Hi Eleanore,

Maybe batched RPC is what you are looking for?
https://beam.apache.org/blog/timely-processing/
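
A rough sketch of that batched-RPC pattern, putting GroupIntoBatches in front
of the DoFn that makes the external call. The key spreading, batch size of 50,
and the echoed response are illustrative placeholders; the real ~150 ms REST
call would replace the stand-in inside the DoFn:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

class BatchedRpc {
  static PCollection<String> callServiceInBatches(PCollection<String> requests) {
    return requests
        // Spread elements over a small key space so batches can form in parallel.
        .apply("KeyForBatching",
            WithKeys.of((String req) -> Math.floorMod(req.hashCode(), 10))
                .withKeyType(TypeDescriptors.integers()))
        .apply("Batch", GroupIntoBatches.<Integer, String>ofSize(50))
        .apply("CallService", ParDo.of(new DoFn<KV<Integer, Iterable<String>>, String>() {
          @ProcessElement
          public void processElement(@Element KV<Integer, Iterable<String>> batch,
                                     OutputReceiver<String> out) {
            // One external request per batch instead of one per element; this
            // stand-in just echoes the batched payload.
            String payload = String.join(",", batch.getValue());
            out.output("response-for:" + payload);
          }
        }));
  }
}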

On Wed, Jul 8, 2020 at 6:20 PM Eleanore Jin  wrote:

> Thanks Luke and Max for the information.
>
> We have a use case where, inside a DoFn, we need to call external
> services to trigger some other flows. The calls to the other services are
> REST-based synchronous calls that take 150+ milliseconds to return. We are
> using Flink as the runner, and I came across Flink's Async I/O operator;
> I am trying to figure out whether this is the right approach and whether
> Beam provides any similar concept for it.
>
> Thanks!
> Eleanore
>
> On Wed, Jul 8, 2020 at 2:55 AM Maximilian Michels  wrote:
>
>> Just to clarify: We could make the AsnycIO operator also available in
>> Beam but the operator has to be represented by a concept in Beam.
>> Otherwise, there is no way to know when to produce it as part of the
>> translation.
>>
>> On 08.07.20 11:53, Maximilian Michels wrote:
>> > Flink's AsycIO operator is useful for processing io-bound operations,
>> > e.g. sending network requests. Like Luke mentioned, it is not available
>> > in Beam.
>> >
>> > -Max
>> >
>> > On 07.07.20 22:11, Luke Cwik wrote:
>> >> Beam is a layer that sits on top of execution engines like Flink and
>> >> provides its own programming model thus native operators like Flink's
>> >> async IO operator are not exposed.
>> >>
>> >> Most people use a DoFn to do all their IO and sometimes will compose
>> >> it with another transform such as GroupIntoBatches[1] to simplify
>> >> their implementation.
>> >>
>> >> Why do you need async?
>> >>
>> >> 1:
>> >>
>> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>> >>
>> >>
>> >>
>> >> On Tue, Jul 7, 2020 at 11:03 AM Eleanore Jin > >> > wrote:
>> >>
>> >> Hi community,
>> >>
>> >> I cannot find any documentation for Beam supporting Flink async IO
>> >> operator
>> >>
>> >> (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html),
>>
>> >>
>> >> just wonder is this not supported right now?
>> >>
>> >> Thanks a lot!
>> >> Eleanore
>> >>
>>
>


Re: Recommended way to generate a TableRow from Json without using TableRowJsonCoder context OUTER?

2020-07-09 Thread Kaymak, Tobias
Thank you Lars and thank you Luke!

On Wed, Jul 8, 2020 at 9:33 PM Luke Cwik  wrote:

> The deprecated method is not going to be removed anytime soon so I
> wouldn't worry about it being removed.
>
> If you really want to use non-deprecated methods, then the
> TableRowJsonCoder uses the StringUtf8Coder to parse strings so it is
> looking for a nested encoding using the StringUtf8Coder encoding. So
> something like this:
> ByteArrayOutputStream baos = new ...
> StringUtf8Coder.of().encode(jsonString, baos);
> TableRow row = TableRowJsonCoder.of().decode(new
> ByteArrayInputStream(baos.toByteArray()));
>
> But why use a coder at all? TableRowJsonCoder is a thin wrapper around
> using Jackson's ObjectMapper to perform the conversion. So you could do
> something like:
> ObjectMapper mapper = new
> ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
> TableRow row = mapper.readValue(strValue, TableRow.class);
>
>
> On Wed, Jul 8, 2020 at 7:57 AM Lars Almgren Schwartz <
> lars.almg...@tink.com> wrote:
>
>> Hey,
>>
>> Don't know if it's the official way but we have written our own proto to
>> BigQuery converter which works pretty well.
>>
>> public static TableRow convertEventToTableRow(TableRow tableRow, Message 
>> event) {
>> Map<Descriptors.FieldDescriptor, Object> fields = event.getAllFields();
>> for (Descriptors.FieldDescriptor field : fields.keySet()) {
>> tableRow = mapToBigQueryField(tableRow, field, fields.get(field));
>> }
>>
>> return tableRow;
>> }
>>
>> private static TableRow mapToBigQueryField(
>> TableRow tableRow, Descriptors.FieldDescriptor field, Object value) {
>> Descriptors.FieldDescriptor.JavaType fieldType = field.getJavaType();
>> switch (fieldType) {
>> case INT:
>> case LONG:
>> case FLOAT:
>> case DOUBLE:
>> case BOOLEAN:
>> return tableRow.set(field.getName(), value);
>> case BYTE_STRING:
>> if (field.isRepeated()) {
>> return tableRow.set(
>> field.getName(),
>> processRepeatedField(
>> value,
>> x ->
>> Base64.getEncoder()
>> .encodeToString(
>> ((ByteString) 
>> x).toByteArray(;
>> } else {
>> return tableRow.set(
>> field.getName(),
>> Base64.getEncoder().encodeToString(((ByteString) 
>> value).toByteArray()));
>> }
>> case ENUM:
>> if (field.isRepeated()) {
>> return tableRow.set(
>> field.getName(), processRepeatedField(value, x -> 
>> x.toString()));
>> } else {
>> return tableRow.set(field.getName(), value.toString());
>> }
>> case STRING:
>> if (isUUIDField(field.getName())) {
>> if (field.isRepeated()) {
>> return tableRow.set(
>> field.getName(),
>> processRepeatedField(
>> value, x -> 
>> UUIDUtil.getBase64FromUUID((String) x)));
>> } else {
>> return tableRow.set(
>> field.getName(), 
>> UUIDUtil.getBase64FromUUID((String) value));
>> }
>> } else {
>> return tableRow.set(field.getName(), value);
>> }
>> case MESSAGE:
>> switch (field.getMessageType().getFullName()) {
>> // Map well known message types that we have a specific 
>> mapping for.
>> case "google.protobuf.Timestamp":
>> if (field.isRepeated()) {
>> return tableRow.set(
>> field.getName(),
>> processRepeatedField(
>> value,
>> x ->
>> 
>> com.google.cloud.Timestamp.fromProto(
>> (Timestamp) 
>> x)
>> .toString()));
>> } else {
>> return tableRow.set(
>> field.getName(),
>> 
>> com.google.cloud.Timestamp.fromProto((Timestamp) value)
>> .toString());
>> }
>> case "xxx.xxx.ExactNumber":
>> if (field.isRepeated()) {
>> return tableRow.set(
>> field.getName(),
>> processRepeatedField(
>>