[jira] [Commented] (BEAM-2534) KafkaIO should allow gaps in message offsets

2017-07-06 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16076928#comment-16076928
 ] 

Jingsong Lee commented on BEAM-2534:


Could this be cherry-picked to the 2.1.0 branch?

> KafkaIO should allow gaps in message offsets
> 
>
> Key: BEAM-2534
> URL: https://issues.apache.org/jira/browse/BEAM-2534
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.0.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Minor
> Fix For: 2.1.0
>
>
> KafkaIO reader logs a warning when it notices gaps in offsets for messages. 
> While such gaps are not expected for normal Kafka topics, there could be gaps 
> when log compaction is enabled (which deletes older messages for a key). 
> This warning log is not very useful. Also, we should take such gaps into 
> account when estimating backlog.
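To illustrate taking gaps into account, here is a minimal sketch that assumes backlog is estimated by scaling the offset distance to the end of the partition by the fraction of offsets that actually held records in the range consumed so far. `GapAwareBacklogEstimator` and its methods are hypothetical names and are not KafkaIO's actual code.

```java
/**
 * Hypothetical backlog estimator that tolerates gaps in offsets (for example,
 * gaps left by log compaction). Not KafkaIO's actual implementation.
 */
final class GapAwareBacklogEstimator {
  private long firstOffsetSeen = -1; // offset of the first consumed record
  private long lastOffsetSeen = -1;  // offset of the most recently consumed record
  private long recordsSeen = 0;      // number of records actually consumed

  /** Records that a message with the given offset was consumed. */
  void onRecord(long offset) {
    if (firstOffsetSeen < 0) {
      firstOffsetSeen = offset;
    }
    // A jump larger than 1 is a gap; with compaction this is expected, so no warning.
    lastOffsetSeen = offset;
    recordsSeen++;
  }

  /** Fraction of offsets in the consumed range that held a record (1.0 means no gaps). */
  double density() {
    if (recordsSeen == 0) {
      return 1.0;
    }
    long span = lastOffsetSeen - firstOffsetSeen + 1;
    return (double) recordsSeen / span;
  }

  /** Estimated number of records left before the partition's end offset. */
  long estimateBacklog(long endOffset) {
    long nextOffset = lastOffsetSeen + 1;
    long offsetDistance = Math.max(0, endOffset - nextOffset);
    return Math.round(offsetDistance * density());
  }
}
```

With offsets 0, 1 and 3 consumed, the density is 0.75, so an end offset of 8 yields an estimated backlog of 3 records instead of 4. A real reader might prefer a moving average over recent records so that old compaction patterns do not dominate the estimate.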



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1531) Support dynamic work rebalancing for HBaseIO

2017-06-29 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16067961#comment-16067961
 ] 

Jingsong Lee commented on BEAM-1531:


(y) It's great!

> Support dynamic work rebalancing for HBaseIO
> 
>
> Key: BEAM-1531
> URL: https://issues.apache.org/jira/browse/BEAM-1531
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Minor
>






[jira] [Commented] (BEAM-1531) Support dynamic work rebalancing for HBaseIO

2017-06-28 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16067629#comment-16067629
 ] 

Jingsong Lee commented on BEAM-1531:


Of course, the embedded HBase server version is better, since it is a complete mini 
HBase cluster. So I only changed the testReadingSplitAtFraction test (which only 
involves Scanner.iterator); the other tests remain unchanged.
I think there is a tradeoff here between test accuracy and test speed. For the 
testReadingSplitAtFraction test, if we can effectively improve the speed while 
still having a good mock (querying by startRow and stopRow), we can still achieve 
the purpose of the test (testing HBaseIO.splitAtFraction).

I ran some tests to understand how HBaseTestingUtility is implemented. It starts a 
complete miniHBaseCluster and miniZKCluster, and while it runs the JVM has 8000+ 
classes and 300+ threads, which makes it very slow. I do not have a detailed 
understanding, but it probably has to do the work of a whole cluster inside a 
single JVM, which results in very slow runs.

> Support dynamic work rebalancing for HBaseIO
> 
>
> Key: BEAM-1531
> URL: https://issues.apache.org/jira/browse/BEAM-1531
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Minor
>






[jira] [Commented] (BEAM-1531) Support dynamic work rebalancing for HBaseIO

2017-06-28 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066883#comment-16066883
 ] 

Jingsong Lee commented on BEAM-1531:


Hi [~iemejia], in BEAM-2393 we talked about speeding up HBaseIOTest.
Since some tests (including {{testReadingSplitAtFraction}}) only use the 
Scanner.iterator interface, I abstracted the iterator interface so that 
in-memory queries can be used to speed up the tests.
I did not abstract estimateSizeBytes and split, since that would be a bit more 
complicated.
Commit: 
https://github.com/JingsongLi/beam/commit/d1080c2a1669d13654f64b77122f7cfcb6e1edeb
What do you think?
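A minimal sketch of the kind of in-memory test double described here, assuming rows can be modeled as a sorted map from row key to value. `RowScanner` and `InMemoryRowScanner` are hypothetical names and are not taken from the linked commit; `String` keys stand in for HBase's `byte[]` row keys. It follows HBase scan semantics: `startRow` is inclusive, `stopRow` is exclusive, and an empty `stopRow` means "scan to the end".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical abstraction over "iterate the rows in [startRow, stopRow)". */
interface RowScanner {
  Iterator<Map.Entry<String, String>> scan(String startRow, String stopRow);
}

/** Hypothetical in-memory test double: rows are kept sorted by key, like a table region. */
final class InMemoryRowScanner implements RowScanner {
  private final NavigableMap<String, String> rows = new TreeMap<>();

  void put(String rowKey, String value) {
    rows.put(rowKey, value);
  }

  @Override
  public Iterator<Map.Entry<String, String>> scan(String startRow, String stopRow) {
    // Start is inclusive, stop is exclusive; an empty stopRow means "until the last row".
    NavigableMap<String, String> range = stopRow.isEmpty()
        ? rows.tailMap(startRow, true)
        : rows.subMap(startRow, true, stopRow, false);
    return range.entrySet().iterator();
  }

  /** Collects the row keys returned by a scan, which is handy in assertions. */
  static List<String> keys(RowScanner scanner, String startRow, String stopRow) {
    List<String> result = new ArrayList<>();
    Iterator<Map.Entry<String, String>> it = scanner.scan(startRow, stopRow);
    while (it.hasNext()) {
      result.add(it.next().getKey());
    }
    return result;
  }
}
```

Because a split test only needs range queries, code under test that depends on `RowScanner` can run against this map without starting a mini cluster; a production implementation would wrap a real HBase scanner instead.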

> Support dynamic work rebalancing for HBaseIO
> 
>
> Key: BEAM-1531
> URL: https://issues.apache.org/jira/browse/BEAM-1531
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Minor
>






[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2017-06-28 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066882#comment-16066882
 ] 

Jingsong Lee commented on BEAM-2393:


Let's talk about it in BEAM-1531.

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when 
> the job restarts after a failure, it will send duplicate data.





[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2017-06-28 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066145#comment-16066145
 ] 

Jingsong Lee commented on BEAM-2393:


(y) Yes, that's what I want. I can help you optimize the tests.

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when 
> the job restarts after a failure, it will send duplicate data.





[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2017-06-28 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066001#comment-16066001
 ] 

Jingsong Lee commented on BEAM-2393:


For now, this is not related to Dynamic Work Rebalancing.
The main question is how to checkpoint a {{BoundedSource}}.
{{BoundedToUnboundedSourceAdapter}} offers one way: call splitAtFraction, or 
snapshot all the remaining elements.
[~iemejia] Is there a simpler way for HBaseIO to implement the 
splitAtFraction method?
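The two checkpointing strategies can be sketched as follows, under simplifying assumptions: the reader is a hypothetical list-backed `ListReader` whose "split" only records the current index, and `ReaderCheckpoint` is a hypothetical holder class. This is not the actual `BoundedToUnboundedSourceAdapter` code; it only contrasts a cheap residual-position checkpoint with draining the remaining elements into the checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical list-backed bounded reader. */
final class ListReader {
  private final List<String> elements;
  private final boolean splittable; // whether this reader supports splitting at all
  private int position = 0;         // index of the next element to read

  ListReader(List<String> elements, boolean splittable) {
    this.elements = elements;
    this.splittable = splittable;
  }

  boolean hasNext() {
    return position < elements.size();
  }

  String next() {
    return elements.get(position++);
  }

  /** Returns the start index of the residual if splitting is supported, otherwise -1. */
  int trySplitAtCurrentPosition() {
    return splittable ? position : -1;
  }
}

/** Hypothetical checkpoint: either a residual position or the buffered remaining elements. */
final class ReaderCheckpoint {
  final int residualStart;         // >= 0 when the reader could be split
  final List<String> bufferedRest; // non-null when the rest had to be read into memory

  private ReaderCheckpoint(int residualStart, List<String> bufferedRest) {
    this.residualStart = residualStart;
    this.bufferedRest = bufferedRest;
  }

  static ReaderCheckpoint take(ListReader reader) {
    int residual = reader.trySplitAtCurrentPosition();
    if (residual >= 0) {
      // Cheap path: the checkpoint only describes where the residual work starts.
      return new ReaderCheckpoint(residual, null);
    }
    // Fallback: read everything that is left into the checkpoint, which is why
    // large, unsplittable sources are a poor fit for this approach.
    List<String> rest = new ArrayList<>();
    while (reader.hasNext()) {
      rest.add(reader.next());
    }
    return new ReaderCheckpoint(-1, rest);
  }
}
```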

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when 
> the job restarts after a failure, it will send duplicate data.





[jira] [Assigned] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2017-06-27 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-2393:
--

Assignee: Jingsong Lee

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when 
> the job restarts after a failure, it will send duplicate data.





[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2017-06-27 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065750#comment-16065750
 ] 

Jingsong Lee commented on BEAM-2393:


{{UnboundedSourceWrapper}} now supports exiting when the watermark exceeds 
TIMESTAMP_MAX_VALUE. 
So can we use {{BoundedToUnboundedSourceAdapter}}?

bq. Checkpoints are created by calling {{BoundedReader#splitAtFraction}} on 
inner {{BoundedSource}}.
bq. Sources that cannot be split are read entirely into memory, so this 
transform does not work well with large, unsplittable sources.

But at least we can provide accurate semantics.

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when 
> the job restarts after a failure, it will send duplicate data.





[jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-06-26 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16064090#comment-16064090
 ] 

Jingsong Lee commented on BEAM-2140:


On point 1: if whether the window has expired is decided by the output watermark 
hold (I had always thought it was the input watermark), then processing does not 
end, and we need to continue processing the processing-time timers.

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> ---
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.





[jira] [Commented] (BEAM-2478) Distinct Aggregates

2017-06-25 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062468#comment-16062468
 ] 

Jingsong Lee commented on BEAM-2478:


(y) You are right.
Calcite natively offers a lot of useful rules, which is really exciting.

> Distinct Aggregates
> ---
>
> Key: BEAM-2478
> URL: https://issues.apache.org/jira/browse/BEAM-2478
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Jingsong Lee
>Assignee: Tarush Grover
>
> eg: COUNT(DISTINCT empno)





[jira] [Closed] (BEAM-2477) BeamAggregationRel should use Combine.perKey instead of GroupByKey

2017-06-24 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed BEAM-2477.
--

> BeamAggregationRel should use Combine.perKey instead of GroupByKey
> --
>
> Key: BEAM-2477
> URL: https://issues.apache.org/jira/browse/BEAM-2477
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>  Labels: dsl_sql_merge
> Fix For: Not applicable
>
>
> Their semantics are the same, but the efficiency of implementation is quite 
> different, and at the runner level there is a lot of optimization for 
> `Combine.perKey`.





[jira] [Resolved] (BEAM-2477) BeamAggregationRel should use Combine.perKey instead of GroupByKey

2017-06-24 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee resolved BEAM-2477.

   Resolution: Fixed
Fix Version/s: Not applicable

> BeamAggregationRel should use Combine.perKey instead of GroupByKey
> --
>
> Key: BEAM-2477
> URL: https://issues.apache.org/jira/browse/BEAM-2477
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>  Labels: dsl_sql_merge
> Fix For: Not applicable
>
>
> Their semantics are the same, but the efficiency of implementation is quite 
> different, and at the runner level there is a lot of optimization for 
> `Combine.perKey`.





[jira] [Updated] (BEAM-2508) Fix javaDoc of Stateful DoFn

2017-06-23 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-2508:
---
Description: StateSpec > StateSpec  (was: StateSpec > StateSpec)

> Fix javaDoc of Stateful DoFn
> 
>
> Key: BEAM-2508
> URL: https://issues.apache.org/jira/browse/BEAM-2508
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Jingsong Lee
>Assignee: Kenneth Knowles
> Fix For: 2.1.0
>
>
> StateSpec > StateSpec





[jira] [Updated] (BEAM-2508) Fix javaDoc of Stateful DoFn

2017-06-23 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-2508:
---
Fix Version/s: 2.1.0

> Fix javaDoc of Stateful DoFn
> 
>
> Key: BEAM-2508
> URL: https://issues.apache.org/jira/browse/BEAM-2508
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Jingsong Lee
>Assignee: Kenneth Knowles
> Fix For: 2.1.0
>
>
> StateSpec > StateSpec





[jira] [Created] (BEAM-2508) Fix javaDoc of Stateful DoFn

2017-06-23 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-2508:
--

 Summary: Fix javaDoc of Stateful DoFn
 Key: BEAM-2508
 URL: https://issues.apache.org/jira/browse/BEAM-2508
 Project: Beam
  Issue Type: Bug
  Components: beam-model
Reporter: Jingsong Lee
Assignee: Kenneth Knowles


StateSpec > StateSpec

[jira] [Commented] (BEAM-2478) Distinct Aggregates

2017-06-22 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060353#comment-16060353
 ] 

Jingsong Lee commented on BEAM-2478:


Count(Distinct) is a very interesting function.
The operator has to keep every distinct value of the field in order to count 
them, so this state can sometimes be very large.
There are three solutions as far as I know:
1. Count with all details of the distinct field: I think we can use a 
StatefulParDo with ValueState (for the count) and SetState (for the distinct 
values).
2. Approximation algorithms: cardinality estimation (HyperLogLog), a Bloom filter, 
or a bitmap. This can greatly reduce the amount of state data, but the result may 
be inaccurate. Apache Kylin uses this.
3. Hierarchical calculation: 
select a, count(distinct b) from t group by a; -> select a, count(1) from 
(select a, count(1) from t group by a, b) t2 group by a;
The first operator de-duplicates by b (it can also do some local aggregation by a, 
which reduces the shuffled data) and the second operator counts by a. This can 
effectively reduce the state data and ease data skew. Apache Impala uses this.
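Approaches 1 and 3 can be sketched with plain Java collections (approach 2 is omitted, since a correct HyperLogLog needs more code than fits here). This is a hypothetical illustration of what state each approach keeps, not Beam SQL code: the first keeps a set of distinct `b` values per `a`, while the second de-duplicates on `(a, b)` first and then only needs a counter per `a`.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration of two exact strategies for COUNT(DISTINCT b) ... GROUP BY a. */
final class CountDistinctSketch {

  /** Builds one (a, b) input row. */
  static Map.Entry<String, String> row(String a, String b) {
    return new AbstractMap.SimpleEntry<>(a, b);
  }

  /** Approach 1: per key a, keep the set of distinct b values (like SetState) and count it. */
  static Map<String, Long> withDistinctSet(List<Map.Entry<String, String>> rows) {
    Map<String, Set<String>> distinctValues = new HashMap<>();
    for (Map.Entry<String, String> r : rows) {
      distinctValues.computeIfAbsent(r.getKey(), k -> new HashSet<>()).add(r.getValue());
    }
    Map<String, Long> counts = new HashMap<>();
    distinctValues.forEach((a, bs) -> counts.put(a, (long) bs.size()));
    return counts;
  }

  /** Approach 3: first de-duplicate on (a, b), then a plain COUNT grouped by a. */
  static Map<String, Long> twoStage(List<Map.Entry<String, String>> rows) {
    // Stage 1: distinct (a, b) pairs; SimpleEntry's equals/hashCode use key and value.
    Set<Map.Entry<String, String>> distinctPairs = new HashSet<>();
    for (Map.Entry<String, String> r : rows) {
      distinctPairs.add(row(r.getKey(), r.getValue()));
    }
    // Stage 2: only a counter per key a is needed.
    Map<String, Long> counts = new HashMap<>();
    for (Map.Entry<String, String> pair : distinctPairs) {
      counts.merge(pair.getKey(), 1L, Long::sum);
    }
    return counts;
  }
}
```

In a distributed setting, stage 1 would be keyed by `(a, b)` and stage 2 by `a`, so no single operator has to hold all distinct values of a hot key `a`.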

> Distinct Aggregates
> ---
>
> Key: BEAM-2478
> URL: https://issues.apache.org/jira/browse/BEAM-2478
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Jingsong Lee
>Assignee: Tarush Grover
>
> eg: COUNT(DISTINCT empno)





[jira] [Updated] (BEAM-1612) Support real Bundle in Flink runner

2017-06-21 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1612:
---
Fix Version/s: 2.1.0

> Support real Bundle in Flink runner
> ---
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 2.1.0
>
>
> The Bundle is very important in the Beam model. Users can use bundles to flush 
> buffers and to reuse heavyweight resources within a bundle. Most IO plugins use 
> the bundle to flush. 
> Moreover, FlinkRunner can also use bundles to reduce access to Flink state: for 
> example, state can first be kept on the Java heap and flushed into RocksDB state 
> when finishBundle is invoked, which reduces the number of serializations.
> But currently FlinkRunner calls finishBundle after every processElement. We need 
> to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But 
> sometimes this "Bundle" may be too big; this depends on the user's checkpoint 
> configuration.
> 2. Manually control the size of the bundle. The half-bundle will be flushed to 
> a full bundle by count, event time, processing time, or {{snapshot}}. We do not 
> need to wait; just call startBundle and finishBundle at the right time.
> [Proposal 
> document|https://docs.google.com/document/d/1UzELM4nFu8SIeu-QJkbs0sv7Uzd1Ux4aXXM3cw4s7po/edit?usp=sharing]
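Option 2 could look roughly like the following sketch. `BundleController` is a hypothetical name, not FlinkRunner code; it only shows the count-based trigger and the flush on `snapshot`, and leaves out the event-time and processing-time triggers.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of option 2: elements are grouped into bundles that are
 * finished when a size limit is reached or when a checkpoint (snapshot) happens.
 */
final class BundleController {
  private final int maxBundleSize;
  private final List<String> buffer = new ArrayList<>();          // elements of the open bundle
  private final List<List<String>> finished = new ArrayList<>();  // flushed bundles, for inspection
  private boolean bundleStarted = false;

  BundleController(int maxBundleSize) {
    this.maxBundleSize = maxBundleSize;
  }

  void processElement(String element) {
    if (!bundleStarted) {
      startBundle();
    }
    buffer.add(element);
    if (buffer.size() >= maxBundleSize) {
      finishBundle(); // count-based trigger
    }
  }

  /** On a checkpoint the open bundle must be flushed before state is snapshotted. */
  void snapshot() {
    if (bundleStarted) {
      finishBundle();
    }
  }

  List<List<String>> finishedBundles() {
    return finished;
  }

  private void startBundle() {
    bundleStarted = true;
  }

  private void finishBundle() {
    // A real DoFn would flush buffered writes or heap-cached state here.
    finished.add(new ArrayList<>(buffer));
    buffer.clear();
    bundleStarted = false;
  }
}
```

Flushing on `snapshot` is what keeps this correct under failures: nothing buffered in an open bundle is left out of the checkpointed state.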





[jira] [Created] (BEAM-2487) Give a option to ignore the timer that is larger than END_OF_GLOBAL_WINDOW

2017-06-20 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-2487:
--

 Summary: Give a option to ignore the timer that is larger than 
END_OF_GLOBAL_WINDOW
 Key: BEAM-2487
 URL: https://issues.apache.org/jira/browse/BEAM-2487
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Jingsong Lee


Event time cannot reach END_OF_GLOBAL_WINDOW in the unbounded world (except in 
testing).
But the Flink runner will set some timers when a user applies a StatefulParDo/GBK 
in the GlobalWindow. Flink maintains timers in a PriorityQueue on the Java heap, so 
performance is bad when the number of keys is very large.
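A minimal sketch of such an option, assuming the runner can pass in the end-of-global-window timestamp as epoch milliseconds and that any event-time timer at or beyond it is unreachable. `TimerRegistry` and `ignoreUnreachableTimers` are hypothetical names, not Flink or Beam APIs; the point is only that skipped timers never occupy the heap-based priority queue.

```java
import java.util.PriorityQueue;

/**
 * Hypothetical timer registry with an option to skip event-time timers that can never
 * fire in an unbounded pipeline because they sit at or beyond END_OF_GLOBAL_WINDOW.
 */
final class TimerRegistry {
  private final long endOfGlobalWindowMillis; // assumed to be supplied by the runner
  private final boolean ignoreUnreachableTimers;
  // Heap-based queue, mirroring timers kept in a PriorityQueue on the Java heap.
  private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

  TimerRegistry(long endOfGlobalWindowMillis, boolean ignoreUnreachableTimers) {
    this.endOfGlobalWindowMillis = endOfGlobalWindowMillis;
    this.ignoreUnreachableTimers = ignoreUnreachableTimers;
  }

  /** Returns true if the timer was registered, false if it was skipped. */
  boolean setEventTimeTimer(long timestampMillis) {
    if (ignoreUnreachableTimers && timestampMillis >= endOfGlobalWindowMillis) {
      // The watermark never gets this far outside of tests, so skip the heap entry.
      return false;
    }
    eventTimeTimers.add(timestampMillis);
    return true;
  }

  int pendingTimers() {
    return eventTimeTimers.size();
  }
}
```

Keeping the behavior behind an option matters because bounded pipelines and tests do advance the watermark to the end of the global window and rely on those timers firing.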





[jira] [Commented] (BEAM-2486) Should throws some useful messages when statefulParDo use non-KV input

2017-06-20 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056877#comment-16056877
 ] 

Jingsong Lee commented on BEAM-2486:


[~kenn] Can we do some validation before the pipeline reaches the runner?

> Should throws some useful messages when statefulParDo use non-KV input
> --
>
> Key: BEAM-2486
> URL: https://issues.apache.org/jira/browse/BEAM-2486
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core, runner-flink
>Reporter: Jingsong Lee
>
> Currently the Flink runner throws a ClassCastException without a detailed 
> message when a stateful ParDo uses non-KV input. This makes it hard for users 
> to find the error and its cause. 





[jira] [Created] (BEAM-2486) Should throws some useful messages when statefulParDo use non-KV input

2017-06-20 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-2486:
--

 Summary: Should throws some useful messages when statefulParDo use 
non-KV input
 Key: BEAM-2486
 URL: https://issues.apache.org/jira/browse/BEAM-2486
 Project: Beam
  Issue Type: Improvement
  Components: runner-core, runner-flink
Reporter: Jingsong Lee


Currently the Flink runner throws a ClassCastException without a detailed message 
when a stateful ParDo uses non-KV input. This makes it hard for users to find the 
error and its cause. 
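One way to fail fast is a check at pipeline-construction time. The sketch below is hypothetical: `KeyValue` stands in for Beam's `KV`, and the check is done on the element class. In Beam itself, a similar check might instead inspect the input `PCollection`'s coder (for example, whether it is a `KvCoder`), but that is an assumption, not a description of existing runner code.

```java
/** Hypothetical construction-time check for stateful ParDos; not Beam's actual API. */
final class StatefulParDoValidation {

  /** Hypothetical stand-in for Beam's KV element type. */
  static final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Throws a descriptive IllegalArgumentException instead of a later ClassCastException. */
  static void validateStatefulInput(String transformName, Class<?> inputElementType) {
    if (!KeyValue.class.isAssignableFrom(inputElementType)) {
      throw new IllegalArgumentException(String.format(
          "%s uses state, so its input must be key/value (KV) pairs, "
              + "but the input element type is %s",
          transformName, inputElementType.getName()));
    }
  }
}
```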





[jira] [Commented] (BEAM-2477) BeamAggregationRel should use Combine.perKey instead of GroupByKey

2017-06-20 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055416#comment-16055416
 ] 

Jingsong Lee commented on BEAM-2477:


*Local combine*: Cloud Dataflow/Flink Batch optimizes Combine operations (such 
as Count and Sum) by performing partial combining locally before sending the 
data to the main grouping operation. See "Graph optimizations" in 
https://cloud.google.com/blog/big-data/2017/05/after-lambda-exactly-once-processing-in-cloud-dataflow-part-2-ensuring-low-latency
*Incremental aggregation*: similar to Flink's concept, see 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction-with-incremental-aggregation

In contrast, GroupByKey keeps all the individual elements until the window 
closes (AFAIK, in the Flink runner).
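The difference in state can be sketched with plain Java collections, using Sum as the combine function. All names are hypothetical: `groupByKey` buffers every value per key, `combinePerKey` keeps a single accumulator per key, and `mergeAccumulators` models merging the partial results of local combining after the shuffle.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical comparison of per-key state for GroupByKey vs. an incremental combine. */
final class CombineVsGroupByKey {

  static Map.Entry<String, Long> kv(String key, long value) {
    return new AbstractMap.SimpleEntry<>(key, value);
  }

  /** GroupByKey style: every element is buffered per key until the window closes. */
  static Map<String, List<Long>> groupByKey(List<Map.Entry<String, Long>> input) {
    Map<String, List<Long>> buffered = new HashMap<>();
    for (Map.Entry<String, Long> e : input) {
      buffered.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return buffered;
  }

  /** Combine.perKey style (Sum): one accumulator per key, updated as elements arrive. */
  static Map<String, Long> combinePerKey(List<Map.Entry<String, Long>> input) {
    Map<String, Long> accumulators = new HashMap<>();
    for (Map.Entry<String, Long> e : input) {
      accumulators.merge(e.getKey(), e.getValue(), Long::sum);
    }
    return accumulators;
  }

  /** Local combine: partial accumulators from different workers are merged after the shuffle. */
  static Map<String, Long> mergeAccumulators(List<Map<String, Long>> partials) {
    Map<String, Long> merged = new HashMap<>();
    for (Map<String, Long> partial : partials) {
      partial.forEach((key, acc) -> merged.merge(key, acc, Long::sum));
    }
    return merged;
  }
}
```

Only the per-worker accumulators cross the shuffle, which is the data reduction that local combining relies on; GroupByKey has to ship and hold every element.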

> BeamAggregationRel should use Combine.perKey instead of GroupByKey
> --
>
> Key: BEAM-2477
> URL: https://issues.apache.org/jira/browse/BEAM-2477
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>  Labels: dsl_sql_merge
>
> Their semantics are the same, but the efficiency of implementation is quite 
> different, and at the runner level there is a lot of optimization for 
> `Combine.perKey`.





[jira] [Created] (BEAM-2478) Distinct Aggregates

2017-06-19 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-2478:
--

 Summary: Distinct Aggregates
 Key: BEAM-2478
 URL: https://issues.apache.org/jira/browse/BEAM-2478
 Project: Beam
  Issue Type: New Feature
  Components: dsl-sql
Reporter: Jingsong Lee


eg: COUNT(DISTINCT empno)





[jira] [Created] (BEAM-2477) BeamAggregationRel should use Combine.perKey instead of GroupByKey

2017-06-19 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-2477:
--

 Summary: BeamAggregationRel should use Combine.perKey instead of 
GroupByKey
 Key: BEAM-2477
 URL: https://issues.apache.org/jira/browse/BEAM-2477
 Project: Beam
  Issue Type: Improvement
  Components: dsl-sql
Reporter: Jingsong Lee
Assignee: Jingsong Lee


Their semantics are the same, but the efficiency of implementation is quite 
different, and at the runner level there is a lot of optimization for 
`Combine.perKey`.





[jira] [Updated] (BEAM-1942) Add Watermark Metrics in Flink Runner

2017-06-08 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1942:
---
Summary: Add Watermark Metrics in Flink Runner  (was: Add Source Watermark 
Metrics in Flink Runner)

> Add Watermark Metrics in Flink Runner
> -
>
> Key: BEAM-1942
> URL: https://issues.apache.org/jira/browse/BEAM-1942
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1945) Add Watermark Metrics in Apex runner

2017-06-08 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1945:
---
Summary: Add Watermark Metrics in Apex runner  (was: Add Source Watermark 
Metrics in Apex runner)

> Add Watermark Metrics in Apex runner
> 
>
> Key: BEAM-1945
> URL: https://issues.apache.org/jira/browse/BEAM-1945
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-apex
>Reporter: Jingsong Lee
>






[jira] [Updated] (BEAM-1943) Add Watermark Metrics in Dataflow runner

2017-06-08 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1943:
---
Summary: Add Watermark Metrics in Dataflow runner  (was: Add Source 
Watermark Metrics in Dataflow runner)

> Add Watermark Metrics in Dataflow runner
> 
>
> Key: BEAM-1943
> URL: https://issues.apache.org/jira/browse/BEAM-1943
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-dataflow
>Reporter: Jingsong Lee
>






[jira] [Updated] (BEAM-1944) Add Watermark Metrics in Spark runner

2017-06-08 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1944:
---
Summary: Add Watermark Metrics in Spark runner  (was: Add Source Watermark 
Metrics in Spark runner)

> Add Watermark Metrics in Spark runner
> -
>
> Key: BEAM-1944
> URL: https://issues.apache.org/jira/browse/BEAM-1944
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-spark
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>






[jira] [Updated] (BEAM-1941) Add Watermark Metrics in Runners

2017-06-08 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1941:
---
Summary: Add Watermark Metrics in Runners  (was: Add Source Watermark 
Metrics in Runners)

> Add Watermark Metrics in Runners
> 
>
> Key: BEAM-1941
> URL: https://issues.apache.org/jira/browse/BEAM-1941
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-ideas
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> The source watermark metrics show the consumer latency of the source. 
> They let users see the health of the job, and they can be used for monitoring 
> and alerting.
> Since each runner is likely already tracking a watermark, another option here 
> is to just have the runner report it appropriately, rather than having the 
> source report it using metrics. This also addresses the fact that even if the 
> source has advanced to 8:00, the runner may still know about buffered 
> elements at 7:00, and so not advance the watermark all the way to 8:00. 
> [~bchambers]
> Includes:
> 1. Source watermark (`min` amongst all splits):
>   type = Gauge, namespace = io, name = source_watermark
> 2. Source watermark per split:
>   type = Gauge, namespace = io.splits, name = .source_watermark
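A minimal sketch of the two gauges and of the runner-side variant mentioned in the description, assuming watermarks are epoch milliseconds. `SourceWatermarkTracker` is a hypothetical name, and how the values are registered as `Gauge` metrics is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker for per-split and overall source watermarks. */
final class SourceWatermarkTracker {
  private final Map<String, Long> splitWatermarks = new HashMap<>();

  void update(String splitId, long watermarkMillis) {
    splitWatermarks.put(splitId, watermarkMillis);
  }

  /** Per-split gauge value, or null if that split has not reported yet. */
  Long splitWatermark(String splitId) {
    return splitWatermarks.get(splitId);
  }

  /** Overall source watermark: the min amongst all splits, Long.MIN_VALUE if none reported. */
  long sourceWatermark() {
    long min = Long.MAX_VALUE;
    for (long wm : splitWatermarks.values()) {
      min = Math.min(min, wm);
    }
    return splitWatermarks.isEmpty() ? Long.MIN_VALUE : min;
  }

  /** Runner-side view: held back by the oldest element the runner still buffers. */
  long runnerWatermark(long oldestBufferedTimestampMillis) {
    return Math.min(sourceWatermark(), oldestBufferedTimestampMillis);
  }
}
```

The `runnerWatermark` method captures the 8:00 vs. 7:00 example: a source may report 8:00 while buffered elements at 7:00 keep the watermark the runner reports at 7:00.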





[jira] [Created] (BEAM-2423) Abstract StateInternalsTest for the different state internals/Runners

2017-06-07 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-2423:
--

 Summary: Abstract StateInternalsTest for the different state 
internals/Runners
 Key: BEAM-2423
 URL: https://issues.apache.org/jira/browse/BEAM-2423
 Project: Beam
  Issue Type: Improvement
  Components: runner-core, runner-flink
Reporter: Jingsong Lee
Assignee: Jingsong Lee


For testing InMemoryStateInternals, ApexStateInternals, FlinkStateInternals, 
SparkStateInternals, etc.:
Have a common base class for the state internals tests that has an abstract 
method createStateInternals() and all the test methods. An actual test would 
then just derive from it and only implement the method that creates the state 
internals. 
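The proposed structure can be sketched without JUnit as follows. `SimpleStateInternals` is a hypothetical, heavily reduced stand-in for Beam's `StateInternals`, and `InMemoryStateTest` is a hypothetical subclass; with JUnit, the test methods would carry `@Test` annotations and `runAll()` would be unnecessary.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, heavily reduced state API standing in for StateInternals. */
interface SimpleStateInternals {
  void write(String key, String value);

  String read(String key); // null when absent

  void clear(String key);
}

/** Common base class: all test methods live here; subclasses only supply the factory. */
abstract class StateInternalsTestBase {
  protected abstract SimpleStateInternals createStateInternals();

  void testReadAfterWrite() {
    SimpleStateInternals state = createStateInternals();
    state.write("k", "v");
    check("v".equals(state.read("k")), "read after write");
  }

  void testClear() {
    SimpleStateInternals state = createStateInternals();
    state.write("k", "v");
    state.clear("k");
    check(state.read("k") == null, "read after clear");
  }

  /** Runs every test method. */
  void runAll() {
    testReadAfterWrite();
    testClear();
  }

  private static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }
}

/** One concrete subclass per runner; this one is backed by a HashMap. */
final class InMemoryStateTest extends StateInternalsTestBase {
  @Override
  protected SimpleStateInternals createStateInternals() {
    Map<String, String> map = new HashMap<>();
    return new SimpleStateInternals() {
      @Override
      public void write(String key, String value) {
        map.put(key, value);
      }

      @Override
      public String read(String key) {
        return map.get(key);
      }

      @Override
      public void clear(String key) {
        map.remove(key);
      }
    };
  }
}
```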





[jira] [Commented] (BEAM-1476) Support MapState in Flink runner

2017-06-06 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16040044#comment-16040044
 ] 

Jingsong Lee commented on BEAM-1476:


Solved in https://github.com/apache/beam/pull/3289

> Support MapState in Flink runner
> 
>
> Key: BEAM-1476
> URL: https://issues.apache.org/jira/browse/BEAM-1476
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Pei He
>






[jira] [Assigned] (BEAM-1476) Support MapState in Flink runner

2017-06-06 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1476:
--

Assignee: Pei He  (was: Jingsong Lee)

> Support MapState in Flink runner
> 
>
> Key: BEAM-1476
> URL: https://issues.apache.org/jira/browse/BEAM-1476
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Pei He
>






[jira] [Assigned] (BEAM-1476) Support MapState in Flink runner

2017-06-01 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1476:
--

Assignee: Jingsong Lee

> Support MapState in Flink runner
> 
>
> Key: BEAM-1476
> URL: https://issues.apache.org/jira/browse/BEAM-1476
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
>






[jira] [Assigned] (BEAM-1483) Support SetState in Flink runner

2017-06-01 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1483:
--

Assignee: Jingsong Lee

> Support SetState in Flink runner
> 
>
> Key: BEAM-1483
> URL: https://issues.apache.org/jira/browse/BEAM-1483
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
>






[jira] [Assigned] (BEAM-1498) Use Flink-native side outputs

2017-06-01 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1498:
--

Assignee: Jingsong Lee

> Use Flink-native side outputs
> -
>
> Key: BEAM-1498
> URL: https://issues.apache.org/jira/browse/BEAM-1498
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> Once Flink has support for side outputs we should use them instead of 
> manually dealing with the {{RawUnionValues}}.
> Side outputs for Flink is being tracked in 
> https://issues.apache.org/jira/browse/FLINK-4460.





[jira] [Created] (BEAM-2401) Update Flink Runner to Flink 1.3.0

2017-06-01 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-2401:
--

 Summary: Update Flink Runner to Flink 1.3.0
 Key: BEAM-2401
 URL: https://issues.apache.org/jira/browse/BEAM-2401
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Jingsong Lee
Assignee: Jingsong Lee


http://flink.apache.org/news/2017/06/01/release-1.3.0.html
There are a lot of exciting improvements.





[jira] [Created] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2017-05-31 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-2393:
--

 Summary: BoundedSource is not fault-tolerant in FlinkRunner 
Streaming mode
 Key: BEAM-2393
 URL: https://issues.apache.org/jira/browse/BEAM-2393
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Jingsong Lee


{{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when the 
job restarts after a failure, it will send duplicate data.
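A minimal sketch of what snapshot/restore adds, assuming the source is a list and the only state needed is the index of the next element to emit. `CheckpointedListSource` is a hypothetical name, not `BoundedSourceWrapper`'s API. Restoring the snapshotted index lets a restarted instance resume after the checkpoint instead of emitting everything again from the start.

```java
import java.util.List;

/**
 * Hypothetical list-backed source that snapshots its read position, so that a restarted
 * instance resumes after the last checkpoint instead of re-emitting from the beginning.
 */
final class CheckpointedListSource {
  private final List<String> elements;
  private int nextIndex = 0; // the only state that needs to be snapshotted

  CheckpointedListSource(List<String> elements) {
    this.elements = elements;
  }

  /** Emits up to maxElements further elements into out. */
  void run(List<String> out, int maxElements) {
    for (int i = 0; i < maxElements && nextIndex < elements.size(); i++) {
      out.add(elements.get(nextIndex++));
    }
  }

  /** Called when a checkpoint is taken. */
  int snapshot() {
    return nextIndex;
  }

  /** Called on a fresh instance when the job restarts from that checkpoint. */
  void restore(int checkpointedIndex) {
    nextIndex = checkpointedIndex;
  }
}
```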





[jira] [Closed] (BEAM-2248) KafkaIO support to use start read time to set start offset

2017-05-30 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed BEAM-2248.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

> KafkaIO support to use start read time to set start offset
> --
>
> Key: BEAM-2248
> URL: https://issues.apache.org/jira/browse/BEAM-2248
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 2.1.0
>
>
> Kafka 0.10.x adds support for a searchable, timestamp-based index for each 
> topic, which enables consumers to look up offsets by timestamp.
> So we can add a start read time and use it to set the start offset.





[jira] [Created] (BEAM-2248) KafkaIO support to use start read time to set start offset

2017-05-10 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-2248:
--

 Summary: KafkaIO support to use start read time to set start offset
 Key: BEAM-2248
 URL: https://issues.apache.org/jira/browse/BEAM-2248
 Project: Beam
  Issue Type: New Feature
  Components: sdk-java-extensions
Reporter: Jingsong Lee
Assignee: Jingsong Lee


Kafka 0.10.x adds support for a searchable, timestamp-based index for each 
topic, which enables consumers to look up offsets by timestamp.
So we can add a start read time and use it to set the start offset.
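For reference, Kafka clients from 0.10.1 on expose this lookup as {{KafkaConsumer.offsetsForTimes()}}, which returns, per partition, the earliest offset whose timestamp is greater than or equal to the requested one. The self-contained sketch below mimics that rule over a hypothetical in-memory index, assuming the index is sorted by timestamp; the class and method names are made up for illustration.

```java
/**
 * Hypothetical sketch: given a partition's (offset, timestamp) index sorted by timestamp,
 * find the earliest offset whose timestamp is >= startReadTime. In practice the broker
 * performs this lookup when KafkaConsumer.offsetsForTimes() is called.
 */
final class StartOffsetLookup {
  /** Returns the first offset with timestamp >= startReadTime, or -1 if there is none. */
  static long startOffset(long[] offsets, long[] timestamps, long startReadTime) {
    int lo = 0;
    int hi = timestamps.length; // binary search over [lo, hi)
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (timestamps[mid] < startReadTime) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo < offsets.length ? offsets[lo] : -1L;
  }

  public static void main(String[] args) {
    long[] offsets = {10, 11, 12, 15}; // offsets need not be contiguous
    long[] timestamps = {1000, 2000, 2000, 3000};
    System.out.println(startOffset(offsets, timestamps, 1500)); // 11
    System.out.println(startOffset(offsets, timestamps, 3500)); // -1
  }
}
```

Returning -1 is only this sketch's convention for "no such offset"; {{offsetsForTimes()}} returns null for that partition instead.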





[jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-05-09 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003002#comment-16003002
 ] 

Jingsong Lee commented on BEAM-2140:


1. {{SplittableParDo}} clears its state in 
{{SplittableParDo.ProcessFn.processElement()}}:
{code}
if (result.getResidualRestriction() == null) {
  // All work for this element/restriction is completed. Clear state and release hold.
  elementState.clear();
  restrictionState.clear();
  holdState.clear();
  return;
}
{code}

In IntelliJ debug mode I got values up to 12344, while in IntelliJ run mode I got 
values up to 34567 (regardless of the output watermark of {{BoundedSourceWrapper}}). 
So I think the processing of the SDF does not depend on the input watermark; it 
depends only on the processing-time service. 

This problem is probably caused by the second and third points.

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> ---
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.





[jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-05-08 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002039#comment-16002039
 ] 

Jingsong Lee commented on BEAM-2140:


First, {{SplittableParDo}} should not wrap {{StatefulDoFnRunner}}.

Second, {{SplittableParDo}} uses {{PROCESSING_TIME}} to continue processing, and 
it also sets watermark holds, which affect when the output watermark is sent 
(see {{DoFnOperator.processWatermark1()}}).
When {{BoundedSourceWrapper}} finishes, it emits a Long.MAX_VALUE watermark, 
but the {{SplittableParDo}} may not be finished yet (this depends on system time), 
so nothing can forward the watermark downstream.

Last, {{StreamTask}} shuts down when there are no more inputs and invokes 
{{timerService.quiesceAndAwaitPending}} (see {{StreamTask.invoke()}} in Flink). 
This shuts down the timer service, executes the tasks still in it, and rejects 
new registrations, so it breaks the continued processing of 
{{SplittableParDo}}.

[~aljoscha] Is that right? Please correct me if I am wrong.
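The second point can be illustrated with a minimal sketch, assuming the usual rule that an operator's output watermark is the minimum of its input watermark and its earliest active watermark hold. The class and method names are hypothetical; this is not {{DoFnOperator}}'s code. Even after the bounded source emits Long.MAX_VALUE, a pending hold keeps the output watermark back until the SDF releases it.

```java
import java.util.TreeMap;

/**
 * Hypothetical sketch (not DoFnOperator's code): an operator's output watermark is the
 * minimum of its input watermark and its earliest active watermark hold.
 */
final class WatermarkHoldSketch {
  private long inputWatermark = Long.MIN_VALUE;
  // hold timestamp -> number of holds currently set at that timestamp
  private final TreeMap<Long, Integer> holds = new TreeMap<>();

  void addHold(long timestamp) {
    holds.merge(timestamp, 1, Integer::sum);
  }

  void releaseHold(long timestamp) {
    // Returning null from the remapping function removes the entry.
    holds.computeIfPresent(timestamp, (ts, count) -> count == 1 ? null : count - 1);
  }

  void advanceInputWatermark(long watermark) {
    inputWatermark = Math.max(inputWatermark, watermark);
  }

  long outputWatermark() {
    return holds.isEmpty() ? inputWatermark : Math.min(inputWatermark, holds.firstKey());
  }

  public static void main(String[] args) {
    WatermarkHoldSketch op = new WatermarkHoldSketch();
    op.addHold(100L);                         // the SDF still has residual work at t=100
    op.advanceInputWatermark(Long.MAX_VALUE); // the bounded source has finished
    System.out.println(op.outputWatermark()); // 100: held back
    op.releaseHold(100L);                     // the SDF finishes and clears its hold
    System.out.println(op.outputWatermark()); // 9223372036854775807 (Long.MAX_VALUE)
  }
}
```

If the timer service is shut down before the SDF's processing-time timer fires (the third point), the hold is never released and the final watermark never reaches downstream.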

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> ---
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.





[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner

2017-05-04 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996603#comment-15996603
 ] 

Jingsong Lee commented on BEAM-1612:


Agree with [~aljoscha].

About the coarse granularity: one small idea is to use {{processWatermark}} to 
end a bundle, so that a bundle is finished when event-time timers fire.

About the fact that a Flink operator cannot output data during a snapshot: could 
Flink provide a {{beforeSnapshot()}} callback that is invoked before 
{{broadcastCheckpointBarrier()}} in {{StreamTask.performCheckpoint()}}? I think 
users of the Flink API also have such a need.

> Support real Bundle in Flink runner
> ---
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>
> The bundle is very important in the Beam model. Users can use the bundle to 
> flush buffers and to reuse heavyweight resources within a bundle. Most IO 
> plugins use the bundle to flush. 
> Moreover, FlinkRunner can also use bundles to reduce access to Flink state: 
> for example, data can first be kept on the Java heap and flushed into RocksDB 
> state when finishBundle is invoked, which reduces the amount of serialization.
> But now FlinkRunner calls finishBundle after every processElement. We need to 
> support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But 
> sometimes this "bundle" may be too big, depending on the user's checkpoint 
> configuration.
> 2. Manually control the size of the bundle. A bundle is finished when a count, 
> event-time, or processing-time limit is reached, or on {{snapshot}}. We do not 
> need to wait; we just call startBundle and finishBundle at the right time.
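Option 2 above can be sketched with a small hypothetical helper (not FlinkRunner code): it starts a bundle lazily on the first element and finishes it when a count or processing-time limit is reached, or when the operator calls {{finish()}}, for example on a watermark or before a snapshot. The limits and callback names are assumptions.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of count/time-bounded bundles; startBundle/finishBundle are callbacks. */
final class BundleController {
  private final long maxElements;
  private final long maxMillis;
  private final LongSupplier clock;
  private final Runnable startBundle;
  private final Runnable finishBundle;

  private boolean inBundle = false;
  private long elementsInBundle = 0;
  private long bundleStartMillis = 0;

  BundleController(long maxElements, long maxMillis, LongSupplier clock,
      Runnable startBundle, Runnable finishBundle) {
    this.maxElements = maxElements;
    this.maxMillis = maxMillis;
    this.clock = clock;
    this.startBundle = startBundle;
    this.finishBundle = finishBundle;
  }

  /** Call before processing each element: opens a bundle if none is open. */
  void beforeElement() {
    if (!inBundle) {
      startBundle.run();
      inBundle = true;
      elementsInBundle = 0;
      bundleStartMillis = clock.getAsLong();
    }
  }

  /** Call after processing each element: finishes the bundle when a limit is hit. */
  void afterElement() {
    elementsInBundle++;
    if (elementsInBundle >= maxElements
        || clock.getAsLong() - bundleStartMillis >= maxMillis) {
      finish();
    }
  }

  /** Call on a watermark or before a snapshot so that buffered output is flushed. */
  void finish() {
    if (inBundle) {
      finishBundle.run();
      inBundle = false;
    }
  }

  public static void main(String[] args) {
    int[] counts = new int[2]; // [0] = bundles started, [1] = bundles finished
    BundleController c = new BundleController(3, Long.MAX_VALUE, System::currentTimeMillis,
        () -> counts[0]++, () -> counts[1]++);
    for (int i = 0; i < 7; i++) {
      c.beforeElement();
      // ... process element i ...
      c.afterElement();
    }
    c.finish(); // e.g. triggered by a watermark or a snapshot
    System.out.println(counts[0] + " started, " + counts[1] + " finished"); // 3 started, 3 finished
  }
}
```

The time limit here is only checked when an element arrives; a real implementation would presumably also need a processing-time timer so that an idle bundle is still finished.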





[jira] [Assigned] (BEAM-1588) Reuse StateNamespace.stringKey in Flink States

2017-05-02 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1588:
--

Assignee: (was: Jingsong Lee)

> Reuse StateNamespace.stringKey in Flink States
> --
>
> Key: BEAM-1588
> URL: https://issues.apache.org/jira/browse/BEAM-1588
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>
> See BEAM-1587
> StateNamespace.stringKey does two things: it Base64-encodes the window, and 
> then calls String.format. Both are fairly expensive. We can cache the key in 
> the State and reuse it.
> Furthermore, we can cache the state itself.
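A minimal sketch of the caching idea, assuming a hypothetical cache kept per state object and keyed by window. The key shape (Base64 of the encoded window between slashes) only stands in for what StateNamespace.stringKey produces, and the encoder function stands in for a window coder.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: compute each namespace key once per window and reuse it. */
final class NamespaceKeyCache<W> {
  private final Map<W, String> cache = new HashMap<>();
  private final Function<W, byte[]> encodeWindow; // stands in for the window coder
  int computations = 0;                           // for illustration only

  NamespaceKeyCache(Function<W, byte[]> encodeWindow) {
    this.encodeWindow = encodeWindow;
  }

  String stringKey(W window) {
    // Encoding and string building happen only on the first access for each window.
    return cache.computeIfAbsent(window, w -> {
      computations++;
      return "/" + Base64.getEncoder().encodeToString(encodeWindow.apply(w)) + "/";
    });
  }

  public static void main(String[] args) {
    NamespaceKeyCache<String> keys =
        new NamespaceKeyCache<>(w -> w.getBytes(StandardCharsets.UTF_8));
    String first = keys.stringKey("window-1");
    String second = keys.stringKey("window-1");
    System.out.println((first == second) + " " + keys.computations); // true 1
  }
}
```

In practice such a cache would need to be bounded or cleared when windows expire; otherwise it grows with the number of windows.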





[jira] [Assigned] (BEAM-1587) Use StringBuilder to stringKey of StateNamespace instead of String.format

2017-05-02 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1587:
--

Assignee: (was: Jingsong Lee)

> Use StringBuilder to stringKey of StateNamespace instead of String.format
> -
>
> Key: BEAM-1587
> URL: https://issues.apache.org/jira/browse/BEAM-1587
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Jingsong Lee
>
> In the Flink runner, each state access calls the namespace's stringKey once. 
> Since stringKey uses String.format, the performance impact is relatively 
> large; in some extreme cases, stringKey consumes up to 2% of the total cost.
> Here is a test comparing StringBuilder and String.format:
> {code}
>   public static void main(String[] args) throws Exception {
>     // getRandomString(), testFormat() and testStringBuild() are not shown in this issue.
>     String[] strs = new String[1000_000];
>     for (int i = 0; i < strs.length; i++) {
>       strs[i] = getRandomString(10);
>     }
>     {
>       long start = System.nanoTime();
>       for (int i = 0; i < strs.length; i++) {
>         strs[i] = testFormat(strs[i]);
>       }
>       System.out.println("testStringFormat: " + ((System.nanoTime() - start) / 1000_000) + "ms");
>     }
>     {
>       long start = System.nanoTime();
>       for (int i = 0; i < strs.length; i++) {
>         strs[i] = testStringBuild(strs[i]);
>       }
>       System.out.println("testStringBuilder: " + ((System.nanoTime() - start) / 1000_000) + "ms");
>     }
>   }
> {code}
> testStringFormat: 2312ms
> testStringBuilder: 266ms
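The snippet above omits {{getRandomString}}, {{testFormat}} and {{testStringBuild}}, so here is a self-contained variant that can actually be run. The helper bodies, in particular the "/%s/" key shape, are assumptions made for illustration, so its timings will not necessarily match the numbers reported above; a harness such as JMH would give more reliable measurements.

```java
import java.util.Random;

/** Self-contained variant of the benchmark above; the helper bodies are assumptions. */
final class StringKeyBenchmark {
  private static final String CHARS = "abcdefghijklmnopqrstuvwxyz0123456789";

  static String getRandomString(Random random, int length) {
    StringBuilder sb = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      sb.append(CHARS.charAt(random.nextInt(CHARS.length())));
    }
    return sb.toString();
  }

  // Hypothetical stand-ins that build the same "/<encoded window>/"-style key.
  static String testFormat(String s) {
    return String.format("/%s/", s);
  }

  static String testStringBuild(String s) {
    return new StringBuilder(s.length() + 2).append('/').append(s).append('/').toString();
  }

  public static void main(String[] args) {
    Random random = new Random(42);
    String[] strs = new String[1_000_000];
    for (int i = 0; i < strs.length; i++) {
      strs[i] = getRandomString(random, 10);
    }
    long checksum = 0; // consume the results so the JIT cannot drop the work

    long start = System.nanoTime();
    for (String s : strs) {
      checksum += testFormat(s).length();
    }
    System.out.println("testStringFormat: " + (System.nanoTime() - start) / 1_000_000 + "ms");

    start = System.nanoTime();
    for (String s : strs) {
      checksum += testStringBuild(s).length();
    }
    System.out.println("testStringBuilder: " + (System.nanoTime() - start) / 1_000_000 + "ms");
    System.out.println("checksum: " + checksum);
  }
}
```

Unlike the original loop, this version does not overwrite {{strs[i]}}, so both variants format strings of the same length.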





[jira] [Assigned] (BEAM-1483) Support SetState in Flink runner

2017-05-02 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1483:
--

Assignee: (was: Jingsong Lee)

> Support SetState in Flink runner
> 
>
> Key: BEAM-1483
> URL: https://issues.apache.org/jira/browse/BEAM-1483
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>






[jira] [Assigned] (BEAM-1476) Support MapState in Flink runner

2017-05-02 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1476:
--

Assignee: (was: Jingsong Lee)

> Support MapState in Flink runner
> 
>
> Key: BEAM-1476
> URL: https://issues.apache.org/jira/browse/BEAM-1476
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>






[jira] [Assigned] (BEAM-1498) Use Flink-native side outputs

2017-05-02 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1498:
--

Assignee: (was: Jingsong Lee)

> Use Flink-native side outputs
> -
>
> Key: BEAM-1498
> URL: https://issues.apache.org/jira/browse/BEAM-1498
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>
> Once Flink has support for side outputs we should use them instead of 
> manually dealing with the {{RawUnionValues}}.
> Side outputs for Flink are being tracked in 
> https://issues.apache.org/jira/browse/FLINK-4460.





[jira] [Updated] (BEAM-1641) Support synchronized processing time in Flink runner

2017-05-01 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1641:
---
Fix Version/s: (was: First stable release)

> Support synchronized processing time in Flink runner
> 
>
> Key: BEAM-1641
> URL: https://issues.apache.org/jira/browse/BEAM-1641
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Aljoscha Krettek
>Priority: Blocker
>
> The "continuation trigger" for a processing time trigger is a synchronized 
> processing time trigger. Today, this throws an exception in the FlinkRunner.
> This supports the following:
>  - GBK1
>  - GBK2
> When GBK1 fires due to processing time past the first element in the pane and 
> that element arrives at GBK2, it will wait until all the other upstream keys 
> have also processed and emitted corresponding data.
> Sorry for the terseness of the explanation - writing quickly so I don't forget.





[jira] [Commented] (BEAM-1641) Support synchronized processing time in Flink runner

2017-05-01 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15992110#comment-15992110
 ] 

Jingsong Lee commented on BEAM-1641:


There are some differences between how event time and synchronized processing 
time are handled in {{DirectRunner}}. The source just emits 
{{BoundedWindow.TIMESTAMP_MAX_VALUE}} as the synchronizedProcessingTime, and 
downstream stages use 
{{min(clock.now(), synchronizedProcessingInputWatermark.get())}} to generate 
synchronizedProcessingTime.
But I think that, fundamentally, ingestion time and synchronized processing time 
produce almost the same effect. So I think we can use ingestion time and let 
Flink track ingestion time and event time at the same time.
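The DirectRunner rule described above, written as a tiny sketch with hypothetical names. {{Long.MAX_VALUE}} stands in for {{BoundedWindow.TIMESTAMP_MAX_VALUE}}, and the clock is injected so the example is deterministic. Because the source reports an "infinite" value, the first downstream stage effectively follows its own clock.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of how a downstream stage derives synchronized processing time. */
final class SynchronizedProcessingTime {
  private final LongSupplier clock;

  SynchronizedProcessingTime(LongSupplier clock) {
    this.clock = clock;
  }

  /** min(clock.now(), synchronizedProcessingInputWatermark) */
  long compute(long synchronizedProcessingInputWatermark) {
    return Math.min(clock.getAsLong(), synchronizedProcessingInputWatermark);
  }

  public static void main(String[] args) {
    SynchronizedProcessingTime stage = new SynchronizedProcessingTime(() -> 1_000L);
    // Directly after the source: Long.MAX_VALUE stands in for TIMESTAMP_MAX_VALUE.
    System.out.println(stage.compute(Long.MAX_VALUE)); // 1000
    // A lagging upstream stage holds synchronized processing time back.
    System.out.println(stage.compute(500L)); // 500
  }
}
```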

> Support synchronized processing time in Flink runner
> 
>
> Key: BEAM-1641
> URL: https://issues.apache.org/jira/browse/BEAM-1641
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: First stable release
>
>
> The "continuation trigger" for a processing time trigger is a synchronized 
> processing time trigger. Today, this throws an exception in the FlinkRunner.
> This supports the following:
>  - GBK1
>  - GBK2
> When GBK1 fires due to processing time past the first element in the pane and 
> that element arrives at GBK2, it will wait until all the other upstream keys 
> have also processed and emitted corresponding data.
> Sorry for the terseness of the explanation - writing quickly so I don't forget.





[jira] [Commented] (BEAM-1641) Support synchronized processing time in Flink runner

2017-04-28 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988363#comment-15988363
 ] 

Jingsong Lee commented on BEAM-1641:


I think it is difficult to achieve: it requires the Flink runtime's 
{{StatusWatermarkValve}} to also manage synchronized processing time (maybe in 
Flink 1.4 or later). It's a tough one indeed. I also have no idea how to use 
special punctuation(s) to trigger it. I think it can be deferred. What do you 
think, [~aljoscha]?

> Support synchronized processing time in Flink runner
> 
>
> Key: BEAM-1641
> URL: https://issues.apache.org/jira/browse/BEAM-1641
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: First stable release
>
>
> The "continuation trigger" for a processing time trigger is a synchronized 
> processing time trigger. Today, this throws an exception in the FlinkRunner.
> This supports the following:
>  - GBK1
>  - GBK2
> When GBK1 fires due to processing time past the first element in the pane and 
> that element arrives at GBK2, it will wait until all the other upstream keys 
> have also processed and emitted corresponding data.
> Sorry for the terseness of the explanation - writing quickly so I don't forget.





[jira] [Assigned] (BEAM-1944) Add Source Watermark Metrics in Spark runner

2017-04-23 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1944:
--

Assignee: Jingsong Lee

> Add Source Watermark Metrics in Spark runner
> 
>
> Key: BEAM-1944
> URL: https://issues.apache.org/jira/browse/BEAM-1944
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-spark
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>






[jira] [Assigned] (BEAM-1942) Add Source Watermark Metrics in Flink Runner

2017-04-23 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1942:
--

Assignee: Jingsong Lee

> Add Source Watermark Metrics in Flink Runner
> 
>
> Key: BEAM-1942
> URL: https://issues.apache.org/jira/browse/BEAM-1942
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>






[jira] [Assigned] (BEAM-1941) Add Source Watermark Metrics in Runners

2017-04-23 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1941:
--

Assignee: Jingsong Lee

> Add Source Watermark Metrics in Runners
> ---
>
> Key: BEAM-1941
> URL: https://issues.apache.org/jira/browse/BEAM-1941
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-ideas
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> The source watermark metrics show the consumer latency of the source. 
> They let users know the health of the job, and they can also be used for 
> monitoring and alerting.
> Since each runner is likely already tracking a watermark, another option here 
> is to just have the runner report it appropriately, rather than having the 
> source report it using metrics. This also addresses the fact that even if the 
> source has advanced to 8:00, the runner may still know about buffered 
> elements at 7:00, and so not advance the watermark all the way to 8:00. 
> [~bchambers]
> Includes:
> 1. Source watermark (`min` amongst all splits):
>    type = Gauge, namespace = io, name = source_watermark
> 2. Source watermark per split:
>    type = Gauge, namespace = io.splits, name = .source_watermark
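The first gauge above is defined as the minimum over all splits. A minimal sketch of that aggregation, assuming each split reports its watermark as epoch milliseconds; the class is hypothetical and does not use any runner's metrics API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: per-split source watermarks and their minimum. */
final class SourceWatermarks {
  private final Map<String, Long> perSplit = new HashMap<>();

  /** Records the latest watermark reported by one split (the per-split gauge). */
  void update(String splitId, long watermarkMillis) {
    perSplit.put(splitId, watermarkMillis);
  }

  /** Source watermark: the minimum amongst all splits, or Long.MIN_VALUE before any report. */
  long sourceWatermark() {
    long min = Long.MAX_VALUE;
    for (long wm : perSplit.values()) {
      min = Math.min(min, wm);
    }
    return perSplit.isEmpty() ? Long.MIN_VALUE : min;
  }

  public static void main(String[] args) {
    SourceWatermarks wms = new SourceWatermarks();
    wms.update("split-0", 8 * 3_600_000L); // 8:00
    wms.update("split-1", 7 * 3_600_000L); // 7:00
    // true: the slowest split holds the source watermark back
    System.out.println(wms.sourceWatermark() == 7 * 3_600_000L);
  }
}
```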





[jira] [Assigned] (BEAM-1886) Remove TextIO override in Flink runner

2017-04-13 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1886:
--

Assignee: Jingsong Lee

> Remove TextIO override in Flink runner
> --
>
> Key: BEAM-1886
> URL: https://issues.apache.org/jira/browse/BEAM-1886
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
> Fix For: First stable release
>
>
> Today, the Flink runner replaces TextIO with a customized version. I believe 
> this is related to adequate support for files in HDFS.
> However, the customized version has fewer capabilities; in particular, it lacks 
> the recent support for window-and-pane sharded writes of unbounded collections.
> Concretely, we have had to remove WindowedWordCountIT from the precommit 
> Jenkins run.





[jira] [Created] (BEAM-1944) Add Source Watermark Metrics in Spark runner

2017-04-11 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1944:
--

 Summary: Add Source Watermark Metrics in Spark runner
 Key: BEAM-1944
 URL: https://issues.apache.org/jira/browse/BEAM-1944
 Project: Beam
  Issue Type: Sub-task
  Components: runner-spark
Reporter: Jingsong Lee








[jira] [Created] (BEAM-1945) Add Source Watermark Metrics in Apex runner

2017-04-11 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1945:
--

 Summary: Add Source Watermark Metrics in Apex runner
 Key: BEAM-1945
 URL: https://issues.apache.org/jira/browse/BEAM-1945
 Project: Beam
  Issue Type: Sub-task
  Components: runner-apex
Reporter: Jingsong Lee








[jira] [Created] (BEAM-1943) Add Source Watermark Metrics in Dataflow runner

2017-04-11 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1943:
--

 Summary: Add Source Watermark Metrics in Dataflow runner
 Key: BEAM-1943
 URL: https://issues.apache.org/jira/browse/BEAM-1943
 Project: Beam
  Issue Type: Sub-task
  Components: runner-dataflow
Reporter: Jingsong Lee








[jira] [Created] (BEAM-1942) Add Source Watermark Metrics in Flink Runner

2017-04-11 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1942:
--

 Summary: Add Source Watermark Metrics in Flink Runner
 Key: BEAM-1942
 URL: https://issues.apache.org/jira/browse/BEAM-1942
 Project: Beam
  Issue Type: Sub-task
  Components: runner-flink
Reporter: Jingsong Lee








[jira] [Created] (BEAM-1941) Add Source Watermark Metrics in Runners

2017-04-11 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1941:
--

 Summary: Add Source Watermark Metrics in Runners
 Key: BEAM-1941
 URL: https://issues.apache.org/jira/browse/BEAM-1941
 Project: Beam
  Issue Type: New Feature
  Components: runner-ideas
Reporter: Jingsong Lee


The source watermark metrics show the consumer latency of the source. 
They let users know the health of the job, and they can also be used for 
monitoring and alerting.
Since each runner is likely already tracking a watermark, another option here 
is to just have the runner report it appropriately, rather than having the 
source report it using metrics. This also addresses the fact that even if the 
source has advanced to 8:00, the runner may still know about buffered elements 
at 7:00, and so not advance the watermark all the way to 8:00. [~bchambers]
Includes:
1. Source watermark (`min` amongst all splits):
   type = Gauge, namespace = io, name = source_watermark
2. Source watermark per split:
   type = Gauge, namespace = io.splits, name = .source_watermark





[jira] [Commented] (BEAM-1723) FlinkRunner should deduplicate when an UnboundedSource requires Deduping

2017-04-09 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962125#comment-15962125
 ] 

Jingsong Lee commented on BEAM-1723:


I think it needs to be configurable, because the deduplication window is 
related to the checkpoint interval.

> FlinkRunner should deduplicate when an UnboundedSource requires Deduping
> 
>
> Key: BEAM-1723
> URL: https://issues.apache.org/jira/browse/BEAM-1723
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Thomas Groh
>Assignee: Jingsong Lee
>
> UnboundedSource implementations can require deduping, and the FlinkRunner 
> currently logs a warning that this is not supported.
> https://github.com/apache/beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L139





[jira] [Commented] (BEAM-1723) FlinkRunner should deduplicate when an UnboundedSource requires Deduping

2017-04-09 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962077#comment-15962077
 ] 

Jingsong Lee commented on BEAM-1723:


I understand. The reason for the duplication is that {{PubSubIO}} uses a pull/ack 
model, and the {{acknowledge()}} call in {{finalizeCheckpoint()}} may fail, whereas 
Kafka uses offsets to restore.

> FlinkRunner should deduplicate when an UnboundedSource requires Deduping
> 
>
> Key: BEAM-1723
> URL: https://issues.apache.org/jira/browse/BEAM-1723
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Thomas Groh
>Assignee: Jingsong Lee
>
> UnboundedSource implementations can require deduping, and the FlinkRunner 
> currently logs a warning that this is not supported.
> https://github.com/apache/beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L139





[jira] [Assigned] (BEAM-1723) FlinkRunner should deduplicate when an UnboundedSource requires Deduping

2017-04-06 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1723:
--

Assignee: Jingsong Lee

> FlinkRunner should deduplicate when an UnboundedSource requires Deduping
> 
>
> Key: BEAM-1723
> URL: https://issues.apache.org/jira/browse/BEAM-1723
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Thomas Groh
>Assignee: Jingsong Lee
>
> UnboundedSource implementations can require deduping, and the FlinkRunner 
> currently logs a warning that this is not supported.
> https://github.com/apache/beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L139





[jira] [Commented] (BEAM-1723) FlinkRunner should deduplicate when an UnboundedSource requires Deduping

2017-04-06 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959226#comment-15959226
 ] 

Jingsong Lee commented on BEAM-1723:


I see {{CachedIdDeduplicator}} in the direct runner. It uses a {{LoadingCache}} to 
deduplicate, with expireAfterAccess set to 10 minutes and maximumSize set to 
100_000. Do these two values need to be parameterized?

Do these caches need to be snapshotted in the Flink runner (for fault tolerance)?
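For reference, the cache behavior described above (expire an ID some time after its last access, keep at most a bounded number of IDs) can be approximated with the JDK alone. This is a hypothetical sketch, not the direct runner's {{CachedIdDeduplicator}}, which uses a Guava {{LoadingCache}}. Both limits are constructor parameters, as the question suggests, and the cache is plain heap state, so it would not survive a restart unless it were snapshotted.

```java
import java.util.Base64;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch: remembers record IDs with expire-after-access and a size bound. */
final class IdDeduplicator {
  private final long expireAfterAccessMillis;
  private final int maximumSize;
  private final LongSupplier clock;
  // accessOrder = true: iteration runs from least to most recently accessed entry.
  private final LinkedHashMap<String, Long> lastAccess = new LinkedHashMap<>(16, 0.75f, true);

  IdDeduplicator(long expireAfterAccessMillis, int maximumSize, LongSupplier clock) {
    this.expireAfterAccessMillis = expireAfterAccessMillis;
    this.maximumSize = maximumSize;
    this.clock = clock;
  }

  /** Returns true if recordId was not seen recently, i.e. the record should be emitted. */
  boolean isNew(byte[] recordId) {
    long now = clock.getAsLong();
    // Expire entries not accessed within the window; the eldest entries come first.
    Iterator<Map.Entry<String, Long>> it = lastAccess.entrySet().iterator();
    while (it.hasNext() && now - it.next().getValue() >= expireAfterAccessMillis) {
      it.remove();
    }
    String key = Base64.getEncoder().encodeToString(recordId);
    boolean seen = lastAccess.containsKey(key);
    lastAccess.put(key, now); // put() counts as an access and moves the key to the end
    if (lastAccess.size() > maximumSize) {
      // Evict the least recently accessed ID.
      Iterator<String> eldest = lastAccess.keySet().iterator();
      eldest.next();
      eldest.remove();
    }
    return !seen;
  }

  public static void main(String[] args) {
    long[] now = {0L};
    // The direct runner values from the comment above: 10 minutes, 100_000 entries.
    IdDeduplicator dedup = new IdDeduplicator(10 * 60 * 1000L, 100_000, () -> now[0]);
    System.out.println(dedup.isNew(new byte[] {1})); // true
    System.out.println(dedup.isNew(new byte[] {1})); // false: duplicate
    now[0] += 11 * 60 * 1000L;
    System.out.println(dedup.isNew(new byte[] {1})); // true: the ID expired
  }
}
```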

> FlinkRunner should deduplicate when an UnboundedSource requires Deduping
> 
>
> Key: BEAM-1723
> URL: https://issues.apache.org/jira/browse/BEAM-1723
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Thomas Groh
>
> UnboundedSource implementations can require deduping, and the FlinkRunner 
> currently logs a warning that this is not supported.
> https://github.com/apache/beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L139





[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner

2017-04-04 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955360#comment-15955360
 ] 

Jingsong Lee commented on BEAM-1612:


I think this is a good idea. No matter how small the bundle is, it is better 
than one bundle per record.
Flink's default buffer size is 32768 bytes and its default buffer timeout is 
100 ms ([~aljoscha], please point out if anything here is wrong).
From my production experience, I think 2 MB is a more reasonable bundle size. I 
have not changed the buffer size in Flink. [~aljoscha], can Flink work well with 
a 2 MB buffer size?

> Support real Bundle in Flink runner
> ---
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> The bundle is very important in the Beam model. Users can use the bundle to 
> flush buffers and to reuse heavyweight resources within a bundle. Most IO 
> plugins use the bundle to flush. 
> Moreover, FlinkRunner can also use bundles to reduce access to Flink state: 
> for example, data can first be kept on the Java heap and flushed into RocksDB 
> state when finishBundle is invoked, which reduces the amount of serialization.
> But now FlinkRunner calls finishBundle after every processElement. We need to 
> support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But 
> sometimes this "bundle" may be too big, depending on the user's checkpoint 
> configuration.
> 2. Manually control the size of the bundle. A bundle is finished when a count, 
> event-time, or processing-time limit is reached, or on {{snapshot}}. We do not 
> need to wait; we just call startBundle and finishBundle at the right time.





[jira] [Created] (BEAM-1862) SplittableDoFnOperator should close the ScheduledExecutorService

2017-04-03 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1862:
--

 Summary: SplittableDoFnOperator should close the 
ScheduledExecutorService
 Key: BEAM-1862
 URL: https://issues.apache.org/jira/browse/BEAM-1862
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Jingsong Lee
Assignee: Jingsong Lee


{{SplittableDoFnOperator}} creates a {{ScheduledExecutorService}} and passes it to 
{{OutputAndTimeBoundedSplittableProcessElementInvoker}}, but never shuts it down.
We should shut it down in {{close()}}.
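A minimal sketch of the proposed fix, using a hypothetical operator class rather than the real {{SplittableDoFnOperator}}: the component that creates the executor also shuts it down in {{close()}}.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical operator that owns a ScheduledExecutorService and releases it in close(). */
final class ExecutorOwningOperator implements AutoCloseable {
  private ScheduledExecutorService executor;

  void open() {
    // Created by the operator, so the operator is also responsible for shutting it down.
    executor = Executors.newSingleThreadScheduledExecutor();
  }

  ScheduledExecutorService executor() {
    return executor;
  }

  @Override
  public void close() {
    if (executor == null) {
      return;
    }
    executor.shutdownNow(); // cancel pending tasks and interrupt running ones
    try {
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
    executor = null;
  }

  public static void main(String[] args) {
    ExecutorOwningOperator op = new ExecutorOwningOperator();
    op.open();
    ScheduledExecutorService ex = op.executor();
    op.close();
    System.out.println(ex.isShutdown()); // true
  }
}
```

Without the {{close()}} call, the executor's non-daemon worker thread would keep running after the operator is disposed.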





[jira] [Commented] (BEAM-1772) Support merging WindowFn other than IntervalWindow on Flink Runner

2017-03-26 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942408#comment-15942408
 ] 

Jingsong Lee commented on BEAM-1772:


For performance reasons, I retained the combining over sorted data, and added a 
combining path based on HashMap state for merging non-IntervalWindow windows.

> Support merging WindowFn other than IntervalWindow on Flink Runner
> --
>
> Key: BEAM-1772
> URL: https://issues.apache.org/jira/browse/BEAM-1772
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ismaël Mejía
>Assignee: Jingsong Lee
>
> Flink currently supports merging IntervalWindows; however, if you have a 
> WindowFn that extends IntervalWindow, the execution breaks.
> I found this while executing a pipeline in Flink's batch mode.
> This will probably involve changing the window merging logic in 
> `FlinkMergingReduceFunction.mergeWindows()` and other similar parts to really 
> use the merging logic of the `WindowFn`.





[jira] [Commented] (BEAM-773) Implement Metrics support for Flink runner

2017-03-24 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941542#comment-15941542
 ] 

Jingsong Lee commented on BEAM-773:
---

Best wishes for your vacation! Do you think Metrics need to be 
fault-tolerant?

> Implement Metrics support for Flink runner
> --
>
> Key: BEAM-773
> URL: https://issues.apache.org/jira/browse/BEAM-773
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Chambers
>Assignee: Jingsong Lee
> Fix For: First stable release
>
>






[jira] [Commented] (BEAM-1772) Support merging WindowFn other than IntervalWindow on Flink Runner

2017-03-23 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938119#comment-15938119
 ] 

Jingsong Lee commented on BEAM-1772:


The problem is that 
{{FlinkMergingPartialReduceFunction}}/{{FlinkMergingReduceFunction}}/{{FlinkMergingNonShuffleReduceFunction}} 
merge windows using a SortedList to produce new WindowedValues of the merged 
windows. This behavior differs from {{WindowFn.mergeWindows(MergeContext)}}: we 
currently cast BoundedWindow to IntervalWindow. What {{ReduceFnRunner}} would do 
is reimplemented here, but without any regard for triggers.
I think we can use {{GroupAlsoByWindowViaOutputBufferDoFn}} instead. 
[~aljoscha] What do you think?
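For context, the sort-based merging these functions perform amounts to the hypothetical sketch below over plain [start, end) intervals: sort by start, then fuse each window into its predecessor when they overlap. It relies on every window being an interval, which is exactly the assumption that {{WindowFn.mergeWindows(MergeContext)}} avoids by letting the WindowFn decide what to merge. The classes are illustrative, not the runner's code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of sort-based merging of [start, end) intervals. */
final class IntervalMerging {
  static final class Interval {
    final long start;
    final long end;

    Interval(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  static List<Interval> merge(List<Interval> windows) {
    List<Interval> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong(w -> w.start));
    List<Interval> merged = new ArrayList<>();
    for (Interval w : sorted) {
      Interval last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && w.start < last.end) {
        // Overlap: replace the last window with the union of both.
        merged.set(merged.size() - 1, new Interval(last.start, Math.max(last.end, w.end)));
      } else {
        merged.add(w);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Interval> windows = new ArrayList<>();
    windows.add(new Interval(10, 20));
    windows.add(new Interval(0, 5));
    windows.add(new Interval(15, 30));
    System.out.println(merge(windows)); // [[0, 5), [10, 30)]
  }
}
```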

> Support merging WindowFn other than IntervalWindow on Flink Runner
> --
>
> Key: BEAM-1772
> URL: https://issues.apache.org/jira/browse/BEAM-1772
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ismaël Mejía
>Assignee: Aljoscha Krettek
>
> Flink currently supports merging IntervalWindows; however, if you have a 
> WindowFn that extends IntervalWindow, the execution breaks.
> I found this while executing a pipeline in Flink's batch mode.
> This will probably involve changing the window merging logic in 
> `FlinkMergingReduceFunction.mergeWindows()` and other similar parts to really 
> use the merging logic of the `WindowFn`.





[jira] [Updated] (BEAM-1727) Add setForNowAlign(period, offset) to Timer

2017-03-20 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1727:
---
Fix Version/s: First stable release

> Add setForNowAlign(period, offset) to Timer
> ---
>
> Key: BEAM-1727
> URL: https://issues.apache.org/jira/browse/BEAM-1727
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: First stable release
>
>






[jira] [Assigned] (BEAM-1727) Add setForNowAlign(period, offset) to Timer

2017-03-18 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1727:
--

Assignee: Jingsong Lee

> Add setForNowAlign(period, offset) to Timer
> ---
>
> Key: BEAM-1727
> URL: https://issues.apache.org/jira/browse/BEAM-1727
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>






[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner

2017-03-18 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931210#comment-15931210
 ] 

Jingsong Lee commented on BEAM-1612:


[~aljoscha] Yes, this will greatly improve performance in some cases.

> Support real Bundle in Flink runner
> ---
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> The Bundle is very important in the Beam model. Users can use the bundle to 
> flush buffers and to reuse heavyweight resources within a bundle. Most IO 
> plugins use the bundle to flush.
> Moreover, FlinkRunner can also use the Bundle to reduce access to Flink 
> state: for example, data can first be kept on the Java heap and flushed into 
> RocksDB state when finishBundle is invoked, which reduces the number of 
> serializations.
> But currently FlinkRunner calls finishBundle after every processElement. We 
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But 
> sometimes this "bundle" may be too big; this depends on the user's checkpoint 
> configuration.
> 2. Manually control the size of the bundle. The in-progress bundle will be 
> finished by count, event time, processing time, or {{snapshot}}. We do not 
> need to wait; just call startBundle and finishBundle at the right time.





[jira] [Commented] (BEAM-1727) Add setForNowAlign(period, offset) to Timer

2017-03-17 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929606#comment-15929606
 ] 

Jingsong Lee commented on BEAM-1727:


Can verifyTargetTime return true or false to let the user know whether the 
timer was set successfully, instead of throwing an exception?
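As a rough illustration of both ideas in this issue, the sketch below computes an aligned target time for a hypothetical `setForNowAlign(period, offset)` and reports validity with a boolean instead of throwing. The class and method names and the `maxAllowed` bound (for example, a window's expiration time) are assumptions for illustration, not Beam's actual `Timer` API; all times are plain epoch milliseconds.

```java
// Hypothetical helpers illustrating an aligned timer; not Beam's Timer API.
final class AlignedTimerSketch {
  // Smallest t >= now such that (t - offset) is a multiple of period.
  static long alignedTarget(long now, long period, long offset) {
    if (period <= 0) {
      throw new IllegalArgumentException("period must be positive");
    }
    return now + Math.floorMod(offset - now, period);
  }

  // Returns false instead of throwing when the target is past the bound,
  // so the caller can decide how to react.
  static boolean verifyTargetTime(long target, long maxAllowed) {
    return target <= maxAllowed;
  }

  // Returns true if the aligned timer would be accepted.
  static boolean trySetForNowAlign(long now, long period, long offset, long maxAllowed) {
    long target = alignedTarget(now, period, offset);
    return verifyTargetTime(target, maxAllowed);
  }
}
```

Whether a "now" that is already on a boundary should stay there or move to the next one is a design choice; this sketch keeps it on the current boundary.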

> Add setForNowAlign(period, offset) to Timer
> ---
>
> Key: BEAM-1727
> URL: https://issues.apache.org/jira/browse/BEAM-1727
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Jingsong Lee
>






[jira] [Commented] (BEAM-1727) Add setForNowAlign(period, offset) to Timer

2017-03-15 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925942#comment-15925942
 ] 

Jingsong Lee commented on BEAM-1727:


What do you think? [~kenn]

> Add setForNowAlign(period, offset) to Timer
> ---
>
> Key: BEAM-1727
> URL: https://issues.apache.org/jira/browse/BEAM-1727
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Jingsong Lee
>






[jira] [Created] (BEAM-1727) Add setForNowAlign(period, offset) to Timer

2017-03-15 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1727:
--

 Summary: Add setForNowAlign(period, offset) to Timer
 Key: BEAM-1727
 URL: https://issues.apache.org/jira/browse/BEAM-1727
 Project: Beam
  Issue Type: Improvement
  Components: beam-model
Reporter: Jingsong Lee








[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner

2017-03-03 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894699#comment-15894699
 ] 

Jingsong Lee commented on BEAM-1612:


https://issues.apache.org/jira/browse/FLINK-2846
Does this issue mean that we cannot emit data while snapshotting? We would lose 
some emitted data when the job restarts.

But we must invoke finishBundle when snapshotting; otherwise we will lose 
buffered data that has not been flushed yet.

I think we can use a fake collector in the OutputManager while snapshotting and 
save the emitted data to {{FlinkSplitStateInternals}} or 
{{FlinkKeyGroupStateInternals}}; the next processElement then sends it out, so 
no data is lost.
This may be a bit complicated, but it may work.
It is similar to Flink's AsyncFunction, except that it stores the input data 
while we would store the output data.
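A minimal plain-Java sketch of that idea: while a snapshot is in progress, the output manager diverts emitted elements into a buffer (a `List` standing in for {{FlinkSplitStateInternals}}/{{FlinkKeyGroupStateInternals}}), and the next processElement flushes them downstream first. `BufferingOutputManager` and its methods are hypothetical names, not Flink runner classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical output manager; the List stands in for checkpointed state.
final class BufferingOutputManager<T> {
  private final Consumer<T> downstream;
  private final List<T> pushedBack = new ArrayList<>();
  private boolean snapshotting = false;

  BufferingOutputManager(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  void output(T value) {
    if (snapshotting) {
      pushedBack.add(value); // would be written to operator state
    } else {
      downstream.accept(value);
    }
  }

  // Called from the snapshot hook: finishBundle may emit, but the emitted
  // elements are buffered instead of sent. Returns what to persist.
  List<T> snapshot(Runnable finishBundle) {
    snapshotting = true;
    try {
      finishBundle.run();
    } finally {
      snapshotting = false;
    }
    return new ArrayList<>(pushedBack);
  }

  // Called at the start of the next processElement.
  void flushPushedBack() {
    for (T value : pushedBack) {
      downstream.accept(value);
    }
    pushedBack.clear();
  }
}
```

On restore after a failure, the runner would refill `pushedBack` from the persisted list before processing new elements.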





> Support real Bundle in Flink runner
> ---
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> The Bundle is very important in the Beam model. Users can use the bundle to 
> flush buffers and to reuse heavyweight resources within a bundle. Most IO 
> plugins use the bundle to flush.
> Moreover, FlinkRunner can also use the Bundle to reduce access to Flink 
> state: for example, data can first be kept on the Java heap and flushed into 
> RocksDB state when finishBundle is invoked, which reduces the number of 
> serializations.
> But currently FlinkRunner calls finishBundle after every processElement. We 
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But 
> sometimes this "bundle" may be too big; this depends on the user's checkpoint 
> configuration.
> 2. Manually control the size of the bundle. The in-progress bundle will be 
> finished by count, event time, processing time, or {{snapshot}}. We do not 
> need to wait; just call startBundle and finishBundle at the right time.





[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner

2017-03-02 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893827#comment-15893827
 ] 

Jingsong Lee commented on BEAM-1612:


Within a bundle, we can also simply reuse the ReduceFnRunner, and we do not 
have to worry about OOM problems caused by caching.

> Support real Bundle in Flink runner
> ---
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> The Bundle is very important in the Beam model. Users can use the bundle to 
> flush buffers and to reuse heavyweight resources within a bundle. Most IO 
> plugins use the bundle to flush.
> Moreover, FlinkRunner can also use the Bundle to reduce access to Flink 
> state: for example, data can first be kept on the Java heap and flushed into 
> RocksDB state when finishBundle is invoked, which reduces the number of 
> serializations.
> But currently FlinkRunner calls finishBundle after every processElement. We 
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But 
> sometimes this "bundle" may be too big; this depends on the user's checkpoint 
> configuration.
> 2. Manually control the size of the bundle. The in-progress bundle will be 
> finished by count, event time, processing time, or {{snapshot}}. We do not 
> need to wait; just call startBundle and finishBundle at the right time.





[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner

2017-03-02 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893814#comment-15893814
 ] 

Jingsong Lee commented on BEAM-1612:


What do you think, [~aljoscha]?

> Support real Bundle in Flink runner
> ---
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> The Bundle is very important in the Beam model. Users can use the bundle to 
> flush buffers and to reuse heavyweight resources within a bundle. Most IO 
> plugins use the bundle to flush.
> Moreover, FlinkRunner can also use the Bundle to reduce access to Flink 
> state: for example, data can first be kept on the Java heap and flushed into 
> RocksDB state when finishBundle is invoked, which reduces the number of 
> serializations.
> But currently FlinkRunner calls finishBundle after every processElement. We 
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But 
> sometimes this "bundle" may be too big; this depends on the user's checkpoint 
> configuration.
> 2. Manually control the size of the bundle. The in-progress bundle will be 
> finished by count, event time, processing time, or {{snapshot}}. We do not 
> need to wait; just call startBundle and finishBundle at the right time.





[jira] [Created] (BEAM-1612) Support real Bundle in Flink runner

2017-03-02 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1612:
--

 Summary: Support real Bundle in Flink runner
 Key: BEAM-1612
 URL: https://issues.apache.org/jira/browse/BEAM-1612
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Jingsong Lee
Assignee: Jingsong Lee


The Bundle is very important in the Beam model. Users can use the bundle to 
flush buffers and to reuse heavyweight resources within a bundle. Most IO 
plugins use the bundle to flush.

Moreover, FlinkRunner can also use the Bundle to reduce access to Flink state: 
for example, data can first be kept on the Java heap and flushed into RocksDB 
state when finishBundle is invoked, which reduces the number of serializations.

But currently FlinkRunner calls finishBundle after every processElement. We 
need to support real bundles.

I think we can have the following implementations:

1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But 
sometimes this "bundle" may be too big; this depends on the user's checkpoint 
configuration.

2. Manually control the size of the bundle. The in-progress bundle will be 
finished by count, event time, processing time, or {{snapshot}}. We do not 
need to wait; just call startBundle and finishBundle at the right time.
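Option 2 could look roughly like the sketch below: a small controller that finishes the current bundle once it reaches a maximum element count or has been open for a maximum time, and also on snapshot. `BundleController`, its thresholds, and the callbacks are hypothetical; a real runner would wire `startBundle`/`finishBundle` to its {{DoFnRunner}} and take the time from Flink's processing-time service.

```java
import java.util.function.LongSupplier;

// Hypothetical bundle controller for option 2; not Flink runner code.
final class BundleController {
  private final long maxElements;
  private final long maxMillis;
  private final LongSupplier clock; // processing time in milliseconds
  private final Runnable startBundle;
  private final Runnable finishBundle;

  private boolean inBundle = false;
  private long elementCount = 0;
  private long bundleStartMillis = 0;

  BundleController(long maxElements, long maxMillis, LongSupplier clock,
      Runnable startBundle, Runnable finishBundle) {
    this.maxElements = maxElements;
    this.maxMillis = maxMillis;
    this.clock = clock;
    this.startBundle = startBundle;
    this.finishBundle = finishBundle;
  }

  // Call before processing each element: opens a bundle lazily.
  void beforeElement() {
    if (!inBundle) {
      startBundle.run();
      inBundle = true;
      elementCount = 0;
      bundleStartMillis = clock.getAsLong();
    }
  }

  // Call after processing each element: finishes on count or elapsed time.
  void afterElement() {
    elementCount++;
    if (elementCount >= maxElements
        || clock.getAsLong() - bundleStartMillis >= maxMillis) {
      finish();
    }
  }

  // Call from the snapshot hook so no buffered data is left unflushed.
  void onSnapshot() {
    finish();
  }

  private void finish() {
    if (inBundle) {
      finishBundle.run();
      inBundle = false;
    }
  }
}
```

This sketch only checks the time limit when an element arrives; an idle bundle would additionally need a processing-time timer (or the next snapshot) to be finished.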






[jira] [Assigned] (BEAM-1476) Support MapState in Flink runner

2017-03-01 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1476:
--

Assignee: Jingsong Lee

> Support MapState in Flink runner
> 
>
> Key: BEAM-1476
> URL: https://issues.apache.org/jira/browse/BEAM-1476
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
>






[jira] [Assigned] (BEAM-1483) Support SetState in Flink runner

2017-03-01 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1483:
--

Assignee: Jingsong Lee

> Support SetState in Flink runner
> 
>
> Key: BEAM-1483
> URL: https://issues.apache.org/jira/browse/BEAM-1483
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
>






[jira] [Created] (BEAM-1588) Reuse StateNamespace.stringKey in Flink States

2017-03-01 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1588:
--

 Summary: Reuse StateNamespace.stringKey in Flink States
 Key: BEAM-1588
 URL: https://issues.apache.org/jira/browse/BEAM-1588
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Jingsong Lee
Assignee: Jingsong Lee


See BEAM-1587.
StateNamespace.stringKey does two things: it Base64-encodes the window and 
then calls String.format. Both are fairly expensive. We can cache the key in 
the State and reuse it.
Furthermore, we can cache the state itself.
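A minimal sketch of the caching idea, assuming the key has the shape `/<base64 window>/`: the encoded string is computed once per window and reused on every later state access. `NamespaceKeyCache` is a hypothetical name, and the window encoder is passed in as a function; in Beam the bytes would come from the window coder, and the window type would need proper `equals`/`hashCode`.

```java
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical cache of namespace keys; not Beam's StateNamespace code.
final class NamespaceKeyCache<W> {
  private final Function<W, byte[]> encoder;
  private final Map<W, String> cache = new HashMap<>();
  private int encodings = 0;

  NamespaceKeyCache(Function<W, byte[]> encoder) {
    this.encoder = encoder;
  }

  // Encodes the window only on the first call for that window.
  String stringKey(W window) {
    return cache.computeIfAbsent(window, w -> {
      encodings++;
      return "/" + Base64.getEncoder().encodeToString(encoder.apply(w)) + "/";
    });
  }

  // Number of times the expensive encoding actually ran.
  int encodings() {
    return encodings;
  }
}
```

This cache is unbounded; a real implementation would drop entries once a window's state is garbage collected.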





[jira] [Assigned] (BEAM-1587) Use StringBuilder to stringKey of StateNamespace instead of String.format

2017-03-01 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1587:
--

Assignee: Jingsong Lee  (was: Kenneth Knowles)

> Use StringBuilder to stringKey of StateNamespace instead of String.format
> -
>
> Key: BEAM-1587
> URL: https://issues.apache.org/jira/browse/BEAM-1587
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
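> In the Flink Runner, every State access calls the namespace's stringKey once. 
> Since stringKey is built with String.format, the performance impact is 
> relatively large.
> In some extreme cases, stringKey consumes up to 2% of the processing time.
> Here is a micro-benchmark comparing StringBuilder and String.format:
> {code}
> import java.util.Random;
>
> public class StringKeyBenchmark {
>   private static final Random RANDOM = new Random();
>
>   // Hypothetical helpers (not shown in the original test): build a random
>   // key and wrap it as "/<key>/", roughly the shape of a namespace stringKey.
>   static String getRandomString(int length) {
>     StringBuilder sb = new StringBuilder(length);
>     for (int i = 0; i < length; i++) {
>       sb.append((char) ('a' + RANDOM.nextInt(26)));
>     }
>     return sb.toString();
>   }
>
>   static String testFormat(String s) {
>     return String.format("/%s/", s);
>   }
>
>   static String testStringBuild(String s) {
>     return new StringBuilder().append('/').append(s).append('/').toString();
>   }
>
>   public static void main(String[] args) {
>     String[] strs = new String[1_000_000];
>     for (int i = 0; i < strs.length; i++) {
>       strs[i] = getRandomString(10);
>     }
>     {
>       long start = System.nanoTime();
>       for (int i = 0; i < strs.length; i++) {
>         strs[i] = testFormat(strs[i]);
>       }
>       System.out.println("testStringFormat: " + (System.nanoTime() - start) / 1_000_000 + "ms");
>     }
>     {
>       long start = System.nanoTime();
>       for (int i = 0; i < strs.length; i++) {
>         strs[i] = testStringBuild(strs[i]);
>       }
>       System.out.println("testStringBuilder: " + (System.nanoTime() - start) / 1_000_000 + "ms");
>     }
>   }
> }
> {code}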
> testStringFormat: 2312ms
> testStringBuilder: 266ms





[jira] [Updated] (BEAM-1587) Use StringBuilder to stringKey of StateNamespace instead of String.format

2017-03-01 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1587:
---
Description: 
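In the Flink Runner, every State access calls the namespace's stringKey once. 
Since stringKey is built with String.format, the performance impact is 
relatively large.
In some extreme cases, stringKey consumes up to 2% of the processing time.
Here is a micro-benchmark comparing StringBuilder and String.format:
{code}
import java.util.Random;

public class StringKeyBenchmark {
  private static final Random RANDOM = new Random();

  // Hypothetical helpers (not shown in the original test): build a random
  // key and wrap it as "/<key>/", roughly the shape of a namespace stringKey.
  static String getRandomString(int length) {
    StringBuilder sb = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      sb.append((char) ('a' + RANDOM.nextInt(26)));
    }
    return sb.toString();
  }

  static String testFormat(String s) {
    return String.format("/%s/", s);
  }

  static String testStringBuild(String s) {
    return new StringBuilder().append('/').append(s).append('/').toString();
  }

  public static void main(String[] args) {
    String[] strs = new String[1_000_000];
    for (int i = 0; i < strs.length; i++) {
      strs[i] = getRandomString(10);
    }
    {
      long start = System.nanoTime();
      for (int i = 0; i < strs.length; i++) {
        strs[i] = testFormat(strs[i]);
      }
      System.out.println("testStringFormat: " + (System.nanoTime() - start) / 1_000_000 + "ms");
    }
    {
      long start = System.nanoTime();
      for (int i = 0; i < strs.length; i++) {
        strs[i] = testStringBuild(strs[i]);
      }
      System.out.println("testStringBuilder: " + (System.nanoTime() - start) / 1_000_000 + "ms");
    }
  }
}
{code}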
testStringFormat: 2312ms
testStringBuilder: 266ms

> Use StringBuilder to stringKey of StateNamespace instead of String.format
> -
>
> Key: BEAM-1587
> URL: https://issues.apache.org/jira/browse/BEAM-1587
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Jingsong Lee
>Assignee: Kenneth Knowles
>
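> In the Flink Runner, every State access calls the namespace's stringKey once. 
> Since stringKey is built with String.format, the performance impact is 
> relatively large.
> In some extreme cases, stringKey consumes up to 2% of the processing time.
> Here is a micro-benchmark comparing StringBuilder and String.format:
> {code}
> import java.util.Random;
>
> public class StringKeyBenchmark {
>   private static final Random RANDOM = new Random();
>
>   // Hypothetical helpers (not shown in the original test): build a random
>   // key and wrap it as "/<key>/", roughly the shape of a namespace stringKey.
>   static String getRandomString(int length) {
>     StringBuilder sb = new StringBuilder(length);
>     for (int i = 0; i < length; i++) {
>       sb.append((char) ('a' + RANDOM.nextInt(26)));
>     }
>     return sb.toString();
>   }
>
>   static String testFormat(String s) {
>     return String.format("/%s/", s);
>   }
>
>   static String testStringBuild(String s) {
>     return new StringBuilder().append('/').append(s).append('/').toString();
>   }
>
>   public static void main(String[] args) {
>     String[] strs = new String[1_000_000];
>     for (int i = 0; i < strs.length; i++) {
>       strs[i] = getRandomString(10);
>     }
>     {
>       long start = System.nanoTime();
>       for (int i = 0; i < strs.length; i++) {
>         strs[i] = testFormat(strs[i]);
>       }
>       System.out.println("testStringFormat: " + (System.nanoTime() - start) / 1_000_000 + "ms");
>     }
>     {
>       long start = System.nanoTime();
>       for (int i = 0; i < strs.length; i++) {
>         strs[i] = testStringBuild(strs[i]);
>       }
>       System.out.println("testStringBuilder: " + (System.nanoTime() - start) / 1_000_000 + "ms");
>     }
>   }
> }
> {code}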
> testStringFormat: 2312ms
> testStringBuilder: 266ms





[jira] [Created] (BEAM-1587) Use StringBuilder to stringKey of StateNamespace instead of String.format

2017-03-01 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1587:
--

 Summary: Use StringBuilder to stringKey of StateNamespace instead 
of String.format
 Key: BEAM-1587
 URL: https://issues.apache.org/jira/browse/BEAM-1587
 Project: Beam
  Issue Type: Improvement
  Components: runner-core
Reporter: Jingsong Lee
Assignee: Kenneth Knowles








[jira] [Commented] (BEAM-1116) Support for new Timer API in Flink runner

2017-02-28 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887771#comment-15887771
 ] 

Jingsong Lee commented on BEAM-1116:


OK, I will finish it quickly.

> Support for new Timer API in Flink runner
> -
>
> Key: BEAM-1116
> URL: https://issues.apache.org/jira/browse/BEAM-1116
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
>






[jira] [Commented] (BEAM-1517) Garbage collect user state in Flink Runner

2017-02-28 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887550#comment-15887550
 ] 

Jingsong Lee commented on BEAM-1517:


[~kenn] Is it like {{LateDataDroppingDoFnRunner}} dropping late data in 
{{processElement()}}? But how should we deal with late timers (not EVENT_TIME)? 
Just drop them in {{onTimer()}}, like elements?


> Garbage collect user state in Flink Runner
> --
>
> Key: BEAM-1517
> URL: https://issues.apache.org/jira/browse/BEAM-1517
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> User-facing state/timers in Beam are bound to the key/window of the data. 
> Right now, the Flink Runner does not clean up user state when the watermark 
> passes the GC horizon for the state associated with a given window.
> Neither {{StateInternals}} nor the Flink state API supports discarding state 
> for a whole namespace (which is the window in this case), so we might have to 
> manually set a GC timer for each window/key combination, as is done in the 
> {{ReduceFnRunner}}. For this we have to know all states a user can possibly 
> use, which we can get from the {{DoFn}} signature.
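The per-window/key GC timer approach described in the issue above can be modelled in plain Java: every state write registers a cleanup time of window end plus allowed lateness, and advancing the watermark clears all state whose cleanup time has been reached. `StateGcSketch` and the string key/window identifiers are hypothetical simplifications; Flink's timer service and Beam's {{StateInternals}} are not modelled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of GC timers for user state; times are millis.
final class StateGcSketch {
  private final long allowedLateness;
  // (key, window) id -> user state value
  private final Map<String, Integer> state = new HashMap<>();
  // cleanup time -> ids to clear at that time
  private final TreeMap<Long, Set<String>> gcTimers = new TreeMap<>();

  StateGcSketch(long allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  private static String id(String key, String window) {
    return key + "|" + window;
  }

  // Writes state and registers a GC timer for its key/window.
  void write(String key, String window, long windowMaxTimestamp, int value) {
    String id = id(key, window);
    state.put(id, value);
    long gcTime = windowMaxTimestamp + allowedLateness;
    gcTimers.computeIfAbsent(gcTime, t -> new HashSet<>()).add(id);
  }

  // Fires every GC timer at or before the watermark and clears its state.
  void advanceWatermark(long watermark) {
    while (!gcTimers.isEmpty() && gcTimers.firstKey() <= watermark) {
      for (String id : gcTimers.pollFirstEntry().getValue()) {
        state.remove(id);
      }
    }
  }

  int liveEntries() {
    return state.size();
  }
}
```

Keeping timers sorted by time means each watermark advance only touches the timers that are actually due.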





[jira] [Assigned] (BEAM-1036) Support for new State API in FlinkRunner

2017-02-27 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1036:
--

Assignee: Jingsong Lee  (was: Aljoscha Krettek)

> Support for new State API in FlinkRunner
> 
>
> Key: BEAM-1036
> URL: https://issues.apache.org/jira/browse/BEAM-1036
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
>






[jira] [Commented] (BEAM-1560) Use SimpleDoFnRunner to invoke and use new CombineFnRunner in batch mode of Flink runner

2017-02-27 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885493#comment-15885493
 ] 

Jingsong Lee commented on BEAM-1560:


[~aljoscha] Because it is independent of streaming mode, I did not reopen 
BEAM-843.

> Use SimpleDoFnRunner to invoke and use new CombineFnRunner in batch mode of 
> Flink runner
> 
>
> Key: BEAM-1560
> URL: https://issues.apache.org/jira/browse/BEAM-1560
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> To support the new State API and Timer API in the Flink runner (batch mode).





[jira] [Commented] (BEAM-1483) Support SetState in Flink runner

2017-02-23 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882056#comment-15882056
 ] 

Jingsong Lee commented on BEAM-1483:


We can implement it with Flink's MapState in Flink 1.3, just as HashSet is 
implemented on top of HashMap.
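A minimal sketch of the HashSet-on-HashMap idea: each set element is stored as a map key with a dummy value. A `java.util.Map` stands in for Flink's `MapState` here, and `MapBackedSetState` with its `add`/`contains`/`remove`/`read` methods is a hypothetical shape rather than Beam's actual `SetState` interface.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical set state built on a map, like HashSet on HashMap.
final class MapBackedSetState<T> {
  // Stand-in for a MapState<T, Boolean>; the values are ignored.
  private final Map<T, Boolean> map = new HashMap<>();

  // Returns true if the element was not already present.
  boolean add(T element) {
    return map.put(element, Boolean.TRUE) == null;
  }

  boolean contains(T element) {
    return map.containsKey(element);
  }

  void remove(T element) {
    map.remove(element);
  }

  // Read-only view of the current elements.
  Set<T> read() {
    return Collections.unmodifiableSet(map.keySet());
  }

  void clear() {
    map.clear();
  }
}
```

With this layout, `contains` is a single key lookup in the backing map rather than a scan over all stored elements.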

> Support SetState in Flink runner
> 
>
> Key: BEAM-1483
> URL: https://issues.apache.org/jira/browse/BEAM-1483
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>






[jira] [Commented] (BEAM-1476) Support MapState in Flink runner

2017-02-23 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882055#comment-15882055
 ] 

Jingsong Lee commented on BEAM-1476:


We can implement it with Flink's MapState in Flink 1.3. 
https://issues.apache.org/jira/browse/FLINK-4856

> Support MapState in Flink runner
> 
>
> Key: BEAM-1476
> URL: https://issues.apache.org/jira/browse/BEAM-1476
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>






[jira] [Commented] (BEAM-1498) Use Flink-native side outputs

2017-02-21 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15876026#comment-15876026
 ] 

Jingsong Lee commented on BEAM-1498:


Great! That will simplify the implementation of 
{{ParDoBoundMultiStreamingTranslator}}.

> Use Flink-native side outputs
> -
>
> Key: BEAM-1498
> URL: https://issues.apache.org/jira/browse/BEAM-1498
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> Once Flink has support for side outputs, we should use them instead of 
> manually dealing with the {{RawUnionValues}}.
> Side outputs for Flink are being tracked in 
> https://issues.apache.org/jira/browse/FLINK-4460.





[jira] [Assigned] (BEAM-1498) Use Flink-native side outputs

2017-02-21 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1498:
--

Assignee: Jingsong Lee

> Use Flink-native side outputs
> -
>
> Key: BEAM-1498
> URL: https://issues.apache.org/jira/browse/BEAM-1498
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> Once Flink has support for side outputs, we should use them instead of 
> manually dealing with the {{RawUnionValues}}.
> Side outputs for Flink are being tracked in 
> https://issues.apache.org/jira/browse/FLINK-4460.





[jira] [Commented] (BEAM-1517) Garbage collect user state in Flink Runner

2017-02-21 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15876005#comment-15876005
 ] 

Jingsong Lee commented on BEAM-1517:


Would it be appropriate for users to do the GC work themselves?
Something like this:
{code}
  @ProcessElement
  public void process(
  ProcessContext c,
  BoundedWindow window,
  @StateId(stateId) ValueState state,
  @TimerId("GcTimer") Timer timer) {
Instant maxTimestamp = window.maxTimestamp();
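    long allowedLateness = 10 * 1000; // 10 seconds, in milliseconds
    Instant gcTime = maxTimestamp.plus(allowedLateness);
    // Can Timer have a getCurrentTime interface?
    // (new Instant() is the current system time, not the watermark.)
    Instant currentTime = new Instant();
    if (gcTime.isBefore(currentTime)) {
      // Too late to keep state for this window: emit the element as late data.
      c.sideOutput(lateDataTag, c.element());
    } else {
      // Register the GC timer, then run the user logic.
      timer.set(gcTime);
      // user logic
      // ...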
}
  }
  @OnTimer("GcTimer")
  public void gc(
  OnTimerContext context,
  @StateId(stateId) ValueState state) {
state.clear();
  }
{code}


> Garbage collect user state in Flink Runner
> --
>
> Key: BEAM-1517
> URL: https://issues.apache.org/jira/browse/BEAM-1517
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> User-facing state/timers in Beam are bound to the key/window of the data. 
> Right now, the Flink Runner does not clean up user state when the watermark 
> passes the GC horizon for the state associated with a given window.
> Neither {{StateInternals}} nor the Flink state API supports discarding state 
> for a whole namespace (which is the window in this case), so we might have to 
> manually set a GC timer for each window/key combination, as is done in the 
> {{ReduceFnRunner}}. For this we have to know all states a user can possibly 
> use, which we can get from the {{DoFn}} signature.





[jira] [Commented] (BEAM-1116) Support for new Timer API in Flink runner

2017-02-16 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15871001#comment-15871001
 ] 

Jingsong Lee commented on BEAM-1116:


[~kenn] I found that {{onTimer()}} requires the {{BoundedWindow}} parameter. 
How can I get the {{BoundedWindow}} from the {{StateNamespace}} of {{TimerData}}?
{code}
  private BoundedWindow getWindowFromNamespace(StateNamespace namespace) {
    if (namespace instanceof WindowNamespace) {
      return ((WindowNamespace<?>) namespace).getWindow();
    } else if (namespace instanceof GlobalNamespace) {
      return GlobalWindow.INSTANCE;
    } else if (namespace instanceof WindowAndTriggerNamespace) {
      return ((WindowAndTriggerNamespace<?>) namespace).getWindow();
    } else {
      throw new RuntimeException("Unknown StateNamespace type: " + namespace.getClass());
    }
  }
{code}
Is that right? Why doesn't {{StateNamespace}} provide a {{getWindow()}} method?

> Support for new Timer API in Flink runner
> -
>
> Key: BEAM-1116
> URL: https://issues.apache.org/jira/browse/BEAM-1116
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
>






[jira] [Comment Edited] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-11 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862325#comment-15862325
 ] 

Jingsong Lee edited comment on BEAM-1393 at 2/11/17 9:08 AM:
-

BTW, I think SPLIT_DISTRIBUTE state may need a new Repartitioner, not only 
round-robin: let each element in the ListState choose a KeyGroupIndex, and let 
{{CheckpointCoordinator}} use the KeyGroupIndex to redistribute the state.
I understand now: {{CheckpointCoordinator}} runs in the JobManager, so reading 
every element there would be too heavy.
Maybe Flink could abstract a KeyGroup state that provides split (snapshot into 
several KeyGroups) and merge (restore from several KeyGroups) methods, to manage 
state by KeyGroups.
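To illustrate the split/merge idea, here is a plain-Java sketch that assigns each list element to a key group on snapshot and lets each subtask restore by merging the key groups it owns. The modulo hash is a simplification (Flink hashes keys with a murmur hash), and `KeyGroupSplitSketch` with its methods is hypothetical; the owner formula `keyGroup * parallelism / maxParallelism` is intended to mirror how Flink maps key groups to operator indices, but treat that as an assumption here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical key-group split/merge for non-keyed list state.
final class KeyGroupSplitSketch {
  // Simplified key-group assignment based on hashCode().
  static int keyGroupFor(Object element, int maxParallelism) {
    return Math.floorMod(element.hashCode(), maxParallelism);
  }

  // Snapshot: split the list into key groups.
  static <T> Map<Integer, List<T>> split(List<T> elements, int maxParallelism) {
    Map<Integer, List<T>> groups = new HashMap<>();
    for (T e : elements) {
      groups.computeIfAbsent(keyGroupFor(e, maxParallelism), g -> new ArrayList<>()).add(e);
    }
    return groups;
  }

  // Which subtask owns a key group under the new parallelism.
  static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
    return keyGroup * parallelism / maxParallelism;
  }

  // Restore: a subtask merges every key group it now owns.
  static <T> List<T> merge(Map<Integer, List<T>> groups, int subtask,
      int parallelism, int maxParallelism) {
    List<T> restored = new ArrayList<>();
    for (Map.Entry<Integer, List<T>> entry : groups.entrySet()) {
      if (operatorIndexFor(entry.getKey(), parallelism, maxParallelism) == subtask) {
        restored.addAll(entry.getValue());
      }
    }
    return restored;
  }
}
```

Because ownership is decided per key group, the JobManager would only need to move whole key groups around instead of reading individual elements.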


was (Author: lzljs3620320):
BTW, I think SPLIT_DISTRIBUTE state may need a new Repartitioner, not only 
round-robin: let each element in the ListState choose a KeyGroupIndex, and let 
{{CheckpointCoordinator}} use the KeyGroupIndex to redistribute the state.

> Update Flink Runner to Flink 1.2.0
> --
>
> Key: BEAM-1393
> URL: https://issues.apache.org/jira/browse/BEAM-1393
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> When we update to 1.2.0 we can use the new internal Timer API that is 
> available to Flink operators: {{InternalTimerService}} and also use broadcast 
> state to store side-input data.





[jira] [Commented] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-11 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862325#comment-15862325
 ] 

Jingsong Lee commented on BEAM-1393:


BTW, I think SPLIT_DISTRIBUTE state may need a new Repartitioner, not only 
round-robin: let each element in the ListState choose a KeyGroupIndex, and let 
{{CheckpointCoordinator}} use the KeyGroupIndex to redistribute the state.

> Update Flink Runner to Flink 1.2.0
> --
>
> Key: BEAM-1393
> URL: https://issues.apache.org/jira/browse/BEAM-1393
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> When we update to 1.2.0 we can use the new internal Timer API that is 
> available to Flink operators: {{InternalTimerService}} and also use broadcast 
> state to store side-input data.





[jira] [Commented] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-10 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862290#comment-15862290
 ] 

Jingsong Lee commented on BEAM-1393:


Totally agree!
{{AbstractStreamOperator}} will check the type of {{this}} and invoke 
{{checkpointKeyGroup}} in {{snapshotState()}} ({{initializeState}} is similar).
I'm looking forward to contributing this back to Flink.
I think we no longer need to store {{pushedBackWatermark}} in state. We can 
maintain it in memory and restore it by traversing the pushed-back events.
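Recomputing the hold from the pushed-back events on restore could look like this minimal sketch: the hold is the minimum timestamp among the buffered elements, or "no hold" when the buffer is empty. `PushedBackHold`, its `Event` type, and the use of `Long.MAX_VALUE` as "no hold" are assumptions for illustration.

```java
import java.util.List;

// Hypothetical recomputation of the pushed-back watermark hold.
final class PushedBackHold {
  // Minimal view of a buffered element: only its timestamp matters here.
  static final class Event {
    final long timestamp;

    Event(long timestamp) {
      this.timestamp = timestamp;
    }
  }

  // Minimum timestamp of the pushed-back events, or Long.MAX_VALUE if none.
  static long recompute(List<Event> pushedBack) {
    long hold = Long.MAX_VALUE;
    for (Event e : pushedBack) {
      hold = Math.min(hold, e.timestamp);
    }
    return hold;
  }
}
```

The operator would then emit `min(inputWatermark, hold)` as its output watermark, so nothing about the hold needs to be checkpointed separately.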


> Update Flink Runner to Flink 1.2.0
> --
>
> Key: BEAM-1393
> URL: https://issues.apache.org/jira/browse/BEAM-1393
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> When we update to 1.2.0 we can use the new internal Timer API that is 
> available to Flink operators: {{InternalTimerService}} and also use broadcast 
> state to store side-input data.





[jira] [Commented] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-10 Thread Jingsong Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15860992#comment-15860992
 ] 

Jingsong Lee commented on BEAM-1393:


Good point! Handling pushed-back events is indeed tricky. For non-keyed 
operators, we store the elements in SPLIT_DISTRIBUTE state, which is no 
problem. But for keyed operators, if we use {{KeyedStateBackend}}, we can't find 
the events waiting on a new side-input element when it arrives. We need to find 
all the pushed-back events that wait on that side-input window, just as with 
timer processing.
Maybe we need to override {{AbstractStreamOperator.snapshotState}} to store 
pushed-back events per key group, alongside the TimerService snapshot. I see 
that {{startNewKeyGroup}} can only be called once, so we have to override the 
TimerService snapshot instead of calling super.
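A minimal sketch of the bookkeeping this proposal implies, assuming pushed-back events are indexed by key group. {{KeyGroupedPushedBack}}, {{snapshotKeyGroup}} and {{releaseForWindow}} are hypothetical names, not Beam or Flink API, and the key-group hash is simplified (Flink hashes keys further than {{hashCode()}}).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not Beam or Flink API): pushed-back elements are indexed
// by key group so a snapshot can write them one key group at a time, next to
// that key group's timers, while a newly ready side input can still find every
// element waiting on its window.
class KeyGroupedPushedBack {

    static final class Event {
        final String key;
        final String window; // side-input window the element is waiting for

        Event(String key, String window) {
            this.key = key;
            this.window = window;
        }
    }

    private final int maxParallelism;
    private final Map<Integer, List<Event>> byKeyGroup = new TreeMap<>();

    KeyGroupedPushedBack(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    // Simplification: floorMod of hashCode() stands in for Flink's key hashing.
    int keyGroupOf(String key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    void pushBack(Event event) {
        byKeyGroup.computeIfAbsent(keyGroupOf(event.key), kg -> new ArrayList<>()).add(event);
    }

    // What a per-key-group snapshot would write for one key group.
    List<Event> snapshotKeyGroup(int keyGroup) {
        return byKeyGroup.getOrDefault(keyGroup, List.of());
    }

    // Side input for `window` is ready: remove and return all waiting elements
    // across every key group, i.e. across keys, not just the current key.
    List<Event> releaseForWindow(String window) {
        List<Event> ready = new ArrayList<>();
        for (List<Event> events : byKeyGroup.values()) {
            Iterator<Event> it = events.iterator();
            while (it.hasNext()) {
                Event event = it.next();
                if (event.window.equals(window)) {
                    ready.add(event);
                    it.remove();
                }
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        KeyGroupedPushedBack buffer = new KeyGroupedPushedBack(128);
        buffer.pushBack(new Event("a", "w1"));
        buffer.pushBack(new Event("b", "w2"));
        buffer.pushBack(new Event("c", "w1"));
        System.out.println(buffer.releaseForWindow("w1").size()); // 2
        System.out.println(buffer.snapshotKeyGroup(buffer.keyGroupOf("b")).size()); // 1
    }
}
```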



[jira] [Created] (BEAM-1456) Make UnboundedSourceWrapper snapshot to rescalable operator state in Flink Runner

2017-02-09 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1456:
--

 Summary: Make UnboundedSourceWrapper snapshot to rescalable 
operator state in Flink Runner
 Key: BEAM-1456
 URL: https://issues.apache.org/jira/browse/BEAM-1456
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Jingsong Lee
Assignee: Jingsong Lee


By using Flink's SPLIT_DISTRIBUTE OperatorState to snapshot source 
checkpoints, we make UnboundedSourceWrapper operators rescalable.
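A simplified model of what SPLIT_DISTRIBUTE redistribution means for source checkpoints, assuming each old subtask snapshots a list of per-split checkpoint entries. {{SplitDistribute}} and {{restore}} are hypothetical names; entries are dealt out round-robin here for simplicity, and Flink's own repartitioner may group them differently (for example in contiguous chunks).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of SPLIT_DISTRIBUTE operator state (illustrative, not the
// Flink API): each old subtask snapshots a list of source checkpoint entries;
// on restore the lists are concatenated and dealt out over the new parallelism.
class SplitDistribute {

    static <T> List<List<T>> restore(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> state : oldSubtaskStates) {
            all.addAll(state);
        }
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        // Deal entries round-robin so every new subtask gets a near-equal share.
        for (int i = 0; i < all.size(); i++) {
            perSubtask.get(i % newParallelism).add(all.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // Two old subtasks, each holding checkpoint entries for two splits.
        List<List<String>> old = List.of(List.of("split0", "split1"), List.of("split2", "split3"));
        System.out.println(restore(old, 3)); // [[split0, split3], [split1], [split2]]
    }
}
```

Each entry is owned by exactly one new subtask, which is what lets the source resume every split once after rescaling.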





[jira] [Created] (BEAM-1445) Use Flink broadcast state to store side-input data

2017-02-09 Thread Jingsong Lee (JIRA)
Jingsong Lee created BEAM-1445:
--

 Summary: Use Flink broadcast state to store side-input data
 Key: BEAM-1445
 URL: https://issues.apache.org/jira/browse/BEAM-1445
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Jingsong Lee
Assignee: Jingsong Lee


By using broadcast state to store side-input data, we make operators 
rescalable. BROADCAST collects all checkpointed states into one "list" and 
then, when restoring, sends that list to all parallel subtasks. We would use it 
by checkpointing only from the operator with subtask index 0, because we assume 
that the state is the same on all parallel instances of the operator.
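A simplified model of the scheme described above, assuming every subtask holds an identical copy of the side-input data. {{BroadcastSideInput}}, {{snapshot}} and {{restore}} are hypothetical names, not the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the BROADCAST scheme (illustrative, not the Flink API).
// Every subtask holds an identical copy of the side-input data.
class BroadcastSideInput {

    // Only subtask 0 contributes its copy; all others snapshot nothing.
    static <T> List<T> snapshot(int subtaskIndex, List<T> sideInputData) {
        if (subtaskIndex == 0) {
            return new ArrayList<>(sideInputData);
        }
        return new ArrayList<>();
    }

    // On restore, the union of all snapshots is handed to every new subtask.
    static <T> List<List<T>> restore(List<List<T>> snapshots, int newParallelism) {
        List<T> union = new ArrayList<>();
        for (List<T> snapshot : snapshots) {
            union.addAll(snapshot);
        }
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>(union));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        List<String> sideInput = List.of("v1", "v2");
        List<List<String>> snapshots = new ArrayList<>();
        for (int subtask = 0; subtask < 3; subtask++) {
            snapshots.add(snapshot(subtask, sideInput));
        }
        // Rescale from 3 to 5 subtasks: each gets exactly one copy.
        System.out.println(restore(snapshots, 5).get(4)); // [v1, v2]
    }
}
```

Without the subtask-0 rule, restoring the three identical copies would hand every subtask [v1, v2, v1, v2, v1, v2].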




