But it seems that it fails when I remove the windowing from the pipeline,
so I guess the answer is no.
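For reference, this is roughly the variant I tried (just a sketch: the
KafkaIO.read() configuration is identical to the pipeline quoted below, only
the explicit Window.into(...) step is removed, so elements stay in the default
global window and only BigQueryIO's own load-job triggering applies):

    // Same source and ToEventFn as in the pipeline quoted below; the
    // explicit Window.into(...) step between them has been removed.
    pipeline
        .apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrap)
                // ... same KafkaIO configuration as in the pipeline below ...
                .commitOffsetsInFinalize())
        .apply(ParDo.of(new ToEventFn()))
        .apply(
            BigQueryIO.<Event>write()
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                // BigQueryIO batches elements and issues load jobs on this
                // schedule instead of relying on an upstream trigger.
                .withTriggeringFrequency(refreshFrequency)
                .withNumFileShards(1)
                .to(partitionedTableDynamicDestinations)
                .withFormatFunction(
                    (SerializableFunction<Event, TableRow>)
                        KafkaToBigQuery::convertUserEventToTableRow)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    pipeline.run().waitUntilFinish();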

On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <[email protected]>
wrote:

> Yes, I am making use of partitioned tables; that's why I was wondering
> whether the windowing step could be skipped. :)
>
> Cheers
> Tobi
>
> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:
>
>> My apologies Tobi, I was too quick to hit the send button :-(
>>
>> I was going to ask whether you had also looked at partitioned tables in
>> BigQuery, assuming the only partitioning you are doing is by day.
>>
>> Cheers
>> Reza
>>
>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>>
>>> Hi Tobi,
>>>
>>> Are you making use of partitioned tables in BigQuery, or sharded tables?
>>>
>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>
>>> Cheers
>>>
>>> Reza
>>>
>>>
>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <[email protected]>
>>> wrote:
>>>
>>>> I spent some time with the "Streaming Systems" [0] book over the
>>>> weekend, and I now think my pipeline might be doing too much: the
>>>> BigQuery sink should already partition the data by day and put it in
>>>> the right place - so can the windowing step in the following pipeline
>>>> be left out?
>>>>
>>>> I am asking because, compared to a pipeline with a GCS sink, I
>>>> sometimes miss an element at the very edge of a window, and I suspect
>>>> this is related to doing the same thing twice (windowing explicitly,
>>>> and then the sink "windowing" the data again):
>>>>
>>>> pipeline
>>>>     .apply(
>>>>         KafkaIO.<String, String>read()
>>>>             .withBootstrapServers(bootstrap)
>>>>             .withTopics(topics)
>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>>             .updateConsumerProperties(
>>>>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>             .withReadCommitted()
>>>>             .withTimestampPolicyFactory(withEventTs)
>>>>             .commitOffsetsInFinalize())
>>>>     .apply(ParDo.of(new ToEventFn()))
>>>>
>>>>     // DELETE the following up to
>>>>     .apply(
>>>>         Window.into(new ZurichTimePartitioningWindowFn())
>>>>             .triggering(
>>>>                 Repeatedly.forever(
>>>>                     AfterFirst.of(
>>>>                         AfterPane.elementCountAtLeast(bundleSize),
>>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>>                             .plusDelayOf(refreshFrequency))))
>>>>             .withAllowedLateness(Duration.standardDays(14))
>>>>             .discardingFiredPanes())
>>>>     // HERE - END of DELETION
>>>>
>>>>     .apply(
>>>>         BigQueryIO.<Event>write()
>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>             .withTriggeringFrequency(refreshFrequency)
>>>>             .withNumFileShards(1)
>>>>             .to(partitionedTableDynamicDestinations)
>>>>             .withFormatFunction(
>>>>                 (SerializableFunction<Event, TableRow>)
>>>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>
>>>> pipeline.run().waitUntilFinish();
>>>>
>>>>
>>>> Best,
>>>> Tobi
>>>>
>>>> [0] http://streamingsystems.net/
>>>>
>>>
>>>
>>
>>
>
>


-- 
Tobias Kaymak
Data Engineer

[email protected]
www.ricardo.ch
