You're welcome! And I have a blog/note on my to-do list to help navigate the various combinations possible. :-)
Cheers
Reza

On Wed, 30 Jan 2019, 15:45 Kaymak, Tobias <[email protected]> wrote:

> I am feeling a bit stupid, but I haven't had time to try out the different
> possibilities to model the Kafka -> Partitioned-Table-in-BQ pipeline in
> Beam until now.
> I am using the snapshot 2.10 version at the moment and your comment was
> on point: after rewriting the pipeline (which limits it to dealing with only
> a single topic for input and output instead of many, but that is ok), this
> works:
>
> pipeline
>     .apply(
>         KafkaIO.<String, String>read()
>             .withBootstrapServers(bootstrap)
>             .withTopic(topic)
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(ConfigurableDeserializer.class)
>             .updateConsumerProperties(
>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>             .withReadCommitted()
>             .withTimestampPolicyFactory(withEventTs)
>             .commitOffsetsInFinalize())
>     .apply(ParDo.of(new ToEventFn()))
>     .apply(
>         BigQueryIO.<Event>write()
>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>             .withTriggeringFrequency(refreshFrequency)
>             .withNumFileShards(1)
>             .to(projectId + ":" + dataset + "." + tableName)
>             .withTimePartitioning(new TimePartitioning().setField("event_date"))
>             .withSchema(tableSchema)
>             .withFormatFunction(
>                 (SerializableFunction<Event, TableRow>)
>                     KafkaToBigQuery::convertUserEventToTableRow)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> pipeline.run().waitUntilFinish();
>
> Thank you!
>
> On Wed, Jan 30, 2019 at 2:42 AM Reza Rokni <[email protected]> wrote:
>
>> Thanks Tobi, very interesting! Sorry for the long gaps in email, I am
>> based out of Singapore :-)
>>
>> I wonder if this is coming from the use of
>> .to(partitionedTableDynamicDestinations), which I am guessing accesses the
>> IntervalWindow. With the use of time-column-based partitioning in my
>> pipeline I just use the URI to the table.
>>
>> And thanks for raising the question in general; looking around I could not
>> find a nice write-up of the possible BigQueryIO patterns with the various
>> partitioned and sharded table options that are now available. I will put
>> something together over the next week or so and post back here; might see
>> if it's something that would suit a short blog as well.
>>
>> Cheers
>>
>> Reza
>>
>> On Tue, 29 Jan 2019 at 19:56, Kaymak, Tobias <[email protected]> wrote:
>>
>>> I am using FILE_LOADS and no timestamp column, but partitioned tables.
>>> In this case I have seen the following error when I comment the
>>> windowing out (direct runner):
>>>
>>> Exception in thread "main"
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> java.lang.ClassCastException:
>>> org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to
>>> org.apache.beam.sdk.transforms.windowing.IntervalWindow
>>>
>>> On Tue, Jan 29, 2019 at 9:11 AM Reza Rokni <[email protected]> wrote:
>>>
>>>> Also my BQ table was partitioned based on a TIMESTAMP column, rather
>>>> than using ingestion-time-based partitioning.
>>>>
>>>> Cheers
>>>> Reza
>>>>
>>>> On Tue, 29 Jan 2019 at 15:44, Reza Rokni <[email protected]> wrote:
>>>>
>>>>> Hiya Tobi,
>>>>>
>>>>> When you mention it failed, do you mean you get an error on running
>>>>> the pipeline, or is there an incorrect data issue?
>>>>>
>>>>> I was just trying some things with a Pub/Sub source and a partitioned
>>>>> table sink and was able to push things through; it was a very simple
>>>>> pipeline though, with BigQueryIO.to() set to a simple string.
>>>>>
>>>>> Cheers
>>>>>
>>>>> Reza
>>>>>
>>>>> On Mon, 28 Jan 2019 at 22:39, Kaymak, Tobias <[email protected]> wrote:
>>>>>
>>>>>> But it seems like it fails when I remove the windowing from the
>>>>>> pipeline, so I guess the answer is a no.
>>>>>>
>>>>>> On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <[email protected]> wrote:
>>>>>>
>>>>>>> Yes, I am making use of partitioned tables, that's why I was
>>>>>>> wondering if the windowing step could be skipped. :)
>>>>>>>
>>>>>>> Cheers
>>>>>>> Tobi
>>>>>>>
>>>>>>> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:
>>>>>>>
>>>>>>>> My apologies Tobi, too quick to hit the send button :-(
>>>>>>>>
>>>>>>>> I was checking to ask if you had also looked at partitioned tables
>>>>>>>> in BigQuery, assuming the only partitioning you are doing is by day.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Reza
>>>>>>>>
>>>>>>>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Tobi,
>>>>>>>>>
>>>>>>>>> Are you making use of partitioned tables in BigQuery or sharded
>>>>>>>>> tables?
>>>>>>>>>
>>>>>>>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>> Reza
>>>>>>>>>
>>>>>>>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> I was spending some time with the "Streaming Systems" [0] book
>>>>>>>>>> over the weekend and I thought that my pipeline might be doing
>>>>>>>>>> something "too much", as the BigQuery sink should already
>>>>>>>>>> partition the data by day and put it in the right place - so can
>>>>>>>>>> my windowing function in the following pipeline be left out?
>>>>>>>>>>
>>>>>>>>>> I am asking this since sometimes I miss an element at the very
>>>>>>>>>> edge of a window compared to a pipeline with a GCS sink, and I
>>>>>>>>>> thought maybe this is related to doing the same thing twice
>>>>>>>>>> (windowing, and then the sink "windows" it again):
>>>>>>>>>>
>>>>>>>>>> pipeline
>>>>>>>>>>     .apply(
>>>>>>>>>>         KafkaIO.<String, String>read()
>>>>>>>>>>             .withBootstrapServers(bootstrap)
>>>>>>>>>>             .withTopics(topics)
>>>>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>             .updateConsumerProperties(
>>>>>>>>>>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>>>>>>>>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>             .withReadCommitted()
>>>>>>>>>>             .withTimestampPolicyFactory(withEventTs)
>>>>>>>>>>             .commitOffsetsInFinalize())
>>>>>>>>>>     .apply(ParDo.of(new ToEventFn()))
>>>>>>>>>>
>>>>>>>>>>     // DELETE the following up to
>>>>>>>>>>     .apply(
>>>>>>>>>>         Window.into(new ZurichTimePartitioningWindowFn())
>>>>>>>>>>             .triggering(
>>>>>>>>>>                 Repeatedly.forever(
>>>>>>>>>>                     AfterFirst.of(
>>>>>>>>>>                         AfterPane.elementCountAtLeast(bundleSize),
>>>>>>>>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>>>                             .plusDelayOf(refreshFrequency))))
>>>>>>>>>>             .withAllowedLateness(Duration.standardDays(14))
>>>>>>>>>>             .discardingFiredPanes())
>>>>>>>>>>     // HERE - END of DELETION
>>>>>>>>>>
>>>>>>>>>>     .apply(
>>>>>>>>>>         BigQueryIO.<Event>write()
>>>>>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>>             .withTriggeringFrequency(refreshFrequency)
>>>>>>>>>>             .withNumFileShards(1)
>>>>>>>>>>             .to(partitionedTableDynamicDestinations)
>>>>>>>>>>             .withFormatFunction(
>>>>>>>>>>                 (SerializableFunction<Event, TableRow>)
>>>>>>>>>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>>>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>
>>>>>>>>>> pipeline.run().waitUntilFinish();
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Tobi
>>>>>>>>>>
>>>>>>>>>> [0] http://streamingsystems.net/
>
> --
> Tobias Kaymak
> Data Engineer
>
> [email protected]
> www.ricardo.ch
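The partitionedTableDynamicDestinations used in the original pipeline is not shown anywhere in the thread. Judging from the ClassCastException quoted above, it most likely derives the target day partition from the element's IntervalWindow. A minimal sketch of that pattern follows; the class name, table spec, and two-column schema are hypothetical, not taken from the thread:

    import java.util.Arrays;
    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    // Hypothetical sketch: routes each element to a daily partition of one table,
    // deriving the partition from the element's window rather than from a column.
    class DayPartitionDestinations<T> extends DynamicDestinations<T, String> {

      private static final DateTimeFormatter DAY = DateTimeFormat.forPattern("yyyyMMdd");
      private final String baseTableSpec; // e.g. "my-project:my_dataset.events" (placeholder)

      DayPartitionDestinations(String baseTableSpec) {
        this.baseTableSpec = baseTableSpec;
      }

      @Override
      public String getDestination(ValueInSingleWindow<T> element) {
        // This cast is the likely source of the ClassCastException: once the
        // Window.into(...) step is removed, elements live in the GlobalWindow
        // and there is no IntervalWindow to derive a partition from.
        IntervalWindow window = (IntervalWindow) element.getWindow();
        // Target the day partition via the $YYYYMMDD partition decorator.
        return baseTableSpec + "$" + DAY.print(window.start());
      }

      @Override
      public TableDestination getTable(String tableSpecWithPartition) {
        return new TableDestination(tableSpecWithPartition, "daily partition");
      }

      @Override
      public TableSchema getSchema(String tableSpecWithPartition) {
        // Built here (rather than held as a field) so the class stays serializable;
        // the field names are placeholders for the thread's actual schema.
        return new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("event_date").setType("DATE"),
                    new TableFieldSchema().setName("payload").setType("STRING")));
      }
    }

Because getDestination() casts the element's window to IntervalWindow, a destination like this only works downstream of a non-global Window.into(...). With a column-partitioned table, a plain table spec, and withTimePartitioning(new TimePartitioning().setField("event_date")), as in the working pipeline at the top of the thread, the sink never needs the window, so the explicit windowing step can be dropped.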
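ZurichTimePartitioningWindowFn is likewise not included in the thread. Assuming it simply assigns elements to daily windows aligned to the Europe/Zurich time zone, roughly the same windowing step could be written with Beam's built-in CalendarWindows; Event, bundleSize, and refreshFrequency below are the names used in the original pipeline:

    import org.apache.beam.sdk.transforms.windowing.AfterFirst;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Duration;

    // Daily windows aligned to Europe/Zurich, keeping the same composite trigger,
    // allowed lateness, and accumulation mode as the original windowing step.
    Window<Event> dailyZurichWindowing =
        Window.<Event>into(
                CalendarWindows.days(1).withTimeZone(DateTimeZone.forID("Europe/Zurich")))
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(bundleSize),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(refreshFrequency))))
            .withAllowedLateness(Duration.standardDays(14))
            .discardingFiredPanes();

Whether this is an exact substitute for the custom WindowFn depends on what ZurichTimePartitioningWindowFn actually does, which the thread does not show.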
