Yes, I am making use of partitioned tables; that's why I was wondering whether the windowing step could be skipped. :)
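
To illustrate the idea, here is a minimal sketch (not the code from the pipeline) of how a day partition could be derived from an element's event timestamp alone, with no window involved. The class name PartitionSketch, the method partitionedTable, and the table name "events" are hypothetical; the Europe/Zurich zone is an assumption based on the name ZurichTimePartitioningWindowFn; the `table$YYYYMMDD` form is BigQuery's partition decorator.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class PartitionSketch {
    // Assumed zone, inferred from the name ZurichTimePartitioningWindowFn.
    private static final ZoneId ZONE = ZoneId.of("Europe/Zurich");

    // Partition decorators use the day formatted as yyyyMMdd.
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Hypothetical helper: maps an event timestamp to "<table>$<yyyyMMdd>",
    // using the local calendar day in the assumed zone.
    public static String partitionedTable(String table, Instant eventTime) {
        LocalDate day = eventTime.atZone(ZONE).toLocalDate();
        return table + "$" + day.format(DAY);
    }

    public static void main(String[] args) {
        // 23:30 UTC on 27 Jan 2019 is 00:30 on 28 Jan in Zurich (UTC+1 in winter),
        // so this element belongs to the 28 Jan partition.
        System.out.println(partitionedTable("events", Instant.parse("2019-01-27T23:30:00Z")));
    }
}
```

If the destination is computed this way per element, the partition depends only on the event timestamp, which is the reason for asking whether the separate windowing step adds anything. Whether that holds for a given sink configuration would still need to be checked against the BigQueryIO documentation.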

Cheers
Tobi

On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:

> My apologies Tobi too quick to hit the send button :-(
>
> I was checking to ask if you had also looked at partition tables in
> BigQuery, assuming the only partitioning you are doing is by Day.
>
> Cheers
> Reza
>
> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>
>> Hi Tobi,
>>
>> Are you making use of partitioned tables in BigQuery or shard tables?
>>
>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>
>> Cheers
>>
>> Reza
>>
>>
>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <[email protected]>
>> wrote:
>>
>>> I was spending some time with the "Streaming Systems" [0] book over
>>> the weekend and I thought that my pipeline might be doing something "too
>>> much" as the BigQuery sink already should partition the data by day and put
>>> it in the right place - so can my windowing function in the following
>>> pipeline be left out?
>>>
>>> I am asking this since sometimes I miss an element at the very edge of a
>>> window compared to a pipeline with a GCS sink and I thought maybe that this
>>> is related to doing the same thing twice (windowing and then the sink does
>>> "window" it again):
>>>
>>> pipeline
>>>     .apply(
>>>         KafkaIO.<String, String>read()
>>>             .withBootstrapServers(bootstrap)
>>>             .withTopics(topics)
>>>             .withKeyDeserializer(StringDeserializer.class)
>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>             .updateConsumerProperties(
>>>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>                     inputMessagesConfig))
>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>             .withReadCommitted()
>>>             .withTimestampPolicyFactory(withEventTs)
>>>             .commitOffsetsInFinalize())
>>>     .apply(ParDo.of(new ToEventFn()))
>>>
>>>     // DELETE the following up to
>>>     .apply(
>>>         Window.into(new ZurichTimePartitioningWindowFn())
>>>             .triggering(
>>>                 Repeatedly.forever(
>>>                     AfterFirst.of(
>>>                         AfterPane.elementCountAtLeast(bundleSize),
>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>                             .plusDelayOf(refreshFrequency))))
>>>             .withAllowedLateness(Duration.standardDays(14))
>>>             .discardingFiredPanes())
>>>     // HERE - END of DELETION
>>>
>>>     .apply(
>>>         BigQueryIO.<Event>write()
>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>             .withTriggeringFrequency(refreshFrequency)
>>>             .withNumFileShards(1)
>>>             .to(partitionedTableDynamicDestinations)
>>>             .withFormatFunction(
>>>                 (SerializableFunction<Event, TableRow>)
>>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>
>>> pipeline.run().waitUntilFinish();
>>>
>>>
>>> Best,
>>> Tobi
>>>
>>> [0] http://streamingsystems.net/
>>>
>>
>>
>> --
>>
>> This email may be confidential and privileged. If you received this
>> communication by mistake, please don't forward it to anyone else, please
>> erase all copies and attachments, and please let me know that it has gone
>> to the wrong person.
>>
>> The above terms reflect a potential business arrangement, are provided
>> solely as a basis for further discussion, and are not intended to be and do
>> not constitute a legally binding obligation. No legally binding obligations
>> will be created, implied, or inferred until an agreement in final form is
>> executed in writing by all parties involved.
>>

--
Tobias Kaymak
Data Engineer

[email protected]
www.ricardo.ch
