Sorry, I was on the phone. The Flink version is 1.5.4.
On Fri, Feb 1, 2019, 20:19 Juan Carlos Garcia <[email protected]> wrote:
Hi Tobias,
I would like to ask the following to see if it applies to you:
How many Kafka partitions do you have?
How many TaskManagers are you using (parallelism)?
There is a bug in Flink which is triggered as soon as you start playing around with a parallelism greater than the number of partitions in your Kafka topic.
If you were using the Flink API directly you could control the parallelism of each operator (sources and sinks); with Beam, however, the parallelism is applied to all operators in the DAG, as the sketch below illustrates.
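A minimal sketch of what I mean (FlinkPipelineOptions, FlinkRunner and setParallelism() exist in the Beam Flink runner; the value 4 and the surrounding main() are just placeholders):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ParallelismSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // In Beam this single value applies to every operator in the DAG,
    // sources and sinks included. Keep it <= the number of Kafka
    // partitions, otherwise some source subtasks get no partition,
    // finish early, and checkpointing stops (see FLINK-2491).
    options.setParallelism(4); // placeholder value
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline here ...
    pipeline.run();
  }
}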
I am using Beam 2.9 with Flink 1.5.2, and just today we deployed a pipeline (on-premise) reading from our Kafka and publishing to BigQuery, using HDFS as the backing store for checkpoints - and it is working flawlessly.
Here is a link to the Flink bug:
https://issues.apache.org/jira/browse/FLINK-2491
Hope it helps.
JC
On Thu, Jan 31, 2019, 15:51 Kaymak, Tobias <[email protected]> wrote:
I should have drunk the coffee before writing this ;)
The end-to-end duration of the snapshots is fine; the snapshots were created at 10:00 in the morning, and I thought they took increasingly more time because of the clock ;)
The rest of the findings are still valid.
On Thu, Jan 31, 2019 at 12:27 PM Kaymak, Tobias <[email protected]> wrote:
Follow-up to summarize my findings so far:

Beam 2.9.0 with Flink 1.5.5: no checkpoints are created.
Beam 2.10RC1 with Flink 1.6.2: metrics are shown in the web interface of Flink's jobmanager; checkpoints are created - but checkpoints take more than 10 minutes, even if their end-to-end duration is 10 seconds.
Beam 2.10RC1 with Flink 1.7.1: no metrics are shown in the web interface of Flink's jobmanager; checkpoints are created - but checkpoints take more than 10 minutes, even if their end-to-end duration is 10 seconds.

Attached is a screenshot from the 1.7.1 web interface. So far, using Flink 1.6.2 with Beam 2.10RC1 seems to be the best option. I am continuing to investigate why the checkpoints take so long.
image.png
On Wed, Jan 30, 2019 at 1:49 PM Maximilian Michels <[email protected]> wrote:
Thank you for verifying this. This is manifested in
https://jira.apache.org/jira/browse/BEAM-5386 and has indeed already been
fixed for 2.10.0.

This likely warrants a 2.9.1 release. I'll check on the dev mailing list.
Thanks,
Max
On 30.01.19 10:27, Kaymak, Tobias wrote:
> Hi Maximilian,
>
> I can confirm that checkpoints work with Beam 2.10-SNAPSHOT and do not work with version 2.9. I am very sure it is related to this issue: https://issues.apache.org/jira/browse/FLINK-2491 (which has been fixed in 2.10), since parts of the pipeline are FINISHED after a couple of minutes, and this then triggers the shutdown of the checkpoints. However, executing the pipeline on a Flink 1.5.5 cluster no longer yields any metrics about the processed elements in the web interface:
>
> 2019-01-30 09:14:53,934 WARN org.apache.beam.sdk.metrics.MetricsEnvironment - Reporting metrics are not supported in the current execution environment.
>
> Is this a known issue? I want to change my Flink version to 1.6 to see if this is fixed, but I am unsure at this point how to achieve this. Is it something I can pass in my pom.xml?
>
> image.png
>
> Best,
> Tobi
>
> On Tue, Jan 29, 2019 at 4:27 PM Maximilian Michels <[email protected]> wrote:
>
> Hi Tobias,
>
> It is normal to see "No restore state for UnbounedSourceWrapper" when not restoring from a checkpoint/savepoint.
>
> Just checking: you mentioned you set the checkpoint interval via --checkpointingInterval=300000. That means you have to wait 5 minutes until the first checkpoint is taken. You should then be seeing an INFO message like this: "INFO: Triggering checkpoint 1 @ 1548775459114 for job 3b5bdb811f1923bf49db24403e9c1ae9."
>
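> In case you prefer setting this in code instead of on the command line, here is a minimal sketch (FlinkPipelineOptions and its setCheckpointingInterval(Long) setter exist; the values are just examples):
>
> import org.apache.beam.runners.flink.FlinkPipelineOptions;
> import org.apache.beam.runners.flink.FlinkRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
> options.setRunner(FlinkRunner.class);        // without this the DirectRunner is used
> options.setCheckpointingInterval(300_000L);  // milliseconds: first checkpoint after 5 minutes
> Pipeline pipeline = Pipeline.create(options);
>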
> Thanks,
> Max
>
> On 29.01.19 16:13, Kaymak, Tobias wrote:
> > Even after altering the pipeline and making it much simpler, it still does not checkpoint. (I used a single Kafka topic as a source and altered the IO step in the following way:
> >
> > .apply(
> >     BigQueryIO.<Event>write()
> >         .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> >         .withTriggeringFrequency(refreshFrequency)
> >         .withNumFileShards(1)
> >         .to(projectId + ":" + dataset + "." + tableName)
> >         .withTimePartitioning(new TimePartitioning().setField("event_date"))
> >         .withSchema(tableSchema)
> >         .withFormatFunction(
> >             (SerializableFunction<Event, TableRow>)
> >                 KafkaToBigQuery::convertUserEventToTableRow)
> >         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> >
> > The graph that Flink 1.5.5 generated looked exactly the same, and checkpointing still did not work:
> > image.png
> >
> > On Tue, Jan 29, 2019 at 11:05 AM Kaymak, Tobias <[email protected]> wrote:
> >
> > If I have a pipeline running and I restart the taskmanager on which it's executing, the log shows the following. I find the "No restore state for UnbounedSourceWrapper." line interesting, as it seems to indicate that the pipeline never stored any state in the first place?
> >
> > Starting taskexecutor as a console application on host flink-taskmanager-5d85dd6854-pm5bl.
> > 2019-01-29 09:20:48,706 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> > 2019-01-29 09:20:51,253 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7768141350028767113.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> > 2019-01-29 09:20:51,281 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
> > 2019-01-29 09:21:53,814 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:53,828 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:53,834 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:53,917 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > 2019-01-29 09:21:53,929 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:53,937 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:53,978 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@69908217]
> > 2019-01-29 09:21:54,002 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,008 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,011 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,020 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,080 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,091 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,099 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,107 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,119 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,118 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,115 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,114 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,144 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,172 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,176 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,179 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,189 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,191 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,203 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,210 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,217 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,238 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,242 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:54,339 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > 2019-01-29 09:21:54,371 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > 2019-01-29 09:21:54,479 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > 2019-01-29 09:21:55,509 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2a501a64]
> > 2019-01-29 09:21:55,535 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@60345813]
> > 2019-01-29 09:21:55,770 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@75aab48]
> > 2019-01-29 09:21:56,280 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> > 2019-01-29 09:21:57,387 INFO org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://bucket/BigQueryWriteTemp/beam_load_ratingsflink01290921554a632112_85202e449b2d45729d384e3ac5f8cc2b/ before loading them.
> > 2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > 2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > 2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > 2019-01-29 09:21:58,140 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 13112
> > 2019-01-29 09:21:58,141 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 13407
> > 2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 13034
> > 2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 13271
> > 2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 12813
> > 2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 13211
> > 2019-01-29 09:21:58,144 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 13394
> > 2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 13194
> > 2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 13478
> > 2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 12966
> >
> >
> > On Mon, Jan 28, 2019 at 3:36 PM Kaymak, Tobias <[email protected]> wrote:
> >
> > Hi Maximilian,
> >
> > Yes, I've set the --runner to FlinkRunner when launching the pipeline, and checkpointing does work for a GCS sink, but it seems to be ignored for a BigQuery sink somehow - even though it looks like the system magically handles it by itself.
> >
> > This is the full command line to launch the Beam 2.9.0 pipeline on Flink 1.5.5:
> >
> > bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar --runner=FlinkRunner --appName=ratings --checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000 --parallelism=1 --tempLocation=gs://somebucket
> >
> > Here are the logs from the taskmanager; I can share the full code of the pipeline if you want:
> >
> > 2019-01-28 14:33:31,287 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:31,911 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:31,976 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,217 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,227 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,228 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,276 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,282 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > 2019-01-28 14:33:32,288 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95]
> > 2019-01-28 14:33:32,296 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > 2019-01-28 14:33:32,318 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,321 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,324 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,329 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,357 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,482 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,483 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,493 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,697 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,782 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:32,789 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,093 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,122 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,162 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > 2019-01-28 14:33:33,179 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,187 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,192 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,218 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,220 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,298 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,304 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,323 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,326 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,357 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,377 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,395 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,477 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,487 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > 2019-01-28 14:33:33,748 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > 2019-01-28 14:33:34,577 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d]
> > 2019-01-28 14:33:34,610 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b]
> > 2019-01-28 14:33:34,747 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@71389814]
> > 2019-01-28 14:33:34,896 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> > 2019-01-28 14:33:35,462 INFO org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/ before loading them.
> > 2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 2945
> > 2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 3101
> > 2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 3031
> > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 3009
> > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 2903
> > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 3068
> > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 3160
> > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 3014
> > 2019-01-28 14:33:35,546 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 3096
> > 2019-01-28 14:33:35,546 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 2885
> > 2019-01-28 14:33:35,577 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> > 2019-01-28 14:33:35,801 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > 2019-01-28 14:33:35,803 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > 2019-01-28 14:33:35,801 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > 2019-01-28 14:33:36,217 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: first record offset 3014
> >
> >
> > Best,
> > Tobi
> >
> >
> > On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels <[email protected]> wrote:
> >
> > Hi Tobias,
> >
> > Checkpointing should be enabled when you set it in the Flink config or via the Beam option `checkpointingInterval`. Did you set `runner` to `FlinkRunner`?
> >
> > If possible, could you share parts of the Flink logs?
> >
> > Thanks,
> > Max
> >
> > On 25.01.19 15:14, Kaymak, Tobias wrote:
> > > Hi,
> > >
> > > I am trying to migrate my existing KafkaToGCS pipeline to a KafkaToBigQuery pipeline, to skip the loading step from GCS, which is currently handled externally from Beam.
> > >
> > > I noticed that the pipeline, written in Beam 2.9.0 (Java), does not trigger any checkpoint on Flink (1.5.5), even though it is configured to do so when I launch it. Is this normal? How does Beam then guarantee exactly-once when there are no checkpoints in Flink? (During my tests it seemed to start from scratch when it crashed, but I am not 100% sure.)
> > >
> > >
> > > This is my pipeline:
> > >
> > > pipeline
> > >     .apply(
> > >         KafkaIO.<String, String>read()
> > >             .withBootstrapServers(bootstrap)
> > >             .withTopics(topics)
> > >             .withKeyDeserializer(StringDeserializer.class)
> > >             .withValueDeserializer(ConfigurableDeserializer.class)
> > >             .updateConsumerProperties(
> > >                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
> > >             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
> > >             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
> > >             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
> > >             .withReadCommitted()
> > >             .withTimestampPolicyFactory(withEventTs)
> > >             .commitOffsetsInFinalize())
> > >     .apply(ParDo.of(new ToEventFn()))
> > >     .apply(
> > >         Window.into(new ZurichTimePartitioningWindowFn())
> > >             .triggering(
> > >                 Repeatedly.forever(
> > >                     AfterFirst.of(
> > >                         AfterPane.elementCountAtLeast(bundleSize),
> > >                         AfterProcessingTime.pastFirstElementInPane()
> > >                             .plusDelayOf(refreshFrequency))))
> > >             .withAllowedLateness(Duration.standardDays(14))
> > >             .discardingFiredPanes())
> > >     .apply(
> > >         BigQueryIO.<Event>write()
> > >             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> > >             .withTriggeringFrequency(refreshFrequency)
> > >             .withNumFileShards(1)
> > >             .to(partitionedTableDynamicDestinations)
> > >             .withFormatFunction(
> > >                 (SerializableFunction<Event, TableRow>)
> > >                     KafkaToBigQuery::convertUserEventToTableRow)
> > >             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> > >             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> > >
> > > pipeline.run().waitUntilFinish();
> > > It's launched like the other (GCS) one via:
> > >
> > > ...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000 --parallelism=1 --tempLocation=gs://foo..
> > >
> > > Any idea why checkpointing does not work here?
> > >
> > > Best,
> > > Tobias
> >
> >
> >
> > --
> > Tobias Kaymak
> > Data Engineer
> >
> > [email protected]
> > www.ricardo.ch