[jira] [Created] (FLINK-23057) flink-console.sh doesn't do variable expansion for FLINK_ENV_JAVA_OPTS like flink-daemon.sh

2021-06-21 Thread LIU Xiao (Jira)
LIU Xiao created FLINK-23057:


 Summary: flink-console.sh doesn't do variable expansion for 
FLINK_ENV_JAVA_OPTS like flink-daemon.sh
 Key: FLINK-23057
 URL: https://issues.apache.org/jira/browse/FLINK-23057
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.12.4, 1.13.1
Reporter: LIU Xiao


In flink-daemon.sh:

 
{code:java}
...

# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

echo "Starting $DAEMON daemon on host $HOSTNAME."
"$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath 
"`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

...
{code}
There is a "$(eval echo ...)" line, so variables like ${FLINK_LOG_PREFIX} in 
FLINK_ENV_JAVA_OPTS can be expanded, as described in 
[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/application_profiling/]

But flink-console.sh doesn't have this line, and since kubernetes-jobmanager.sh and 
kubernetes-taskmanager.sh both depend on flink-console.sh, variable expansion of 
FLINK_ENV_JAVA_OPTS does not work in native Kubernetes application mode.

Adding that line to flink-console.sh solves the problem.
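
A minimal sketch of the proposed change, assuming the line is added to flink-console.sh just before its final java invocation (the added line itself is the same one flink-daemon.sh already uses above):
{code:bash}
# Evaluate user options for local variable expansion, as flink-daemon.sh does,
# so that e.g. ${FLINK_LOG_PREFIX} inside env.java.opts is resolved per process
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
{code}
With that in place, settings such as -Xloggc:${FLINK_LOG_PREFIX}.gc.log from the application profiling documentation behave the same in native Kubernetes application mode as they do for daemon-based deployments.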

 





[jira] [Created] (FLINK-22610) The test-jar of flink-connector-kafka should include all test classes

2021-05-08 Thread LIU Xiao (Jira)
LIU Xiao created FLINK-22610:


 Summary: The test-jar of flink-connector-kafka should include all 
test classes
 Key: FLINK-22610
 URL: https://issues.apache.org/jira/browse/FLINK-22610
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.12.3, 1.13.0
Reporter: LIU Xiao


The test-jar of the old Kafka connectors (flink-connector-kafka-base and 
flink-connector-kafka-0.11) includes convenient utility classes 
(KafkaTestEnvironment, KafkaTestEnvironmentImpl, etc.) for starting an embedded 
Kafka in unit tests, and we used these utility classes to build some test cases 
for our project.

Now the utility classes other than KafkaTestEnvironmentImpl are missing from the 
test-jar of the new Kafka connector (flink-connector-kafka), and I found that this 
is because they are not included in the maven-jar-plugin configuration in pom.xml:
{code:xml}
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>test-jar</goal>
            </goals>
            <configuration>
                <includes>
                    <include>**/KafkaTestEnvironmentImpl*</include>
                    <include>META-INF/LICENSE</include>
                    <include>META-INF/NOTICE</include>
                </includes>
            </configuration>
        </execution>
    </executions>
</plugin>
{code}
This configuration seems to be inherited from flink-connector-kafka-0.11, but I 
think the configuration of flink-connector-kafka-base should be used:
{code:xml}
<plugin>
    <artifactId>maven-jar-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>test-jar</goal>
            </goals>
        </execution>
    </executions>
</plugin>
{code}
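
For reference, downstream projects typically consume these utilities through a Maven test-jar dependency on the connector; a minimal sketch (the Scala suffix and version here are only illustrative):
{code:xml}
<!-- illustrative coordinates; adjust Scala suffix and version to match your build -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.0</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
{code}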





[jira] [Created] (FLINK-11987) Kafka producer occasionally throws NullpointerException

2019-03-21 Thread LIU Xiao (JIRA)
LIU Xiao created FLINK-11987:


 Summary: Kafka producer occasionally throws NullpointerException
 Key: FLINK-11987
 URL: https://issues.apache.org/jira/browse/FLINK-11987
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.7.2, 1.6.4, 1.6.3
 Environment: Flink 1.6.2 (Standalone Cluster)

Oracle JDK 1.8u151

Centos 7.4
Reporter: LIU Xiao


We are using Flink 1.6.2 in our production environment, and the Kafka producer 
occasionally throws a NullPointerException.

We found that in line 175 of 
flink/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java,
 NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is declared as a static variable.

Then in line 837, 
"context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);"
 is called, which leads to line 734 of 
flink/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java:
 "stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());"

In the function initializeSerializerUnlessSet (line 283 of 
flink/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java):

{code:java}
if (serializer == null) {
    checkState(typeInfo != null, "no serializer and no type info");

    // instantiate the serializer
    serializer = typeInfo.createSerializer(executionConfig);

    // we can drop the type info now, no longer needed
    typeInfo = null;
}
{code}

"serializer = typeInfo.createSerializer(executionConfig);" is the line which 
throws the exception.

We think this is because multiple subtasks of the same producer running in the same 
TaskManager share the same static NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR: two subtasks 
can enter this unsynchronized lazy initialization concurrently, and one of them sets 
typeInfo to null after the other has passed the null check but before it calls 
createSerializer, which produces the NullPointerException.
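
A minimal sketch of the suspected race, assuming that mechanism (a toy LazyDescriptor stands in for the real StateDescriptor; this is not the actual Flink code):
{code:java}
// Toy reproduction of the suspected race: an unsynchronized check-then-act
// lazy initialization on a descriptor that is shared (static) across subtasks.
public class SharedDescriptorRace {

    static final class LazyDescriptor {
        private volatile Object serializer;               // stands in for the TypeSerializer
        private volatile Object typeInfo = new Object();  // stands in for the TypeInformation

        void initializeSerializerUnlessSet() {
            if (serializer == null) {
                // Window: the other thread can run this whole block here,
                // including "typeInfo = null", before we continue.
                serializer = typeInfo.toString();         // NPE if typeInfo was already nulled
                typeInfo = null;                          // drop the type info, as the real code does
            }
        }
    }

    // One static descriptor shared by all "subtasks" running in the same JVM (TaskManager).
    static final LazyDescriptor SHARED_DESCRIPTOR = new LazyDescriptor();

    public static void main(String[] args) throws InterruptedException {
        Runnable subtask = SHARED_DESCRIPTOR::initializeSerializerUnlessSet;
        Thread t1 = new Thread(subtask, "subtask-1");
        Thread t2 = new Thread(subtask, "subtask-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Non-deterministic: with an unlucky interleaving one thread dies with a
        // NullPointerException, matching the occasional failures we observe.
    }
}
{code}
Either giving each producer instance its own descriptor or making the lazy initialization thread-safe would avoid this interleaving.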


