[jira] [Commented] (BEAM-2534) KafkaIO should allow gaps in message offsets
[ https://issues.apache.org/jira/browse/BEAM-2534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16076928#comment-16076928 ]

Jingsong Lee commented on BEAM-2534:

Could this be cherry-picked to the 2.1.0 branch?

> KafkaIO should allow gaps in message offsets
>
> Key: BEAM-2534
> URL: https://issues.apache.org/jira/browse/BEAM-2534
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-extensions
> Affects Versions: 2.0.0
> Reporter: Raghu Angadi
> Assignee: Raghu Angadi
> Priority: Minor
> Fix For: 2.1.0
>
> KafkaIO reader logs a warning when it notices gaps in offsets for messages.
> While such gaps are not expected for normal Kafka topics, there could be gaps
> when log compaction is enabled (which deletes older messages for a key).
> This warning log is not very useful. Also, we should take such gaps into
> account while estimating backlog.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-1531) Support dynamic work rebalancing for HBaseIO
[ https://issues.apache.org/jira/browse/BEAM-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16067961#comment-16067961 ] Jingsong Lee commented on BEAM-1531: (y) It's Great! > Support dynamic work rebalancing for HBaseIO > > > Key: BEAM-1531 > URL: https://issues.apache.org/jira/browse/BEAM-1531 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-1531) Support dynamic work rebalancing for HBaseIO
[ https://issues.apache.org/jira/browse/BEAM-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16067629#comment-16067629 ]

Jingsong Lee commented on BEAM-1531:

Of course, the embedded HBase server version is better, since it is a complete mini HBase cluster. That is why I only changed the testReadingSplitAtFraction test (which only involves Scanner.iterator); the other tests remain unchanged.

I think there is a tradeoff here between test accuracy and test speed. For testReadingSplitAtFraction, if we can substantially improve the speed and also have a good mock (one that queries by startRow and stopRow), we still achieve the purpose of the test, which is to exercise HBaseIO.splitAtFraction.

I ran some tests to understand how HBaseTestingUtility works. It starts a complete miniHBaseCluster and miniZKCluster, and the JVM has 8000+ classes and 300+ threads while it runs, so it is very slow. I have not looked into the details, but presumably work that is normally spread across a cluster is all being done by a single JVM, which makes it run very slowly.

> Support dynamic work rebalancing for HBaseIO
>
> Key: BEAM-1531
> URL: https://issues.apache.org/jira/browse/BEAM-1531
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-extensions
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: Minor
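The kind of mock discussed above, one that answers queries by startRow and stopRow from memory, can be sketched roughly as follows. This is only an illustration: `InMemoryRowScanner` is a hypothetical name rather than an HBase or Beam class, row keys are simplified to strings, and the range is assumed to be start-inclusive and stop-exclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a row scanner; not an HBase class. */
class InMemoryRowScanner {
  // A sorted map keeps rows ordered by key, like a table sorted by row key.
  private final TreeMap<String, String> rows = new TreeMap<>();

  void put(String rowKey, String value) {
    rows.put(rowKey, value);
  }

  /** Returns the row keys in [startRow, stopRow), in sorted order. */
  List<String> scan(String startRow, String stopRow) {
    return new ArrayList<>(rows.subMap(startRow, true, stopRow, false).keySet());
  }

  public static void main(String[] args) {
    InMemoryRowScanner scanner = new InMemoryRowScanner();
    for (int i = 0; i < 5; i++) {
      scanner.put("key-" + i, "value-" + i);
    }
    System.out.println(scanner.scan("key-1", "key-3"));
  }
}
```

A test that only iterates over a key range can run against such a structure without starting a mini cluster, at the cost of not exercising the real server.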
[jira] [Commented] (BEAM-1531) Support dynamic work rebalancing for HBaseIO
[ https://issues.apache.org/jira/browse/BEAM-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066883#comment-16066883 ]

Jingsong Lee commented on BEAM-1531:

Hi [~iemejia], in BEAM-2393 we talked about speeding up HBaseIOTest. Since some tests (including {{testReadingSplitAtFraction}}) only use the Scanner.iterator interface, I abstracted the iterator interface so that those tests can use in-memory queries and run faster. I did not abstract estimateSizeBytes and split; that would be a bit more complicated.

Commit: https://github.com/JingsongLi/beam/commit/d1080c2a1669d13654f64b77122f7cfcb6e1edeb

What do you think?

> Support dynamic work rebalancing for HBaseIO
>
> Key: BEAM-1531
> URL: https://issues.apache.org/jira/browse/BEAM-1531
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-extensions
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: Minor
[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
[ https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066882#comment-16066882 ] Jingsong Lee commented on BEAM-2393: Let's talk about it in BEAM-1531. > BoundedSource is not fault-tolerant in FlinkRunner Streaming mode > - > > Key: BEAM-2393 > URL: https://issues.apache.org/jira/browse/BEAM-2393 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Jingsong Lee >Assignee: Jingsong Lee > > {{BoundedSourceWrapper}} does not implement snapshot() and restore(), when > the failure to restart, it will send duplicate data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
[ https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066145#comment-16066145 ] Jingsong Lee commented on BEAM-2393: (y) Yes, that's what I want. I can help you to optimize tests. > BoundedSource is not fault-tolerant in FlinkRunner Streaming mode > - > > Key: BEAM-2393 > URL: https://issues.apache.org/jira/browse/BEAM-2393 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Jingsong Lee >Assignee: Jingsong Lee > > {{BoundedSourceWrapper}} does not implement snapshot() and restore(), when > the failure to restart, it will send duplicate data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
[ https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066001#comment-16066001 ]

Jingsong Lee commented on BEAM-2393:

This no longer has anything to do with dynamic work rebalancing. The main question is how to checkpoint a {{BoundedSource}}. {{BoundedToUnboundedSourceAdapter}} offers one way: call splitAtFraction, or snapshot all of the remaining elements.

[~iemejia] Is there a simpler way for HBaseIO to implement the splitAtFraction method?

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when
> the job restarts after a failure, it will send duplicate data.
[jira] [Assigned] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
[ https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-2393: -- Assignee: Jingsong Lee > BoundedSource is not fault-tolerant in FlinkRunner Streaming mode > - > > Key: BEAM-2393 > URL: https://issues.apache.org/jira/browse/BEAM-2393 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Jingsong Lee >Assignee: Jingsong Lee > > {{BoundedSourceWrapper}} does not implement snapshot() and restore(), when > the failure to restart, it will send duplicate data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
[ https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065750#comment-16065750 ]

Jingsong Lee commented on BEAM-2393:

{{UnboundedSourceWrapper}} already supports exiting when the watermark exceeds TIMESTAMP_MAX_VALUE, so can we use {{BoundedToUnboundedSourceAdapter}}?

bq. Checkpoints are created by calling {{BoundedReader#splitAtFraction}} on inner {{BoundedSource}}.
bq. Sources that cannot be split are read entirely into memory, so this transform does not work well with large, unsplittable sources.

But at least it gives us accurate semantics.

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Jingsong Lee
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when
> the job restarts after a failure, it will send duplicate data.
[jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16064090#comment-16064090 ]

Jingsong Lee commented on BEAM-2140:

On 1: If window expiry is decided by the output watermark hold (I always thought it was the input watermark), then it has not ended, and we need to keep processing the ProcessTimer.

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.
[jira] [Commented] (BEAM-2478) Distinct Aggregates
[ https://issues.apache.org/jira/browse/BEAM-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062468#comment-16062468 ] Jingsong Lee commented on BEAM-2478: (y) You are right. Calcite native offers a lot of useful Rules, which is really exciting. > Distinct Aggregates > --- > > Key: BEAM-2478 > URL: https://issues.apache.org/jira/browse/BEAM-2478 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Jingsong Lee >Assignee: Tarush Grover > > eg: COUNT(DISTINCT empno) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (BEAM-2477) BeamAggregationRel should use Combine.perKey instead of GroupByKey
[ https://issues.apache.org/jira/browse/BEAM-2477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed BEAM-2477. -- > BeamAggregationRel should use Combine.perKey instead of GroupByKey > -- > > Key: BEAM-2477 > URL: https://issues.apache.org/jira/browse/BEAM-2477 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Jingsong Lee >Assignee: Jingsong Lee > Labels: dsl_sql_merge > Fix For: Not applicable > > > Their semantics are the same, but the efficiency of implementation is quite > different, and at the runner level there is a lot of optimization for > `Combine.perKey`. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (BEAM-2477) BeamAggregationRel should use Combine.perKey instead of GroupByKey
[ https://issues.apache.org/jira/browse/BEAM-2477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee resolved BEAM-2477. Resolution: Fixed Fix Version/s: Not applicable > BeamAggregationRel should use Combine.perKey instead of GroupByKey > -- > > Key: BEAM-2477 > URL: https://issues.apache.org/jira/browse/BEAM-2477 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Jingsong Lee >Assignee: Jingsong Lee > Labels: dsl_sql_merge > Fix For: Not applicable > > > Their semantics are the same, but the efficiency of implementation is quite > different, and at the runner level there is a lot of optimization for > `Combine.perKey`. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2508) Fix javaDoc of Stateful DoFn
[ https://issues.apache.org/jira/browse/BEAM-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated BEAM-2508: --- Description: StateSpec> StateSpec (was: StateSpec > StateSpec Fix javaDoc of Stateful DoFn > > > Key: BEAM-2508 > URL: https://issues.apache.org/jira/browse/BEAM-2508 > Project: Beam > Issue Type: Bug > Components: beam-model >Reporter: Jingsong Lee >Assignee: Kenneth Knowles > Fix For: 2.1.0 > > > StateSpec > StateSpec -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2508) Fix javaDoc of Stateful DoFn
[ https://issues.apache.org/jira/browse/BEAM-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated BEAM-2508: --- Fix Version/s: 2.1.0 > Fix javaDoc of Stateful DoFn > > > Key: BEAM-2508 > URL: https://issues.apache.org/jira/browse/BEAM-2508 > Project: Beam > Issue Type: Bug > Components: beam-model >Reporter: Jingsong Lee >Assignee: Kenneth Knowles > Fix For: 2.1.0 > > > StateSpec> StateSpec -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2508) Fix javaDoc of Stateful DoFn
Jingsong Lee created BEAM-2508: -- Summary: Fix javaDoc of Stateful DoFn Key: BEAM-2508 URL: https://issues.apache.org/jira/browse/BEAM-2508 Project: Beam Issue Type: Bug Components: beam-model Reporter: Jingsong Lee Assignee: Kenneth Knowles StateSpec> StateSpec
[jira] [Commented] (BEAM-2478) Distinct Aggregates
[ https://issues.apache.org/jira/browse/BEAM-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060353#comment-16060353 ]

Jingsong Lee commented on BEAM-2478:

Count(Distinct) is a very interesting function. The operator has to keep the details of the distinct field in order to count, and this state can sometimes be very large. There are three solutions as far as I know:

1. Count with all details of the distinct field: I think we can use StatefulParDo with a ValueState (for the count) and a SetState (for the distinct values).
2. Approximation algorithms: cardinality estimation (HyperLogLog), a Bloom filter, or a bitmap. This can greatly reduce the amount of state data, but the result can be inexact. Apache Kylin uses this.
3. Hierarchical calculation: rewrite
   select a, count(distinct b) from t group by a;
   as
   select a, count(1) from (select a, count(1) from t group by a,b) t2 group by a;
   The first operator deduplicates by b (it can also do some local aggregation by a, which reduces the shuffled data) and the second operator counts by a. This effectively reduces the state data and eases data skew. Apache Impala uses this.

> Distinct Aggregates
>
> Key: BEAM-2478
> URL: https://issues.apache.org/jira/browse/BEAM-2478
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Jingsong Lee
> Assignee: Tarush Grover
>
> eg: COUNT(DISTINCT empno)
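The hierarchical calculation (option 3 above) can be sketched in plain Java as two stages. This is a minimal illustration over in-memory collections, not Beam or SQL planner code; the class and method names are hypothetical, and each row is assumed to be a two-element array holding columns a and b.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of rewriting COUNT(DISTINCT b) GROUP BY a as two stages. */
class HierarchicalDistinctCount {

  /** Stage 1: group by (a, b), which leaves one entry per distinct pair. */
  static Set<List<String>> distinctPairs(List<String[]> rows) {
    Set<List<String>> pairs = new HashSet<>();
    for (String[] row : rows) {
      pairs.add(Arrays.asList(row[0], row[1]));
    }
    return pairs;
  }

  /** Stage 2: group the distinct pairs by a and count them. */
  static Map<String, Long> countPerKey(Set<List<String>> pairs) {
    Map<String, Long> counts = new HashMap<>();
    for (List<String> pair : pairs) {
      counts.merge(pair.get(0), 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String[]> rows = Arrays.asList(
        new String[] {"x", "1"},
        new String[] {"x", "1"}, // duplicate pair, removed in stage 1
        new String[] {"x", "2"},
        new String[] {"y", "7"});
    System.out.println(countPerKey(distinctPairs(rows)));
  }
}
```

Because stage 1 is keyed by the pair (a, b) rather than by a alone, a single hot value of a is spread over many keys, which is where the reduction in skew comes from.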
[jira] [Updated] (BEAM-1612) Support real Bundle in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee updated BEAM-1612:

Fix Version/s: 2.1.0

> Support real Bundle in Flink runner
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Fix For: 2.1.0
>
> The Bundle is very important in the Beam model. Users can use the bundle to
> flush buffers and to reuse heavyweight resources within a bundle. Most IO
> plugins use the bundle to flush.
> Moreover, FlinkRunner can also use bundles to reduce access to Flink state,
> for example by keeping values on the Java heap first and flushing them into
> RocksDB state when finishBundle is invoked, which reduces the number of
> serializations.
> But currently FlinkRunner calls finishBundle after every processElement. We
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But
> sometimes this "Bundle" may be too big; that depends on the user's checkpoint
> configuration.
> 2. Manually control the size of the bundle. A partially filled bundle is
> closed (flushed) when a count, event-time, processing-time, or {{snapshot}}
> trigger fires. We do not need to wait, just call startBundle and finishBundle
> at the right time.
> [Proposal document|https://docs.google.com/document/d/1UzELM4nFu8SIeu-QJkbs0sv7Uzd1Ux4aXXM3cw4s7po/edit?usp=sharing]
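The second implementation idea, closing a bundle by count or at snapshot time, might look roughly like the sketch below. It is not FlinkRunner code: `CountBasedBundler`, `maxBundleSize`, and `onSnapshot` are hypothetical names, the event-time and processing-time triggers are left out, and lifecycle calls are simply recorded as strings so the ordering can be inspected.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: close a bundle by element count or when a snapshot is taken. */
class CountBasedBundler {
  private final int maxBundleSize;
  private int elementsInBundle = 0;
  private boolean bundleOpen = false;

  /** Records lifecycle calls in order so the behaviour can be inspected. */
  final List<String> calls = new ArrayList<>();

  CountBasedBundler(int maxBundleSize) {
    this.maxBundleSize = maxBundleSize;
  }

  void processElement(String element) {
    if (!bundleOpen) {
      // Open a bundle lazily, only when there is an element to process.
      calls.add("startBundle");
      bundleOpen = true;
    }
    calls.add("processElement(" + element + ")");
    elementsInBundle++;
    if (elementsInBundle >= maxBundleSize) {
      finishBundleIfOpen(); // count trigger
    }
  }

  /** Snapshot trigger: close any partial bundle before state is checkpointed. */
  void onSnapshot() {
    finishBundleIfOpen();
  }

  private void finishBundleIfOpen() {
    if (bundleOpen) {
      calls.add("finishBundle");
      bundleOpen = false;
      elementsInBundle = 0;
    }
  }

  public static void main(String[] args) {
    CountBasedBundler bundler = new CountBasedBundler(2);
    bundler.processElement("a");
    bundler.processElement("b"); // the count trigger closes the first bundle
    bundler.processElement("c");
    bundler.onSnapshot(); // the snapshot closes the partial bundle
    System.out.println(bundler.calls);
  }
}
```

Opening the bundle lazily means an idle operator never holds an empty bundle open, so a snapshot with no buffered elements does not produce a startBundle/finishBundle pair.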
[jira] [Created] (BEAM-2487) Give a option to ignore the timer that is larger than END_OF_GLOBAL_WINDOW
Jingsong Lee created BEAM-2487:

Summary: Give a option to ignore the timer that is larger than END_OF_GLOBAL_WINDOW
Key: BEAM-2487
URL: https://issues.apache.org/jira/browse/BEAM-2487
Project: Beam
Issue Type: Improvement
Components: runner-flink
Reporter: Jingsong Lee

In an unbounded pipeline, event time can never reach END_OF_GLOBAL_WINDOW (except in tests). However, the Flink runner still sets timers when the user applies a stateful ParDo or GBK in the GlobalWindow. Flink maintains timers in a PriorityQueue on the Java heap, so performance is poor when the number of keys is very large.
[jira] [Commented] (BEAM-2486) Should throws some useful messages when statefulParDo use non-KV input
[ https://issues.apache.org/jira/browse/BEAM-2486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056877#comment-16056877 ]

Jingsong Lee commented on BEAM-2486:

[~kenn] Can we do some validation before the pipeline reaches the runner?

> Should throws some useful messages when statefulParDo use non-KV input
>
> Key: BEAM-2486
> URL: https://issues.apache.org/jira/browse/BEAM-2486
> Project: Beam
> Issue Type: Improvement
> Components: runner-core, runner-flink
> Reporter: Jingsong Lee
>
> Currently the Flink runner throws a ClassCastException without a detailed
> message when a stateful ParDo uses non-KV input. This makes it hard for users
> to find the error and its cause.
[jira] [Created] (BEAM-2486) Should throws some useful messages when statefulParDo use non-KV input
Jingsong Lee created BEAM-2486:

Summary: Should throws some useful messages when statefulParDo use non-KV input
Key: BEAM-2486
URL: https://issues.apache.org/jira/browse/BEAM-2486
Project: Beam
Issue Type: Improvement
Components: runner-core, runner-flink
Reporter: Jingsong Lee

Currently the Flink runner throws a ClassCastException without a detailed message when a stateful ParDo uses non-KV input. This makes it hard for users to find the error and its cause.
[jira] [Commented] (BEAM-2477) BeamAggregationRel should use Combine.perKey instead of GroupByKey
[ https://issues.apache.org/jira/browse/BEAM-2477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055416#comment-16055416 ]

Jingsong Lee commented on BEAM-2477:

*Local combine*: Cloud Dataflow and Flink Batch optimize Combine operations (such as Count and Sum) by performing partial combining locally before sending the data to the main grouping operation. See the graph optimizations described in https://cloud.google.com/blog/big-data/2017/05/after-lambda-exactly-once-processing-in-cloud-dataflow-part-2-ensuring-low-latency

*Incremental aggregation*: Similar to Flink's concept, see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction-with-incremental-aggregation
By contrast, GroupByKey keeps every element until the window closes (AFAIK, in the Flink runner).

> BeamAggregationRel should use Combine.perKey instead of GroupByKey
>
> Key: BEAM-2477
> URL: https://issues.apache.org/jira/browse/BEAM-2477
> Project: Beam
> Issue Type: Improvement
> Components: dsl-sql
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Labels: dsl_sql_merge
>
> Their semantics are the same, but the efficiency of implementation is quite
> different, and at the runner level there is a lot of optimization for
> `Combine.perKey`.
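The difference between the two approaches can be illustrated with a small plain-Java sketch. It does not use the Beam API; `GroupVersusCombine` and its methods are hypothetical names, and a per-key sum stands in for any combinable aggregate. The grouping variant buffers every value until the end, while the combining variant keeps only one accumulator per key.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch contrasting buffer-then-aggregate with incremental combining. */
class GroupVersusCombine {

  /** GroupByKey-style: buffer every value per key, then sum once at the end. */
  static Map<String, Long> sumByGrouping(List<Map.Entry<String, Long>> input) {
    Map<String, List<Long>> buffered = new HashMap<>();
    for (Map.Entry<String, Long> kv : input) {
      buffered.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    Map<String, Long> sums = new HashMap<>();
    for (Map.Entry<String, List<Long>> entry : buffered.entrySet()) {
      long sum = 0;
      for (long value : entry.getValue()) {
        sum += value;
      }
      sums.put(entry.getKey(), sum);
    }
    return sums;
  }

  /** Combine-style: fold each value into a single running accumulator per key. */
  static Map<String, Long> sumByCombining(List<Map.Entry<String, Long>> input) {
    Map<String, Long> accumulators = new HashMap<>();
    for (Map.Entry<String, Long> kv : input) {
      accumulators.merge(kv.getKey(), kv.getValue(), Long::sum);
    }
    return accumulators;
  }

  /** Small helper to build a key/value pair. */
  static Map.Entry<String, Long> kv(String key, long value) {
    return new AbstractMap.SimpleEntry<>(key, value);
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> input = Arrays.asList(kv("a", 1), kv("a", 2), kv("b", 5));
    System.out.println(sumByGrouping(input).equals(sumByCombining(input)));
  }
}
```

Both methods return the same result, which matches the point that the semantics are identical; only the amount of state held per key differs.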
[jira] [Created] (BEAM-2478) Distinct Aggregates
Jingsong Lee created BEAM-2478: -- Summary: Distinct Aggregates Key: BEAM-2478 URL: https://issues.apache.org/jira/browse/BEAM-2478 Project: Beam Issue Type: New Feature Components: dsl-sql Reporter: Jingsong Lee eg: COUNT(DISTINCT empno) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2477) BeamAggregationRel should use Combine.perKey instead of GroupByKey
Jingsong Lee created BEAM-2477: -- Summary: BeamAggregationRel should use Combine.perKey instead of GroupByKey Key: BEAM-2477 URL: https://issues.apache.org/jira/browse/BEAM-2477 Project: Beam Issue Type: Improvement Components: dsl-sql Reporter: Jingsong Lee Assignee: Jingsong Lee Their semantics are the same, but the efficiency of implementation is quite different, and at the runner level there is a lot of optimization for `Combine.perKey`. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-1942) Add Watermark Metrics in Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated BEAM-1942: --- Summary: Add Watermark Metrics in Flink Runner (was: Add Source Watermark Metrics in Flink Runner) > Add Watermark Metrics in Flink Runner > - > > Key: BEAM-1942 > URL: https://issues.apache.org/jira/browse/BEAM-1942 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Jingsong Lee >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1945) Add Watermark Metrics in Apex runner
[ https://issues.apache.org/jira/browse/BEAM-1945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated BEAM-1945: --- Summary: Add Watermark Metrics in Apex runner (was: Add Source Watermark Metrics in Apex runner) > Add Watermark Metrics in Apex runner > > > Key: BEAM-1945 > URL: https://issues.apache.org/jira/browse/BEAM-1945 > Project: Beam > Issue Type: Sub-task > Components: runner-apex >Reporter: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1943) Add Watermark Metrics in Dataflow runner
[ https://issues.apache.org/jira/browse/BEAM-1943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated BEAM-1943: --- Summary: Add Watermark Metrics in Dataflow runner (was: Add Source Watermark Metrics in Dataflow runner) > Add Watermark Metrics in Dataflow runner > > > Key: BEAM-1943 > URL: https://issues.apache.org/jira/browse/BEAM-1943 > Project: Beam > Issue Type: Sub-task > Components: runner-dataflow >Reporter: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1944) Add Watermark Metrics in Spark runner
[ https://issues.apache.org/jira/browse/BEAM-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated BEAM-1944: --- Summary: Add Watermark Metrics in Spark runner (was: Add Source Watermark Metrics in Spark runner) > Add Watermark Metrics in Spark runner > - > > Key: BEAM-1944 > URL: https://issues.apache.org/jira/browse/BEAM-1944 > Project: Beam > Issue Type: Sub-task > Components: runner-spark >Reporter: Jingsong Lee >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1941) Add Watermark Metrics in Runners
[ https://issues.apache.org/jira/browse/BEAM-1941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee updated BEAM-1941:

Summary: Add Watermark Metrics in Runners (was: Add Source Watermark Metrics in Runners)

> Add Watermark Metrics in Runners
>
> Key: BEAM-1941
> URL: https://issues.apache.org/jira/browse/BEAM-1941
> Project: Beam
> Issue Type: New Feature
> Components: runner-ideas
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
>
> The source watermark metrics show the consumer latency of the source.
> They let the user judge the health of the job, and they can be used for
> monitoring and alerting.
> Since each runner is likely already tracking a watermark, another option here
> is to just have the runner report it appropriately, rather than having the
> source report it using metrics. This also addresses the fact that even if the
> source has advanced to 8:00, the runner may still know about buffered
> elements at 7:00, and so not advance the watermark all the way to 8:00.
> [~bchambers]
> Includes:
> 1. Source watermark (`min` amongst all splits):
>    type = Gauge, namespace = io, name = source_watermark
> 2. Source watermark per split:
>    type = Gauge, namespace = io.splits, name = .source_watermark
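The first metric above, the source watermark as the minimum amongst all splits, reduces to a small computation. The sketch below is only an illustration: `SourceWatermark` is a hypothetical name, watermarks are assumed to be epoch milliseconds, and the reporting of the value through a Gauge is left out.

```java
import java.util.Arrays;
import java.util.Collection;

/** Hypothetical sketch: derive the source watermark from per-split watermarks. */
class SourceWatermark {

  /**
   * Returns the minimum watermark (epoch millis) over all splits.
   * With no splits it returns Long.MAX_VALUE, meaning nothing holds the watermark back.
   */
  static long sourceWatermark(Collection<Long> splitWatermarks) {
    long min = Long.MAX_VALUE;
    for (long watermark : splitWatermarks) {
      min = Math.min(min, watermark);
    }
    return min;
  }

  public static void main(String[] args) {
    // One split has only reached 7:00 while the others are at 8:00 (as millis of day).
    long sevenOClock = 7L * 60 * 60 * 1000;
    long eightOClock = 8L * 60 * 60 * 1000;
    System.out.println(sourceWatermark(Arrays.asList(eightOClock, sevenOClock, eightOClock)));
  }
}
```

Taking the minimum means a single lagging split holds the reported watermark back, which is the behaviour wanted from a metric that is meant to expose consumer latency.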
[jira] [Created] (BEAM-2423) Abstract StateInternalsTest for the different state internals/Runners
Jingsong Lee created BEAM-2423:

Summary: Abstract StateInternalsTest for the different state internals/Runners
Key: BEAM-2423
URL: https://issues.apache.org/jira/browse/BEAM-2423
Project: Beam
Issue Type: Improvement
Components: runner-core, runner-flink
Reporter: Jingsong Lee
Assignee: Jingsong Lee

For the tests of InMemoryStateInternals, ApexStateInternals, FlinkStateInternals, SparkStateInternals, etc., have a common base class for the state internals test that has an abstract method createStateInternals() and all the test methods. An actual implementation then just derives from it and only implements the method that creates the state internals.
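The proposed structure can be sketched as follows. Only the method name createStateInternals() comes from the issue text; `SimpleState`, `SimpleStateTestBase`, and `InMemorySimpleStateTest` are hypothetical stand-ins for Beam's real state internals and test classes, and a plain `main` is used in place of a test framework so that the example is self-contained.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal state interface standing in for a runner's state internals. */
interface SimpleState {
  void write(String key, String value);
  String read(String key);
  void clear(String key);
}

/** Shared test logic; subclasses only say how to create the state under test. */
abstract class SimpleStateTestBase {
  protected abstract SimpleState createStateInternals();

  void testReadAfterWrite() {
    SimpleState state = createStateInternals();
    state.write("k", "v");
    check("v".equals(state.read("k")), "read after write");
  }

  void testClear() {
    SimpleState state = createStateInternals();
    state.write("k", "v");
    state.clear("k");
    check(state.read("k") == null, "read after clear");
  }

  /** Runs every shared test against a fresh state instance. */
  void runAll() {
    testReadAfterWrite();
    testClear();
  }

  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }
}

/** One concrete implementation: it only provides the factory method. */
class InMemorySimpleStateTest extends SimpleStateTestBase {
  @Override
  protected SimpleState createStateInternals() {
    final Map<String, String> map = new HashMap<>();
    return new SimpleState() {
      @Override public void write(String key, String value) { map.put(key, value); }
      @Override public String read(String key) { return map.get(key); }
      @Override public void clear(String key) { map.remove(key); }
    };
  }

  public static void main(String[] args) {
    new InMemorySimpleStateTest().runAll();
    System.out.println("all state tests passed");
  }
}
```

Adding coverage for another backend then means writing one more subclass with its own factory method, while every shared test method runs unchanged.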
[jira] [Commented] (BEAM-1476) Support MapState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16040044#comment-16040044 ] Jingsong Lee commented on BEAM-1476: solved in https://github.com/apache/beam/pull/3289 > Support MapState in Flink runner > > > Key: BEAM-1476 > URL: https://issues.apache.org/jira/browse/BEAM-1476 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Pei He > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1476) Support MapState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1476: -- Assignee: Pei He (was: Jingsong Lee) > Support MapState in Flink runner > > > Key: BEAM-1476 > URL: https://issues.apache.org/jira/browse/BEAM-1476 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Pei He > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1476) Support MapState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1476: -- Assignee: Jingsong Lee > Support MapState in Flink runner > > > Key: BEAM-1476 > URL: https://issues.apache.org/jira/browse/BEAM-1476 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1483) Support SetState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1483: -- Assignee: Jingsong Lee > Support SetState in Flink runner > > > Key: BEAM-1483 > URL: https://issues.apache.org/jira/browse/BEAM-1483 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1498) Use Flink-native side outputs
[ https://issues.apache.org/jira/browse/BEAM-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1498: -- Assignee: Jingsong Lee > Use Flink-native side outputs > - > > Key: BEAM-1498 > URL: https://issues.apache.org/jira/browse/BEAM-1498 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Aljoscha Krettek >Assignee: Jingsong Lee > > Once Flink has support for side outputs we should use them instead of > manually dealing with the {{RawUnionValues}}. > Side outputs for Flink is being tracked in > https://issues.apache.org/jira/browse/FLINK-4460. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-2401) Update Flink Runner to Flink 1.3.0
Jingsong Lee created BEAM-2401: -- Summary: Update Flink Runner to Flink 1.3.0 Key: BEAM-2401 URL: https://issues.apache.org/jira/browse/BEAM-2401 Project: Beam Issue Type: Improvement Components: runner-flink Reporter: Jingsong Lee Assignee: Jingsong Lee http://flink.apache.org/news/2017/06/01/release-1.3.0.html There are a lot of exciting improvements. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
Jingsong Lee created BEAM-2393:

Summary: BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
Key: BEAM-2393
URL: https://issues.apache.org/jira/browse/BEAM-2393
Project: Beam
Issue Type: Bug
Components: runner-flink
Reporter: Jingsong Lee

{{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when the job restarts after a failure, it will send duplicate data.
[jira] [Closed] (BEAM-2248) KafkaIO support to use start read time to set start offset
[ https://issues.apache.org/jira/browse/BEAM-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee closed BEAM-2248.

Resolution: Fixed
Fix Version/s: 2.1.0

> KafkaIO support to use start read time to set start offset
>
> Key: BEAM-2248
> URL: https://issues.apache.org/jira/browse/BEAM-2248
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-extensions
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Fix For: 2.1.0
>
> Kafka 0.10.x adds support for a searchable index for each topic based on
> message timestamps, which enables consumers to look up offsets by timestamp.
> So we can add a start read time that is used to set the start offset.
[jira] [Created] (BEAM-2248) KafkaIO support to use start read time to set start offset
Jingsong Lee created BEAM-2248:

Summary: KafkaIO support to use start read time to set start offset
Key: BEAM-2248
URL: https://issues.apache.org/jira/browse/BEAM-2248
Project: Beam
Issue Type: New Feature
Components: sdk-java-extensions
Reporter: Jingsong Lee
Assignee: Jingsong Lee

Kafka 0.10.x adds support for a searchable index for each topic based on message timestamps, which enables consumers to look up offsets by timestamp. So we can add a start read time that is used to set the start offset.
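The idea of turning a start read time into a start offset can be modelled with a sorted in-memory index. The sketch below is not Kafka or KafkaIO code: `TimestampIndex` and its methods are hypothetical, timestamps are assumed to be non-decreasing as offsets grow, and -1 is an arbitrary marker for "no such message". In a real consumer the lookup would go through the client's own offset-by-timestamp API (`KafkaConsumer#offsetsForTimes`) rather than a local map.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of a timestamp-to-offset index for one partition. */
class TimestampIndex {
  // Maps a message timestamp to the first offset that carries it.
  private final TreeMap<Long, Long> firstOffsetByTimestamp = new TreeMap<>();

  /** Records a message; assumes timestamps never decrease as offsets grow. */
  void append(long timestampMs, long offset) {
    firstOffsetByTimestamp.putIfAbsent(timestampMs, offset);
  }

  /** Returns the first offset whose timestamp is >= startReadTimeMs, or -1 if none. */
  long startOffsetFor(long startReadTimeMs) {
    Map.Entry<Long, Long> entry = firstOffsetByTimestamp.ceilingEntry(startReadTimeMs);
    return entry == null ? -1L : entry.getValue();
  }

  public static void main(String[] args) {
    TimestampIndex index = new TimestampIndex();
    index.append(1000L, 0L);
    index.append(2000L, 1L);
    index.append(2000L, 2L); // same timestamp, the earlier offset is kept
    index.append(3000L, 3L);
    System.out.println(index.startOffsetFor(1500L));
  }
}
```

Reading would then begin at the returned offset, so messages older than the start read time are skipped without scanning them.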
[jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003002#comment-16003002 ]

Jingsong Lee commented on BEAM-2140:

1. {{SplittableParDo}} will clear its state in {{SplittableParDo.ProcessFn.processElement()}}:
{code}
if (result.getResidualRestriction() == null) {
  // All work for this element/restriction is completed. Clear state and release hold.
  elementState.clear();
  restrictionState.clear();
  holdState.clear();
  return;
}
{code}
I got values up to 12344 in IntelliJ debug mode but values up to 34567 in IntelliJ run mode (regardless of the output watermark of {{BoundedSourceWrapper}}). So I think SDF processing has nothing to do with the input watermark and depends only on the ProcessTimeService.
The problem should be caused by the second and third points.

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.
[jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002039#comment-16002039 ]

Jingsong Lee commented on BEAM-2140:

First, {{SplittableParDo}} should not wrap {{StatefulDoFnRunner}}.
Second, {{SplittableParDo}} uses {{PROCESSING_TIME}} to continue processing. It also sets watermark holds, which affect when the output watermark is sent (see {{DoFnOperator.processWatermark1()}}). When {{BoundedSourceWrapper}} finishes, it emits a Long.MAX_VALUE watermark, but {{SplittableParDo}} may not have finished yet (this depends on system time), so nothing sends the watermark downstream.
Last, {{StreamTask}} shuts down when there are no more inputs and invokes {{timerService.quiesceAndAwaitPending}} (see {{StreamTask.invoke()}} in Flink). This shuts down the TimeService, runs all tasks pending in it, and rejects new registrations, so it breaks the continued processing of {{SplittableParDo}}.
[~aljoscha] Is that right? Please correct me if I am wrong.

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996603#comment-15996603 ]

Jingsong Lee commented on BEAM-1612:

Agree with [~aljoscha].
Regarding the coarse granularity, one small idea is to use {{processWatermark}} to end a bundle: the bundle is finished when the event-time timers fire.
Regarding the fact that a Flink operator cannot output data during a snapshot: can Flink provide a {{beforeSnapshot()}} callback that runs before {{broadcastCheckpointBarrier()}} is invoked in {{StreamTask.performCheckpoint()}}? I think users of the Flink API have the same need.

> Support real Bundle in Flink runner
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Jingsong Lee
>
> Bundles are very important in the Beam model. Users can use a bundle to
> flush buffers and to reuse heavyweight resources within it. Most IO
> plugins flush on bundle boundaries.
> Moreover, FlinkRunner can use bundles to reduce access to Flink state,
> for example by keeping values on the Java heap first and flushing them into
> RocksDB state when finishBundle is invoked, which reduces the number of
> serializations.
> But FlinkRunner currently calls finishBundle after every processElement. We
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But
> sometimes this "bundle" may be too big; this depends on the user's checkpoint
> configuration.
> 2. Manually control the size of the bundle. The half-bundle will be flushed to
> a full bundle by count, event time, processing time, or {{snapshot}}. We do not
> need to wait; we just call startBundle and finishBundle at the right time.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1588) Reuse StateNamespace.stringKey in Flink States
[ https://issues.apache.org/jira/browse/BEAM-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1588: -- Assignee: (was: Jingsong Lee) > Reuse StateNamespace.stringKey in Flink States > -- > > Key: BEAM-1588 > URL: https://issues.apache.org/jira/browse/BEAM-1588 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Jingsong Lee > > See BEAM-1587 > StateNamespace.stringKey did two things: the base64 encoding of window , and > then String.format. These two things consumption is not small. We can cache > it in State and reuse. > Further more, we can cache the state. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1587) Use StringBuilder to stringKey of StateNamespace instead of String.format
[ https://issues.apache.org/jira/browse/BEAM-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned BEAM-1587:

Assignee: (was: Jingsong Lee)

> Use StringBuilder to stringKey of StateNamespace instead of String.format
>
> Key: BEAM-1587
> URL: https://issues.apache.org/jira/browse/BEAM-1587
> Project: Beam
> Issue Type: Improvement
> Components: runner-core
> Reporter: Jingsong Lee
>
> In the Flink runner, each state access calls the namespace's stringKey once.
> Because stringKey uses String.format, the impact on performance is
> relatively large.
> In some extreme cases, stringKey accounts for up to 2% of the performance cost.
> Here is a test comparing StringBuilder and String.format:
> {code}
> // getRandomString, testFormat and testStringBuild are helpers not shown in this issue.
> public static void main(String[] args) throws Exception {
>   String[] strs = new String[1000_000];
>   for (int i = 0; i < strs.length; i++) {
>     strs[i] = getRandomString(10);
>   }
>   {
>     long start = System.nanoTime();
>     for (int i = 0; i < strs.length; i++) {
>       strs[i] = testFormat(strs[i]);
>     }
>     System.out.println("testStringFormat: " + ((System.nanoTime() - start) / 1000_000) + "ms");
>   }
>   {
>     long start = System.nanoTime();
>     for (int i = 0; i < strs.length; i++) {
>       strs[i] = testStringBuild(strs[i]);
>     }
>     System.out.println("testStringBuilder: " + ((System.nanoTime() - start) / 1000_000) + "ms");
>   }
> }
> {code}
> testStringFormat: 2312ms
> testStringBuilder: 266ms

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
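The two approaches compared in the issue can be illustrated with a small self-contained sketch. The key layout used here ("/window/id/") and both method names are assumptions made for illustration; they are not taken from StateNamespace. The point is only that a StringBuilder can produce exactly the same string as String.format without parsing a format pattern on every call.

```java
/** Hypothetical key builders; the "/window/id/" layout is an assumption. */
final class StringKeySketch {
  /** Builds the key with String.format, which parses the pattern on each call. */
  static String viaFormat(String window, String id) {
    return String.format("/%s/%s/", window, id);
  }

  /** Builds the same key with a pre-sized StringBuilder. */
  static String viaBuilder(String window, String id) {
    return new StringBuilder(window.length() + id.length() + 3)
        .append('/')
        .append(window)
        .append('/')
        .append(id)
        .append('/')
        .toString();
  }
}
```

Timings depend on the JVM and hardware, so this sketch makes no claim about the size of the speed-up beyond what the issue itself reports.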
[jira] [Assigned] (BEAM-1483) Support SetState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1483: -- Assignee: (was: Jingsong Lee) > Support SetState in Flink runner > > > Key: BEAM-1483 > URL: https://issues.apache.org/jira/browse/BEAM-1483 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1476) Support MapState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1476: -- Assignee: (was: Jingsong Lee) > Support MapState in Flink runner > > > Key: BEAM-1476 > URL: https://issues.apache.org/jira/browse/BEAM-1476 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1498) Use Flink-native side outputs
[ https://issues.apache.org/jira/browse/BEAM-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1498: -- Assignee: (was: Jingsong Lee) > Use Flink-native side outputs > - > > Key: BEAM-1498 > URL: https://issues.apache.org/jira/browse/BEAM-1498 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Aljoscha Krettek > > Once Flink has support for side outputs we should use them instead of > manually dealing with the {{RawUnionValues}}. > Side outputs for Flink is being tracked in > https://issues.apache.org/jira/browse/FLINK-4460. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1641) Support synchronized processing time in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated BEAM-1641: --- Fix Version/s: (was: First stable release) > Support synchronized processing time in Flink runner > > > Key: BEAM-1641 > URL: https://issues.apache.org/jira/browse/BEAM-1641 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Aljoscha Krettek >Priority: Blocker > > The "continuation trigger" for a processing time trigger is a synchronized > processing time trigger. Today, this throws an exception in the FlinkRunner. > The supports the following: > - GBK1 > - GBK2 > When GBK1 fires due to processing time past the first element in the pane and > that element arrives at GBK2, it will wait until all the other upstream keys > have also processed and emitted corresponding data. > Sorry for the terseness of explanation - writing quickly so I don't forget. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1641) Support synchronized processing time in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15992110#comment-15992110 ]

Jingsong Lee commented on BEAM-1641:

The {{DirectRunner}} handles event time and synchronized processing time somewhat differently. The source simply emits {{BoundedWindow.TIMESTAMP_MAX_VALUE}} as its synchronized processing time, and downstream transforms use {{min(clock.now(), synchronizedProcessingInputWatermark.get())}} to compute theirs.
Fundamentally, though, I think ingestion time and synchronized processing time have almost the same effect.
So I think we can use ingestion time and let Flink track ingestion time and event time at the same time.

> Support synchronized processing time in Flink runner
>
> Key: BEAM-1641
> URL: https://issues.apache.org/jira/browse/BEAM-1641
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Kenneth Knowles
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: First stable release
>
> The "continuation trigger" for a processing time trigger is a synchronized
> processing time trigger. Today, this throws an exception in the FlinkRunner.
> This supports the following:
> - GBK1
> - GBK2
> When GBK1 fires due to processing time past the first element in the pane and
> that element arrives at GBK2, it will wait until all the other upstream keys
> have also processed and emitted corresponding data.
> Sorry for the terseness of explanation - writing quickly so I don't forget.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
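The formula quoted in the comment above can be written out as a tiny sketch. It is not DirectRunner code: the class, the method, and the use of plain millisecond longs are assumptions, and Long.MAX_VALUE merely stands in for the maximum timestamp that a source would report.

```java
/** Hypothetical illustration of min(clock.now(), synchronizedProcessingInputWatermark). */
final class SyncProcessingTimeSketch {
  /** Stand-in for the maximum timestamp a source emits as its synchronized processing time. */
  static final long TIMESTAMP_MAX_VALUE = Long.MAX_VALUE;

  /** A downstream step can never be ahead of its own clock or of its upstream input. */
  static long synchronizedProcessingTime(long clockNowMs, long inputWatermarkMs) {
    return Math.min(clockNowMs, inputWatermarkMs);
  }
}
```

Directly below a source the input value is the maximum timestamp, so the result is simply the local clock; further downstream, a lagging upstream step holds the value back.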
[jira] [Commented] (BEAM-1641) Support synchronized processing time in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988363#comment-15988363 ]

Jingsong Lee commented on BEAM-1641:

I think this is difficult to achieve: it requires the Flink runtime's {{StatusWatermarkValve}} to also manage synchronized processing time (maybe in Flink 1.4 or later).
It's a tough one indeed. I also have no idea how to use special punctuation(s) as a trigger.
I think it can be deferred.
What do you think? [~aljoscha]

> Support synchronized processing time in Flink runner
>
> Key: BEAM-1641
> URL: https://issues.apache.org/jira/browse/BEAM-1641
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Kenneth Knowles
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: First stable release
>
> The "continuation trigger" for a processing time trigger is a synchronized
> processing time trigger. Today, this throws an exception in the FlinkRunner.
> This supports the following:
> - GBK1
> - GBK2
> When GBK1 fires due to processing time past the first element in the pane and
> that element arrives at GBK2, it will wait until all the other upstream keys
> have also processed and emitted corresponding data.
> Sorry for the terseness of explanation - writing quickly so I don't forget.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1944) Add Source Watermark Metrics in Spark runner
[ https://issues.apache.org/jira/browse/BEAM-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1944: -- Assignee: Jingsong Lee > Add Source Watermark Metrics in Spark runner > > > Key: BEAM-1944 > URL: https://issues.apache.org/jira/browse/BEAM-1944 > Project: Beam > Issue Type: Sub-task > Components: runner-spark >Reporter: Jingsong Lee >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1942) Add Source Watermark Metrics in Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1942: -- Assignee: Jingsong Lee > Add Source Watermark Metrics in Flink Runner > > > Key: BEAM-1942 > URL: https://issues.apache.org/jira/browse/BEAM-1942 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Jingsong Lee >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1941) Add Source Watermark Metrics in Runners
[ https://issues.apache.org/jira/browse/BEAM-1941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1941: -- Assignee: Jingsong Lee > Add Source Watermark Metrics in Runners > --- > > Key: BEAM-1941 > URL: https://issues.apache.org/jira/browse/BEAM-1941 > Project: Beam > Issue Type: New Feature > Components: runner-ideas >Reporter: Jingsong Lee >Assignee: Jingsong Lee > > The source watermark metrics show the consumer latency of Source. > It allows the user to know the health of the job, or it can be used to > monitor and alarm. > Since each runner is likely already tracking a watermark, another option here > is to just have the runner report it appropriately, rather than having the > source report it using metrics. This also addresses the fact that even if the > source has advanced to 8:00, the runner may still know about buffered > elements at 7:00, and so not advance the watermark all the way to 8:00. > [~bchambers] > Includes: > 1.Source watermark (`min` amongst all splits): >type = Gauge, namespace = io, name = source_watermark > 2.Source watermark per split: >type = Gauge, namespace = io.splits, name = .source_watermark -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1886) Remove TextIO override in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1886: -- Assignee: Jingsong Lee > Remove TextIO override in Flink runner > -- > > Key: BEAM-1886 > URL: https://issues.apache.org/jira/browse/BEAM-1886 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Jingsong Lee > Fix For: First stable release > > > Today, the Flink runner replaces TextIO with a customized version. I believe > this is related to adequate support for files HDFS. > However, the capabilities are less, in particular the recent support for > window-and-pane sharded writes of unbounded collections. > Concretely, we have had to remove WindowedWordCountIT from the precommit > Jenkins run. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1944) Add Source Watermark Metrics in Spark runner
Jingsong Lee created BEAM-1944: -- Summary: Add Source Watermark Metrics in Spark runner Key: BEAM-1944 URL: https://issues.apache.org/jira/browse/BEAM-1944 Project: Beam Issue Type: Sub-task Components: runner-spark Reporter: Jingsong Lee -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1945) Add Source Watermark Metrics in Apex runner
Jingsong Lee created BEAM-1945: -- Summary: Add Source Watermark Metrics in Apex runner Key: BEAM-1945 URL: https://issues.apache.org/jira/browse/BEAM-1945 Project: Beam Issue Type: Sub-task Components: runner-apex Reporter: Jingsong Lee -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1943) Add Source Watermark Metrics in Dataflow runner
Jingsong Lee created BEAM-1943: -- Summary: Add Source Watermark Metrics in Dataflow runner Key: BEAM-1943 URL: https://issues.apache.org/jira/browse/BEAM-1943 Project: Beam Issue Type: Sub-task Components: runner-dataflow Reporter: Jingsong Lee -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1942) Add Source Watermark Metrics in Flink Runner
Jingsong Lee created BEAM-1942: -- Summary: Add Source Watermark Metrics in Flink Runner Key: BEAM-1942 URL: https://issues.apache.org/jira/browse/BEAM-1942 Project: Beam Issue Type: Sub-task Components: runner-flink Reporter: Jingsong Lee -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1941) Add Source Watermark Metrics in Runners
Jingsong Lee created BEAM-1941: -- Summary: Add Source Watermark Metrics in Runners Key: BEAM-1941 URL: https://issues.apache.org/jira/browse/BEAM-1941 Project: Beam Issue Type: New Feature Components: runner-ideas Reporter: Jingsong Lee The source watermark metrics show the consumer latency of Source. It allows the user to know the health of the job, or it can be used to monitor and alarm. Since each runner is likely already tracking a watermark, another option here is to just have the runner report it appropriately, rather than having the source report it using metrics. This also addresses the fact that even if the source has advanced to 8:00, the runner may still know about buffered elements at 7:00, and so not advance the watermark all the way to 8:00. [~bchambers] Includes: 1.Source watermark (`min` amongst all splits): type = Gauge, namespace = io, name = source_watermark 2.Source watermark per split: type = Gauge, namespace = io.splits, name = .source_watermark -- This message was sent by Atlassian JIRA (v6.3.15#6346)
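The first metric listed in this issue, the source watermark as the minimum over all splits, can be sketched like this. The class name, the map of split ids to millisecond watermarks, and the empty result for a source without splits are assumptions made for illustration; the sketch does not use the Beam Metrics API.

```java
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical aggregation of per-split watermarks into one source watermark. */
final class SourceWatermarkSketch {
  /** The source watermark is held back by its slowest split; empty if there are no splits. */
  static OptionalLong sourceWatermark(Map<String, Long> watermarkMsPerSplit) {
    return watermarkMsPerSplit.values().stream().mapToLong(Long::longValue).min();
  }
}
```

Each map entry corresponds to the per-split gauge, and the returned minimum corresponds to the source-level gauge.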
[jira] [Commented] (BEAM-1723) FlinkRunner should deduplicate when an UnboundedSource requires Deduping
[ https://issues.apache.org/jira/browse/BEAM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962125#comment-15962125 ] Jingsong Lee commented on BEAM-1723: I think it is necessary to be configurable because the deduplication window is related to the checkpoint interval. > FlinkRunner should deduplicate when an UnboundedSource requires Deduping > > > Key: BEAM-1723 > URL: https://issues.apache.org/jira/browse/BEAM-1723 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Thomas Groh >Assignee: Jingsong Lee > > UnboundedSource implementations can require deduping, and the FlinkRunner > currently logs a warning that this is not supported. > https://github.com/apache/beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L139 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1723) FlinkRunner should deduplicate when an UnboundedSource requires Deduping
[ https://issues.apache.org/jira/browse/BEAM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962077#comment-15962077 ]

Jingsong Lee commented on BEAM-1723:

I understand. The duplication arises because {{PubSubIO}} uses a pull-ack model, so {{acknowledge()}} in {{finalizeCheckpoint()}} may fail, whereas Kafka restores from offsets.

> FlinkRunner should deduplicate when an UnboundedSource requires Deduping
>
> Key: BEAM-1723
> URL: https://issues.apache.org/jira/browse/BEAM-1723
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Thomas Groh
> Assignee: Jingsong Lee
>
> UnboundedSource implementations can require deduping, and the FlinkRunner
> currently logs a warning that this is not supported.
> https://github.com/apache/beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L139

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1723) FlinkRunner should deduplicate when an UnboundedSource requires Deduping
[ https://issues.apache.org/jira/browse/BEAM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1723: -- Assignee: Jingsong Lee > FlinkRunner should deduplicate when an UnboundedSource requires Deduping > > > Key: BEAM-1723 > URL: https://issues.apache.org/jira/browse/BEAM-1723 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Thomas Groh >Assignee: Jingsong Lee > > UnboundedSource implementations can require deduping, and the FlinkRunner > currently logs a warning that this is not supported. > https://github.com/apache/beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L139 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1723) FlinkRunner should deduplicate when an UnboundedSource requires Deduping
[ https://issues.apache.org/jira/browse/BEAM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959226#comment-15959226 ]

Jingsong Lee commented on BEAM-1723:

I see {{CachedIdDeduplicator}} in the direct runner. It uses a {{LoadingCache}} to deduplicate, with expireAfterAccess set to 10 minutes and maximumSize set to 100_000.
Do these two values need to be parameterized?
Do these caches need to be snapshotted in the Flink runner (for fault tolerance)?

> FlinkRunner should deduplicate when an UnboundedSource requires Deduping
>
> Key: BEAM-1723
> URL: https://issues.apache.org/jira/browse/BEAM-1723
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Thomas Groh
>
> UnboundedSource implementations can require deduping, and the FlinkRunner
> currently logs a warning that this is not supported.
> https://github.com/apache/beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L139

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
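To make the parameterization question concrete, here is a sketch of an id deduplicator whose two limits are constructor arguments. It is not the direct runner's implementation: it swaps the Guava cache for a plain access-ordered LinkedHashMap, takes the current time as an argument instead of reading a clock, and assumes that time never goes backwards. All names are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical deduplicator with configurable size and expiry limits. */
final class IdDeduplicatorSketch {
  private final int maximumSize;
  private final long expireAfterAccessMs;
  // Access-ordered: iteration starts at the least recently accessed id.
  private final LinkedHashMap<String, Long> lastAccessMs = new LinkedHashMap<>(16, 0.75f, true);

  IdDeduplicatorSketch(int maximumSize, long expireAfterAccessMs) {
    this.maximumSize = maximumSize;
    this.expireAfterAccessMs = expireAfterAccessMs;
  }

  /** Returns true if the id has not been seen within the retention limits. */
  boolean shouldOutput(String id, long nowMs) {
    // Drop ids that were not accessed within the expiry window, oldest first.
    Iterator<Map.Entry<String, Long>> it = lastAccessMs.entrySet().iterator();
    while (it.hasNext()) {
      if (nowMs - it.next().getValue() < expireAfterAccessMs) {
        break;
      }
      it.remove();
    }
    // put() refreshes the access order and returns null for an unseen id.
    boolean firstTime = lastAccessMs.put(id, nowMs) == null;
    if (lastAccessMs.size() > maximumSize) {
      // Evict the least recently accessed id to respect the size limit.
      Iterator<String> keys = lastAccessMs.keySet().iterator();
      keys.next();
      keys.remove();
    }
    return firstTime;
  }
}
```

Because all state lives in one map, snapshotting for fault tolerance would amount to checkpointing that map; whether that is worth the cost is the open question raised in the comment.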
[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955360#comment-15955360 ]

Jingsong Lee commented on BEAM-1612:

I think this is a good idea. Any bundle size, however small, is better than one bundle per record.
Flink's default buffer size is 32768 bytes and its default BufferTimeout is 100ms. ([~aljoscha] Please point out if anything here is wrong.)
From my production experience, I think 2M is a more reasonable bundleSize.
I have not changed the BufferSize in Flink. [~aljoscha] Can Flink work well with a 2M BufferSize?

> Support real Bundle in Flink runner
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
>
> Bundles are very important in the Beam model. Users can use a bundle to
> flush buffers and to reuse heavyweight resources within it. Most IO
> plugins flush on bundle boundaries.
> Moreover, FlinkRunner can use bundles to reduce access to Flink state,
> for example by keeping values on the Java heap first and flushing them into
> RocksDB state when finishBundle is invoked, which reduces the number of
> serializations.
> But FlinkRunner currently calls finishBundle after every processElement. We
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But
> sometimes this "bundle" may be too big; this depends on the user's checkpoint
> configuration.
> 2. Manually control the size of the bundle. The half-bundle will be flushed to
> a full bundle by count, event time, processing time, or {{snapshot}}. We do not
> need to wait; we just call startBundle and finishBundle at the right time.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
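The second option in the quoted description (close a bundle by count, by time, or on snapshot) can be sketched as a small bookkeeping class. This is an illustration only, not FlinkRunner code: the names are hypothetical, the size limit counts elements rather than bytes, and the places where startBundle and finishBundle would be called are marked in comments.

```java
/** Hypothetical tracker that decides when the current bundle should end. */
final class BundleTrackerSketch {
  private final long maxElements;
  private final long maxBundleMillis;
  private long elementCount;
  private long bundleStartMillis;
  private boolean open;
  private int finishedBundles;

  BundleTrackerSketch(long maxElements, long maxBundleMillis) {
    this.maxElements = maxElements;
    this.maxBundleMillis = maxBundleMillis;
  }

  /** Called for every element: opens a bundle if needed and closes it once it is full. */
  void onElement(long nowMillis) {
    if (!open) {
      open = true; // startBundle would be invoked here
      bundleStartMillis = nowMillis;
      elementCount = 0;
    }
    elementCount++;
    if (elementCount >= maxElements) {
      finish();
    }
  }

  /** Called from a processing-time timer: closes a bundle that has been open too long. */
  void onTimer(long nowMillis) {
    if (open && nowMillis - bundleStartMillis >= maxBundleMillis) {
      finish();
    }
  }

  /** Called before a snapshot so that no buffered data stays unflushed. */
  void onSnapshot() {
    if (open) {
      finish();
    }
  }

  private void finish() {
    open = false; // finishBundle would be invoked here
    finishedBundles++;
  }

  boolean isOpen() {
    return open;
  }

  int finishedBundles() {
    return finishedBundles;
  }
}
```

Nothing here blocks or waits; every callback just checks the limits and closes the bundle when one is reached, which matches the "call startBundle and finishBundle at the right time" idea in the description.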
[jira] [Created] (BEAM-1862) SplittableDoFnOperator should close the ScheduledExecutorService
Jingsong Lee created BEAM-1862:

Summary: SplittableDoFnOperator should close the ScheduledExecutorService
Key: BEAM-1862
URL: https://issues.apache.org/jira/browse/BEAM-1862
Project: Beam
Issue Type: Bug
Components: runner-flink
Reporter: Jingsong Lee
Assignee: Jingsong Lee

{{SplittableDoFnOperator}} creates a new {{ScheduledExecutorService}} for {{OutputAndTimeBoundedSplittableProcessElementInvoker}} but never shuts it down. We should shut it down in {{close()}}.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
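The fix proposed in this issue follows a common pattern, sketched below with a hypothetical operator class rather than the real SplittableDoFnOperator. The executor is created in open() and shut down in close(); whether to call shutdown() or shutdownNow(), and whether to wait for termination, is left open here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical operator that owns an executor and releases it on close. */
final class OperatorWithExecutorSketch implements AutoCloseable {
  private ScheduledExecutorService executor;

  /** Creates the executor, as an operator would do when it is opened. */
  void open() {
    executor = Executors.newSingleThreadScheduledExecutor();
  }

  /** Shuts the executor down so that its threads do not outlive the operator. */
  @Override
  public void close() {
    if (executor != null) {
      executor.shutdown();
      executor = null;
    }
  }

  boolean isRunning() {
    return executor != null && !executor.isShutdown();
  }
}
```

The null check makes close() safe to call more than once or before open().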
[jira] [Commented] (BEAM-1772) Support merging WindowFn other than IntervalWindow on Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-1772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942408#comment-15942408 ]

Jingsong Lee commented on BEAM-1772:

For performance reasons, I kept the combining over sorted data and added a combining based on HashMap state for merging windows other than IntervalWindow.

> Support merging WindowFn other than IntervalWindow on Flink Runner
>
> Key: BEAM-1772
> URL: https://issues.apache.org/jira/browse/BEAM-1772
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ismaël Mejía
> Assignee: Jingsong Lee
>
> Flink currently supports merging IntervalWindows; however, if you have a
> WindowFn that extends IntervalWindow, the execution breaks.
> I found this while executing a Pipeline in Flink's batch mode.
> This will probably involve changing the window merging logic in
> `FlinkMergingReduceFunction.mergeWindows()` and other similar parts to really
> use the merging logic of the `WindowFn`.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-773) Implement Metrics support for Flink runner
[ https://issues.apache.org/jira/browse/BEAM-773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941542#comment-15941542 ] Jingsong Lee commented on BEAM-773: --- Best wishes for your vacation! Do you think Metrics is necessary to be fault-tolerant? > Implement Metrics support for Flink runner > -- > > Key: BEAM-773 > URL: https://issues.apache.org/jira/browse/BEAM-773 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ben Chambers >Assignee: Jingsong Lee > Fix For: First stable release > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1772) Support merging WindowFn other than IntervalWindow on Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-1772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938119#comment-15938119 ]

Jingsong Lee commented on BEAM-1772:

The problem is that {{FlinkMergingPartialReduceFunction}}, {{FlinkMergingReduceFunction}}, and {{FlinkMergingNonShuffleReduceFunction}} merge windows using a sorted list to produce a new WindowedValue for the merged window. This behavior differs from {{WindowFn.mergeWindows(MergeContext)}}; currently we cast BoundedWindow to IntervalWindow.
What the {{ReduceFnRunner}} would do is implemented here, but without any regard for triggers. I think we can use {{GroupAlsoByWindowViaOutputBufferDoFn}} instead.
[~aljoscha] What do you think?

> Support merging WindowFn other than IntervalWindow on Flink Runner
>
> Key: BEAM-1772
> URL: https://issues.apache.org/jira/browse/BEAM-1772
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ismaël Mejía
> Assignee: Aljoscha Krettek
>
> Flink currently supports merging IntervalWindows; however, if you have a
> WindowFn that extends IntervalWindow, the execution breaks.
> I found this while executing a Pipeline in Flink's batch mode.
> This will probably involve changing the window merging logic in
> `FlinkMergingReduceFunction.mergeWindows()` and other similar parts to really
> use the merging logic of the `WindowFn`.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1727) Add setForNowAlign(period, offset) to Timer
[ https://issues.apache.org/jira/browse/BEAM-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated BEAM-1727: --- Fix Version/s: First stable release > Add setForNowAlign(period, offset) to Timer > --- > > Key: BEAM-1727 > URL: https://issues.apache.org/jira/browse/BEAM-1727 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Jingsong Lee >Assignee: Jingsong Lee > Fix For: First stable release > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1727) Add setForNowAlign(period, offset) to Timer
[ https://issues.apache.org/jira/browse/BEAM-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1727: -- Assignee: Jingsong Lee > Add setForNowAlign(period, offset) to Timer > --- > > Key: BEAM-1727 > URL: https://issues.apache.org/jira/browse/BEAM-1727 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Jingsong Lee >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931210#comment-15931210 ] Jingsong Lee commented on BEAM-1612: [~aljoscha] Yes, this will greatly improve performance in some cases. > Support real Bundle in Flink runner > --- > > Key: BEAM-1612 > URL: https://issues.apache.org/jira/browse/BEAM-1612 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Jingsong Lee >Assignee: Jingsong Lee > > The Bundle is very important in the beam model. Users can use the bundle to > flush buffer, can reuse many heavyweight resources in a bundle. Most IO > plugins use the bundle to flush. > Moreover, FlinkRunner can also use Bundle to reduce access to the FlinkState, > such as first placed in JavaHeap, flush into RocksDbState when invoke > finishBundle , this can reduce the number of serialization. > But now FlinkRunner calls the finishBundle every processElement. We need > support real Bundle. > I think we can have the following implementations: > 1.Invoke finishBundle and next startBundle in {{snapshot}} of Flink. But > sometimes this "Bundle" maybe too big. This depends on the user's checkpoint > configuration. > 2.Manually control the size of the bundle. The half-bundle will be flushed to > a full-bundle by count or eventTime or processTime or {{snapshot}}. We do not > need to wait, just call the startBundle and finishBundle at the right time. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1727) Add setForNowAlign(period, offset) to Timer
[ https://issues.apache.org/jira/browse/BEAM-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929606#comment-15929606 ] Jingsong Lee commented on BEAM-1727: Can we return true or false in verifyTargetTime to let the user know if the setting is successful instead of throwing an exception? > Add setForNowAlign(period, offset) to Timer > --- > > Key: BEAM-1727 > URL: https://issues.apache.org/jira/browse/BEAM-1727 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1727) Add setForNowAlign(period, offset) to Timer
[ https://issues.apache.org/jira/browse/BEAM-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925942#comment-15925942 ] Jingsong Lee commented on BEAM-1727: What do you think? [~kenn] > Add setForNowAlign(period, offset) to Timer > --- > > Key: BEAM-1727 > URL: https://issues.apache.org/jira/browse/BEAM-1727 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1727) Add setForNowAlign(period, offset) to Timer
Jingsong Lee created BEAM-1727: -- Summary: Add setForNowAlign(period, offset) to Timer Key: BEAM-1727 URL: https://issues.apache.org/jira/browse/BEAM-1727 Project: Beam Issue Type: Improvement Components: beam-model Reporter: Jingsong Lee -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894699#comment-15894699 ] Jingsong Lee commented on BEAM-1612: https://issues.apache.org/jira/browse/FLINK-2846 Does this issue mean that we cannot emit data while snapshotting? If we emit there, we will lose some emitted data when the job is restarted. But we must invoke finishBundle when snapshotting; otherwise we will lose buffered data that has not been flushed. I think we can use a fake collector in the OutputManager while snapshotting, save the collected data to {{FlinkSplitStateInternals}} or {{FlinkKeyGroupStateInternals}}, and have the next processElement send it out, so that no data is lost. This may be a bit complicated, but it may work. It is similar to Flink's AsyncFunction, which stores the input data, whereas we would store the output data. > Support real Bundle in Flink runner > --- > > Key: BEAM-1612 > URL: https://issues.apache.org/jira/browse/BEAM-1612 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Jingsong Lee > Assignee: Jingsong Lee > > The bundle is very important in the Beam model. Users can use a bundle to flush buffers and to reuse heavyweight resources within the bundle. Most IO plugins flush at bundle boundaries. > Moreover, FlinkRunner can also use bundles to reduce access to Flink state: for example, values can first be kept on the Java heap and flushed into RocksDB state when finishBundle is invoked, which reduces the number of serializations. > But right now FlinkRunner calls finishBundle after every processElement. We need to support real bundles. > I think the following implementations are possible: > 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But sometimes this "bundle" may be too big; this depends on the user's checkpoint configuration. > 2. Manually control the size of the bundle. A partially filled bundle is flushed as a complete bundle by count, event time, processing time, or {{snapshot}}. We do not need to wait; we just call startBundle and finishBundle at the right time. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893827#comment-15893827 ] Jingsong Lee commented on BEAM-1612: Within a bundle, we can also simply reuse the ReduceFnRunner, and we do not have to worry about OOM problems caused by caching. > Support real Bundle in Flink runner > --- > > Key: BEAM-1612 > URL: https://issues.apache.org/jira/browse/BEAM-1612 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Jingsong Lee > Assignee: Jingsong Lee > > The bundle is very important in the Beam model. Users can use a bundle to flush buffers and to reuse heavyweight resources within the bundle. Most IO plugins flush at bundle boundaries. > Moreover, FlinkRunner can also use bundles to reduce access to Flink state: for example, values can first be kept on the Java heap and flushed into RocksDB state when finishBundle is invoked, which reduces the number of serializations. > But right now FlinkRunner calls finishBundle after every processElement. We need to support real bundles. > I think the following implementations are possible: > 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But sometimes this "bundle" may be too big; this depends on the user's checkpoint configuration. > 2. Manually control the size of the bundle. A partially filled bundle is flushed as a complete bundle by count, event time, processing time, or {{snapshot}}. We do not need to wait; we just call startBundle and finishBundle at the right time. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893814#comment-15893814 ] Jingsong Lee commented on BEAM-1612: What do you think, [~aljoscha]? > Support real Bundle in Flink runner > --- > > Key: BEAM-1612 > URL: https://issues.apache.org/jira/browse/BEAM-1612 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Jingsong Lee > Assignee: Jingsong Lee > > The bundle is very important in the Beam model. Users can use a bundle to flush buffers and to reuse heavyweight resources within the bundle. Most IO plugins flush at bundle boundaries. > Moreover, FlinkRunner can also use bundles to reduce access to Flink state: for example, values can first be kept on the Java heap and flushed into RocksDB state when finishBundle is invoked, which reduces the number of serializations. > But right now FlinkRunner calls finishBundle after every processElement. We need to support real bundles. > I think the following implementations are possible: > 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But sometimes this "bundle" may be too big; this depends on the user's checkpoint configuration. > 2. Manually control the size of the bundle. A partially filled bundle is flushed as a complete bundle by count, event time, processing time, or {{snapshot}}. We do not need to wait; we just call startBundle and finishBundle at the right time. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1612) Support real Bundle in Flink runner
Jingsong Lee created BEAM-1612: -- Summary: Support real Bundle in Flink runner Key: BEAM-1612 URL: https://issues.apache.org/jira/browse/BEAM-1612 Project: Beam Issue Type: Improvement Components: runner-flink Reporter: Jingsong Lee Assignee: Jingsong Lee The bundle is very important in the Beam model. Users can use a bundle to flush buffers and to reuse heavyweight resources within the bundle. Most IO plugins flush at bundle boundaries. Moreover, FlinkRunner can also use bundles to reduce access to Flink state: for example, values can first be kept on the Java heap and flushed into RocksDB state when finishBundle is invoked, which reduces the number of serializations. But right now FlinkRunner calls finishBundle after every processElement. We need to support real bundles. I think the following implementations are possible: 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But sometimes this "bundle" may be too big; this depends on the user's checkpoint configuration. 2. Manually control the size of the bundle. A partially filled bundle is flushed as a complete bundle by count, event time, processing time, or {{snapshot}}. We do not need to wait; we just call startBundle and finishBundle at the right time. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
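The second option in BEAM-1612 (closing a bundle by count, by time, or on snapshot) can be sketched roughly as follows. This is a minimal illustration, not the Flink runner's code: BundleHooks, BundleTracker and simulate are hypothetical names, and the hooks merely stand in for the runner's startBundle and finishBundle calls.

```java
import java.util.ArrayList;
import java.util.List;

final class BundleSketch {

  /** Hypothetical stand-in for the DoFn lifecycle calls a runner would make. */
  interface BundleHooks {
    void startBundle();
    void finishBundle();
  }

  static final class BundleTracker {
    private final BundleHooks hooks;
    private final int maxElements;
    private final long maxMillis;
    private boolean open;
    private int count;
    private long openedAtMillis;

    BundleTracker(BundleHooks hooks, int maxElements, long maxMillis) {
      this.hooks = hooks;
      this.maxElements = maxElements;
      this.maxMillis = maxMillis;
    }

    /** Call before processElement: lazily opens a bundle. */
    void beforeElement(long nowMillis) {
      if (!open) {
        hooks.startBundle();
        open = true;
        count = 0;
        openedAtMillis = nowMillis;
      }
    }

    /** Call after processElement: closes the bundle once a limit is reached. */
    void afterElement(long nowMillis) {
      count++;
      if (count >= maxElements || nowMillis - openedAtMillis >= maxMillis) {
        closeIfOpen();
      }
    }

    /** Call from the snapshot hook so that nothing buffered stays unflushed. */
    void onSnapshot() {
      closeIfOpen();
    }

    private void closeIfOpen() {
      if (open) {
        hooks.finishBundle();
        open = false;
      }
    }
  }

  /** Feeds the given number of elements through a tracker, then snapshots; returns the call log. */
  static List<String> simulate(int elements, int maxElements) {
    final List<String> log = new ArrayList<>();
    BundleTracker tracker = new BundleTracker(new BundleHooks() {
      @Override public void startBundle() { log.add("start"); }
      @Override public void finishBundle() { log.add("finish"); }
    }, maxElements, Long.MAX_VALUE);
    for (int i = 0; i < elements; i++) {
      tracker.beforeElement(i);
      tracker.afterElement(i);
    }
    tracker.onSnapshot();
    return log;
  }

  public static void main(String[] args) {
    System.out.println(simulate(5, 2));
  }
}
```

In this sketch the time limit is only checked when an element arrives; a real runner would presumably also need a timer to close an idle bundle.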
[jira] [Assigned] (BEAM-1476) Support MapState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1476: -- Assignee: Jingsong Lee > Support MapState in Flink runner > > > Key: BEAM-1476 > URL: https://issues.apache.org/jira/browse/BEAM-1476 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1483) Support SetState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1483: -- Assignee: Jingsong Lee > Support SetState in Flink runner > > > Key: BEAM-1483 > URL: https://issues.apache.org/jira/browse/BEAM-1483 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1588) Reuse StateNamespace.stringKey in Flink States
Jingsong Lee created BEAM-1588: -- Summary: Reuse StateNamespace.stringKey in Flink States Key: BEAM-1588 URL: https://issues.apache.org/jira/browse/BEAM-1588 Project: Beam Issue Type: Improvement Components: runner-flink Reporter: Jingsong Lee Assignee: Jingsong Lee See BEAM-1587. StateNamespace.stringKey does two things: it base64-encodes the window and then calls String.format. Both are relatively expensive. We can cache the result in the State and reuse it. Furthermore, we can cache the state itself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
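The caching idea in BEAM-1588 can be illustrated with a small self-contained sketch. CachedStringKey is a hypothetical class, not Beam's StateNamespace; the "/base64/" key layout and the use of java.util.Base64 are assumptions made only so that the example runs on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class CachedStringKey {
  private final byte[] encodedWindow;
  private String cached;     // computed on first use, then reused
  private int computations;  // only here to make the caching observable

  CachedStringKey(byte[] encodedWindow) {
    this.encodedWindow = encodedWindow.clone();
  }

  /** Builds the key once (base64 plus concatenation) and returns the cached value afterwards. */
  String stringKey() {
    if (cached == null) {
      computations++;
      cached = new StringBuilder()
          .append('/')
          .append(Base64.getEncoder().encodeToString(encodedWindow))
          .append('/')
          .toString();
    }
    return cached;
  }

  int computations() {
    return computations;
  }

  public static void main(String[] args) {
    CachedStringKey key = new CachedStringKey("window-1".getBytes(StandardCharsets.UTF_8));
    System.out.println(key.stringKey());
    System.out.println(key.stringKey());
    System.out.println("computed " + key.computations() + " time(s)");
  }
}
```

The sketch is not thread-safe; that is acceptable only if each instance is used by a single thread.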
[jira] [Assigned] (BEAM-1587) Use StringBuilder to stringKey of StateNamespace instead of String.format
[ https://issues.apache.org/jira/browse/BEAM-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1587: -- Assignee: Jingsong Lee (was: Kenneth Knowles) > Use StringBuilder to stringKey of StateNamespace instead of String.format > - > > Key: BEAM-1587 > URL: https://issues.apache.org/jira/browse/BEAM-1587 > Project: Beam > Issue Type: Improvement > Components: runner-core > Reporter: Jingsong Lee > Assignee: Jingsong Lee
>
> In the Flink runner, each state access calls the namespace's stringKey once. Since stringKey uses String.format, the impact on performance is relatively large. In some extreme cases, stringKey accounts for up to 2% of the overall cost.
> Here is a test on StringBuilder and String.format:
> {code}
> public static void main(String[] args) throws Exception {
>   // getRandomString, testFormat and testStringBuild are helpers that are not shown in this issue.
>   String[] strs = new String[1000_000];
>   for (int i = 0; i < strs.length; i++) {
>     strs[i] = getRandomString(10);
>   }
>   {
>     // Time the String.format variant over all inputs.
>     long start = System.nanoTime();
>     for (int i = 0; i < strs.length; i++) {
>       strs[i] = testFormat(strs[i]);
>     }
>     System.out.println("testStringFormat: " + ((System.nanoTime() - start)/1000_000) + "ms");
>   }
>   {
>     // Time the StringBuilder variant over all inputs.
>     long start = System.nanoTime();
>     for (int i = 0; i < strs.length; i++) {
>       strs[i] = testStringBuild(strs[i]);
>     }
>     System.out.println("testStringBuilder: " + ((System.nanoTime() - start)/1000_000) + "ms");
>   }
> }
> {code}
> testStringFormat: 2312ms
> testStringBuilder: 266ms
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1587) Use StringBuilder to stringKey of StateNamespace instead of String.format
[ https://issues.apache.org/jira/browse/BEAM-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated BEAM-1587: --- Description:
In the Flink runner, each state access calls the namespace's stringKey once. Since stringKey uses String.format, the impact on performance is relatively large. In some extreme cases, stringKey accounts for up to 2% of the overall cost.
Here is a test on StringBuilder and String.format:
{code}
public static void main(String[] args) throws Exception {
  // getRandomString, testFormat and testStringBuild are helpers that are not shown in this issue.
  String[] strs = new String[1000_000];
  for (int i = 0; i < strs.length; i++) {
    strs[i] = getRandomString(10);
  }
  {
    // Time the String.format variant over all inputs.
    long start = System.nanoTime();
    for (int i = 0; i < strs.length; i++) {
      strs[i] = testFormat(strs[i]);
    }
    System.out.println("testStringFormat: " + ((System.nanoTime() - start)/1000_000) + "ms");
  }
  {
    // Time the StringBuilder variant over all inputs.
    long start = System.nanoTime();
    for (int i = 0; i < strs.length; i++) {
      strs[i] = testStringBuild(strs[i]);
    }
    System.out.println("testStringBuilder: " + ((System.nanoTime() - start)/1000_000) + "ms");
  }
}
{code}
testStringFormat: 2312ms
testStringBuilder: 266ms
> Use StringBuilder to stringKey of StateNamespace instead of String.format > - > > Key: BEAM-1587 > URL: https://issues.apache.org/jira/browse/BEAM-1587 > Project: Beam > Issue Type: Improvement > Components: runner-core > Reporter: Jingsong Lee > Assignee: Kenneth Knowles
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
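The benchmark in the BEAM-1587 description calls getRandomString, testFormat and testStringBuild without showing them. The sketch below fills them in under explicit assumptions: the "/%s/" key shape and all helper bodies are guesses made for illustration, not the code that was actually measured. Unlike the original loop, it times both variants on the same inputs rather than feeding the output of the first run into the second.

```java
import java.util.Random;

final class StringKeyBench {

  /** Assumed helper: a random lowercase string of the given length. */
  static String randomString(Random random, int length) {
    StringBuilder sb = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      sb.append((char) ('a' + random.nextInt(26)));
    }
    return sb.toString();
  }

  /** Assumed shape of the String.format variant. */
  static String viaFormat(String s) {
    return String.format("/%s/", s);
  }

  /** Same output, built with StringBuilder. */
  static String viaBuilder(String s) {
    return new StringBuilder(s.length() + 2).append('/').append(s).append('/').toString();
  }

  /** Times one variant over all inputs and returns elapsed milliseconds. */
  static long timeMillis(String[] inputs, boolean useFormat) {
    long start = System.nanoTime();
    int totalLength = 0;
    for (String s : inputs) {
      totalLength += (useFormat ? viaFormat(s) : viaBuilder(s)).length();
    }
    long elapsed = (System.nanoTime() - start) / 1_000_000;
    if (totalLength < 0) {
      throw new IllegalStateException("unreachable; keeps the results in use");
    }
    return elapsed;
  }

  public static void main(String[] args) {
    Random random = new Random(42);
    String[] inputs = new String[1_000_000];
    for (int i = 0; i < inputs.length; i++) {
      inputs[i] = randomString(random, 10);
    }
    System.out.println("testStringFormat: " + timeMillis(inputs, true) + "ms");
    System.out.println("testStringBuilder: " + timeMillis(inputs, false) + "ms");
  }
}
```

A single timed pass like this is sensitive to JIT warm-up and garbage collection, so the numbers are only indicative; a harness such as JMH would give more reliable figures.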
[jira] [Created] (BEAM-1587) Use StringBuilder to stringKey of StateNamespace instead of String.format
Jingsong Lee created BEAM-1587: -- Summary: Use StringBuilder to stringKey of StateNamespace instead of String.format Key: BEAM-1587 URL: https://issues.apache.org/jira/browse/BEAM-1587 Project: Beam Issue Type: Improvement Components: runner-core Reporter: Jingsong Lee Assignee: Kenneth Knowles -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1116) Support for new Timer API in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887771#comment-15887771 ] Jingsong Lee commented on BEAM-1116: OK, I will finish it quickly. > Support for new Timer API in Flink runner > - > > Key: BEAM-1116 > URL: https://issues.apache.org/jira/browse/BEAM-1116 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1517) Garbage collect user state in Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887550#comment-15887550 ] Jingsong Lee commented on BEAM-1517: [~kenn] Is it like {{LateDataDroppingDoFnRunner}} dropping late data in {{processElement()}}? But how should we deal with late timers (those that are not EVENT_TIME)? Just drop them in {{onTimer()}}, as we do for elements? > Garbage collect user state in Flink Runner > -- > > Key: BEAM-1517 > URL: https://issues.apache.org/jira/browse/BEAM-1517 > Project: Beam > Issue Type: Bug > Components: runner-flink > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > > User facing state/timers in Beam are bound to the key/window of the data. Right now, the Flink Runner does not clean up user state when the watermark passes the GC horizon for the state associated with a given window. > Neither {{StateInternals}} nor the Flink state API support discarding state for a whole namespace (which is the window in this case) so we might have to manually set a GC timer for each window/key combination, as is done in the {{ReduceFnRunner}}. For this we have to know all states a user can possibly use, which we can get from the {{DoFn}} signature. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1036) Support for new State API in FlinkRunner
[ https://issues.apache.org/jira/browse/BEAM-1036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1036: -- Assignee: Jingsong Lee (was: Aljoscha Krettek) > Support for new State API in FlinkRunner > > > Key: BEAM-1036 > URL: https://issues.apache.org/jira/browse/BEAM-1036 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1560) Use SimpleDoFnRunner to invoke and use new CombineFnRunner in batch mode of Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885493#comment-15885493 ] Jingsong Lee commented on BEAM-1560: [~aljoscha] Because this is independent of streaming mode, I did not reopen BEAM-843. > Use SimpleDoFnRunner to invoke and use new CombineFnRunner in batch mode of Flink runner > > > Key: BEAM-1560 > URL: https://issues.apache.org/jira/browse/BEAM-1560 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Jingsong Lee > Assignee: Jingsong Lee > Fix For: 0.6.0 > > > To support the new State API and Timer API in the Flink runner (batch). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1483) Support SetState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882056#comment-15882056 ] Jingsong Lee commented on BEAM-1483: We can implement it with Flink's MapState in Flink 1.3, in the same way that HashSet is implemented on top of HashMap. > Support SetState in Flink runner > > > Key: BEAM-1483 > URL: https://issues.apache.org/jira/browse/BEAM-1483 > Project: Beam > Issue Type: New Feature > Components: runner-flink > Reporter: Kenneth Knowles > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
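The "set on top of a map" idea from the BEAM-1483 comment can be shown in a few lines. In this sketch java.util.HashMap stands in for Flink's MapState, and MapBackedSet with its methods is a hypothetical name; none of this is the Beam SetState or Flink MapState API.

```java
import java.util.HashMap;
import java.util.Map;

final class MapBackedSet<T> {
  // Only the keys matter; the value is a placeholder, as in java.util.HashSet.
  private final Map<T, Boolean> entries = new HashMap<>();

  /** Returns true if the value was not present before. */
  boolean add(T value) {
    return entries.put(value, Boolean.TRUE) == null;
  }

  boolean contains(T value) {
    return entries.containsKey(value);
  }

  /** Returns true if the value was present and has been removed. */
  boolean remove(T value) {
    return entries.remove(value) != null;
  }

  int size() {
    return entries.size();
  }

  /** Iterates over the elements, that is, the map's keys. */
  Iterable<T> read() {
    return entries.keySet();
  }

  public static void main(String[] args) {
    MapBackedSet<String> set = new MapBackedSet<>();
    set.add("a");
    set.add("a");
    set.add("b");
    System.out.println(set.size());
  }
}
```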
[jira] [Commented] (BEAM-1476) Support MapState in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882055#comment-15882055 ] Jingsong Lee commented on BEAM-1476: We can implement it by Flink MapState in Flink 1.3. https://issues.apache.org/jira/browse/FLINK-4856 > Support MapState in Flink runner > > > Key: BEAM-1476 > URL: https://issues.apache.org/jira/browse/BEAM-1476 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1498) Use Flink-native side outputs
[ https://issues.apache.org/jira/browse/BEAM-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15876026#comment-15876026 ] Jingsong Lee commented on BEAM-1498: Great~ That will simplify the implementation of {{ParDoBoundMultiStreamingTranslator}}. > Use Flink-native side outputs > - > > Key: BEAM-1498 > URL: https://issues.apache.org/jira/browse/BEAM-1498 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Aljoscha Krettek >Assignee: Jingsong Lee > > Once Flink has support for side outputs we should use them instead of > manually dealing with the {{RawUnionValues}}. > Side outputs for Flink is being tracked in > https://issues.apache.org/jira/browse/FLINK-4460. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1498) Use Flink-native side outputs
[ https://issues.apache.org/jira/browse/BEAM-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned BEAM-1498: -- Assignee: Jingsong Lee > Use Flink-native side outputs > - > > Key: BEAM-1498 > URL: https://issues.apache.org/jira/browse/BEAM-1498 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Aljoscha Krettek >Assignee: Jingsong Lee > > Once Flink has support for side outputs we should use them instead of > manually dealing with the {{RawUnionValues}}. > Side outputs for Flink is being tracked in > https://issues.apache.org/jira/browse/FLINK-4460. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1517) Garbage collect user state in Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15876005#comment-15876005 ] Jingsong Lee commented on BEAM-1517: Is it appropriate for the user to do the GC work? Something like this:
{code}
@ProcessElement
public void process(
    ProcessContext c,
    BoundedWindow window,
    @StateId(stateId) ValueState state,
    @TimerId("GcTimer") Timer timer) {
  Instant maxTimestamp = window.maxTimestamp();
  long allowedLateness = 10 * 1000;
  Instant gcTime = maxTimestamp.plus(allowedLateness);
  // Can Timer have a getCurrentTime interface?
  Instant currentTime = new Instant();
  if (gcTime.isBefore(currentTime)) {
    // Too late: route the element to the late-data output.
    c.sideOutput(lateDataTag, c.element());
  } else {
    timer.set(gcTime);
    // user logic goes here
  }
}

@OnTimer("GcTimer")
public void gc(
    OnTimerContext context,
    @StateId(stateId) ValueState state) {
  state.clear();
}
{code}
> Garbage collect user state in Flink Runner > -- > > Key: BEAM-1517 > URL: https://issues.apache.org/jira/browse/BEAM-1517 > Project: Beam > Issue Type: Bug > Components: runner-flink > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > > User facing state/timers in Beam are bound to the key/window of the data. Right now, the Flink Runner does not clean up user state when the watermark passes the GC horizon for the state associated with a given window. > Neither {{StateInternals}} nor the Flink state API support discarding state for a whole namespace (which is the window in this case) so we might have to manually set a GC timer for each window/key combination, as is done in the {{ReduceFnRunner}}. For this we have to know all states a user can possibly use, which we can get from the {{DoFn}} signature. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1116) Support for new Timer API in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15871001#comment-15871001 ] Jingsong Lee commented on BEAM-1116: [~kenn] I found that {{onTimer()}} requires the {{BoundedWindow}} parameter. How do I get the {{BoundedWindow}} from the {{StateNamespace}} of {{TimerData}}?
{code}
private BoundedWindow getWindowFromNamespace(StateNamespace namespace) {
  if (namespace instanceof WindowNamespace) {
    return ((WindowNamespace) namespace).getWindow();
  } else if (namespace instanceof GlobalNamespace) {
    return GlobalWindow.INSTANCE;
  } else if (namespace instanceof WindowAndTriggerNamespace) {
    return ((WindowAndTriggerNamespace) namespace).getWindow();
  } else {
    throw new RuntimeException("Unknown StateNamespace type: " + namespace.getClass());
  }
}
{code}
Is that right? Why does StateNamespace not provide a {{getWindow()}} method? > Support for new Timer API in Flink runner > - > > Key: BEAM-1116 > URL: https://issues.apache.org/jira/browse/BEAM-1116 > Project: Beam > Issue Type: New Feature > Components: runner-flink > Reporter: Kenneth Knowles > Assignee: Jingsong Lee > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-1393) Update Flink Runner to Flink 1.2.0
[ https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862325#comment-15862325 ] Jingsong Lee edited comment on BEAM-1393 at 2/11/17 9:08 AM: - BTW, I think SPLIT_DISTRIBUTE state may need a new Repartitioner, not only round-robin: let each element in the ListState have the opportunity to select a KeyGroupIndex, and have {{CheckpointCoordinator}} use the KeyGroupIndex to redistribute the state. I understand it now: {{CheckpointCoordinator}} runs in the JobManager, so reading every element there would be too heavy. Maybe Flink can abstract KeyGroup state and provide split (snapshot to several KeyGroups) and merge (restore from several KeyGroups) methods to manage state with KeyGroups. was (Author: lzljs3620320): BTW, I think SPLIT_DISTRIBUTE state may need a new Repartitioner, not only round-robin: let each element in the ListState have the opportunity to select a KeyGroupIndex, and have {{CheckpointCoordinator}} use the KeyGroupIndex to redistribute the state. > Update Flink Runner to Flink 1.2.0 > -- > > Key: BEAM-1393 > URL: https://issues.apache.org/jira/browse/BEAM-1393 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Aljoscha Krettek > Assignee: Jingsong Lee > > When we update to 1.2.0 we can use the new internal Timer API that is available to Flink operators: {{InternalTimerService}} and also use broadcast state to store side-input data. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1393) Update Flink Runner to Flink 1.2.0
[ https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862325#comment-15862325 ] Jingsong Lee commented on BEAM-1393: BTW, I think SPLIT_DISTRIBUTE state may need a new Repartitioner, not only round-robin: let each element in the ListState have the opportunity to select a KeyGroupIndex, and have {{CheckpointCoordinator}} use the KeyGroupIndex to redistribute the state. > Update Flink Runner to Flink 1.2.0 > -- > > Key: BEAM-1393 > URL: https://issues.apache.org/jira/browse/BEAM-1393 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Aljoscha Krettek > Assignee: Jingsong Lee > > When we update to 1.2.0 we can use the new internal Timer API that is available to Flink operators: {{InternalTimerService}} and also use broadcast state to store side-input data. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1393) Update Flink Runner to Flink 1.2.0
[ https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862290#comment-15862290 ] Jingsong Lee commented on BEAM-1393: Totally agree! {{AbstractStreamOperator}} will check the type of {{this}} and invoke {{checkpointKeyGroup}} in {{snapshotState()}} ({{initializeState}} is similar). Looking forward to contributing this back to Flink. I think we no longer need to store {{pushedBackWatermark}} in state. We can maintain it in memory and restore it by traversing the pushed-back events. > Update Flink Runner to Flink 1.2.0 > -- > > Key: BEAM-1393 > URL: https://issues.apache.org/jira/browse/BEAM-1393 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Aljoscha Krettek > Assignee: Jingsong Lee > > When we update to 1.2.0 we can use the new internal Timer API that is available to Flink operators: {{InternalTimerService}} and also use broadcast state to store side-input data. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1393) Update Flink Runner to Flink 1.2.0
[ https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15860992#comment-15860992 ] Jingsong Lee commented on BEAM-1393: Good point! Processing pushed-back events is indeed troublesome. For non-keyed operators, we store the elements in SPLIT_DISTRIBUTE state, which is no problem. But for keyed operators, if we use {{KeyedStateBackend}}, we cannot find the pending events when a new side-input element arrives. We need to find all pushed-back events that belong to the side-input window, just as in timer processing. Maybe we need to override {{AbstractStreamOperator.snapshotState}} to store the pushed-back events per KeyGroup, together with the TimerService snapshot. I see that {{startNewKeyGroup}} can be called only once, so we have to override the TimerService snapshot instead of calling super. > Update Flink Runner to Flink 1.2.0 > -- > > Key: BEAM-1393 > URL: https://issues.apache.org/jira/browse/BEAM-1393 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Aljoscha Krettek > Assignee: Jingsong Lee > > When we update to 1.2.0 we can use the new internal Timer API that is available to Flink operators: {{InternalTimerService}} and also use broadcast state to store side-input data. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1456) Make UnboundedSourceWrapper snapshot to rescalable operator state in Flink Runner
Jingsong Lee created BEAM-1456: -- Summary: Make UnboundedSourceWrapper snapshot to rescalable operator state in Flink Runner Key: BEAM-1456 URL: https://issues.apache.org/jira/browse/BEAM-1456 Project: Beam Issue Type: Improvement Components: runner-flink Reporter: Jingsong Lee Assignee: Jingsong Lee By using Flink's SPLIT_DISTRIBUTE operator state to snapshot source checkpoints, we make UnboundedSourceWrapper operators rescalable. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
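To illustrate why list-style operator state makes a source rescalable, the sketch below redistributes per-subtask lists of checkpoint entries round-robin over a new parallelism. It is a simplified model and not Flink's repartitioner; ListStateRescale and redistribute are hypothetical names, and dealing single elements in order is an assumption about the distribution strategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ListStateRescale {

  /**
   * Collects the list state of all old subtasks in order and deals the
   * elements round-robin to newParallelism subtasks.
   */
  static <T> List<List<T>> redistribute(List<List<T>> oldStates, int newParallelism) {
    if (newParallelism <= 0) {
      throw new IllegalArgumentException("parallelism must be positive");
    }
    List<List<T>> result = new ArrayList<>();
    for (int i = 0; i < newParallelism; i++) {
      result.add(new ArrayList<T>());
    }
    int next = 0;
    for (List<T> subtaskState : oldStates) {
      for (T element : subtaskState) {
        result.get(next % newParallelism).add(element);
        next++;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Two old subtasks holding three source checkpoints in total, rescaled to three subtasks.
    List<List<String>> old = Arrays.asList(
        Arrays.asList("split-0", "split-1"),
        Arrays.asList("split-2"));
    System.out.println(redistribute(old, 3));
  }
}
```

Each list element here would correspond to one source split's checkpoint, so that every restored subtask can resume the splits it is handed.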
[jira] [Created] (BEAM-1445) Use Flink broadcast state to store side-input data
Jingsong Lee created BEAM-1445: -- Summary: Use Flink broadcast state to store side-input data Key: BEAM-1445 URL: https://issues.apache.org/jira/browse/BEAM-1445 Project: Beam Issue Type: Improvement Components: runner-flink Reporter: Jingsong Lee Assignee: Jingsong Lee By using broadcast state to store side-input data we make operators rescalable. What BROADCAST does is collect all checkpointed states into one "list" and then send out that list to all parallel subtasks when restoring. The way we would use it is to checkpoint state only from the operator with subtask index 0, because we assume that the state is the same on all parallel instances of the operator. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
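The checkpoint-from-subtask-0 scheme described in BEAM-1445 can be modelled in plain Java. This is a toy model under stated assumptions, not Flink's state API: BroadcastStateSketch, snapshot and restore are hypothetical names, and the "union, then hand the full list to every subtask" behaviour follows the description in the issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BroadcastStateSketch {

  /** Only subtask 0 contributes its state; all other subtasks contribute nothing. */
  static <T> List<T> snapshot(int subtaskIndex, List<T> localState) {
    return subtaskIndex == 0 ? new ArrayList<T>(localState) : new ArrayList<T>();
  }

  /** On restore, every new subtask receives the union of all snapshotted lists. */
  static <T> List<List<T>> restore(List<List<T>> snapshots, int newParallelism) {
    List<T> union = new ArrayList<>();
    for (List<T> snapshot : snapshots) {
      union.addAll(snapshot);
    }
    List<List<T>> restored = new ArrayList<>();
    for (int i = 0; i < newParallelism; i++) {
      restored.add(new ArrayList<T>(union));
    }
    return restored;
  }

  public static void main(String[] args) {
    // Three subtasks hold identical side-input data; only index 0 is checkpointed.
    List<String> sideInput = Arrays.asList("x", "y");
    List<List<String>> snapshots = new ArrayList<>();
    for (int subtask = 0; subtask < 3; subtask++) {
      snapshots.add(snapshot(subtask, sideInput));
    }
    System.out.println(restore(snapshots, 2));
  }
}
```

If every subtask checkpointed its copy instead, the union would contain the side-input data once per old subtask, which is the duplication the subtask-0 rule avoids.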