[jira] [Commented] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-02-04 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814180#comment-17814180
 ] 

Feng Jiajie commented on FLINK-33981:
-

Hi [~fanrui], thanks for the review. I have submitted the changes to the 1.17 
and master branches, and the tests have passed. 

> File Descriptor References Not Released After Job Execution in MiniCluster 
> Mode
> ---
>
> Key: FLINK-33981
> URL: https://issues.apache.org/jira/browse/FLINK-33981
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Feng Jiajie
>Assignee: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
>
> When using MiniCluster mode, file descriptors like 
> *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are 
> not released after a Job completes. Executing multiple Jobs in the same JVM 
> might result in leftover file descriptors, potentially leading to problems.
> After executing the reproducing code provided below (after entering the 
> sleep), running *lsof -p 18162* reveals:
> {code:java}
> ...
> java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>   public static void main(String[] args) throws Exception {
> for (int i = 0; i < 10; ++i) {
>   int round = i;
>   Thread thread = new Thread(() -> {
> try {
>   Configuration configuration = new Configuration();
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>   env.setParallelism(1);
>   DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
>   longDataStreamSource.addSink(new DiscardingSink<>());
>   StreamGraph streamGraph = env.getStreamGraph();
>   streamGraph.setJobName("test-" + System.nanoTime());
>   JobClient jobClient = env.executeAsync(streamGraph);
>   CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
>   JobExecutionResult jobExecutionResult = null;
>   while (jobExecutionResult == null) {
> try {
>   jobExecutionResult = 
> jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
> } catch (TimeoutException timeoutException) {
>   // ignore
> }
>   }
>   System.out.println("finished round: " + round);
>   env.close();
> } catch (Exception e) {
>   throw new RuntimeException(e);
> }
>   });
>   thread.setDaemon(true);
>   thread.start();
> 

[jira] [Updated] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-02-04 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-33981:

Fix Version/s: 1.19.0
   1.17.3
   1.18.2

> File Descriptor References Not Released After Job Execution in MiniCluster 
> Mode
> ---
>
> Key: FLINK-33981
> URL: https://issues.apache.org/jira/browse/FLINK-33981
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Feng Jiajie
>Assignee: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> When using MiniCluster mode, file descriptors like 
> *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are 
> not released after a Job completes. Executing multiple Jobs in the same JVM 
> might result in leftover file descriptors, potentially leading to problems.
> After executing the reproducing code provided below (after entering the 
> sleep), running *lsof -p 18162* reveals:
> {code:java}
> ...
> java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>   public static void main(String[] args) throws Exception {
> for (int i = 0; i < 10; ++i) {
>   int round = i;
>   Thread thread = new Thread(() -> {
> try {
>   Configuration configuration = new Configuration();
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>   env.setParallelism(1);
>   DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
>   longDataStreamSource.addSink(new DiscardingSink<>());
>   StreamGraph streamGraph = env.getStreamGraph();
>   streamGraph.setJobName("test-" + System.nanoTime());
>   JobClient jobClient = env.executeAsync(streamGraph);
>   CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
>   JobExecutionResult jobExecutionResult = null;
>   while (jobExecutionResult == null) {
> try {
>   jobExecutionResult = 
> jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
> } catch (TimeoutException timeoutException) {
>   // ignore
> }
>   }
>   System.out.println("finished round: " + round);
>   env.close();
> } catch (Exception e) {
>   throw new RuntimeException(e);
> }
>   });
>   thread.setDaemon(true);
>   thread.start();
>   thread.join();
>   

[jira] [Commented] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-01-28 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811659#comment-17811659
 ] 

Feng Jiajie commented on FLINK-33981:
-

I found the issue and will submit a fix later. Please assign the Jira issue to 
me, thanks.

> File Descriptor References Not Released After Job Execution in MiniCluster 
> Mode
> ---
>
> Key: FLINK-33981
> URL: https://issues.apache.org/jira/browse/FLINK-33981
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Feng Jiajie
>Priority: Major
>
> When using MiniCluster mode, file descriptors like 
> *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are 
> not released after a Job completes. Executing multiple Jobs in the same JVM 
> might result in leftover file descriptors, potentially leading to problems.
> After executing the reproducing code provided below (after entering the 
> sleep), running *lsof -p 18162* reveals:
> {code:java}
> ...
> java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>   public static void main(String[] args) throws Exception {
> for (int i = 0; i < 10; ++i) {
>   int round = i;
>   Thread thread = new Thread(() -> {
> try {
>   Configuration configuration = new Configuration();
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>   env.setParallelism(1);
>   DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
>   longDataStreamSource.addSink(new DiscardingSink<>());
>   StreamGraph streamGraph = env.getStreamGraph();
>   streamGraph.setJobName("test-" + System.nanoTime());
>   JobClient jobClient = env.executeAsync(streamGraph);
>   CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
>   JobExecutionResult jobExecutionResult = null;
>   while (jobExecutionResult == null) {
> try {
>   jobExecutionResult = 
> jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
> } catch (TimeoutException timeoutException) {
>   // ignore
> }
>   }
>   System.out.println("finished round: " + round);
>   env.close();
> } catch (Exception e) {
>   throw new RuntimeException(e);
> }
>   });
>   thread.setDaemon(true);
>   thread.start();
>   thread.join();
>   System.out.println("done ... " + i);
> }
> 
> // === 

[jira] [Updated] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-01-03 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-33981:

Description: 
When using MiniCluster mode, file descriptors like 
*{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are not 
released after a Job completes. Executing multiple Jobs in the same JVM might 
result in leftover file descriptors, potentially leading to problems.

After executing the reproducing code provided below (after entering the sleep), 
running *lsof -p 18162* reveals:
{code:java}
...
java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}
The code used for reproduction is as follows:
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

  public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; ++i) {
  int round = i;
  Thread thread = new Thread(() -> {
try {
  Configuration configuration = new Configuration();
  final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
  env.setParallelism(1);

  DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
  longDataStreamSource.addSink(new DiscardingSink<>());

  StreamGraph streamGraph = env.getStreamGraph();
  streamGraph.setJobName("test-" + System.nanoTime());
  JobClient jobClient = env.executeAsync(streamGraph);

  CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
  JobExecutionResult jobExecutionResult = null;
  while (jobExecutionResult == null) {
try {
  jobExecutionResult = jobExecutionResultCompletableFuture.get(20, 
TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
  // ignore
}
  }
  System.out.println("finished round: " + round);
  env.close();
} catch (Exception e) {
  throw new RuntimeException(e);
}
  });

  thread.setDaemon(true);
  thread.start();
  thread.join();

  System.out.println("done ... " + i);
}

// === lsof -p 18162
Thread.sleep(500_000_000);
  }
}
 {code}
The issue can be reproduced consistently with the above code on Flink 1.18.0, 
but it does not occur on Flink 1.14.6.

  was:
When using MiniCluster mode, file descriptors like 
{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not 
released after a Job completes. Executing multiple Jobs in the same JVM might 
result in leftover file descriptors, potentially leading to problems.

After executing the reproducing code provided below (after entering the sleep), 
running lsof -p 18162 reveals:
{code:java}
...
java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 

[jira] [Updated] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-01-03 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-33981:

Description: 
When using MiniCluster mode, file descriptors like 
{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not 
released after a Job completes. Executing multiple Jobs in the same JVM might 
result in leftover file descriptors, potentially leading to problems.

After executing the reproducing code provided below (after entering the sleep), 
running lsof -p 18162 reveals:
{code:java}
...
java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}
The code used for reproduction is as follows:
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

  public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; ++i) {
  int round = i;
  Thread thread = new Thread(() -> {
try {
  Configuration configuration = new Configuration();
  final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
  env.setParallelism(1);

  DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
  longDataStreamSource.addSink(new DiscardingSink<>());

  StreamGraph streamGraph = env.getStreamGraph();
  streamGraph.setJobName("test-" + System.nanoTime());
  JobClient jobClient = env.executeAsync(streamGraph);

  CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
  JobExecutionResult jobExecutionResult = null;
  while (jobExecutionResult == null) {
try {
  jobExecutionResult = jobExecutionResultCompletableFuture.get(20, 
TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
  // ignore
}
  }
  System.out.println("finished round: " + round);
  env.close();
} catch (Exception e) {
  throw new RuntimeException(e);
}
  });

  thread.setDaemon(true);
  thread.start();
  thread.join();

  System.out.println("done ... " + i);
}

// === lsof -p 18162
Thread.sleep(500_000_000);
  }
}
 {code}
The issue can be reproduced consistently with the above code on Flink 1.18.0, 
but it does not occur on Flink 1.14.6.

  was:
When using MiniCluster mode, file descriptors like 
{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not 
released after a Job completes. Executing multiple Jobs in the same JVM might 
result in leftover file descriptors, potentially leading to problems.

After executing the reproducing code provided below (after entering the sleep), 
running lsof -p 18162 reveals:
{code:java}
...
java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster  

[jira] [Commented] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-01-03 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802398#comment-17802398
 ] 

Feng Jiajie commented on FLINK-33981:
-

The issue can be reproduced consistently with the above code on Flink 1.18.0, 
but it does not occur on Flink 1.14.6.

> File Descriptor References Not Released After Job Execution in MiniCluster 
> Mode
> ---
>
> Key: FLINK-33981
> URL: https://issues.apache.org/jira/browse/FLINK-33981
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Feng Jiajie
>Priority: Major
>
> When using MiniCluster mode, file descriptors like 
> {{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not 
> released after a Job completes. Executing multiple Jobs in the same JVM might 
> result in leftover file descriptors, potentially leading to problems.
> After executing the reproducing code provided below (after entering the 
> sleep), running lsof -p 18162 reveals:
> {code:java}
> ...
> java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>   public static void main(String[] args) throws Exception {
> for (int i = 0; i < 10; ++i) {
>   int round = i;
>   Thread thread = new Thread(() -> {
> try {
>   Configuration configuration = new Configuration();
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>   env.setParallelism(1);
>   DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
>   longDataStreamSource.addSink(new DiscardingSink<>());
>   StreamGraph streamGraph = env.getStreamGraph();
>   streamGraph.setJobName("test-" + System.nanoTime());
>   JobClient jobClient = env.executeAsync(streamGraph);
>   CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
>   JobExecutionResult jobExecutionResult = null;
>   while (jobExecutionResult == null) {
> try {
>   jobExecutionResult = 
> jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
> } catch (TimeoutException timeoutException) {
>   // ignore
> }
>   }
>   System.out.println("finished round: " + round);
>   env.close();
> } catch (Exception e) {
>   throw new RuntimeException(e);
> }
>   });
>   thread.setDaemon(true);
>   thread.start();
>   thread.join();
>   System.out.println("done ... " + i);
> }
> 
> // 

[jira] [Created] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-01-03 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-33981:
---

 Summary: File Descriptor References Not Released After Job 
Execution in MiniCluster Mode
 Key: FLINK-33981
 URL: https://issues.apache.org/jira/browse/FLINK-33981
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.18.0
Reporter: Feng Jiajie


When using MiniCluster mode, file descriptors like 
{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not 
released after a Job completes. Executing multiple Jobs in the same JVM might 
result in leftover file descriptors, potentially leading to problems.

After executing the reproducing code provided below (after entering the sleep), 
running lsof -p 18162 reveals:
{code:java}
...
java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}
The code used for reproduction is as follows:

 
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

  public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; ++i) {
  int round = i;
  Thread thread = new Thread(() -> {
try {
  Configuration configuration = new Configuration();
  final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
  env.setParallelism(1);

  DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
  longDataStreamSource.addSink(new DiscardingSink<>());

  StreamGraph streamGraph = env.getStreamGraph();
  streamGraph.setJobName("test-" + System.nanoTime());
  JobClient jobClient = env.executeAsync(streamGraph);

  CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
  JobExecutionResult jobExecutionResult = null;
  while (jobExecutionResult == null) {
try {
  jobExecutionResult = jobExecutionResultCompletableFuture.get(20, 
TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
  // ignore
}
  }
  System.out.println("finished round: " + round);
  env.close();
} catch (Exception e) {
  throw new RuntimeException(e);
}
  });

  thread.setDaemon(true);
  thread.start();
  thread.join();

  System.out.println("done ... " + i);
}

// === lsof -p 18162
Thread.sleep(500_000_000);
  }
}
 {code}
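
As an editorial aside, a minimal sketch of how one might watch the descriptor 
count from inside the JVM on Linux (assuming /proc/self/fd is available; the 
FdCounter helper below is hypothetical and not part of the original 
reproducer). Sampling it before and after each round would show the same 
growth that lsof reveals above:
{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: counts the file descriptors this JVM holds open. */
public class FdCounter {
  public static long countOpenFds() throws IOException {
    long count = 0;
    // On Linux, each entry under /proc/self/fd is one descriptor held by
    // this process (the directory stream itself briefly adds one).
    try (DirectoryStream<Path> stream =
        Files.newDirectoryStream(Paths.get("/proc/self/fd"))) {
      for (Path ignored : stream) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) throws IOException {
    System.out.println("open fds: " + countOpenFds());
  }
}
{code}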
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss

2023-10-25 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-33360:

Fix Version/s: 1.18.1

> HybridSource fails to clear the previous round's state when switching 
> sources, leading to data loss
> ---
>
> Key: FLINK-33360
> URL: https://issues.apache.org/jira/browse/FLINK-33360
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.3, 1.18.1
>
>
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
> {code:java}
>             // track readers that have finished processing for current 
> enumerator
>             finishedReaders.add(subtaskId);
>             if (finishedReaders.size() == context.currentParallelism()) {
>                 LOG.debug("All readers finished, ready to switch 
> enumerator!");
>                 if (currentSourceIndex + 1 < sources.size()) {
>                     switchEnumerator();
>                     // switch all readers prior to sending split assignments
>                     for (int i = 0; i < context.currentParallelism(); i++) {
>                         sendSwitchSourceEvent(i, currentSourceIndex);
>                     }
>                 }
>             } {code}
> I think *finishedReaders* is used to track the subtask IDs that have 
> finished reading the current source; therefore, it should be cleared in the 
> *switchEnumerator* function.
> If it is not cleared, then while the next source is being read, as soon as 
> any single SourceReader reports a *SourceReaderFinishedEvent* (even though 
> the other SourceReaders may still be processing), the condition 
> *finishedReaders.size() == context.currentParallelism()* is already 
> satisfied, which triggers {*}sendSwitchSourceEvent{*}(i, currentSourceIndex) 
> and sends a *SwitchSourceEvent* to all SourceReaders.
> If a SourceReader receives a SwitchSourceEvent before it has finished reading 
> the previous source, it executes {*}currentReader.close(){*}, so some data is 
> never read, resulting in partial data loss from the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss

2023-10-25 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-33360:

Affects Version/s: 1.18.0

> HybridSource fails to clear the previous round's state when switching 
> sources, leading to data loss
> ---
>
> Key: FLINK-33360
> URL: https://issues.apache.org/jira/browse/FLINK-33360
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.3
>
>
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
> {code:java}
>             // track readers that have finished processing for current 
> enumerator
>             finishedReaders.add(subtaskId);
>             if (finishedReaders.size() == context.currentParallelism()) {
>                 LOG.debug("All readers finished, ready to switch 
> enumerator!");
>                 if (currentSourceIndex + 1 < sources.size()) {
>                     switchEnumerator();
>                     // switch all readers prior to sending split assignments
>                     for (int i = 0; i < context.currentParallelism(); i++) {
>                         sendSwitchSourceEvent(i, currentSourceIndex);
>                     }
>                 }
>             } {code}
> I think *finishedReaders* is used to track the subtask IDs that have 
> finished reading the current source; therefore, it should be cleared in the 
> *switchEnumerator* function.
> If it is not cleared, then while the next source is being read, as soon as 
> any single SourceReader reports a *SourceReaderFinishedEvent* (even though 
> the other SourceReaders may still be processing), the condition 
> *finishedReaders.size() == context.currentParallelism()* is already 
> satisfied, which triggers {*}sendSwitchSourceEvent{*}(i, currentSourceIndex) 
> and sends a *SwitchSourceEvent* to all SourceReaders.
> If a SourceReader receives a SwitchSourceEvent before it has finished reading 
> the previous source, it executes {*}currentReader.close(){*}, so some data is 
> never read, resulting in partial data loss from the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss

2023-10-25 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779507#comment-17779507
 ] 

Feng Jiajie commented on FLINK-33360:
-

pr: https://github.com/apache/flink/pull/23593
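
For illustration, a minimal self-contained model of the bookkeeping described 
in the issue (simplified, illustrative names; not Flink's actual classes or 
the exact patch), showing where the clear() belongs and why the uncleared set 
would trip the switch condition on the very first finished event of the next 
source:
{code:java}
import java.util.HashSet;
import java.util.Set;

/** Simplified model of the enumerator's finished-reader tracking. */
public class FinishedReadersModel {
  private final Set<Integer> finishedReaders = new HashSet<>();
  private final int parallelism = 2;
  private int currentSourceIndex = 0;

  void onReaderFinished(int subtaskId) {
    finishedReaders.add(subtaskId);
    // Switch only once every parallel reader has finished the current source.
    if (finishedReaders.size() == parallelism) {
      switchEnumerator();
    }
  }

  void switchEnumerator() {
    currentSourceIndex++;
    // The fix: reset the per-source bookkeeping, so the next source again
    // waits for ALL readers before switching.
    finishedReaders.clear();
  }

  public static void main(String[] args) {
    FinishedReadersModel m = new FinishedReadersModel();
    m.onReaderFinished(0);
    m.onReaderFinished(1); // all finished: switch to source 1, set cleared
    m.onReaderFinished(0); // without clear(), this alone would switch again
    System.out.println("current source index: " + m.currentSourceIndex); // 1
  }
}
{code}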

> HybridSource fails to clear the previous round's state when switching 
> sources, leading to data loss
> ---
>
> Key: FLINK-33360
> URL: https://issues.apache.org/jira/browse/FLINK-33360
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.3
>
>
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
> {code:java}
>             // track readers that have finished processing for current 
> enumerator
>             finishedReaders.add(subtaskId);
>             if (finishedReaders.size() == context.currentParallelism()) {
>                 LOG.debug("All readers finished, ready to switch 
> enumerator!");
>                 if (currentSourceIndex + 1 < sources.size()) {
>                     switchEnumerator();
>                     // switch all readers prior to sending split assignments
>                     for (int i = 0; i < context.currentParallelism(); i++) {
>                         sendSwitchSourceEvent(i, currentSourceIndex);
>                     }
>                 }
>             } {code}
> I think *finishedReaders* is used to track the subtask IDs that have 
> finished reading the current source; therefore, it should be cleared in the 
> *switchEnumerator* function.
> If it is not cleared, then while the next source is being read, as soon as 
> any single SourceReader reports a *SourceReaderFinishedEvent* (even though 
> the other SourceReaders may still be processing), the condition 
> *finishedReaders.size() == context.currentParallelism()* is already 
> satisfied, which triggers {*}sendSwitchSourceEvent{*}(i, currentSourceIndex) 
> and sends a *SwitchSourceEvent* to all SourceReaders.
> If a SourceReader receives a SwitchSourceEvent before it has finished reading 
> the previous source, it executes {*}currentReader.close(){*}, so some data is 
> never read, resulting in partial data loss from the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss

2023-10-25 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-33360:
---

 Summary: HybridSource fails to clear the previous round's state 
when switching sources, leading to data loss
 Key: FLINK-33360
 URL: https://issues.apache.org/jira/browse/FLINK-33360
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HybridSource
Affects Versions: 1.17.1, 1.16.2
Reporter: Feng Jiajie
 Fix For: 1.17.3


org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
{code:java}
            // track readers that have finished processing for current 
enumerator
            finishedReaders.add(subtaskId);
            if (finishedReaders.size() == context.currentParallelism()) {
                LOG.debug("All readers finished, ready to switch enumerator!");
                if (currentSourceIndex + 1 < sources.size()) {
                    switchEnumerator();
                    // switch all readers prior to sending split assignments
                    for (int i = 0; i < context.currentParallelism(); i++) {
                        sendSwitchSourceEvent(i, currentSourceIndex);
                    }
                }
            } {code}
I think *finishedReaders* is used to track the subtask IDs that have finished 
reading the current source; therefore, it should be cleared in the 
*switchEnumerator* function.

If it is not cleared, then while the next source is being read, as soon as any 
single SourceReader reports a *SourceReaderFinishedEvent* (even though the 
other SourceReaders may still be processing), the condition 
*finishedReaders.size() == context.currentParallelism()* is already satisfied, 
which triggers {*}sendSwitchSourceEvent{*}(i, currentSourceIndex) and sends a 
*SwitchSourceEvent* to all SourceReaders.
If a SourceReader receives a SwitchSourceEvent before it has finished reading 
the previous source, it executes {*}currentReader.close(){*}, so some data is 
never read, resulting in partial data loss from the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33171) Consistent implicit type coercion support for equal and non-equal comparisons for codegen

2023-10-16 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-33171:

Summary: Consistent implicit type coercion support for equal and non-equal 
comparisons for codegen  (was: Consistent implicit type coercion support for 
equal and non-equi comparison for codegen)

> Consistent implicit type coercion support for equal and non-equal comparisons 
> for codegen
> -
>
> Key: FLINK-33171
> URL: https://issues.apache.org/jira/browse/FLINK-33171
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Feng Jiajie
>Assignee: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.2, 1.18.1
>
>
> When executing the following SQL:
> {code:sql}
> SELECT
> time1,
> time1 = '2023-09-30 18:22:42.123' AS eq1,
> NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1
> FROM table1;
> {code}
> the result is as follows:
> {code:java}
> +----+-------------------------+--------+--------+
> | op |                   time1 |    eq1 | notEq1 |
> +----+-------------------------+--------+--------+
> | +I | 2023-09-30 18:22:42.123 |   TRUE |   TRUE |
> | +I | 2023-09-30 18:22:42.124 |  FALSE |   TRUE |
> +----+-------------------------+--------+--------+
> 2 rows in set
> {code}
> The "notEq1" in the first row should be FALSE.
> Here is the reproducing code:
> {code:java}
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> public class TimePointNotEqualTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(new 
> Configuration());
> env.setParallelism(1);
> DataStreamSource<Long> longDataStreamSource = env.fromSequence(0, 1);
> RowTypeInfo rowTypeInfo =
> new RowTypeInfo(new TypeInformation[] {Types.LONG}, new 
> String[] {"time1"});
> SingleOutputStreamOperator<Row> map =
> longDataStreamSource.map(new RichMapFunction<Long, Row>() {
> @Override
> public Row map(Long value) {
> Row row = new Row(1);
> row.setField(0, 1696069362123L + value);
> return row;
> }
> }, rowTypeInfo);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> Schema schema = Schema.newBuilder()
> .column("time1", 
> DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
> .build();
> tableEnv.createTemporaryView("table1", map, schema);
> tableEnv.sqlQuery("SELECT "
> + "time1," // 2023-09-30 18:22:42.123
> + "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE
> + "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // 
> expect FALSE but TRUE
> + "FROM table1").execute().print();
> }
> }
> {code}
> I would like to attempt to fix this issue. If possible, please assign the 
> issue to me. Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33171) Consistent implicit type coercion support for equal and non-equi comparison for codegen

2023-10-16 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-33171:

Summary: Consistent implicit type coercion support for equal and non-equi 
comparison for codegen  (was: Table SQL support Not Equal for TimePoint type 
and TimeString)

> Consistent implicit type coercion support for equal and non-equi comparison 
> for codegen
> ---
>
> Key: FLINK-33171
> URL: https://issues.apache.org/jira/browse/FLINK-33171
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Feng Jiajie
>Assignee: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.2, 1.18.1
>
>
> When executing the following SQL:
> {code:sql}
> SELECT
> time1,
> time1 = '2023-09-30 18:22:42.123' AS eq1,
> NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1
> FROM table1;
> {code}
> the result is as follows:
> {code:java}
> +----+-------------------------+--------+--------+
> | op |                   time1 |    eq1 | notEq1 |
> +----+-------------------------+--------+--------+
> | +I | 2023-09-30 18:22:42.123 |   TRUE |   TRUE |
> | +I | 2023-09-30 18:22:42.124 |  FALSE |   TRUE |
> +----+-------------------------+--------+--------+
> 2 rows in set
> {code}
> The "notEq1" in the first row should be FALSE.
> Here is the reproducing code:
> {code:java}
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> public class TimePointNotEqualTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(new 
> Configuration());
> env.setParallelism(1);
> DataStreamSource<Long> longDataStreamSource = env.fromSequence(0, 1);
> RowTypeInfo rowTypeInfo =
> new RowTypeInfo(new TypeInformation[] {Types.LONG}, new 
> String[] {"time1"});
> SingleOutputStreamOperator<Row> map =
> longDataStreamSource.map(new RichMapFunction<Long, Row>() {
> @Override
> public Row map(Long value) {
> Row row = new Row(1);
> row.setField(0, 1696069362123L + value);
> return row;
> }
> }, rowTypeInfo);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> Schema schema = Schema.newBuilder()
> .column("time1", 
> DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
> .build();
> tableEnv.createTemporaryView("table1", map, schema);
> tableEnv.sqlQuery("SELECT "
> + "time1," // 2023-09-30 18:22:42.123
> + "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE
> + "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // 
> expect FALSE but TRUE
> + "FROM table1").execute().print();
> }
> }
> {code}
> I would like to attempt to fix this issue. If possible, please assign the 
> issue to me. Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33171) Table SQL support Not Equal for TimePoint type and TimeString

2023-09-30 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17770698#comment-17770698
 ] 

Feng Jiajie commented on FLINK-33171:
-

I have attempted to submit a pull request: 
[https://github.com/apache/flink/pull/23478]

> Table SQL support Not Equal for TimePoint type and TimeString
> -
>
> Key: FLINK-33171
> URL: https://issues.apache.org/jira/browse/FLINK-33171
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.2, 1.18.1
>
>
> When executing the following SQL:
> {code:sql}
> SELECT
> time1,
> time1 = '2023-09-30 18:22:42.123' AS eq1,
> NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1
> FROM table1;
> {code}
> the result is as follows:
> {code:java}
> +----+-------------------------+--------+--------+
> | op |                   time1 |    eq1 | notEq1 |
> +----+-------------------------+--------+--------+
> | +I | 2023-09-30 18:22:42.123 |   TRUE |   TRUE |
> | +I | 2023-09-30 18:22:42.124 |  FALSE |   TRUE |
> +----+-------------------------+--------+--------+
> 2 rows in set
> {code}
> The "notEq1" in the first row should be FALSE.
> Here is the reproducing code:
> {code:java}
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> public class TimePointNotEqualTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(new 
> Configuration());
> env.setParallelism(1);
> DataStreamSource<Long> longDataStreamSource = env.fromSequence(0, 1);
> RowTypeInfo rowTypeInfo =
> new RowTypeInfo(new TypeInformation[] {Types.LONG}, new 
> String[] {"time1"});
> SingleOutputStreamOperator<Row> map =
> longDataStreamSource.map(new RichMapFunction<Long, Row>() {
> @Override
> public Row map(Long value) {
> Row row = new Row(1);
> row.setField(0, 1696069362123L + value);
> return row;
> }
> }, rowTypeInfo);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> Schema schema = Schema.newBuilder()
> .column("time1", 
> DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
> .build();
> tableEnv.createTemporaryView("table1", map, schema);
> tableEnv.sqlQuery("SELECT "
> + "time1," // 2023-09-30 18:22:42.123
> + "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE
> + "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // 
> expect FALSE but TRUE
> + "FROM table1").execute().print();
> }
> }
> {code}
> I would like to attempt to fix this issue. If possible, please assign the 
> issue to me. Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33171) Table SQL support Not Equal for TimePoint type and TimeString

2023-09-30 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-33171:
---

 Summary: Table SQL support Not Equal for TimePoint type and 
TimeString
 Key: FLINK-33171
 URL: https://issues.apache.org/jira/browse/FLINK-33171
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.18.0
Reporter: Feng Jiajie
 Fix For: 1.17.2, 1.18.1


When executing the following SQL:
{code:sql}
SELECT
time1,
time1 = '2023-09-30 18:22:42.123' AS eq1,
NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1
FROM table1;
{code}
the result is as follows:
{code:java}
+----+-------------------------+--------+--------+
| op |                   time1 |    eq1 | notEq1 |
+----+-------------------------+--------+--------+
| +I | 2023-09-30 18:22:42.123 |   TRUE |   TRUE |
| +I | 2023-09-30 18:22:42.124 |  FALSE |   TRUE |
+----+-------------------------+--------+--------+
2 rows in set
{code}
The "notEq1" in the first row should be FALSE.

Here is the reproducing code:
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TimePointNotEqualTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
env.setParallelism(1);

DataStreamSource<Long> longDataStreamSource = env.fromSequence(0, 1);
RowTypeInfo rowTypeInfo =
new RowTypeInfo(new TypeInformation[] {Types.LONG}, new 
String[] {"time1"});
SingleOutputStreamOperator<Row> map =
longDataStreamSource.map(new RichMapFunction<Long, Row>() {
@Override
public Row map(Long value) {
Row row = new Row(1);
row.setField(0, 1696069362123L + value);
return row;
}
}, rowTypeInfo);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Schema schema = Schema.newBuilder()
.column("time1", 
DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
.build();
tableEnv.createTemporaryView("table1", map, schema);

tableEnv.sqlQuery("SELECT "
+ "time1," // 2023-09-30 18:22:42.123
+ "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE
+ "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // 
expect FALSE but TRUE
+ "FROM table1").execute().print();
}
}
{code}
I would like to attempt to fix this issue. If possible, please assign the issue 
to me. Thank you.
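
As a possible interim workaround (a hedged, untested sketch; it assumes Flink 
SQL accepts CAST from a string literal to TIMESTAMP_LTZ(3)), one could make 
the coercion explicit so that NOT negates an ordinary boolean comparison, 
replacing the final query in the reproducer above:
{code:java}
// Hedged workaround sketch (assumption, not verified against the planner):
// cast the string literal explicitly so both operands of '=' share a type,
// sidestepping the implicit string-to-timestamp coercion shown to misbehave.
tableEnv.sqlQuery("SELECT "
        + "time1,"
        + "time1 = CAST('2023-09-30 18:22:42.123' AS TIMESTAMP_LTZ(3)) AS eq1,"
        + "NOT (time1 = CAST('2023-09-30 18:22:42.123' AS TIMESTAMP_LTZ(3))) AS notEq1 "
        + "FROM table1").execute().print();
{code}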



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-19 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Priority: Critical  (was: Major)

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.1
>Reporter: Feng Jiajie
>Priority: Critical
>  Labels: checkpoint, scheduler
>
> I have a streaming job configured with periodic checkpointing, but after one 
> week of running, I found there wasn't a single checkpoint file.
> h2. Reproduce the problem:
> 1. Job was submitted to YARN:
> {code:java}
> bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
> flink-example-1.0-SNAPSHOT.jar{code}
> 2. Then immediately, before all the tasks switched to RUNNING (a matter of 
> seconds), I (actually a job control script) sent a "stop with savepoint" 
> command via the Flink CLI:
> {code:java}
> bin/flink stop -yid application_1575872737452_0019 
> f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
> {code}
> log in jobmanager.log:
> {code:java}
> 2019-12-09 17:56:56,512 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Socket Stream -> Map (1/1) of job 
> f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED 
> instead. Aborting checkpoint.
> {code}
> Then the job tasks (TaskManager) *continue to run normally without* checkpoints.
> h2. The cause of the problem:
> 1. "stop with savepoint" command call the code 
> stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
>  and then triggerSynchronousSavepoint:
> {code:java}
> // we stop the checkpoint coordinator so that we are guaranteed
> // to have only the data of the synchronous savepoint committed.
> // in case of failure, and if the job restarts, the coordinator
> // will be restarted by the CheckpointCoordinatorDeActivator.
> checkpointCoordinator.stopCheckpointScheduler();{code}
> 2. But since this happened "before all the tasks switched to RUNNING", 
> triggerSynchronousSavepoint failed at 
> org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
> {code:java}
> LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
> instead. Aborting checkpoint.",
>   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>   job,
>   ExecutionState.RUNNING,
>   ee.getState());
> throw new 
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
> 3. Finally, "stop with savepoint" failed: 
> "checkpointCoordinator.stopCheckpointScheduler()" had already been executed, 
> but the job was not terminated. The job keeps running, now without periodic 
> checkpoints (see the sketch below). 
>  
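> To make the sequence above concrete, a simplified stand-alone model of the 
> stuck state (illustrative names only, not Flink's actual classes):
> {code:java}
> /** Simplified model of the stop-with-savepoint failure described above. */
> public class StopWithSavepointModel {
>   private boolean periodicCheckpointsEnabled = true;
>   private boolean allTasksRunning = false; // tasks are still SCHEDULED
> 
>   void stopWithSavepoint() {
>     // Step 1: the periodic scheduler is stopped before the savepoint runs.
>     periodicCheckpointsEnabled = false;
>     try {
>       triggerSynchronousSavepoint();
>       // On success the job terminates, so the stopped scheduler is harmless.
>     } catch (IllegalStateException e) {
>       // Step 2: the savepoint fails because not all tasks are RUNNING.
>       // Step 3 (the bug): nothing re-enables the scheduler here, so the job
>       // keeps running with periodic checkpoints disabled.
>     }
>   }
> 
>   void triggerSynchronousSavepoint() {
>     if (!allTasksRunning) {
>       throw new IllegalStateException("Not all required tasks are RUNNING");
>     }
>   }
> 
>   public static void main(String[] args) {
>     StopWithSavepointModel model = new StopWithSavepointModel();
>     model.stopWithSavepoint();
>     System.out.println("periodic checkpoints enabled: "
>         + model.periodicCheckpointsEnabled); // false: job runs unprotected
>   }
> }
> {code}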
> sample code for reproduce:
> {code:java}
> public class StreamingJob {
>   private static StateBackend makeRocksdbBackend() throws IOException {
> RocksDBStateBackend rocksdbBackend = new 
> RocksDBStateBackend("file:///tmp/aaa");
> rocksdbBackend.enableTtlCompactionFilter();
> 
> rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
> return rocksdbBackend;
>   }
>   public static void main(String[] args) throws Exception {
> // set up the streaming execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 10 sec
> env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
> env.setStateBackend(makeRocksdbBackend());
> env.setRestartStrategy(RestartStrategies.noRestart());
> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> checkpointConfig.enableExternalizedCheckpoints(
> 
> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> checkpointConfig.setFailOnCheckpointingErrors(true);
> DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
> text.map(new MapFunction<String, Tuple2<Long, Long>>() {
>   @Override
>   public Tuple2<Long, Long> map(String s) {
> String[] s1 = s.split(" ");
> return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
>   }
> }).keyBy(0).flatMap(new CountWindowAverage()).print();
> env.execute("Flink Streaming Java API Skeleton");
>   }
>   public static class CountWindowAverage extends 
> RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
> private transient ValueState<Tuple2<Long, Long>> sum;
> @Override
> public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>   Tuple2 currentSum = sum.value();
>   currentSum.f0 += 1;
>   currentSum.f1 += input.f1;
>   sum.update(currentSum);
>   out.collect(new Tuple2<>(input.f0, currentSum.f1));
> }
> @Override
> public void open(Configuration config) {
>   ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>   new ValueStateDescriptor<>(
>   "average", // the state name
>   TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
>   }), // type information
>   Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>   sum = getRuntimeContext().getState(descriptor);
> }
>   }
> }
> {code}

[jira] [Commented] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1

2019-12-19 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999892#comment-16999892
 ] 

Feng Jiajie commented on FLINK-15308:
-

Really looking forward to it.

> Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
> --
>
> Key: FLINK-15308
> URL: https://issues.apache.org/jira/browse/FLINK-15308
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.10.0
> Environment: $ git log
> commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
> Author: bowen.li 
> Date: Tue Dec 17 17:37:03 2019 -0800
>Reporter: Feng Jiajie
>Assignee: Yingjie Cao
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: image-2019-12-19-10-55-30-644.png
>
>
> Job worked well with default flink-conf.yaml with 
> pipelined-shuffle.compression:
> {code:java}
> taskmanager.numberOfTaskSlots: 1
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> But when I set taskmanager.numberOfTaskSlots to 4 or 6:
> {code:java}
> taskmanager.numberOfTaskSlots: 6
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> job failed:
> {code:java}
> $ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m 
> ~/flink-example-1.0-SNAPSHOT.jar
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,907 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-18 15:04:41,084 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Cluster specification: 
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=12288, 
> numberTaskManagers=1, slotsPerTaskManager=6}
> 2019-12-18 15:04:42,344 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Submitting application master application_1576573857638_0026
> 2019-12-18 15:04:42,370 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1576573857638_0026
> 2019-12-18 15:04:42,371 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Waiting for the cluster to be allocated
> 2019-12-18 15:04:42,372 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Deploying cluster, current state ACCEPTED
> 2019-12-18 15:04:45,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - YARN application has been deployed successfully.
> 2019-12-18 15:04:45,390 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Found Web Interface debugboxcreate431x3.sa:36162 of 
> application 'application_1576573857638_0026'.
> Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: org.apache.flink.client.program.ProgramInvocationException: 
> Job failed (JobID: 9140c70769f4271cc22ea8becaa26272)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>   at 
> 

[jira] [Commented] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1

2019-12-18 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999706#comment-16999706
 ] 

Feng Jiajie commented on FLINK-15308:
-

Hi [~kevin.cyj] ,

I can reproduce the problem every time.

YARN cluster: 3 nodes (8 cores, 32 GB)
{code:java}
$ cat flink-conf.yaml | grep -v '^#' | grep -v '^$'
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.memory.total-process.size: 1024m
taskmanager.numberOfTaskSlots: 6
parallelism.default: 1
taskmanager.network.pipelined-shuffle.compression.enabled: true
jobmanager.execution.failover-strategy: region
{code}

> Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
> --
>
> Key: FLINK-15308
> URL: https://issues.apache.org/jira/browse/FLINK-15308
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.10.0
> Environment: $ git log
> commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
> Author: bowen.li 
> Date: Tue Dec 17 17:37:03 2019 -0800
>Reporter: Feng Jiajie
>Assignee: Yingjie Cao
>Priority: Blocker
> Attachments: image-2019-12-19-10-55-30-644.png
>
>
> Job worked well with default flink-conf.yaml with 
> pipelined-shuffle.compression:
> {code:java}
> taskmanager.numberOfTaskSlots: 1
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> But when I set taskmanager.numberOfTaskSlots to 4 or 6:
> {code:java}
> taskmanager.numberOfTaskSlots: 6
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> job failed:
> {code:java}
> $ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m 
> ~/flink-example-1.0-SNAPSHOT.jar
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,907 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-18 15:04:41,084 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Cluster specification: 
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=12288, 
> numberTaskManagers=1, slotsPerTaskManager=6}
> 2019-12-18 15:04:42,344 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Submitting application master application_1576573857638_0026
> 2019-12-18 15:04:42,370 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1576573857638_0026
> 2019-12-18 15:04:42,371 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Waiting for the cluster to be allocated
> 2019-12-18 15:04:42,372 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Deploying cluster, current state ACCEPTED
> 2019-12-18 15:04:45,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - YARN application has been deployed successfully.
> 2019-12-18 15:04:45,390 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Found Web Interface debugboxcreate431x3.sa:36162 of 
> application 'application_1576573857638_0026'.
> Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: org.apache.flink.client.program.ProgramInvocationException: 
> Job failed (JobID: 

[jira] [Comment Edited] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1

2019-12-18 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998931#comment-16998931
 ] 

Feng Jiajie edited comment on FLINK-15308 at 12/18/19 8:35 AM:
---

Here is my test code:

[https://github.com/fengjiajie/my-flink-test|https://github.com/fengjiajie/my-flink-test/tree/master/src/main]

run cmd:
{code:java}
bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m 
~/laputa-flink-example-1.0-SNAPSHOT.jar
{code}
and
{code:java}
nc -l 31212
{code}
on the host debugboxcreate431x1, which the sink at `cn/kbyte/StreamingJob.java:88` writes to:
{code:java}
new SocketClientSink<>("debugboxcreate431x1", 31212, new SimpleStringSchema()))
{code}
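
For context, that sink line is attached to a stream roughly as follows (a
minimal sketch; the source elements are placeholders, and the real pipeline
lives in the linked repository):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;

public class SocketSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Whatever the job emits is serialized as text and written to the
    // "nc -l 31212" listener running on debugboxcreate431x1.
    env.fromElements("hello", "world")
        .addSink(new SocketClientSink<>("debugboxcreate431x1", 31212, new SimpleStringSchema()));
    env.execute("socket-sink-sketch");
  }
}
{code}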
[~kevin.cyj]

 


was (Author: fengjiajie):
[https://github.com/fengjiajie/my-flink-test|https://github.com/fengjiajie/my-flink-test/tree/master/src/main]

run cmd:

bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m 
~/laputa-flink-example-1.0-SNAPSHOT.jar

[~kevin.cyj]

 

> Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
> --
>
> Key: FLINK-15308
> URL: https://issues.apache.org/jira/browse/FLINK-15308
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.10.0
> Environment: $ git log
> commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
> Author: bowen.li 
> Date: Tue Dec 17 17:37:03 2019 -0800
>Reporter: Feng Jiajie
>Assignee: Yingjie Cao
>Priority: Blocker
>
> Job worked well with default flink-conf.yaml with 
> pipelined-shuffle.compression:
> {code:java}
> taskmanager.numberOfTaskSlots: 1
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> But when I set taskmanager.numberOfTaskSlots to 4 or 6:
> {code:java}
> taskmanager.numberOfTaskSlots: 6
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> job failed:
> {code:java}
> $ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m 
> ~/flink-example-1.0-SNAPSHOT.jar
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,907 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-18 15:04:41,084 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Cluster specification: 
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=12288, 
> numberTaskManagers=1, slotsPerTaskManager=6}
> 2019-12-18 15:04:42,344 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Submitting application master application_1576573857638_0026
> 2019-12-18 15:04:42,370 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1576573857638_0026
> 2019-12-18 15:04:42,371 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Waiting for the cluster to be allocated
> 2019-12-18 15:04:42,372 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Deploying cluster, current state ACCEPTED
> 2019-12-18 15:04:45,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - YARN application has been deployed successfully.
> 2019-12-18 15:04:45,390 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Found Web Interface debugboxcreate431x3.sa:36162 of 
> application 'application_1576573857638_0026'.
> Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272
> 

[jira] [Comment Edited] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1

2019-12-18 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998931#comment-16998931
 ] 

Feng Jiajie edited comment on FLINK-15308 at 12/18/19 8:30 AM:
---

[https://github.com/fengjiajie/my-flink-test|https://github.com/fengjiajie/my-flink-test/tree/master/src/main]

run cmd:

bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m 
~/laputa-flink-example-1.0-SNAPSHOT.jar

[~kevin.cyj]

 


was (Author: fengjiajie):
[https://github.com/fengjiajie/my-flink-test/tree/master/src/main]

run cmd:

bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m 
~/laputa-flink-example-1.0-SNAPSHOT.jar

> Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
> --
>
> Key: FLINK-15308
> URL: https://issues.apache.org/jira/browse/FLINK-15308
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.10.0
> Environment: $ git log
> commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
> Author: bowen.li 
> Date: Tue Dec 17 17:37:03 2019 -0800
>Reporter: Feng Jiajie
>Assignee: Yingjie Cao
>Priority: Blocker
>
> Job worked well with default flink-conf.yaml with 
> pipelined-shuffle.compression:
> {code:java}
> taskmanager.numberOfTaskSlots: 1
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> But when I set taskmanager.numberOfTaskSlots to 4 or 6:
> {code:java}
> taskmanager.numberOfTaskSlots: 6
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> job failed:
> {code:java}
> $ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m 
> ~/flink-example-1.0-SNAPSHOT.jar
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,907 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-18 15:04:41,084 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Cluster specification: 
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=12288, 
> numberTaskManagers=1, slotsPerTaskManager=6}
> 2019-12-18 15:04:42,344 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Submitting application master application_1576573857638_0026
> 2019-12-18 15:04:42,370 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1576573857638_0026
> 2019-12-18 15:04:42,371 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Waiting for the cluster to be allocated
> 2019-12-18 15:04:42,372 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Deploying cluster, current state ACCEPTED
> 2019-12-18 15:04:45,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - YARN application has been deployed successfully.
> 2019-12-18 15:04:45,390 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Found Web Interface debugboxcreate431x3.sa:36162 of 
> application 'application_1576573857638_0026'.
> Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: org.apache.flink.client.program.ProgramInvocationException: 
> Job failed (JobID: 9140c70769f4271cc22ea8becaa26272)
>   at 
> 

[jira] [Commented] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1

2019-12-18 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998931#comment-16998931
 ] 

Feng Jiajie commented on FLINK-15308:
-

[https://github.com/fengjiajie/my-flink-test/tree/master/src/main]

run cmd:

bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m 
~/laputa-flink-example-1.0-SNAPSHOT.jar

> Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
> --
>
> Key: FLINK-15308
> URL: https://issues.apache.org/jira/browse/FLINK-15308
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.10.0
> Environment: $ git log
> commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
> Author: bowen.li 
> Date: Tue Dec 17 17:37:03 2019 -0800
>Reporter: Feng Jiajie
>Assignee: Yingjie Cao
>Priority: Blocker
>
> Job worked well with default flink-conf.yaml with 
> pipelined-shuffle.compression:
> {code:java}
> taskmanager.numberOfTaskSlots: 1
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> But when I set taskmanager.numberOfTaskSlots to 4 or 6:
> {code:java}
> taskmanager.numberOfTaskSlots: 6
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> job failed:
> {code:java}
> $ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m 
> ~/flink-example-1.0-SNAPSHOT.jar
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,907 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-18 15:04:41,084 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Cluster specification: 
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=12288, 
> numberTaskManagers=1, slotsPerTaskManager=6}
> 2019-12-18 15:04:42,344 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Submitting application master application_1576573857638_0026
> 2019-12-18 15:04:42,370 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1576573857638_0026
> 2019-12-18 15:04:42,371 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Waiting for the cluster to be allocated
> 2019-12-18 15:04:42,372 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Deploying cluster, current state ACCEPTED
> 2019-12-18 15:04:45,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - YARN application has been deployed successfully.
> 2019-12-18 15:04:45,390 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Found Web Interface debugboxcreate431x3.sa:36162 of 
> application 'application_1576573857638_0026'.
> Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: org.apache.flink.client.program.ProgramInvocationException: 
> Job failed (JobID: 9140c70769f4271cc22ea8becaa26272)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>   at 
> 

[jira] [Updated] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1

2019-12-17 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15308:

Priority: Blocker  (was: Major)

> Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
> --
>
> Key: FLINK-15308
> URL: https://issues.apache.org/jira/browse/FLINK-15308
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.10.0
> Environment: $ git log
> commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
> Author: bowen.li 
> Date: Tue Dec 17 17:37:03 2019 -0800
>Reporter: Feng Jiajie
>Priority: Blocker
>
> Job worked well with default flink-conf.yaml with 
> pipelined-shuffle.compression:
> {code:java}
> taskmanager.numberOfTaskSlots: 1
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> But when I set taskmanager.numberOfTaskSlots to 4 or 6:
> {code:java}
> taskmanager.numberOfTaskSlots: 6
> taskmanager.network.pipelined-shuffle.compression.enabled: true
> {code}
> job failed:
> {code:java}
> $ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m 
> ~/flink-example-1.0-SNAPSHOT.jar
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory 
> ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
>  already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2019-12-18 15:04:40,907 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-18 15:04:41,084 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Cluster specification: 
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=12288, 
> numberTaskManagers=1, slotsPerTaskManager=6}
> 2019-12-18 15:04:42,344 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Submitting application master application_1576573857638_0026
> 2019-12-18 15:04:42,370 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1576573857638_0026
> 2019-12-18 15:04:42,371 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Waiting for the cluster to be allocated
> 2019-12-18 15:04:42,372 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Deploying cluster, current state ACCEPTED
> 2019-12-18 15:04:45,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - YARN application has been deployed successfully.
> 2019-12-18 15:04:45,390 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Found Web Interface debugboxcreate431x3.sa:36162 of 
> application 'application_1576573857638_0026'.
> Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: org.apache.flink.client.program.ProgramInvocationException: 
> Job failed (JobID: 9140c70769f4271cc22ea8becaa26272)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>   at 
> 

[jira] [Created] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1

2019-12-17 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-15308:
---

 Summary: Job failed when enable pipelined-shuffle.compression and 
numberOfTaskSlots > 1
 Key: FLINK-15308
 URL: https://issues.apache.org/jira/browse/FLINK-15308
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.10.0
 Environment: $ git log
commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
Author: bowen.li 
Date: Tue Dec 17 17:37:03 2019 -0800
Reporter: Feng Jiajie


Job worked well with default flink-conf.yaml with pipelined-shuffle.compression:
{code:java}
taskmanager.numberOfTaskSlots: 1
taskmanager.network.pipelined-shuffle.compression.enabled: true
{code}
But when I set taskmanager.numberOfTaskSlots to 4 or 6:
{code:java}
taskmanager.numberOfTaskSlots: 6
taskmanager.network.pipelined-shuffle.compression.enabled: true
{code}
job failed:
{code:java}
$ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m 
~/flink-example-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- The configuration directory 
('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
 already contains a LOG4J config file.If you want to use logback, then please 
delete or rename the log configuration file.
2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- The configuration directory 
('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
 already contains a LOG4J config file.If you want to use logback, then please 
delete or rename the log configuration file.
2019-12-18 15:04:40,907 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-12-18 15:04:41,084 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Cluster specification: ClusterSpecification{masterMemoryMB=1024, 
taskManagerMemoryMB=12288, numberTaskManagers=1, slotsPerTaskManager=6}
2019-12-18 15:04:42,344 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Submitting application master application_1576573857638_0026
2019-12-18 15:04:42,370 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
application application_1576573857638_0026
2019-12-18 15:04:42,371 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Waiting for the cluster to be allocated
2019-12-18 15:04:42,372 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deploying cluster, current state ACCEPTED
2019-12-18 15:04:45,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- YARN application has been deployed successfully.
2019-12-18 15:04:45,390 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Found Web Interface debugboxcreate431x3.sa:36162 of application 
'application_1576573857638_0026'.
Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: org.apache.flink.client.program.ProgramInvocationException: 
Job failed (JobID: 9140c70769f4271cc22ea8becaa26272)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
at 

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-11 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Labels: checkpoint scheduler  (was: checkpoint)

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.1
>Reporter: Feng Jiajie
>Priority: Major
>  Labels: checkpoint, scheduler
>
> I have a streaming job configured with periodic checkpointing, but after one 
> week of running, I found there wasn't any checkpoint file.
> h2. Reproduce the problem:
> 1. Job was submitted to YARN:
> {code:java}
> bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
> flink-example-1.0-SNAPSHOT.jar{code}
> 2. Then immediately, before all the tasks switched to RUNNING (a matter of 
> seconds), I (actually a job control script) sent a "stop with savepoint" 
> command via the Flink CLI:
> {code:java}
> bin/flink stop -yid application_1575872737452_0019 
> f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
> {code}
> Log entry in jobmanager.log:
> {code:java}
> 2019-12-09 17:56:56,512 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Socket Stream -> Map (1/1) of job 
> f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED 
> instead. Aborting checkpoint.
> {code}
> Then the job's tasks (on the TaskManager) *continue to run normally without* checkpointing.
> h2. The cause of the problem:
> 1. "stop with savepoint" command call the code 
> stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
>  and then triggerSynchronousSavepoint:
> {code:java}
> // we stop the checkpoint coordinator so that we are guaranteed
> // to have only the data of the synchronous savepoint committed.
> // in case of failure, and if the job restarts, the coordinator
> // will be restarted by the CheckpointCoordinatorDeActivator.
> checkpointCoordinator.stopCheckpointScheduler();{code}
> 2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint 
> failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
> {code:java}
> LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
> instead. Aborting checkpoint.",
>   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>   job,
>   ExecutionState.RUNNING,
>   ee.getState());
> throw new 
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
> 3. finally, "stop with savepoint" failed: 
> "checkpointCoordinator.stopCheckpointScheduler()" had already been executed, 
> but the job was never terminated. The job is still running, now without 
> periodic checkpointing. 
>  
> Sample code to reproduce:
> {code:java}
> public class StreamingJob {
>   private static StateBackend makeRocksdbBackend() throws IOException {
> RocksDBStateBackend rocksdbBackend = new 
> RocksDBStateBackend("file:///tmp/aaa");
> rocksdbBackend.enableTtlCompactionFilter();
> 
> rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
> return rocksdbBackend;
>   }
>   public static void main(String[] args) throws Exception {
> // set up the streaming execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 10 sec
> env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
> env.setStateBackend(makeRocksdbBackend());
> env.setRestartStrategy(RestartStrategies.noRestart());
> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> checkpointConfig.enableExternalizedCheckpoints(
> 
> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> checkpointConfig.setFailOnCheckpointingErrors(true);
> DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
> text.map(new MapFunction<String, Tuple2<Long, Long>>() {
>   @Override
>   public Tuple2<Long, Long> map(String s) {
> String[] s1 = s.split(" ");
> return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
>   }
> }).keyBy(0).flatMap(new CountWindowAverage()).print();
> env.execute("Flink Streaming Java API Skeleton");
>   }
>   public static class CountWindowAverage extends 
> RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
> private transient ValueState<Tuple2<Long, Long>> sum;
> @Override
> public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>   Tuple2 currentSum = sum.value();
>   currentSum.f0 += 1;
>   currentSum.f1 += input.f1;
>   sum.update(currentSum);
>   out.collect(new Tuple2<>(input.f0, currentSum.f1));
> }
> @Override
> public void open(Configuration config) {
>   ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>   new ValueStateDescriptor<>(
>   "average", // the state name
>   TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
>   }), // type information
>   Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>   sum = getRuntimeContext().getState(descriptor);
> }
>   }
> }
> {code}

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Description: 
I have a streaming job configured with periodic checkpointing, but after one 
week of running, I found there wasn't any checkpoint file.
h2. Reproduce the problem:

1. Job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
flink-example-1.0-SNAPSHOT.jar{code}
2. Then immediately, before all the tasks switched to RUNNING (a matter of 
seconds), I (actually a job control script) sent a "stop with savepoint" 
command via the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 
f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
Log entry in jobmanager.log:
{code:java}
2019-12-09 17:56:56,512 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
triggering task Source: Socket Stream -> Map (1/1) of job 
f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.
{code}
Then the job's tasks (on the TaskManager) *continue to run normally without* checkpointing.
h2. The cause of the problem:

1. "stop with savepoint" command call the code 
stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
 and then triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint 
failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. finally, "stop with savepoint" failed: 
"checkpointCoordinator.stopCheckpointScheduler()" had already been executed, 
but the job was never terminated. The job is still running, now without 
periodic checkpointing (see the sketch of a possible remediation below). 
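
One possible shape of a remediation (a sketch only; the surrounding variable
names are assumed, and the actual fix in Flink may differ): if triggering the
synchronous savepoint fails, re-activate the periodic scheduler instead of
leaving it stopped while the job keeps running:
{code:java}
// Sketch around the stop-with-savepoint path (variable names assumed;
// CompletableFuture is java.util.concurrent, CompletedCheckpoint is
// org.apache.flink.runtime.checkpoint).
checkpointCoordinator.stopCheckpointScheduler();

CompletableFuture<CompletedCheckpoint> savepointFuture =
    checkpointCoordinator.triggerSynchronousSavepoint(
        System.currentTimeMillis(), advanceToEndOfEventTime, targetDirectory);

savepointFuture.whenComplete((completedCheckpoint, throwable) -> {
  if (throwable != null) {
    // Stop-with-savepoint failed but the job keeps running, so periodic
    // checkpointing must be switched back on instead of staying disabled.
    checkpointCoordinator.startCheckpointScheduler();
  }
});
{code}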

 

Sample code to reproduce:
{code:java}
public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
RocksDBStateBackend rocksdbBackend = new 
RocksDBStateBackend("file:///tmp/aaa");
rocksdbBackend.enableTtlCompactionFilter();

rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// 10 sec
env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(makeRocksdbBackend());
env.setRestartStrategy(RestartStrategies.noRestart());

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.setFailOnCheckpointingErrors(true);

DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
text.map(new MapFunction<String, Tuple2<Long, Long>>() {
  @Override
  public Tuple2<Long, Long> map(String s) {
String[] s1 = s.split(" ");
return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
  }
}).keyBy(0).flatMap(new CountWindowAverage()).print();

env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage extends 
RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

private transient ValueState<Tuple2<Long, Long>> sum;

@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> 
out) throws Exception {
  Tuple2 currentSum = sum.value();
  currentSum.f0 += 1;
  currentSum.f1 += input.f1;
  sum.update(currentSum);
  out.collect(new Tuple2<>(input.f0, currentSum.f1));
}

@Override
public void open(Configuration config) {
  ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  new ValueStateDescriptor<>(
  "average", // the state name
  TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
  }), // type information
  Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
  sum = getRuntimeContext().getState(descriptor);
}
  }
}
{code}

  was:
I have a streaming job configured with periodically checkpoint, but after one 
week running, I found there isn't any checkpoint file.
h2. Reproduce the problem:

1. Job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
flink-example-1.0-SNAPSHOT.jar{code}
2. Then immediately, before all 

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Description: 
I have a streaming job configured with periodically checkpoint, but after one 
week running, I found there isn't any checkpoint file.
h2. Reproduce the problem:

1. Job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
flink-example-1.0-SNAPSHOT.jar{code}
2. Then immediately, before all the task switch to RUNNING (about seconds), 
I(actually a job control script) send a "stop with savepoint" command by flink 
cli:
{code:java}
bin/flink stop -yid application_1575872737452_0019 
f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
log in jobmanager.log:
{code:java}
2019-12-09 17:56:56,512 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
triggering task Source: Socket Stream -> Map (1/1) of job 
f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.
{code}
Then the job task *continues to run normally without* checkpoint.
h2. The cause of the problem:

1. "stop with savepoint" command call the code 
stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
 and then triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. but "before all the task switch to RUNNING", checkpoint failed at 
org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. finally, "stop with savepoint" failed, with 
"checkpointCoordinator.stopCheckpointScheduler()" but without the termination 
of the job.

 

sample code:
{code:java}
public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
RocksDBStateBackend rocksdbBackend = new 
RocksDBStateBackend("file:///tmp/aaa");
rocksdbBackend.enableTtlCompactionFilter();

rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// 10 sec
env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(makeRocksdbBackend());
env.setRestartStrategy(RestartStrategies.noRestart());

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.setFailOnCheckpointingErrors(true);

DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
text.map(new MapFunction<String, Tuple2<Long, Long>>() {
  @Override
  public Tuple2<Long, Long> map(String s) {
String[] s1 = s.split(" ");
return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
  }
}).keyBy(0).flatMap(new CountWindowAverage()).print();

env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage extends 
RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

private transient ValueState<Tuple2<Long, Long>> sum;

@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> 
out) throws Exception {
  Tuple2 currentSum = sum.value();
  currentSum.f0 += 1;
  currentSum.f1 += input.f1;
  sum.update(currentSum);
  out.collect(new Tuple2<>(input.f0, currentSum.f1));
}

@Override
public void open(Configuration config) {
  ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  new ValueStateDescriptor<>(
  "average", // the state name
  TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
  }), // type information
  Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
  sum = getRuntimeContext().getState(descriptor);
}
  }
}
{code}

  was:
I have a streaming job configured with periodically checkpoint, but after one 
week running, I found there isn't any checkpoint file.
h2. Reproduce the problem:

1. Job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
flink-example-1.0-SNAPSHOT.jar{code}
2. Then immediately, before all the task switch to RUNNING (about seconds), 
I(actually a job control script) send a "stop with 

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Description: 
I have a streaming job configured with periodically checkpoint, but after one 
week running, I found there isn't any checkpoint file.
h2. Reproduce the problem:

1. Job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
flink-example-1.0-SNAPSHOT.jar{code}
2. Then immediately, before all the task switch to RUNNING (about seconds), 
I(actually a job control script) send a "stop with savepoint" command by flink 
cli:
{code:java}
bin/flink stop -yid application_1575872737452_0019 
f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
Then the job task continues to run normally, but no checkpointing.
h2. The cause of the problem:

1. "stop with savepoint" command call the code 
stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
 and then triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. but "before all the task switch to RUNNING", checkpoint failed at 
org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. finally, "stop with savepoint" failed, with 
"checkpointCoordinator.stopCheckpointScheduler()" but without the termination 
of the job.

 

sample code:
{code:java}
public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
RocksDBStateBackend rocksdbBackend = new 
RocksDBStateBackend("file:///tmp/aaa");
rocksdbBackend.enableTtlCompactionFilter();

rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// 10 sec
env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(makeRocksdbBackend());
env.setRestartStrategy(RestartStrategies.noRestart());

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.setFailOnCheckpointingErrors(true);

DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
text.map(new MapFunction<String, Tuple2<Long, Long>>() {
  @Override
  public Tuple2<Long, Long> map(String s) {
String[] s1 = s.split(" ");
return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
  }
}).keyBy(0).flatMap(new CountWindowAverage()).print();

env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage extends 
RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

private transient ValueState<Tuple2<Long, Long>> sum;

@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> 
out) throws Exception {
  Tuple2 currentSum = sum.value();
  currentSum.f0 += 1;
  currentSum.f1 += input.f1;
  sum.update(currentSum);
  out.collect(new Tuple2<>(input.f0, currentSum.f1));
}

@Override
public void open(Configuration config) {
  ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  new ValueStateDescriptor<>(
  "average", // the state name
  TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
  }), // type information
  Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
  sum = getRuntimeContext().getState(descriptor);
}
  }
}
{code}

  was:
I have a streaming job configured with periodically checkpoint, but after one 
week running, I found there isn't any checkpoint file.
h2. Reproduce the problem:

1. Job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
flink-example-1.0-SNAPSHOT.jar{code}
2. Then immediately, before all the task switch to RUNNING (about seconds), 
I(actually a job control script) send a stop with savepoint command by flink 
cli:
{code:java}
bin/flink stop -yid application_1575872737452_0019 
f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
Then the job task continues to run normally, but no checkpointing.
h2. The cause of the problem:

1. "stop with savepoint" command call the code 

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Description: 
I have a streaming job configured with periodically checkpoint, but after one 
week running, I found there isn't any checkpoint file.
h2. Reproduce the problem:

1. Job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
flink-example-1.0-SNAPSHOT.jar{code}
2. Then immediately, before all the task switch to RUNNING (about seconds), 
I(actually a job control script) send a stop with savepoint command by flink 
cli:
{code:java}
bin/flink stop -yid application_1575872737452_0019 
f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
Then the job task continues to run normally, but no checkpointing.
h2. The cause of the problem:

1. "stop with savepoint" command call the code 
stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
 and then triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. but "before all the task switch to RUNNING", checkpoint failed at 
org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. finally, "stop with savepoint" failed, with 
"checkpointCoordinator.stopCheckpointScheduler()" but without the termination 
of the job.

 

sample code:
{code:java}
public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
RocksDBStateBackend rocksdbBackend = new 
RocksDBStateBackend("file:///tmp/aaa");
rocksdbBackend.enableTtlCompactionFilter();

rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// 10 sec
env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(makeRocksdbBackend());
env.setRestartStrategy(RestartStrategies.noRestart());

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.setFailOnCheckpointingErrors(true);

DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
text.map(new MapFunction<String, Tuple2<Long, Long>>() {
  @Override
  public Tuple2<Long, Long> map(String s) {
String[] s1 = s.split(" ");
return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
  }
}).keyBy(0).flatMap(new CountWindowAverage()).print();

env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage extends 
RichFlatMapFunction, Tuple2> {

private transient ValueState> sum;

@Override
public void flatMap(Tuple2 input, Collector> 
out) throws Exception {
  Tuple2 currentSum = sum.value();
  currentSum.f0 += 1;
  currentSum.f1 += input.f1;
  sum.update(currentSum);
  out.collect(new Tuple2<>(input.f0, currentSum.f1));
}

@Override
public void open(Configuration config) {
  ValueStateDescriptor> descriptor =
  new ValueStateDescriptor<>(
  "average", // the state name
  TypeInformation.of(new TypeHint>() {
  }), // type information
  Tuple2.of(0L, 0L)); // default value of the state, if nothing was 
set
  sum = getRuntimeContext().getState(descriptor);
}
  }
}
{code}
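For reference: the job reads from a local socket, so it can be driven during testing with something like {{nc -lk 8912}} and input lines of two space-separated longs (e.g. {{1 2}}). With a healthy job, checkpoint data should appear under the configured backend URI (file:///tmp/aaa above) within the 10-second interval; after the failed stop, it never does.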

  was:
I have a streaming job configured with periodic checkpointing, but after it had been running for a week I found there were no checkpoint files at all.
h2. Reproduce the problem:
 # The job was submitted to YARN:

{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
flink-example-1.0-SNAPSHOT.jar{code}

 # Then, immediately afterwards, before all the tasks had switched to RUNNING (a window of a few seconds), I (actually a job control script) sent a stop-with-savepoint command via the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 
f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}

The job then continues to run normally, but no checkpoints are ever triggered.
h2. The cause of the problem:
 # "stop with savepoint" command call the code 

[jira] [Created] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-15152:
---

 Summary: Job running without periodic checkpoint for stop failed 
at the beginning
 Key: FLINK-15152
 URL: https://issues.apache.org/jira/browse/FLINK-15152
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.9.1
Reporter: Feng Jiajie


I have a streaming job configured with periodic checkpointing, but after it had been running for a week I found there were no checkpoint files at all.
h2. Reproduce the problem:
 # The job was submitted to YARN:

{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
flink-example-1.0-SNAPSHOT.jar{code}

 # Then, immediately afterwards, before all the tasks had switched to RUNNING (a window of a few seconds), I (actually a job control script) sent a stop-with-savepoint command via the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 
f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}

The job then continues to run normally, but no checkpoints are ever triggered.
h2. The cause of the problem:
 # "stop with savepoint" command call the code 
stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
 and then triggerSynchronousSavepoint:

{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}

 # but "before all the task switch to RUNNING", checkpoint failed at 
org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509

{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}

 # As a result, "stop with savepoint" failed after checkpointCoordinator.stopCheckpointScheduler() had already taken effect, but the job was not terminated, so it kept running with the periodic checkpoint scheduler permanently disabled.

 

sample code:
{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.io.IOException;

public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
    RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
    rocksdbBackend.enableTtlCompactionFilter();
    rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
    return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // checkpoint every 10 seconds
    env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
    env.setStateBackend(makeRocksdbBackend());
    env.setRestartStrategy(RestartStrategies.noRestart());

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    checkpointConfig.setFailOnCheckpointingErrors(true);

    DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
    text.map(new MapFunction<String, Tuple2<Long, Long>>() {
      @Override
      public Tuple2<Long, Long> map(String s) {
        String[] s1 = s.split(" ");
        return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
      }
    }).keyBy(0).flatMap(new CountWindowAverage()).print();

    env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage
      extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out)
        throws Exception {
      Tuple2<Long, Long> currentSum = sum.value();
      currentSum.f0 += 1;
      currentSum.f1 += input.f1;
      sum.update(currentSum);
      out.collect(new Tuple2<>(input.f0, currentSum.f1));
    }

    @Override
    public void open(Configuration config) {
      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
          new ValueStateDescriptor<>(
              "average", // the state name
              TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
              Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
      sum = getRuntimeContext().getState(descriptor);
    }
  }
}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)