Hi Tobias,

It is normal to see "No restore state for UnbounedSourceWrapper" when the job is not restoring from a checkpoint or savepoint.

Just checking: you mentioned you set the checkpoint interval via --checkpointingInterval=300000

That means you have to wait 5 minutes until the first checkpoint is taken. You should then see an INFO message like this: "INFO: Triggering checkpoint 1 @ 1548775459114 for job 3b5bdb811f1923bf49db24403e9c1ae9."
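The interval is interpreted in milliseconds. As a quick self-contained sanity check of the unit conversion (plain Java; the flag parsing below is a simplified illustration of what Beam's PipelineOptionsFactory does for you, not actual Beam code):

```java
public class CheckpointIntervalCheck {
    // Parse a Beam-style flag such as "--checkpointingInterval=300000".
    // Simplified illustration only; Beam's PipelineOptionsFactory handles this.
    static long parseIntervalMillis(String flag) {
        String[] parts = flag.split("=", 2);
        if (!parts[0].equals("--checkpointingInterval")) {
            throw new IllegalArgumentException("unexpected flag: " + parts[0]);
        }
        return Long.parseLong(parts[1]);
    }

    public static void main(String[] args) {
        long millis = parseIntervalMillis("--checkpointingInterval=300000");
        // 300000 ms = 5 minutes: no checkpoint is triggered before that much time passes.
        System.out.println(millis / 60_000 + " minutes"); // prints "5 minutes"
    }
}
```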

Thanks,
Max

On 29.01.19 16:13, Kaymak, Tobias wrote:
Even after simplifying the pipeline considerably, it still does not checkpoint. I used a single Kafka topic as a source and altered the IO step the following way:

      .apply(
          BigQueryIO.<Event>write()
              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
              .withTriggeringFrequency(refreshFrequency)
              .withNumFileShards(1)
              .to(projectId + ":" + dataset + "." + tableName)
              .withTimePartitioning(new TimePartitioning().setField("event_date"))
              .withSchema(tableSchema)
              .withFormatFunction(
                  (SerializableFunction<Event, TableRow>)
                      KafkaToBigQuery::convertUserEventToTableRow)
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
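For context, the format function referenced above (KafkaToBigQuery::convertUserEventToTableRow) is not shown in the thread. A minimal self-contained sketch of the shape such a converter takes (the Event fields and the Map stand-in for TableRow are assumptions; the real function returns com.google.api.services.bigquery.model.TableRow):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConverterSketch {
    // Hypothetical Event with fields this pipeline appears to use; the real
    // Event class is Tobias's own and is not shown in the thread.
    static class Event {
        final String user;
        final String eventDate;
        Event(String user, String eventDate) {
            this.user = user;
            this.eventDate = eventDate;
        }
    }

    // Stand-in for KafkaToBigQuery::convertUserEventToTableRow. A real
    // implementation would populate a BigQuery TableRow the same way; a Map
    // is used here so the sketch compiles without GCP dependencies.
    static Map<String, Object> convertUserEventToRow(Event e) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("user", e.user);
        row.put("event_date", e.eventDate); // must match the time-partitioning field
        return row;
    }

    public static void main(String[] args) {
        System.out.println(convertUserEventToRow(new Event("u1", "2019-01-29")));
    }
}
```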

The graph that Flink 1.5.5 generated looked exactly the same, and checkpointing still did not work:
(inline image attachment: image.png)

On Tue, Jan 29, 2019 at 11:05 AM Kaymak, Tobias <[email protected] <mailto:[email protected]>> wrote:

    If I have a pipeline running and I restart the taskmanager on which it's
    executing, the log below is what shows up. I find the "No restore state for
    UnbounedSourceWrapper." message interesting, as it seems to indicate that
    the pipeline never stored any state in the first place?

    Starting taskexecutor as a console application on host
    flink-taskmanager-5d85dd6854-pm5bl.
2019-01-29 09:20:48,706 WARN  org.apache.hadoop.util.NativeCodeLoader                - Unable to load native-hadoop library for your platform...
    using builtin-java classes where applicable
2019-01-29 09:20:51,253 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL
    configuration failed: javax.security.auth.login.LoginException: No JAAS
    configuration section named 'Client' was found in specified JAAS
    configuration file: '/tmp/jaas-7768141350028767113.conf'. Will continue
    connection to Zookeeper server without SASL authentication, if Zookeeper
    server allows it.
    2019-01-29 09:20:51,281 ERROR
    org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
    Authentication failed
2019-01-29 09:21:53,814 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:53,828 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:53,834 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:53,917 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
2019-01-29 09:21:53,929 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:53,937 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:53,978 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources:
    
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@69908217]
2019-01-29 09:21:54,002 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,008 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,011 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,020 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,080 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,091 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,099 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,107 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,109 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,119 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,118 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,115 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,114 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,111 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,111 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,110 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,110 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,109 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,144 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,172 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,176 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,179 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,189 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,191 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,203 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,210 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,217 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,238 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,242 WARN  org.apache.flink.metrics.MetricGroup                 - The operator name
    
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
    exceeded the 80 characters length limit and was truncated.
2019-01-29 09:21:54,339 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
2019-01-29 09:21:54,371 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
2019-01-29 09:21:54,479 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
2019-01-29 09:21:55,509 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources:
    
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2a501a64]
2019-01-29 09:21:55,535 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources:
    
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@60345813]
2019-01-29 09:21:55,770 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources:
    [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@75aab48]
2019-01-29 09:21:56,280 WARN org.apache.kafka.clients.consumer.ConsumerConfig              - The
    configuration 'metis.input.messages.config' was supplied but isn't a known
    config.
2019-01-29 09:21:57,387 INFO org.apache.beam.sdk.io.gcp.bigquery.BatchLoads                - Writing
    BigQuery temporary files to
    
gs://bucket/BigQueryWriteTemp/beam_load_ratingsflink01290921554a632112_85202e449b2d45729d384e3ac5f8cc2b/
    before loading them.
2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers           - Waiting for
    jobs to complete.
2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers           - Waiting for
    jobs to complete.
2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers           - Waiting for
    jobs to complete.
2019-01-29 09:21:58,140 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-0 starting at offset 13112
2019-01-29 09:21:58,141 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-1 starting at offset 13407
2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-2 starting at offset 13034
2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-3 starting at offset 13271
2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-4 starting at offset 12813
2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-5 starting at offset 13211
2019-01-29 09:21:58,144 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-6 starting at offset 13394
2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-7 starting at offset 13194
2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-8 starting at offset 13478
2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource             - Reader-0:
    reading from ratings-9 starting at offset 12966


    On Mon, Jan 28, 2019 at 3:36 PM Kaymak, Tobias <[email protected]
    <mailto:[email protected]>> wrote:

        Hi Maximilian,

        yes, I've set the --runner to FlinkRunner when launching the pipeline,
        and it does work for a GCS sink, but it seems to be ignored for a
        BigQuery sink somehow, even though it looks like the system magically
        handles it itself.

        This is the full command line to launch the Beam 2.9.0 pipeline on Flink
        1.5.5:

        bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar
        --runner=FlinkRunner --appName=ratings --checkpointingMode=EXACTLY_ONCE
        --checkpointingInterval=300000 --parallelism=1
        --tempLocation=gs://somebucket

        Here are the logs from the taskmanager, I can share the full code of the
        pipeline if you want:

2019-01-28 14:33:31,287 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:31,911 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:31,976 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,217 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,227 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,228 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,276 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,282 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
2019-01-28 14:33:32,288 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources:
        
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95]
2019-01-28 14:33:32,296 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
2019-01-28 14:33:32,318 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,321 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,324 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,329 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,357 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,482 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,483 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,493 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,697 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,782 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,789 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,093 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,122 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,162 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
2019-01-28 14:33:33,179 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,187 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,192 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,218 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,220 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,298 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,304 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,323 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,326 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,357 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,377 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,395 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,477 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,487 WARN  org.apache.flink.metrics.MetricGroup                     - The operator name
        
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out
        exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,748 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
2019-01-28 14:33:34,577 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources:
        
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d]
2019-01-28 14:33:34,610 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b]
2019-01-28 14:33:34,747 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@71389814]
2019-01-28 14:33:34,896 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
2019-01-28 14:33:35,462 INFO  org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/ before loading them.
2019-01-28 14:33:35,544 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 2945
2019-01-28 14:33:35,544 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 3101
2019-01-28 14:33:35,544 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 3031
2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 3009
2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 2903
2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 3068
2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 3160
2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 3014
2019-01-28 14:33:35,546 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 3096
2019-01-28 14:33:35,546 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 2885
2019-01-28 14:33:35,577 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
2019-01-28 14:33:35,801 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
2019-01-28 14:33:35,803 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
2019-01-28 14:33:35,801 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
2019-01-28 14:33:36,217 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: first record offset 3014


        Best,
        Tobi


        On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels <[email protected]> wrote:

            Hi Tobias,

            Checkpointing should be enabled when you set it in the Flink
            config or via the Beam option `checkpointingInterval`. Did you
            set `runner` to `FlinkRunner`?
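For reference, a minimal sketch of setting these options programmatically with Beam's Flink runner. The class names (FlinkPipelineOptions, FlinkRunner, PipelineOptionsFactory) come from the Beam Flink runner artifact; the transform chain itself is elided, and this is an illustration rather than the pipeline from the thread:

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointingSetup {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // Without an explicit runner the pipeline falls back to the default
    // (direct) runner and the Flink checkpointing flags have no effect.
    options.setRunner(FlinkRunner.class);
    // 300000 ms = 5 minutes; the first checkpoint is only triggered after
    // this interval has elapsed.
    options.setCheckpointingInterval(300_000L);

    Pipeline pipeline = Pipeline.create(options);
    // ... apply the Kafka -> BigQuery transforms here ...
    pipeline.run().waitUntilFinish();
  }
}
```

Setting the interval through `FlinkPipelineOptions` is equivalent to passing `--checkpointingInterval=300000` on the command line.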

            If possible, could you share parts of the Flink logs?

            Thanks,
            Max

            On 25.01.19 15:14, Kaymak, Tobias wrote:
             > Hi,
             >
             > I am trying to migrate my existing KafkaToGCS pipeline to a
             > KafkaToBigQuery pipeline to skip the loading step from GCS,
             > which is currently handled externally from Beam.
             >
             > I noticed that the pipeline, written in Beam 2.9.0 (Java), does
             > not trigger any checkpoint on Flink (1.5.5), even though it's
             > configured to do so when I launch it. Is this normal? How does
             > Beam then guarantee exactly-once when there are no checkpoints
             > in Flink? (It seems to start from scratch when it crashes,
             > during my tests, but I am not 100% sure.)
             >
             >
             > This is my pipeline:
             >
             >   pipeline
             >          .apply(
             >              KafkaIO.<String, String>read()
             >                  .withBootstrapServers(bootstrap)
             >                  .withTopics(topics)
             >                  .withKeyDeserializer(StringDeserializer.class)
             >                  .withValueDeserializer(ConfigurableDeserializer.class)
             >                  .updateConsumerProperties(
             >                      ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
             >                          inputMessagesConfig))
             >                  .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
             >                  .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
             >                  .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
             >                  .withReadCommitted()
             >                  .withTimestampPolicyFactory(withEventTs)
             >                  .commitOffsetsInFinalize())
             >          .apply(ParDo.of(new ToEventFn()))
             >          .apply(
             >              Window.into(new ZurichTimePartitioningWindowFn())
             >                  .triggering(
             >                      Repeatedly.forever(
             >                          AfterFirst.of(
             >                              AfterPane.elementCountAtLeast(bundleSize),
             >                              AfterProcessingTime.pastFirstElementInPane()
             >                                  .plusDelayOf(refreshFrequency))))
             >                  .withAllowedLateness(Duration.standardDays(14))
             >                  .discardingFiredPanes())
             >          .apply(
             >              BigQueryIO.<Event>write()
             >                  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
             >                  .withTriggeringFrequency(refreshFrequency)
             >                  .withNumFileShards(1)
             >                  .to(partitionedTableDynamicDestinations)
             >                  .withFormatFunction(
             >                      (SerializableFunction<Event, TableRow>)
             >                          KafkaToBigQuery::convertUserEventToTableRow)
             >                  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
             >                  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
             >
             >      pipeline.run().waitUntilFinish();
             > It's launched like the other (GCS) one via:
             >
             > ...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000
             > --parallelism=1 --tempLocation=gs://foo..
             >
             > Any idea why checkpointing does not work here?
             >
             > Best,
             > Tobias
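As an illustration of the launch flags quoted above, a full invocation might look like the following. The jar name, main class, and bucket are hypothetical placeholders; the `--runner=FlinkRunner` flag is included because, as noted elsewhere in the thread, the Flink-specific checkpointing options only take effect when the Flink runner is selected:

```shell
# Hypothetical launch command; jar, class, and bucket names are placeholders.
java -cp beam-pipeline-bundled.jar ch.ricardo.KafkaToBigQuery \
  --runner=FlinkRunner \
  --checkpointingMode=EXACTLY_ONCE \
  --checkpointingInterval=300000 \
  --parallelism=1 \
  --tempLocation=gs://my-bucket/tmp
```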



--
Tobias Kaymak
Data Engineer

[email protected]
www.ricardo.ch


