[jira] [Created] (FLINK-23057) flink-console.sh doesn't do variable expansion for FLINK_ENV_JAVA_OPTS like flink-daemon.sh
LIU Xiao created FLINK-23057:

Summary: flink-console.sh doesn't do variable expansion for FLINK_ENV_JAVA_OPTS like flink-daemon.sh
Key: FLINK-23057
URL: https://issues.apache.org/jira/browse/FLINK-23057
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.12.4, 1.13.1
Reporter: LIU Xiao

In flink-daemon.sh:

{code:bash}
...
# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

echo "Starting $DAEMON daemon on host $HOSTNAME."
"$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
...
{code}

The "$(eval echo ...)" line expands variables such as ${FLINK_LOG_PREFIX} inside FLINK_ENV_JAVA_OPTS, as described in https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/application_profiling/

But flink-console.sh lacks this line, and since kubernetes-jobmanager.sh and kubernetes-taskmanager.sh both go through flink-console.sh, variable expansion of FLINK_ENV_JAVA_OPTS does not work in native Kubernetes application mode. Adding that line to flink-console.sh solves the problem.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
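The effect of the missing line can be reproduced outside Flink with a short standalone sketch (the HeapDumpPath option and log prefix below are illustrative values, not taken from an actual config):

```shell
# FLINK_ENV_JAVA_OPTS as a user would set it: single quotes keep the
# inner ${FLINK_LOG_PREFIX} as a literal placeholder at this point.
FLINK_LOG_PREFIX="/var/log/flink/flink-taskexecutor-0"
FLINK_ENV_JAVA_OPTS='-XX:HeapDumpPath=${FLINK_LOG_PREFIX}.hprof'

# Without the eval line (current flink-console.sh behaviour) the JVM
# would receive the unexpanded placeholder:
echo "$FLINK_ENV_JAVA_OPTS"

# With the eval line from flink-daemon.sh, the placeholder is expanded
# before the options are passed to the JVM:
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
echo "$FLINK_ENV_JAVA_OPTS"
```

Note that `eval` re-evaluates the already-expanded string, which is what resolves the nested `${FLINK_LOG_PREFIX}`; a plain `echo` alone would print it verbatim.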
[jira] [Created] (FLINK-22610) The test-jar of flink-connector-kafka should include all test classes
LIU Xiao created FLINK-22610:

Summary: The test-jar of flink-connector-kafka should include all test classes
Key: FLINK-22610
URL: https://issues.apache.org/jira/browse/FLINK-22610
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.12.3, 1.13.0
Reporter: LIU Xiao

The test-jar of the old Kafka connectors (flink-connector-kafka-base and flink-connector-kafka-0.11) includes convenient utility classes (KafkaTestEnvironment and KafkaTestEnvironmentImpl, etc.) for starting an embedded Kafka in unit tests, and we used these utility classes to build some test cases for our project.

Now the utility classes other than KafkaTestEnvironmentImpl are gone from the test-jar of the new Kafka connector (flink-connector-kafka), and I found that this is because they are not included in the configuration of maven-jar-plugin in pom.xml:

{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>test-jar</goal>
      </goals>
      <configuration>
        <includes>
          <include>**/KafkaTestEnvironmentImpl*</include>
          <include>META-INF/LICENSE</include>
          <include>META-INF/NOTICE</include>
        </includes>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}

This configuration seems to be inherited from flink-connector-kafka-0.11, but I think the configuration of flink-connector-kafka-base should be used instead:

{code:xml}
<plugin>
  <artifactId>maven-jar-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>test-jar</goal>
      </goals>
    </execution>
  </executions>
</plugin>
{code}
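For context, downstream projects like ours pick up these utility classes through a test-jar dependency; a sketch of such a dependency declaration (the Scala suffix and version placeholder are assumptions, adjust to your build):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version><!-- your Flink version --></version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
```

With the current includes filter, this dependency resolves, but any class other than KafkaTestEnvironmentImpl fails to load at test compile time.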
Unbearably slow Table API time-windowed stream join with RocksDBStateBackend
Example SQL:

{code:sql}
SELECT *
FROM stream1 s1, stream2 s2
WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime
{code}

Many messages in stream1 and stream2 share the same rowtime.

The join runs fine with the heap state backend, but it sometimes requires a lot of heap memory (when an upstream falls out of sync, etc.), and there is a risk of full GC pauses. When we switched to RocksDBStateBackend to lower the heap memory usage, our program became unbearably slow. After examining the code, we found that org.apache.flink.table.runtime.join.TimeBoundedStreamJoin#processElement1() may be the cause of the problem (we are using Flink 1.6, but 1.8 should be the same):

{code:scala}
...
// Check if we need to cache the current row.
if (rightOperatorTime < rightQualifiedUpperBound) {
  // Operator time of right stream has not exceeded the upper window bound of the current
  // row. Put it into the left cache, since later coming records from the right stream are
  // expected to be joined with it.
  var leftRowList = leftCache.get(timeForLeftRow)
  if (null == leftRowList) {
    leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
  }
  leftRowList.add(JTuple2.of(leftRow, emitted))
  leftCache.put(timeForLeftRow, leftRowList)
...
{code}

In the code above, if there are many messages with the same timeForLeftRow, every put deserializes and re-serializes the whole row list, so the serialization and deserialization cost is very high when using RocksDBStateBackend.

A simple fix I came up with: key the cache by (timestamp, index) so that each row is written exactly once, and track the per-timestamp row count in a separate MapState:

{code:scala}
...
// cache to store rows from the left stream
//private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
private var leftCache: MapState[JTuple2[Long, Integer], JTuple2[Row, Boolean]] = _
private var leftCacheSize: MapState[Long, Integer] = _
...
// Check if we need to cache the current row.
if (rightOperatorTime < rightQualifiedUpperBound) {
  // Operator time of right stream has not exceeded the upper window bound of the current
  // row. Put it into the left cache, since later coming records from the right stream are
  // expected to be joined with it.
  //var leftRowList = leftCache.get(timeForLeftRow)
  //if (null == leftRowList) {
  //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
  //}
  //leftRowList.add(JTuple2.of(leftRow, emitted))
  //leftCache.put(timeForLeftRow, leftRowList)
  var leftRowListSize = leftCacheSize.get(timeForLeftRow)
  if (null == leftRowListSize) {
    leftRowListSize = Integer.valueOf(0)
  }
  leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize), JTuple2.of(leftRow, emitted))
  leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
...
{code}

--
LIU Xiao
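The composite-key idea above can be illustrated without Flink. The sketch below (hypothetical class names, plain HashMaps standing in for MapState) shows the bookkeeping: rows are stored under a (timestamp, index) key and a second map tracks how many rows each timestamp has, so an insert never rewrites previously stored rows:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Emulates the proposed layout: MapState[(Long, Integer), Row] plus
// MapState[Long, Integer]. With RocksDB, appending to a single list value
// re-deserializes and re-serializes the whole list on every put; storing
// each row under its own composite key writes each row exactly once.
class CompositeKeyCache {
    // stands in for leftCache: MapState[(timestamp, index) -> row]
    private final Map<List<Object>, String> cache = new HashMap<>();
    // stands in for leftCacheSize: MapState[timestamp -> row count]
    private final Map<Long, Integer> sizes = new HashMap<>();

    void add(long timestamp, String row) {
        int n = sizes.getOrDefault(timestamp, 0);
        // each insert touches exactly one (small) entry, never a growing list
        cache.put(Arrays.asList(timestamp, n), row);
        sizes.put(timestamp, n + 1);
    }

    List<String> get(long timestamp) {
        int n = sizes.getOrDefault(timestamp, 0);
        List<String> rows = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            rows.add(cache.get(Arrays.asList(timestamp, i)));
        }
        return rows;
    }
}
```

The trade-off is that reading all rows for one timestamp now takes one lookup per row instead of one lookup for the whole list, which is usually a good exchange when inserts dominate.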
[jira] [Created] (FLINK-11987) Kafka producer occasionally throws NullPointerException
LIU Xiao created FLINK-11987:

Summary: Kafka producer occasionally throws NullPointerException
Key: FLINK-11987
URL: https://issues.apache.org/jira/browse/FLINK-11987
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.7.2, 1.6.4, 1.6.3
Environment: Flink 1.6.2 (Standalone Cluster), Oracle JDK 1.8u151, CentOS 7.4
Reporter: LIU Xiao

We are using Flink 1.6.2 in our production environment, and the Kafka producer occasionally throws a NullPointerException.

In line 175 of flink/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java, NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is created as a static variable. In line 837, "context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);" is called, which leads to line 734 of flink/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java: "stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());"

In initializeSerializerUnlessSet (line 283 of flink/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java):

{code:java}
if (serializer == null) {
    checkState(typeInfo != null, "no serializer and no type info");

    // instantiate the serializer
    serializer = typeInfo.createSerializer(executionConfig);

    // we can drop the type info now, no longer needed
    typeInfo = null;
}
{code}

"serializer = typeInfo.createSerializer(executionConfig);" is the line that throws the exception. We think that's because multiple subtasks of the same producer in the same TaskManager share the same NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR, so one subtask can set typeInfo to null while another subtask is still between the null check and the createSerializer call.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
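The race can be isolated in a small sketch (hypothetical classes, not Flink's real API) that mirrors the check-then-act pattern of initializeSerializerUnlessSet():

```java
import java.util.function.Function;

// Mirrors StateDescriptor's lazy serializer initialization. The method is not
// thread-safe: if one such object is shared as a static across operator
// subtasks, thread B can pass the serializer == null check, thread A can then
// finish and set typeInfo = null, and B's typeInfo.apply(...) below throws a
// NullPointerException -- the failure mode reported in this issue.
class LazyDescriptor<T> {
    private Function<String, T> typeInfo; // stands in for TypeInformation
    private T serializer;                 // created lazily on first use

    LazyDescriptor(Function<String, T> typeInfo) {
        this.typeInfo = typeInfo;
    }

    T initializeUnlessSet(String executionConfig) {
        if (serializer == null) {
            // instantiate the serializer (NPE here if another thread
            // already nulled typeInfo between the check and this call)
            serializer = typeInfo.apply(executionConfig);
            // drop the type info, no longer needed
            typeInfo = null;
        }
        return serializer;
    }
}
```

Giving each subtask its own descriptor instance (instead of one shared static) removes the shared mutable fields, so the check-then-act sequence can no longer interleave across threads; synchronizing the initialization on the shared descriptor would also close the window.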