Your welcome !

And I have a blog / note on my list of Todo to help navigate the various
combinations possible. :-)

Cheers

Reza

On Wed, 30 Jan 2019, 15:45 Kaymak, Tobias <[email protected] wrote:

> I am feeling a bit stupid, but I haven't had time to try out the different
> possibilities to model the Kafka -> Partitioned-Table-in-BQ pipeline in
> Beam, until now.
> I am using the snapshort 2.10 version at the moment and your comment was
> on point: After rewriting the pipeline (which limits it to deal only with a
> single topic for input and output instead of many, but that is ok), this
> works:
>
>   pipeline
>         .apply(
>             KafkaIO.<String, String>read()
>                 .withBootstrapServers(bootstrap)
>                 .withTopic(topic)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(ConfigurableDeserializer.class)
>                 .updateConsumerProperties(
>
> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
> inputMessagesConfig))
>
> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>                 .updateConsumerProperties(ImmutableMap.of("group.id",
> groupId))
>
> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>                 .withReadCommitted()
>                 .withTimestampPolicyFactory(withEventTs)
>                 .commitOffsetsInFinalize())
>         .apply(ParDo.of(new ToEventFn()))
>         .apply(
>             BigQueryIO.<Event>write()
>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>                 .withTriggeringFrequency(refreshFrequency)
>                 .withNumFileShards(1)
>                 .to(projectId + ":" + dataset + "." + tableName)
>                 .withTimePartitioning(new
> TimePartitioning().setField("event_date"))
>                 .withSchema(tableSchema)
>                 .withFormatFunction(
>                     (SerializableFunction<Event, TableRow>)
>                         KafkaToBigQuery::convertUserEventToTableRow)
>
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>
> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>   pipeline.run().waitUntilFinish();
>
> Thank you!
>
> On Wed, Jan 30, 2019 at 2:42 AM Reza Rokni <[email protected]> wrote:
>
>> Thanx Tobi very interesting ! Sorry for the long gaps in email, I am
>> based out of Singapore :-)
>>
>> I wonder if this is coming from the use of  .to(
>> partitionedTableDynamicDestinations), which I am guessing accesses the
>>  Intervalwindow. With the use of the Time column based partitioning in
>> my pipeline I just use the URI to the table.
>>
>> And thanx for raising the question in general; Looking around I could not
>> find a nice write up of the possible BigQueryIO patterns with the various
>> partitioned and sharded table options that are now available. Will put
>> something together over the next week or so and post back here, might see
>> if its something that would suite a short blog as well.
>>
>> Cheers
>>
>> Reza
>>
>>
>>
>> On Tue, 29 Jan 2019 at 19:56, Kaymak, Tobias <[email protected]>
>> wrote:
>>
>>> I am using FILE_LOADS and no timestamp column, but partitioned tables.
>>> In this case I have seen the following error, when I comment the
>>> windowing out (direct runner):
>>>
>>> Exception in thread "main"
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> java.lang.ClassCastException:
>>> org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to
>>> org.apache.beam.sdk.transforms.windowing.IntervalWindow
>>>
>>> On Tue, Jan 29, 2019 at 9:11 AM Reza Rokni <[email protected]> wrote:
>>>
>>>> Also my BQ table was partitioned based on a TIMESTAMP column, rather
>>>> than being ingestion time based partitioning.
>>>>
>>>> Cheers
>>>> Reza
>>>>
>>>> On Tue, 29 Jan 2019 at 15:44, Reza Rokni <[email protected]> wrote:
>>>>
>>>>> Hya Tobi,
>>>>>
>>>>> When you mention failed do you mean you get an error on running the
>>>>> pipeline or there is a incorrect data issue?
>>>>>
>>>>> I was just trying some things with a PubSub source and a partitioned
>>>>> table sink and was able to push things through, it was a very simple
>>>>> pipeline through, with BigQueryIO.to() set to simple string.
>>>>>
>>>>> Cheers
>>>>>
>>>>> Reza
>>>>>
>>>>> On Mon, 28 Jan 2019 at 22:39, Kaymak, Tobias <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> But it seems like that it fails, when I remove the windowing from the
>>>>>> pipeline, so I guess the answer is a no.
>>>>>>
>>>>>> On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Yes I am making use of partitioned tables, that's why I was
>>>>>>> wondering if the windowing step could be skipped. :)
>>>>>>>
>>>>>>> Cheers
>>>>>>> Tobi
>>>>>>>
>>>>>>> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:
>>>>>>>
>>>>>>>> My apologies Tobi too quick to hit the send button :-(
>>>>>>>>
>>>>>>>> I was checking to ask if you had also looked at partition tables in
>>>>>>>> BigQuery, assuming the only partitioning you are doing is by Day.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Reza
>>>>>>>>
>>>>>>>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Tobi,
>>>>>>>>>
>>>>>>>>> Are you making use of partitioned tables in BigQuery or shard
>>>>>>>>> tables?
>>>>>>>>>
>>>>>>>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>> Reza
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> I was spending some time with the "Streaming Systems" [0] book
>>>>>>>>>> over the weekend and I thought that my pipeline might be doing 
>>>>>>>>>> something
>>>>>>>>>> "too much" as the BigQuery sink already should partition the data by 
>>>>>>>>>> day
>>>>>>>>>> and put it in the right place - so can my windowing function in the
>>>>>>>>>> following pipeline be left out?
>>>>>>>>>>
>>>>>>>>>> I am asking this since sometimes I miss an element at the very
>>>>>>>>>> edge of a window compared to a pipeline with a GCS sink and I 
>>>>>>>>>> thought maybe
>>>>>>>>>> that this is related to doing the same thing twice (windowing and 
>>>>>>>>>> then the
>>>>>>>>>> sink does "window" it again):
>>>>>>>>>>
>>>>>>>>>> pipeline
>>>>>>>>>>         .apply(
>>>>>>>>>>             KafkaIO.<String, String>read()
>>>>>>>>>>                 .withBootstrapServers(bootstrap)
>>>>>>>>>>                 .withTopics(topics)
>>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>
>>>>>>>>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>                 .updateConsumerProperties(
>>>>>>>>>>
>>>>>>>>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>> inputMessagesConfig))
>>>>>>>>>>
>>>>>>>>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
>>>>>>>>>> "earliest"))
>>>>>>>>>>                 .updateConsumerProperties(ImmutableMap.of("
>>>>>>>>>> group.id", groupId))
>>>>>>>>>>
>>>>>>>>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", 
>>>>>>>>>> "true"))
>>>>>>>>>>                 .withReadCommitted()
>>>>>>>>>>                 .withTimestampPolicyFactory(withEventTs)
>>>>>>>>>>                 .commitOffsetsInFinalize())
>>>>>>>>>>         .apply(ParDo.of(new ToEventFn()))
>>>>>>>>>>
>>>>>>>>>> // DELETE the following up to
>>>>>>>>>>         .apply(
>>>>>>>>>>             Window.into(new ZurichTimePartitioningWindowFn())
>>>>>>>>>>
>>>>>>>>>>                 .triggering(
>>>>>>>>>>                     Repeatedly.forever(
>>>>>>>>>>                         AfterFirst.of(
>>>>>>>>>>
>>>>>>>>>> AfterPane.elementCountAtLeast(bundleSize),
>>>>>>>>>>
>>>>>>>>>> AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>>>                                 .plusDelayOf(refreshFrequency))))
>>>>>>>>>>                 .withAllowedLateness(Duration.standardDays(14))
>>>>>>>>>>                 .discardingFiredPanes())
>>>>>>>>>> // HERE - END of DELETION
>>>>>>>>>>
>>>>>>>>>>         .apply(
>>>>>>>>>>             BigQueryIO.<Event>write()
>>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>>                 .withTriggeringFrequency(refreshFrequency)
>>>>>>>>>>                 .withNumFileShards(1)
>>>>>>>>>>                 .to(partitionedTableDynamicDestinations)
>>>>>>>>>>                 .withFormatFunction(
>>>>>>>>>>                     (SerializableFunction<Event, TableRow>)
>>>>>>>>>>
>>>>>>>>>> KafkaToBigQuery::convertUserEventToTableRow)
>>>>>>>>>>
>>>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>
>>>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>
>>>>>>>>>>     pipeline.run().waitUntilFinish();
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Tobi
>>>>>>>>>>
>>>>>>>>>> [0] http://streamingsystems.net/
>>>>>>>>>>
>>>>>>>>>
>>
>> --
>>
>> This email may be confidential and privileged. If you received this
>> communication by mistake, please don't forward it to anyone else, please
>> erase all copies and attachments, and please let me know that it has gone
>> to the wrong person.
>>
>> The above terms reflect a potential business arrangement, are provided
>> solely as a basis for further discussion, and are not intended to be and do
>> not constitute a legally binding obligation. No legally binding obligations
>> will be created, implied, or inferred until an agreement in final form is
>> executed in writing by all parties involved.
>>
>
>
> --
> Tobias Kaymak
> Data Engineer
>
> [email protected]
> www.ricardo.ch
>

Reply via email to