[jira] [Updated] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails
[ https://issues.apache.org/jira/browse/FLINK-7488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-7488:
--------------------------
    Description: 
{code}
compareNetworkBufShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)  Time elapsed: 0.239 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different network buffer memory sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600>
	at org.junit.Assert.assertEquals(Assert.java:115)
	at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:235)
	at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:81)

compareHeapSizeShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)  Time elapsed: 0.16 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different heap sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000>
	at org.junit.Assert.assertEquals(Assert.java:115)
	at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:275)
	at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:110)
{code}

$HADOOP_CONF_DIR was not set prior to running the test.
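The comparison fails because the test treats the script's entire stdout as the computed value, so config.sh's informational lines (the HADOOP_CONF_DIR notice and the classpath echo) end up glued onto the expected number. A minimal sketch of a defensive way to extract just the value; the helper name and the choice of keeping only the last output line are illustrative assumptions, not the actual test fix:

{code:java}
public final class ScriptOutputParsing {

    // Hypothetical helper: given everything a config script printed,
    // keep only the final line, which is where the computed size lands
    // when banner output precedes it.
    static String extractValue(String scriptStdout) {
        String[] lines = scriptStdout.trim().split("\\R");
        return lines[lines.length - 1].trim();
    }

    public static void main(String[] args) {
        String polluted =
            "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.\n"
                + "104857600";
        System.out.println(extractValue(polluted)); // prints 104857600
    }
}
{code}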
[jira] [Commented] (FLINK-7588) Document RocksDB tuning for spinning disks
[ https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258309#comment-16258309 ]

Ted Yu commented on FLINK-7588:
-------------------------------

bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable.

> Document RocksDB tuning for spinning disks
> ------------------------------------------
>
>                 Key: FLINK-7588
>                 URL: https://issues.apache.org/jira/browse/FLINK-7588
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: Ted Yu
>
> In docs/ops/state/large_state_tuning.md, it is mentioned that:
> bq. the default configuration is tailored towards SSDs and performs suboptimal on spinning disks
> We should add recommendations targeting spinning disks:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
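One concrete, low-risk recommendation for such documentation is Flink's own predefined RocksDB option profile for spinning disks. A minimal sketch of selecting it on the RocksDB state backend; the checkpoint URI is a placeholder, and exact class locations may vary by Flink version:

{code:java}
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SpinningDiskBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // "hdfs:///checkpoints" is a placeholder checkpoint directory.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///checkpoints");

        // Swap the SSD-oriented defaults mentioned in large_state_tuning.md
        // for the option set tuned for spinning disks.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);

        env.setStateBackend(backend);
    }
}
{code}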
[jira] [Assigned] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hai Zhou UTC+8 reassigned FLINK-8101:
-------------------------------------
    Assignee: Hai Zhou UTC+8

> Elasticsearch 6.x support
> -------------------------
>
>                 Key: FLINK-8101
>                 URL: https://issues.apache.org/jira/browse/FLINK-8101
>             Project: Flink
>          Issue Type: New Feature
>          Components: ElasticSearch Connector
>    Affects Versions: 1.4.0
>            Reporter: Hai Zhou UTC+8
>            Assignee: Hai Zhou UTC+8
>             Fix For: 1.5.0
>
> Recently, Elasticsearch 6.0.0 was released:
> https://www.elastic.co/blog/elasticsearch-6-0-0-released
> The minimum Elasticsearch Java client version compatible with ES 6 is 5.6.0.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8038) Support MAP value constructor
[ https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258276#comment-16258276 ]

ASF GitHub Bot commented on FLINK-8038:
---------------------------------------

Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5015#discussion_r151851733

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val entryRelDataTypes = elements
+      .map(x => typeFactory.createTypeFromTypeInfo(x.resultType, isNullable = false))
+    val relDataType = SqlStdOperatorTable
--- End diff --

    You are right, since both ARRAY and MAP require an explicit type match. This actually prompts the question: "should we support implicit type casts like Calcite did?"

> Support MAP value constructor
> -----------------------------
>
>                 Key: FLINK-8038
>                 URL: https://issues.apache.org/jira/browse/FLINK-8038
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>
> Similar to https://issues.apache.org/jira/browse/FLINK-4554
> We want to support the MAP value constructor, which is supported by Calcite:
> https://calcite.apache.org/docs/reference.html#value-constructors
> {code:sql}
> SELECT
>   MAP['key1', f0, 'key2', f1] AS stringKeyedMap,
>   MAP['key', 'value'] AS literalMap,
>   MAP[f0, f1] AS fieldMap
> FROM
>   table
> {code}
> This should enable users to construct MapTypeInfo, one of the CompositeTypes.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5015#discussion_r151851733

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val entryRelDataTypes = elements
+      .map(x => typeFactory.createTypeFromTypeInfo(x.resultType, isNullable = false))
+    val relDataType = SqlStdOperatorTable
--- End diff --

    You are right, since both ARRAY and MAP require an explicit type match. This actually prompts the question: "should we support implicit type casts like Calcite did?"

---
[jira] [Commented] (FLINK-8038) Support MAP value constructor
[ https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258275#comment-16258275 ]

ASF GitHub Bot commented on FLINK-8038:
---------------------------------------

Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5015#discussion_r151851711

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
@@ -705,6 +705,14 @@ trait ImplicitExpressionOperations {
     */
   def element() = ArrayElement(expr)
 
+  /**
+    * Accesses the element of a map based on key.
+    *
+    * @param key key of the element
+    * @return value of the element
+    */
+  def getValue(key: Expression) = MapElementGetValue(expr, key)
--- End diff --

    I think that makes perfect sense, since both of them utilize the ITEM operator.

> Support MAP value constructor
> -----------------------------
>
>                 Key: FLINK-8038
>                 URL: https://issues.apache.org/jira/browse/FLINK-8038
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>
> Similar to https://issues.apache.org/jira/browse/FLINK-4554
> We want to support the MAP value constructor, which is supported by Calcite:
> https://calcite.apache.org/docs/reference.html#value-constructors
> {code:sql}
> SELECT
>   MAP['key1', f0, 'key2', f1] AS stringKeyedMap,
>   MAP['key', 'value'] AS literalMap,
>   MAP[f0, f1] AS fieldMap
> FROM
>   table
> {code}
> This should enable users to construct MapTypeInfo, one of the CompositeTypes.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5015#discussion_r151851711

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
@@ -705,6 +705,14 @@ trait ImplicitExpressionOperations {
     */
   def element() = ArrayElement(expr)
 
+  /**
+    * Accesses the element of a map based on key.
+    *
+    * @param key key of the element
+    * @return value of the element
+    */
+  def getValue(key: Expression) = MapElementGetValue(expr, key)
--- End diff --

    I think that makes perfect sense, since both of them utilize the ITEM operator.

---
[jira] [Assigned] (FLINK-8104) Fix Row value constructor
[ https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rong Rong reassigned FLINK-8104:
--------------------------------
    Assignee: Rong Rong

> Fix Row value constructor
> -------------------------
>
>                 Key: FLINK-8104
>                 URL: https://issues.apache.org/jira/browse/FLINK-8104
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>
> Support Row value constructor which is currently broken.
> See
> {code:scala}
> // flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
> @Test
> def testValueConstructorFunctions(): Unit = {
>   // TODO we need a special code path that flattens ROW types
>   // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0
>   // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
>   // ...
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-8104) Fix Row value constructor
Rong Rong created FLINK-8104:
---------------------------------

             Summary: Fix Row value constructor
                 Key: FLINK-8104
                 URL: https://issues.apache.org/jira/browse/FLINK-8104
             Project: Flink
          Issue Type: Sub-task
          Components: Table API & SQL
            Reporter: Rong Rong

Support Row value constructor which is currently broken.

See
{code:scala}
// flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@Test
def testValueConstructorFunctions(): Unit = {
  // TODO we need a special code path that flattens ROW types
  // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0
  // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
  // ...
}
{code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258254#comment-16258254 ]

ASF GitHub Bot commented on FLINK-8090:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5032

    sounds good

> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8090
>                 URL: https://issues.apache.org/jira/browse/FLINK-8090
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>     "timon-one",
>     BasicTypeInfo.INT_TYPE_INFO,
>     source.getType());
>
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>     "timon-one",
>     BasicTypeInfo.INT_TYPE_INFO);
>
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
> 	at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
> 	... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The error message should be something along the lines of "Duplicate state name".

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5032 sounds good ---
[jira] [Updated] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort
[ https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-8037:
--------------------------
    Description: 
{code}
  public Set<String> generateIdsToAbort() {
    Set<String> idsToAbort = new HashSet<>();
    for (int i = 0; i < safeScaleDownFactor; i++) {
      idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
{code}
The operands are integers where generateIdsToUse() expects a long parameter.

> Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-8037
>                 URL: https://issues.apache.org/jira/browse/FLINK-8037
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Priority: Minor
>
> {code}
>   public Set<String> generateIdsToAbort() {
>     Set<String> idsToAbort = new HashSet<>();
>     for (int i = 0; i < safeScaleDownFactor; i++) {
>       idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
> {code}
> The operands are integers where generateIdsToUse() expects a long parameter.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
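The bug is the classic overflow-before-widening pattern: the multiplication is evaluated entirely in 32-bit arithmetic, and only the (possibly wrapped) result is widened to long at the call site. A minimal sketch of the failure mode and the usual fix; the concrete values are made up for illustration:

{code:java}
public class OverflowDemo {
    public static void main(String[] args) {
        int i = 3;
        int poolSize = 100_000;
        int totalNumberOfSubtasks = 50_000;

        // Evaluated in int arithmetic: wraps before the implicit widening.
        long wrong = i * poolSize * totalNumberOfSubtasks;

        // Widening the first operand forces long arithmetic for the
        // whole expression -- the cast the issue title refers to.
        long right = (long) i * poolSize * totalNumberOfSubtasks;

        System.out.println(wrong); // 2115098112 (wrapped 32-bit result)
        System.out.println(right); // 15000000000
    }
}
{code}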
[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-4534:
--------------------------
    Description: 
Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState():
{code}
    for (BucketState<T> bucketState : state.bucketStates.values()) {
{code}
and the following in close():
{code}
    for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
      closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting at line 752:
{code}
    Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
    LOG.debug("Moving pending files to final location on restore.");
    for (Long pastCheckpointId : pastCheckpointIds) {
{code}

> Lack of synchronization in BucketingSink#restoreState()
> --------------------------------------------------------
>
>                 Key: FLINK-4534
>                 URL: https://issues.apache.org/jira/browse/FLINK-4534
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Ted Yu
>            Assignee: mingleizhang
>
> Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting at line 752:
> {code}
> Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
> LOG.debug("Moving pending files to final location on restore.");
> for (Long pastCheckpointId : pastCheckpointIds) {
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
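Since the other accessors guard the shared map, the straightforward remedy is to take the same monitor around these iterations, so they cannot race with concurrent put()/remove() calls. A minimal sketch of the guarded-iteration pattern, not the actual BucketingSink code; the BucketState stand-in and the choice of the map itself as the lock object are assumptions for illustration:

{code:java}
import java.util.HashMap;
import java.util.Map;

class BucketState { /* pending files, current part file, ... */ }

class RestoreSketch {
    private final Map<String, BucketState> bucketStates = new HashMap<>();

    void restoreState() {
        // Iterate under the same monitor the mutating methods use, so the
        // for-each loop never observes a concurrently modified map.
        synchronized (bucketStates) {
            for (BucketState bucketState : bucketStates.values()) {
                handleRestoredBucketState(bucketState);
            }
        }
    }

    private void handleRestoredBucketState(BucketState bucketState) {
        // hypothetical restore logic
    }
}
{code}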
[jira] [Resolved] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-4228.
----------------------------------
    Resolution: Fixed
 Fix Version/s: 1.5.0

Fixed
1.4.0: 36b807567d93c431c1498241a42c20221cb6a664
1.5.0: cf8504dba606ee758ac16867423e65dbf6afc23a

> YARN artifact upload does not work with S3AFileSystem
> ------------------------------------------------------
>
>                 Key: FLINK-4228
>                 URL: https://issues.apache.org/jira/browse/FLINK-4228
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Nico Kruber
>            Priority: Blocker
>             Fix For: 1.4.0, 1.5.0
>
> The issue is now exclusive to running on YARN with s3a:// as your configured FileSystem. If so, the Flink session will fail on staging itself because it tries to copy the flink/lib directory to S3 and the S3AFileSystem does not support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) leads to an Exception when uploading the snapshot to S3 when using the {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)}
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)
> 	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
> 	at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
> 	at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)
> 	at java.io.FileInputStream.open0(Native Method)
> 	at java.io.FileInputStream.open(FileInputStream.java:195)
> 	at java.io.FileInputStream.<init>(FileInputStream.java:138)
> 	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
> 	... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created automatically. We might need to manually create folders and copy only actual files for {{S3AFileSystem}}. More investigation is required.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
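Since S3AFileSystem lacks recursive copy, the fix direction the issue names (create directories explicitly and copy plain files one by one) can be sketched with the Hadoop FileSystem API. This is an illustrative walk under that assumption, not the merged patch:

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public final class RecursiveCopySketch {

    /**
     * Copies src to dst file by file. Directories are created explicitly,
     * since S3A does not create parent "folders" implicitly on copy.
     */
    public static void copyRecursively(
            FileSystem srcFs, Path src,
            FileSystem dstFs, Path dst,
            Configuration conf) throws Exception {
        if (srcFs.getFileStatus(src).isDirectory()) {
            dstFs.mkdirs(dst);
            for (FileStatus child : srcFs.listStatus(src)) {
                copyRecursively(srcFs, child.getPath(),
                        dstFs, new Path(dst, child.getPath().getName()), conf);
            }
        } else {
            // Plain file: a single-object upload, which S3A handles fine.
            FileUtil.copy(srcFs, src, dstFs, dst, false, conf);
        }
    }
}
{code}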
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258096#comment-16258096 ]

ASF GitHub Bot commented on FLINK-4228:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4939

> YARN artifact upload does not work with S3AFileSystem
> ------------------------------------------------------
>
>                 Key: FLINK-4228
>                 URL: https://issues.apache.org/jira/browse/FLINK-4228
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Nico Kruber
>            Priority: Blocker
>             Fix For: 1.4.0
>
> The issue is now exclusive to running on YARN with s3a:// as your configured FileSystem. If so, the Flink session will fail on staging itself because it tries to copy the flink/lib directory to S3 and the S3AFileSystem does not support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) leads to an Exception when uploading the snapshot to S3 when using the {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)}
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)
> 	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
> 	at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
> 	at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)
> 	at java.io.FileInputStream.open0(Native Method)
> 	at java.io.FileInputStream.open(FileInputStream.java:195)
> 	at java.io.FileInputStream.<init>(FileInputStream.java:138)
> 	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
> 	... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created automatically. We might need to manually create folders and copy only actual files for {{S3AFileSystem}}. More investigation is required.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Resolved] (FLINK-7988) HadoopS3FileSystemITCase leaves test directories behind in S3
[ https://issues.apache.org/jira/browse/FLINK-7988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-7988.
----------------------------------
    Resolution: Fixed
 Fix Version/s: 1.5.0
                1.4.0

Fixed
1.4.0: abdc7d1ecdc2e997ef6c9d1764348da00148ec28
1.5.0: 51b5b53c7cd7781959011ba48559c5361ac93ff9

> HadoopS3FileSystemITCase leaves test directories behind in S3
> --------------------------------------------------------------
>
>                 Key: FLINK-7988
>                 URL: https://issues.apache.org/jira/browse/FLINK-7988
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector, Tests
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>             Fix For: 1.4.0, 1.5.0
>
> {{HadoopS3FileSystemITCase}} creates a random test (base) directory in S3 which is not cleaned up. Note that the individual tests create sub-directories and do at least always delete those.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7988) HadoopS3FileSystemITCase leaves test directories behind in S3
[ https://issues.apache.org/jira/browse/FLINK-7988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258095#comment-16258095 ]

ASF GitHub Bot commented on FLINK-7988:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4950

> HadoopS3FileSystemITCase leaves test directories behind in S3
> --------------------------------------------------------------
>
>                 Key: FLINK-7988
>                 URL: https://issues.apache.org/jira/browse/FLINK-7988
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector, Tests
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>             Fix For: 1.4.0, 1.5.0
>
> {{HadoopS3FileSystemITCase}} creates a random test (base) directory in S3 which is not cleaned up. Note that the individual tests create sub-directories and do at least always delete those.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4939 ---
[GitHub] flink pull request #4950: [FLINK-7988][s3] fix HadoopS3FileSystemITCase leav...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4950 ---
[jira] [Updated] (FLINK-8103) Flink 1.4 not writing to standard out log file
[ https://issues.apache.org/jira/browse/FLINK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Brideau updated FLINK-8103:
--------------------------------
    Description: 
I built the latest snapshot of 1.4 yesterday and tried testing it with a simple word count example, where StreamUtil is just a helper that checks input parameters:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val dataStream = StreamUtil.getDataStream(env, params)
    val wordDataStream = dataStream
      .flatMap{ _.split(" ") }

    wordDataStream.println

    // execute program
    env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its results to the _out_ file, which I can tail to see the results. This doesn't happen in 1.4, however. I can modify it to write out to a file instead:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val dataStream = StreamUtil.getDataStream(env, params)
    val wordDataStream = dataStream
      .flatMap{ _.split(" ") }

    wordDataStream
      .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
      .setParallelism(1)

    // execute program
    env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?

> Flink 1.4 not writing to standard out log file
> -----------------------------------------------
>
>                 Key: FLINK-8103
>                 URL: https://issues.apache.org/jira/browse/FLINK-8103
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.4.0
>         Environment: macOS 10.13 (High Sierra)
>            Reporter: Ryan Brideau
>
> I built the latest snapshot of 1.4 yesterday and tried testing it with a simple word count example, where StreamUtil is just a helper that checks input parameters:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object Words {
>   def main(args: Array[String]) {
>     // set up the execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val params = ParameterTool.fromArgs(args)
>     env.getConfig.setGlobalJobParameters(params)
>     val dataStream = StreamUtil.getDataStream(env, params)
>     val wordDataStream = dataStream
>       .flatMap{ _.split(" ") }
>     wordDataStream.println
>     // execute program
>     env.execute("Words Scala")
>   }
> }
> {code}
> This runs without an issue on the latest stable version of 1.3 and writes
[jira] [Updated] (FLINK-8103) Flink 1.4 not writing to standard out log file
[ https://issues.apache.org/jira/browse/FLINK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Brideau updated FLINK-8103:
--------------------------------
    Description: 
I built the latest snapshot of 1.4 yesterday and tried testing it with a simple word count example, where StreamUtil is just a helper that checks input parameters:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val dataStream = StreamUtil.getDataStream(env, params)
    val wordDataStream = dataStream
      .flatMap{ _.split(" ") }

    wordDataStream.println

    // execute program
    env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its results to the _out_ file, which I can tail to see the results. This doesn't happen in 1.4, however. I can modify it to write out to a file instead:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val dataStream = StreamUtil.getDataStream(env, params)
    val wordDataStream = dataStream
      .flatMap{ _.split(" ") }

    wordDataStream
      .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
      .setParallelism(1)

    // execute program
    env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?

> Flink 1.4 not writing to standard out log file
> -----------------------------------------------
>
>                 Key: FLINK-8103
>                 URL: https://issues.apache.org/jira/browse/FLINK-8103
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.4.0
>         Environment: macOS 10.13 (High Sierra)
>            Reporter: Ryan Brideau
>
> I built the latest snapshot of 1.4 yesterday and tried testing it with a simple word count example, where StreamUtil is just a helper that checks input parameters:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object Words {
>   def main(args: Array[String]) {
>     // set up the execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val params = ParameterTool.fromArgs(args)
>     env.getConfig.setGlobalJobParameters(params)
>     val dataStream = StreamUtil.getDataStream(env, params)
>     val wordDataStream = dataStream
>       .flatMap{ _.split(" ") }
>     wordDataStream.println
>     // execute program
>     env.execute("Words Scala")
>   }
> }
> {code}
> This runs without an issue on the latest stable version of 1.3 and writes
[jira] [Created] (FLINK-8103) Flink 1.4 not writing to standard out log file
Ryan Brideau created FLINK-8103:
---------------------------------

             Summary: Flink 1.4 not writing to standard out log file
                 Key: FLINK-8103
                 URL: https://issues.apache.org/jira/browse/FLINK-8103
             Project: Flink
          Issue Type: Bug
          Components: Core
    Affects Versions: 1.4.0
         Environment: macOS 10.13 (High Sierra)
            Reporter: Ryan Brideau

I built the latest snapshot of 1.4 yesterday and tried testing it with a simple word count example, where StreamUtil is just a helper that checks input parameters:

{code:scala}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val dataStream = StreamUtil.getDataStream(env, params)
    val wordDataStream = dataStream
      .flatMap{ _.split(" ") }

    wordDataStream.println

    // execute program
    env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its results to the _out_ file, which I can tail to see the results. This doesn't happen in 1.4, however. I can modify it to write out to a file instead:

{code:scala}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val dataStream = StreamUtil.getDataStream(env, params)
    val wordDataStream = dataStream
      .flatMap{ _.split(" ") }

    wordDataStream
      .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
      .setParallelism(1)

    // execute program
    env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Updated] (FLINK-8103) Flink 1.4 not writing to standard out log file
[ https://issues.apache.org/jira/browse/FLINK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Brideau updated FLINK-8103:
--------------------------------
    Description: 
I built the latest snapshot of 1.4 yesterday and tried testing it with a simple word count example, where StreamUtil is just a helper that checks input parameters:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val dataStream = StreamUtil.getDataStream(env, params)
    val wordDataStream = dataStream
      .flatMap{ _.split(" ") }

    wordDataStream.println

    // execute program
    env.execute("Words Scala")
  }
}
{code}

This runs without an issue on the latest stable version of 1.3 and writes its results to the _out_ file, which I can tail to see the results. This doesn't happen in 1.4, however. I can modify it to write out to a file instead:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

object Words {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val dataStream = StreamUtil.getDataStream(env, params)
    val wordDataStream = dataStream
      .flatMap{ _.split(" ") }

    wordDataStream
      .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
      .setParallelism(1)

    // execute program
    env.execute("Words Scala")
  }
}
{code}

Any clues as to what might be causing this?

> Flink 1.4 not writing to standard out log file
> -----------------------------------------------
>
>                 Key: FLINK-8103
>                 URL: https://issues.apache.org/jira/browse/FLINK-8103
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.4.0
>         Environment: macOS 10.13 (High Sierra)
>            Reporter: Ryan Brideau
>
> I built the latest snapshot of 1.4 yesterday and tried testing it with a simple word count example, where StreamUtil is just a helper that checks input parameters:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object Words {
>   def main(args: Array[String]) {
>     // set up the execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val params = ParameterTool.fromArgs(args)
>     env.getConfig.setGlobalJobParameters(params)
>     val dataStream = StreamUtil.getDataStream(env, params)
>     val wordDataStream = dataStream
>       .flatMap{ _.split(" ") }
>     wordDataStream.println
>     // execute program
>     env.execute("Words Scala")
>   }
> }
> {code}
> This runs without an issue on the latest stable version of 1.3 and writes
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258055#comment-16258055 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    As mentioned above, moved the patch to Apache Bahir at https://github.com/apache/bahir/pull/54, resolving this one.

> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
>                 Key: FLINK-4520
>                 URL: https://issues.apache.org/jira/browse/FLINK-4520
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>              Labels: cep, library, patch-available
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight, easy-to-use, open-source Complex Event Processing engine (CEP) released as a Java library under the `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them, and notifies appropriate complex events according to the user-specified queries.
> It would be very helpful for Flink users (especially streaming application developers) to provide a library to run Siddhi CEP queries directly in a Flink streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
> ** Filter
> ** Join
> ** Aggregation
> ** Group by
> ** Having
> ** Window
> ** Conditions and Expressions
> ** Pattern processing
> ** Sequence processing
> ** Event Tables
> ** ...
> * Provide an easy-to-use Siddhi CEP API to integrate with the Flink DataStream API (see `SiddhiCEP` and `SiddhiStream`)
> ** Register a Flink DataStream, associating native type information with a Siddhi Stream Schema, supporting POJO, Tuple, primitive types, etc.
> ** Connect single or multiple Flink DataStreams with a Siddhi CEP Execution Plan
> ** Return the output stream as a DataStream with its type intelligently inferred from the Siddhi Stream Schema
> * Integrate Siddhi runtime state management with Flink state (see `AbstractSiddhiOperator`)
> * Support Siddhi plugin management to extend CEP functions (see `SiddhiCEP#registerExtension`)
> h2. Test Cases
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>
> cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
>
> cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
> cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");
>
> DataStream> output = cep
>     .from("inputStream1").union("inputStream2")
>     .sql(
>         "from every s1 = inputStream1[id == 2] "
>         + " -> s2 = inputStream2[id == 3] "
>         + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price "
>         + "insert into outputStream"
>     )
>     .returns("outputStream");
>
> env.execute();
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    As mentioned above, moved the patch to Apache Bahir at https://github.com/apache/bahir/pull/54, resolving this one.

---
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258014#comment-16258014 ]

ASF GitHub Bot commented on FLINK-8090:
---------------------------------------

Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/5032

    Hi @bowenli86, thanks for the review. This PR only changes the message of the `RuntimeException` and thus may not be easy to verify. To improve that, we could add some dedicated exceptions. What do you think?

> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8090
>                 URL: https://issues.apache.org/jira/browse/FLINK-8090
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>     "timon-one",
>     BasicTypeInfo.INT_TYPE_INFO,
>     source.getType());
>
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>     "timon-one",
>     BasicTypeInfo.INT_TYPE_INFO);
>
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
> 	at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
> 	... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The error message should be something along the lines of "Duplicate state name".

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/5032

    Hi @bowenli86, thanks for the review. This PR only changes the message of the `RuntimeException` and thus may not be easy to verify. To improve that, we could add some dedicated exceptions. What do you think?

---
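One shape such a dedicated exception could take, following the discussion above: a guard that remembers which descriptor type was registered under each state name and fails fast with an explicit duplicate-name error. This is a hedged sketch only; the class names, the registry map, and any wiring into DefaultKeyedStateStore are assumptions, not the merged change:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical exception type for FLINK-8090-style failures.
class DuplicateStateNameException extends IllegalStateException {
    DuplicateStateNameException(String name, Class<?> first, Class<?> second) {
        super("State name '" + name + "' was already registered with descriptor type "
            + first.getSimpleName() + "; cannot re-register it as "
            + second.getSimpleName() + ".");
    }
}

// Sketch of the guard: track the descriptor class used per state name.
class StateNameRegistry {
    private final Map<String, Class<?>> registeredDescriptors = new HashMap<>();

    void check(String stateName, Class<?> descriptorClass) {
        Class<?> previous = registeredDescriptors.putIfAbsent(stateName, descriptorClass);
        if (previous != null && previous != descriptorClass) {
            // Fail with a clear message instead of letting a ClassCastException
            // surface deep inside the state store.
            throw new DuplicateStateNameException(stateName, previous, descriptorClass);
        }
    }
}
{code}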