[jira] [Created] (FLINK-23057) flink-console.sh doesn't do variable expansion for FLINK_ENV_JAVA_OPTS like flink-daemon.sh
LIU Xiao created FLINK-23057:

Summary: flink-console.sh doesn't do variable expansion for FLINK_ENV_JAVA_OPTS like flink-daemon.sh
Key: FLINK-23057
URL: https://issues.apache.org/jira/browse/FLINK-23057
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.12.4, 1.13.1
Reporter: LIU Xiao

In flink-daemon.sh:

{code:java}
...
# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

echo "Starting $DAEMON daemon on host $HOSTNAME."
"$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
...
{code}

The "$(eval echo ...)" line is what allows variables such as ${FLINK_LOG_PREFIX} in FLINK_ENV_JAVA_OPTS to be expanded, as described in [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/application_profiling/].

flink-console.sh does not have this line. Because kubernetes-jobmanager.sh and kubernetes-taskmanager.sh both depend on flink-console.sh, variable expansion of FLINK_ENV_JAVA_OPTS does not work in native Kubernetes application mode.

Adding that line to flink-console.sh solves the problem.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
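As a minimal sketch of the difference described in FLINK-23057, the snippet below applies the "eval echo" line from flink-daemon.sh to a sample option string and compares the result with the unexpanded value. The FLINK_LOG_PREFIX path and the JVM options are hypothetical values chosen for illustration; in a real deployment they come from the Flink scripts and configuration.

```shell
#!/usr/bin/env bash
# Hypothetical values for illustration only.
FLINK_LOG_PREFIX="/opt/flink/log/flink-demo"

# Single quotes keep ${FLINK_LOG_PREFIX} literal, the way it arrives
# when the option string is read from configuration.
FLINK_ENV_JAVA_OPTS='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${FLINK_LOG_PREFIX}.hprof'

# Without the eval line: the variable reference is passed on literally.
unexpanded="${FLINK_ENV_JAVA_OPTS}"

# With the line quoted from flink-daemon.sh: the shell re-parses the
# string, so ${FLINK_LOG_PREFIX} is replaced by its value.
expanded=$(eval echo ${FLINK_ENV_JAVA_OPTS})

echo "without eval: ${unexpanded}"
echo "with eval:    ${expanded}"
```

Run with bash, the first echo prints the literal ${FLINK_LOG_PREFIX} reference and the second prints the path with the prefix substituted. Note that eval re-parses the whole string, so any other shell metacharacters in the options are interpreted as well.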
[jira] [Created] (FLINK-22610) The test-jar of flink-connector-kafka should include all test classes
LIU Xiao created FLINK-22610:

Summary: The test-jar of flink-connector-kafka should include all test classes
Key: FLINK-22610
URL: https://issues.apache.org/jira/browse/FLINK-22610
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.12.3, 1.13.0
Reporter: LIU Xiao

The test-jar of the old Kafka connector (flink-connector-kafka-base and flink-connector-kafka-0.11) includes convenient utility classes (KafkaTestEnvironment, KafkaTestEnvironmentImpl, etc.) for starting an embedded Kafka in unit tests, and we used these utility classes to build some test cases for our project.

In the test-jar of the new Kafka connector (flink-connector-kafka), the utility classes other than KafkaTestEnvironmentImpl seem to be gone. I found that this is because they are not included in the configuration of maven-jar-plugin in pom.xml:

{code:java}
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>test-jar</goal>
            </goals>
            <configuration>
                <includes>
                    <include>**/KafkaTestEnvironmentImpl*</include>
                    <include>META-INF/LICENSE</include>
                    <include>META-INF/NOTICE</include>
                </includes>
            </configuration>
        </execution>
    </executions>
</plugin>
{code}

This configuration seems to be inherited from flink-connector-kafka-0.11, but I think the configuration of flink-connector-kafka-base should be used instead:

{code:java}
<plugin>
    <artifactId>maven-jar-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>test-jar</goal>
            </goals>
        </execution>
    </executions>
</plugin>
{code}
[jira] [Created] (FLINK-11987) Kafka producer occasionally throws NullpointerException
LIU Xiao created FLINK-11987:

Summary: Kafka producer occasionally throws NullpointerException
Key: FLINK-11987
URL: https://issues.apache.org/jira/browse/FLINK-11987
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.7.2, 1.6.4, 1.6.3
Environment: Flink 1.6.2 (Standalone Cluster)
Oracle JDK 1.8u151
CentOS 7.4
Reporter: LIU Xiao

We are using Flink 1.6.2 in our production environment, and the Kafka producer occasionally throws a NullPointerException.

In line 175 of flink/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java, NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is created as a static variable.

Then in line 837, "context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);" is called, which leads to line 734 of flink/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java:

"stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());"

In the function initializeSerializerUnlessSet (line 283 of flink/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java):

    if (serializer == null) {
        checkState(typeInfo != null, "no serializer and no type info");

        // instantiate the serializer
        serializer = typeInfo.createSerializer(executionConfig);

        // we can drop the type info now, no longer needed
        typeInfo = null;
    }

"serializer = typeInfo.createSerializer(executionConfig);" is the line that throws the exception.

We think this is because multiple subtasks of the same producer in the same TaskManager share the same NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR, so one subtask can presumably set typeInfo to null after another subtask has passed the checkState but before it calls typeInfo.createSerializer.