[jira] [Created] (FLINK-23057) flink-console.sh doesn't do variable expansion for FLINK_ENV_JAVA_OPTS like flink-daemon.sh

2021-06-21 Thread LIU Xiao (Jira)
LIU Xiao created FLINK-23057:


 Summary: flink-console.sh doesn't do variable expansion for 
FLINK_ENV_JAVA_OPTS like flink-daemon.sh
 Key: FLINK-23057
 URL: https://issues.apache.org/jira/browse/FLINK-23057
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.12.4, 1.13.1
Reporter: LIU Xiao


In flink-daemon.sh:

{code:java}
...

# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

echo "Starting $DAEMON daemon on host $HOSTNAME."
"$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath 
"`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

...
{code}
The "$(eval echo ...)" line expands variables like ${FLINK_LOG_PREFIX} inside FLINK_ENV_JAVA_OPTS, as described in
[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/application_profiling/]
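
For example, the application profiling docs rely on this expansion to give each process its own dump path; roughly (paraphrased from the linked page, the exact flags there may differ):

{code:java}
env.java.opts: "-XX:+FlightRecorder -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=${FLINK_LOG_PREFIX}.jfr"
{code}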

But flink-console.sh doesn't have that line, and since kubernetes-jobmanager.sh and kubernetes-taskmanager.sh both depend on flink-console.sh, variable expansion of FLINK_ENV_JAVA_OPTS does not work in native Kubernetes application mode.

Adding that line to flink-console.sh solves the problem.
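
A minimal sketch of the change, copying the expansion line from flink-daemon.sh into flink-console.sh before the JVM command is assembled (exact placement in the script is approximate):

{code:java}
# Evaluate user options for local variable expansion,
# mirroring what flink-daemon.sh already does.
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
{code}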

 





[jira] [Created] (FLINK-22610) The test-jar of flink-connector-kafka should include all test classes

2021-05-08 Thread LIU Xiao (Jira)
LIU Xiao created FLINK-22610:


 Summary: The test-jar of flink-connector-kafka should include all 
test classes
 Key: FLINK-22610
 URL: https://issues.apache.org/jira/browse/FLINK-22610
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.12.3, 1.13.0
Reporter: LIU Xiao


The test-jars of the old Kafka connectors (flink-connector-kafka-base and flink-connector-kafka-0.11) include convenient utility classes (KafkaTestEnvironment, KafkaTestEnvironmentImpl, etc.) for starting an embedded Kafka in unit tests, and we used those utility classes to build some test cases for our project.
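
For context, we consume those classes via a standard Maven test-jar dependency along these lines (the Scala suffix and version here are illustrative):

{code:java}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.3</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
{code}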

Now the utility classes other than KafkaTestEnvironmentImpl seem to be gone from the test-jar of the new Kafka connector (flink-connector-kafka), and I found that this is because they are not included in the maven-jar-plugin configuration in pom.xml:
{code:java}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>test-jar</goal>
      </goals>
      <configuration>
        <includes>
          <include>**/KafkaTestEnvironmentImpl*</include>
          <include>META-INF/LICENSE</include>
          <include>META-INF/NOTICE</include>
        </includes>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}
This configuration seems to be inherited from flink-connector-kafka-0.11, but I 
think the configuration of flink-connector-kafka-base should be used:
{code:java}
<plugin>
  <artifactId>maven-jar-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>test-jar</goal>
      </goals>
    </execution>
  </executions>
</plugin>
{code}





Unbearably slow Table API time-windowed stream join with RocksDBStateBackend

2019-08-14 Thread LIU Xiao
Example SQL:

SELECT *
FROM stream1 s1, stream2 s2
WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime

And we have lots of messages in stream1 and stream2 that share the same rowtime.

It runs fine when using the heap state backend, but sometimes requires a lot of heap memory (e.g. when an upstream is out of sync), and there is a risk of full GC.

When we used RocksDBStateBackend to lower the heap memory usage, we found our program ran unbearably slowly.

After examining the code we found that
org.apache.flink.table.runtime.join.TimeBoundedStreamJoin#processElement1()
may be the cause of the problem (we are using Flink 1.6, but 1.8 should be the same):
...
// Check if we need to cache the current row.
if (rightOperatorTime < rightQualifiedUpperBound) {
  // Operator time of right stream has not exceeded the upper window bound of the current
  // row. Put it into the left cache, since later coming records from the right stream are
  // expected to be joined with it.
  var leftRowList = leftCache.get(timeForLeftRow)
  if (null == leftRowList) {
    leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
  }
  leftRowList.add(JTuple2.of(leftRow, emitted))
  leftCache.put(timeForLeftRow, leftRowList)
...

In the above code, if there are lots of messages with the same timeForLeftRow, the serialization and deserialization cost will be very high when using RocksDBStateBackend: every get/put of the MapState value moves the whole list through the serializer, so appending the N-th row for a timestamp costs O(N), and N rows cost O(N^2) overall.

A simple fix I came up with, keying the cache by (timestamp, index) so each row is a separate RocksDB entry:
...
  // cache to store rows from the left stream
  //private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
  private var leftCache: MapState[JTuple2[Long, Integer], JTuple2[Row, Boolean]] = _
  // number of cached rows per timestamp
  private var leftCacheSize: MapState[Long, Integer] = _
...
// Check if we need to cache the current row.
if (rightOperatorTime < rightQualifiedUpperBound) {
  // Operator time of right stream has not exceeded the upper window bound of the current
  // row. Put it into the left cache, since later coming records from the right stream are
  // expected to be joined with it.
  //var leftRowList = leftCache.get(timeForLeftRow)
  //if (null == leftRowList) {
  //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
  //}
  //leftRowList.add(JTuple2.of(leftRow, emitted))
  //leftCache.put(timeForLeftRow, leftRowList)
  var leftRowListSize = leftCacheSize.get(timeForLeftRow)
  if (null == leftRowListSize) {
    leftRowListSize = Integer.valueOf(0)
  }
  leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize), JTuple2.of(leftRow, emitted))
  leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
...
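
For completeness, the read side would then probe the per-row entries through the recorded size instead of deserializing one big list; a rough, untested sketch (ts stands for the cached timestamp being probed):

// Hypothetical read path for the reworked cache: fetch rows one by one,
// so each get only deserializes a single row instead of the whole list.
val size = leftCacheSize.get(ts)
if (null != size) {
  var i = 0
  while (i < size) {
    val cached = leftCache.get(JTuple2.of(ts, Integer.valueOf(i)))
    // ... join cached.f0 with the current right-stream row; cached.f1 is the emitted flag ...
    i += 1
  }
}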

-- 
LIU Xiao 



[jira] [Created] (FLINK-11987) Kafka producer occasionally throws NullPointerException

2019-03-21 Thread LIU Xiao (JIRA)
LIU Xiao created FLINK-11987:


 Summary: Kafka producer occasionally throws NullPointerException
 Key: FLINK-11987
 URL: https://issues.apache.org/jira/browse/FLINK-11987
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.7.2, 1.6.4, 1.6.3
 Environment: Flink 1.6.2 (Standalone Cluster)

Oracle JDK 1.8u151

Centos 7.4
Reporter: LIU Xiao


We are using Flink 1.6.2 in our production environment, and the Kafka producer occasionally throws a NullPointerException.

We found that in line 175 of
flink/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java,
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is created as a static variable.

Then in line 837,
"context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);"
is called, which leads to line 734 of
flink/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java:
"stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());"

In the function initializeSerializerUnlessSet (line 283 of
flink/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java):

{code:java}
if (serializer == null) {
    checkState(typeInfo != null, "no serializer and no type info");
    // instantiate the serializer
    serializer = typeInfo.createSerializer(executionConfig);
    // we can drop the type info now, no longer needed
    typeInfo = null;
}
{code}

"serializer = typeInfo.createSerializer(executionConfig);" is the line which 
throws the exception.

We think that's because multiple subtasks of the same producer in the same TaskManager share the same (static) NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR: one subtask can pass the serializer == null check just as another subtask finishes initializing and sets typeInfo to null, so typeInfo.createSerializer(...) is then invoked on null.
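
A minimal, self-contained sketch of that race, using plain Java stand-ins for TypeInformation/TypeSerializer (hypothetical class, not Flink code):

{code:java}
public class SharedDescriptorRace {
    // Shared by all "subtasks", like the static NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.
    private static Object typeInfo = new Object();  // stand-in for TypeInformation
    private static volatile Object serializer;      // stand-in for TypeSerializer

    static void initializeSerializerUnlessSet() {
        if (serializer == null) {
            // Another thread may run this block concurrently and null out
            // typeInfo below before we read it here -> NullPointerException.
            serializer = typeInfo.toString();  // stand-in for typeInfo.createSerializer(...)
            typeInfo = null;                   // drop the type info, as StateDescriptor does
        }
    }

    public static void main(String[] args) {
        // With enough concurrent initializations, some runs hit the NPE.
        for (int i = 0; i < 16; i++) {
            new Thread(SharedDescriptorRace::initializeSerializerUnlessSet).start();
        }
    }
}
{code}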


