[jira] [Updated] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails

2017-11-18 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7488:
--
Description: 
{code}
compareNetworkBufShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)
  Time elapsed: 0.239 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different network buffer memory sizes with 
configuration: {taskmanager.network.memory.fraction=0.1, 
taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, 
taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, 
taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} 
expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf 
because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to 
augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600>
  at org.junit.Assert.assertEquals(Assert.java:115)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:235)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:81)

compareHeapSizeShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)
  Time elapsed: 0.16 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different heap sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting 
HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the 
result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000>
  at org.junit.Assert.assertEquals(Assert.java:115)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:275)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:110)
{code}
$HADOOP_CONF_DIR was not set prior to running the test.
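
A minimal sketch of a possible workaround on the test side (hypothetical helper, not part of the current test): ignore informational banners such as the HADOOP_CONF_DIR notice and compare only the last non-empty line of the script output, which carries the computed size.
{code:java}
import java.util.Arrays;

/** Hypothetical helper: drop banner lines printed by the launch scripts and
 *  keep only the last non-empty line, which holds the computed memory size. */
final class ScriptOutputSanitizer {

    static String lastNonEmptyLine(String scriptOutput) {
        return Arrays.stream(scriptOutput.split("\\r?\\n"))
            .map(String::trim)
            .filter(line -> !line.isEmpty())
            .reduce((first, second) -> second)   // keep the last non-empty line
            .orElse("");
    }
}
{code}
Simply exporting HADOOP_CONF_DIR before running the test would also avoid the banner.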


[jira] [Commented] (FLINK-7588) Document RocksDB tuning for spinning disks

2017-11-18 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258309#comment-16258309
 ] 

Ted Yu commented on FLINK-7588:
---

bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable.
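
For reference, a minimal sketch of how a job can already opt into spinning-disk-oriented RocksDB defaults from the application side (assuming the RocksDB state backend dependency is on the classpath); which concrete option values the documentation should recommend is exactly what this issue is about.
{code:java}
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SpinningDiskBackendExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        // Predefined option set tuned for spinning disks; the HIGH_MEM variant
        // additionally spends memory on bloom filters and larger caches.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);

        env.setStateBackend(backend);
    }
}
{code}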

> Document RocksDB tuning for spinning disks
> --
>
> Key: FLINK-7588
> URL: https://issues.apache.org/jira/browse/FLINK-7588
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Ted Yu
>
> In docs/ops/state/large_state_tuning.md, it was mentioned that:
> bq. the default configuration is tailored towards SSDs and performs 
> suboptimal on spinning disks
> We should add recommendation targeting spinning disks:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk





[jira] [Assigned] (FLINK-8101) Elasticsearch 6.x support

2017-11-18 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 reassigned FLINK-8101:
-

Assignee: Hai Zhou UTC+8

> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Recently, elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum Elasticsearch Java Client version compatible with ES 6 is 5.6.0.





[jira] [Commented] (FLINK-8038) Support MAP value constructor

2017-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258276#comment-16258276
 ] 

ASF GitHub Bot commented on FLINK-8038:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151851733
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, 
ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+val typeFactory = 
relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+val entryRelDataTypes = elements
+  .map(x => typeFactory.createTypeFromTypeInfo(x.resultType, 
isNullable = false))
+val relDataType = SqlStdOperatorTable
--- End diff --

You are right, since both ARRAY and MAP require an explicit type match. 
This actually prompts the question: "should we support implicit type casts 
like Calcite does?"


> Support MAP value constructor
> -
>
> Key: FLINK-8038
> URL: https://issues.apache.org/jira/browse/FLINK-8038
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Similar to https://issues.apache.org/jira/browse/FLINK-4554
> We want to support Map value constructor which is supported by Calcite:
> https://calcite.apache.org/docs/reference.html#value-constructors
> {code:sql}
> SELECT
>   MAP['key1', f0, 'key2', f1] AS stringKeyedMap,
>   MAP['key', 'value'] AS literalMap,
>   MAP[f0, f1] AS fieldMap
> FROM
>   table
> {code}
> This should enable users to construct MapTypeInfo, one of the CompositeType.
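
For illustration, a small sketch of what such a map should correspond to on the Java side, i.e. the MapTypeInfo and the runtime value the constructor is expected to produce (class and variable names here are illustrative only):
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;

public class MapTypeInfoExample {

    public static void main(String[] args) {
        // The type that the SQL expression MAP['key', 'value'] should carry:
        TypeInformation<Map<String, String>> mapType =
            new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        // ...and the runtime value it should evaluate to:
        Map<String, String> literalMap = new HashMap<>();
        literalMap.put("key", "value");

        System.out.println(mapType + " -> " + literalMap);
    }
}
{code}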





[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-18 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151851733
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, 
ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+val typeFactory = 
relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+val entryRelDataTypes = elements
+  .map(x => typeFactory.createTypeFromTypeInfo(x.resultType, 
isNullable = false))
+val relDataType = SqlStdOperatorTable
--- End diff --

You are right, since both ARRAY and MAP require an explicit type match. 
This actually prompts the question: "should we support implicit type casts 
like Calcite does?"


---


[jira] [Commented] (FLINK-8038) Support MAP value constructor

2017-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258275#comment-16258275
 ] 

ASF GitHub Bot commented on FLINK-8038:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151851711
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -705,6 +705,14 @@ trait ImplicitExpressionOperations {
 */
   def element() = ArrayElement(expr)
 
+  /**
+* Accesses the element of a map based on key.
+*
+* @param key key of the element
+* @return value of the element
+*/
+  def getValue(key: Expression) = MapElementGetValue(expr, key)
--- End diff --

I think that makes perfect sense since both of them utilize ITEM operator 
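
At the plan level, "utilize the ITEM operator" means that both array element access and map value access can be lowered to the same Calcite call during toRexNode translation. A rough, illustrative sketch (not the actual Flink code):
{code:java}
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.RelBuilder;

/** Illustrative only: ARRAY[index] and MAP[key] both become ITEM(container, keyOrIndex). */
final class ItemAccess {

    static RexNode itemAccess(RelBuilder relBuilder, RexNode container, RexNode keyOrIndex) {
        return relBuilder.call(SqlStdOperatorTable.ITEM, container, keyOrIndex);
    }
}
{code}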


> Support MAP value constructor
> -
>
> Key: FLINK-8038
> URL: https://issues.apache.org/jira/browse/FLINK-8038
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Similar to https://issues.apache.org/jira/browse/FLINK-4554
> We want to support Map value constructor which is supported by Calcite:
> https://calcite.apache.org/docs/reference.html#value-constructors
> {code:sql}
> SELECT
>   MAP['key1', f0, 'key2', f1] AS stringKeyedMap,
>   MAP['key', 'value'] AS literalMap,
>   MAP[f0, f1] AS fieldMap
> FROM
>   table
> {code}
> This should enable users to construct MapTypeInfo, one of the CompositeType.





[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-18 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151851711
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -705,6 +705,14 @@ trait ImplicitExpressionOperations {
 */
   def element() = ArrayElement(expr)
 
+  /**
+* Accesses the element of a map based on key.
+*
+* @param key key of the element
+* @return value of the element
+*/
+  def getValue(key: Expression) = MapElementGetValue(expr, key)
--- End diff --

I think that makes perfect sense since both of them utilize ITEM operator 


---


[jira] [Assigned] (FLINK-8104) Fix Row value constructor

2017-11-18 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong reassigned FLINK-8104:


Assignee: Rong Rong

> Fix Row value constructor
> -
>
> Key: FLINK-8104
> URL: https://issues.apache.org/jira/browse/FLINK-8104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Support Row value constructor which is currently broken. 
> See 
> {code:java}
> // 
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
>   @Test
>   def testValueConstructorFunctions(): Unit = {
> // TODO we need a special code path that flattens ROW types
> // testSqlApi("ROW('hello world', 12)", "hello world") // test base only 
> returns field 0
> // testSqlApi("('hello world', 12)", "hello world") // test base only 
> returns field 0
> // ...
>   }
> {code}





[jira] [Created] (FLINK-8104) Fix Row value constructor

2017-11-18 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8104:


 Summary: Fix Row value constructor
 Key: FLINK-8104
 URL: https://issues.apache.org/jira/browse/FLINK-8104
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Rong Rong


Support Row value constructor which is currently broken. 
See 

{code:java}
// 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
  @Test
  def testValueConstructorFunctions(): Unit = {
// TODO we need a special code path that flattens ROW types
// testSqlApi("ROW('hello world', 12)", "hello world") // test base only 
returns field 0
// testSqlApi("('hello world', 12)", "hello world") // test base only 
returns field 0
// ...
  }
{code}








[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258254#comment-16258254
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5032
  
sounds good


> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO,
>   source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>   private static final long serialVersionUID = -805125545438296619L;
>   private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>   private transient ListState<Integer> secondListState;
>   @Override
>   public void open(Configuration parameters) throws Exception {
>     super.open(parameters);
>     firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>     secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>   }
>   @Override
>   public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>     Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>     if (v == null) {
>       v = new Tuple2<>(value.f0, 0L);
>     }
>     firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>   }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>   at 
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to 
> org.apache.flink.api.common.state.ListState
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>   ... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The 
> error message should be something along the lines of "Duplicate state name".
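
One possible shape of such an improvement, sketched here purely as an illustration (the eventual fix may look different): remember which descriptor class was registered under each name and fail fast with an explicit "Duplicate state name" message instead of letting a later cast fail.
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.StateDescriptor;

/** Illustrative sketch only: detect two different descriptors registered under one name. */
final class StateNameRegistry {

    private final Map<String, Class<?>> registeredStates = new HashMap<>();

    void checkNotDuplicated(StateDescriptor<?, ?> descriptor) {
        Class<?> previous = registeredStates.putIfAbsent(descriptor.getName(), descriptor.getClass());
        if (previous != null && previous != descriptor.getClass()) {
            throw new IllegalStateException(
                "Duplicate state name '" + descriptor.getName() + "': already registered as "
                    + previous.getSimpleName() + ", now requested as "
                    + descriptor.getClass().getSimpleName() + ".");
        }
    }
}
{code}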





[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...

2017-11-18 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5032
  
sounds good


---


[jira] [Updated] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort

2017-11-18 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8037:
--
Description: 
{code}
  public Set<Long> generateIdsToAbort() {
    Set<Long> idsToAbort = new HashSet<>();
    for (int i = 0; i < safeScaleDownFactor; i++) {
      idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
{code}

The operands are integers, while generateIdsToUse() expects a long parameter.
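
One straightforward fix, shown here only as a sketch that reuses the fields from the snippet above (the committed change may differ), is to force the multiplication into 64-bit arithmetic before it can overflow:
{code:java}
// Sketch of a possible fix: promote the arithmetic to long before it overflows.
public Set<Long> generateIdsToAbort() {
  Set<Long> idsToAbort = new HashSet<>();
  for (int i = 0; i < safeScaleDownFactor; i++) {
    idsToAbort.addAll(generateIdsToUse((long) i * poolSize * totalNumberOfSubtasks));
  }
  return idsToAbort;
}
{code}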



> Missing cast in integer arithmetic in 
> TransactionalIdsGenerator#generateIdsToAbort
> --
>
> Key: FLINK-8037
> URL: https://issues.apache.org/jira/browse/FLINK-8037
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public Set<Long> generateIdsToAbort() {
>     Set<Long> idsToAbort = new HashSet<>();
>     for (int i = 0; i < safeScaleDownFactor; i++) {
>       idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
> {code}
> The operands are integers, while generateIdsToUse() expects a long parameter.





[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-11-18 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4534:
--
Description: 
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():
{code}
for (BucketState<T> bucketState : state.bucketStates.values()) {
{code}
and the following in close():
{code}
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
W.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting at line 752:
{code}
  Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}
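
A sketch of the kind of change this implies (illustrative only, assuming state.bucketStates itself is the monitor the other methods synchronize on): wrap the iterations in the same lock, e.g. for the close() case:
{code:java}
// Illustrative sketch: iterate over the bucket states under the shared lock.
synchronized (state.bucketStates) {
  for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
    closeCurrentPartFile(entry.getValue());
  }
}
{code}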



> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> W.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}





[jira] [Resolved] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-18 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4228.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed
1.4.0: 36b807567d93c431c1498241a42c20221cb6a664
1.5.0: cf8504dba606ee758ac16867423e65dbf6afc23a

> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0, 1.5.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
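
A rough sketch of the manual, file-by-file upload that would side-step the missing recursive copy (illustrative only; the class and method names below are not the actual Flink code):
{code:java}
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch: upload a directory by creating target directories explicitly
 *  and copying one file at a time, so no recursive copy support is required. */
final class RecursiveUpload {

    static void uploadDirectory(FileSystem srcFs, Path srcDir,
                                FileSystem dstFs, Path dstDir,
                                Configuration conf) throws IOException {
        dstFs.mkdirs(dstDir);
        for (FileStatus status : srcFs.listStatus(srcDir)) {
            Path dst = new Path(dstDir, status.getPath().getName());
            if (status.isDirectory()) {
                uploadDirectory(srcFs, status.getPath(), dstFs, dst, conf);
            } else {
                // copy a single file; only flat puts are needed on the target file system
                FileUtil.copy(srcFs, status.getPath(), dstFs, dst, false, conf);
            }
        }
    }
}
{code}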





[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258096#comment-16258096
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4939


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.





[jira] [Resolved] (FLINK-7988) HadoopS3FileSystemITCase leaves test directories behind in S3

2017-11-18 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7988.
--
   Resolution: Fixed
Fix Version/s: 1.5.0
   1.4.0

Fixed
1.4.0: abdc7d1ecdc2e997ef6c9d1764348da00148ec28
1.5.0: 51b5b53c7cd7781959011ba48559c5361ac93ff9

> HadoopS3FileSystemITCase leaves test directories behind in S3
> -
>
> Key: FLINK-7988
> URL: https://issues.apache.org/jira/browse/FLINK-7988
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0, 1.5.0
>
>
> {{HadoopS3FileSystemITCase}} creates a random test (base) directory in S3 
> which is not cleaned up. Please note that the individual tests create 
> sub-directories and do always delete those.
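
For illustration, the kind of cleanup hook that would remove the random base directory once the test class finishes (a sketch, not necessarily the committed fix; TEST_DATA_DIR is a placeholder for the randomly generated base path):
{code:java}
import java.net.URI;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.junit.AfterClass;

public class CleanupSketch {

    // Placeholder for the random test base directory created in S3.
    private static final URI TEST_DATA_DIR = URI.create("s3://test-bucket/temp/random-test-dir");

    @AfterClass
    public static void cleanUpBaseDirectory() throws Exception {
        Path basePath = new Path(TEST_DATA_DIR);
        // recursive = true removes the base directory together with any leftovers
        basePath.getFileSystem().delete(basePath, true);
    }
}
{code}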





[jira] [Commented] (FLINK-7988) HadoopS3FileSystemITCase leaves test directories behind in S3

2017-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258095#comment-16258095
 ] 

ASF GitHub Bot commented on FLINK-7988:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4950


> HadoopS3FileSystemITCase leaves test directories behind in S3
> -
>
> Key: FLINK-7988
> URL: https://issues.apache.org/jira/browse/FLINK-7988
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0, 1.5.0
>
>
> {{HadoopS3FileSystemITCase}} creates a random test (base) directory in S3 
> which is not cleaned up. Please note that the individual tests create 
> sub-directories and do always delete those.





[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4939


---


[GitHub] flink pull request #4950: [FLINK-7988][s3] fix HadoopS3FileSystemITCase leav...

2017-11-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4950


---


[jira] [Updated] (FLINK-8103) Flink 1.4 not writing to standard out log file

2017-11-18 Thread Ryan Brideau (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Brideau updated FLINK-8103:

Description: 
I built the latest snapshot of 1.4 yesterday and tried testing it with a simple 
word count example, where StreamUtil is just a helper that checks input 
parameters:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream.println

// execute program
env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its 
results to the _out_ file, which I can tail to see the results. This doesn't 
happen in 1.4, however. I can, however, still modify it to write out to a file:


{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream
  .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
  .setParallelism(1)

// execute program
env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?

  was:
I built the latest snapshot of 1.4 yesterday and tried testing it with a simple 
word count example, where StreamUtil is just a helper than checks input 
parameters:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream.println

// execute program
env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its 
results to the _out_ file, which I can tail to see the results. This doesn't 
happen in 1.4, however. I can modify it to write out to a file, however:


{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream
  .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
  .setParallelism(1)

// execute program
env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?


> Flink 1.4 not writing to standard out log file
> --
>
> Key: FLINK-8103
> URL: https://issues.apache.org/jira/browse/FLINK-8103
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: macOS 10.13 (High Sierra)
>Reporter: Ryan Brideau
>
> I built the latest snapshot of 1.4 yesterday and tried testing it with a 
> simple word count example, where StreamUtil is just a helper that checks 
> input parameters:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object Words {
>   def main(args: Array[String]) {
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val dataStream = StreamUtil.getDataStream(env, params)
> val wordDataStream = dataStream
>   .flatMap{ _.split(" ") }
> wordDataStream.println
> // execute program
> env.execute("Words Scala")
>   }
> }
> {code}
> This runs without an issue on the latest stable version of 1.3 and writes 

[jira] [Updated] (FLINK-8103) Flink 1.4 not writing to standard out log file

2017-11-18 Thread Ryan Brideau (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Brideau updated FLINK-8103:

Description: 
I built the latest snapshot of 1.4 yesterday and tried testing it with a simple 
word count example, where StreamUtil is just a helper than checks input 
parameters:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream.println

// execute program
env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its 
results to the _out_ file, which I can tail to see the results. This doesn't 
happen in 1.4, however. I can modify it to write out to a file, however:


{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream
  .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
  .setParallelism(1)

// execute program
env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?

  was:
I built the latest snapshot of 1.4 yesterday and tries testing it with a simple 
word count example, where StreamUtil is just a helper than checks input 
parameters:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream.println

// execute program
env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its 
results to the _out_ file, which I can tail to see the results. This doesn't 
happen in 1.4, however. I can modify it to write out to a file, however:


{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream
  .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
  .setParallelism(1)

// execute program
env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?


> Flink 1.4 not writing to standard out log file
> --
>
> Key: FLINK-8103
> URL: https://issues.apache.org/jira/browse/FLINK-8103
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: macOS 10.13 (High Sierra)
>Reporter: Ryan Brideau
>
> I built the latest snapshot of 1.4 yesterday and tried testing it with a 
> simple word count example, where StreamUtil is just a helper that checks 
> input parameters:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object Words {
>   def main(args: Array[String]) {
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val dataStream = StreamUtil.getDataStream(env, params)
> val wordDataStream = dataStream
>   .flatMap{ _.split(" ") }
> wordDataStream.println
> // execute program
> env.execute("Words Scala")
>   }
> }
> {code}
> This runs without an issue on the latest stable version of 1.3 and writes 

[jira] [Created] (FLINK-8103) Flink 1.4 not writing to standard out log file

2017-11-18 Thread Ryan Brideau (JIRA)
Ryan Brideau created FLINK-8103:
---

 Summary: Flink 1.4 not writing to standard out log file
 Key: FLINK-8103
 URL: https://issues.apache.org/jira/browse/FLINK-8103
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.0
 Environment: macOS 10.13 (High Sierra)
Reporter: Ryan Brideau


I built the latest snapshot of 1.4 yesterday and tried testing it with a simple 
word count example, where StreamUtil is just a helper that checks input 
parameters:

{code:scala}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream.println

// execute program
env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its 
results to the _out_ file, which I can tail to see the results. This doesn't 
happen in 1.4, however. I can modify it to write out to a file, however:


{code:scala}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream
  .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
  .setParallelism(1)

// execute program
env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?





[jira] [Updated] (FLINK-8103) Flink 1.4 not writing to standard out log file

2017-11-18 Thread Ryan Brideau (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Brideau updated FLINK-8103:

Description: 
I built the latest snapshot of 1.4 yesterday and tries testing it with a simple 
word count example, where StreamUtil is just a helper than checks input 
parameters:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream.println

// execute program
env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its 
results to the _out_ file, which I can tail to see the results. This doesn't 
happen in 1.4, however. I can modify it to write out to a file, however:


{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream
  .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
  .setParallelism(1)

// execute program
env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?

  was:
I built the latest snapshot of 1.4 yesterday and tries testing it with a simple 
word count example, where StreamUtil is just a helper than checks input 
parameters:

{code:scala}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream.println

// execute program
env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its 
results to the _out_ file, which I can tail to see the results. This doesn't 
happen in 1.4, however. I can modify it to write out to a file, however:


{code:scala}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val dataStream = StreamUtil.getDataStream(env, params)

val wordDataStream = dataStream
  .flatMap{ _.split(" ") }

wordDataStream
  .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
  .setParallelism(1)

// execute program
env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?


> Flink 1.4 not writing to standard out log file
> --
>
> Key: FLINK-8103
> URL: https://issues.apache.org/jira/browse/FLINK-8103
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: macOS 10.13 (High Sierra)
>Reporter: Ryan Brideau
>
> I built the latest snapshot of 1.4 yesterday and tried testing it with a 
> simple word count example, where StreamUtil is just a helper that checks 
> input parameters:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object Words {
>   def main(args: Array[String]) {
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val dataStream = StreamUtil.getDataStream(env, params)
> val wordDataStream = dataStream
>   .flatMap{ _.split(" ") }
> wordDataStream.println
> // execute program
> env.execute("Words Scala")
>   }
> }
> {code}
> This runs without an issue on the latest stable version of 1.3 and writes 

[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2017-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258055#comment-16258055
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user haoch commented on the issue:

https://github.com/apache/flink/pull/2487
  
As mentioned above, I moved the patch to Apache Bahir at 
https://github.com/apache/bahir/pull/54; resolving this one.


> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>Assignee: Hao Chen
>  Labels: cep, library, patch-available
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for flink users (especially streaming application 
> developer) to provide a library to run Siddhi CEP query directly in Flink 
> streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every s1 = inputStream1[id == 2] "
>  + " -> s2 = inputStream2[id == 3] "
>  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
> name_2 , custom:plus(s1.price,s2.price) as price"
>  + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}





[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

2017-11-18 Thread haoch
Github user haoch commented on the issue:

https://github.com/apache/flink/pull/2487
  
As mentioned above, I moved the patch to Apache Bahir at 
https://github.com/apache/bahir/pull/54; resolving this one.


---


[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258014#comment-16258014
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5032
  
Hi @bowenli86, thanks for the review. This PR only changes the message of the 
`RuntimeException` and thus may not be easily verified. To improve that, we could 
add some dedicated exceptions. What do you think?


> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO,
>   source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>   private static final long serialVersionUID = -805125545438296619L;
>   private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>   private transient ListState<Integer> secondListState;
>   @Override
>   public void open(Configuration parameters) throws Exception {
>     super.open(parameters);
>     firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>     secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>   }
>   @Override
>   public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>     Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>     if (v == null) {
>       v = new Tuple2<>(value.f0, 0L);
>     }
>     firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>   }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>   at 
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to 
> org.apache.flink.api.common.state.ListState
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>   ... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The 
> error message should be something along the line of "Duplicate state name".





[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...

2017-11-18 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5032
  
Hi @bowenli86, thanks for the review. This PR only changes the message of the 
`RuntimeException` and thus may not be easily verified. To improve that, we could 
add some dedicated exceptions. What do you think?


---