I am feeling a bit stupid, but I hadn't had time to try out the different
possibilities for modeling the Kafka -> partitioned-table-in-BQ pipeline
in Beam until now.
I am using the 2.10 snapshot version at the moment and your comment was on
point: after rewriting the pipeline (which limits it to a single topic for
input and output instead of many, but that is ok), this works:
pipeline
    .apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrap)
            .withTopic(topic)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfigurableDeserializer.class)
            .updateConsumerProperties(
                ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
                    inputMessagesConfig))
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
            .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
            .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
            .withReadCommitted()
            .withTimestampPolicyFactory(withEventTs)
            .commitOffsetsInFinalize())
    .apply(ParDo.of(new ToEventFn()))
    .apply(
        BigQueryIO.<Event>write()
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(refreshFrequency)
            .withNumFileShards(1)
            .to(projectId + ":" + dataset + "." + tableName)
            .withTimePartitioning(new TimePartitioning().setField("event_date"))
            .withSchema(tableSchema)
            .withFormatFunction(
                (SerializableFunction<Event, TableRow>)
                    KafkaToBigQuery::convertUserEventToTableRow)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

pipeline.run().waitUntilFinish();
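
In case it is useful to anyone else reading along: the withEventTs factory
referenced above just builds a per-partition timestamp policy from the
event time of each record. A minimal sketch of what it could look like,
assuming the event timestamp can be parsed out of the message value
(extractEventTimestamp is a hypothetical helper returning a joda Instant,
not part of the pipeline above):

import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.joda.time.Duration;

// A sketch: derive record timestamps from the payload and advance the
// watermark with a bounded delay to tolerate out-of-order events.
TimestampPolicyFactory<String, String> withEventTs =
    (topicPartition, previousWatermark) ->
        new CustomTimestampPolicyWithLimitedDelay<String, String>(
            // extractEventTimestamp(String) -> Instant is an assumed helper
            record -> extractEventTimestamp(record.getKV().getValue()),
            Duration.standardMinutes(5), // max expected out-of-orderness
            previousWatermark);
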
Thank you!
On Wed, Jan 30, 2019 at 2:42 AM Reza Rokni <[email protected]> wrote:
> Thanks Tobi, very interesting! Sorry for the long gaps in email, I am
> based out of Singapore :-)
>
> I wonder if this is coming from the use of
> .to(partitionedTableDynamicDestinations), which I am guessing accesses
> the IntervalWindow. With time-column-based partitioning in my pipeline I
> just use the URI of the table.
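>
> For illustration (a sketch, guessing at what
> partitionedTableDynamicDestinations does internally; the partition-suffix
> logic and the tableSpec field are assumptions): a getDestination along
> these lines, inside a DynamicDestinations<Event, TableDestination>, would
> throw exactly that ClassCastException once the windowing is removed,
> because elements then live in the GlobalWindow:
>
> @Override
> public TableDestination getDestination(ValueInSingleWindow<Event> element) {
>   // Fails with a ClassCastException when the element is in the
>   // GlobalWindow, i.e. when the Window.into(...) step is removed:
>   IntervalWindow window = (IntervalWindow) element.getWindow();
>   String day = DateTimeFormat.forPattern("yyyyMMdd").print(window.start());
>   // Write into the day partition via a partition decorator:
>   return new TableDestination(tableSpec + "$" + day, null);
> }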
>
> And thanks for raising the question in general; looking around, I could
> not find a nice write-up of the possible BigQueryIO patterns with the
> various partitioned and sharded table options that are now available. I
> will put something together over the next week or so and post back here;
> I might see if it's something that would suit a short blog post as well.
>
> Cheers
>
> Reza
>
>
>
> On Tue, 29 Jan 2019 at 19:56, Kaymak, Tobias <[email protected]>
> wrote:
>
>> I am using FILE_LOADS and no timestamp column, but partitioned tables.
>> In this case I have seen the following error when I comment out the
>> windowing (direct runner):
>>
>> Exception in thread "main"
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.ClassCastException:
>> org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to
>> org.apache.beam.sdk.transforms.windowing.IntervalWindow
>>
>> On Tue, Jan 29, 2019 at 9:11 AM Reza Rokni <[email protected]> wrote:
>>
>>> Also, my BQ table was partitioned on a TIMESTAMP column, rather than
>>> using ingestion-time-based partitioning.
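>>>
>>> (On the Beam side, column-based partitioning corresponds to something
>>> like .withTimePartitioning(new TimePartitioning().setField("ts")),
>>> where "ts" stands in for whatever TIMESTAMP column the table is
>>> partitioned on.)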
>>>
>>> Cheers
>>> Reza
>>>
>>> On Tue, 29 Jan 2019 at 15:44, Reza Rokni <[email protected]> wrote:
>>>
>>>> Hya Tobi,
>>>>
>>>> When you mention failed, do you mean you get an error when running
>>>> the pipeline, or is there an incorrect data issue?
>>>>
>>>> I was just trying some things with a PubSub source and a partitioned
>>>> table sink and was able to push things through; it was a very simple
>>>> pipeline though, with BigQueryIO.to() set to a simple string.
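>>>>
>>>> (That was along the lines of .to("my-project:my_dataset.my_table"),
>>>> with hypothetical names, pointing at a column-partitioned table.)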
>>>>
>>>> Cheers
>>>>
>>>> Reza
>>>>
>>>> On Mon, 28 Jan 2019 at 22:39, Kaymak, Tobias <[email protected]>
>>>> wrote:
>>>>
>>>>> But it seems that it fails when I remove the windowing from the
>>>>> pipeline, so I guess the answer is no.
>>>>>
>>>>> On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Yes I am making use of partitioned tables, that's why I was wondering
>>>>>> if the windowing step could be skipped. :)
>>>>>>
>>>>>> Cheers
>>>>>> Tobi
>>>>>>
>>>>>> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:
>>>>>>
>>>>>>> My apologies Tobi, I was too quick to hit the send button :-(
>>>>>>>
>>>>>>> I was checking to ask whether you had also looked at partitioned
>>>>>>> tables in BigQuery, assuming the only partitioning you are doing
>>>>>>> is by day.
>>>>>>>
>>>>>>> Cheers
>>>>>>> Reza
>>>>>>>
>>>>>>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Tobi,
>>>>>>>>
>>>>>>>> Are you making use of partitioned tables in BigQuery or sharded
>>>>>>>> tables?
>>>>>>>>
>>>>>>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>> Reza
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> I spent some time with the "Streaming Systems" [0] book over the
>>>>>>>>> weekend, and I started to think that my pipeline might be doing
>>>>>>>>> too much: the BigQuery sink should already partition the data by
>>>>>>>>> day and put it in the right place. So, can the windowing function
>>>>>>>>> in the following pipeline be left out?
>>>>>>>>>
>>>>>>>>> I am asking because sometimes I miss an element at the very edge
>>>>>>>>> of a window compared to a pipeline with a GCS sink, and I thought
>>>>>>>>> this might be related to doing the same thing twice (windowing,
>>>>>>>>> and then the sink "windowing" again):
>>>>>>>>>
>>>>>>>>> pipeline
>>>>>>>>>     .apply(
>>>>>>>>>         KafkaIO.<String, String>read()
>>>>>>>>>             .withBootstrapServers(bootstrap)
>>>>>>>>>             .withTopics(topics)
>>>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>             .updateConsumerProperties(
>>>>>>>>>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>                     inputMessagesConfig))
>>>>>>>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>>>>>>>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>             .withReadCommitted()
>>>>>>>>>             .withTimestampPolicyFactory(withEventTs)
>>>>>>>>>             .commitOffsetsInFinalize())
>>>>>>>>>     .apply(ParDo.of(new ToEventFn()))
>>>>>>>>>
>>>>>>>>>     // DELETE from here ...
>>>>>>>>>     .apply(
>>>>>>>>>         Window.into(new ZurichTimePartitioningWindowFn())
>>>>>>>>>             .triggering(
>>>>>>>>>                 Repeatedly.forever(
>>>>>>>>>                     AfterFirst.of(
>>>>>>>>>                         AfterPane.elementCountAtLeast(bundleSize),
>>>>>>>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>>                             .plusDelayOf(refreshFrequency))))
>>>>>>>>>             .withAllowedLateness(Duration.standardDays(14))
>>>>>>>>>             .discardingFiredPanes())
>>>>>>>>>     // ... END of DELETION
>>>>>>>>>
>>>>>>>>>     .apply(
>>>>>>>>>         BigQueryIO.<Event>write()
>>>>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>             .withTriggeringFrequency(refreshFrequency)
>>>>>>>>>             .withNumFileShards(1)
>>>>>>>>>             .to(partitionedTableDynamicDestinations)
>>>>>>>>>             .withFormatFunction(
>>>>>>>>>                 (SerializableFunction<Event, TableRow>)
>>>>>>>>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>
>>>>>>>>> pipeline.run().waitUntilFinish();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tobi
>>>>>>>>>
>>>>>>>>> [0] http://streamingsystems.net/
>>>>>>>>>
>>>>>>>>
>
--
Tobias Kaymak
Data Engineer
[email protected]
www.ricardo.ch