My apologies Tobi, I was too quick to hit the send button :-( I meant to ask whether you had also looked at partitioned tables in BigQuery, assuming the only partitioning you are doing is by day.
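A related point about elements at the very edge of a day: a window function aligned to local days and a BigQuery table partitioned by day do not necessarily agree on where a day starts. BigQuery draws DAY partition boundaries in UTC. If ZurichTimePartitioningWindowFn aligns windows on Europe/Zurich days (an assumption based only on its name, since its code is not shown), the two boundaries differ by one to two hours. The minimal sketch below uses plain java.time (no Beam) to map an event timestamp to a BigQuery partition decorator of the form table$yyyyMMdd in both zones. The class name, the helper methods and the table name are hypothetical illustrations, not part of Beam or of the pipeline in this thread.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical helper: maps an event timestamp to a BigQuery day-partition
 * decorator ("table$yyyyMMdd"). Assumes the target table is partitioned by DAY.
 */
public class PartitionDecorator {
    // BASIC_ISO_DATE formats a LocalDate as yyyyMMdd, e.g. 20190127.
    private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE;

    // Calendar day that the instant falls on when viewed in the given zone.
    static LocalDate dayIn(Instant eventTime, ZoneId zone) {
        return eventTime.atZone(zone).toLocalDate();
    }

    // Table spec plus partition decorator for that zone's calendar day.
    static String decorator(String table, Instant eventTime, ZoneId zone) {
        return table + "$" + dayIn(eventTime, zone).format(DAY);
    }

    public static void main(String[] args) {
        String table = "my-project:my_dataset.events"; // hypothetical table name
        Instant edge = Instant.parse("2019-01-27T23:30:00Z");
        ZoneId zurich = ZoneId.of("Europe/Zurich");

        // UTC day: where BigQuery's DAY partitioning would put the row.
        System.out.println("UTC day:    " + decorator(table, edge, ZoneOffset.UTC));
        // Zurich day: where a window aligned on Zurich midnight would put it.
        System.out.println("Zurich day: " + decorator(table, edge, zurich));
    }
}
```

Running main prints a decorator ending in $20190127 for UTC and one ending in $20190128 for Europe/Zurich. An event at 23:30 UTC on 27 January already belongs to 28 January in Zurich (UTC+1 in winter), so a window that closes at Zurich midnight and a partition that closes at UTC midnight assign that hour to different days.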
Cheers
Reza

On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:

> Hi Tobi,
>
> Are you making use of partitioned tables in BigQuery or shard tables?
>
> https://cloud.google.com/bigquery/docs/partitioned-tables
>
> Cheers
>
> Reza
>
>
> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <[email protected]>
> wrote:
>
>> I was spending some time with the "Streaming Systems" [0] book over
>> the weekend and I thought that my pipeline might be doing something "too
>> much" as the BigQuery sink already should partition the data by day and put
>> it in the right place - so can my windowing function in the following
>> pipeline be left out?
>>
>> I am asking this since sometimes I miss an element at the very edge of a
>> window compared to a pipeline with a GCS sink and I thought maybe that this
>> is related to doing the same thing twice (windowing and then the sink does
>> "window" it again):
>>
>> pipeline
>>     .apply(
>>         KafkaIO.<String, String>read()
>>             .withBootstrapServers(bootstrap)
>>             .withTopics(topics)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>             .updateConsumerProperties(
>>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>             .withReadCommitted()
>>             .withTimestampPolicyFactory(withEventTs)
>>             .commitOffsetsInFinalize())
>>     .apply(ParDo.of(new ToEventFn()))
>>
>>     // DELETE the following up to
>>     .apply(
>>         Window.into(new ZurichTimePartitioningWindowFn())
>>             .triggering(
>>                 Repeatedly.forever(
>>                     AfterFirst.of(
>>                         AfterPane.elementCountAtLeast(bundleSize),
>>                         AfterProcessingTime.pastFirstElementInPane()
>>                             .plusDelayOf(refreshFrequency))))
>>             .withAllowedLateness(Duration.standardDays(14))
>>             .discardingFiredPanes())
>>     // HERE - END of DELETION
>>
>>     .apply(
>>         BigQueryIO.<Event>write()
>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>             .withTriggeringFrequency(refreshFrequency)
>>             .withNumFileShards(1)
>>             .to(partitionedTableDynamicDestinations)
>>             .withFormatFunction(
>>                 (SerializableFunction<Event, TableRow>)
>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>
>> pipeline.run().waitUntilFinish();
>>
>>
>> Best,
>> Tobi
>>
>> [0] http://streamingsystems.net/
>>
>
>
> --
>
> This email may be confidential and privileged. If you received this
> communication by mistake, please don't forward it to anyone else, please
> erase all copies and attachments, and please let me know that it has gone
> to the wrong person.
>
> The above terms reflect a potential business arrangement, are provided
> solely as a basis for further discussion, and are not intended to be and do
> not constitute a legally binding obligation. No legally binding obligations
> will be created, implied, or inferred until an agreement in final form is
> executed in writing by all parties involved.
