Hi Maximilian,

I can confirm that checkpoints work with Beam 2.10-SNAPSHOT and do not work with version 2.9. I am quite sure it is related to this issue: https://issues.apache.org/jira/browse/FLINK-2491 - which has been addressed for Beam 2.10: parts of the pipeline reach the FINISHED state after a couple of minutes, and that then triggers the shutdown of checkpointing. However, executing the pipeline on a Flink 1.5.5 cluster no longer yields any metrics about the processed elements in the web interface:
2019-01-30 09:14:53,934 WARN org.apache.beam.sdk.metrics.MetricsEnvironment - Reporting metrics are not supported in the current execution environment.

Is this a known issue? I want to change my Flink version to 1.6 to see if this fixes it, but I am unsure at this point how to achieve that. Is it something I can set in my pom.xml? (A rough sketch of what I have in mind is in the P.S. at the very bottom of this mail, together with how I currently wire up the checkpointing options.)

[image: image.png]

Best,
Tobi

On Tue, Jan 29, 2019 at 4:27 PM Maximilian Michels <[email protected]> wrote:

> Hi Tobias,
>
> It is normal to see "No restore state for UnbounedSourceWrapper" when not
> restoring from a checkpoint/savepoint.
>
> Just checking: you mentioned you set the checkpoint interval via
> --checkpointingInterval=300000
>
> That means you have to wait 5 minutes until the first checkpoint will be
> taken. You should be seeing an INFO message like this: "INFO: Triggering
> checkpoint 1 @ 1548775459114 for job 3b5bdb811f1923bf49db24403e9c1ae9."
>
> Thanks,
> Max
>
> On 29.01.19 16:13, Kaymak, Tobias wrote:
> > Even after altering the pipeline and making it much simpler, it still
> > does not checkpoint. I used a single Kafka topic as a source and
> > altered the IO step in the following way:
> >
> >     .apply(
> >         BigQueryIO.<Event>write()
> >             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> >             .withTriggeringFrequency(refreshFrequency)
> >             .withNumFileShards(1)
> >             .to(projectId + ":" + dataset + "." + tableName)
> >             .withTimePartitioning(new TimePartitioning().setField("event_date"))
> >             .withSchema(tableSchema)
> >             .withFormatFunction(
> >                 (SerializableFunction<Event, TableRow>)
> >                     KafkaToBigQuery::convertUserEventToTableRow)
> >             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> >
> > The graph that Flink 1.5.5 generated looked exactly the same, and
> > checkpointing still did not work:
> > [image: image.png]
> >
> > On Tue, Jan 29, 2019 at 11:05 AM Kaymak, Tobias <[email protected]> wrote:
> >
> >     If I have a pipeline running and I restart the taskmanager on which
> >     it's executing, the log shows the following. I find the "No restore
> >     state for UnbounedSourceWrapper." line interesting, as it seems to
> >     indicate that the pipeline never stored any state in the first place?
> >
> >     Starting taskexecutor as a console application on host flink-taskmanager-5d85dd6854-pm5bl.
> >     2019-01-29 09:20:48,706 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> >     2019-01-29 09:20:51,253 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7768141350028767113.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> >     2019-01-29 09:20:51,281 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
> >     2019-01-29 09:21:53,814 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,828 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,834 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,917 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >     2019-01-29 09:21:53,929 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,937 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,978 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@69908217]
> >     2019-01-29 09:21:54,002 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,008 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,011 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,020 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,080 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,091 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,099 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,107 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,119 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,118 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,115 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,114 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,144 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,172 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,176 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,179 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,189 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,191 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,203 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,210 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,217 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,238 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,242 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,339 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >     2019-01-29 09:21:54,371 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >     2019-01-29 09:21:54,479 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >     2019-01-29 09:21:55,509 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2a501a64]
> >     2019-01-29 09:21:55,535 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@60345813]
> >     2019-01-29 09:21:55,770 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@75aab48]
> >     2019-01-29 09:21:56,280 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> >     2019-01-29 09:21:57,387 INFO org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://bucket/BigQueryWriteTemp/beam_load_ratingsflink01290921554a632112_85202e449b2d45729d384e3ac5f8cc2b/ before loading them.
> >     2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >     2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >     2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >     2019-01-29 09:21:58,140 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 13112
> >     2019-01-29 09:21:58,141 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 13407
> >     2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 13034
> >     2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 13271
> >     2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 12813
> >     2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 13211
> >     2019-01-29 09:21:58,144 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 13394
> >     2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 13194
> >     2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 13478
> >     2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 12966
> >
> >     On Mon, Jan 28, 2019 at 3:36 PM Kaymak, Tobias <[email protected]> wrote:
> >
> >         Hi Maximilian,
> >
> >         yes, I've set the --runner to FlinkRunner when launching the pipeline,
> >         and it does work for a GCS sink, but it seems to be ignored for a
> >         BigQuery sink somehow - even though it looks like the system magically
> >         handles it by itself.
> >
> >         This is the full command line to launch the Beam 2.9.0 pipeline on
> >         Flink 1.5.5:
> >
> >         bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar --runner=FlinkRunner --appName=ratings --checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000 --parallelism=1 --tempLocation=gs://somebucket
> >
> >         Here are the logs from the taskmanager; I can share the full code of
> >         the pipeline if you want:
> >
> >         2019-01-28 14:33:31,287 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:31,911 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:31,976 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,217 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,227 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,228 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,276 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,282 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >         2019-01-28 14:33:32,288 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95]
> >         2019-01-28 14:33:32,296 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >         2019-01-28 14:33:32,318 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,321 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,324 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,329 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,357 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,482 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,483 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,493 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,697 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,782 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,789 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,093 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,122 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,162 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >         2019-01-28 14:33:33,179 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,187 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,192 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,218 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,220 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,298 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,304 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,323 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,326 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,357 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,377 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,395 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,477 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,487 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,748 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >         2019-01-28 14:33:34,577 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d]
> >         2019-01-28 14:33:34,610 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b]
> >         2019-01-28 14:33:34,747 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@71389814]
> >         2019-01-28 14:33:34,896 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> >         2019-01-28 14:33:35,462 INFO org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/ before loading them.
> >         2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 2945
> >         2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 3101
> >         2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 3031
> >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 3009
> >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 2903
> >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 3068
> >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 3160
> >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 3014
> >         2019-01-28 14:33:35,546 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 3096
> >         2019-01-28 14:33:35,546 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 2885
> >         2019-01-28 14:33:35,577 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> >         2019-01-28 14:33:35,801 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >         2019-01-28 14:33:35,803 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >         2019-01-28 14:33:35,801 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >         2019-01-28 14:33:36,217 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: first record offset 3014
> >
> >         Best,
> >         Tobi
> >
> >         On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels <[email protected]> wrote:
> >
> >             Hi Tobias,
> >
> >             Checkpointing should be enabled when you set it in the Flink config
> >             or via the Beam option `checkpointingInterval`. Did you set `runner`
> >             to `FlinkRunner`?
> >
> >             If possible, could you share parts of the Flink logs?
> >
> >             Thanks,
> >             Max
> >
> >             On 25.01.19 15:14, Kaymak, Tobias wrote:
> > > Hi,
> > >
> > > I am trying to migrate my existing KafkaToGCS pipeline to a KafkaToBigQuery
> > > pipeline, to skip the loading step from GCS, which is currently handled
> > > externally from Beam.
> > >
> > > I noticed that the pipeline, written in Beam 2.9.0 (Java), does not trigger
> > > any checkpoint on Flink (1.5.5), even though it is configured to do so when
> > > I launch it. Is this normal? How does Beam then guarantee exactly-once when
> > > there are no checkpoints in Flink? (It seems to start from scratch when it
> > > crashes during my tests, but I am not 100% sure.)
> > >
> > > This is my pipeline:
> > >
> > > pipeline
> > >     .apply(
> > >         KafkaIO.<String, String>read()
> > >             .withBootstrapServers(bootstrap)
> > >             .withTopics(topics)
> > >             .withKeyDeserializer(StringDeserializer.class)
> > >             .withValueDeserializer(ConfigurableDeserializer.class)
> > >             .updateConsumerProperties(
> > >                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
> > >             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
> > >             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
> > >             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
> > >             .withReadCommitted()
> > >             .withTimestampPolicyFactory(withEventTs)
> > >             .commitOffsetsInFinalize())
> > >     .apply(ParDo.of(new ToEventFn()))
> > >     .apply(
> > >         Window.into(new ZurichTimePartitioningWindowFn())
> > >             .triggering(
> > >                 Repeatedly.forever(
> > >                     AfterFirst.of(
> > >                         AfterPane.elementCountAtLeast(bundleSize),
> > >                         AfterProcessingTime.pastFirstElementInPane()
> > >                             .plusDelayOf(refreshFrequency))))
> > >             .withAllowedLateness(Duration.standardDays(14))
> > >             .discardingFiredPanes())
> > >     .apply(
> > >         BigQueryIO.<Event>write()
> > >             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> > >             .withTriggeringFrequency(refreshFrequency)
> > >             .withNumFileShards(1)
> > >             .to(partitionedTableDynamicDestinations)
> > >             .withFormatFunction(
> > >                 (SerializableFunction<Event, TableRow>)
> > >                     KafkaToBigQuery::convertUserEventToTableRow)
> > >             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> > >             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> > >
> > > pipeline.run().waitUntilFinish();
> > >
> > > It's launched like the other (GCS) one via:
> > >
> > > ...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000
> > > --parallelism=1 --tempLocation=gs://foo..
> > >
> > > Any idea why checkpointing does not work here?
> > >
> > > Best,
> > > Tobias
> >
> >         --
> >         Tobias Kaymak
> >         Data Engineer
> >         [email protected]
> >         www.ricardo.ch
> >
> >     --
> >     Tobias Kaymak
> >     Data Engineer
> >     [email protected]
> >     www.ricardo.ch
> >
> > --
> > Tobias Kaymak
> > Data Engineer
> > [email protected]
> > www.ricardo.ch
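
P.S. Regarding my pom.xml question above: what I have in mind is swapping the Beam Flink runner dependency for one built against Flink 1.6. This is only a sketch - the coordinates below are my assumption of what the 2.10 development line will provide (I read that it splits the runner into Flink-version-specific artifacts), so the exact artifactId and version may well differ:

    <!-- Sketch only: assumed coordinates of a Flink-1.6-specific Beam runner
         artifact on the 2.10 development line. Unverified. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-flink-1.6</artifactId>
      <version>2.10.0-SNAPSHOT</version>
    </dependency>

The Flink version running on the cluster would then have to match the minor version the runner artifact is built against.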
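P.P.S. For reference, this is how the checkpointing options end up in my pipeline - a minimal sketch of doing in code what the --checkpointingInterval=300000 flag does on the command line, using the option names as I understand them from the Beam 2.9 Flink runner:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CheckpointedPipeline {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // 300000 ms: as Max pointed out, the first checkpoint is then only
        // triggered after 5 minutes of runtime.
        options.setCheckpointingInterval(300_000L);
        // I keep --checkpointingMode=EXACTLY_ONCE on the command line, since
        // I am not sure about the setter's exact type in 2.9.

        Pipeline pipeline = Pipeline.create(options);
        // ... attach the KafkaIO -> ToEventFn -> Window -> BigQueryIO steps
        // quoted earlier in this thread ...
        pipeline.run().waitUntilFinish();
      }
    }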
