[jira] [Assigned] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-25 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-35217:
--

Assignee: Stefan Richter

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>Assignee: Stefan Richter
>Priority: Critical
>
> While running Flink on a system with an unstable power supply, checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59") 

[jira] [Commented] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-25 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840787#comment-17840787
 ] 

Stefan Richter commented on FLINK-35217:


I think you are right: close only guarantees a flush, i.e. passing all data to 
the OS, but it does not force the OS to write the data to disk.
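For illustration, a minimal Java sketch (not the Flink code path) of the distinction: flush/close only hands the bytes to the OS page cache, while an explicit fsync via FileDescriptor.sync() forces them to stable storage. The helper class and file handling are hypothetical.

{code:java}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public final class DurableWrite {
    // Writes data and only returns after the OS has persisted it to disk.
    public static void writeDurably(File file, byte[] data) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file)) {
            out.write(data);
            out.flush();        // passes the data to the OS page cache only
            out.getFD().sync(); // fsync: forces the OS to write it to stable storage
        }                       // close() alone gives no durability guarantee
    }
}
{code}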

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>Priority: Critical
>
> While running Flink on a system with an unstable power supply, checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 

[jira] [Commented] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-23 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840115#comment-17840115
 ] 

Stefan Richter commented on FLINK-35217:


Hi, the code is calling close on the output stream, which usually implies that 
it's flushed and synced. I'm wondering if this is an OS- or Java-version-specific 
problem?

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>Priority: Critical
>
> While running Flink on a system with an unstable power supply, checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, 

[jira] [Updated] (FLINK-34693) Memory leak in KafkaWriter

2024-03-15 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-34693:
---
Attachment: (was: image-2024-03-15-10-30-50-902.png)

> Memory leak in KafkaWriter
> --
>
> Key: FLINK-34693
> URL: https://issues.apache.org/jira/browse/FLINK-34693
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0, 1.19.0, 1.18.1
>Reporter: Stefan Richter
>Priority: Blocker
> Attachments: image-2024-03-15-10-30-08-280.png
>
>
> KafkaWriter is keeping objects in a Deque of closeables 
> ({{producerCloseables}}) that are never removed, so they can never be GC’ed.
> From heap 
> dump:!04599375-f923-4d1a-8d68-9e17e54b363c#media-blob-url=true=9d1e022e-8762-45b3-877b-d298ec956078==870337=306=2284=!
>   !image-2024-03-15-10-30-08-280.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34693) Memory leak in KafkaWriter

2024-03-15 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-34693:
---
Description: 
KafkaWriter is keeping objects in a Deque of closeables 
({{producerCloseables}}) that are never removed, so they can never be GC’ed.

>From heap 
>dump:!04599375-f923-4d1a-8d68-9e17e54b363c#media-blob-url=true=9d1e022e-8762-45b3-877b-d298ec956078==870337=306=2284=!
>  !image-2024-03-15-10-30-08-280.png!

  was:
KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
closeables ({{producerCloseables}}). We are only adding instances to the 
queue (for each txn?) but never remove them, so they can never be GC’ed.

>From heap dump:

!image-2024-03-15-10-30-50-902.png!
!04599375-f923-4d1a-8d68-9e17e54b363c#media-blob-url=true=9d1e022e-8762-45b3-877b-d298ec956078==870337=306=2284=!
 !image-2024-03-15-10-30-08-280.png!


> Memory leak in KafkaWriter
> --
>
> Key: FLINK-34693
> URL: https://issues.apache.org/jira/browse/FLINK-34693
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0, 1.19.0, 1.18.1
>Reporter: Stefan Richter
>Priority: Blocker
> Attachments: image-2024-03-15-10-30-08-280.png, 
> image-2024-03-15-10-30-50-902.png
>
>
> KafkaWriter is keeping objects in a Deque of closeables 
> ({{producerCloseables}}) that are never removed, so they can never be GC’ed.
> From heap 
> dump:!04599375-f923-4d1a-8d68-9e17e54b363c#media-blob-url=true=9d1e022e-8762-45b3-877b-d298ec956078==870337=306=2284=!
>   !image-2024-03-15-10-30-08-280.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34693) Memory leak in KafkaWriter

2024-03-15 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-34693:
---
Description: 
KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
closeables ({{producerCloseables}}). We are only adding instances to the 
queue (for each txn?) but never remove them, so they can never be GC’ed.

>From heap dump:

!image-2024-03-15-10-30-50-902.png!
!04599375-f923-4d1a-8d68-9e17e54b363c#media-blob-url=true=9d1e022e-8762-45b3-877b-d298ec956078==870337=306=2284=!
 !image-2024-03-15-10-30-08-280.png!

  was:
KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
closeables ({{producerCloseables}}). We are only adding instances to the 
queue (for each txn?) but never remove them, so they can never be GC’ed.

>From heap dump:

!image-2024-03-15-10-30-50-902.png!
!04599375-f923-4d1a-8d68-9e17e54b363c#media-blob-url=true=9d1e022e-8762-45b3-877b-d298ec956078==870337=306=2284=!!image-2024-03-15-10-30-08-280.png!!image-2024-03-15-10-28-48-591.png!


> Memory leak in KafkaWriter
> --
>
> Key: FLINK-34693
> URL: https://issues.apache.org/jira/browse/FLINK-34693
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0, 1.19.0, 1.18.1
>Reporter: Stefan Richter
>Priority: Blocker
> Attachments: image-2024-03-15-10-30-08-280.png, 
> image-2024-03-15-10-30-50-902.png
>
>
> KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
> closeables ({{producerCloseables}}). We are only adding instances to the 
> queue (for each txn?) but never remove them, so they can never be GC’ed.
> From heap dump:
> !image-2024-03-15-10-30-50-902.png!
> !04599375-f923-4d1a-8d68-9e17e54b363c#media-blob-url=true=9d1e022e-8762-45b3-877b-d298ec956078==870337=306=2284=!
>  !image-2024-03-15-10-30-08-280.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34693) Memory leak in KafkaWriter

2024-03-15 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34693:
--

 Summary: Memory leak in KafkaWriter
 Key: FLINK-34693
 URL: https://issues.apache.org/jira/browse/FLINK-34693
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.18.1, 1.19.0, 1.18.0
Reporter: Stefan Richter
 Attachments: image-2024-03-15-10-30-08-280.png, 
image-2024-03-15-10-30-50-902.png

KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
closeables ({{producerCloseables}}). We are only adding instances to the 
queue (for each txn?) but never remove them, so they can never be GC’ed.

>From heap dump:

!image-2024-03-15-10-30-50-902.png!
!04599375-f923-4d1a-8d68-9e17e54b363c#media-blob-url=true=9d1e022e-8762-45b3-877b-d298ec956078==870337=306=2284=!!image-2024-03-15-10-30-08-280.png!!image-2024-03-15-10-28-48-591.png!
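A minimal sketch of the leak pattern described above, assuming the deque only ever grows; apart from the field name producerCloseables, the class and method names are illustrative, not the actual KafkaWriter code.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

class LeakyWriter {
    // Producers registered for closing; entries are added but never removed.
    private final Deque<AutoCloseable> producerCloseables = new ArrayDeque<>();

    void registerProducer(AutoCloseable producer) {
        // Called for every new (transactional) producer ...
        producerCloseables.add(producer);
        // ... but nothing ever polls the deque, so every producer stays strongly
        // reachable for the lifetime of the writer and can never be GC'ed.
    }

    void close() throws Exception {
        for (AutoCloseable closeable : producerCloseables) {
            closeable.close();
        }
    }
}
{code}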



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34579) Introduce metric for time since last completed checkpoint

2024-03-05 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-34579.
--
Resolution: Won't Do

> Introduce metric for time since last completed checkpoint
> -
>
> Key: FLINK-34579
> URL: https://issues.apache.org/jira/browse/FLINK-34579
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This metric will help us to identify jobs with checkpointing problems without 
> first requiring the checkpoint to complete or fail before the problem surfaces.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34579) Introduce metric for time since last completed checkpoint

2024-03-05 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34579:
--

 Summary: Introduce metric for time since last completed checkpoint
 Key: FLINK-34579
 URL: https://issues.apache.org/jira/browse/FLINK-34579
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.20.0


This metric will help us to identify jobs with checkpointing problems without 
first requiring the checkpoint to complete or fail before the problem surfaces.
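A minimal sketch of how such a metric could be exposed as a Flink Gauge, assuming some callback notifies it when a checkpoint completes; the class and method names are illustrative, not actual Flink code (the ticket was later closed as Won't Do).

{code:java}
import org.apache.flink.metrics.Gauge;

public class TimeSinceLastCompletedCheckpointGauge implements Gauge<Long> {

    private volatile long lastCompletedCheckpointTimestamp = -1L;

    // Assumed to be called whenever a checkpoint completes successfully.
    public void onCheckpointCompleted(long completionTimestampMillis) {
        this.lastCompletedCheckpointTimestamp = completionTimestampMillis;
    }

    @Override
    public Long getValue() {
        long last = lastCompletedCheckpointTimestamp;
        // -1 means "no checkpoint completed yet"; otherwise the age in milliseconds.
        return last < 0 ? -1L : System.currentTimeMillis() - last;
    }
}
{code}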



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34546) Emit span with failure labels on failure

2024-03-01 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-34546.
--
Resolution: Fixed

> Emit span with failure labels on failure
> 
>
> Key: FLINK-34546
> URL: https://issues.apache.org/jira/browse/FLINK-34546
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> To improve observability, we should emit a span for each failure that 
> contains details about the failure classification.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34546) Emit span with failure labels on failure

2024-02-29 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34546:
--

 Summary: Emit span with failure labels on failure
 Key: FLINK-34546
 URL: https://issues.apache.org/jira/browse/FLINK-34546
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.20.0


To improve observability, we should emit a span for each failure that contains 
details about the failure classification.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33555) LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:

2024-02-16 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817916#comment-17817916
 ] 

Stefan Richter commented on FLINK-33555:


[~mapohl] I did some work related to local recovery (essentially using 
available local state in rescaling), but that doesn't affect allocation or 
non-rescaling scenarios.

> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:
> ---
>
> Key: FLINK-33555
> URL: https://issues.apache.org/jira/browse/FLINK-33555
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
> Attachments: FLINK-33555.log
>
>
> https://github.com/XComp/flink/actions/runs/6868936761/job/18680977238#step:12:13492
> {code}
> Error: 21:44:15 21:44:15.144 [ERROR]   
> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:119
>  [The task was deployed to AllocationID(fcf411eadbae8beed895a78ea1653046) but 
> it should have been deployed to 
> AllocationID(dec337d82b9d960004ffd73be8a2c5d5) for local recovery., The task 
> was deployed to AllocationID(a61fd8a6bc5ef9d467f32f918bdfb385) but it should 
> have been deployed to AllocationID(fcf411eadbae8beed895a78ea1653046) for 
> local recovery., The task was deployed to 
> AllocationID(dec337d82b9d960004ffd73be8a2c5d5) but it should have been 
> deployed to AllocationID(a61fd8a6bc5ef9d467f32f918bdfb385) for local 
> recovery.] ==> expected:  but was: 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change

2024-02-07 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815274#comment-17815274
 ] 

Stefan Richter commented on FLINK-33962:


Hi [~Zhanghao Chen]! The proposed change in this Jira makes sense to me, and I 
think that using the same idea as outlined in the FLIP should also work for 
this case. Off the top of my head, I don't see a problem with reviving the 
previous mechanism for compatibility.

> Chaining-agnostic OperatorID generation for improved state compatibility on 
> parallelism change
> --
>
> Key: FLINK-33962
> URL: https://issues.apache.org/jira/browse/FLINK-33962
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Priority: Major
>
> *Background*
> Flink restores opeartor state from snapshots based on matching the 
> operatorIDs. Since Flink 1.2, {{StreamGraphHasherV2}} is used for operatorID 
> generation when no user-set uid exists. The generated OperatorID is 
> deterministic with respect to:
>  * node-local properties (the traverse ID in the BFS for the stream graph)
>  * chained output nodes
>  * input nodes hashes
> *Problem*
> The chaining behavior will affect state compatibility, as the generation of 
> the OperatorID of an Op is dependent on its chained output nodes. For 
> example, a simple source->sink DAG with source and sink chained together is 
> state incompatible with an otherwise identical DAG with source and sink 
> unchained (either because the parallelisms of the two ops are changed to be 
> unequal or chaining is disabled). This greatly limits the flexibility to 
> perform chain-breaking/joining for performance tuning.
> *Proposal*
> Introduce {{StreamGraphHasherV3}} that is agnostic to the chaining behavior 
> of operators, which effectively just removes L227-235 of 
> [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
>  at master · apache/flink 
> (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
>  
> This will not hurt the determinism of the ID generation across job 
> submission as long as the stream graph topology doesn't change, and since new 
> versions of Flink have already adopted pure operator-level state recovery, 
> this will not break state recovery across job submission as long as both 
> submissions use the same hasher.
> This will, however, break cross-version state compatibility. So we can 
> introduce a new option to enable using HasherV3 in v1.19 and consider making 
> it the default hasher in v2.0.
> Looking forward to suggestions on this.
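A conceptual sketch of the proposal, assuming the hash is built only from the BFS traverse id and the input-node hashes; this is not the StreamGraphHasherV2/V3 code, and the hash function and names are illustrative.

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

final class ChainAgnosticHasher {

    // Deterministic for a fixed topology: the node-local traverse id and the input
    // hashes contribute, while chained output nodes deliberately do not, so breaking
    // or joining a chain leaves the generated id unchanged.
    static byte[] hash(int traverseId, List<byte[]> inputNodeHashes) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            digest.update(Integer.toString(traverseId).getBytes(StandardCharsets.UTF_8));
            for (byte[] inputHash : inputNodeHashes) {
                digest.update(inputHash);
            }
            // Note: no update(...) for chained output nodes here.
            return digest.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
{code}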



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-02-05 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814312#comment-17814312
 ] 

Stefan Richter edited comment on FLINK-34050 at 2/5/24 10:41 AM:
-

Just one idea: since the current proposal is making the rescaling times worse, 
it can have a significant drawback. How about we call deleteFiles async before 
the next checkpoint after a rescaling, thus making sure that the space 
amplification never makes it into the checkpoint while doing it outside of the 
critical path for restoring. Wdyt?


was (Author: srichter):
Just one idea: since the current proposal is making the rescaling times worse, 
it can have a significant drawback. How about we call deleteFiles in the async 
part of the next checkpoint after a rescaling, thus making sure that the space 
amplification never makes it into the checkpoint while doing it outside of the 
critical path for restoring or processing. Wdyt?

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it will 
> cause space amplification in some cases.
> We can reproduce this problem using wordCount job:
> 1) before rescaling, state operator in wordCount job has 2 parallelism and 
> 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart job with 4 parallelism (for state operator),  the full 
> checkpoint size of new job will be 8G+ ;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not 
> overlap with the current DB keyGroupRange, so new data written into RocksDB 
> after rescaling almost never does LSM compaction with the deleted data 
> (belonging to the other keyGroupRange).
>  
> And the space amplification may affect Rocksdb read performance and disk 
> space usage after rescaling. It looks like a regression due to the 
> introduction of deleteRange for rescaling optimization.
>  
> To solve this problem, I think maybe we can invoke 
> Rocksdb.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List<byte[]> ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   //
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>  db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-02-05 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814312#comment-17814312
 ] 

Stefan Richter commented on FLINK-34050:


Just one idea: since the current proposal is making the rescaling times worse, 
it can have a significant drawback. How about we call deleteFiles in the async 
part of the next checkpoint after a rescaling, thus making sure that the space 
amplification never makes it into the checkpoint while doing it outside of the 
critical path for restoring or processing. Wdyt?
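A sketch of this idea, assuming the key-group ranges removed via deleteRange during rescaling are recorded and cleaned up later from the async part of a checkpoint; the class, method names and the executor hook are illustrative, not actual Flink code (deleteFilesInRanges is the RocksJava call quoted in the description below).

{code:java}
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

final class DeferredRangeCleanup {

    // Runs the file-level cleanup off the restore/processing critical path, so the
    // space amplification from deleteRange never makes it into the next checkpoint.
    static void scheduleCleanup(
            ExecutorService asyncCheckpointExecutor,
            RocksDB db,
            List<ColumnFamilyHandle> columnFamilyHandles,
            List<byte[]> deletedKeyGroupRanges) {
        asyncCheckpointExecutor.execute(() -> {
            try {
                for (ColumnFamilyHandle handle : columnFamilyHandles) {
                    // Drops SST files fully contained in the deleted key-group ranges.
                    db.deleteFilesInRanges(handle, deletedKeyGroupRanges, false);
                }
            } catch (RocksDBException e) {
                // Best-effort cleanup; a failure should not fail the checkpoint.
            }
        });
    }
}
{code}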

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it will 
> cause space amplification in some cases.
> We can reproduce this problem using wordCount job:
> 1) before rescaling, state operator in wordCount job has 2 parallelism and 
> 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart job with 4 parallelism (for state operator),  the full 
> checkpoint size of new job will be 8G+ ;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not 
> overlap with the current DB keyGroupRange, so new data written into RocksDB 
> after rescaling almost never does LSM compaction with the deleted data 
> (belonging to the other keyGroupRange).
>  
> And the space amplification may affect Rocksdb read performance and disk 
> space usage after rescaling. It looks like a regression due to the 
> introduction of deleteRange for rescaling optimization.
>  
> To solve this problem, I think maybe we can invoke 
> Rocksdb.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List<byte[]> ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   //
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>  db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34200) AutoRescalingITCase#testCheckpointRescalingInKeyedState fails

2024-01-26 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811194#comment-17811194
 ] 

Stefan Richter commented on FLINK-34200:


[~fanrui] I think you misunderstood my comment. I believe the problem is with 
the test code/test utils, not in Flink.

> AutoRescalingITCase#testCheckpointRescalingInKeyedState fails
> -
>
> Key: FLINK-34200
> URL: https://issues.apache.org/jira/browse/FLINK-34200
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56601=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=ea7cf968-e585-52cb-e0fc-f48de023a7ca=8200]
> {code:java}
> Jan 19 02:31:53 02:31:53.954 [ERROR] Tests run: 32, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 1050 s <<< FAILURE! -- in 
> org.apache.flink.test.checkpointing.AutoRescalingITCase
> Jan 19 02:31:53 02:31:53.954 [ERROR] 
> org.apache.flink.test.checkpointing.AutoRescalingITCase.testCheckpointRescalingInKeyedState[backend
>  = rocksdb, buffersPerChannel = 2] -- Time elapsed: 59.10 s <<< FAILURE!
> Jan 19 02:31:53 java.lang.AssertionError: expected:<[(0,8000), (0,32000), 
> (0,48000), (0,72000), (1,78000), (1,3), (1,54000), (0,2000), (0,1), 
> (0,5), (0,66000), (0,74000), (0,82000), (1,8), (1,0), (1,16000), 
> (1,24000), (1,4), (1,56000), (1,64000), (0,12000), (0,28000), (0,52000), 
> (0,6), (0,68000), (0,76000), (1,18000), (1,26000), (1,34000), (1,42000), 
> (1,58000), (0,6000), (0,14000), (0,22000), (0,38000), (0,46000), (0,62000), 
> (0,7), (1,4000), (1,2), (1,36000), (1,44000)]> but was:<[(0,8000), 
> (0,32000), (0,48000), (0,72000), (1,78000), (1,3), (1,54000), (0,2000), 
> (0,1), (0,5), (0,66000), (0,74000), (0,82000), (1,8), (1,0), 
> (1,16000), (1,24000), (1,4), (1,56000), (1,64000), (0,12000), (0,28000), 
> (0,52000), (0,6), (0,68000), (0,76000), (0,1000), (0,25000), (0,33000), 
> (0,41000), (1,18000), (1,26000), (1,34000), (1,42000), (1,58000), (0,6000), 
> (0,14000), (0,22000), (0,38000), (0,46000), (0,62000), (0,7), (1,4000), 
> (1,2), (1,36000), (1,44000)]>
> Jan 19 02:31:53   at org.junit.Assert.fail(Assert.java:89)
> Jan 19 02:31:53   at org.junit.Assert.failNotEquals(Assert.java:835)
> Jan 19 02:31:53   at org.junit.Assert.assertEquals(Assert.java:120)
> Jan 19 02:31:53   at org.junit.Assert.assertEquals(Assert.java:146)
> Jan 19 02:31:53   at 
> org.apache.flink.test.checkpointing.AutoRescalingITCase.testCheckpointRescalingKeyedState(AutoRescalingITCase.java:296)
> Jan 19 02:31:53   at 
> org.apache.flink.test.checkpointing.AutoRescalingITCase.testCheckpointRescalingInKeyedState(AutoRescalingITCase.java:196)
> Jan 19 02:31:53   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 19 02:31:53   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2024-01-24 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter resolved FLINK-32410.

Fix Version/s: 1.18.0
   (was: 1.19.0)
   Resolution: Done

> Allocate hash-based collections with sufficient capacity for expected size
> --
>
> Key: FLINK-32410
> URL: https://issues.apache.org/jira/browse/FLINK-32410
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0
>
>
> The JDK API to create hash-based collections for a certain capacity is 
> arguably misleading because it doesn't size the collections to "hold a 
> specific number of items" like you'd expect it would. Instead, it sizes them to 
> hold load-factor% of the specified number.
> For the common pattern to allocate a hash-based collection with the size of 
> expected elements to avoid rehashes, this means that a rehash is essentially 
> guaranteed.
> We should introduce helper methods (similar to Guava's 
> `Maps.newHashMapWithExpectedSize(int)`) for allocations for expected size and 
> replace  the direct constructor calls with those.
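A sketch of such a helper, mirroring Guava's capacity computation for the default load factor of 0.75; the class name is illustrative.

{code:java}
import java.util.HashMap;
import java.util.Map;

public final class CollectionUtil {

    // Sizes the map so that expectedSize entries fit without triggering a rehash,
    // instead of passing expectedSize directly as the initial capacity.
    public static <K, V> Map<K, V> newHashMapWithExpectedSize(int expectedSize) {
        int initialCapacity = (int) ((float) expectedSize / 0.75f) + 1;
        return new HashMap<>(initialCapacity);
    }
}
{code}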



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2024-01-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810420#comment-17810420
 ] 

Stefan Richter commented on FLINK-32410:


Yes, it's already done.

> Allocate hash-based collections with sufficient capacity for expected size
> --
>
> Key: FLINK-32410
> URL: https://issues.apache.org/jira/browse/FLINK-32410
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> The JDK API to create hash-based collections for a certain capacity is 
> arguably misleading because it doesn't size the collections to "hold a 
> specific number of items" like you'd expect it would. Instead, it sizes them to 
> hold load-factor% of the specified number.
> For the common pattern to allocate a hash-based collection with the size of 
> expected elements to avoid rehashes, this means that a rehash is essentially 
> guaranteed.
> We should introduce helper methods (similar to Guava's 
> `Maps.newHashMapWithExpectedSize(int)`) for allocations for expected size and 
> replace  the direct constructor calls with those.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33696) FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2024-01-24 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter resolved FLINK-33696.

Resolution: Done

merged in 7db2ecad

> FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter
> 
>
> Key: FLINK-33696
> URL: https://issues.apache.org/jira/browse/FLINK-33696
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.0
>
>
> h1. Motivation
> [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]
>  is adding TraceReporter interface. However with 
> [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]
>  alone, Log4jTraceReporter would be the only available implementation of 
> TraceReporter interface, which is not very helpful.
> In this FLIP I’m proposing to contribute both MetricExporter and 
> TraceReporter implementation using OpenTelemetry.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34218) AutoRescalingITCase#testCheckpointRescalingInKeyedState fails

2024-01-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810313#comment-17810313
 ] 

Stefan Richter commented on FLINK-34218:


Hi, I also cannot reproduce the problem locally, but I'm rather confident that 
it's a test problem.

> AutoRescalingITCase#testCheckpointRescalingInKeyedState fails
> -
>
> Key: FLINK-34218
> URL: https://issues.apache.org/jira/browse/FLINK-34218
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Rui Fan
>Priority: Major
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56740=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=ae4f8708-9994-57d3-c2d7-b892156e7812



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34199) Add tracing for durations of rescaling/restoring RocksDB incremental checkpoints from downloaded and local state

2024-01-23 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-34199:
---
Summary: Add tracing for durations of rescaling/restoring RocksDB 
incremental checkpoints from downloaded and local state  (was: Add tracing for 
durations of rescaling/restoring from local state)

> Add tracing for durations of rescaling/restoring RocksDB incremental 
> checkpoints from downloaded and local state
> 
>
> Key: FLINK-34199
> URL: https://issues.apache.org/jira/browse/FLINK-34199
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Adds tracing for durations of rescaling/restoring from local state. This 
> enables more fine grained monitoring of restore operations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34199) Add tracing for durations of rescaling/restoring from local state

2024-01-22 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34199:
--

 Summary: Add tracing for durations of rescaling/restoring from 
local state
 Key: FLINK-34199
 URL: https://issues.apache.org/jira/browse/FLINK-34199
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.19.0
Reporter: Stefan Richter
Assignee: Stefan Richter


Adds tracing for durations of rescaling/restoring from local state. This 
enables more fine grained monitoring of restore operations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34134) Add tracing for restored state size and locations

2024-01-17 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34134:
--

 Summary: Add tracing for restored state size and locations
 Key: FLINK-34134
 URL: https://issues.apache.org/jira/browse/FLINK-34134
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Stefan Richter
Assignee: Stefan Richter


We can add tracing during the restore that reports the state size that was 
restored by location(s). This is particularly interesting for a mixed recovery 
with some local and some remote state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32444) Enable object reuse for Flink SQL jobs by default

2023-11-30 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791536#comment-17791536
 ] 

Stefan Richter commented on FLINK-32444:


[~pnowojski] if there really is an issue with heap backend, then we also need 
to be careful about what type of caching we can build for RocksDB in the future.

> Enable object reuse for Flink SQL jobs by default
> -
>
> Key: FLINK-32444
> URL: https://issues.apache.org/jira/browse/FLINK-32444
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently, object reuse is not enabled by default for Flink Streaming Jobs, 
> but is enabled by default for Flink Batch jobs. That is not consistent for 
> stream-batch unification. Besides, SQL operators are safe to enable object 
> reuse and this is a great performance improvement for SQL jobs. 
> We should also be careful with the Table-DataStream conversion case 
> (StreamTableEnvironment) which is not safe to enable object reuse by default. 
> Maybe we can just enable it for SQL Client/Gateway and TableEnvironment. 
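For reference, a minimal sketch of enabling object reuse explicitly today via the ExecutionConfig, which is what this ticket proposes to make the default for SQL jobs; the surrounding class is illustrative.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableObjectReuseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Operators may reuse record instances instead of copying them between
        // chained operators; safe for SQL operators, per the discussion above.
        env.getConfig().enableObjectReuse();
    }
}
{code}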



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33341) Use available local keyed state for rescaling

2023-10-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-33341.
--
Resolution: Fixed

merged in a4ad86f 

> Use available local keyed state for rescaling
> -
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33341) Use available local keyed state for rescaling

2023-10-26 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-33341:
---
Summary: Use available local keyed state for rescaling  (was: Use available 
local state for rescaling)

> Use available local keyed state for rescaling
> -
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33341) Use available local state for rescaling

2023-10-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779129#comment-17779129
 ] 

Stefan Richter commented on FLINK-33341:


FYI, here is a link to the development branch: 
https://github.com/apache/flink/compare/master...StefanRRichter:flink:srichter-local-rescaling-FLINK-33341

> Use available local state for rescaling
> ---
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33341) Use available local state for rescaling

2023-10-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779124#comment-17779124
 ] 

Stefan Richter commented on FLINK-33341:


[~Yanfei Lei], yes, only the previous local state is available to be used in 
rescaling, so we might still need to download additional state from remote. But 
oftentimes we don't need to download everything from remote: in particular, if 
we scale out, we will often find the complete state locally on some machines and 
just need to drop some key-groups. And for scale-in, we should at least find 
one piece of the state locally. There is no good reason not to 
opportunistically use local state in rescaling scenarios as well. No change to 
the scheduler will be needed.

> Use available local state for rescaling
> ---
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33341) Use available local state for rescaling

2023-10-23 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778677#comment-17778677
 ] 

Stefan Richter commented on FLINK-33341:


Exactly, we support local state for recovery - but not for rescaling, yet.

> Use available local state for rescaling
> ---
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33341) Use available local state for rescaling

2023-10-23 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-33341:
--

 Summary: Use available local state for rescaling
 Key: FLINK-33341
 URL: https://issues.apache.org/jira/browse/FLINK-33341
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stefan Richter
Assignee: Stefan Richter


Local state is currently only used for recovery. However, it would make sense 
to also use available local state in rescaling scenarios to reduce the amount 
of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33246) Add RescalingIT case that uses checkpoints and resource requests

2023-10-11 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-33246:
--

 Summary: Add RescalingIT case that uses checkpoints and resource 
requests
 Key: FLINK-33246
 URL: https://issues.apache.org/jira/browse/FLINK-33246
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Stefan Richter
Assignee: Stefan Richter


RescalingITCase currently uses savepoints and cancel/restart for rescaling. We 
should add a test that also tests rescaling from checkpoints under changing 
resource requirements, i.e. without cancelation of the job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33077) Minimize the risk of hard back-pressure with buffer debloating enabled

2023-09-12 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-33077:
--

 Summary: Minimize the risk of hard back-pressure with buffer 
debloating enabled
 Key: FLINK-33077
 URL: https://issues.apache.org/jira/browse/FLINK-33077
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.18.0


{*}Problem{*}:
Buffer debloating sets buffer size to {{256}} bytes because of back-pressure.
Such small buffers might not be enough to emit the processing results of a 
single record. The task thread would request new buffers, and often block.
That results in significant checkpoint delays (up to minutes instead of 
seconds).

Adding more overdraft buffers helps, but depends on the job's DoP (degree of 
parallelism). Raising {{taskmanager.memory.min-segment-size}} from {{256}} 
helps, but depends on the multiplication factor of the operator.
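A sketch of the second workaround above, assuming the value is tuned per job; the chosen size and the helper class are illustrative, and the proposed solution below aims to remove the need for such tuning.

{code:java}
import org.apache.flink.configuration.Configuration;

public class MinSegmentSizeWorkaround {
    public static Configuration workaroundConfig() {
        Configuration conf = new Configuration();
        // Raise the lower bound for debloated buffer sizes from the 256-byte default
        // so that the output of a single record still fits into one buffer.
        conf.setString("taskmanager.memory.min-segment-size", "4 kb");
        return conf;
    }
}
{code}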

{*}Solution{*}:
 * Ignore Buffer Debloater hints and extend the buffer if possible - when this 
prevents emitting an output record fully AND this is the last available buffer.
 * Prevent the subsequent flush of the buffer so that more output records can 
be emitted (flatMap-like and join operators)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32782) Release Testing: Disable WAL in RocksDBWriteBatchWrapper by default

2023-08-29 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32782:
---
Description: 
Covered by nightly tests, for example 

- run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
parallelism change) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks 
true true" "skip_check_exceptions"
- run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) 
end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks 
true true" "skip_check_exceptions"
- run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) 
end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks 
true true" "skip_check_exceptions"

> Release Testing:  Disable WAL in RocksDBWriteBatchWrapper by default
> 
>
> Key: FLINK-32782
> URL: https://issues.apache.org/jira/browse/FLINK-32782
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
>
> Covered by nightly tests, for example 
> - run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
> parallelism change) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 
> rocks true true" "skip_check_exceptions"
> - run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) 
> end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 
> rocks true true" "skip_check_exceptions"
> - run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) 
> end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 
> rocks true true" "skip_check_exceptions"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32783) Release Testing: Improve parallel download of RocksDB incremental state

2023-08-29 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32783:
---
Description: 
This feature is automatically used whenever we download state during a restart 
from a RocksDB incremental checkpoint. This should be tested with and without 
task-local recovery.

Will be covered by the nightly tests:

* run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
parallelism change) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks 
true true" "skip_check_exceptions"
* run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) 
end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks 
true true" "skip_check_exceptions"
* run_test "Resuming Externalized Checkpoint (rocks, incremental, scale 
down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks 
true true" "skip_check_exceptions"

  was:This feature is automatically used whenever we download state during a 
restart from a RocksDB incremental checkpoint. This should be tested with and 
without task-local recovery.


> Release Testing: Improve parallel download of RocksDB incremental state
> ---
>
> Key: FLINK-32783
> URL: https://issues.apache.org/jira/browse/FLINK-32783
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
>
> This feature is automatically used whenever we download state during a 
> restart from a RocksDB incremental checkpoint. This should be tested with and 
> without task-local recovery.
> Will be covered by the nightly tests:
> * run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
> parallelism change) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 
> rocks true true" "skip_check_exceptions"
> * run_test "Resuming Externalized Checkpoint (rocks, incremental, scale 
> up) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 
> rocks true true" "skip_check_exceptions"
> * run_test "Resuming Externalized Checkpoint (rocks, incremental, scale 
> down) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 
> rocks true true" "skip_check_exceptions"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32783) Release Testing: Improve parallel download of RocksDB incremental state

2023-08-23 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32783:
---
Description: This feature is automatically used whenever we download state 
during a restart from a RocksDB incremental checkpoint. This should be tested 
with and without task-local recovery.

> Release Testing: Improve parallel download of RocksDB incremental state
> ---
>
> Key: FLINK-32783
> URL: https://issues.apache.org/jira/browse/FLINK-32783
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
>
> This feature is automatically used whenever we download state during a 
> restart from a RocksDB incremental checkpoint. This should be tested with and 
> without task-local recovery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie

2023-08-01 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32681.
--
Resolution: Fixed

merged into master d11cc32

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie

2023-08-01 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749599#comment-17749599
 ] 

Stefan Richter commented on FLINK-32681:


[~Feifan Wang] Thanks for the offer, but I already worked on a fix yesterday.
Please feel free to review it later.

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie

2023-07-31 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749247#comment-17749247
 ] 

Stefan Richter commented on FLINK-32681:


I'm running on a Mac, but with a sleep added I already managed to reproduce it. I'll
take a look at what's going on.

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie

2023-07-31 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749203#comment-17749203
 ] 

Stefan Richter commented on FLINK-32681:


[~mapohl] I was trying to reproduce this locally without success - could it be
an infra-related problem? The code calls
`FileUtils::deleteDirectoryQuietly` to clean up files, and if something goes
wrong during the deletion, the test could still find the directories and fail.
I could also just test whether delete was called instead of whether the files
were actually deleted, to abstract away from such infra problems, wdyt?
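To illustrate that second option, a rough sketch (all names invented, not the actual
RocksDBStateDownloaderTest code) of asserting on the cleanup call instead of the file
system state:

{code}
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CleanupCallTrackingSketch {

    // Hypothetical seam: the code under test deletes directories through this interface.
    interface DirectoryCleaner {
        void deleteDirectoryQuietly(Path dir);
    }

    public static void main(String[] args) {
        List<Path> requestedDeletes = new CopyOnWriteArrayList<>();
        DirectoryCleaner trackingCleaner = requestedDeletes::add;

        // The production code would invoke the cleaner on failure; simulated here.
        Path tempDir = Paths.get("/tmp/rocksdb-download-test");
        trackingCleaner.deleteDirectoryQuietly(tempDir);

        // The assertion no longer depends on how quickly the file system removes the files.
        if (!requestedDeletes.contains(tempDir)) {
            throw new AssertionError("cleanup was not invoked for " + tempDir);
        }
    }
}
{code}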

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32410:
---
Fix Version/s: 1.18.0
   (was: 1.19.0)

> Allocate hash-based collections with sufficient capacity for expected size
> --
>
> Key: FLINK-32410
> URL: https://issues.apache.org/jira/browse/FLINK-32410
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The JDK API to create hash-based collections for a certain capacity is 
> arguable misleading because it doesn't size the collections to "hold a 
> specific number of items" like you'd expect it would. Instead it sizes it to 
> hold load-factor% of the specified number.
> For the common pattern to allocate a hash-based collection with the size of 
> expected elements to avoid rehashes, this means that a rehash is essentially 
> guaranteed.
> We should introduce helper methods (similar to Guava's 
> `Maps.newHashMapWithExpectedSize(int)`) for allocations for expected size and 
> replace  the direct constructor calls with those.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32345) Improve parallel download of RocksDB incremental state

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32345:
---
Fix Version/s: 1.18.0
   (was: 1.19.0)

> Improve parallel download of RocksDB incremental state
> --
>
> Key: FLINK-32345
> URL: https://issues.apache.org/jira/browse/FLINK-32345
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> {{RocksDBStateDownloader}} is used to download the files for incremental 
> checkpoints in parallel. However, the parallelism is currently restricted to 
> a single {{IncrementalRemoteKeyedStateHandle}} and also a single state type 
> (shared, private) within the handle at a time.
> We should support parallelization across multiple state types and across 
> multiple state handles. In particular, this can improve our download times 
> for scale-in.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32326) Disable WAL in RocksDBWriteBatchWrapper by default

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32326.
--
Resolution: Fixed

> Disable WAL in RocksDBWriteBatchWrapper by default
> --
>
> Key: FLINK-32326
> URL: https://issues.apache.org/jira/browse/FLINK-32326
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> We should disable WAL by default in RocksDBWriteBatchWrapper for the case 
> that no WriteOptions is provided. This is the case in all restore operations 
> and can impact the performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32345) Improve parallel download of RocksDB incremental state

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32345.
--
Resolution: Fixed

> Improve parallel download of RocksDB incremental state
> --
>
> Key: FLINK-32345
> URL: https://issues.apache.org/jira/browse/FLINK-32345
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> {{RocksDBStateDownloader}} is used to download the files for incremental 
> checkpoints in parallel. However, the parallelism is currently restricted to 
> a single {{IncrementalRemoteKeyedStateHandle}} and also a single state type 
> (shared, private) within the handle at a time.
> We should support parallelization across multiple state types and across 
> multiple state handles. In particular, this can improve our download times 
> for scale-in.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32326) Disable WAL in RocksDBWriteBatchWrapper by default

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32326:
---
Fix Version/s: 1.18.0
   (was: 1.19.0)

> Disable WAL in RocksDBWriteBatchWrapper by default
> --
>
> Key: FLINK-32326
> URL: https://issues.apache.org/jira/browse/FLINK-32326
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> We should disable WAL by default in RocksDBWriteBatchWrapper for the case 
> that no WriteOptions is provided. This is the case in all restore operations 
> and can impact the performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32347) Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32347:
---
Fix Version/s: 1.18.0

> Exceptions from the CompletedCheckpointStore are not registered by the 
> CheckpointFailureManager 
> 
>
> Key: FLINK-32347
> URL: https://issues.apache.org/jira/browse/FLINK-32347
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.3, 1.16.2, 1.17.1
>Reporter: Tigran Manasyan
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently if an error occurs while saving a completed checkpoint in the 
> {_}CompletedCheckpointStore{_}, _CheckpointCoordinator_ doesn't call 
> _CheckpointFailureManager_ to handle the error. Such behavior leads to the 
> fact, that errors from _CompletedCheckpointStore_ don't increase the failed 
> checkpoints count and 
> _'execution.checkpointing.tolerable-failed-checkpoints'_ option does not 
> limit the number of errors of this kind in any way.
> Possible solution may be to move the notification of 
> _CheckpointFailureManager_ about successful checkpoint after storing 
> completed checkpoint in the _CompletedCheckpointStore_ and providing the 
> exception to the _CheckpointFailureManager_ in the 
> {_}CheckpointCoordinator#{_}{_}[addCompletedCheckpointToStoreAndSubsumeOldest()|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1440]{_}
>  method.
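
A minimal self-contained sketch of the proposed flow (all names invented, this is not
the actual CheckpointCoordinator API): success is only reported after the checkpoint
has been added to the store, and store failures are surfaced to the failure manager so
they count against the tolerable-failed-checkpoints limit.

{code}
public class StoreFailureReportingSketch {

    interface Store { void add(long checkpointId) throws Exception; }

    interface FailureManager {
        void reportSuccess(long checkpointId);
        void reportFailure(long checkpointId, Throwable cause);
    }

    static void completeCheckpoint(long checkpointId, Store store, FailureManager failureManager) {
        try {
            store.add(checkpointId);                    // may throw, e.g. ZooKeeper or S3 errors
            failureManager.reportSuccess(checkpointId); // moved after the store step
        } catch (Exception storeError) {
            failureManager.reportFailure(checkpointId, storeError);
            throw new RuntimeException("Could not store completed checkpoint", storeError);
        }
    }
}
{code}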



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32347) Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32347.
--
Resolution: Fixed

> Exceptions from the CompletedCheckpointStore are not registered by the 
> CheckpointFailureManager 
> 
>
> Key: FLINK-32347
> URL: https://issues.apache.org/jira/browse/FLINK-32347
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.3, 1.16.2, 1.17.1
>Reporter: Tigran Manasyan
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Currently if an error occurs while saving a completed checkpoint in the 
> {_}CompletedCheckpointStore{_}, _CheckpointCoordinator_ doesn't call 
> _CheckpointFailureManager_ to handle the error. Such behavior leads to the 
> fact, that errors from _CompletedCheckpointStore_ don't increase the failed 
> checkpoints count and 
> _'execution.checkpointing.tolerable-failed-checkpoints'_ option does not 
> limit the number of errors of this kind in any way.
> Possible solution may be to move the notification of 
> _CheckpointFailureManager_ about successful checkpoint after storing 
> completed checkpoint in the _CompletedCheckpointStore_ and providing the 
> exception to the _CheckpointFailureManager_ in the 
> {_}CheckpointCoordinator#{_}{_}[addCompletedCheckpointToStoreAndSubsumeOldest()|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1440]{_}
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32441) DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with timeout on AZP

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32441.
--
Resolution: Fixed

> DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with 
> timeout on AZP
> --
>
> Key: FLINK-32441
> URL: https://issues.apache.org/jira/browse/FLINK-32441
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Tests
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50461=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9274
> fails with timeout on 
> {{DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32441) DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with timeout on AZP

2023-06-27 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737642#comment-17737642
 ] 

Stefan Richter commented on FLINK-32441:


Fixed in master 0c787f5.

> DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with 
> timeout on AZP
> --
>
> Key: FLINK-32441
> URL: https://issues.apache.org/jira/browse/FLINK-32441
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Tests
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50461=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9274
> fails with timeout on 
> {{DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-30859) Remove flink-connector-kafka from master branch

2023-06-27 Thread Stefan Richter (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30859 ]


Stefan Richter deleted comment on FLINK-30859:


was (Author: srichter):
[~tzulitai] I think you forgot to remove some code here: 
https://github.com/apache/flink/blob/149a5e34c1ed8d8943c901a98c65c70693915811/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L30C2-L30C2
 and it causes compilation errors.

> Remove flink-connector-kafka from master branch
> ---
>
> Key: FLINK-30859
> URL: https://issues.apache.org/jira/browse/FLINK-30859
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Remove flink-connector-kafka from master branch since the repo has now been 
> externalized and 1.17 commits have been sync'ed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30859) Remove flink-connector-kafka from master branch

2023-06-27 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737569#comment-17737569
 ] 

Stefan Richter commented on FLINK-30859:


[~tzulitai] I think you forgot to remove some code here: 
https://github.com/apache/flink/blob/149a5e34c1ed8d8943c901a98c65c70693915811/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L30C2-L30C2
 and it causes compilation errors.

> Remove flink-connector-kafka from master branch
> ---
>
> Key: FLINK-30859
> URL: https://issues.apache.org/jira/browse/FLINK-30859
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Remove flink-connector-kafka from master branch since the repo has now been 
> externalized and 1.17 commits have been sync'ed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32441) DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with timeout on AZP

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-32441:
--

Assignee: Stefan Richter

> DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with 
> timeout on AZP
> --
>
> Key: FLINK-32441
> URL: https://issues.apache.org/jira/browse/FLINK-32441
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Tests
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50461=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9274
> fails with timeout on 
> {{DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32437) Determine and set correct maxParallelism for operator chains

2023-06-26 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32437:
---
Fix Version/s: 2.0.0
   (was: 1.19.0)

> Determine and set correct maxParallelism for operator chains
> 
>
> Key: FLINK-32437
> URL: https://issues.apache.org/jira/browse/FLINK-32437
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Current code in {{StreamingJobGraphGenerator}} does not properly determine 
> and set the correct maxParallelism of operator chains. We should set the 
> maxParallelism of the chain as the minimum of all the maxParallelism values 
> among operators in the chain.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32437) Determine and set correct maxParallelism for operator chains

2023-06-26 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32437:
--

 Summary: Determine and set correct maxParallelism for operator 
chains
 Key: FLINK-32437
 URL: https://issues.apache.org/jira/browse/FLINK-32437
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


Current code in {{StreamingJobGraphGenerator}} does not properly determine and 
set the correct maxParallelism of operator chains. We should set the 
maxParallelism of the chain as the minimum of all the maxParallelism values 
among operators in the chain.
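
As an illustration of the intended rule (not the StreamingJobGraphGenerator code
itself), the chain-level value is simply the minimum over the chained operators:

{code}
import java.util.List;

public class ChainMaxParallelismSketch {

    static int chainMaxParallelism(List<Integer> operatorMaxParallelisms) {
        return operatorMaxParallelisms.stream()
                .mapToInt(Integer::intValue)
                .min()
                .orElseThrow(() -> new IllegalArgumentException("chain must not be empty"));
    }

    public static void main(String[] args) {
        // Operators with max parallelism 128, 256 and 128 => the chain gets 128.
        System.out.println(chainMaxParallelism(List.of(128, 256, 128)));
    }
}
{code}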



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2023-06-21 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32410:
--

 Summary: Allocate hash-based collections with sufficient capacity 
for expected size
 Key: FLINK-32410
 URL: https://issues.apache.org/jira/browse/FLINK-32410
 Project: Flink
  Issue Type: Improvement
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


The JDK API to create hash-based collections for a certain capacity is arguably
misleading because it doesn't size the collection to "hold a specific number
of items" like you'd expect it would. Instead, it sizes it to hold load-factor%
of the specified number.

For the common pattern of allocating a hash-based collection with the size of the
expected elements to avoid rehashes, this means that a rehash is essentially
guaranteed.

We should introduce helper methods (similar to Guava's
`Maps.newHashMapWithExpectedSize(int)`) for allocations with an expected size and
replace the direct constructor calls with those.
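
A sketch of such a helper, assuming the default load factor of 0.75 (the real helper
would likely live in a Flink util class):

{code}
import java.util.HashMap;

public class HashMapAllocationSketch {

    static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
        // The map resizes once it holds more than capacity * loadFactor entries,
        // so the initial capacity must be at least expectedSize / 0.75 to avoid a rehash.
        return new HashMap<>((int) Math.ceil(expectedSize / 0.75), 0.75f);
    }

    public static void main(String[] args) {
        HashMap<String, Integer> map = newHashMapWithExpectedSize(1000);
        for (int i = 0; i < 1000; i++) {
            map.put("key-" + i, i); // no rehash while inserting the 1000 expected entries
        }
    }
}
{code}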



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-06-19 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17734183#comment-17734183
 ] 

Stefan Richter commented on FLINK-31238:


[~mayuehappy] Thanks for working on this topic! I saw that your PR against 
RocksDB was merged a few days ago. With your code in RocksDB, are you planning 
to continue with this JIRA and is there any planned release?

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png, image-2023-03-09-15-45-56-081.png, 
> image-2023-03-09-15-46-01-176.png, image-2023-03-09-15-50-04-281.png, 
> image-2023-03-29-15-25-21-868.png, screenshot-1.png
>
>
> (The detailed design is in this document
> [https://docs.google.com/document/d/10MNVytTsyiDLZQSR89kDkVdmK_YjbM6jh0teerfDFfI|https://docs.google.com/document/d/10MNVytTsyiDLZQSR89kDkVdmK_YjbM6jh0teerfDFfI])
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch.In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeteleRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file。
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb cliped
>  * Open the Compaction process of the main DB
> !screenshot-1.png|width=923,height=243!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means uses the recovery method of generating sst files through 
> SstFileWriter in parallel)
>  * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 

[jira] [Commented] (FLINK-32347) Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager

2023-06-15 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733059#comment-17733059
 ] 

Stefan Richter commented on FLINK-32347:


Hey, I've already opened a PR. The issue was still unassigned, so I thought I
could still work on it.

> Exceptions from the CompletedCheckpointStore are not registered by the 
> CheckpointFailureManager 
> 
>
> Key: FLINK-32347
> URL: https://issues.apache.org/jira/browse/FLINK-32347
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.3, 1.16.2, 1.17.1
>Reporter: Tigran Manasyan
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Currently if an error occurs while saving a completed checkpoint in the 
> {_}CompletedCheckpointStore{_}, _CheckpointCoordinator_ doesn't call 
> _CheckpointFailureManager_ to handle the error. Such behavior leads to the 
> fact, that errors from _CompletedCheckpointStore_ don't increase the failed 
> checkpoints count and 
> _'execution.checkpointing.tolerable-failed-checkpoints'_ option does not 
> limit the number of errors of this kind in any way.
> Possible solution may be to move the notification of 
> _CheckpointFailureManager_ about successful checkpoint after storing 
> completed checkpoint in the _CompletedCheckpointStore_ and providing the 
> exception to the _CheckpointFailureManager_ in the 
> {_}CheckpointCoordinator#{_}{_}[addCompletedCheckpointToStoreAndSubsumeOldest()|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1440]{_}
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32347) Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager

2023-06-15 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-32347:
--

Assignee: Stefan Richter

> Exceptions from the CompletedCheckpointStore are not registered by the 
> CheckpointFailureManager 
> 
>
> Key: FLINK-32347
> URL: https://issues.apache.org/jira/browse/FLINK-32347
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.3, 1.16.2, 1.17.1
>Reporter: Tigran Manasyan
>Assignee: Stefan Richter
>Priority: Major
>
> Currently if an error occurs while saving a completed checkpoint in the 
> {_}CompletedCheckpointStore{_}, _CheckpointCoordinator_ doesn't call 
> _CheckpointFailureManager_ to handle the error. Such behavior leads to the 
> fact, that errors from _CompletedCheckpointStore_ don't increase the failed 
> checkpoints count and 
> _'execution.checkpointing.tolerable-failed-checkpoints'_ option does not 
> limit the number of errors of this kind in any way.
> Possible solution may be to move the notification of 
> _CheckpointFailureManager_ about successful checkpoint after storing 
> completed checkpoint in the _CompletedCheckpointStore_ and providing the 
> exception to the _CheckpointFailureManager_ in the 
> {_}CheckpointCoordinator#{_}{_}[addCompletedCheckpointToStoreAndSubsumeOldest()|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1440]{_}
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32345) Improve parallel download of RocksDB incremental state

2023-06-15 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32345:
--

 Summary: Improve parallel download of RocksDB incremental state
 Key: FLINK-32345
 URL: https://issues.apache.org/jira/browse/FLINK-32345
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


{{RocksDBStateDownloader}} is used to download the files for incremental 
checkpoints in parallel. However, the parallelism is currently restricted to a 
single {{IncrementalRemoteKeyedStateHandle}} and also a single state type 
(shared, private) within the handle at a time.
We should support parallelization across multiple state types and across 
multiple state handles. In particular, this can improve our download times for 
scale-in.
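
A simplified sketch of the idea (handles and state types reduced to plain path lists,
not the actual RocksDBStateDownloader API): all file transfers are flattened into one
task list so a shared thread pool can work across handles and state types at the same
time.

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelDownloadSketch {

    // tasksPerHandle: one inner list of remote file paths per state handle,
    // already covering both shared and private state.
    static void downloadAll(List<List<String>> tasksPerHandle, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (List<String> handleTasks : tasksPerHandle) {
                for (String remotePath : handleTasks) {
                    // Every file transfer is an independent unit of work, regardless of
                    // which handle or state type it belongs to.
                    futures.add(pool.submit(() -> System.out.println("downloading " + remotePath)));
                }
            }
            for (Future<?> future : futures) {
                future.get(); // propagate download failures
            }
        } finally {
            pool.shutdown();
        }
    }
}
{code}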



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32326) Disable WAL in RocksDBWriteBatchWrapper by default

2023-06-13 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32326:
--

 Summary: Disable WAL in RocksDBWriteBatchWrapper by default
 Key: FLINK-32326
 URL: https://issues.apache.org/jira/browse/FLINK-32326
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


We should disable WAL by default in RocksDBWriteBatchWrapper for the case that
no WriteOptions is provided. This is the case in all restore operations and can
impact performance.
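
For illustration, a minimal sketch of the intended default using the plain RocksDB
Java API (not the Flink wrapper itself):

{code}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class DisableWalSketch {

    // When the caller provides no WriteOptions, write the batch with the WAL disabled.
    public static void writeWithoutWal(RocksDB db, WriteBatch batch) throws RocksDBException {
        try (WriteOptions options = new WriteOptions().setDisableWAL(true)) {
            db.write(options, batch);
        }
    }
}
{code}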



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-16 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17723051#comment-17723051
 ] 

Stefan Richter commented on FLINK-31963:


Yes, PR is currently in review here: https://github.com/apache/flink/pull/22584

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.16.1, 1.15.4, 1.18.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Genetic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-12 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722120#comment-17722120
 ] 

Stefan Richter commented on FLINK-31963:


Seems that this is similar to the problem described in FLINK-27031.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Genetic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-11 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721788#comment-17721788
 ] 

Stefan Richter commented on FLINK-31963:


I have a local reproducer as well as a fix, will open a PR once I have written 
the tests.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Genetic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-10 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721387#comment-17721387
 ] 

Stefan Richter commented on FLINK-31963:


[~masteryhx] Did your job also make use of side-outputs? Just fishing among 
things that are potentially "unusual" about the jobs.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Genetic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-08 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720596#comment-17720596
 ] 

Stefan Richter commented on FLINK-31963:


Hi, just to clarify: when you say a checkpoint that fails once fails always -
does this only apply to restores with rescaling, or can you also not recover
from the checkpoint when the parallelism remains unchanged? If it only happens
with rescaling, can you at least recover for some parallelism values, or only
when the parallelism does not change at all?

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Genetic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-13325) Add test case for FLINK-13249

2019-07-18 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-13325:
--

 Summary: Add test case for FLINK-13249
 Key: FLINK-13325
 URL: https://issues.apache.org/jira/browse/FLINK-13325
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.9.0
Reporter: Stefan Richter
Assignee: Stefan Richter






--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (FLINK-13249) Distributed Jepsen test fails with blocked TaskExecutor

2019-07-18 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-13249.
--
Resolution: Fixed

Merged in:
master: 23bd23b325
release-1.9: 3eff6387b5

> Distributed Jepsen test fails with blocked TaskExecutor
> ---
>
> Key: FLINK-13249
> URL: https://issues.apache.org/jira/browse/FLINK-13249
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.0
>
> Attachments: jstack_25661_YarnTaskExecutorRunner
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The distributed Jepsen test which kills {{JobMasters}} started to fail 
> recently. From a first glance, it looks as if the {{TaskExecutor's}} main 
> thread is blocked by some operation. Further investigation is required.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (FLINK-13256) Periodical checkpointing is stopped after failovers

2019-07-18 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-13256.
--
Resolution: Fixed

Merged in:
master: 1ec34249a0
release-1.9: b7bfafca14

> Periodical checkpointing is stopped after failovers
> ---
>
> Key: FLINK-13256
> URL: https://issues.apache.org/jira/browse/FLINK-13256
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: 15_15_20__07_15_2019.jpg, jm_no_cp_after_failover.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In this case, we observed that the job initially is triggering periodical 
> checkpoints as expected.
> But after 2 region failovers, no checkpoint is triggered any more, even after 
> all the tasks are RUNNING again.
> A sample log is attached along with the related topology desc pic.
> This case may not be reproduced every time.
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13249) Distributed Jepsen test fails with blocked TaskExecutor

2019-07-16 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885981#comment-16885981
 ] 

Stefan Richter commented on FLINK-13249:


My tendency is also more towards having a well-specified executor, in 
particular as the code is not just some stateless request.

I have another suggestion that might get around the problem - how about changing
the signature to:
{code}
void requestPartitionProducerState(
    IntermediateDataSetID intermediateDataSetId,
    ResultPartitionID resultPartitionId,
    Consumer<PartitionProducerStateResponseHandle> responseConsumer);
{code}

and then doing in `Task`:
{code}
futurePartitionState.whenCompleteAsync(
    (ExecutionState executionState, Throwable throwable) -> {
        responseConsumer.accept(
            new PartitionProducerStateResponseHandle(executionState, throwable));
    },
    executor);
{code}

This would keep the control over where execution happens in the Task, and we
wouldn't need to pass down the executor. Wdyt?


> Distributed Jepsen test fails with blocked TaskExecutor
> ---
>
> Key: FLINK-13249
> URL: https://issues.apache.org/jira/browse/FLINK-13249
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: jstack_25661_YarnTaskExecutorRunner
>
>
> The distributed Jepsen test which kills {{JobMasters}} started to fail 
> recently. From a first glance, it looks as if the {{TaskExecutor's}} main 
> thread is blocked by some operation. Further investigation is required.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13249) Distributed Jepsen test fails with blocked TaskExecutor

2019-07-15 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885059#comment-16885059
 ] 

Stefan Richter commented on FLINK-13249:


[~till.rohrmann] Alright, I think we have a consensus on how to approach
this. The last remaining question is whether we actually need to pass in the
task's executor or can simply use the common pool. Option one has the advantage
that it integrates with how execution is supposed to happen and with the task
lifecycle; the disadvantage is that we would need to pass down the executor from
quite far away. Option two would be simpler if we agree that there is no
requirement for this to run through that particular executor.

> Distributed Jepsen test fails with blocked TaskExecutor
> ---
>
> Key: FLINK-13249
> URL: https://issues.apache.org/jira/browse/FLINK-13249
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: jstack_25661_YarnTaskExecutorRunner
>
>
> The distributed Jepsen test which kills {{JobMasters}} started to fail 
> recently. From a first glance, it looks as if the {{TaskExecutor's}} main 
> thread is blocked by some operation. Further investigation is required.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13249) Distributed Jepsen test fails with blocked TaskExecutor

2019-07-15 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885002#comment-16885002
 ] 

Stefan Richter commented on FLINK-13249:


[~gaoyunhaii] That is also how I understood your comment, I just wanted to
clarify that we agree the approach should be to change
`thenAccept({routine2})` to `thenAcceptAsync({routine2}, taskExecutor)` to be
on the safe side and guarantee it never runs on the netty thread. The biggest
downside would be that we have to pass the executor down into the gate.
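
For completeness, a tiny standalone example of the difference (JDK API only, the
executor and future here only stand in for the real task executor and netty-completed
future):

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenAcceptAsyncSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> futurePartitionState =
                CompletableFuture.supplyAsync(() -> "RUNNING");

        // thenAccept: the callback may run on whichever thread completes the future.
        futurePartitionState.thenAccept(
                state -> System.out.println("thenAccept on " + Thread.currentThread().getName()));

        // thenAcceptAsync: the callback is always handed off to the given executor,
        // so it never runs on the completing (netty IO) thread.
        futurePartitionState.thenAcceptAsync(
                state -> System.out.println("thenAcceptAsync on " + Thread.currentThread().getName()),
                taskExecutor).get();

        taskExecutor.shutdown();
    }
}
{code}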

> Distributed Jepsen test fails with blocked TaskExecutor
> ---
>
> Key: FLINK-13249
> URL: https://issues.apache.org/jira/browse/FLINK-13249
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: jstack_25661_YarnTaskExecutorRunner
>
>
> The distributed Jepsen test which kills {{JobMasters}} started to fail 
> recently. From a first glance, it looks as if the {{TaskExecutor's}} main 
> thread is blocked by some operation. Further investigation is required.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13249) Distributed Jepsen test fails with blocked TaskExecutor

2019-07-15 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16884960#comment-16884960
 ] 

Stefan Richter commented on FLINK-13249:


[~gaoyunhaii] Thanks for the input, I think what you wrote makes sense. So the
point we have to address is ensuring that the callback is not run by a netty IO
thread, because those are never allowed to run blocking operations?

> Distributed Jepsen test fails with blocked TaskExecutor
> ---
>
> Key: FLINK-13249
> URL: https://issues.apache.org/jira/browse/FLINK-13249
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.9.0
>
> Attachments: jstack_25661_YarnTaskExecutorRunner
>
>
> The distributed Jepsen test which kills {{JobMasters}} started to fail 
> recently. From a first glance, it looks as if the {{TaskExecutor's}} main 
> thread is blocked by some operation. Further investigation is required.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-12958) Integrate AsyncWaitOperator with mailbox

2019-07-12 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-12958:
---
Labels:   (was: pull-request-available)

> Integrate AsyncWaitOperator with mailbox
> 
>
> Key: FLINK-12958
> URL: https://issues.apache.org/jira/browse/FLINK-12958
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Stefan Richter
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13248) Enhance mailbox executor with yield-to-downstream functionality

2019-07-12 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-13248:
--

 Summary: Enhance mailbox executor with yield-to-downstream 
functionality
 Key: FLINK-13248
 URL: https://issues.apache.org/jira/browse/FLINK-13248
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Stefan Richter
 Fix For: 1.10.0






--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (FLINK-12958) Integrate AsyncWaitOperator with mailbox

2019-07-12 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-12958:
--

Assignee: (was: Stefan Richter)

> Integrate AsyncWaitOperator with mailbox
> 
>
> Key: FLINK-12958
> URL: https://issues.apache.org/jira/browse/FLINK-12958
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (FLINK-12804) Introduce mailbox-based ExecutorService

2019-07-12 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-12804.
--
Resolution: Implemented

Merged in:
master: 6cf98b671d

> Introduce mailbox-based ExecutorService
> ---
>
> Key: FLINK-12804
> URL: https://issues.apache.org/jira/browse/FLINK-12804
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-12804) Introduce mailbox-based ExecutorService

2019-07-12 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-12804:
---
Fix Version/s: (was: 1.9.0)
   1.10.0

> Introduce mailbox-based ExecutorService
> ---
>
> Key: FLINK-12804
> URL: https://issues.apache.org/jira/browse/FLINK-12804
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-10 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881825#comment-16881825
 ] 

Stefan Richter commented on FLINK-13072:


I think it is surely not a bad idea to improve or clarify the documentation if 
it helps to prevent confusion for users.

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create two map states in one operator and then create two threads in the 
> apply method; each thread operates on its own map state (the operations are 
> the same). The expected result is that both states end up with the same 
> content, but they do not. I have uploaded the code, please try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-13063) AsyncWaitOperator shouldn't be releasing checkpointingLock

2019-07-09 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-13063.
--
  Resolution: Fixed
Release Note: This changes the default chaining behavior of the 
AsyncWaitOperator. By default, we now break chains so that the 
AsyncWaitOperator is never chained after another operator.

Merged in:
master: c773ce5

> AsyncWaitOperator shouldn't be releasing checkpointingLock
> --
>
> Key: FLINK-13063
> URL: https://issues.apache.org/jira/browse/FLINK-13063
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.6.4, 1.7.2, 1.8.1, 1.9.0
>Reporter: Piotr Nowojski
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> 1.
> For the following setup of chained operators:
> {noformat}
> SourceOperator -> FlatMap -> AsyncOperator{noformat}
> Let's assume that the input buffer of {{AsyncOperator}} is full. We start 
> processing a record from the {{SourceOperator}} and pass it to the 
> {{FlatMap}}, which fans it out (multiplies it 10 times). The first multiplied 
> record reaches {{AsyncOperator}} and is treated specially (stored in 
> {{AsyncWaitOperator#pendingStreamElementQueueEntry}}), and then 
> {{AsyncWaitOperator}} waits on (and releases) the checkpoint lock (in 
> {{AsyncWaitOperator#addAsyncBufferEntry}}). If a checkpoint is triggered now, 
> both {{SourceOperator}} and {{FlatMap}} will be checkpointed under the 
> assumption that all of those 10 multiplied records were processed, which is 
> not true. Only the first one is checkpointed by the {{AsyncWaitOperator}}; 
> the remaining 9 are not. So if we ever restore state from this checkpoint, 
> we have lost those 9 records.
> 2.
> A similar issue (I think previously known) can happen if, for example, some 
> operator upstream of the {{AsyncOperator}} fires a processing-time timer 
> that emits some data. But in that case, 
> {{AsyncWaitOperator#pendingStreamElementQueueEntry}} is being overwritten.
> 3.
> If upstream operator has the following pseudo code:
> {code:java}
> stateA = true
> output.collect(x)
> stateB = true{code}
> one would assume that the accesses/writes to stateA and stateB are atomic 
> from the perspective of checkpoints. But again, because {{AsyncWaitOperator}} 
> releases the checkpoint lock, they will not be (see the sketch below).
> CC [~aljoscha] [~StephanEwen] [~srichter]
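
To make the third point concrete, here is a hedged sketch in plain Java (no 
Flink classes; the names mirror the pseudo code above, and {{wait(10)}} stands 
in for the operator blocking for queue capacity): {{Object.wait()}} releases 
the monitor, so a checkpoint thread synchronizing on the same lock can observe 
stateA set while stateB is still unset, even though both writes sit inside one 
synchronized block.

{code:java}
public class LockReleaseHazardSketch {

    private final Object checkpointLock = new Object();
    private volatile boolean stateA;
    private volatile boolean stateB;

    // Stand-in for the upstream operator's element processing.
    void processElement() throws InterruptedException {
        synchronized (checkpointLock) {
            stateA = true;
            // Stand-in for AsyncWaitOperator waiting for queue capacity: wait() gives
            // up the lock, opening a window in which a checkpoint can run.
            checkpointLock.wait(10);
            stateB = true;
        }
    }

    // Stand-in for the checkpointing thread taking a snapshot under the same lock.
    void checkpoint() {
        synchronized (checkpointLock) {
            System.out.println("snapshot: stateA=" + stateA + ", stateB=" + stateB);
        }
    }
}
{code}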



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-09 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881149#comment-16881149
 ] 

Stefan Richter commented on FLINK-13072:


[~zicat] After reading my previous comment, do you think we can close this 
issue?

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create two map states in one operator and then create two threads in the 
> apply method; each thread operates on its own map state (the operations are 
> the same). The expected result is that both states end up with the same 
> content, but they do not. I have uploaded the code, please try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12730) Combine BitSet implementations in flink-runtime

2019-07-09 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-12730.
--
Resolution: Implemented

Merged in:
master: 14c4b23 

> Combine BitSet implementations in flink-runtime
> ---
>
> Key: FLINK-12730
> URL: https://issues.apache.org/jira/browse/FLINK-12730
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There are two implementations of BitSet in the flink-runtime component: one 
> is org.apache.flink.runtime.operators.util.BloomFilter#BitSet, while the 
> other is org.apache.flink.runtime.operators.util.BitSet.
> The two classes are quite similar in their API and implementation. The only 
> difference is that the former operates on longs while the latter operates on 
> bytes. This has the following consequences:
>  # The byte-based BitSet has better performance for get/set operations.
>  # The long-based BitSet has better performance for the clear operation.
> We combine the two implementations and make the best of both worlds (see the 
> sketch below).
>  
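
As a rough, illustrative sketch of where the trade-off comes from (plain Java, 
not the Flink classes named above): a byte[]-backed bit set touches a single 
byte per get/set, while clearing proceeds byte by byte; a long[]-backed variant 
would instead zero eight bytes per store in clear().

{code:java}
public final class ByteBackedBitSet {

    private final byte[] bytes;

    public ByteBackedBitSet(int bitCount) {
        this.bytes = new byte[(bitCount + 7) >>> 3];
    }

    public void set(int index) {
        bytes[index >>> 3] |= (byte) (1 << (index & 7));   // touches one byte
    }

    public boolean get(int index) {
        return (bytes[index >>> 3] & (1 << (index & 7))) != 0;
    }

    public void clear() {
        // Clears one byte per iteration; a long[]-backed implementation would
        // zero a 64-bit word per iteration instead, which is why it wins here.
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = 0;
        }
    }
}
{code}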



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12693) Store state per key-group in CopyOnWriteStateTable

2019-07-09 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-12693.
--
Resolution: Implemented

Merged in:
master: 8f47b38

> Store state per key-group in CopyOnWriteStateTable
> --
>
> Key: FLINK-12693
> URL: https://issues.apache.org/jira/browse/FLINK-12693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Yu Li
>Assignee: PengFei Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Since we propose to use the KeyGroup as the unit of spilling/loading, the 
> first step is to store state per key-group. Currently {{NestedMapsStateTable}} 
> natively supports this, so we only need to refine {{CopyOnWriteStateTable}}.
> The main effort required here is to extract the customized hash map out of 
> {{CopyOnWriteStateTable}} and then use such a hash map as the state holder 
> for each KeyGroup. Afterwards, we could extract some common logic into 
> {{StateTable}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-13063) AsyncWaitOperator shouldn't be releasing checkpointingLock

2019-07-05 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-13063:
--

Assignee: Stefan Richter  (was: Piotr Nowojski)

> AsyncWaitOperator shouldn't be releasing checkpointingLock
> --
>
> Key: FLINK-13063
> URL: https://issues.apache.org/jira/browse/FLINK-13063
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.6.4, 1.7.2, 1.8.1, 1.9.0
>Reporter: Piotr Nowojski
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.9.0
>
>
> 1.
> For the following setup of chained operators:
> {noformat}
> SourceOperator -> FlatMap -> AsyncOperator{noformat}
> Let's assume that the input buffer of {{AsyncOperator}} is full. We start 
> processing a record from the {{SourceOperator}} and pass it to the 
> {{FlatMap}}, which fans it out (multiplies it 10 times). The first multiplied 
> record reaches {{AsyncOperator}} and is treated specially (stored in 
> {{AsyncWaitOperator#pendingStreamElementQueueEntry}}), and then 
> {{AsyncWaitOperator}} waits on (and releases) the checkpoint lock (in 
> {{AsyncWaitOperator#addAsyncBufferEntry}}). If a checkpoint is triggered now, 
> both {{SourceOperator}} and {{FlatMap}} will be checkpointed under the 
> assumption that all of those 10 multiplied records were processed, which is 
> not true. Only the first one is checkpointed by the {{AsyncWaitOperator}}; 
> the remaining 9 are not. So if we ever restore state from this checkpoint, 
> we have lost those 9 records.
> 2.
> A similar issue (I think previously known) can happen if, for example, some 
> operator upstream of the {{AsyncOperator}} fires a processing-time timer 
> that emits some data. But in that case, 
> {{AsyncWaitOperator#pendingStreamElementQueueEntry}} is being overwritten.
> 3.
> If upstream operator has the following pseudo code:
> {code:java}
> stateA = true
> output.collect(x)
> stateB = true{code}
> one would assume that the accesses/writes to stateA and stateB are atomic 
> from the perspective of checkpoints. But again, because {{AsyncWaitOperator}} 
> releases the checkpoint lock, they will not be.
> CC [~aljoscha] [~StephanEwen] [~srichter]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-05 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16879099#comment-16879099
 ] 

Stefan Richter edited comment on FLINK-13072 at 7/5/19 9:21 AM:


[~zicat] All operators in Flink are assumed to be single-threaded and spawning 
up threads in any user function is something that is not supported by the 
framework.

The quote from the doc that you present is about the `StateBackend` classes, 
which are (misleadingly named) just the factory for the actual backend 
implementations like `RocksDBKeyedStateBackend`. So yes, the factory has to be 
thread safe, but the backends (i.e. subclasses of `AbstractKeyedStateBackend`) 
and other things are not. What you attempt will not work.


was (Author: srichter):
[~zicat] All operators in Flink are assumed to be single-threaded and spawning 
up threads in any user function is something that is not supported by the 
framework.

The quote from the doc that you present is about the `StateBackend` classes, 
which are (misleadingly named) just the factory for the actual backend 
implementations like `RocksDBKeyedStateBackend`. So yes, the factory has to be 
thread safe, but the backends and other things are not. What you attempt will 
not work.

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create two map states in one operator and then create two threads in the 
> apply method; each thread operates on its own map state (the operations are 
> the same). The expected result is that both states end up with the same 
> content, but they do not. I have uploaded the code, please try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-05 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16879099#comment-16879099
 ] 

Stefan Richter commented on FLINK-13072:


[~zicat] All operators in Flink are assumed to be single-threaded and spawning 
up threads in any user function is something that is not supported by the 
framework.

The quote from the doc that you present is about the `StateBackend` classes, 
which are (misleadingly named) just the factory for the actual backend 
implementations like `RocksDBKeyedStateBackend`. So yes, the factory has to be 
thread safe, but the backends and other things are not. What you attempt will 
not work.
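
For readers hitting the same confusion, below is a minimal, hedged sketch of 
the supported pattern (the function and state names are made up and not taken 
from the attached demo; it assumes the function runs on a keyed stream): keyed 
state is only ever read and updated from the operator's main thread, and 
handing the state to a spawned thread, as the demo does, is exactly what is 
unsupported.

{code:java}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class SingleThreadedStateAccess extends RichFlatMapFunction<String, String> {

    private transient MapState<String, Long> counts;

    @Override
    public void open(Configuration parameters) {
        counts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counts", String.class, Long.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Correct: state is read and updated on the task's main thread only.
        Long previous = counts.get(value);
        counts.put(value, previous == null ? 1L : previous + 1L);
        out.collect(value);

        // Not supported: handing the state (or the collector) to another thread.
        // new Thread(() -> counts.put(value, 42L)).start();
    }
}
{code}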

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create two map states in one operator and then create two threads in the 
> apply method; each thread operates on its own map state (the operations are 
> the same). The expected result is that both states end up with the same 
> content, but they do not. I have uploaded the code, please try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7289) Memory allocation of RocksDB can be problematic in container environments

2019-06-27 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874318#comment-16874318
 ] 

Stefan Richter commented on FLINK-7289:
---

[~mikekap] I think that is a very good suggestion and could help with the 
problem. [~carp84] wdyt?

> Memory allocation of RocksDB can be problematic in container environments
> -
>
> Key: FLINK-7289
> URL: https://issues.apache.org/jira/browse/FLINK-7289
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Stefan Richter
>Priority: Major
>
> Flink's RocksDB based state backend allocates native memory. The amount of 
> allocated memory by RocksDB is not under the control of Flink or the JVM and 
> can (theoretically) grow without limits.
> In container environments, this can be problematic because the process can 
> exceed the memory budget of the container, and the process will get killed. 
> Currently, there is no other option than trusting RocksDB to be well behaved 
> and to follow its memory configurations. However, limiting RocksDB's memory 
> usage is not as easy as setting a single limit parameter. The memory limit is 
> determined by an interplay of several configuration parameters, which is 
> almost impossible to get right for users. Even worse, multiple RocksDB 
> instances can run inside the same process and make reasoning about the 
> configuration also dependent on the Flink job.
> Some information about the memory management in RocksDB can be found here:
> https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
> We should try to figure out ways to help users in one or more of the 
> following ways:
> - Some way to autotune or calculate the RocksDB configuration.
> - Conservative default values.
> - Additional documentation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11662) Discarded checkpoint can cause Tasks to fail

2019-06-27 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-11662.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged in:
master: b760d55

> Discarded checkpoint can cause Tasks to fail
> 
>
> Key: FLINK-11662
> URL: https://issues.apache.org/jira/browse/FLINK-11662
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.0, 1.8.0
>Reporter: madong
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: jobmanager.log, taskmanager.log
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink's {{CheckpointCoordinator}} discards an ongoing checkpoint as soon as 
> it receives the first decline message. Part of the discard operation is the 
> deletion of the checkpointing directory. Depending on the underlying 
> {{FileSystem}} implementation, concurrent write and read operation to files 
> in the checkpoint directory can then fail (e.g. this is the case with HDFS). 
> If there is still a local checkpointing operation running for some {{Task}} 
> and belonging to the discarded checkpoint, then it can happen that the 
> checkpointing operation fails (e.g. an {{AsyncCheckpointRunnable}}). 
> Depending on the configuration of the {{CheckpointExceptionHandler}}, this 
> can lead to a task failure and a job recovery which is caused by an already 
> discarded checkpoint.
> {code:java}
> 2019-02-16 11:26:29.378 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
> checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job 
> 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
>  Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was 
> not running
> at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource 
> -> mapOperate -> Timestamps/Watermarks (1/3) 
> (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1389046 for operator 
> Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for 
> operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at 

[jira] [Created] (FLINK-12958) Integrate AsyncWaitOperator with mailbox

2019-06-24 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-12958:
--

 Summary: Integrate AsyncWaitOperator with mailbox
 Key: FLINK-12958
 URL: https://issues.apache.org/jira/browse/FLINK-12958
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Stefan Richter
Assignee: Stefan Richter






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12364) Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-19 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-12364.
--
Resolution: Implemented

Merged in:
master: 8c57e5ae

> Introduce a CheckpointFailureManager to centralized manage checkpoint failure
> -
>
> Key: FLINK-12364
> URL: https://issues.apache.org/jira/browse/FLINK-12364
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This issue tracks the work of T2 section about in design document : 
> [https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12804) Introduce mailbox-based ExecutorService

2019-06-11 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-12804:
--

 Summary: Introduce mailbox-based ExecutorService
 Key: FLINK-12804
 URL: https://issues.apache.org/jira/browse/FLINK-12804
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.9.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-06-03 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16855013#comment-16855013
 ] 

Stefan Richter commented on FLINK-12070:


I have one more question: can you share the heap size of your JVM and also the 
ratio of JVM heap to total physical memory? It is possible that the new 
implementation could perform better with different memory settings, i.e. only 
the required size for the JVM heap, keeping more physical memory available for 
mmap - please note that mmap does not count against the heap size, but against 
process memory.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2019-05-25 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848118#comment-16848118
 ] 

Stefan Richter edited comment on FLINK-8871 at 5/25/19 9:58 AM:


After this quite intense exchange, I was thinking a bit about how we can move 
forward with many of the topics related to checkpoint failure handling, 
cancellation, and cleanups. In particular, I think it would be good if we could 
try to prioritize the work and come up with a "conflict-free" schedule (as much 
as possible). This schedule should consider the work that has already been done 
by different people and the best order in which we should get it into Flink. 
Let me try to summarize some of the different threads in this area:

- FLINK-11662: Without a proper cancellation scheme of running checkpoints in 
tasks, the JM currently eagerly deletes the checkpoint directory when a pending 
checkpoint is cancelled (e.g. when one task reported a failure). Tasks are not 
aware when a checkpoint on which they work failed, so their attempt to finish 
their part of the checkpoint can fail with IOException (directory was deleted 
by JM) and under the current default exception handling of such failures (see 
{{ExecutionConfig.failTaskOnCheckpointError}}) this causes Task failures that 
will restart the job. A lot about solutions to this problem is actually 
contained in the discussions under FLINK-10930, so I wanted to recall this a 
little bit. It seems like the solution could come a bit from two ends, and in 
the end it might be good to also address both sides: A) prevent checkpoint 
failures (in particular in the async part) from causing a task to fail. This 
should be the new behavior; the current default is problematic. B) Take care 
of a better deletion of checkpoint directories on the JM - this is what some of 
the other issues below are about. *IMO, point A) is the most important issue 
among all that we have to address - please read FLINK-10930 again, the 
comments from [~StephanEwen] have valuable information for this issue.*
- FLINK-8871: This issue, introducing a mechanism that makes tasks aware that a 
checkpoint failed and was cancelled by a decision of the JM. This is part of B) 
from my previous point, will lead to cleaner behavior and avoid unnecessary 
work on the task side. FLINK-10966 looks like one subtask in the context of 
this larger effort.
- FLINK-10724, FLINK-12209, et al.: Refactoring of failure reporting (unified 
places, message), intention is to have decisions about how to react to failures 
only in the JM, enables "fail job after n failed checkpoints" feature. From 
what I observed on the PR from [~yanghua], it might be that this would benefit 
from *solving FLINK-11662 first* as well, because otherwise we will not be able 
to have all control over checkpoint failures only in the JM, as failing task 
can fail the job outside of the failure manager's control.
- FLINK-10855: CheckpointCoordinator does not delete checkpoint directory of 
late or failed checkpoints under some circumstances explained in the issue. 
Depending on some details, I think this might benefit from the clear 
cancellation with FLINK-8871.

There are some duplicates for some of the issues; I tried to pick the most 
representative ones. I also tried to prioritize them in order of importance and 
critical path. [~yunta] and [~yanghua], can you discuss what work was already 
done in your teams and divide those tasks in a conflict free way? I think there 
should be enough work for everybody. I would suggest to first discuss among you 
and then make the plan public. If we have the competing solutions for some 
parts, we can discuss them in the community, pick the most suitable or maybe 
fuse the best ideas from both. Wdyt? Is my list somewhat complete or did I 
overlook something crucial?


was (Author: srichter):
After this quiet intense exchange, I was thinking a bit how we can move forward 
with many of the topics related to checkpoint failure handling, cancelation, 
and cleanups. In particular, I think it would be good if we could try to 
prioritize the work and try to come up with an attempt to a "conflict free" 
schedule (as much as possible). This schedule should consider the work that has 
already been made by different people and the best order in which we should get 
them into Flink. Let me try to summarize some of the different threads in this 
area:

- FLINK-11662: Without a proper cancellation scheme of running checkpoints in 
tasks, the JM currently eagerly deletes the checkpoint directory when a pending 
checkpoint is cancelled (e.g. when one task reported a failure). Tasks are not 
aware when a checkpoint on which they work failed, so their attempt to finish 
their part of the checkpoint can fail with IOException (directory was deleted 
by JM) and under the current default exception handling of such 

[jira] [Commented] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2019-05-25 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848118#comment-16848118
 ] 

Stefan Richter commented on FLINK-8871:
---

After this quite intense exchange, I was thinking a bit about how we can move 
forward with many of the topics related to checkpoint failure handling, 
cancellation, and cleanups. In particular, I think it would be good if we could 
try to prioritize the work and come up with a "conflict-free" schedule (as much 
as possible). This schedule should consider the work that has already been done 
by different people and the best order in which we should get it into Flink. 
Let me try to summarize some of the different threads in this area:

- FLINK-11662: Without a proper cancellation scheme of running checkpoints in 
tasks, the JM currently eagerly deletes the checkpoint directory when a pending 
checkpoint is cancelled (e.g. when one task reported a failure). Tasks are not 
aware when a checkpoint on which they work failed, so their attempt to finish 
their part of the checkpoint can fail with IOException (directory was deleted 
by JM) and under the current default exception handling of such failures (see 
{{ExecutionConfig.failTaskOnCheckpointError}}) this causes Task failures that 
will restart the job. A lot about solutions to this problem is actually 
contained in the discussions under FLINK-10930, so I wanted to recall this a 
little bit. It seems like the solution could come a bit from two ends, and in 
the end it might be good to also address both sides: A) prevent checkpoint 
failures (in particular in the async part) from causing a task to fail. This 
should be the new behavior; the current default is problematic. B) Take care 
of a better deletion of checkpoint directories on the JM - this is what some of 
the other issues below are about. *IMO, point A) is the most important issue 
among all that we have to address - please read FLINK-10930 again, the 
comments from [~StephanEwen] have valuable information for this issue.*
- FLINK-8871: This issue, introducing a mechanism that makes tasks aware that a 
checkpoint failed and was cancelled by a decision of the JM. This is part of B) 
from my previous point, will lead to cleaner behavior and avoid unnecessary 
work on the task side. FLINK-10966 looks like one subtask in the context of 
this larger effort.
- FLINK-10724: Refactoring of failure reporting (unified places, message), 
intention is to have decisions about how to react to failures only in the JM, 
enables "fail job after n failed checkpoints" feature. From what I observed on 
the PR from [~yanghua], it might be that this would benefit from *solving 
FLINK-11662 first* as well, because otherwise we will not be able to have all 
control over checkpoint failures only in the JM, as failing task can fail the 
job outside of the failure manager's control.
- FLINK-10855: CheckpointCoordinator does not delete checkpoint directory of 
late or failed checkpoints under some circumstances explained in the issue. 
Depending on some details, I think this might benefit from the clear 
cancellation with FLINK-8871.

There are some duplicates for some of the issues; I tried to pick the most 
representative ones. I also tried to prioritize them in order of importance and 
critical path. [~yunta] and [~yanghua], can you discuss what work was already 
done in your teams and divide those tasks in a conflict free way? I think there 
should be enough work for everybody. I would suggest to first discuss among you 
and then make the plan public. If we have the competing solutions for some 
parts, we can discuss them in the community, pick the most suitable or maybe 
fuse the best ideas from both. Wdyt? Is my list somewhat complete or did I 
overlook something crucial?

> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> ---
>
> Key: FLINK-8871
> URL: https://issues.apache.org/jira/browse/FLINK-8871
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.3.2, 1.4.1, 1.5.0, 1.6.0, 1.7.0
>Reporter: Stefan Richter
>Priority: Critical
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots no longer account for the maximum 
> number of parallel checkpoints, 

[jira] [Assigned] (FLINK-12482) Make checkpoint trigger/notifyComplete run via the mailbox queue

2019-05-23 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-12482:
--

Assignee: Stefan Richter  (was: vinoyang)

> Make checkpoint trigger/notifyComplete run via the mailbox queue
> 
>
> Key: FLINK-12482
> URL: https://issues.apache.org/jira/browse/FLINK-12482
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> For the stream source, we also need to enqueue checkpoint related signals 
> (trigger, notifyComplete) to the mailbox now so that they run in the stream 
> task's main-thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12483) Support (legacy) SourceFunctions with mailbox

2019-05-23 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-12483.
--
   Resolution: Implemented
Fix Version/s: 1.9.0

Merged in:
master ead91396

> Support (legacy) SourceFunctions with mailbox
> -
>
> Key: FLINK-12483
> URL: https://issues.apache.org/jira/browse/FLINK-12483
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We need to modify the current source stream task to run sources in a separate 
> thread that is mutually exclusive to the mailbox mode thread. See section 4 
> in 
> https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12480) Introduce mailbox to StreamTask main-loop

2019-05-23 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-12480.
--
Resolution: Implemented

Merged in:
master 022f6cce

> Introduce mailbox to StreamTask main-loop
> -
>
> Key: FLINK-12480
> URL: https://issues.apache.org/jira/browse/FLINK-12480
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In this sub-task, we introduce the mailbox data-structure to the 
> {{StreamTask}}'s main-loop that runs the step-wise actions. After this 
> sub-task, the mailbox events should already be respected, but the loop will 
> currently still always find that the mailbox is empty because it is not yet 
> integrated with the events that we will include in the next sub-tasks.
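
A minimal, hedged sketch of the core idea (plain Java; the names and structure 
are illustrative and not the actual {{StreamTask}}/mailbox implementation): 
mails posted from other threads are drained in the task's main thread between 
step-wise invocations of the default processing action.

{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;

public final class MailboxLoopSketch {

    private final LinkedBlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    /** Can be called from any thread, e.g. a checkpoint trigger or an async callback. */
    public void sendMail(Runnable letter) {
        mailbox.add(letter);
    }

    /** Runs in the task's main thread; defaultAction returns false when there is no more input. */
    public void runMainLoop(BooleanSupplier defaultAction) {
        while (running) {
            // First drain everything that was posted to the mailbox ...
            Runnable letter;
            while ((letter = mailbox.poll()) != null) {
                letter.run();
            }
            // ... then perform one step-wise unit of the regular processing.
            if (!defaultAction.getAsBoolean()) {
                running = false;
            }
        }
    }
}
{code}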



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12478) Decompose monolithic run-loops in StreamTask implementations into step-wise calls

2019-05-23 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-12478.
--
Resolution: Implemented

Merged in:
master 022f6cce

> Decompose monolithic run-loops in StreamTask implementations into step-wise 
> calls
> -
>
> Key: FLINK-12478
> URL: https://issues.apache.org/jira/browse/FLINK-12478
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> As a first step to introduce a mailbox model, we need to split up the 
> monolithic loops in stream task implementations into stepwise actions. The 
> step-wise actions are still run in a loop, but give us the opportunity to 
> check the mailbox between each step in the future. This mailbox will be added 
> to the loop in a later subtasks.
> We also exclude breaking the stream sources into steps because this is not 
> possible before FLIP-27 is implemented and we need to stay compatible with 
> the current source function interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12591) StatsDReporterTest is unstable

2019-05-22 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-12591:
--

 Summary: StatsDReporterTest is unstable
 Key: FLINK-12591
 URL: https://issues.apache.org/jira/browse/FLINK-12591
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.9.0
Reporter: Stefan Richter


Problem can be reproduced by looping the test locally in the IDE. See also

https://api.travis-ci.org/v3/job/535675153/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints

2019-05-21 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844829#comment-16844829
 ] 

Stefan Richter commented on FLINK-10855:


[~yanghua] Just a heads-up that you pinged the wrong Stefan Richter in your 
question; that is maybe why you got a confusing answer.

> CheckpointCoordinator does not delete checkpoint directory of late/failed 
> checkpoints
> -
>
> Key: FLINK-10855
> URL: https://issues.apache.org/jira/browse/FLINK-10855
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>
> In case that an acknowledge checkpoint message is late or a checkpoint cannot 
> be acknowledged, we discard the subtask state in the 
> {{CheckpointCoordinator}}. What's not happening in this case is that we 
> delete the parent directory of the checkpoint. This only happens when we 
> call {{PendingCheckpoint#dispose}}. 
> Due to this behaviour it can happen that a checkpoint fails (e.g. a task not 
> being ready) and we delete the checkpoint directory. Next another task writes 
> its checkpoint data to the checkpoint directory (thereby creating it again) 
> and sending an acknowledge message back to the {{CheckpointCoordinator}}. The 
> {{CheckpointCoordinator}} will realize that there is no longer a 
> {{PendingCheckpoint}} and will discard the sub task state. This will remove 
> the state files from the checkpoint directory but will leave the checkpoint 
> directory untouched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-6755) Allow triggering Checkpoints through command line client

2019-05-17 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842210#comment-16842210
 ] 

Stefan Richter edited comment on FLINK-6755 at 5/17/19 2:30 PM:


IMO, this somehow leads to the more general question about the future 
directions of checkpoints and savepoints and their distinguishing features. At 
this point, on a very general level, I agree that there is a valid use case for 
this request. On closer look, I think that it also leaves some open 
questions, e.g. how do the manually triggered checkpoints interact with the 
automatic ones (reset the timer?). It also sounds like the target use case 
would like to have a function similar to stop-with-savepoint, basically a 
stop-with-checkpoint. We would also have to solve the question of general side 
effects.


was (Author: srichter):
IMO, this somehow leads to the more general question about the future 
directions of checkpoints and savepoints and their distinguising features. At 
this point, on a very general level, I agree that there is a valid use case for 
this request. With a closer look, I think that it also leaves some open 
questions, e.g. how do the manually triggeres checkpoints interact with the 
automatic ones (reset timer?). It also sounds like the target use case would 
like to have a similar function like stop-with-savepoint, a 
stop-with-checkpoint basically. We would also have to solve the question of 
general side effects: if we introduce triggered checkpoints, do they commit 
side effects (like checkpoints) or not (like savepoints, except if used in the 
context of stop).

Thinking a bit about unification of concepts, so far the de-facto differences 
that evolved are i) ownership for delete and ii) what happens to side-effects. 
We can have the discussion and it probably should start by anwering those 
questions for this idea.

> Allow triggering Checkpoints through command line client
> 
>
> Key: FLINK-6755
> URL: https://issues.apache.org/jira/browse/FLINK-6755
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client, Runtime / Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Assignee: vinoyang
>Priority: Major
>
> The command line client currently only allows triggering (and canceling with) 
> Savepoints. 
> While this is good if we want to fork or modify the pipelines in a 
> non-checkpoint compatible way, now with incremental checkpoints this becomes 
> wasteful for simple job restarts/pipeline updates. 
> I suggest we add a new command: 
> ./bin/flink checkpoint  [checkpointDirectory]
> and a new flag -c for the cancel command to indicate we want to trigger a 
> checkpoint:
> ./bin/flink cancel -c [targetDirectory] 
> Otherwise this can work similar to the current savepoint taking logic, we 
> could probably even piggyback on the current messages by adding a boolean flag 
> indicating whether it should be a savepoint or a checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-6755) Allow triggering Checkpoints through command line client

2019-05-17 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842210#comment-16842210
 ] 

Stefan Richter edited comment on FLINK-6755 at 5/17/19 2:24 PM:


IMO, this somehow leads to the more general question about the future 
directions of checkpoints and savepoints and their distinguishing features. At 
this point, on a very general level, I agree that there is a valid use case for 
this request. On closer look, I think that it also leaves some open 
questions, e.g. how do the manually triggered checkpoints interact with the 
automatic ones (reset the timer?). It also sounds like the target use case 
would like to have a function similar to stop-with-savepoint, basically a 
stop-with-checkpoint. We would also have to solve the question of general side 
effects: if we introduce triggered checkpoints, do they commit side effects 
(like checkpoints) or not (like savepoints, except when used in the context of 
stop)?

Thinking a bit about the unification of concepts, the de-facto differences that 
have evolved so far are i) ownership for deletion and ii) what happens to side 
effects. We can have the discussion, and it probably should start by answering 
those questions for this idea.


was (Author: srichter):
IMO, this somehow leads to the more general question about the future 
directions of checkpoints and savepoints and their distinguising features. It 
this point, on a very general level, I agree that there is a valid use case for 
this request. With a closer look, I think that it also leaves some open 
questions, e.g. how do the manually triggeres checkpoints interact with the 
automatic ones (reset timer?). It also sounds like the target use case would 
like to have a similar function like stop-with-savepoint, a 
stop-with-checkpoint basically. We would also have to solve the question of 
general side effects: if we introduce triggered checkpoints, do the commit side 
effects (like checkpoints) or not (like savepoints, except if used in the 
context of stop).

Thinking a bit about unification of concepts, so far the de-facto differences 
that evolved are i) ownership for delete and ii) what happens to side-effects. 
We can have the discussion and it probably should start by anwering those 
questions for this idea.

> Allow triggering Checkpoints through command line client
> 
>
> Key: FLINK-6755
> URL: https://issues.apache.org/jira/browse/FLINK-6755
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client, Runtime / Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Assignee: vinoyang
>Priority: Major
>
> The command line client currently only allows triggering (and canceling with) 
> Savepoints. 
> While this is good if we want to fork or modify the pipelines in a 
> non-checkpoint compatible way, now with incremental checkpoints this becomes 
> wasteful for simple job restarts/pipeline updates. 
> I suggest we add a new command: 
> ./bin/flink checkpoint  [checkpointDirectory]
> and a new flag -c for the cancel command to indicate we want to trigger a 
> checkpoint:
> ./bin/flink cancel -c [targetDirectory] 
> Otherwise this can work similar to the current savepoint taking logic, we 
> could probably even piggyback on the current messages by adding a boolean flag 
> indicating whether it should be a savepoint or a checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

