But it seems that the pipeline fails when I remove the windowing step, so I guess the answer is no.

On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <[email protected]> wrote:

> Yes, I am making use of partitioned tables, that's why I was wondering if
> the windowing step could be skipped. :)
>
> Cheers
> Tobi
>
> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:
>
>> My apologies Tobi, I was too quick to hit the send button :-(
>>
>> I was checking to ask if you had also looked at partitioned tables in
>> BigQuery, assuming the only partitioning you are doing is by day.
>>
>> Cheers
>> Reza
>>
>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>>
>>> Hi Tobi,
>>>
>>> Are you making use of partitioned tables in BigQuery or sharded tables?
>>>
>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>
>>> Cheers
>>>
>>> Reza
>>>
>>>
>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <[email protected]>
>>> wrote:
>>>
>>>> I spent some time with the "Streaming Systems" [0] book over
>>>> the weekend and I thought that my pipeline might be doing something "too
>>>> much", as the BigQuery sink should already partition the data by day and put
>>>> it in the right place, so can the windowing function in the following
>>>> pipeline be left out?
>>>>
>>>> I am asking because I sometimes miss an element at the very edge of
>>>> a window compared to a pipeline with a GCS sink, and I thought this
>>>> might be related to doing the same thing twice (windowing, and then the
>>>> sink "windowing" it again):
>>>>
>>>> pipeline
>>>>     .apply(
>>>>         KafkaIO.<String, String>read()
>>>>             .withBootstrapServers(bootstrap)
>>>>             .withTopics(topics)
>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>>             .updateConsumerProperties(
>>>>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>             .withReadCommitted()
>>>>             .withTimestampPolicyFactory(withEventTs)
>>>>             .commitOffsetsInFinalize())
>>>>     .apply(ParDo.of(new ToEventFn()))
>>>>
>>>>     // DELETE the following up to
>>>>     .apply(
>>>>         Window.into(new ZurichTimePartitioningWindowFn())
>>>>             .triggering(
>>>>                 Repeatedly.forever(
>>>>                     AfterFirst.of(
>>>>                         AfterPane.elementCountAtLeast(bundleSize),
>>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>>                             .plusDelayOf(refreshFrequency))))
>>>>             .withAllowedLateness(Duration.standardDays(14))
>>>>             .discardingFiredPanes())
>>>>     // HERE - END of DELETION
>>>>
>>>>     .apply(
>>>>         BigQueryIO.<Event>write()
>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>             .withTriggeringFrequency(refreshFrequency)
>>>>             .withNumFileShards(1)
>>>>             .to(partitionedTableDynamicDestinations)
>>>>             .withFormatFunction(
>>>>                 (SerializableFunction<Event, TableRow>)
>>>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>
>>>> pipeline.run().waitUntilFinish();
>>>>
>>>>
>>>> Best,
>>>> Tobi
>>>>
>>>> [0] http://streamingsystems.net/
>>>>
>>>
>>>
>>> --
>>>
>>> This email may be confidential and privileged. If you received this
>>> communication by mistake, please don't forward it to anyone else, please
>>> erase all copies and attachments, and please let me know that it has gone
>>> to the wrong person.
>>>
>>> The above terms reflect a potential business arrangement, are provided
>>> solely as a basis for further discussion, and are not intended to be and do
>>> not constitute a legally binding obligation. No legally binding obligations
>>> will be created, implied, or inferred until an agreement in final form is
>>> executed in writing by all parties involved.
>>>
>>
>
>
> --
> Tobias Kaymak
> Data Engineer
>
> [email protected]
> www.ricardo.ch
>
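The thread does not show the source of ZurichTimePartitioningWindowFn or partitionedTableDynamicDestinations. Assuming the window function assigns each event to a calendar day in Zurich local time, one thing worth checking for elements missed "at the very edge of a window" is the time zone. When BigQuery derives the partition itself (ingestion time or a TIMESTAMP column), daily partitions follow UTC dates, so an event shortly after midnight in Zurich belongs to the previous UTC day. The plain-Java sketch below has no Beam dependency. The class `ZurichDay` and its methods are hypothetical helpers, not part of Beam or the original pipeline. It computes both dates for one instant, the Zurich-day bounds, and a `table$YYYYMMDD` partition decorator.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical helper (an assumption, not from the original pipeline):
 * maps an event timestamp to its Zurich-local day, the [start, end) bounds
 * of that day, and a BigQuery day-partition decorator.
 */
public final class ZurichDay {
  private static final ZoneId ZURICH = ZoneId.of("Europe/Zurich");
  // Formats a LocalDate as yyyyMMdd, e.g. 20190128.
  private static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.BASIC_ISO_DATE;

  private ZurichDay() {}

  /** Calendar day of the instant in Zurich local time. */
  public static LocalDate zurichDate(Instant eventTime) {
    return eventTime.atZone(ZURICH).toLocalDate();
  }

  /** Calendar day of the instant in UTC. */
  public static LocalDate utcDate(Instant eventTime) {
    return eventTime.atZone(ZoneOffset.UTC).toLocalDate();
  }

  /** Inclusive start of the Zurich day that contains the instant. */
  public static Instant windowStart(Instant eventTime) {
    return zurichDate(eventTime).atStartOfDay(ZURICH).toInstant();
  }

  /** Exclusive end of that Zurich day; 23 or 25 hours after the start on DST change days. */
  public static Instant windowEnd(Instant eventTime) {
    return zurichDate(eventTime).plusDays(1).atStartOfDay(ZURICH).toInstant();
  }

  /** Builds a partition decorator such as "events$20190128". */
  public static String partitionDecorator(String table, LocalDate day) {
    return table + "$" + day.format(YYYYMMDD);
  }

  public static void main(String[] args) {
    // 00:30 in Zurich (UTC+1 in January), but still 23:30 on the previous UTC day.
    Instant edge = Instant.parse("2019-01-27T23:30:00Z");
    System.out.println(partitionDecorator("events", zurichDate(edge)));
    System.out.println(partitionDecorator("events", utcDate(edge)));
  }
}
```

Running `main` prints `events$20190128` followed by `events$20190127`: the same instant lands on different days depending on which time zone defines the day. Computing the bounds with `atStartOfDay(ZoneId)` rather than adding a fixed 24 hours keeps the window aligned to local midnight across DST transitions.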
