[jira] [Commented] (FLINK-9174) The type of state created in ProcessWindowFunction.process() is inconsistent

2018-04-24 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451673#comment-16451673
 ] 

Sihua Zhou commented on FLINK-9174:
---

Hi [~aljoscha], could you please have a look at this? Do you think this should 
be treated as an issue?

> The type of state created in ProcessWindowFunction.process() is inconsistent
> ---
>
> Key: FLINK-9174
> URL: https://issues.apache.org/jira/browse/FLINK-9174
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.5.0
>
>
> The type of state created from windowState and globalState in 
> {{ProcessWindowFunction.process()}} is inconsistent. In detail,
> {code}
> context.windowState().getListState(); // return type is HeapListState or 
> RocksDBListState
> context.globalState().getListState(); // return type is UserFacingListState
> {code}
> This cause the problem in the following code,
> {code}
> Iterable<T> iterableState = listState.get();
> if (iterableState.iterator().hasNext()) {
>   for (T value : iterableState) {
>     value.setRetracting(true);
>     collector.collect(value);
>   }
>   listState.clear();
> }
> {code}
> If the {{listState}} is created from {{context.globalState()}}, this is fine, but 
> when it is created from {{context.windowState()}} this will cause an NPE. I met 
> this in 1.3.2 but found that it also affects 1.5.0.
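For illustration, here is a minimal, hedged sketch of the defensive null check that the inconsistency forces on {{windowState()}} but not on {{globalState()}}; the class, state name and types are made up and only assume the behaviour described above:

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class BufferingWindowFunction
        extends ProcessWindowFunction<String, String, String, TimeWindow> {

    private final ListStateDescriptor<String> desc =
            new ListStateDescriptor<>("buffer", String.class);

    @Override
    public void process(String key, Context ctx, Iterable<String> elements,
                        Collector<String> out) throws Exception {

        // Per-window state: backed by HeapListState/RocksDBListState, so get()
        // can return null when nothing has been added yet.
        ListState<String> perWindow = ctx.windowState().getListState(desc);
        Iterable<String> windowValues = perWindow.get();
        if (windowValues != null && windowValues.iterator().hasNext()) {
            for (String v : windowValues) {
                out.collect(v);
            }
            perWindow.clear();
        }

        // Global state: wrapped in UserFacingListState, so get() returns an
        // empty Iterable instead of null and no null check is needed.
        ListState<String> global = ctx.globalState().getListState(desc);
        for (String v : global.get()) {
            out.collect(v);
        }
    }
}
{code}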



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8497) KafkaConsumer throws NPE if topic doesn't exist

2018-04-24 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451495#comment-16451495
 ] 

vinoyang commented on FLINK-8497:
-

Hi [~alexey.lesnov], are you still working on this issue?

> KafkaConsumer throws NPE if topic doesn't exist
> ---
>
> Key: FLINK-8497
> URL: https://issues.apache.org/jira/browse/FLINK-8497
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: chris snow
>Priority: Minor
>
> If I accidentally set the kafka consumer with a topic that doesn't exist:
> {code:java}
> FlinkKafkaConsumer011 kafkaConsumer = new FlinkKafkaConsumer011(
>"does_not_exist",
> new JSONKeyValueDeserializationSchema(false),
> properties
> );
> DataStream input = env.addSource(kafkaConsumer);{code}
> Flink throws NPE
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748){code}
> Maybe Flink could throw an IllegalStateException("Topic not found")?
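As a rough, hedged sketch of the suggested behaviour (not the actual fix in the connector), a pre-flight check against the plain Kafka client could look like this; the helper and class names are made up:

{code:java}
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class TopicExistenceCheck {

    /** Fails fast with a readable error instead of an NPE inside the connector. */
    public static void ensureTopicExists(Properties props, String topic) {
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            // partitionsFor() returns null or an empty list for an unknown topic
            // when auto topic creation is disabled.
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            if (partitions == null || partitions.isEmpty()) {
                throw new IllegalStateException("Topic not found: " + topic);
            }
        }
    }
}
{code}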



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5904: [FLINK-9249][build] Add convenience profile for sk...

2018-04-24 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183922637
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+    <profile>
+        <id>fast</id>
+        <activation>
+            <property>
+                <name>fast</name>
+            </property>
+        </activation>
+        <build>
+            <pluginManagement>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
--- End diff --

That sounds nice.


---


[jira] [Commented] (FLINK-9249) Add convenience profile for skipping non-essential plugins

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451488#comment-16451488
 ] 

ASF GitHub Bot commented on FLINK-9249:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183922637
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+    <profile>
+        <id>fast</id>
+        <activation>
+            <property>
+                <name>fast</name>
+            </property>
+        </activation>
+        <build>
+            <pluginManagement>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
--- End diff --

That sounds nice.


> Add convenience profile for skipping non-essential plugins
> --
>
> Key: FLINK-9249
> URL: https://issues.apache.org/jira/browse/FLINK-9249
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> When compiling Flink devs can already set a variety of command line options 
> to speed up the process, for example skipping checkstyle. We also do the same 
> thing on travis.
> However, not only is it difficult to keep track of all possible options, it 
> is also tedious to write and obfuscates the actual command.
> I propose adding a {{fast}} profile that skips non-essential plugins, 
> including:
> * rat
> * checkstyle
> * scalastyle
> * enforcer
> * japicmp
> * javadoc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7588) Document RocksDB tuning for spinning disks

2018-04-24 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258309#comment-16258309
 ] 

Ted Yu edited comment on FLINK-7588 at 4/25/18 12:54 AM:
-

bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable.


was (Author: yuzhih...@gmail.com):
bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable .

> Document RocksDB tuning for spinning disks
> --
>
> Key: FLINK-7588
> URL: https://issues.apache.org/jira/browse/FLINK-7588
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Ted Yu
>Priority: Major
>  Labels: performance
>
> In docs/ops/state/large_state_tuning.md, it was mentioned that:
> bq. the default configuration is tailored towards SSDs and performs 
> suboptimal on spinning disks
> We should add recommendations targeting spinning disks:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk
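A hedged sketch of what such a recommendation could point to: Flink already ships a predefined RocksDB option set for spinning disks, so the documentation could show something along these lines (the checkpoint path is illustrative only):

{code:java}
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

public class SpinningDiskStateBackend {

    public static RocksDBStateBackend create() throws Exception {
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        // Predefined option set tuned for spinning disks; further tuning can be
        // done with a custom OptionsFactory following the RocksDB tuning guide.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        return backend;
    }
}
{code}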



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451228#comment-16451228
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183880939
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

Yes. You are right. Sorry I missed that. Just updated :-)


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support in codegen for 
> *AggregateCall*, where the *isDistinct* field is set to true.
> This can be verified using the following SQL, which does not currently 
> produce correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-24 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183880939
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

Yes. You are right. Sorry I missed that. Just updated :-)


---


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450465#comment-16450465
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183867019
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
--- End diff --

Great, thanks!


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support in codegen for 
> *AggregateCall*, where the *isDistinct* field is set to true.
> This can be verified using the following SQL, which does not currently 
> produce correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183867019
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
--- End diff --

Great, thanks!


---


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450464#comment-16450464
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183866776
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

I think we don't need the type info of the accumulator to create the 
`MapViewTypeInfo`, but the type info of the input type of the aggregation 
function. The input types can be resolved from `physicalInputTypes` and 
`aggFields`. 


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support in codegen for 
> *AggregateCall*, where the *isDistinct* field is set to true.
> This can be verified using the following SQL, which does not currently 
> produce correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183866776
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

I think we don't need the type info of the accumulator to create the 
`MapViewTypeInfo`, but the type info of the input type of the aggregation 
function. The input types can be resolved from `physicalInputTypes` and 
`aggFields`. 


---


[jira] [Comment Edited] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort

2018-04-24 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427664#comment-16427664
 ] 

Ted Yu edited comment on FLINK-8037 at 4/24/18 7:06 PM:


Please rebase PR .


was (Author: yuzhih...@gmail.com):
Please rebase PR.

> Missing cast in integer arithmetic in 
> TransactionalIdsGenerator#generateIdsToAbort
> --
>
> Key: FLINK-8037
> URL: https://issues.apache.org/jira/browse/FLINK-8037
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: kafka-connect
>
> {code}
>   public Set<String> generateIdsToAbort() {
>     Set<String> idsToAbort = new HashSet<>();
>     for (int i = 0; i < safeScaleDownFactor; i++) {
>       idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
> {code}
> The operands are integers, while generateIdsToUse() expects a long parameter.
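To make the overflow concrete, a small hedged example (constants chosen for illustration only) of why the missing cast matters:

{code:java}
public final class CastExample {

    public static void main(String[] args) {
        int i = 3;
        int poolSize = 1_000_000;
        int totalNumberOfSubtasks = 2_000;

        // Multiplication is done in int arithmetic and overflows before the
        // result is widened to long.
        long withoutCast = i * poolSize * totalNumberOfSubtasks;
        // Casting the first operand promotes the whole expression to long.
        long withCast = (long) i * poolSize * totalNumberOfSubtasks;

        System.out.println(withoutCast); // 1705032704 (wrapped around)
        System.out.println(withCast);    // 6000000000 (expected)
    }
}
{code}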



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9048) LocalFlinkMiniClusterITCase#testLocalFlinkMiniClusterWithMultipleTaskManagers sometimes fails

2018-04-24 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9048:
--
Description: 
As of commit e0bc37bef69f5376d03214578e9b95816add661b, I got the following :
{code}
testLocalFlinkMiniClusterWithMultipleTaskManagers(org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase)
  Time elapsed: 41.681 sec  <<< FAILURE!
java.lang.AssertionError: Thread 
Thread[ForkJoinPool.commonPool-worker-25,5,main] was started by the mini 
cluster, but not shut down
  at org.junit.Assert.fail(Assert.java:88)
  at 
org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.testLocalFlinkMiniClusterWithMultipleTaskManagers(LocalFlinkMiniClusterITCase.java:174)
{code}

  was:
As of commit e0bc37bef69f5376d03214578e9b95816add661b, I got the following :

{code}
testLocalFlinkMiniClusterWithMultipleTaskManagers(org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase)
  Time elapsed: 41.681 sec  <<< FAILURE!
java.lang.AssertionError: Thread 
Thread[ForkJoinPool.commonPool-worker-25,5,main] was started by the mini 
cluster, but not shut down
  at org.junit.Assert.fail(Assert.java:88)
  at 
org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.testLocalFlinkMiniClusterWithMultipleTaskManagers(LocalFlinkMiniClusterITCase.java:174)
{code}


> LocalFlinkMiniClusterITCase#testLocalFlinkMiniClusterWithMultipleTaskManagers 
> sometimes fails
> -
>
> Key: FLINK-9048
> URL: https://issues.apache.org/jira/browse/FLINK-9048
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> As of commit e0bc37bef69f5376d03214578e9b95816add661b, I got the following :
> {code}
> testLocalFlinkMiniClusterWithMultipleTaskManagers(org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase)
>   Time elapsed: 41.681 sec  <<< FAILURE!
> java.lang.AssertionError: Thread 
> Thread[ForkJoinPool.commonPool-worker-25,5,main] was started by the mini 
> cluster, but not shut down
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.testLocalFlinkMiniClusterWithMultipleTaskManagers(LocalFlinkMiniClusterITCase.java:174)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9249) Add convenience profile for skipping non-essential plugins

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450378#comment-16450378
 ] 

ASF GitHub Bot commented on FLINK-9249:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183841885
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+    <profile>
+        <id>fast</id>
+        <activation>
+            <property>
+                <name>fast</name>
+            </property>
+        </activation>
+        <build>
+            <pluginManagement>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
--- End diff --

It has got nothing to do with `.travis.yml`. The `travis_mvn_watchdog.sh` script uses 
different settings for each build, one of which builds all of Flink with all 
plugins enabled (the so-called "misc" build).


> Add convenience profile for skipping non-essential plugins
> --
>
> Key: FLINK-9249
> URL: https://issues.apache.org/jira/browse/FLINK-9249
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> When compiling Flink devs can already set a variety of command line options 
> to speed up the process, for example skipping checkstyle. We also do the same 
> thing on travis.
> However, not only is it difficult to keep track of all possible options, it 
> is also tedious to write and obfuscates the actual command.
> I propose adding a {{fast}} profile that skips non-essential plugins, 
> including:
> * rat
> * checkstyle
> * scalastyle
> * enforcer
> * japicmp
> * javadoc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5904: [FLINK-9249][build] Add convenience profile for sk...

2018-04-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183841885
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+    <profile>
+        <id>fast</id>
+        <activation>
+            <property>
+                <name>fast</name>
+            </property>
+        </activation>
+        <build>
+            <pluginManagement>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
--- End diff --

It has got nothing to do with `.travis.yml`. The `travis_mvn_watchdog.sh` script uses 
different settings for each build, one of which builds all of Flink with all 
plugins enabled (the so-called "misc" build).


---


[jira] [Comment Edited] (FLINK-9219) Add support for OpenGIS features in Table & SQL API

2018-04-24 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450266#comment-16450266
 ] 

Xingcan Cui edited comment on FLINK-9219 at 4/24/18 5:43 PM:
-

[~twalthr], it's an ESRI type wrapped by Calcite. This is the first time I've 
touched the {{TypeInformation}} part, but I guess it may not be easy to implement 
the type/serializer for such a complicated data structure, right?


was (Author: xccui):
[~twalthr], it's an ESRI type wrapped by Calcite.

> Add support for OpenGIS features in Table & SQL API
> ---
>
> Key: FLINK-9219
> URL: https://issues.apache.org/jira/browse/FLINK-9219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> CALCITE-1968 added core functionality for handling 
> spatial/geographical/geometry data. It should not be too hard to expose these 
> features also in Flink's Table & SQL API. We would need a new {{GEOMETRY}} 
> data type and connect the function APIs.
> Right now the following functions are supported by Calcite:
> {code}
> ST_AsText, ST_AsWKT, ST_Boundary, ST_Buffer, ST_Contains, 
> ST_ContainsProperly, ST_Crosses, ST_Disjoint, ST_Distance, ST_DWithin, 
> ST_Envelope, ST_EnvelopesIntersect, ST_Equals, ST_GeometryType, 
> ST_GeometryTypeCode, ST_GeomFromText, ST_Intersects, ST_Is3D, 
> ST_LineFromText, ST_MakeLine, ST_MakePoint, ST_MLineFromText, 
> ST_MPointFromText, ST_MPolyFromText, ST_Overlaps, ST_Point, ST_PointFromText, 
> ST_PolyFromText, ST_SetSRID, ST_Touches, ST_Transform, ST_Union, ST_Within, 
> ST_Z
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9219) Add support for OpenGIS features in Table & SQL API

2018-04-24 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450266#comment-16450266
 ] 

Xingcan Cui commented on FLINK-9219:


[~twalthr], it's an ESRI type wrapped by Calcite.

> Add support for OpenGIS features in Table & SQL API
> ---
>
> Key: FLINK-9219
> URL: https://issues.apache.org/jira/browse/FLINK-9219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> CALCITE-1968 added core functionality for handling 
> spatial/geographical/geometry data. It should not be too hard to expose these 
> features also in Flink's Table & SQL API. We would need a new {{GEOMETRY}} 
> data type and connect the function APIs.
> Right now the following functions are supported by Calcite:
> {code}
> ST_AsText, ST_AsWKT, ST_Boundary, ST_Buffer, ST_Contains, 
> ST_ContainsProperly, ST_Crosses, ST_Disjoint, ST_Distance, ST_DWithin, 
> ST_Envelope, ST_EnvelopesIntersect, ST_Equals, ST_GeometryType, 
> ST_GeometryTypeCode, ST_GeomFromText, ST_Intersects, ST_Is3D, 
> ST_LineFromText, ST_MakeLine, ST_MakePoint, ST_MLineFromText, 
> ST_MPointFromText, ST_MPolyFromText, ST_Overlaps, ST_Point, ST_PointFromText, 
> ST_PolyFromText, ST_SetSRID, ST_Touches, ST_Transform, ST_Union, ST_Within, 
> ST_Z
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9115) Support addition of part suffix in BucketingSink

2018-04-24 Thread Lakshmi Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lakshmi Rao resolved FLINK-9115.

Resolution: Duplicate

> Support addition of part suffix in BucketingSink
> 
>
> Key: FLINK-9115
> URL: https://issues.apache.org/jira/browse/FLINK-9115
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: Lakshmi Rao
>Priority: Minor
>  Labels: usability
>
> Currently the BucketingSink allows addition of a part prefix, pending 
> prefix/suffix and in-progress prefix/suffix via setter methods. Can we also 
> support setting part suffixes?
> An instance where this may be useful: I am currently writing GZIP compressed 
> output to S3 using the BucketingSink and I would want the uploaded files to 
> have a ".gz" or ".zip" extension. An easy way to do this would be by 
> setting a part file suffix with the required file extension.
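A hedged sketch of the requested usage; the {{setPartSuffix(String)}} setter is the one added by the duplicate PR referenced further down and should be treated as an assumption for older versions, and the S3 path is illustrative:

{code:java}
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class GzipPartFiles {

    public static BucketingSink<String> create() {
        BucketingSink<String> sink = new BucketingSink<>("s3://my-bucket/output");
        sink.setPartPrefix("part");
        // With a part suffix, uploaded part files end in ".gz".
        sink.setPartSuffix(".gz");
        return sink;
    }
}
{code}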



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9251) Move MemoryStateBackend to flink-state-backends

2018-04-24 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9251:
--
Fix Version/s: 1.6.0

> Move MemoryStateBackend to flink-state-backends
> ---
>
> Key: FLINK-9251
> URL: https://issues.apache.org/jira/browse/FLINK-9251
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Since the RocksDB backend has been moved to flink-state-backends, we should also 
> move the MemoryStateBackend to it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9251) Move MemoryStateBackend to flink-state-backends

2018-04-24 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9251:
--
Affects Version/s: 1.6.0
   1.5.0

> Move MemoryStateBackend to flink-state-backends
> ---
>
> Key: FLINK-9251
> URL: https://issues.apache.org/jira/browse/FLINK-9251
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Since the RocksDB backend has been moved to flink-state-backends, we should also 
> move the MemoryStateBackend to it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9251) Move MemoryStateBackend to flink-state-backends

2018-04-24 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9251:
-

Assignee: Sihua Zhou

> Move MemoryStateBackend to flink-state-backends
> ---
>
> Key: FLINK-9251
> URL: https://issues.apache.org/jira/browse/FLINK-9251
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Since the RocksDB backend has been moved to flink-state-backends, we should also 
> move the MemoryStateBackend to it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9251) Move MemoryStateBackend to flink-state-backends

2018-04-24 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9251:
-

 Summary: Move MemoryStateBackend to flink-state-backends
 Key: FLINK-9251
 URL: https://issues.apache.org/jira/browse/FLINK-9251
 Project: Flink
  Issue Type: Improvement
Reporter: Sihua Zhou


Since the RocksDB backend has been moved to flink-state-backends, we should also 
move the MemoryStateBackend to it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9249) Add convenience profile for skipping non-essential plugins

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450135#comment-16450135
 ] 

ASF GitHub Bot commented on FLINK-9249:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183790505
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+    <profile>
+        <id>fast</id>
+        <activation>
+            <property>
+                <name>fast</name>
+            </property>
+        </activation>
+        <build>
+            <pluginManagement>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
--- End diff --

That's good. I think this works because `.travis.yml` enables it at least. 
Correct?


> Add convenience profile for skipping non-essential plugins
> --
>
> Key: FLINK-9249
> URL: https://issues.apache.org/jira/browse/FLINK-9249
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> When compiling Flink devs can already set a variety of command line options 
> to speed up the process, for example skipping checkstyle. We also do the same 
> thing on travis.
> However, not only is it difficult to keep track of all possible options, it 
> is also tedious to write and obfuscates the actual command.
> I propose adding a {{fast}} profile that skips non-essential plugins, 
> including:
> * rat
> * checkstyle
> * scalastyle
> * enforcer
> * japicmp
> * javadoc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5904: [FLINK-9249][build] Add convenience profile for sk...

2018-04-24 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183790505
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+    <profile>
+        <id>fast</id>
+        <activation>
+            <property>
+                <name>fast</name>
+            </property>
+        </activation>
+        <build>
+            <pluginManagement>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
--- End diff --

That's good. I think this works because `.travis.yml` enables it at least. 
Correct?


---


[jira] [Commented] (FLINK-9249) Add convenience profile for skipping non-essential plugins

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450124#comment-16450124
 ] 

ASF GitHub Bot commented on FLINK-9249:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183787514
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+    <profile>
+        <id>fast</id>
+        <activation>
+            <property>
+                <name>fast</name>
+            </property>
+        </activation>
+        <build>
+            <pluginManagement>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
--- End diff --

we still verify everything at least once on travis.


> Add convenience profile for skipping non-essential plugins
> --
>
> Key: FLINK-9249
> URL: https://issues.apache.org/jira/browse/FLINK-9249
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> When compiling Flink devs can already set a variety of command line options 
> to speed up the process, for example skipping checkstyle. We also do the same 
> thing on travis.
> However, not only is it difficult to keep track of all possible options, it 
> is also tedious to write and obfuscates the actual command.
> I propose adding a {{fast}} profile that skips non-essential plugins, 
> including:
> * rat
> * checkstyle
> * scalastyle
> * enforcer
> * japicmp
> * javadoc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5904: [FLINK-9249][build] Add convenience profile for sk...

2018-04-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183787514
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+    <profile>
+        <id>fast</id>
+        <activation>
+            <property>
+                <name>fast</name>
+            </property>
+        </activation>
+        <build>
+            <pluginManagement>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
--- End diff --

we still verify everything at least once on travis.


---


[jira] [Updated] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-9231:

Description: This allows sockets to be bound even if there are sockets from 
a previous application that are still pending closure.  (was: This allows 
sockets to be bound even if there are sockets
from a previous application that are still pending closure.)

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets from a previous 
> application that are still pending closure.
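As a minimal sketch of the option itself (plain Java, not the Flink server bootstrap code this ticket actually targets; the port is illustrative):

{code:java}
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public final class ReuseAddrExample {

    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket();
        // Must be set before bind(): allows rebinding the port while connections
        // from a previous process are still in TIME_WAIT.
        server.setReuseAddress(true);
        server.bind(new InetSocketAddress(8081));
        System.out.println("Listening on " + server.getLocalSocketAddress());
        server.close();
    }
}
{code}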



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450111#comment-16450111
 ] 

mingleizhang edited comment on FLINK-9231 at 4/24/18 3:51 PM:
--

Hi [~triones], I think you can open a PR first. I would not suggest posting a 
bunch of code here. A PR can show your idea in an easier way; if the PR is not 
suitable for the issue, people will give you some suggestions. Then, after a few 
iterations on the PR, the patch will be available to merge. Good luck ~


was (Author: mingleizhang):
Hi, [~triones] I think you can push a PR first. Would not suggest to push a 
bunch of code here. 

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450111#comment-16450111
 ] 

mingleizhang commented on FLINK-9231:
-

Hi [~triones], I think you can open a PR first. I would not suggest posting a 
bunch of code here.

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9219) Add support for OpenGIS features in Table & SQL API

2018-04-24 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450109#comment-16450109
 ] 

Timo Walther commented on FLINK-9219:
-

[~xccui] is this type a Calcite or ESRI type?

> Add support for OpenGIS features in Table & SQL API
> ---
>
> Key: FLINK-9219
> URL: https://issues.apache.org/jira/browse/FLINK-9219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> CALCITE-1968 added core functionality for handling 
> spatial/geographical/geometry data. It should not be too hard to expose these 
> features also in Flink's Table & SQL API. We would need a new {{GEOMETRY}} 
> data type and connect the function APIs.
> Right now the following functions are supported by Calcite:
> {code}
> ST_AsText, ST_AsWKT, ST_Boundary, ST_Buffer, ST_Contains, 
> ST_ContainsProperly, ST_Crosses, ST_Disjoint, ST_Distance, ST_DWithin, 
> ST_Envelope, ST_EnvelopesIntersect, ST_Equals, ST_GeometryType, 
> ST_GeometryTypeCode, ST_GeomFromText, ST_Intersects, ST_Is3D, 
> ST_LineFromText, ST_MakeLine, ST_MakePoint, ST_MLineFromText, 
> ST_MPointFromText, ST_MPolyFromText, ST_Overlaps, ST_Point, ST_PointFromText, 
> ST_PolyFromText, ST_SetSRID, ST_Touches, ST_Transform, ST_Union, ST_Within, 
> ST_Z
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450075#comment-16450075
 ] 

Narayanan Arunachalam commented on FLINK-9138:
--

I will try the changes. Upgrading is going to take a while for us, so I will 
have to try using a custom sink or see if we can backport it.

> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It will be useful to also flush data by a specified 
> time period. This way, the data will be written out when write throughput is 
> low but there are no significant time gaps between the writes. This 
> reduces ETA for the data in the file system and should help move the 
> checkpoints faster as well.
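A hedged sketch of what configuring such a time-based rollover could look like next to the existing size/inactivity settings; {{setBatchRolloverInterval}} is the setter proposed in the linked PR and is an assumption for any pre-1.6 version, and the path is illustrative:

{code:java}
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class RollingSinkConfig {

    public static BucketingSink<String> create() {
        BucketingSink<String> sink = new BucketingSink<>("hdfs:///tmp/out");
        sink.setBatchSize(128L * 1024 * 1024);          // roll when a part file reaches 128 MB
        sink.setInactiveBucketThreshold(60_000L);       // close buckets idle for 1 minute
        sink.setBatchRolloverInterval(10 * 60 * 1000L); // additionally roll at least every 10 minutes
        return sink;
    }
}
{code}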



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9115) Support addition of part suffix in BucketingSink

2018-04-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450069#comment-16450069
 ] 

Fabian Hueske commented on FLINK-9115:
--

Thanks for helping to keep the JIRA clean [~glaksh100]! :-)
+1 to close as duplicate

> Support addition of part suffix in BucketingSink
> 
>
> Key: FLINK-9115
> URL: https://issues.apache.org/jira/browse/FLINK-9115
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: Lakshmi Rao
>Priority: Minor
>  Labels: usability
>
> Currently the BucketingSink allows addition of a part prefix, pending 
> prefix/suffix and in-progress prefix/suffix via setter methods. Can we also 
> support setting part suffixes?
> An instance where this may be useful: I am currently writing GZIP compressed 
> output to S3 using the BucketingSink and I would want the uploaded files to 
> have a ".gz" or ".zip" extension. An easy way to do this would be by 
> setting a part file suffix with the required file extension.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9115) Support addition of part suffix in BucketingSink

2018-04-24 Thread Andrei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450056#comment-16450056
 ] 

Andrei commented on FLINK-9115:
---

OK, no problems

> Support addition of part suffix in BucketingSink
> 
>
> Key: FLINK-9115
> URL: https://issues.apache.org/jira/browse/FLINK-9115
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: Lakshmi Rao
>Priority: Minor
>  Labels: usability
>
> Currently the BucketingSink allows addition of a part prefix, pending 
> prefix/suffix and in-progress prefix/suffix via setter methods. Can we also 
> support setting part suffixes?
> An instance where this may be useful: I am currently writing GZIP compressed 
> output to S3 using the BucketingSink and I would want the uploaded files to 
> have a ".gz" or ".zip" extension. An easy way to do this would be by 
> setting a part file suffix with the required file extension.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448142#comment-16448142
 ] 

Ted Yu edited comment on FLINK-9231 at 4/24/18 3:07 PM:


Plan for WebFrontendBootstrap sounds good to me.


was (Author: yuzhih...@gmail.com):
Sounds good to me.

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9115) Support addition of part suffix in BucketingSink

2018-04-24 Thread Lakshmi Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450027#comment-16450027
 ] 

Lakshmi Rao commented on FLINK-9115:


Hi [~Poluliakh], thanks for offering to work on this. As I was looking into 
another change in the BucketingSink, I realized this feature was introduced 
recently in this PR - [https://github.com/apache/flink/pull/5603] . I will mark 
this ticket as a dupe and close it out. Does that sound good?

> Support addition of part suffix in BucketingSink
> 
>
> Key: FLINK-9115
> URL: https://issues.apache.org/jira/browse/FLINK-9115
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: Lakshmi Rao
>Priority: Minor
>  Labels: usability
>
> Currently the BucketingSink allows addition of a part prefix, pending 
> prefix/suffix and in-progress prefix/suffix via setter methods. Can we also 
> support setting part suffixes?
> An instance where this may be useful: I am currently writing GZIP compressed 
> output to S3 using the BucketingSink and I would want the uploaded files to 
> have a ".gz" or ".zip" extension. An easy way to do this would be by 
> setting a part file suffix with the required file extension.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9179) Deduplicate WebOptions.PORT and RestOptions.REST_PORT

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450021#comment-16450021
 ] 

ASF GitHub Bot commented on FLINK-9179:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5856
  
Thanks to @zentol for the awesome review! Will have an update soon.


> Deduplicate WebOptions.PORT and RestOptions.REST_PORT
> -
>
> Key: FLINK-9179
> URL: https://issues.apache.org/jira/browse/FLINK-9179
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, REST, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: mingleizhang
>Priority: Blocker
> Fix For: 1.5.0
>
>
> In the past {{WebOptions.PORT}} was used to configure the port on which the 
> WebUI listens. With the rework of the REST API we added a new 
> configuration key {{RestOptions.REST_PORT}} to specify on which port the REST 
> API listens on.
> Effectively these 2 options control the same thing, with the rest option 
> being broader and also applicable to components with a REST API but no WebUI.
> I suggest deprecating {{WebOptions.PORT}} and adding a deprecated key to 
> {{RestOptions.REST_PORT}}.
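A hedged sketch of that proposal using Flink's {{ConfigOptions}} builder; the constant name, its placement and the default shown here are assumptions, not the final implementation:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class RestPortOption {

    /** REST/WebUI port with the old WebOptions key registered as a deprecated fallback. */
    public static final ConfigOption<Integer> REST_PORT =
            ConfigOptions.key("rest.port")
                    .defaultValue(8081)
                    .withDeprecatedKeys("web.port");
}
{code}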



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5856: [FLINK-9179] [conf] Fix deduplicate WebOptions.PORT and R...

2018-04-24 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5856
  
Thanks to @zentol for the awesome review! Will have an update soon.


---


[jira] [Commented] (FLINK-9219) Add support for OpenGIS features in Table & SQL API

2018-04-24 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450019#comment-16450019
 ] 

Xingcan Cui commented on FLINK-9219:


That's fine. Thanks for your comment, [~fhueske].

> Add support for OpenGIS features in Table & SQL API
> ---
>
> Key: FLINK-9219
> URL: https://issues.apache.org/jira/browse/FLINK-9219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> CALCITE-1968 added core functionality for handling 
> spatial/geographical/geometry data. It should not be too hard to expose these 
> features also in Flink's Table & SQL API. We would need a new {{GEOMETRY}} 
> data type and connect the function APIs.
> Right now the following functions are supported by Calcite:
> {code}
> ST_AsText, ST_AsWKT, ST_Boundary, ST_Buffer, ST_Contains, 
> ST_ContainsProperly, ST_Crosses, ST_Disjoint, ST_Distance, ST_DWithin, 
> ST_Envelope, ST_EnvelopesIntersect, ST_Equals, ST_GeometryType, 
> ST_GeometryTypeCode, ST_GeomFromText, ST_Intersects, ST_Is3D, 
> ST_LineFromText, ST_MakeLine, ST_MakePoint, ST_MLineFromText, 
> ST_MPointFromText, ST_MPolyFromText, ST_Overlaps, ST_Point, ST_PointFromText, 
> ST_PolyFromText, ST_SetSRID, ST_Touches, ST_Transform, ST_Union, ST_Within, 
> ST_Z
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9219) Add support for OpenGIS features in Table & SQL API

2018-04-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450012#comment-16450012
 ] 

Fabian Hueske commented on FLINK-9219:
--

I think we should implement a new built-in type, including type information and 
serializer. These would be added to flink-table, because we don't want to add a 
dependency on the geo library to flink-core or flink-java.
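A hedged, minimal sketch of what such a built-in type could look like, assuming the ESRI {{com.esri.core.geometry.Geometry}} class mentioned above; all names are placeholders, and a real implementation would ship a dedicated serializer rather than the Kryo fallback used here:

{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

import com.esri.core.geometry.Geometry;

public class GeometryTypeInfo extends TypeInformation<Geometry> {

    @Override public boolean isBasicType() { return false; }
    @Override public boolean isTupleType() { return false; }
    @Override public int getArity() { return 1; }
    @Override public int getTotalFields() { return 1; }
    @Override public Class<Geometry> getTypeClass() { return Geometry.class; }
    @Override public boolean isKeyType() { return false; }

    @Override
    public TypeSerializer<Geometry> createSerializer(ExecutionConfig config) {
        // Placeholder only: falls back to Kryo instead of a dedicated serializer.
        return new KryoSerializer<>(Geometry.class, config);
    }

    @Override public String toString() { return "GEOMETRY"; }
    @Override public boolean equals(Object obj) { return obj instanceof GeometryTypeInfo; }
    @Override public int hashCode() { return GeometryTypeInfo.class.hashCode(); }
    @Override public boolean canEqual(Object obj) { return obj instanceof GeometryTypeInfo; }
}
{code}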

> Add support for OpenGIS features in Table & SQL API
> ---
>
> Key: FLINK-9219
> URL: https://issues.apache.org/jira/browse/FLINK-9219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> CALCITE-1968 added core functionality for handling 
> spatial/geographical/geometry data. It should not be too hard to expose these 
> features also in Flink's Table & SQL API. We would need a new {{GEOMETRY}} 
> data type and connect the function APIs.
> Right now the following functions are supported by Calcite:
> {code}
> ST_AsText, ST_AsWKT, ST_Boundary, ST_Buffer, ST_Contains, 
> ST_ContainsProperly, ST_Crosses, ST_Disjoint, ST_Distance, ST_DWithin, 
> ST_Envelope, ST_EnvelopesIntersect, ST_Equals, ST_GeometryType, 
> ST_GeometryTypeCode, ST_GeomFromText, ST_Intersects, ST_Is3D, 
> ST_LineFromText, ST_MakeLine, ST_MakePoint, ST_MLineFromText, 
> ST_MPointFromText, ST_MPolyFromText, ST_Overlaps, ST_Point, ST_PointFromText, 
> ST_PolyFromText, ST_SetSRID, ST_Touches, ST_Transform, ST_Union, ST_Within, 
> ST_Z
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450003#comment-16450003
 ] 

Fabian Hueske commented on FLINK-9138:
--

[~narayaruna], we'd probably add this feature for 1.6.0, as we are not adding 
new features in bugfix releases and 1.5.0 is already in feature freeze.
However, you might be able to use the 1.6 version as a custom sink in your 1.4 setup.

I had a look at the PR and it looks good. 
[~narayaruna], could you check if the change also addresses your requirements?

Thanks, Fabian

> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It will be useful to also flush data by a specified 
> time period. This way, the data will be written out when write throughput is 
> low but there are no significant time gaps between the writes. This 
> reduces ETA for the data in the file system and should help move the 
> checkpoints faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449984#comment-16449984
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183753866
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -87,9 +87,11 @@
  * and a rolling counter. For example the file {@code "part-1-17"} 
contains the data from
  * {@code subtask 1} of the sink and is the {@code 17th} bucket created by 
that subtask. Per default
  * the part prefix is {@code "part"} but this can be configured using 
{@link #setPartPrefix(String)}.
- * When a part file becomes bigger than the user-specified batch size the 
current part file is closed,
- * the part counter is increased and a new part file is created. The batch 
size defaults to {@code 384MB},
- * this can be configured using {@link #setBatchSize(long)}.
+ * When a part file becomes bigger than the user-specified batch size or 
when the part file becomes older
+ * than the user-specified roll over interval the current part file is 
closed,the part counter is increased
--- End diff --

Add space `closed,the` -> `closed, the`


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It will be useful to also flush data by a specified 
> time period. This way, the data will be written out when write throughput is 
> low but there are no significant time gaps between the writes. This 
> reduces ETA for the data in the file system and should help move the 
> checkpoints faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449986#comment-16449986
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183754129
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -87,9 +87,11 @@
  * and a rolling counter. For example the file {@code "part-1-17"} 
contains the data from
  * {@code subtask 1} of the sink and is the {@code 17th} bucket created by 
that subtask. Per default
  * the part prefix is {@code "part"} but this can be configured using 
{@link #setPartPrefix(String)}.
- * When a part file becomes bigger than the user-specified batch size the 
current part file is closed,
- * the part counter is increased and a new part file is created. The batch 
size defaults to {@code 384MB},
- * this can be configured using {@link #setBatchSize(long)}.
+ * When a part file becomes bigger than the user-specified batch size or 
when the part file becomes older
+ * than the user-specified roll over interval the current part file is 
closed,the part counter is increased
+ * and a new part file is created. The batch size defaults to {@code 
384MB},this can be configured
--- End diff --

Add space `{@code 384MB},this` -> `{@code 384MB}, this`


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It would be useful to also flush data after a specified 
> time interval. This way, the data is still written out when write throughput is 
> low, without significant time gaps between writes. This reduces the ETA for the 
> data in the file system and should help checkpoints complete faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449985#comment-16449985
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183754480
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -472,6 +480,15 @@ private boolean shouldRoll(BucketState bucketState) 
throws IOException {
subtaskIndex,
writePosition,
batchSize);
+   } else {
--- End diff --

update method comment.


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It would be useful to also flush data after a specified 
> time interval. This way, the data is still written out when write throughput is 
> low, without significant time gaps between writes. This reduces the ETA for the 
> data in the file system and should help checkpoints complete faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449987#comment-16449987
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183758338
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -436,6 +463,40 @@ public void testScalingUp() throws Exception {
checkFs(outDir, 0, 3, 5, 5);
}
 
+   @Test
+   public void testRolloverInterval() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSinkWithRollover(outDir, 1, 0, 1000L, 100L);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   checkFs(outDir, 1, 0,  0, 0);
--- End diff --

rm double blank


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It would be useful to also flush data after a specified 
> time interval. This way, the data is still written out when write throughput is 
> low, without significant time gaps between writes. This reduces the ETA for the 
> data in the file system and should help checkpoints complete faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449990#comment-16449990
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183757920
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -536,6 +553,9 @@ private void openNewPartFile(Path bucketPath, 
BucketState bucketState) throws
partPath = new Path(bucketPath, partPrefix + "-" + 
subtaskIndex + "-" + bucketState.partCounter);
}
 
+   // Record the creation time of the bucket
+   bucketState.firstWrittenToTime = 
processingTimeService.getCurrentProcessingTime();
--- End diff --

I think the current behavior is also more consistent than starting the 
timeout when the first record is written. 


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It would be useful to also flush data after a specified 
> time interval. This way, the data is still written out when write throughput is 
> low, without significant time gaps between writes. This reduces the ETA for the 
> data in the file system and should help checkpoints complete faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449989#comment-16449989
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183758663
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -436,6 +463,40 @@ public void testScalingUp() throws Exception {
checkFs(outDir, 0, 3, 5, 5);
}
 
+   @Test
+   public void testRolloverInterval() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSinkWithRollover(outDir, 1, 0, 1000L, 100L);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   checkFs(outDir, 1, 0,  0, 0);
--- End diff --

check other `checkFs()` calls as well


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It would be useful to also flush data after a specified 
> time interval. This way, the data is still written out when write throughput is 
> low, without significant time gaps between writes. This reduces the ETA for the 
> data in the file system and should help checkpoints complete faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449988#comment-16449988
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183756583
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -536,6 +553,9 @@ private void openNewPartFile(Path bucketPath, 
BucketState bucketState) throws
partPath = new Path(bucketPath, partPrefix + "-" + 
subtaskIndex + "-" + bucketState.partCounter);
}
 
+   // Record the creation time of the bucket
+   bucketState.firstWrittenToTime = 
processingTimeService.getCurrentProcessingTime();
--- End diff --

rename parameter to `bucket.creationTime`? At this point, nothing has been 
written to the file. Actually, the file has not even been created yet.


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It would be useful to also flush data after a specified 
> time interval. This way, the data is still written out when write throughput is 
> low, without significant time gaps between writes. This reduces the ETA for the 
> data in the file system and should help checkpoints complete faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183754129
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -87,9 +87,11 @@
  * and a rolling counter. For example the file {@code "part-1-17"} 
contains the data from
  * {@code subtask 1} of the sink and is the {@code 17th} bucket created by 
that subtask. Per default
  * the part prefix is {@code "part"} but this can be configured using 
{@link #setPartPrefix(String)}.
- * When a part file becomes bigger than the user-specified batch size the 
current part file is closed,
- * the part counter is increased and a new part file is created. The batch 
size defaults to {@code 384MB},
- * this can be configured using {@link #setBatchSize(long)}.
+ * When a part file becomes bigger than the user-specified batch size or 
when the part file becomes older
+ * than the user-specified roll over interval the current part file is 
closed,the part counter is increased
+ * and a new part file is created. The batch size defaults to {@code 
384MB},this can be configured
--- End diff --

Add space `{@code 384MB},this` -> `{@code 384MB}, this`


---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183756583
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -536,6 +553,9 @@ private void openNewPartFile(Path bucketPath, 
BucketState bucketState) throws
partPath = new Path(bucketPath, partPrefix + "-" + 
subtaskIndex + "-" + bucketState.partCounter);
}
 
+   // Record the creation time of the bucket
+   bucketState.firstWrittenToTime = 
processingTimeService.getCurrentProcessingTime();
--- End diff --

rename parameter to `bucket.creationTime`? At this point, nothing has been 
written to the file. Actually, the file has not even been created yet.


---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183757920
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -536,6 +553,9 @@ private void openNewPartFile(Path bucketPath, 
BucketState bucketState) throws
partPath = new Path(bucketPath, partPrefix + "-" + 
subtaskIndex + "-" + bucketState.partCounter);
}
 
+   // Record the creation time of the bucket
+   bucketState.firstWrittenToTime = 
processingTimeService.getCurrentProcessingTime();
--- End diff --

I think the current behavior is also more consistent than starting the 
timeout when the first record is written. 


---
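
A self-contained sketch of the roll decision being discussed, assuming a creation-time field recorded when a new part file is opened and a configured rollover interval; this is illustrative only and not the PR's code, and the field and parameter names are assumptions:

{code:java}
public class RolloverCheckSketch {

    static final class PartFileState {
        long creationTime;   // recorded when a new part file is opened
        long writePosition;  // bytes written to the current part file
    }

    private final long batchSize;        // size threshold in bytes
    private final long rolloverInterval; // time threshold in milliseconds (assumed name)

    public RolloverCheckSketch(long batchSize, long rolloverInterval) {
        this.batchSize = batchSize;
        this.rolloverInterval = rolloverInterval;
    }

    boolean shouldRoll(PartFileState state, long currentProcessingTime) {
        // Roll when the file is large enough, or when it has been open longer than
        // the rollover interval (measured from file creation, not from the first write).
        return state.writePosition > batchSize
            || currentProcessingTime - state.creationTime > rolloverInterval;
    }
}
{code}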


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183754480
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -472,6 +480,15 @@ private boolean shouldRoll(BucketState bucketState) 
throws IOException {
subtaskIndex,
writePosition,
batchSize);
+   } else {
--- End diff --

update method comment.


---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183753866
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -87,9 +87,11 @@
  * and a rolling counter. For example the file {@code "part-1-17"} 
contains the data from
  * {@code subtask 1} of the sink and is the {@code 17th} bucket created by 
that subtask. Per default
  * the part prefix is {@code "part"} but this can be configured using 
{@link #setPartPrefix(String)}.
- * When a part file becomes bigger than the user-specified batch size the 
current part file is closed,
- * the part counter is increased and a new part file is created. The batch 
size defaults to {@code 384MB},
- * this can be configured using {@link #setBatchSize(long)}.
+ * When a part file becomes bigger than the user-specified batch size or 
when the part file becomes older
+ * than the user-specified roll over interval the current part file is 
closed,the part counter is increased
--- End diff --

Add space `closed,the` -> `closed, the`


---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183758663
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -436,6 +463,40 @@ public void testScalingUp() throws Exception {
checkFs(outDir, 0, 3, 5, 5);
}
 
+   @Test
+   public void testRolloverInterval() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSinkWithRollover(outDir, 1, 0, 1000L, 100L);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   checkFs(outDir, 1, 0,  0, 0);
--- End diff --

check other `checkFs()` calls as well


---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r183758338
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -436,6 +463,40 @@ public void testScalingUp() throws Exception {
checkFs(outDir, 0, 3, 5, 5);
}
 
+   @Test
+   public void testRolloverInterval() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSinkWithRollover(outDir, 1, 0, 1000L, 100L);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   checkFs(outDir, 1, 0,  0, 0);
--- End diff --

rm double blank


---


[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449972#comment-16449972
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183756061
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-class/ElasticsearchStreamingJob.java 
---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch example for Flink Streaming Job.
+ *
+ * In this streaming job, we generate a bunch of data from numbers, 
apply two operators map
+ * and filter to that data. Then we choose elasticsearch as its sink to 
storage these data.
+ *
+ * Run test_quickstarts.sh to verify this program. Package this class 
to a jar, verify the jar,
+ * then deploy it on a flink cluster.
+ */
+public class ElasticsearchStreamingJob {
+
+   public static void main(String[] args) throws Exception {
+   // set up the streaming execution environment
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream source = env.generateSequence(0, 20)
+   // map the data from 1,2,3... to the form of message 
#1, message #2, message #3...
+   .map(new MapFunction() {
--- End diff --

Will change 


> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
> Fix For: 1.5.0
>
>
> We could add an end-to-end test which verifies Flink's quickstarts. It should 
> do the following:
> # create a new Flink project using the quickstarts archetype 
> # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library) 
> # run {{mvn clean package -Pbuild-jar}}
> # verify that no core dependencies are contained in the jar file
> # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-04-24 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183756061
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-class/ElasticsearchStreamingJob.java 
---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch example for Flink Streaming Job.
+ *
+ * In this streaming job, we generate a bunch of data from numbers, 
apply two operators map
+ * and filter to that data. Then we choose elasticsearch as its sink to 
storage these data.
+ *
+ * Run test_quickstarts.sh to verify this program. Package this class 
to a jar, verify the jar,
+ * then deploy it on a flink cluster.
+ */
+public class ElasticsearchStreamingJob {
+
+   public static void main(String[] args) throws Exception {
+   // set up the streaming execution environment
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream source = env.generateSequence(0, 20)
+   // map the data from 1,2,3... to the form of message 
#1, message #2, message #3...
+   .map(new MapFunction() {
--- End diff --

Will change 


---


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-04-24 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183753921
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-class/ElasticsearchStreamingJob.java 
---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch example for Flink Streaming Job.
+ *
+ * In this streaming job, we generate a bunch of data from numbers, 
apply two operators map
+ * and filter to that data. Then we choose elasticsearch as its sink to 
storage these data.
+ *
+ * Run test_quickstarts.sh to verify this program. Package this class 
to a jar, verify the jar,
+ * then deploy it on a flink cluster.
+ */
+public class ElasticsearchStreamingJob {
+
+   public static void main(String[] args) throws Exception {
+   // set up the streaming execution environment
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream source = env.generateSequence(0, 20)
+   // map the data from 1,2,3... to the form of message 
#1, message #2, message #3...
+   .map(new MapFunction() {
+   @Override
+   public String map(Long value) throws Exception {
+   return "message #" + value;
+   }})
+   // filter out the data that contains message #11 and 
message #17
+   .filter(new FilterFunction() {
+   @Override
+   public boolean filter(String value) throws 
Exception {
+   return !value.equals("message #11") && 
!value.equals("message #17");
+   }
+   });
+
+   Map userConfig = new HashMap<>();
+   userConfig.put("cluster.name", "elasticsearch");
+   // This instructs the sink to emit after every element, 
otherwise they would be buffered
--- End diff --

The code forces a write to Elasticsearch as soon as a single element is generated. 
The default number of actions needed to trigger a bulk flush is 1000, so if we do 
not set it to 1, we cannot see the data in the sink.



---
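
For context, the setting being discussed is the sink's bulk flush threshold. A small sketch of the user config, assuming the standard {{bulk.flush.max.actions}} key of the Flink Elasticsearch sink (the surrounding class is hypothetical, not from the PR):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class EsSinkConfigSketch {

    public static Map<String, String> userConfig() {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("cluster.name", "elasticsearch");
        // By default the sink buffers up to 1000 actions before sending a bulk request.
        // Setting this to 1 flushes after every element, so a short test job actually
        // makes its data visible in the index.
        userConfig.put("bulk.flush.max.actions", "1");
        return userConfig;
    }
}
{code}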


[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449966#comment-16449966
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183753921
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-class/ElasticsearchStreamingJob.java 
---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch example for Flink Streaming Job.
+ *
+ * In this streaming job, we generate a bunch of data from numbers, 
apply two operators map
+ * and filter to that data. Then we choose elasticsearch as its sink to 
storage these data.
+ *
+ * Run test_quickstarts.sh to verify this program. Package this class 
to a jar, verify the jar,
+ * then deploy it on a flink cluster.
+ */
+public class ElasticsearchStreamingJob {
+
+   public static void main(String[] args) throws Exception {
+   // set up the streaming execution environment
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream source = env.generateSequence(0, 20)
+   // map the data from 1,2,3... to the form of message 
#1, message #2, message #3...
+   .map(new MapFunction() {
+   @Override
+   public String map(Long value) throws Exception {
+   return "message #" + value;
+   }})
+   // filter out the data that contains message #11 and 
message #17
+   .filter(new FilterFunction() {
+   @Override
+   public boolean filter(String value) throws 
Exception {
+   return !value.equals("message #11") && 
!value.equals("message #17");
+   }
+   });
+
+   Map userConfig = new HashMap<>();
+   userConfig.put("cluster.name", "elasticsearch");
+   // This instructs the sink to emit after every element, 
otherwise they would be buffered
--- End diff --

The code forces a write to Elasticsearch as soon as a single element is generated. 
The default number of actions needed to trigger a bulk flush is 1000, so if we do 
not set it to 1, we cannot see the data in the sink.



> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
> Fix For: 1.5.0
>
>
> We could add an end-to-end test which verifies Flink's quickstarts. It should 
> do the following:
> # create a new Flink project using the quickstarts archetype 
> # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 

[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449957#comment-16449957
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183721315
  
--- Diff: flink-libraries/flink-sql-client/bin/sql-client.sh ---
@@ -44,20 +44,32 @@ bin=`dirname "$target"`
 . "$bin"/config.sh
 
 if [ "$FLINK_IDENT_STRING" = "" ]; then
-FLINK_IDENT_STRING="$USER"
+FLINK_IDENT_STRING="$USER"
 fi
 
 CC_CLASSPATH=`constructFlinkClassPath`
 
+export FLINK_ROOT_DIR
+export FLINK_CONF_DIR
+
 

-# SQL client specific logic
+# SQL Client CLI specific logic
 

 
-log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-cli-$HOSTNAME.log
 log_setting=(-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
 
-export FLINK_ROOT_DIR
-export FLINK_CONF_DIR
+if [[ ! ${FLINK_SCC_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_SCC_HEAP}" -lt 
"0" ]]; then
+echo "[ERROR] Configured SQL Client CLI JVM heap size is not a number. 
Please set '${KEY_SCC_MEM_SIZE}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+
+if [ "${FLINK_SCC_HEAP}" -gt "0" ]; then
+export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_SCC_HEAP"m 
-Xmx"$FLINK_SCC_HEAP"m"
--- End diff --

Yes, I think that's a valid concern. 
Reusing the variable here might render it unusable for defining common 
options for TMs and JMs. 


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - Add more tests for executor
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - "The input is invalid please check it again." => add allowed range
>  - Load dependencies recursively
>  - Clean up results in result store
>  - Improve error message for unsupported batch queries
>  - Add more logging instead of swallowing exceptions
>  - List properties in error message about missing TS factory sorted by name
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - Add switch to show full stacktraces of exceptions
>  - Give an error message when setting unknown parameters: 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449961#comment-16449961
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183748841
  
--- Diff: flink-dist/src/main/resources/flink-conf.yaml ---
@@ -229,17 +229,26 @@ web.port: 8081
 
 # Directory to upload completed jobs to. Add this directory to the list of
 # monitored directories of the HistoryServer as well (see below).
-#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+# jobmanager.archive.fs.dir: hdfs:///completed-jobs/
 
 # The address under which the web-based HistoryServer listens.
-#historyserver.web.address: 0.0.0.0
+# historyserver.web.address: 0.0.0.0
 
 # The port under which the web-based HistoryServer listens.
-#historyserver.web.port: 8082
+# historyserver.web.port: 8082
 
 # Comma separated list of directories to monitor for completed jobs.
-#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+# historyserver.archive.fs.dir: hdfs:///completed-jobs/
 
 # Interval in milliseconds for refreshing the monitored directories.
-#historyserver.archive.fs.refresh-interval: 1
+# historyserver.archive.fs.refresh-interval: 1
+

+#==
+# SQL Client

+#==
+
+# The SQL Client CLI can be started via bin/sql-client.sh embedded
+
+# The heap size for the SQL Client CLI JVM
+# sqlclient.cli.heap.mb: 1024
--- End diff --

Regarding the question of whether to add this property to the 
`flink-conf.yaml` or a CLI client specific file, are there other (existing) 
properties in the `flink-conf.yaml` file that are used for the CLI client, 
besides the connection info to the cluster?


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - Add more tests for executor
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - "The input is invalid please check it again." => add allowed range
>  - Load dependencies recursively
>  - Clean up results in result store
>  - Improve error message for unsupported batch queries
>  - Add more logging instead of swallowing exceptions
>  - List properties in error message about missing TS factory sorted by name
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - Add switch to show full stacktraces of exceptions
>  - Give an error message when setting unknown parameters: 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449959#comment-16449959
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183743259
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -95,6 +95,8 @@ DEFAULT_ENV_LOG_MAX=5   # 
Maximum number of old log
 DEFAULT_ENV_JAVA_OPTS=""# Optional JVM args
 DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args 
(JobManager)
 DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args 
(TaskManager)
+DEFAULT_ENV_JAVA_OPTS_SCC=""# Optional JVM args 
(SQL Client CLI)
+DEFAULT_ENV_JAVA_OPTS_SCG=""# Optional JVM args 
(SQL Client Gateway)
--- End diff --

The Gateway parameter is not used yet. I think we can add it when we need it


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - Add more tests for executor
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - "The input is invalid please check it again." => add allowed range
>  - Load dependencies recursively
>  - Clean up results in result store
>  - Improve error message for unsupported batch queries
>  - Add more logging instead of swallowing exceptions
>  - List properties in error message about missing TS factory sorted by name
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - Add switch to show full stacktraces of exceptions
>  - Give an error message when setting unknown parameters: 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449960#comment-16449960
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183746951
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -91,6 +92,22 @@ public Deployment getDeployment() {
return deployment;
}
 
+   public String explain() {
+   final StringBuilder sb = new StringBuilder();
+   sb.append("= Tables 
=\n");
+   tables.forEach((name, table) -> {
+   sb.append("- name: ").append(name).append("\n");
+   final DescriptorProperties props = new 
DescriptorProperties(true);
+   table.addProperties(props);
--- End diff --

Should we make a copy of `table` before modifying it?


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - Add more tests for executor
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - "The input is invalid please check it again." => add allowed range
>  - Load dependencies recursively
>  - Clean up results in result store
>  - Improve error message for unsupported batch queries
>  - Add more logging instead of swallowing exceptions
>  - List properties in error message about missing TS factory sorted by name
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - Add switch to show full stacktraces of exceptions
>  - Give an error message when setting unknown parameters: 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449958#comment-16449958
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183729075
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -91,6 +92,22 @@ public Deployment getDeployment() {
return deployment;
}
 
+   public String explain() {
--- End diff --

I would not call this method `explain()` but maybe 
`showEnvironmentConfig()` or similar.
In the context of DBMS and SQL, EXPLAIN is used to show the execution plan 
of a query.


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - Add more tests for executor
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - "The input is invalid please check it again." => add allowed range
>  - Load dependencies recursively
>  - Clean up results in result store
>  - Improve error message for unsupported batch queries
>  - Add more logging instead of swallowing exceptions
>  - List properties in error message about missing TS factory sorted by name
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - Add switch to show full stacktraces of exceptions
>  - Give an error message when setting unknown parameters: 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5867: [FLINK-8686] [sql-client] Improve basic embedded S...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183748841
  
--- Diff: flink-dist/src/main/resources/flink-conf.yaml ---
@@ -229,17 +229,26 @@ web.port: 8081
 
 # Directory to upload completed jobs to. Add this directory to the list of
 # monitored directories of the HistoryServer as well (see below).
-#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+# jobmanager.archive.fs.dir: hdfs:///completed-jobs/
 
 # The address under which the web-based HistoryServer listens.
-#historyserver.web.address: 0.0.0.0
+# historyserver.web.address: 0.0.0.0
 
 # The port under which the web-based HistoryServer listens.
-#historyserver.web.port: 8082
+# historyserver.web.port: 8082
 
 # Comma separated list of directories to monitor for completed jobs.
-#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+# historyserver.archive.fs.dir: hdfs:///completed-jobs/
 
 # Interval in milliseconds for refreshing the monitored directories.
-#historyserver.archive.fs.refresh-interval: 1
+# historyserver.archive.fs.refresh-interval: 1
+

+#==
+# SQL Client

+#==
+
+# The SQL Client CLI can be started via bin/sql-client.sh embedded
+
+# The heap size for the SQL Client CLI JVM
+# sqlclient.cli.heap.mb: 1024
--- End diff --

Regarding the question of whether to add this property to the 
`flink-conf.yaml` or a CLI client specific file, are there other (existing) 
properties in the `flink-conf.yaml` file that are used for the CLI client, 
besides the connection info to the cluster?


---


[GitHub] flink pull request #5867: [FLINK-8686] [sql-client] Improve basic embedded S...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183746951
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -91,6 +92,22 @@ public Deployment getDeployment() {
return deployment;
}
 
+   public String explain() {
+   final StringBuilder sb = new StringBuilder();
+   sb.append("= Tables 
=\n");
+   tables.forEach((name, table) -> {
+   sb.append("- name: ").append(name).append("\n");
+   final DescriptorProperties props = new 
DescriptorProperties(true);
+   table.addProperties(props);
--- End diff --

Should we make a copy of `table` before modifying it?


---


[GitHub] flink pull request #5867: [FLINK-8686] [sql-client] Improve basic embedded S...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183743259
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -95,6 +95,8 @@ DEFAULT_ENV_LOG_MAX=5   # 
Maximum number of old log
 DEFAULT_ENV_JAVA_OPTS=""# Optional JVM args
 DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args 
(JobManager)
 DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args 
(TaskManager)
+DEFAULT_ENV_JAVA_OPTS_SCC=""# Optional JVM args 
(SQL Client CLI)
+DEFAULT_ENV_JAVA_OPTS_SCG=""# Optional JVM args 
(SQL Client Gateway)
--- End diff --

The Gateway parameter is not used yet. I think we can add it when we need it


---


[GitHub] flink pull request #5867: [FLINK-8686] [sql-client] Improve basic embedded S...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183721315
  
--- Diff: flink-libraries/flink-sql-client/bin/sql-client.sh ---
@@ -44,20 +44,32 @@ bin=`dirname "$target"`
 . "$bin"/config.sh
 
 if [ "$FLINK_IDENT_STRING" = "" ]; then
-FLINK_IDENT_STRING="$USER"
+FLINK_IDENT_STRING="$USER"
 fi
 
 CC_CLASSPATH=`constructFlinkClassPath`
 
+export FLINK_ROOT_DIR
+export FLINK_CONF_DIR
+
 

-# SQL client specific logic
+# SQL Client CLI specific logic
 

 
-log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-cli-$HOSTNAME.log
 log_setting=(-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
 
-export FLINK_ROOT_DIR
-export FLINK_CONF_DIR
+if [[ ! ${FLINK_SCC_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_SCC_HEAP}" -lt 
"0" ]]; then
+echo "[ERROR] Configured SQL Client CLI JVM heap size is not a number. 
Please set '${KEY_SCC_MEM_SIZE}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+
+if [ "${FLINK_SCC_HEAP}" -gt "0" ]; then
+export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_SCC_HEAP"m 
-Xmx"$FLINK_SCC_HEAP"m"
--- End diff --

Yes, I think that's a valid concern. 
Reusing the variable here might render it unusable for defining common 
options for TMs and JMs. 


---


[GitHub] flink pull request #5867: [FLINK-8686] [sql-client] Improve basic embedded S...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r183729075
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -91,6 +92,22 @@ public Deployment getDeployment() {
return deployment;
}
 
+   public String explain() {
--- End diff --

I would not call this method `explain()` but maybe 
`showEnvironmentConfig()` or similar.
In the context of DBMS and SQL, EXPLAIN is used to show the execution plan 
of a query.


---


[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449930#comment-16449930
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183745456
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+   /**
+* Joined streams that have keys for both sides as well as the time 
boundaries over which
+* elements should be joined defined.
+*
+* @param  Input type of elements from the first stream
+* @param  Input type of elements from the second stream
+* @param  The type of the key
+*/
+   public static class TimeBounded {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
+
+   private final DataStream left;
+   private final DataStream right;
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final KeySelector keySelector1;
+   private final KeySelector keySelector2;
+
+   private boolean lowerBoundInclusive;
+   private boolean upperBoundInclusive;
+
+   public TimeBounded(
+   DataStream left,
+   DataStream right,
+   long lowerBound,
+   long upperBound,
+   boolean lowerBoundInclusive,
+   boolean upperBoundInclusive,
+   KeySelector keySelector1,
+   KeySelector keySelector2) {
+
+   this.left = Preconditions.checkNotNull(left);
+   this.right = Preconditions.checkNotNull(right);
+
+   this.lowerBound = lowerBound;
+   this.upperBound = upperBound;
+
+   this.lowerBoundInclusive = lowerBoundInclusive;
+   this.upperBoundInclusive = upperBoundInclusive;
+
+   this.keySelector1 = 
Preconditions.checkNotNull(keySelector1);
+   this.keySelector2 = 

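
The Javadoc in the diff above defines the join window as leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound, with both bounds inclusive by default. A tiny, API-independent sketch of that predicate (illustrative only, not part of the PR):

{code:java}
public class TimeBoundSemanticsSketch {

    // leftTs + lowerBound <= rightTs <= leftTs + upperBound, both bounds inclusive
    static boolean joins(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return leftTs + lowerBound <= rightTs && rightTs <= leftTs + upperBound;
    }

    public static void main(String[] args) {
        // With lowerBound = -2 ms and upperBound = +1 ms, a left element at t = 10
        // joins right elements with timestamps in [8, 11].
        System.out.println(joins(10, 8, -2, 1));   // true  (lower bound is inclusive)
        System.out.println(joins(10, 11, -2, 1));  // true  (upper bound is inclusive)
        System.out.println(joins(10, 12, -2, 1));  // false (outside the interval)
    }
}
{code}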
[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-04-24 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183745456
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+	/**
+	 * Joined streams that have keys for both sides as well as the time boundaries over which
+	 * elements should be joined defined.
+	 *
+	 * @param <IN1> Input type of elements from the first stream
+	 * @param <IN2> Input type of elements from the second stream
+	 * @param <KEY> The type of the key
+	 */
+	public static class TimeBounded<IN1, IN2, KEY> {
+
+		private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin";
+
+		private final DataStream<IN1> left;
+		private final DataStream<IN2> right;
+
+		private final long lowerBound;
+		private final long upperBound;
+
+		private final KeySelector<IN1, KEY> keySelector1;
+		private final KeySelector<IN2, KEY> keySelector2;
+
+		private boolean lowerBoundInclusive;
+		private boolean upperBoundInclusive;
+
+		public TimeBounded(
+				DataStream<IN1> left,
+				DataStream<IN2> right,
+				long lowerBound,
+				long upperBound,
+				boolean lowerBoundInclusive,
+				boolean upperBoundInclusive,
+				KeySelector<IN1, KEY> keySelector1,
+				KeySelector<IN2, KEY> keySelector2) {
+
+			this.left = Preconditions.checkNotNull(left);
+			this.right = Preconditions.checkNotNull(right);
+
+			this.lowerBound = lowerBound;
+			this.upperBound = upperBound;
+
+			this.lowerBoundInclusive = lowerBoundInclusive;
+			this.upperBoundInclusive = upperBoundInclusive;
+
+			this.keySelector1 = Preconditions.checkNotNull(keySelector1);
+			this.keySelector2 = Preconditions.checkNotNull(keySelector2);
+		}
+
+		/**
+		 * Configure whether the upper bound should be considered exclusive or inclusive.
+		 */
+		public TimeBounded

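For readers following the thread, here is a rough usage sketch of the API this diff introduces. It is not taken from the PR: the stream contents and names are invented, and only the calls visible in the quoted diff are exercised (`between()` plus the exclusive-bound setters named in its Javadoc); the subsequent join-function step is omitted because it is not shown above.

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimeBoundedJoinSketch {

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// The diff above only permits time-bounded joins in event time.
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		// In a real job these streams would also need event-time timestamps and watermarks;
		// that part is omitted to keep the sketch focused on the new API calls.
		DataStream<Tuple2<String, Long>> left = env.fromElements(Tuple2.of("key", 0L), Tuple2.of("key", 5L));
		DataStream<Tuple2<String, Long>> right = env.fromElements(Tuple2.of("key", 2L), Tuple2.of("key", 20L));

		left.join(right)
			.where(new KeySelector<Tuple2<String, Long>, String>() {
				@Override
				public String getKey(Tuple2<String, Long> value) {
					return value.f0;
				}
			})
			.equalTo(new KeySelector<Tuple2<String, Long>, String>() {
				@Override
				public String getKey(Tuple2<String, Long> value) {
					return value.f0;
				}
			})
			// pair a right element with a left element when
			// leftTimestamp - 2ms <= rightTimestamp <= leftTimestamp + 1ms ...
			.between(Time.milliseconds(-2), Time.milliseconds(1))
			// ... and, per the Javadoc above, optionally make the upper bound exclusive.
			.upperBoundExclusive(true);

		// No execute() call: this sketch only shows the API wiring.
	}
}
```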
[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449927#comment-16449927
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183745076
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-class/ElasticsearchStreamingJob.java 
---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch example for Flink Streaming Job.
+ *
+ * In this streaming job, we generate a sequence of numbers, apply a map and a filter
+ * operator to that data, and then use Elasticsearch as the sink to store the results.
+ *
+ * Run test_quickstarts.sh to verify this program: package this class into a jar, verify
+ * the jar, then deploy it on a Flink cluster.
+ */
+public class ElasticsearchStreamingJob {
+
+   public static void main(String[] args) throws Exception {
+   // set up the streaming execution environment
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> source = env.generateSequence(0, 20)
+			// map the data from 1,2,3... to the form of message #1, message #2, message #3...
+			.map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return "message #" + value;
+				}})
+			// filter out the data that contains message #11 and message #17
+			.filter(new FilterFunction<String>() {
+				@Override
+				public boolean filter(String value) throws Exception {
+					return !value.equals("message #11") && !value.equals("message #17");
+				}
+			});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){
--- End diff --

Okay, Sir ~

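For anyone reading along: the quoted diff is cut off right at the anonymous {{ElasticsearchSinkFunction}}. A typical body for it, following the standard Flink Elasticsearch connector pattern, would look roughly like the sketch below. The index and type names are illustrative and not taken from the PR; the sketch additionally relies on the connector's {{RequestIndexer}} plus the {{IndexRequest}}/{{Requests}} imports already shown in the file.

{code:java}
new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>() {

	private IndexRequest createIndexRequest(String element) {
		Map<String, String> json = new HashMap<>();
		json.put("data", element);
		// "my-index" / "my-type" are placeholders, not the names used in the PR
		return Requests.indexRequest()
				.index("my-index")
				.type("my-type")
				.source(json);
	}

	@Override
	public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
		indexer.add(createIndexRequest(element));
	}
}
{code}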

> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
> Fix For: 1.5.0
>
>
> We could add an end-to-end 

[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-04-24 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183745076
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-class/ElasticsearchStreamingJob.java 
---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch example for Flink Streaming Job.
+ *
+ * In this streaming job, we generate a sequence of numbers, apply a map and a filter
+ * operator to that data, and then use Elasticsearch as the sink to store the results.
+ *
+ * Run test_quickstarts.sh to verify this program: package this class into a jar, verify
+ * the jar, then deploy it on a Flink cluster.
+ */
+public class ElasticsearchStreamingJob {
+
+   public static void main(String[] args) throws Exception {
+   // set up the streaming execution environment
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> source = env.generateSequence(0, 20)
+			// map the data from 1,2,3... to the form of message #1, message #2, message #3...
+			.map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return "message #" + value;
+				}})
+			// filter out the data that contains message #11 and message #17
+			.filter(new FilterFunction<String>() {
+				@Override
+				public boolean filter(String value) throws Exception {
+					return !value.equals("message #11") && !value.equals("message #17");
+				}
+			});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){
--- End diff --

Okay, Sir ~


---


[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079
 ] 

Triones Deng edited comment on FLINK-9231 at 4/24/18 2:05 PM:
--

[~yuzhih...@gmail.com] I notice that there are four kinds of server sockets:
 # JobManager and TaskManager create their socket server via NetUtils.createSocketFromPorts. Here we are only probing for an available port to create the ActorSystem; the relevant config item in akka-remote.jar is "tcp-reuse-addr = off-for-windows", and the ServerSocket is closed immediately once an available port is found. Code is below. I think calling new ServerSocket(port) directly, without a backlog, would be fine here; what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
// Try all ports in the range until successful
val socket = NetUtils.createSocketFromPorts(
actorSystemPortRange,
new NetUtils.SocketFactory {
override def createSocket(port: Int): ServerSocket = new ServerSocket(
// Use the correct listening address, bound ports will only be
// detected later by Akka.
port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
})

val port =
if (socket == null) {
throw new BindException(s"Unable to allocate port for TaskManager.")
} else {
try {
socket.getLocalPort()
} finally {
socket.close()
}
}
..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # BlobServer uses ServerSocket or SSLContext to create its ServerSocket.
 # Netty is used for IO, e.g. NettyServer.
 # WebFrontendBootstrap uses Netty to create a ServerBootstrap.

If we plan to make use of SO_REUSEADDR, which "is useful if your server has been shut down, and then restarted right away while sockets are still active on its port. You should be aware that if any unexpected data comes in, it may confuse your server, but while this is possible, it is not likely" (see [https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), then maybe we could allow the socket to set SO_REUSEADDR when starting the WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free to correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer).childOption(ChannelOption.SO_REUSEADDR,true);{code}
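
One detail worth noting for the plain ServerSocket cases (1 and 2 above): SO_REUSEADDR has to be set before the socket is bound, so the constructor that binds immediately, e.g. new ServerSocket(port), cannot be used for this. A minimal sketch (not actual Flink code):
{code:java}
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public final class ReuseAddrSketch {

	// Open a listen socket with SO_REUSEADDR enabled; setReuseAddress(true)
	// must be called before bind(), otherwise it has no effect.
	static ServerSocket openListenSocket(int port) throws Exception {
		ServerSocket socket = new ServerSocket();
		socket.setReuseAddress(true);
		socket.bind(new InetSocketAddress(InetAddress.getByName("0.0.0.0"), port), 0);
		return socket;
	}
}
{code}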


was (Author: triones):
[~yuzhih...@gmail.com] I notice that there are four kind of server socket,
 # JobManager and TaskManager create socket server by 
NetUtils.createSocketFromPorts. here just looking for a available port to 
create ActorSystem. will close the ServerSocket at once when find a available 
port. code as below. here  i think we can direct call new ServerSocker(port) 
without backlog will be ok, what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
// Try all ports in the range until successful
val socket = NetUtils.createSocketFromPorts(
actorSystemPortRange,
new NetUtils.SocketFactory {
override def createSocket(port: Int): ServerSocket = new ServerSocket(
// Use the correct listening address, bound ports will only be
// detected later by Akka.
port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
})

val port =
if (socket == null) {
throw new BindException(s"Unable to allocate port for TaskManager.")
} else {
try {
socket.getLocalPort()
} finally {
socket.close()
}
}
..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # BlobServer make use of ServerSocket or SSLContext to create ServerSocket
 # make use of Netty for io,like NettyServer.
 # WebFrontendBootstrap make use of netty to create ServerBootstrap.

I think if we plan to make use of SO_REUSEADDR, "which is suitable It is useful 
if your server has been shut down, and then restarted right away while sockets 
are still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
:[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1)]
 , Here may be we can allow  sockets to set SO_REUSEADDR when start  
WebFrontendBootstrap. what's your idea? anything wrong please feel free to 
correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer).childOption(ChannelOption.SO_REUSEADDR,true);{code}

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are 

[jira] [Commented] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449911#comment-16449911
 ] 

mingleizhang commented on FLINK-9231:
-

Come on [~triones]! :D

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449909#comment-16449909
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183742072
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.4.2   \
--- End diff --

will update.


> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
> Fix For: 1.5.0
>
>
> We could add an end-to-end test which verifies Flink's quickstarts. It should 
> do the following:
> # create a new Flink project using the quickstarts archetype 
> # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library) 
> # run {{mvn clean package -Pbuild-jar}}
> # verify that no core dependencies are contained in the jar file
> # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-04-24 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183742072
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.4.2   \
--- End diff --

will update.


---


[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449903#comment-16449903
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183741490
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -53,6 +53,14 @@ if [ $EXIT_CODE == 0 ]; then
 EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+  printf 
"\n==\n"
+  printf "Running Quickstarts nightly end-to-end test\n"
--- End diff --

Yes. It also feels strange to me to put this in this file, but I saw that ```test_streaming_kafka010.sh``` is already listed here, which confused me. I do not think ```test_streaming_kafka010.sh``` should be in this file either, since ```kafka-common.sh``` also executes a download.


> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
> Fix For: 1.5.0
>
>
> We could add an end-to-end test which verifies Flink's quickstarts. It should 
> do the following:
> # create a new Flink project using the quickstarts archetype 
> # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library) 
> # run {{mvn clean package -Pbuild-jar}}
> # verify that no core dependencies are contained in the jar file
> # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-04-24 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r183741490
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -53,6 +53,14 @@ if [ $EXIT_CODE == 0 ]; then
 EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+  printf 
"\n==\n"
+  printf "Running Quickstarts nightly end-to-end test\n"
--- End diff --

Yes. It also feels strange to me to put this in this file, but I saw that ```test_streaming_kafka010.sh``` is already listed here, which confused me. I do not think ```test_streaming_kafka010.sh``` should be in this file either, since ```kafka-common.sh``` also executes a download.


---


[jira] [Commented] (FLINK-9249) Add convenience profile for skipping non-essential plugins

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449902#comment-16449902
 ] 

ASF GitHub Bot commented on FLINK-9249:
---

Github user W4anD0eR96 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183741162
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+		<profile>
+			<id>fast</id>
+			<activation>
+				<property>
+					<name>fast</name>
+				</property>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.rat</groupId>
+						<artifactId>apache-rat-plugin</artifactId>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</plugin>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-checkstyle-plugin</artifactId>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
--- End diff --

+1 for not skipping the plugins on Travis. LGTM, this makes local builds fast.


> Add convenience profile for skipping non-essential plugins
> --
>
> Key: FLINK-9249
> URL: https://issues.apache.org/jira/browse/FLINK-9249
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> When compiling Flink devs can already set a variety of command line options 
> to speed up the process, for example skipping checkstyle. We also do the same 
> thing on travis.
> However, not only is it difficult to keep track of all possible options, it 
> is also tedious to write and obfuscates the actual command.
> I propose adding a {{fast}} profile that skips non-essential plugins, 
> including:
> * rat
> * checkstyle
> * scalastyle
> * enforcer
> * japicmp
> * javadoc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5904: [FLINK-9249][build] Add convenience profile for sk...

2018-04-24 Thread W4anD0eR96
Github user W4anD0eR96 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183741162
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+		<profile>
+			<id>fast</id>
+			<activation>
+				<property>
+					<name>fast</name>
+				</property>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.rat</groupId>
+						<artifactId>apache-rat-plugin</artifactId>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</plugin>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-checkstyle-plugin</artifactId>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
--- End diff --

+1 for not skipping the plugins on Travis. LGTM, this makes local builds fast.


---


[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079
 ] 

Triones Deng edited comment on FLINK-9231 at 4/24/18 1:56 PM:
--

[~yuzhih...@gmail.com] I notice that there are four kind of server socket,
 # JobManager and TaskManager create socket server by 
NetUtils.createSocketFromPorts. here just looking for a available port to 
create ActorSystem. will close the ServerSocket at once when find a available 
port. code as below. here  i think we can direct call new ServerSocker(port) 
without backlog will be ok, what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
// Try all ports in the range until successful
val socket = NetUtils.createSocketFromPorts(
actorSystemPortRange,
new NetUtils.SocketFactory {
override def createSocket(port: Int): ServerSocket = new ServerSocket(
// Use the correct listening address, bound ports will only be
// detected later by Akka.
port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
})

val port =
if (socket == null) {
throw new BindException(s"Unable to allocate port for TaskManager.")
} else {
try {
socket.getLocalPort()
} finally {
socket.close()
}
}
..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # BlobServer make use of ServerSocket or SSLContext to create ServerSocket
 # make use of Netty for io,like NettyServer.
 # WebFrontendBootstrap make use of netty to create ServerBootstrap.

I think if we plan to make use of SO_REUSEADDR, "which is suitable It is useful 
if your server has been shut down, and then restarted right away while sockets 
are still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
:[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1)]
 , Here may be we can allow  sockets to set SO_REUSEADDR when start  
WebFrontendBootstrap. what's your idea? anything wrong please feel free to 
correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer).childOption(ChannelOption.SO_REUSEADDR,true);{code}


was (Author: triones):
[~yuzhih...@gmail.com] I notice that there are three kind of server socket,
 # JobManager and TaskManager create socket server by 
NetUtils.createSocketFromPorts. here just looking for a available port to 
create ActorSystem. will close the ServerSocket at once when find a available 
port. code as below. here  i think we can direct call new ServerSocker(port) 
without backlog will be ok, what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
// Try all ports in the range until successful
val socket = NetUtils.createSocketFromPorts(
actorSystemPortRange,
new NetUtils.SocketFactory {
override def createSocket(port: Int): ServerSocket = new ServerSocket(
// Use the correct listening address, bound ports will only be
// detected later by Akka.
port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
})

val port =
if (socket == null) {
throw new BindException(s"Unable to allocate port for TaskManager.")
} else {
try {
socket.getLocalPort()
} finally {
socket.close()
}
}
..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # make use of Netty for io,like NettyServer.
 # BlobServer make use of ServerSocket or SSLContext to create ServerSocket
 # WebFrontendBootstrap make use of netty to create ServerBootstrap.

I think if we plan to make use of SO_REUSEADDR, "which is suitable It is useful 
if your server has been shut down, and then restarted right away while sockets 
are still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
:[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1)]
 , Here may be we can allow  sockets to set SO_REUSEADDR when start  
WebFrontendBootstrap. what's your idea? anything wrong please feel free to 
correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer).childOption(ChannelOption.SO_REUSEADDR,true);{code}

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This 

[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079
 ] 

Triones Deng edited comment on FLINK-9231 at 4/24/18 1:54 PM:
--

[~yuzhih...@gmail.com] I notice that there are three kind of server socket,
 # JobManager and TaskManager create socket server by 
NetUtils.createSocketFromPorts. here just looking for a available port to 
create ActorSystem. will close the ServerSocket at once when find a available 
port. code as below. here  i think we can direct call new ServerSocker(port) 
without backlog will be ok, what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
// Try all ports in the range until successful
val socket = NetUtils.createSocketFromPorts(
actorSystemPortRange,
new NetUtils.SocketFactory {
override def createSocket(port: Int): ServerSocket = new ServerSocket(
// Use the correct listening address, bound ports will only be
// detected later by Akka.
port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
})

val port =
if (socket == null) {
throw new BindException(s"Unable to allocate port for TaskManager.")
} else {
try {
socket.getLocalPort()
} finally {
socket.close()
}
}
..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # make use of Netty for io,like NettyServer.
 # BlobServer make use of ServerSocket or SSLContext to create ServerSocket
 # WebFrontendBootstrap make use of netty to create ServerBootstrap.

I think if we plan to make use of SO_REUSEADDR, "which is suitable It is useful 
if your server has been shut down, and then restarted right away while sockets 
are still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
:[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1)]
 , Here may be we can allow  sockets to set SO_REUSEADDR when start  
WebFrontendBootstrap. what's your idea? anything wrong please feel free to 
correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer).childOption(ChannelOption.SO_REUSEADDR,true);{code}


was (Author: triones):
[~yuzhih...@gmail.com] I notice that there are two kind of server socket,
 # make use of  SockerServer, like JobManager,TaskManager, TaskManagerRunner 
and BlobServer, all of them create socket server by 
NetUtils.createSocketFromPorts
 # make use of Netty for io,like NettyServer.

I think if we plan to make use of SO_REUSEADDR, "which is suitable It is useful 
if your server has been shut down, and then restarted right away while sockets 
are still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
:[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1)]
 , Here may be we can allow  sockets to set SO_REUSEADDR when start  
JobManager,TaskManager, TaskManagerRunner. anything wrong please correct me.

sample code for JobManager.scala like:
{code:java}
val socket = NetUtils.createSocketFromPorts(
  listeningPortRange,
  new NetUtils.SocketFactory {
override def createSocket(port: Int): ServerSocket = {
  // Use the correct listening address, bound ports will only be
  // detected later by Akka.
  val serverSocket = new ServerSocket()
  serverSocket.setReuseAddress(true)
  serverSocket.bind(new 
InetSocketAddress(InetAddress.getByName(NetUtils.getWildcardIPAddress), port), 
0)
  serverSocket
}
})
{code}

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9219) Add support for OpenGIS features in Table & SQL API

2018-04-24 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449889#comment-16449889
 ] 

Xingcan Cui commented on FLINK-9219:


Hi [~twalthr], I was working on this issue. Since the {{Geom}} type used by 
these functions is actually an interface (which has two implementation classes 
now), do you think we should add a new built-in type/serializer for it or just 
take it as a generic type?

> Add support for OpenGIS features in Table & SQL API
> ---
>
> Key: FLINK-9219
> URL: https://issues.apache.org/jira/browse/FLINK-9219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> CALCITE-1968 added core functionality for handling 
> spatial/geographical/geometry data. It should not be too hard to expose these 
> features also in Flink's Table & SQL API. We would need a new {{GEOMETRY}} 
> data type and connect the function APIs.
> Right now the following functions are supported by Calcite:
> {code}
> ST_AsText, ST_AsWKT, ST_Boundary, ST_Buffer, ST_Contains, 
> ST_ContainsProperly, ST_Crosses, ST_Disjoint, ST_Distance, ST_DWithin, 
> ST_Envelope, ST_EnvelopesIntersect, ST_Equals, ST_GeometryType, 
> ST_GeometryTypeCode, ST_GeomFromText, ST_Intersects, ST_Is3D, 
> ST_LineFromText, ST_MakeLine, ST_MakePoint, ST_MLineFromText, 
> ST_MPointFromText, ST_MPolyFromText, ST_Overlaps, ST_Point, ST_PointFromText, 
> ST_PolyFromText, ST_SetSRID, ST_Touches, ST_Transform, ST_Union, ST_Within, 
> ST_Z
> {code}
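
For readers who have not used the Calcite functions listed above, this is roughly what the feature would allow from the Table & SQL API once the GEOMETRY type and the functions are wired up. A hypothetical sketch only; nothing below exists in Flink yet, and it assumes a {{TableEnvironment}} named {{tableEnv}} with a registered table {{shapes}} that has a geometry column {{geom}}:
{code:java}
// Hypothetical only - FLINK-9219 is not implemented; table and column names are made up.
Table shapes = tableEnv.scan("shapes");
Table hits = tableEnv.sqlQuery(
	"SELECT id, ST_AsText(geom) FROM shapes WHERE ST_Contains(geom, ST_Point(1.0, 2.0))");
{code}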



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9249) Add convenience profile for skipping non-essential plugins

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449878#comment-16449878
 ] 

ASF GitHub Bot commented on FLINK-9249:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183735405
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+		<profile>
+			<id>fast</id>
+			<activation>
+				<property>
+					<name>fast</name>
+				</property>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.rat</groupId>
+						<artifactId>apache-rat-plugin</artifactId>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</plugin>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-checkstyle-plugin</artifactId>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
--- End diff --

If we skip this plugin, will it affect the format or style of the code in PRs that get merged into Flink?


> Add convenience profile for skipping non-essential plugins
> --
>
> Key: FLINK-9249
> URL: https://issues.apache.org/jira/browse/FLINK-9249
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> When compiling Flink devs can already set a variety of command line options 
> to speed up the process, for example skipping checkstyle. We also do the same 
> thing on travis.
> However, not only is it difficult to keep track of all possible options, it 
> is also tedious to write and obfuscates the actual command.
> I propose adding a {{fast}} profile that skips non-essential plugins, 
> including:
> * rat
> * checkstyle
> * scalastyle
> * enforcer
> * japicmp
> * javadoc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5904: [FLINK-9249][build] Add convenience profile for sk...

2018-04-24 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5904#discussion_r183735405
  
--- Diff: pom.xml ---
@@ -636,6 +636,63 @@ under the License.
 

 
+		<profile>
+			<id>fast</id>
+			<activation>
+				<property>
+					<name>fast</name>
+				</property>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.rat</groupId>
+						<artifactId>apache-rat-plugin</artifactId>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</plugin>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-checkstyle-plugin</artifactId>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
--- End diff --

If we skip this plugin, will it affect the format or style of the code in PRs that get merged into Flink?


---


[jira] [Commented] (FLINK-9115) Support addition of part suffix in BucketingSink

2018-04-24 Thread Andrei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449846#comment-16449846
 ] 

Andrei commented on FLINK-9115:
---

Hi [~glaksh100]. I'm ready to work on this

> Support addition of part suffix in BucketingSink
> 
>
> Key: FLINK-9115
> URL: https://issues.apache.org/jira/browse/FLINK-9115
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: Lakshmi Rao
>Priority: Minor
>  Labels: usability
>
> Currently the BucketingSink allows addition of part prefix, pending 
> prefix/suffix and in-progress prefix/suffix via setter methods. Can we also 
> support setting part suffixes?
> An instance where this maybe useful: I am currently writing GZIP compressed 
> output to S3 using the BucketingSink and I would want the uploaded files to 
> have a ".gz" or ".zip" extensions . An easy way to do this would be by 
> setting  a part file suffix with the required file extension. 
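
To make the request concrete, the usage being asked for would look something like the sketch below. The bucket path is illustrative, {{setPartPrefix}}/{{setPendingSuffix}} are existing setters, and {{setPartSuffix}} is the proposed setter that does not exist yet:
{code:java}
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class GzipBucketingSketch {

	public static BucketingSink<String> createSink() {
		BucketingSink<String> sink = new BucketingSink<>("s3://my-bucket/output"); // illustrative path
		sink.setPartPrefix("part");        // existing setter
		sink.setPendingSuffix(".pending"); // existing setter
		sink.setPartSuffix(".gz");         // proposed in this ticket, does not exist yet
		return sink;
	}
}
{code}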



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449836#comment-16449836
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183727245
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
		public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
		}
+
+		/**
+		 * Specifies the time boundaries over which the join operation works, so that
+		 * leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
+		 * By default both the lower and the upper bound are inclusive. This can be configured
+		 * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
+		 * {@link TimeBounded#upperBoundExclusive(boolean)}
+		 *
+		 * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+		 * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+		 */
+		public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+	/**
+	 * Joined streams that have keys for both sides as well as the time boundaries over which
+	 * elements should be joined defined.
+	 *
+	 * @param <IN1> Input type of elements from the first stream
+	 * @param <IN2> Input type of elements from the second stream
+	 * @param <KEY> The type of the key
+	 */
+	public static class TimeBounded<IN1, IN2, KEY> {
+
+		private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin";
+
+		private final DataStream<IN1> left;
+		private final DataStream<IN2> right;
+
+		private final long lowerBound;
+		private final long upperBound;
+
+		private final KeySelector<IN1, KEY> keySelector1;
+		private final KeySelector<IN2, KEY> keySelector2;
+
+		private boolean lowerBoundInclusive;
+		private boolean upperBoundInclusive;
+
+		public TimeBounded(
+				DataStream<IN1> left,
+				DataStream<IN2> right,
+				long lowerBound,
+				long upperBound,
+				boolean lowerBoundInclusive,
+				boolean upperBoundInclusive,
+				KeySelector<IN1, KEY> keySelector1,
+				KeySelector<IN2, KEY> keySelector2) {
+
+			this.left = Preconditions.checkNotNull(left);
+			this.right = Preconditions.checkNotNull(right);
+
+			this.lowerBound = lowerBound;
+			this.upperBound = upperBound;
+
+			this.lowerBoundInclusive = lowerBoundInclusive;
+			this.upperBoundInclusive = upperBoundInclusive;
+
+			this.keySelector1 = Preconditions.checkNotNull(keySelector1);
+			this.keySelector2 =

[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-04-24 Thread florianschmidt1994
Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183727245
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
		public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
		}
+
+		/**
+		 * Specifies the time boundaries over which the join operation works, so that
+		 * leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
+		 * By default both the lower and the upper bound are inclusive. This can be configured
+		 * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
+		 * {@link TimeBounded#upperBoundExclusive(boolean)}
+		 *
+		 * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+		 * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+		 */
+		public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+	/**
+	 * Joined streams that have keys for both sides as well as the time boundaries over which
+	 * elements should be joined defined.
+	 *
+	 * @param <IN1> Input type of elements from the first stream
+	 * @param <IN2> Input type of elements from the second stream
+	 * @param <KEY> The type of the key
+	 */
+	public static class TimeBounded<IN1, IN2, KEY> {
+
+		private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin";
+
+		private final DataStream<IN1> left;
+		private final DataStream<IN2> right;
+
+		private final long lowerBound;
+		private final long upperBound;
+
+		private final KeySelector<IN1, KEY> keySelector1;
+		private final KeySelector<IN2, KEY> keySelector2;
+
+		private boolean lowerBoundInclusive;
+		private boolean upperBoundInclusive;
+
+		public TimeBounded(
+				DataStream<IN1> left,
+				DataStream<IN2> right,
+				long lowerBound,
+				long upperBound,
+				boolean lowerBoundInclusive,
+				boolean upperBoundInclusive,
+				KeySelector<IN1, KEY> keySelector1,
+				KeySelector<IN2, KEY> keySelector2) {
+
+			this.left = Preconditions.checkNotNull(left);
+			this.right = Preconditions.checkNotNull(right);
+
+			this.lowerBound = lowerBound;
+			this.upperBound = upperBound;
+
+			this.lowerBoundInclusive = lowerBoundInclusive;
+			this.upperBoundInclusive = upperBoundInclusive;
+
+			this.keySelector1 = Preconditions.checkNotNull(keySelector1);
+			this.keySelector2 = Preconditions.checkNotNull(keySelector2);
+		}
+
+		/**
+		 * Configure whether the upper bound should be considered exclusive or inclusive.
+		 */
+		public TimeBounded

[jira] [Commented] (FLINK-8912) Web UI does not render error messages correctly in FLIP-6 mode

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449835#comment-16449835
 ] 

ASF GitHub Bot commented on FLINK-8912:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5907

[FLINK-8912][WebUI] Update error handling for flip6

## What is the purpose of the change

This PR updates the error handling in the webUI to support Flip6. Error 
reporting was standardized with the REST rework, but is slightly different than 
in the past.

The existing error handling code was checking whether the response 
contained an `error` field (or just returned the entire payload if the status 
code indicates an error), whereas with the rework every error response contains an 
`errors` array.

Various checks were added to account for both cases.

## Verifying this change

- manually verified


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8912

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5907


commit c4081d43b3b0a6ffc60320bbb3076246ca75c240
Author: zentol 
Date:   2018-04-24T13:17:44Z

[FLINK-8912][WebUI] Update error handling for flip6




> Web UI does not render error messages correctly in FLIP-6 mode
> --
>
> Key: FLINK-8912
> URL: https://issues.apache.org/jira/browse/FLINK-8912
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.5.0
> Environment: commit: c531486288caf5241cdf7f0f00f087f3ce82239f
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> The Web UI renders error messages returned by the REST API incorrectly, e.g., 
> on the job submission page. The JSON returned by the REST API is rendered as 
> a whole. However, the UI should only render the contents of the {{errors}} 
> field.
> *Steps to reproduce*
> Submit {{examples/streaming/SocketWindowWordCount.jar}} without specifying 
> program arguments. Error message will be rendered as follows:
> {noformat}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> program plan could not be fetched - the program aborted 
> pre-maturely.\n\nSystem.err: (none)\n\nSystem.out: No port specified. Please 
> run 'SocketWindowWordCount --hostname  --port ', where 
> hostname (localhost by default) and port is the address of the text 
> server\nTo start a simple text server, run 'netcat -l ' and type the 
> input text into the command line\n"]}
> {noformat}
> Note that flip6 mode must be enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5907: [FLINK-8912][WebUI] Update error handling for flip...

2018-04-24 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5907

[FLINK-8912][WebUI] Update error handling for flip6

## What is the purpose of the change

This PR updates the error handling in the webUI to support Flip6. Error 
reporting was standardized with the REST rework, but is slightly different than 
in the past.

The existing error handling code was checking whether the response 
contained an `error` field (or just returned the entire payload if the status 
code indicates an error), whereas with the rework every error response contains an 
`errors` array.

Various checks were added to account for both cases.

## Verifying this change

- manually verified


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8912

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5907


commit c4081d43b3b0a6ffc60320bbb3076246ca75c240
Author: zentol 
Date:   2018-04-24T13:17:44Z

[FLINK-8912][WebUI] Update error handling for flip6




---


[jira] [Commented] (FLINK-8497) KafkaConsumer throws NPE if topic doesn't exist

2018-04-24 Thread Aleksei Lesnov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449816#comment-16449816
 ] 

Aleksei Lesnov commented on FLINK-8497:
---

Hi [~snowch] , I'd like to take this ticket for development.

> KafkaConsumer throws NPE if topic doesn't exist
> ---
>
> Key: FLINK-8497
> URL: https://issues.apache.org/jira/browse/FLINK-8497
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: chris snow
>Priority: Minor
>
> If I accidentally set the kafka consumer with a topic that doesn't exist:
> {code:java}
> FlinkKafkaConsumer011 kafkaConsumer = new FlinkKafkaConsumer011(
>"does_not_exist",
> new JSONKeyValueDeserializationSchema(false),
> properties
> );
> DataStream input = env.addSource(kafkaConsumer);{code}
> Flink throws NPE
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748){code}
> Maybe Flink could through an IllegalStateException("Topic not found")?
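
Until the connector handles this more gracefully, a user-side workaround is to probe the topic before creating the source and fail with a clear message. A sketch under the assumption that the kafka-clients dependency is on the classpath; this is only an illustration of the failure mode, not the fix for this ticket:
{code:java}
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicExistenceCheck {

	// partitionsFor(...) may return null or an empty list for a missing topic, which is
	// what the stack trace above points at; checking up front turns the NPE into a clear error.
	public static void ensureTopicExists(Properties consumerProperties, String topic) {
		Properties props = new Properties();
		props.putAll(consumerProperties);
		props.put("key.deserializer", ByteArrayDeserializer.class.getName());
		props.put("value.deserializer", ByteArrayDeserializer.class.getName());
		try (KafkaConsumer<byte[], byte[]> probe = new KafkaConsumer<>(props)) {
			List<PartitionInfo> partitions = probe.partitionsFor(topic);
			if (partitions == null || partitions.isEmpty()) {
				throw new IllegalStateException("Topic not found: " + topic);
			}
		}
	}
}
{code}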



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9194) Finished jobs are not archived to HistoryServer

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449809#comment-16449809
 ] 

ASF GitHub Bot commented on FLINK-9194:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5902#discussion_r183720759
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -211,7 +215,7 @@ public void run() {
}
}
if (updateOverview) {
-   updateJobOverview(webDir);
+   
updateJobOverview(webOverviewDir, webDir);
}
--- End diff --

yes.


> Finished jobs are not archived to HistoryServer
> ---
>
> Key: FLINK-9194
> URL: https://issues.apache.org/jira/browse/FLINK-9194
> Project: Flink
>  Issue Type: Bug
>  Components: History Server, JobManager
>Affects Versions: 1.5.0
> Environment: Flink 2af481a
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.6.0
>
>
> In flip6 mode, jobs are not archived to the HistoryServer. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5902: [FLINK-9194][history] Add HistoryServer support to...

2018-04-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5902#discussion_r183720759
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -211,7 +215,7 @@ public void run() {
}
}
if (updateOverview) {
-   updateJobOverview(webDir);
+   
updateJobOverview(webOverviewDir, webDir);
}
--- End diff --

yes.


---


[jira] [Assigned] (FLINK-8912) Web UI does not render error messages correctly in FLIP-6 mode

2018-04-24 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-8912:
---

Assignee: Chesnay Schepler

> Web UI does not render error messages correctly in FLIP-6 mode
> --
>
> Key: FLINK-8912
> URL: https://issues.apache.org/jira/browse/FLINK-8912
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.5.0
> Environment: commit: c531486288caf5241cdf7f0f00f087f3ce82239f
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> The Web UI renders error messages returned by the REST API incorrectly, e.g., 
> on the job submission page. The JSON returned by the REST API is rendered as 
> a whole. However, the UI should only render the contents of the {{errors}} 
> field.
> *Steps to reproduce*
> Submit {{examples/streaming/SocketWindowWordCount.jar}} without specifying 
> program arguments. Error message will be rendered as follows:
> {noformat}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> program plan could not be fetched - the program aborted 
> pre-maturely.\n\nSystem.err: (none)\n\nSystem.out: No port specified. Please 
> run 'SocketWindowWordCount --hostname  --port ', where 
> hostname (localhost by default) and port is the address of the text 
> server\nTo start a simple text server, run 'netcat -l ' and type the 
> input text into the command line\n"]}
> {noformat}
> Note that flip6 mode must be enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9213) Revert breaking change in checkpoint detail URL

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449806#comment-16449806
 ] 

ASF GitHub Bot commented on FLINK-9213:
---

GitHub user azagrebin opened a pull request:

https://github.com/apache/flink/pull/5906

[FLINK-9213] [REST] [docs] Revert checkpoint details URL from 
'/jobs/:jobid/checkpoints/:checkpointid' to 
'/jobs/:jobid/checkpoints/details/:checkpointid' as it was in 1.5

## What is the purpose of the change

This PR aligns the newer 1.5 REST implementation for getting checkpoint 
details with the previous legacy one by reverting the URL
from `/jobs/:jobid/checkpoints/:checkpointid`
to `/jobs/:jobid/checkpoints/details/:checkpointid`

## Brief change log

  - revert `URL` static field in `CheckpointStatisticDetailsHeaders`
  - regenerate docs for `docs/_includes/generated/rest_dispatcher.html`


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/azagrebin/flink FLINK-9213

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5906.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5906


commit 4ebe10873bcc784f626a35bf6055d93f39e6b9af
Author: Andrey Zagrebin 
Date:   2018-04-24T11:26:40Z

[FLINK-9213] Revert checkpoint details URL from 
'/jobs/:jobid/checkpoints/:checkpointid' to 
'/jobs/:jobid/checkpoints/details/:checkpointid' as it was in 1.5




> Revert breaking change in checkpoint detail URL
> ---
>
> Key: FLINK-9213
> URL: https://issues.apache.org/jira/browse/FLINK-9213
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> In 1.4, the URL for retrieving detailed checkpoint information is 
> {{/jobs/:jobid/checkpoints/details/:checkpointid}}, whereas in 1.5 it is 
> {{/jobs/:jobid/checkpoints/:checkpointid}}.
> This is a breaking change that also affects the WebUI and should thus be 
> reverted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8912) Web UI does not render error messages correctly in FLIP-6 mode

2018-04-24 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8912:

Affects Version/s: (was: 1.6.0)

> Web UI does not render error messages correctly in FLIP-6 mode
> --
>
> Key: FLINK-8912
> URL: https://issues.apache.org/jira/browse/FLINK-8912
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.5.0
> Environment: commit: c531486288caf5241cdf7f0f00f087f3ce82239f
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> The Web UI renders error messages returned by the REST API incorrectly, e.g., 
> on the job submission page. The JSON returned by the REST API is rendered as 
> a whole. However, the UI should only render the contents of the {{errors}} 
> field.
> *Steps to reproduce*
> Submit {{examples/streaming/SocketWindowWordCount.jar}} without specifying 
> program arguments. Error message will be rendered as follows:
> {noformat}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> program plan could not be fetched - the program aborted 
> pre-maturely.\n\nSystem.err: (none)\n\nSystem.out: No port specified. Please 
> run 'SocketWindowWordCount --hostname <hostname> --port <port>', where 
> hostname (localhost by default) and port is the address of the text 
> server\nTo start a simple text server, run 'netcat -l <port>' and type the 
> input text into the command line\n"]}
> {noformat}
> Note that flip6 mode must be enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5906: [FLINK-9213] [REST] [docs] Revert checkpoint detai...

2018-04-24 Thread azagrebin
GitHub user azagrebin opened a pull request:

https://github.com/apache/flink/pull/5906

[FLINK-9213] [REST] [docs] Revert checkpoint details URL from 
'/jobs/:jobid/checkpoints/:checkpointid' to 
'/jobs/:jobid/checkpoints/details/:checkpointid' as it was in 1.5

## What is the purpose of the change

This PR aligns the newer 1.5 REST implementation for retrieving checkpoint 
details with the previous legacy one by reverting the URL
from `/jobs/:jobid/checkpoints/:checkpointid`
to `/jobs/:jobid/checkpoints/details/:checkpointid`.
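
For context, a hedged sketch of how a client would fill in the path parameters of the reverted template (the helper below is hypothetical and not part of Flink's API; only the path template itself comes from this PR):

```scala
// Hypothetical client-side helper resolving the reverted path layout.
object CheckpointDetailsPathSketch {
  private val Template = "/jobs/:jobid/checkpoints/details/:checkpointid"

  def resolve(jobId: String, checkpointId: Long): String =
    Template
      .replace(":jobid", jobId)
      .replace(":checkpointid", checkpointId.toString)
}

// CheckpointDetailsPathSketch.resolve("ea632d67b33d709ae4b3", 42)
//   == "/jobs/ea632d67b33d709ae4b3/checkpoints/details/42"
```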

## Brief change log

  - revert `URL` static field in `CheckpointStatisticDetailsHeaders`
  - regenerate docs for `docs/_includes/generated/rest_dispatcher.html`


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/azagrebin/flink FLINK-9213

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5906.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5906


commit 4ebe10873bcc784f626a35bf6055d93f39e6b9af
Author: Andrey Zagrebin 
Date:   2018-04-24T11:26:40Z

[FLINK-9213] Revert checkpoint details URL from 
'/jobs/:jobid/checkpoints/:checkpointid' to 
'/jobs/:jobid/checkpoints/details/:checkpointid' as it was in 1.5




---


[jira] [Commented] (FLINK-9229) Fix literal handling in code generation

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449788#comment-16449788
 ] 

ASF GitHub Bot commented on FLINK-9229:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5898#discussion_r183716031
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1274,30 +1276,46 @@ abstract class CodeGenerator(
       literalType: TypeInformation[_],
       literalCode: String)
     : GeneratedExpression = {
+
+    // mark this expression as a constant literal
+    generateTerm(literalType, literalCode).copy(literal = true)
+  }
+
+  private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = {
+    GeneratedExpression(
+      qualifyEnum(enum),
+      "false",
+      "",
+      new GenericTypeInfo(enum.getDeclaringClass))
+  }
+
+  /**
+    * Generates access to a term (e.g. a field) that does not require unboxing logic.
+    *
+    * @param fieldType type of field
+    * @param fieldTerm expression term of field (already unboxed)
+    * @return internal unboxed field representation
+    */
+  private[flink] def generateTerm(
+      fieldType: TypeInformation[_],
+      fieldTerm: String)
+    : GeneratedExpression = {
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
--- End diff --

If the `nullTerm` is constant, couldn't we say `nullTerm = "false";` and 
simplify the remaining code?
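
A standalone sketch of what that simplification could look like (this is not the actual `CodeGenerator`; `GeneratedExpr` and `newName` below are stand-ins):

```scala
case class GeneratedExpr(resultTerm: String, nullTerm: String, code: String)

object LiteralGenSketch {
  private var counter = 0
  private def newName(prefix: String): String = { counter += 1; s"$prefix$$$counter" }

  def generateNonNullLiteral(typeTerm: String, literalCode: String): GeneratedExpr = {
    val resultTerm = newName("result")
    // nullTerm is the constant "false": no isNull variable needs to be declared or assigned
    GeneratedExpr(resultTerm, "false", s"$typeTerm $resultTerm = $literalCode;")
  }
}
```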


> Fix literal handling in code generation
> ---
>
> Key: FLINK-9229
> URL: https://issues.apache.org/jira/browse/FLINK-9229
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.2
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> Information about which expressions are constant helps during code generation, 
> especially when moving often-reused parts of code into the member area of a 
> generated function. Right now this behavior is not consistent: even 
> methods in {{generateFieldAccess}} generate literals, but those are not 
> constant. This can lead to unintended behavior.
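
To give a concrete (hypothetical) picture of the optimisation this enables: code for a constant literal can be emitted once into the member area of the generated function instead of being re-evaluated per record, roughly like this generated-source sketch (not actual Flink codegen output):

{code}
object MemberAreaSketch {
  // Assembles a toy "generated" Java source; names and layout are illustrative only.
  def generatedFunctionSource(literalCode: String): String =
    s"""public class GeneratedMapFunction {
       |  // constant literal hoisted into the member area, evaluated once
       |  private final java.math.BigDecimal literal$$0 = $literalCode;
       |
       |  public Object map(Object record) {
       |    // per-record code can reuse literal$$0 instead of recomputing it
       |    return literal$$0;
       |  }
       |}""".stripMargin
}
{code}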



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9229) Fix literal handling in code generation

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449789#comment-16449789
 ] 

ASF GitHub Bot commented on FLINK-9229:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5898#discussion_r183716512
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1274,30 +1276,46 @@ abstract class CodeGenerator(
       literalType: TypeInformation[_],
       literalCode: String)
     : GeneratedExpression = {
+
+    // mark this expression as a constant literal
+    generateTerm(literalType, literalCode).copy(literal = true)
+  }
+
+  private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = {
+    GeneratedExpression(
+      qualifyEnum(enum),
+      "false",
+      "",
+      new GenericTypeInfo(enum.getDeclaringClass))
+  }
+
+  /**
+    * Generates access to a term (e.g. a field) that does not require unboxing logic.
+    *
+    * @param fieldType type of field
+    * @param fieldTerm expression term of field (already unboxed)
+    * @return internal unboxed field representation
+    */
+  private[flink] def generateTerm(
+      fieldType: TypeInformation[_],
+      fieldTerm: String)
+    : GeneratedExpression = {
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
--- End diff --

Same is true for `generateNullLiteral()`
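
Analogously, a sketch of the null-literal case (again not the actual `CodeGenerator`): the null flag can simply be the constant "true":

```scala
object NullLiteralGenSketch {
  // (resultTerm, nullTerm, code) — nullTerm is the constant "true", so no
  // separate isNull variable has to be generated for a null literal.
  def generateNullLiteral(typeTerm: String, defaultValue: String): (String, String, String) = {
    val resultTerm = "nullResult"
    (resultTerm, "true", s"$typeTerm $resultTerm = $defaultValue;")
  }
}
```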


> Fix literal handling in code generation
> ---
>
> Key: FLINK-9229
> URL: https://issues.apache.org/jira/browse/FLINK-9229
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.2
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> Information about which expressions are constant helps during code generation, 
> especially when moving often-reused parts of code into the member area of a 
> generated function. Right now this behavior is not consistent: even 
> methods in {{generateFieldAccess}} generate literals, but those are not 
> constant. This can lead to unintended behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5898: [FLINK-9229] [table] Fix literal handling in code ...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5898#discussion_r183716512
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1274,30 +1276,46 @@ abstract class CodeGenerator(
       literalType: TypeInformation[_],
       literalCode: String)
     : GeneratedExpression = {
+
+    // mark this expression as a constant literal
+    generateTerm(literalType, literalCode).copy(literal = true)
+  }
+
+  private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = {
+    GeneratedExpression(
+      qualifyEnum(enum),
+      "false",
+      "",
+      new GenericTypeInfo(enum.getDeclaringClass))
+  }
+
+  /**
+    * Generates access to a term (e.g. a field) that does not require unboxing logic.
+    *
+    * @param fieldType type of field
+    * @param fieldTerm expression term of field (already unboxed)
+    * @return internal unboxed field representation
+    */
+  private[flink] def generateTerm(
+      fieldType: TypeInformation[_],
+      fieldTerm: String)
+    : GeneratedExpression = {
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
--- End diff --

Same is true for `generateNullLiteral()`


---

