I am feeling a bit stupid, but I haven't had time to try out the different
possibilities to model the Kafka -> Partitioned-Table-in-BQ pipeline in
Beam, until now.
I am using the snapshort 2.10 version at the moment and your comment was on
point: After rewriting the pipeline (which limits it to deal only with a
single topic for input and output instead of many, but that is ok), this
works:

  pipeline
        .apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrap)
                .withTopic(topic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ConfigurableDeserializer.class)
                .updateConsumerProperties(

ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
inputMessagesConfig))

.updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
                .updateConsumerProperties(ImmutableMap.of("group.id",
groupId))

.updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                .withReadCommitted()
                .withTimestampPolicyFactory(withEventTs)
                .commitOffsetsInFinalize())
        .apply(ParDo.of(new ToEventFn()))
        .apply(
            BigQueryIO.<Event>write()
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(refreshFrequency)
                .withNumFileShards(1)
                .to(projectId + ":" + dataset + "." + tableName)
                .withTimePartitioning(new
TimePartitioning().setField("event_date"))
                .withSchema(tableSchema)
                .withFormatFunction(
                    (SerializableFunction<Event, TableRow>)
                        KafkaToBigQuery::convertUserEventToTableRow)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
  pipeline.run().waitUntilFinish();

Thank you!

On Wed, Jan 30, 2019 at 2:42 AM Reza Rokni <[email protected]> wrote:

> Thanx Tobi very interesting ! Sorry for the long gaps in email, I am based
> out of Singapore :-)
>
> I wonder if this is coming from the use of  .to(
> partitionedTableDynamicDestinations), which I am guessing accesses the
>  Intervalwindow. With the use of the Time column based partitioning in my
> pipeline I just use the URI to the table.
>
> And thanx for raising the question in general; Looking around I could not
> find a nice write up of the possible BigQueryIO patterns with the various
> partitioned and sharded table options that are now available. Will put
> something together over the next week or so and post back here, might see
> if its something that would suite a short blog as well.
>
> Cheers
>
> Reza
>
>
>
> On Tue, 29 Jan 2019 at 19:56, Kaymak, Tobias <[email protected]>
> wrote:
>
>> I am using FILE_LOADS and no timestamp column, but partitioned tables. In
>> this case I have seen the following error, when I comment the windowing
>> out (direct runner):
>>
>> Exception in thread "main"
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.ClassCastException:
>> org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to
>> org.apache.beam.sdk.transforms.windowing.IntervalWindow
>>
>> On Tue, Jan 29, 2019 at 9:11 AM Reza Rokni <[email protected]> wrote:
>>
>>> Also my BQ table was partitioned based on a TIMESTAMP column, rather
>>> than being ingestion time based partitioning.
>>>
>>> Cheers
>>> Reza
>>>
>>> On Tue, 29 Jan 2019 at 15:44, Reza Rokni <[email protected]> wrote:
>>>
>>>> Hya Tobi,
>>>>
>>>> When you mention failed do you mean you get an error on running the
>>>> pipeline or there is a incorrect data issue?
>>>>
>>>> I was just trying some things with a PubSub source and a partitioned
>>>> table sink and was able to push things through, it was a very simple
>>>> pipeline through, with BigQueryIO.to() set to simple string.
>>>>
>>>> Cheers
>>>>
>>>> Reza
>>>>
>>>> On Mon, 28 Jan 2019 at 22:39, Kaymak, Tobias <[email protected]>
>>>> wrote:
>>>>
>>>>> But it seems like that it fails, when I remove the windowing from the
>>>>> pipeline, so I guess the answer is a no.
>>>>>
>>>>> On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Yes I am making use of partitioned tables, that's why I was wondering
>>>>>> if the windowing step could be skipped. :)
>>>>>>
>>>>>> Cheers
>>>>>> Tobi
>>>>>>
>>>>>> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:
>>>>>>
>>>>>>> My apologies Tobi too quick to hit the send button :-(
>>>>>>>
>>>>>>> I was checking to ask if you had also looked at partition tables in
>>>>>>> BigQuery, assuming the only partitioning you are doing is by Day.
>>>>>>>
>>>>>>> Cheers
>>>>>>> Reza
>>>>>>>
>>>>>>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Tobi,
>>>>>>>>
>>>>>>>> Are you making use of partitioned tables in BigQuery or shard
>>>>>>>> tables?
>>>>>>>>
>>>>>>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>> Reza
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> I was spending some time with the "Streaming Systems" [0] book
>>>>>>>>> over the weekend and I thought that my pipeline might be doing 
>>>>>>>>> something
>>>>>>>>> "too much" as the BigQuery sink already should partition the data by 
>>>>>>>>> day
>>>>>>>>> and put it in the right place - so can my windowing function in the
>>>>>>>>> following pipeline be left out?
>>>>>>>>>
>>>>>>>>> I am asking this since sometimes I miss an element at the very
>>>>>>>>> edge of a window compared to a pipeline with a GCS sink and I thought 
>>>>>>>>> maybe
>>>>>>>>> that this is related to doing the same thing twice (windowing and 
>>>>>>>>> then the
>>>>>>>>> sink does "window" it again):
>>>>>>>>>
>>>>>>>>> pipeline
>>>>>>>>>         .apply(
>>>>>>>>>             KafkaIO.<String, String>read()
>>>>>>>>>                 .withBootstrapServers(bootstrap)
>>>>>>>>>                 .withTopics(topics)
>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>
>>>>>>>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>                 .updateConsumerProperties(
>>>>>>>>>
>>>>>>>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>> inputMessagesConfig))
>>>>>>>>>
>>>>>>>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
>>>>>>>>> "earliest"))
>>>>>>>>>                 .updateConsumerProperties(ImmutableMap.of("
>>>>>>>>> group.id", groupId))
>>>>>>>>>
>>>>>>>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", 
>>>>>>>>> "true"))
>>>>>>>>>                 .withReadCommitted()
>>>>>>>>>                 .withTimestampPolicyFactory(withEventTs)
>>>>>>>>>                 .commitOffsetsInFinalize())
>>>>>>>>>         .apply(ParDo.of(new ToEventFn()))
>>>>>>>>>
>>>>>>>>> // DELETE the following up to
>>>>>>>>>         .apply(
>>>>>>>>>             Window.into(new ZurichTimePartitioningWindowFn())
>>>>>>>>>
>>>>>>>>>                 .triggering(
>>>>>>>>>                     Repeatedly.forever(
>>>>>>>>>                         AfterFirst.of(
>>>>>>>>>
>>>>>>>>> AfterPane.elementCountAtLeast(bundleSize),
>>>>>>>>>
>>>>>>>>> AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>>                                 .plusDelayOf(refreshFrequency))))
>>>>>>>>>                 .withAllowedLateness(Duration.standardDays(14))
>>>>>>>>>                 .discardingFiredPanes())
>>>>>>>>> // HERE - END of DELETION
>>>>>>>>>
>>>>>>>>>         .apply(
>>>>>>>>>             BigQueryIO.<Event>write()
>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>                 .withTriggeringFrequency(refreshFrequency)
>>>>>>>>>                 .withNumFileShards(1)
>>>>>>>>>                 .to(partitionedTableDynamicDestinations)
>>>>>>>>>                 .withFormatFunction(
>>>>>>>>>                     (SerializableFunction<Event, TableRow>)
>>>>>>>>>
>>>>>>>>> KafkaToBigQuery::convertUserEventToTableRow)
>>>>>>>>>
>>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>
>>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>
>>>>>>>>>     pipeline.run().waitUntilFinish();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tobi
>>>>>>>>>
>>>>>>>>> [0] http://streamingsystems.net/
>>>>>>>>>
>>>>>>>>
>
> --
>
> This email may be confidential and privileged. If you received this
> communication by mistake, please don't forward it to anyone else, please
> erase all copies and attachments, and please let me know that it has gone
> to the wrong person.
>
> The above terms reflect a potential business arrangement, are provided
> solely as a basis for further discussion, and are not intended to be and do
> not constitute a legally binding obligation. No legally binding obligations
> will be created, implied, or inferred until an agreement in final form is
> executed in writing by all parties involved.
>


-- 
Tobias Kaymak
Data Engineer

[email protected]
www.ricardo.ch

Reply via email to