Tobias, do I understand correctly that checkpointing works fine with 2.10.0? The issues you reported are related to Metrics only?

Juan, it is correct that the bug [1] is caused by a problem in Flink [2]. The bug occurred when a task had no work assigned, e.g. an empty partition list when parallelism > #partitions.

The issue is resolved for 2.10.0 and will be fixed in 2.7.1 (LTS).

[1] https://jira.apache.org/jira/browse/BEAM-5386
[2] https://issues.apache.org/jira/browse/FLINK-2491
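
For context: FLINK-2491 means that Flink only checkpoints while all of a job's
tasks are RUNNING. A source subtask that is assigned an empty partition list
finishes immediately, which silently stops checkpointing for the whole job.
Until you are on a version with the fix, a workaround is to keep the pipeline's
parallelism at or below the partition count of the source topic. A minimal
sketch of that workaround, untested and with a hypothetical partition count:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Hypothetical: the source topic has 10 partitions, so a parallelism of
    // at most 10 leaves no source subtask without work (and none FINISHED).
    options.setParallelism(10);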

On 01.02.19 20:21, Juan Carlos Garcia wrote:
Sorry, I was on the phone. The Flink version is 1.5.4.

On Fri, Feb 1, 2019, 20:19 Juan Carlos Garcia <[email protected]> wrote:

    Hi Tobias

    I would like to ask the following and see if this applies to you.

    How many Kafka partitions do you have?
    How many TaskManagers are you using (parallelism)?

    There is a bug in Flink which is triggered as soon as you start
    using a parallelism greater than the number of partitions in your
    Kafka topic.

    If you were using the Flink API directly you could control the
    parallelism of each operator (sources and sinks); however, when using
    Beam the parallelism is applied to all of the operators in the DAG, as
    the sketch below illustrates.
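
    For example (a minimal sketch, not from an actual pipeline - kafkaSource,
    transform and sink are placeholders): with the Flink DataStream API each
    operator can get its own parallelism,

        env.addSource(kafkaSource).setParallelism(10)   // match partition count
           .map(record -> transform(record)).setParallelism(40)
           .addSink(sink).setParallelism(5);

    whereas Beam's FlinkRunner exposes a single setting that is applied to
    sources, transforms and sinks alike:

        FlinkPipelineOptions options =
            PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setParallelism(10);  // every operator in the DAG runs with 10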

    I am using Beam 2.9 with Flink 1.5.2, and just today we deployed a
    pipeline (on-premise) reading from our Kafka and publishing to
    BigQuery, using HDFS as the backing store for checkpoints, and it is
    working flawlessly.

    Here is a link to the Flink bug:

    https://issues.apache.org/jira/browse/FLINK-2491

    Hope it helps.

    JC

    On Thu, Jan 31, 2019, 15:51 Kaymak, Tobias <[email protected]> wrote:

        I should have drunk the coffee before writing this ;)
        The end-to-end duration of the snapshots is fine: the snapshots were
        created at 10:00 in the morning and I thought they took
        increasingly more time because of the clock ;)
        The rest of the findings are still valid.

        On Thu, Jan 31, 2019 at 12:27 PM Kaymak, Tobias <[email protected]> wrote:

            Follow up to summarize my findings so far:

            Beam 2.9.0 with Flink 1.5.5: No checkpoints are created.
            Beam 2.10RC1 with Flink 1.6.2: Metrics are shown in the
            web interface of Flink's jobmanager; checkpoints are created
            - but checkpoints take more than 10 minutes even if their
            end-to-end duration is 10 seconds.
            Beam 2.10RC1 with Flink 1.7.1: No metrics are shown in the
            web interface of Flink's jobmanager; checkpoints are created
            - but checkpoints take more than 10 minutes even if their
            end-to-end duration is 10 seconds.

            Attached is a screenshot from the 1.7.1 web interface. So far
            using 1.6.2 with Beam 2.10RC1 seems to be the best option. I
            am continuing to investigate why the checkpoints take so long.

            image.png


            On Wed, Jan 30, 2019 at 1:49 PM Maximilian Michels <[email protected]> wrote:

                Thank you for verifying this. This is manifested in
                https://jira.apache.org/jira/browse/BEAM-5386 and has
                indeed already been fixed for 2.10.0.

                This likely warrants a 2.9.1 release. I'll check on the
                dev mailing list.

                Thanks,
                Max

                On 30.01.19 10:27, Kaymak, Tobias wrote:
                 > Hi Maximilian,
                 >
                 > I can confirm that checkpoints work with Beam 2.10-SNAPSHOT and do not work with
                 > version 2.9. I am very sure it is related to this issue:
                 > https://issues.apache.org/jira/browse/FLINK-2491 - which has been fixed in 2.10,
                 > since parts of the pipeline are FINISHED after a couple of minutes and this then
                 > triggers the shutdown of the checkpoints. However, executing the pipeline on a
                 > Flink 1.5.5 cluster yields no metrics about the elements processed in the
                 > web interface anymore:
                 >
                 > 2019-01-30 09:14:53,934 WARN org.apache.beam.sdk.metrics.MetricsEnvironment -
                 > Reporting metrics are not supported in the current execution environment.
                 >
                 > Is this a known issue? I want to change my Flink version to 1.6 to see if this
                 > is fixed, but I am unsure at this point how to achieve this. Is it something I
                 > can pass in my pom.xml?
                 >
                 >
                 >
                 >
                 > image.png
                 >
                 > Best,
                 > Tobi
                 >
                 >
                 >
                 > On Tue, Jan 29, 2019 at 4:27 PM Maximilian Michels <[email protected]> wrote:
                 >
                 >     Hi Tobias,
                 >
                 >     It is normal to see "No restore state for UnbounedSourceWrapper" when not
                 >     restoring from a checkpoint/savepoint.
                 >
                 >     Just checking. You mentioned you set the checkpoint interval via:
                 >     --checkpointingInterval=300000
                 >
                 >     That means you have to wait 5 minutes until the first checkpoint will be
                 >     taken. You should be seeing an INFO message like this: "INFO: Triggering
                 >     checkpoint 1 @ 1548775459114 for job 3b5bdb811f1923bf49db24403e9c1ae9."
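                 >
                 >     The same interval can also be set programmatically when constructing the
                 >     pipeline; a minimal sketch, untested, assuming the FlinkPipelineOptions
                 >     API of the Flink runner:
                 >
                 >         FlinkPipelineOptions options =
                 >             PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
                 >         options.setRunner(FlinkRunner.class);
                 >         // 300000 ms = 5 minutes between checkpoint triggers
                 >         options.setCheckpointingInterval(300000L);
                 >         Pipeline pipeline = Pipeline.create(options);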
                 >
                 >     Thanks,
                 >     Max
                 >
                 >     On 29.01.19 16:13, Kaymak, Tobias wrote:
                 >      > Even after altering the pipeline and making it much simpler, it still
                 >      > does not checkpoint. (I used a single Kafka topic as a source and
                 >      > altered the IO step the following way:
                 >      >
                 >      >       .apply(
                 >      >           BigQueryIO.<Event>write()
                 >      >               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                 >      >               .withTriggeringFrequency(refreshFrequency)
                 >      >               .withNumFileShards(1)
                 >      >               .to(projectId + ":" + dataset + "." + tableName)
                 >      >               .withTimePartitioning(new TimePartitioning().setField("event_date"))
                 >      >               .withSchema(tableSchema)
                 >      >               .withFormatFunction(
                 >      >                   (SerializableFunction<Event, TableRow>)
                 >      >                       KafkaToBigQuery::convertUserEventToTableRow)
                 >      >               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                 >      >               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
                 >      >
                 >      > The graph that Flink 1.5.5 generated looked exactly the same, and
                 >      > checkpointing still did not work:
                 >      > image.png
                 >      >
                 >      > On Tue, Jan 29, 2019 at 11:05 AM Kaymak, Tobias <[email protected]> wrote:
                 >      >
                 >      >     If I have a pipeline running and I restart the taskmanager on which
                 >      >     it's executing, the log shows the following - I find the "No restore
                 >      >     state for UnbounedSourceWrapper." message interesting, as it seems to
                 >      >     indicate that the pipeline never stored a state in the first place?
                 >      >
                 >      >     Starting taskexecutor as a console application on host flink-taskmanager-5d85dd6854-pm5bl.
                 >      >     2019-01-29 09:20:48,706 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                 >      >     2019-01-29 09:20:51,253 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7768141350028767113.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
                 >      >     2019-01-29 09:20:51,281 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
                 >      >     2019-01-29 09:21:53,814 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:53,828 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:53,834 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:53,917 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
                 >      >     2019-01-29 09:21:53,929 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:53,937 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:53,978 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@69908217]
                 >      >     2019-01-29 09:21:54,002 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,008 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,011 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,020 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,080 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,091 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,099 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,107 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,119 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,118 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,115 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,114 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,144 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,172 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,176 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,179 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,189 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,191 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,203 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,210 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,217 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,238 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,242 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >     2019-01-29 09:21:54,339 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
                 >      >     2019-01-29 09:21:54,371 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
                 >      >     2019-01-29 09:21:54,479 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
                 >      >     2019-01-29 09:21:55,509 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2a501a64]
                 >      >     2019-01-29 09:21:55,535 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@60345813]
                 >      >     2019-01-29 09:21:55,770 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@75aab48]
                 >      >     2019-01-29 09:21:56,280 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
                 >      >     2019-01-29 09:21:57,387 INFO org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://bucket/BigQueryWriteTemp/beam_load_ratingsflink01290921554a632112_85202e449b2d45729d384e3ac5f8cc2b/ before loading them.
                 >      >     2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
                 >      >     2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
                 >      >     2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
                 >      >     2019-01-29 09:21:58,140 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 13112
                 >      >     2019-01-29 09:21:58,141 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 13407
                 >      >     2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 13034
                 >      >     2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 13271
                 >      >     2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 12813
                 >      >     2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 13211
                 >      >     2019-01-29 09:21:58,144 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 13394
                 >      >     2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 13194
                 >      >     2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 13478
                 >      >     2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 12966
                 >      >
                 >      >
                 >      >     On Mon, Jan 28, 2019 at 3:36 PM Kaymak, Tobias <[email protected]> wrote:
                 >      >
                 >      >         Hi Maximilian,
                 >      >
                 >      >         yes, I've set the --runner to FlinkRunner when launching the pipeline
                 >      >         and it does work for a GCS sink, but it seems to be ignored for a
                 >      >         BigQuery sink somehow. Even though it looks like the system magically
                 >      >         handles it itself.
                 >      >
                 >      >         This is the full command line to launch the Beam 2.9.0 pipeline on
                 >      >         Flink 1.5.5:
                 >      >
                 >      >         bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar \
                 >      >             --runner=FlinkRunner --appName=ratings \
                 >      >             --checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000 \
                 >      >             --parallelism=1 --tempLocation=gs://somebucket
                 >      >
                 >      >         Here are the logs from the taskmanager, I can share the full code of
                 >      >         the pipeline if you want:
                 >      >
                 >      >         2019-01-28 14:33:31,287 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:31,911 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:31,976 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,217 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,227 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,228 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,276 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,282 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
                 >      >         2019-01-28 14:33:32,288 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95]
                 >      >         2019-01-28 14:33:32,296 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
                 >      >         2019-01-28 14:33:32,318 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,321 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,324 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,329 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,357 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,482 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,483 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,493 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,697 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,782 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:32,789 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,093 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,122 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,162 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
                 >      >         2019-01-28 14:33:33,179 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,187 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,192 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,218 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,220 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,298 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,304 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,323 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,326 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,357 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,377 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,395 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,477 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,487 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
                 >      >         2019-01-28 14:33:33,748 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
                 >      >         2019-01-28 14:33:34,577 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d]
                 >      >         2019-01-28 14:33:34,610 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b]
                 >      >         2019-01-28 14:33:34,747 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@71389814]
                 >      >         2019-01-28 14:33:34,896 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
                 >      >         2019-01-28 14:33:35,462 INFO org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/ before loading them.
                 >      >         2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 2945
                 >      >         2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 3101
                 >      >         2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 3031
                 >      >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 3009
                 >      >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 2903
                 >      >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 3068
                 >      >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 3160
                 >      >         2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 3014
                 >      >         2019-01-28 14:33:35,546 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 3096
                 >      >         2019-01-28 14:33:35,546 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 2885
                 >      >         2019-01-28 14:33:35,577 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
                 >      >         2019-01-28 14:33:35,801 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
                 >      >         2019-01-28 14:33:35,803 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
                 >      >         2019-01-28 14:33:35,801 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
                 >      >         2019-01-28 14:33:36,217 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: first record offset 3014
                 >      >
                 >      >
                 >      >         Best,
                 >      >         Tobi
                 >      >
                 >      >
                 >      >         On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels <[email protected]> wrote:

    Hi Tobias,

    Checkpointing should be enabled when you set it in the Flink config
    or via the Beam option `checkpointingInterval`. Did you set `runner`
    to `FlinkRunner`?

    If possible, could you share parts of the Flink logs?

    Thanks,
    Max
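
    (For reference, a minimal sketch of setting these options
    programmatically, assuming the standard option classes shipped with
    the Beam Flink runner; the interval value is only an example:

        import org.apache.beam.runners.flink.FlinkPipelineOptions;
        import org.apache.beam.runners.flink.FlinkRunner;
        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.options.PipelineOptionsFactory;

        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);       // without this, Beam defaults to the DirectRunner
        options.setCheckpointingInterval(300_000L); // checkpoint every 5 minutes
        Pipeline pipeline = Pipeline.create(options);

    Passing the same values as command-line flags, as done further below,
    is equivalent.)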

    On 25.01.19 15:14, Kaymak, Tobias wrote:

        Hi,

        I am trying to migrate my existing KafkaToGCS pipeline to a
        KafkaToBigQuery pipeline, to skip the loading step from GCS
        which is currently handled externally from Beam.

        I noticed that the pipeline, written in Beam 2.9.0 (Java), does
        not trigger any checkpoint on Flink (1.5.5), even though it's
        configured to do so when I launch it. Is this normal? How does
        Beam then guarantee exactly-once semantics when there are no
        checkpoints in Flink? (During my tests it seemed to start from
        scratch after a crash, but I am not 100% sure.)
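
        (One way to verify whether any checkpoints are triggered at all
        is Flink's standard monitoring REST API; the jobmanager
        host/port and the job id below are placeholders:

            curl http://<jobmanager>:8081/jobs/<job-id>/checkpoints

        An empty checkpoint history there confirms that none were
        started.)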

        This is my pipeline:

        pipeline
            .apply(
                KafkaIO.<String, String>read()
                    .withBootstrapServers(bootstrap)
                    .withTopics(topics)
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(ConfigurableDeserializer.class)
                    .updateConsumerProperties(
                        ImmutableMap.of(
                            InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
                    .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
                    .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
                    .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                    .withReadCommitted()
                    .withTimestampPolicyFactory(withEventTs)
                    .commitOffsetsInFinalize())
            .apply(ParDo.of(new ToEventFn()))
            .apply(
                Window.into(new ZurichTimePartitioningWindowFn())
                    .triggering(
                        Repeatedly.forever(
                            AfterFirst.of(
                                AfterPane.elementCountAtLeast(bundleSize),
                                AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(refreshFrequency))))
                    .withAllowedLateness(Duration.standardDays(14))
                    .discardingFiredPanes())
            .apply(
                BigQueryIO.<Event>write()
                    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                    .withTriggeringFrequency(refreshFrequency)
                    .withNumFileShards(1)
                    .to(partitionedTableDynamicDestinations)
                    .withFormatFunction(
                        (SerializableFunction<Event, TableRow>)
                            KafkaToBigQuery::convertUserEventToTableRow)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        pipeline.run().waitUntilFinish();
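
        (The ZurichTimePartitioningWindowFn above is a custom class that
        is not shown in the thread. Purely as an illustrative guess, a
        day-partitioning WindowFn for Europe/Zurich time could look
        roughly like this:

            import java.util.Collection;
            import java.util.Collections;
            import org.apache.beam.sdk.coders.Coder;
            import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
            import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
            import org.apache.beam.sdk.transforms.windowing.WindowFn;
            import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
            import org.joda.time.DateTime;
            import org.joda.time.DateTimeZone;

            // Hypothetical sketch only: assigns each element to the
            // Europe/Zurich calendar day containing its event timestamp.
            public class ZurichTimePartitioningWindowFn
                extends NonMergingWindowFn<Object, IntervalWindow> {

              private static final DateTimeZone ZURICH = DateTimeZone.forID("Europe/Zurich");

              @Override
              public Collection<IntervalWindow> assignWindows(AssignContext c) {
                // Convert the element timestamp to Zurich local time and snap to midnight.
                DateTime dayStart = c.timestamp().toDateTime(ZURICH).withTimeAtStartOfDay();
                return Collections.singletonList(
                    new IntervalWindow(dayStart.toInstant(), dayStart.plusDays(1).toInstant()));
              }

              @Override
              public boolean isCompatible(WindowFn<?, ?> other) {
                return other instanceof ZurichTimePartitioningWindowFn;
              }

              @Override
              public Coder<IntervalWindow> windowCoder() {
                return IntervalWindow.getCoder();
              }

              @Override
              public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
                throw new UnsupportedOperationException(
                    "This WindowFn does not support side-input window mapping.");
              }
            }
        )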

        It's launched like the other (GCS) one via:

        ...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000
        --parallelism=1 --tempLocation=gs://foo..
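
        (For reference, a hypothetical complete submission command; the
        entry-point class and jar name are placeholders, while the flags
        after the jar are the pipeline options quoted above plus
        --runner:

            flink run -c com.example.KafkaToBigQuery bundled-pipeline.jar \
                --runner=FlinkRunner \
                --checkpointingMode=EXACTLY_ONCE \
                --checkpointingInterval=300000 \
                --parallelism=1 \
                --tempLocation=gs://foo..
        )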

        Any idea why checkpointing does not work here?

        Best,
        Tobias

        --
        Tobias Kaymak
        Data Engineer

        [email protected]
        www.ricardo.ch