[jira] [Comment Edited] (FLINK-27554) The asf-site does not build on Apple Silicon
[ https://issues.apache.org/jira/browse/FLINK-27554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536970#comment-17536970 ] Xintong Song edited comment on FLINK-27554 at 5/14/22 5:42 AM:
---
I've made a little progress on this. I ran into the same error with `docker-build.sh`. Inspired by this [comment|https://github.com/rubyjs/libv8/issues/315#issuecomment-846475866], I tried with the `amd64` image, and it worked! The full command is:
{code:java}
docker run --rm --volume="$PWD:/srv/flink-web" --expose=4000 -p 4000:4000 -it --platform linux/amd64 ruby:2.6.0 bash -c "cd /srv/flink-web && gem install bundler && ./build.sh $@"
{code}
Unfortunately, while this works for `./docker-build.sh` (the generated HTML files look good), I ran into another error with `./docker-build.sh -p`:
{code:java}
Configuration file: /srv/flink-web/_config.yml
            Source: /srv/flink-web
       Destination: /srv/flink-web/content
 Incremental build: disabled. Enable with --incremental
      Generating... done in 85.06 seconds.
bundler: failed to load command: jekyll (/srv/flink-web/.rubydeps/ruby/2.6.0/bin/jekyll)
Traceback (most recent call last):
	40: from /usr/local/bundle/bin/bundle:23:in `'
	39: from /usr/local/bundle/bin/bundle:23:in `load'
	38: from /usr/local/bundle/gems/bundler-2.3.13/exe/bundle:36:in `'
	37: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/friendly_errors.rb:103:in `with_friendly_errors'
	36: from /usr/local/bundle/gems/bundler-2.3.13/exe/bundle:48:in `block in '
	35: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/cli.rb:25:in `start'
	34: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/vendor/thor/lib/thor/base.rb:485:in `start'
	33: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/cli.rb:31:in `dispatch'
	32: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/vendor/thor/lib/thor.rb:392:in `dispatch'
	31: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/vendor/thor/lib/thor/invocation.rb:127:in `invoke_command'
	30: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/vendor/thor/lib/thor/command.rb:27:in `run'
	29: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/cli.rb:483:in `exec'
	28: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/cli/exec.rb:23:in `run'
	27: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/cli/exec.rb:58:in `kernel_load'
	26: from /usr/local/bundle/gems/bundler-2.3.13/lib/bundler/cli/exec.rb:58:in `load'
	25: from /srv/flink-web/.rubydeps/ruby/2.6.0/bin/jekyll:23:in `'
	24: from /srv/flink-web/.rubydeps/ruby/2.6.0/bin/jekyll:23:in `load'
	23: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/jekyll-3.0.5/bin/jekyll:17:in `'
	22: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/mercenary-0.3.6/lib/mercenary.rb:19:in `program'
	21: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/mercenary-0.3.6/lib/mercenary/program.rb:42:in `go'
	20: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/mercenary-0.3.6/lib/mercenary/command.rb:220:in `execute'
	19: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/mercenary-0.3.6/lib/mercenary/command.rb:220:in `each'
	18: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/mercenary-0.3.6/lib/mercenary/command.rb:220:in `block in execute'
	17: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/jekyll-3.0.5/lib/jekyll/commands/serve.rb:26:in `block (2 levels) in init_with_program'
	16: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/jekyll-3.0.5/lib/jekyll/commands/build.rb:39:in `process'
	15: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/jekyll-3.0.5/lib/jekyll/commands/build.rb:72:in `watch'
	14: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/jekyll-watch-1.5.1/lib/jekyll/watcher.rb:26:in `watch'
	13: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/listen-3.7.1/lib/listen/listener.rb:92:in `start'
	12: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/listen-3.7.1/lib/listen/fsm.rb:72:in `transition'
	11: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/listen-3.7.1/lib/listen/fsm.rb:105:in `transition_with_callbacks!'
	10: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/listen-3.7.1/lib/listen/fsm.rb:124:in `call'
	9: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/listen-3.7.1/lib/listen/fsm.rb:124:in `instance_eval'
	8: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/listen-3.7.1/lib/listen/listener.rb:71:in `block in '
	7: from /usr/local/lib/ruby/2.6.0/forwardable.rb:230:in `start'
	6: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/listen-3.7.1/lib/listen/adapter/base.rb:66:in `start'
	5: from /srv/flink-web/.rubydeps/ruby/2.6.0/gems/listen-3.7.1/lib/listen/adapter/base.rb:42:in `configure'
{code}
[jira] [Updated] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
[ https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-27611:
    Component/s: Connectors / Pulsar

> ConcurrentModificationException during Flink-Pulsar checkpoint notification
> ---
>
>                 Key: FLINK-27611
>                 URL: https://issues.apache.org/jira/browse/FLINK-27611
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.15.0
>            Reporter: Jason Kania
>            Priority: Major
>
> When attempting to run a job that was working in 1.12.7, but upgraded to 1.15.0, the following exception is occurring outside of the control of my own code:
>
> java.util.ConcurrentModificationException
> 	at java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
> 	at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
> 	at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
> 	at org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129)
> 	at org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511)
> 	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
> 	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
> 	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
> 	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> 	at java.base/java.lang.Thread.run(Thread.java:829)

--
This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] Dengyu123 closed pull request #19718: [tests] Add annotation for override method
Dengyu123 closed pull request #19718: [tests] Add annotation for override method URL: https://github.com/apache/flink/pull/19718 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
[ https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-27611:
    Issue Type: Bug (was: Improvement)

> ConcurrentModificationException during Flink-Pulsar checkpoint notification
> ---
>
>                 Key: FLINK-27611
>                 URL: https://issues.apache.org/jira/browse/FLINK-27611
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.0
>            Reporter: Jason Kania
>            Priority: Major
[jira] [Created] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
Jason Kania created FLINK-27611:
---
             Summary: ConcurrentModificationException during Flink-Pulsar checkpoint notification
                 Key: FLINK-27611
                 URL: https://issues.apache.org/jira/browse/FLINK-27611
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.15.0
            Reporter: Jason Kania

When attempting to run a job that was working in 1.12.7, but upgraded to 1.15.0, the following exception is occurring outside of the control of my own code:

java.util.ConcurrentModificationException
	at java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
	...
	at java.base/java.lang.Thread.run(Thread.java:829)
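The `TreeMap$PrivateEntryIterator.nextEntry` frames at the top of the reported trace are the classic fail-fast iterator signature: something structurally modifies the map while an iteration over it is in flight. A minimal, self-contained sketch of that failure mode and the iterator-based fix (illustrative only, not the Flink/Pulsar reader code; the method and key names are made up for the example):

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class CmeDemo {
    // Removing through the map while iterating trips the fail-fast check
    // in TreeMap$PrivateEntryIterator.nextEntry, as in the reported trace.
    static boolean failsFast() {
        TreeMap<Long, String> pending = new TreeMap<>(Map.of(1L, "a", 2L, "b"));
        try {
            for (Map.Entry<Long, String> e : pending.entrySet()) {
                pending.remove(e.getKey()); // structural modification mid-iteration
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    // Removing through the iterator itself is safe: it keeps the iterator's
    // modification count in sync with the map's.
    static int ackUpTo(long checkpointId) {
        TreeMap<Long, String> pending = new TreeMap<>(Map.of(1L, "a", 2L, "b", 3L, "c"));
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getKey() <= checkpointId) {
                it.remove();
            }
        }
        return pending.size(); // entries above the checkpoint remain
    }

    public static void main(String[] args) {
        System.out.println(failsFast()); // true
        System.out.println(ackUpTo(2L)); // 1
    }
}
```

Whether the actual fix belongs in iterator removal, `headMap(...).clear()`, or synchronization depends on how the reader shares the map across threads; the sketch only shows the mechanism behind the exception.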
[GitHub] [flink] tweise merged pull request #19719: [FLINK-27255] [flink-avro] flink-avro does not support ser/de of larg…
tweise merged PR #19719: URL: https://github.com/apache/flink/pull/19719
[GitHub] [flink] tweise merged pull request #19720: [FLINK-27465] Handle conversion of negative long to timestamp in Avro…
tweise merged PR #19720: URL: https://github.com/apache/flink/pull/19720
[jira] [Updated] (FLINK-27465) AvroRowDeserializationSchema.convertToTimestamp fails with negative nano seconds
[ https://issues.apache.org/jira/browse/FLINK-27465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27465:
    Fix Version/s: 1.15.1

> AvroRowDeserializationSchema.convertToTimestamp fails with negative nano seconds
> ---
>
>                 Key: FLINK-27465
>                 URL: https://issues.apache.org/jira/browse/FLINK-27465
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.15.0
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.15.1
>
> The issue is exposed by a time-zone dependency in AvroRowDeSerializationSchemaTest.
>
> The root cause is that convertToTimestamp attempts to set a negative value with java.sql.Timestamp.setNanos.
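The failure mode described here is easy to reproduce outside Flink: for a pre-epoch millisecond value, truncating division leaves a negative sub-second remainder, which `java.sql.Timestamp.setNanos` rejects. A hedged sketch of the pitfall and the `Math.floorDiv`/`Math.floorMod` remedy (the class and method names are illustrative, not the patched Flink code):

```java
import java.sql.Timestamp;

public class NegativeMillis {
    // Naive split: for negative epoch millis, millis % 1000 is negative,
    // and Timestamp.setNanos rejects anything outside [0, 999999999].
    static boolean naiveFails(long millis) {
        try {
            Timestamp ts = new Timestamp((millis / 1000) * 1000);
            ts.setNanos((int) (millis % 1000) * 1_000_000);
            return false;
        } catch (IllegalArgumentException e) {
            return true; // thrown for e.g. millis = -1
        }
    }

    // Floor semantics keep the nano component non-negative:
    // -1 ms -> second -1, nanos 999_000_000.
    static Timestamp fromMillis(long millis) {
        Timestamp ts = new Timestamp(Math.floorDiv(millis, 1000) * 1000);
        ts.setNanos((int) Math.floorMod(millis, 1000) * 1_000_000);
        return ts;
    }

    public static void main(String[] args) {
        System.out.println(naiveFails(-1L));               // true
        System.out.println(fromMillis(-1L).getNanos());    // 999000000
    }
}
```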
[jira] [Updated] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
[ https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27255:
    Fix Version/s: 1.15.1

> Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
> ---
>
>                 Key: FLINK-27255
>                 URL: https://issues.apache.org/jira/browse/FLINK-27255
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.14.4
>            Reporter: Haizhou Zhao
>            Assignee: Haizhou Zhao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.14.5, 1.15.1
>
> The underlying serialization of the avro schema uses the string serialization method of ObjectOutputStream.class; however, the default string serialization of ObjectOutputStream.class cannot handle strings of more than 65535 characters (64 KB). As a result, constructing Flink operators that input/output Avro Generic Records with a huge schema is not possible.
>
> The proposed fix is to change the serialization and deserialization methods of the following classes so that huge strings can also be handled:
>
> [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java#L107]
> [SerializableAvroSchema|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L55]
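The 65535-character ceiling comes from `writeUTF` (defined on `DataOutput` and used via `ObjectOutputStream`), which stores the encoded length in an unsigned 16-bit field. A sketch of hitting the limit and the usual length-prefixed-raw-bytes workaround (illustrative only; the actual Flink patch lives in the two classes linked above):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class LongStringSer {
    // writeUTF stores the encoded length as an unsigned short, so any
    // string whose UTF-8 form exceeds 65535 bytes is rejected.
    static boolean writeUtfFails(String s) {
        try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
            out.writeUTF(s);
            return false;
        } catch (UTFDataFormatException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writing an int length followed by the raw UTF-8 bytes has no such limit.
    static String roundTrip(String s) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buf)) {
                byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buf.toByteArray()))) {
                byte[] data = new byte[in.readInt()];
                in.readFully(data);
                return new String(data, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String big = "x".repeat(70000); // well past the 64 KB writeUTF limit
        System.out.println(writeUtfFails(big));          // true
        System.out.println(roundTrip(big).equals(big));  // true
    }
}
```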
[GitHub] [flink] tweise commented on pull request #19704: [FLINK-27465] Handle conversion of negative long to timestamp in Avro…
tweise commented on PR #19704: URL: https://github.com/apache/flink/pull/19704#issuecomment-1126633844

@flinkbot run azure
[GitHub] [flink] flinkbot commented on pull request #19722: [connectors][aws-base] Using entrySet for map iterate
flinkbot commented on PR #19722: URL: https://github.com/apache/flink/pull/19722#issuecomment-1126630673

## CI report:

* 8face87316f4af09594d64bfd14023f4da627414 UNKNOWN

Bot commands: the @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] Dengyu123 opened a new pull request, #19722: [connectors][aws-base] Using entrySet for map iterate
Dengyu123 opened a new pull request, #19722: URL: https://github.com/apache/flink/pull/19722

This PR replaces `keySet` with `entrySet` for map iteration. In cases where both the key and the value are needed, it is more efficient to iterate over the entry set.
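For context on the change the PR proposes: iterating `keySet()` and calling `get(key)` performs a second lookup per key, while `entrySet()` yields key and value together in one pass. A minimal illustration (generic code, not the aws-base connector itself):

```java
import java.util.Map;

public class EntrySetDemo {
    // keySet() + get(): one extra map lookup for every key.
    static long sumWithKeySet(Map<String, Long> counts) {
        long total = 0;
        for (String key : counts.keySet()) {
            total += counts.get(key);
        }
        return total;
    }

    // entrySet(): key and value arrive together, no second lookup.
    static long sumWithEntrySet(Map<String, Long> counts) {
        long total = 0;
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            total += e.getValue();
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> m = Map.of("a", 1L, "b", 2L);
        System.out.println(sumWithKeySet(m));   // 3
        System.out.println(sumWithEntrySet(m)); // 3
    }
}
```

Both variants are correct; the entry-set form simply avoids the per-key lookup, which matters most for tree-based maps where each `get` is O(log n).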
[jira] [Updated] (FLINK-27600) Add basic logs for coping and deleting jars
[ https://issues.apache.org/jira/browse/FLINK-27600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27600:
---
    Labels: pull-request-available (was: )

> Add basic logs for coping and deleting jars
> ---
>
>                 Key: FLINK-27600
>                 URL: https://issues.apache.org/jira/browse/FLINK-27600
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>            Reporter: Xin Hao
>            Priority: Minor
>              Labels: pull-request-available
>
> Because we delete the jar after submitting the job, it's better to leave some logs for the end-users.
[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #211: [FLINK-27600] Add minimal jar coping and deleting logs
Aitozi commented on code in PR #211: URL: https://github.com/apache/flink-kubernetes-operator/pull/211#discussion_r872917451

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java:

@@ -39,6 +39,7 @@ public File fetch(String uri, File targetDir) throws Exception {
     FileSystem fileSystem = source.getFileSystem();
     String fileName = source.getName();
     File targetFile = new File(targetDir, fileName);
+    LOG.info("Coping file from {} to {}", source, targetFile);
     try (var inputStream = fileSystem.open(source)) {
         FileUtils.copyToFile(inputStream, targetFile);
     }

Review Comment: I think the line below this already logs this. It's at `debug` level.
[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #202: [FLINK-27483]Add session job config field
Aitozi commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872916175

## docs/content/docs/operations/configuration.md:

@@ -53,3 +53,12 @@ To learn more about metrics and logging configuration please refer to the dedica

 ## Operator Configuration Reference
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.
+
+- For application clusters, `spec.flinkConfiguration` will be located in `FlinkDeployment` CustomResource.
+- For session clusters, configuring `spec.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster.
+  - You can configure some additional job specific supplemental configuration through `spec.flinkConfiguration` in `FlinkSessionJob` CustomResource.
+    Those session job level configurations will override the parent session cluster's Flink configuration. Please note only the following configurations are considered to be valid configurations.
+- `kubernetes.operator.user.artifacts.http.header`

Review Comment: I think it's much clearer now. One other thought: maybe we could extend `ConfigOption` to mark which component (flink/operator) a config belongs to. That's out of scope for this PR; we could do it later if necessary.
[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field
FuyaoLi2017 commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872873594

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:

@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
     return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
 }
+
+public Configuration getSessionJobObserveConfig(
+        FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+    Configuration sessionJobConfig = getObserveConfig(deployment);
+
+    // merge session job specific config
+    Map<String, String> sessionJobFlinkConfiguration =
+            flinkSessionJob.getSpec().getFlinkConfiguration();
+    if (sessionJobFlinkConfiguration != null && !sessionJobFlinkConfiguration.isEmpty()) {
+        sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString);

Review Comment: I guess the logic would be more straightforward if this config in the session job directly overrides the parent config. If we do the merging work, a user will need to check two CRs instead of one to determine which headers are actually applied during jar fetching when debugging. It could be a little bit confusing.
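The override semantics under discussion, where job-level `spec.flinkConfiguration` entries replace same-keyed entries from the parent deployment, boil down to a last-layer-wins map merge. A neutral sketch of that behavior using plain maps (not the operator's `Configuration` API; names here are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigMerge {
    // Start from the cluster-level config, then let the job-level map
    // override any key it also defines (last layer wins).
    static Map<String, String> effectiveConfig(
            Map<String, String> clusterConf, Map<String, String> jobConf) {
        Map<String, String> merged = new LinkedHashMap<>(clusterConf);
        if (jobConf != null) {
            // Same key: job-level value replaces the cluster-level value.
            jobConf.forEach(merged::put);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> eff = effectiveConfig(
                Map.of("header", "cluster-value", "parallelism", "2"),
                Map.of("header", "job-value"));
        System.out.println(eff.get("header"));      // job-value
        System.out.println(eff.get("parallelism")); // 2
    }
}
```

The debuggability concern raised in the review is orthogonal to the merge itself: with layered configs, the effective value of a key can only be determined by inspecting all layers, which is why the reviewer prefers a plain override users can reason about from one CR.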
[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field
FuyaoLi2017 commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872863534 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java: ## @@ -33,7 +34,8 @@ public class FileSystemBasedArtifactFetcher implements ArtifactFetcher { new FileSystemBasedArtifactFetcher(); @Override -public File fetch(String uri, File targetDir) throws Exception { +public File fetch(String uri, Configuration flinkConfiguration, File targetDir) Review Comment: No, it is not used. I need to change this line since this class overrides the `ArtifactFetcher` interface. In the future, we might need this configuration? Not sure. Anyway, it doesn't hurt.
[GitHub] [flink] flinkbot commented on pull request #19721: [hotfix][shell] Add stderr when 1 flink-dist*.jar cannot be resolved.
flinkbot commented on PR #19721: URL: https://github.com/apache/flink/pull/19721#issuecomment-1126523035 ## CI report: * 267285cd4cc39ebdc2846f8dfe3fe2a064a75b4c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] rudikershaw commented on a diff in pull request #19721: [hotfix][shell] Add stderr when 1 flink-dist*.jar cannot be resolved.
rudikershaw commented on code in PR #19721: URL: https://github.com/apache/flink/pull/19721#discussion_r872850362 ## README.md: ## @@ -75,7 +75,7 @@ Prerequisites for building Flink: ``` git clone https://github.com/apache/flink.git cd flink -mvn clean package -DskipTests # this will take up to 10 minutes +./mvnw clean package -DskipTests # this will take up to 10 minutes Review Comment: Switched to recommend using the project-provided Maven wrapper. Later versions of Maven installed on a developer's computer will fail because the use of insecure repositories was deprecated.
[GitHub] [flink] rudikershaw opened a new pull request, #19721: [hotfix][shell] Add stderr when 1 flink-dist*.jar cannot be resolved.
rudikershaw opened a new pull request, #19721: URL: https://github.com/apache/flink/pull/19721 ## What is the purpose of the change When adding jars to the `/lib` directory, any extra jar files that match the pattern `flink-dist*.jar` provoke an error when trying to use `BashJavaUtils` to get JVM parameters and dynamic configurations in `config.sh`. Although niche, this can be difficult to debug because there is no error message to describe the issue. We already print a useful error message if no `flink-dist*.jar` can be found at all. This pull request adds another error message when more than one `flink-dist*.jar` is found at this point. ## Brief change log - Fix minor typo in README.md - Add error message for when more than one file matching `flink-dist*.jar` exists in `/lib`. - Actioned a few minor shellcheck issues on the lines changed. ## Verifying this change This change can be verified as follows: - `./mvnw clean install -DskipTests -Dfast` - `cp flink-dist-scala/target/flink-dist-scala_2.12-1.16-SNAPSHOT.jar build-target/lib/` - `build-target/bin/start-cluster.sh` - You should see the error message "[ERROR] Multiple flink-dist*.jar found in /path/to/lib. Please resolve." - `rm build-target/lib/flink-dist-scala_2.12-1.16-SNAPSHOT.jar` - `build-target/bin/start-cluster.sh` - The cluster should start successfully. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): No - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: No - The serializers: No - The runtime per-record code paths (performance sensitive): No - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No - The S3 file system connector: No ## Documentation - Does this pull request introduce a new feature? No - If yes, how is the feature documented? Not applicable
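The exactly-one-`flink-dist*.jar` rule the PR enforces can be modeled in isolation. The real check lives in shell (`config.sh`); the Java sketch below only illustrates the matching logic and uses hypothetical method names:

```java
import java.util.List;
import java.util.stream.Collectors;

public class FlinkDistCheck {
    // Pure helper: which of the given lib/ file names look like flink-dist*.jar?
    static List<String> flinkDistJars(List<String> libFileNames) {
        return libFileNames.stream()
                .filter(n -> n.matches("flink-dist.*\\.jar"))
                .collect(Collectors.toList());
    }

    // Mirrors the shell logic: exactly one match is required; zero or many is an error.
    static String check(List<String> libFileNames) {
        List<String> matches = flinkDistJars(libFileNames);
        if (matches.isEmpty()) {
            return "[ERROR] No flink-dist*.jar found in lib.";
        }
        if (matches.size() > 1) {
            return "[ERROR] Multiple flink-dist*.jar found in lib. Please resolve.";
        }
        return "OK: " + matches.get(0);
    }

    public static void main(String[] args) {
        System.out.println(check(List.of("flink-dist-1.16.jar", "log4j.jar")));
        // An extra matching jar (e.g. a stray flink-dist-scala jar) now yields
        // the new error message instead of a silent BashJavaUtils failure.
        System.out.println(check(List.of("flink-dist-1.16.jar", "flink-dist-scala_2.12.jar")));
    }
}
```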
[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on PR #19372: URL: https://github.com/apache/flink/pull/19372#issuecomment-1126477747 @flinkbot run azure
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #209: [FLINK-27270] Add document of session job operations
gyfora merged PR #209: URL: https://github.com/apache/flink-kubernetes-operator/pull/209
[jira] [Closed] (FLINK-27003) Operator Helm chart improvements
[ https://issues.apache.org/jira/browse/FLINK-27003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-27003. -- Resolution: Fixed > Operator Helm chart improvements > > > Key: FLINK-27003 > URL: https://issues.apache.org/jira/browse/FLINK-27003 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Fix For: kubernetes-operator-1.0.0 > > > Umbrella ticket for helm related improvements for the next release -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Closed] (FLINK-27270) Add document of session job operations
[ https://issues.apache.org/jira/browse/FLINK-27270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-27270. -- Resolution: Fixed merged to main 85fc32a8aa2293daf90d53c228d62552979f2710 > Add document of session job operations > -- > > Key: FLINK-27270 > URL: https://issues.apache.org/jira/browse/FLINK-27270 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Aitozi >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.0.0 > > > # Basic operations > # How to support different source jars
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #202: [FLINK-27483]Add session job config field
gyfora commented on PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#issuecomment-1126414287 @FuyaoLi2017 no worries, I have gone through the code and added comments. Mostly minor stuff :) No need to worry about force pushing, this is your branch, and it's necessary to force push PR branches to keep them clean. I also generally squash them together into 1-2 commits.
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field
gyfora commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872744006 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ## @@ -141,4 +142,15 @@ public class KubernetesOperatorConfigOptions { .noDefaultValue() .withDescription( "Whether to enable recovery of missing/deleted jobmanager deployments. False by default for Flink 1.14, true for newer Flink version."); + +public static final ConfigOption> JAR_ARTIFACT_HTTP_HEADER = +ConfigOptions.key("kubernetes.operator.user.artifacts.http.header") +.mapType() +.noDefaultValue() +.withDescription( +"Custom HTTP header for a Flink job. If configured in cluster level, headers will be applied to all jobs within" Review Comment: I think the description should detail what the config is good for. Does it affect artifact downloading? It doesn't need to explain how configurations work, we have general docs for that. So I would completely remove this part: ``` If configured in cluster level, headers will be applied to all jobs within the cluster. This field can also be configured under spec.job.flinkConfiguration for a specific session job within a session cluster. If configured at session job level, it will override the cluster level configuration. ``` ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ## @@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) { return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment)); } +public Configuration getSessionJobObserveConfig( Review Comment: I think it's a bit confusing to call it `getSessionJobObserveConfig`. 
I would prefer to call it simply `getSessionJobConfig` ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ## @@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) { return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment)); } +public Configuration getSessionJobObserveConfig( +FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) { +Configuration sessionJobConfig = getObserveConfig(deployment); + +// merge session job specific config +Map sessionJobFlinkConfiguration = +flinkSessionJob.getSpec().getFlinkConfiguration(); +if (sessionJobFlinkConfiguration != null && !sessionJobFlinkConfiguration.isEmpty()) { Review Comment: The `!sessionJobFlinkConfiguration.isEmpty()` check is not necessary ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java: ## @@ -41,6 +42,7 @@ public class FlinkOperatorConfiguration { Duration flinkCancelJobTimeout; Duration flinkShutdownClusterTimeout; String artifactsBaseDir; +Map artifactHttpHeader; Review Comment: We shouldn't add this extra field here, it's not an operator configuration and it's also not used anywhere ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java: ## @@ -33,7 +34,8 @@ public class FileSystemBasedArtifactFetcher implements ArtifactFetcher { new FileSystemBasedArtifactFetcher(); @Override -public File fetch(String uri, File targetDir) throws Exception { +public File fetch(String uri, Configuration flinkConfiguration, File targetDir) Review Comment: Why did you add the config field here? Is it used? 
## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java: ## @@ -404,4 +409,21 @@ private Optional validateServiceAccount(String serviceAccount) { } return Optional.empty(); } + +private Optional validateFlinkSessionJobConfig( +Map flinkSessionJobConfig) { +if (flinkSessionJobConfig == null) { +return Optional.empty(); +} + +for (String key : flinkSessionJobConfig.keySet()) { +if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) { Review Comment: Could we please add a simple test for this into the validator test? ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java: ## @@ -32,12 +37,26 @@ public class HttpArtifactFetcher implements ArtifactFetcher { public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher(); @Override -
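The whitelist check the reviewer asks to test can be illustrated standalone. The set contents and method shape below are a simplified sketch modeled on `ALLOWED_FLINK_SESSION_JOB_CONF_KEYS`, not the PR's exact code:

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class SessionJobConfigValidator {
    // Per the PR discussion, only the artifact HTTP header key is allowed
    // at the session job level for now (assumed contents).
    static final Set<String> ALLOWED_KEYS =
            Set.of("kubernetes.operator.user.artifacts.http.header");

    // Returns an error message for the first disallowed key, empty otherwise,
    // matching the Optional<String> style of DefaultValidator methods.
    static Optional<String> validate(Map<String, String> sessionJobConfig) {
        if (sessionJobConfig == null) {
            return Optional.empty();
        }
        for (String key : sessionJobConfig.keySet()) {
            if (!ALLOWED_KEYS.contains(key)) {
                return Optional.of("Invalid session job config key: " + key);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validate(Map.of("kubernetes.operator.user.artifacts.http.header", "A:B")));
        System.out.println(validate(Map.of("taskmanager.memory.process.size", "1g")));
    }
}
```

A unit test for this, as requested in the review, would assert exactly these two cases: an allowed key passes, an unknown key produces an error.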
[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on pull request #202: [FLINK-27483]Add session job config field
FuyaoLi2017 commented on PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#issuecomment-1126402997 Hi @Aitozi @gyfora, thanks for reviewing this. I have updated the code based on your suggestions. 1. Update typos for `spec.job.flinkConfiguration`. 2. Update documentation about the session job flink configuration and provided allowed configurations (only the header config at the moment). 3. Add validator logic in `DefaultValidator`. Also refactored the existing `validateFlinkConfig` method name to `validateFlinkDeploymentConfig` to avoid confusion. 4. Move configuration merge logic into `FlinkConfigManager`. Add a `getSessionJobObserveConfig` method, and refactor a few places to call this method instead of `getObserveConfig`. There were some merge conflicts, and I rebased onto the main branch. I ran into some errors and introduced some unexpected changes; as a result, I force pushed the code to make it clean. Sorry for making the commit history kind of messy.
[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field
FuyaoLi2017 commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872722982 ## docs/content/docs/operations/configuration.md: ## @@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica ## Operator Configuration Reference {{< generated/kubernetes_operator_config_configuration >}} + +## Job Specific Configuration Reference +Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`. + +- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource. +- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster. +You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. Review Comment: I added a new line at the end.
[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field
FuyaoLi2017 commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872721505 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java: ## @@ -83,6 +84,13 @@ public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws E Configuration deployedConfig = configManager.getObserveConfig(flinkDepOptional.get()); +// merge session job specific config +Map sessionJobFlinkConfiguration = Review Comment: I created a `getSessionJobObserveConfig` method in `FlinkConfigManager` and replaced this call here and in a few other places.
[jira] [Created] (FLINK-27610) PrometheusPushGatewayReporter doesn't support basic authentication
Qihong Jiang created FLINK-27610: Summary: PrometheusPushGatewayReporter doesn't support basic authentication Key: FLINK-27610 URL: https://issues.apache.org/jira/browse/FLINK-27610 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Reporter: Qihong Jiang PrometheusPushGatewayReporter doesn't support basic authentication. I would like to add two configurations (username, password) to support access to a PushGateway that requires authentication.
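For context on what such username/password options would do: HTTP basic authentication (RFC 7617) sends `Authorization: Basic base64(username:password)` with each push. A minimal sketch of building that header value with the JDK's `Base64`, not the reporter's actual code:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthHeader {
    // RFC 7617: the credentials are "username:password", Base64-encoded,
    // and sent as the value of the Authorization header.
    static String headerValue(String username, String password) {
        String token = Base64.getEncoder().encodeToString(
                (username + ":" + password).getBytes(StandardCharsets.UTF_8));
        return "Basic " + token;
    }

    public static void main(String[] args) {
        System.out.println(headerValue("flink", "secret")); // Basic Zmxpbms6c2VjcmV0
    }
}
```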
[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field
FuyaoLi2017 commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872701915 ## docs/content/docs/operations/configuration.md: ## @@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica ## Operator Configuration Reference {{< generated/kubernetes_operator_config_configuration >}} + +## Job Specific Configuration Reference +Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`. + +- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource. +- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster. +You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. +The session job level configuration will override the parent session cluster's Flink configuration. Review Comment: Added related docs and validator code changes.
[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field
FuyaoLi2017 commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872700622 ## docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html: ## @@ -110,5 +110,11 @@ String The base dir to put the session job artifacts. + +kubernetes.operator.user.artifacts.http.header +(none) +Map +Custom HTTP header for a Flink job. If configured in cluster level, headers will be applied to all jobs within the cluster. This field can also be configured under spec.job.flinkConfiguration for a specific session job within a session cluster. If configured at session job level, it will override the cluster level configuration. Expected format: headerKey1:headerValue1,headerKey2:headerValue2. Review Comment: This is a generated HTML page. I guess we don't need to touch it.
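The `headerKey1:headerValue1,headerKey2:headerValue2` format shown in the generated docs is Flink's map-type option syntax. A simplified parse, ignoring the escaping that Flink's real map parser handles, with a hypothetical helper name:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderMapParse {
    // Splits "k1:v1,k2:v2" into an ordered map. Only the first ':' in each
    // pair separates key from value, so header values may themselves contain ':'.
    static Map<String, String> parse(String value) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (String pair : value.split(",")) {
            String[] kv = pair.split(":", 2);
            headers.put(kv[0].trim(), kv.length > 1 ? kv[1].trim() : "");
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(parse("Authorization:Basic abc,X-Trace:on"));
    }
}
```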
[jira] [Comment Edited] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than it's upstream task
[ https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536840#comment-17536840 ] Zhilong Hong edited comment on FLINK-27608 at 5/13/22 6:55 PM: --- When a PartitionNotFoundException is thrown in the scenario you mentioned above, it will be handled in by the logic located at {{org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler:298}}. The task will try to {{requestPartitionProducerState}} from the JobManager. If the upstream task is not ready (for example, in the DEPLOYING or INITIALIZING state), the SingleInputGate will try to retrigger another partition request until the partition is consumable. was (Author: thesharing): As a PartitionNotFoundException is thrown, it will be handled in by the logic located at {{{}org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler:298{}}}. The task will try to {{requestPartitionProducerState}} from the JobManager. If the upstream task is not ready (i.e. in the DEPLOYING or INITIALIZING state), the SingleInputGate will try to retrigger another partition request until the partition is consumable. > Flink may throw PartitionNotFound Exception if the downstream task reached > Running state earlier than it's upstream task > > > Key: FLINK-27608 > URL: https://issues.apache.org/jira/browse/FLINK-27608 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.2 >Reporter: zlzhang0122 >Priority: Major > Fix For: 1.16.0 > > > Flink streaming job deployment may throw PartitionNotFound Exception if the > downstream task reached Running state earlier than its upstream task and > after maximum backoff for partition requests passed.But the config of > taskmanager.network.request-backoff.max is not eay to decide. Can we use a > loop awaiting the upstream task partition be ready? > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Comment Edited] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than it's upstream task
[ https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536840#comment-17536840 ] Zhilong Hong edited comment on FLINK-27608 at 5/13/22 6:52 PM: --- As a PartitionNotFoundException is thrown, it will be handled in by the logic located at {{{}org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler:298{}}}. The task will try to {{requestPartitionProducerState}} from the JobManager. If the upstream task is not ready (i.e. in the DEPLOYING or INITIALIZING state), the SingleInputGate will try to retrigger another partition request until the partition is consumable. was (Author: thesharing): As a {{PartitionNotFoundException}} is thrown, it will be handled in by the logic located at {{{}org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler:298{}}}. The {{Task}} will try to {{requestPartitionProducerState}} from the JobManager. If the upstream task is not ready (i.e. in the DEPLOYING or INITIALIZING state), the {{SingleInputGate}} will try to retrigger another partition request until the partition is consumable. > Flink may throw PartitionNotFound Exception if the downstream task reached > Running state earlier than it's upstream task > > > Key: FLINK-27608 > URL: https://issues.apache.org/jira/browse/FLINK-27608 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.2 >Reporter: zlzhang0122 >Priority: Major > Fix For: 1.16.0 > > > Flink streaming job deployment may throw PartitionNotFound Exception if the > downstream task reached Running state earlier than its upstream task and > after maximum backoff for partition requests passed.But the config of > taskmanager.network.request-backoff.max is not eay to decide. Can we use a > loop awaiting the upstream task partition be ready? > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than it's upstream task
[ https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536840#comment-17536840 ] Zhilong Hong commented on FLINK-27608: -- As a {{PartitionNotFoundException}} is thrown, it will be handled in by the logic located at {{{}org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler:298{}}}. The {{Task}} will try to {{requestPartitionProducerState}} from the JobManager. If the upstream task is not ready (i.e. in the DEPLOYING or INITIALIZING state), the {{SingleInputGate}} will try to retrigger another partition request until the partition is consumable. > Flink may throw PartitionNotFound Exception if the downstream task reached > Running state earlier than it's upstream task > > > Key: FLINK-27608 > URL: https://issues.apache.org/jira/browse/FLINK-27608 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.2 >Reporter: zlzhang0122 >Priority: Major > Fix For: 1.16.0 > > > Flink streaming job deployment may throw PartitionNotFound Exception if the > downstream task reached Running state earlier than its upstream task and > after maximum backoff for partition requests passed.But the config of > taskmanager.network.request-backoff.max is not eay to decide. Can we use a > loop awaiting the upstream task partition be ready? > -- This message was sent by Atlassian Jira (v8.20.7#820007)
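The retrigger-with-backoff behavior referenced in this thread (governed by the `taskmanager.network.request-backoff.*` options) can be modeled roughly as doubling delays up to a cap, after which the `PartitionNotFoundException` becomes fatal. The method below is a simplified illustration of why the cap is hard to tune, not Flink's implementation:

```java
public class PartitionRequestBackoff {
    // Simplified model: one initial partition request, then retriggered
    // requests with doubling delays until the delay would exceed the cap.
    static int attemptsBeforeGivingUp(long initialBackoffMs, long maxBackoffMs) {
        int attempts = 1; // the initial partition request
        long backoff = initialBackoffMs;
        while (backoff <= maxBackoffMs) {
            attempts++;   // one retriggered request per backoff step
            backoff *= 2;
        }
        return attempts;
    }

    public static void main(String[] args) {
        // 100 ms initial and 10 000 ms max are assumed example values.
        System.out.println(attemptsBeforeGivingUp(100, 10_000)); // 8
        // A small cap gives very few retries before the job fails,
        // which is the tuning difficulty the reporter describes.
        System.out.println(attemptsBeforeGivingUp(100, 50)); // 1
    }
}
```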
[GitHub] [flink] flinkbot commented on pull request #19720: [FLINK-27465] Handle conversion of negative long to timestamp in Avro…
flinkbot commented on PR #19720: URL: https://github.com/apache/flink/pull/19720#issuecomment-1126310355 ## CI report: * 85c9612daae5c56f16de41fbd0ce2e5958d1762c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #19719: [FLINK-27255] [flink-avro] flink-avro does not support ser/de of larg…
flinkbot commented on PR #19719: URL: https://github.com/apache/flink/pull/19719#issuecomment-1126308081 ## CI report: * b9e4b181102292f7b4a164066cee125801853e3b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] haizhou-zhao opened a new pull request, #19720: [FLINK-27465] Handle conversion of negative long to timestamp in Avro…
haizhou-zhao opened a new pull request, #19720: URL: https://github.com/apache/flink/pull/19720 …RowDeserializationSchema Backport of fix to release-1.15.
[jira] [Commented] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
[ https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536802#comment-17536802 ] Haizhou Zhao commented on FLINK-27255: -- Here it is: https://github.com/apache/flink/pull/19719 > Flink-avro does not support serialization and deserialization of avro schema > longer than 65535 characters > - > > Key: FLINK-27255 > URL: https://issues.apache.org/jira/browse/FLINK-27255 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.14.4 >Reporter: Haizhou Zhao >Assignee: Haizhou Zhao >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.14.5 > > > The underlying serialization of avro schema uses string serialization method > of ObjectOutputStream.class, however, the default string serialization by > ObjectOutputStream.class does not support handling string of more than 66535 > characters (64kb). As a result, constructing flink operators that > input/output Avro Generic Record with huge schema is not possible. > > The purposed fix is two change the serialization and deserialization method > of these following classes so that huge string could also be handled. > > [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java#L107] > [SerializableAvroSchema|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L55] > -- This message was sent by Atlassian Jira (v8.20.7#820007)
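The 65535-character limit described in the ticket comes from `writeUTF`, which stores the encoded length in an unsigned 16-bit field and throws `UTFDataFormatException` beyond it. The length-prefixed-bytes workaround the ticket proposes can be sketched as follows (a simplified illustration, not the actual Flink patch):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class LargeStringSerde {

    // writeUTF stores the encoded length in an unsigned 16-bit field,
    // so any string encoding to more than 65535 bytes fails.
    static boolean writeUtfFails(String s) {
        try {
            new DataOutputStream(new ByteArrayOutputStream()).writeUTF(s);
            return false;
        } catch (UTFDataFormatException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Workaround: a 4-byte length prefix plus raw UTF-8 bytes has no 64 KB ceiling.
    static byte[] writeLargeString(String s) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            out.writeInt(bytes.length);
            out.write(bytes);
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String readLargeString(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String hugeSchema = "x".repeat(70_000); // longer than the 65535-byte limit
        System.out.println(writeUtfFails(hugeSchema)); // true
        System.out.println(readLargeString(writeLargeString(hugeSchema)).length()); // 70000
    }
}
```

An Avro schema string larger than 64 KB round-trips fine under this scheme, which is exactly what `GenericRecordAvroTypeInfo` and `SerializableAvroSchema` need.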
[GitHub] [flink] haizhou-zhao opened a new pull request, #19719: [FLINK-27255] [flink-avro] flink-avro does not support ser/de of larg…
haizhou-zhao opened a new pull request, #19719: URL: https://github.com/apache/flink/pull/19719 …e avro schema (#19645) Backport of fix to release 1.15
[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field
FuyaoLi2017 commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872636486 ## docs/content/docs/operations/configuration.md: ## @@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica ## Operator Configuration Reference {{< generated/kubernetes_operator_config_configuration >}} + +## Job Specific Configuration Reference +Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`. Review Comment: I think this should be correct. If you take a look at the CRD, this property will be sitting in `spec.job.flinkConfiguration` from the root of the CRD.
[GitHub] [flink] snuyanzin commented on pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5
snuyanzin commented on PR #19716: URL: https://github.com/apache/flink/pull/19716#issuecomment-1126274461 @flinkbot run azure
[GitHub] [flink] flinkbot commented on pull request #19718: Flink dy
flinkbot commented on PR #19718: URL: https://github.com/apache/flink/pull/19718#issuecomment-1126265524 ## CI report: * 69a52bf6181733b12ea315c21704c0e50cb4d472 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] Dengyu123 opened a new pull request, #19718: Flink dy
Dengyu123 opened a new pull request, #19718: URL: https://github.com/apache/flink/pull/19718 Using the @Override annotation could be more standardized and friendly to read
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #212: [FLINK-27572][FLINK-27594] Ensure HA metadata is present before restoring job with last state
gyfora commented on PR #212: URL: https://github.com/apache/flink-kubernetes-operator/pull/212#issuecomment-1126254738 cc @Aitozi @tweise @wangyang0918 @morhidi This is a larger change that fixes a couple of outstanding critical issues and also aims to make the whole flow a bit simpler, more robust and easier to understand. I would appreciate your review/feedback :)
[jira] [Updated] (FLINK-27572) Verify HA Metadata present before performing last-state restore
[ https://issues.apache.org/jira/browse/FLINK-27572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27572: --- Labels: pull-request-available (was: ) > Verify HA Metadata present before performing last-state restore > --- > > Key: FLINK-27572 > URL: https://issues.apache.org/jira/browse/FLINK-27572 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Blocker > Labels: pull-request-available > Fix For: kubernetes-operator-1.0.0 > > > When we restore a job using the last-state logic we need to verify that the > HA metadata has not been deleted. And if it's not there we need to simply > throw an error because this requires manual user intervention. > This only applies when the FlinkDeployment is not already in a suspended > state with recorded last state information. > The problem can be reproduced easily in 1.14 by triggering a fatal job error > (turn off restart-strategy and kill a TM, for example). In these cases HA > metadata will be removed, and the next last-state upgrade should throw an > error instead of restoring from a completely empty state.
[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #212: [FLINK-27572][FLINK-27594] Ensure HA metadata is present before restoring job with last state
gyfora opened a new pull request, #212: URL: https://github.com/apache/flink-kubernetes-operator/pull/212 **This PR contains the following improvements/fixes for a number of loosely connected blocker issues:** 1. Eliminate bug that causes 0 delay infinite reconcile loop with the UpdateControl management logic 2. Ensure HA metadata is available when we are relying on it during stateful upgrades 3. Make JM deployment recovery conditional on availability of HA metadata 4. Trigger fatal error when upgrade progress is stuck 5. Clean up and improve stateful upgrade conditions with added detailed debug logging 6. Greatly simplify code around suspend/cancellation and restore operation in the reconciler 7. Make sure finished jobs are marked as stable to avoid rollback loop for short jobs
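The "ensure HA metadata is present" condition in points 2 and 3 above boils down to a small predicate: a last-state upgrade may proceed only when HA metadata still exists, or when the deployment is already suspended with recorded last-state information. The sketch below is a hypothetical simplification, not the operator's actual code:

```java
import java.util.Optional;

public class LastStateUpgradeGuard {

    // Hypothetical guard mirroring the PR description: restoring with
    // last-state is only safe when Kubernetes HA metadata still exists,
    // or when a last-state snapshot was already recorded for a suspended
    // deployment. Otherwise the operator must fail fast and require
    // manual user intervention.
    static boolean canRestoreWithLastState(
            boolean haMetadataPresent,
            boolean alreadySuspended,
            Optional<String> recordedLastState) {
        if (haMetadataPresent) {
            return true;
        }
        return alreadySuspended && recordedLastState.isPresent();
    }

    public static void main(String[] args) {
        // HA metadata wiped by a fatal job error: restore must not proceed.
        if (canRestoreWithLastState(false, false, Optional.empty())) {
            throw new AssertionError("should require manual intervention");
        }
        System.out.println("guarded");
    }
}
```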
[jira] [Commented] (FLINK-27074) python_job.py should write to TEST_DATA_DIR
[ https://issues.apache.org/jira/browse/FLINK-27074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536756#comment-17536756 ] Thomas Thornton commented on FLINK-27074: - [~chesnay] pull request is available for review [here|https://github.com/apache/flink/pull/19717]. > python_job.py should write to TEST_DATA_DIR > --- > > Key: FLINK-27074 > URL: https://issues.apache.org/jira/browse/FLINK-27074 > Project: Flink > Issue Type: Technical Debt > Components: API / Python, Tests >Reporter: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.16.0 > > > Similar to other tests this job should write to TEST_DATA_DIR to make sure we > clean up the file after the test finished. > Currently this job is used twice in some e2e tests, where it can then run > into FileAlreadyExistsExceptions, so we should additionally make sure that > each job execution uses a different directory.
[GitHub] [flink] flinkbot commented on pull request #19717: [FLINK-27074] Change python_job.py to write to unique paths within TEST_DATA_DIR
flinkbot commented on PR #19717: URL: https://github.com/apache/flink/pull/19717#issuecomment-1126234095 ## CI report: * 9df66fdad3309e0e5ad33ffe656a07760e1eaa68 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-27074) python_job.py should write to TEST_DATA_DIR
[ https://issues.apache.org/jira/browse/FLINK-27074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27074: --- Labels: pull-request-available (was: ) > python_job.py should write to TEST_DATA_DIR > --- > > Key: FLINK-27074 > URL: https://issues.apache.org/jira/browse/FLINK-27074 > Project: Flink > Issue Type: Technical Debt > Components: API / Python, Tests >Reporter: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.16.0 > > > Similar to other tests this job should write to TEST_DATA_DIR to make sure we > clean up the file after the test finished. > Currently this job is used twice in some e2e tests, where it can then run > into FileAlreadyExistsExceptions, so we should additionally make sure that > each job execution uses a different directory.
[GitHub] [flink] twthorn opened a new pull request, #19717: [FLINK-27074] Change python_job.py to write to unique paths within TEST_DATA_DIR
twthorn opened a new pull request, #19717: URL: https://github.com/apache/flink/pull/19717 ## What is the purpose of the change Change python_job.py to use the shared TEST_DATA_DIR environment variable & directory for writing test output data (rather than its current custom logic for creating/removing an output directory). Also, generates a unique directory name within TEST_DATA_DIR so e2e tests that use this file multiple times do not run into name conflicts. ## Brief change log - Change python_job.py to write to unique paths within TEST_DATA_DIR ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
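The unique-directory idea in this PR description can be sketched with `Files.createTempDirectory`, which atomically creates a fresh, uniquely named subdirectory under a given root, so repeated job runs never collide on the same output path. This is an illustration only (the PR's actual change is in Python, and the helper name here is hypothetical):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class UniqueOutputDir {

    // Hypothetical helper: derive a unique output directory under the shared
    // test data root so concurrent or repeated runs never reuse a path and
    // hit FileAlreadyExistsException-style conflicts.
    static Path uniqueOutputDir(Path testDataDir) {
        try {
            return Files.createTempDirectory(testDataDir, "python_job_");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("test_data_dir_"); // stand-in for TEST_DATA_DIR
        Path a = uniqueOutputDir(root);
        Path b = uniqueOutputDir(root);
        System.out.println(a.equals(b) ? "collision" : "unique"); // unique
    }
}
```

Cleanup then only needs to delete the shared root after the test, which is exactly the benefit the JIRA issue asks for.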
[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on PR #19372: URL: https://github.com/apache/flink/pull/19372#issuecomment-1126192259 @flinkbot run azure
[GitHub] [flink] zhuzhurk commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results
zhuzhurk commented on code in PR #19653: URL: https://github.com/apache/flink/pull/19653#discussion_r872369788 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java: ## @@ -42,6 +43,8 @@ public class DefaultSchedulingPipelinedRegion implements SchedulingPipelinedRegi private Set blockingConsumedPartitionGroups; +private Set persistentConsumedPartitionGroups; Review Comment: How about adding a `ResultPartitionType` field (which contains `isPersistent` info) to ConsumedPartitionGroup? I think it can simplify things a lot. ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java: ## @@ -481,8 +481,18 @@ public IntermediateDataSet createAndAddResultDataSet( public JobEdge connectNewDataSetAsInput( JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) { +return this.connectNewDataSetAsInput( +input, distPattern, partitionType, new IntermediateDataSetID()); +} + +public JobEdge connectNewDataSetAsInput( +JobVertex input, +DistributionPattern distPattern, +ResultPartitionType partitionType, +IntermediateDataSetID intermediateDataSetID) { Review Comment: NIT: "intermediateDataSetID" -> "intermediateDataSetId". This is the naming convention we currently follow, although some old code is still using *ID variables. 
## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java: ## @@ -85,7 +88,9 @@ private TaskDeploymentDescriptorFactory( List consumedPartitionGroups, Function resultPartitionRetriever, -BlobWriter blobWriter) { +BlobWriter blobWriter, +IntermediateDataSetID cachedIntermediateDataSetID, +Collection clusterPartitionShuffleDescriptors) { Review Comment: clusterPartitionShuffleDescriptors -> consumedClusterPartitionShuffleDescriptors ## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java: ## @@ -241,6 +271,23 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex( ExecutionVertex executionVertex, int attemptNumber) throws IOException { InternalExecutionGraphAccessor internalExecutionGraphAccessor = executionVertex.getExecutionGraphAccessor(); +final IntermediateDataSetID intermediateDataSetID = + executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIDToConsume(); +Collection clusterPartitionShuffleDescriptors = null; Review Comment: I prefer to make `clusterPartitionShuffleDescriptors` an empty list if the `intermediateDataSetID` is null, instead of making it nullable. ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java: ## @@ -175,9 +194,27 @@ public JobVertex(String name, JobVertexID id) { * @param operatorIDPairs The operator ID pairs of the job vertex. */ public JobVertex(String name, JobVertexID primaryId, List operatorIDPairs) { +this(name, primaryId, operatorIDPairs, null); +} + +/** + * Constructs a new job vertex and assigns it with the given name. + * + * @param name The name of the new job vertex. + * @param primaryId The id of the job vertex. + * @param operatorIDPairs The operator ID pairs of the job vertex. + * @param intermediateDataSetID The id of the cached intermediate dataset that the job vertex + * consumes. 
+ */ +public JobVertex( Review Comment: Looks to me it is only used in test and can be replaced with `JobVertex#createAndAddResultDataSet`? ## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java: ## @@ -253,7 +300,9 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex( executionVertex.getParallelSubtaskIndex(), executionVertex.getAllConsumedPartitionGroups(), internalExecutionGraphAccessor::getResultPartitionOrThrow, -internalExecutionGraphAccessor.getBlobWriter()); +internalExecutionGraphAccessor.getBlobWriter(), +intermediateDataSetID, Review Comment: The param `intermediateDataSetID` is not needed because it can be get from the shuffle descriptor. ## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java: ## @@ -253,7 +300,9 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex( executionVertex.getParallelSubtaskIndex(), executionVertex.getAllConsumedPartitionGroups(), internalExecutionGraphAccessor::getResultPartitionOrThrow, -
[GitHub] [flink-table-store] ajian2002 commented on pull request #121: Introduce AggregatuibMergeFunction
ajian2002 commented on PR #121: URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1126169687 If I want to test the same MergeFunction on tables with different structures, what should I do? (Is it possible to create a new Test file again?)
[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: Introduce AggregatuibMergeFunction
ajian2002 commented on code in PR #121: URL: https://github.com/apache/flink-table-store/pull/121#discussion_r872505878 ## flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector; + +import org.apache.flink.types.Row; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.util.CollectionUtil.iteratorToList; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * ITCase for partial update. 
+ */ +public class AggregationITCase extends FileStoreTableITCase { + +@Override +protected List ddl() { +return Collections.singletonList("CREATE TABLE IF NOT EXISTS T3 (" + "a STRING," + "b INT," + "c INT ," + "PRIMARY KEY (a) NOT ENFORCED )" + " WITH ('merge-engine'='aggregation' );"); +} + +@Test +public void testMergeInMemory() throws ExecutionException, InterruptedException { +bEnv.executeSql("INSERT INTO T3 VALUES " + "('pk1',1, 2), " + "('pk1',1, 2)").await(); +List result = iteratorToList(bEnv.from("T3").execute().collect()); +assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 2, 4)); +} + +@Test +public void testMergeRead() throws ExecutionException, InterruptedException { +bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 2)").await(); +bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 4)").await(); +bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',2, 0)").await(); +List result = iteratorToList(bEnv.from("T3").execute().collect()); +assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 4, 6)); +} + + +@Test +public void testMergeCompaction() throws ExecutionException, InterruptedException { +// Wait compaction +bEnv.executeSql("ALTER TABLE T3 SET ('commit.force-compact'='true')"); + +// key pk1 +bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 3, 1)").await(); +bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 5)").await(); +bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 6)").await(); + +// key pk2 +bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 6,7)").await(); +bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 9,0)").await(); +bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 4,4)").await(); + +List result = iteratorToList(bEnv.from("T3").execute().collect()); +assertThat(result).containsExactlyInAnyOrder(Row.of("pk1",11,12), Row.of("pk2",19,11)); +} + +//@Test Review Comment: This test example comes from `flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java` I don't know if this test makes sense 
for `flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java`; do I need to keep this test?
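For readers following this review, the `aggregation` merge engine exercised by the tests above reduces rows that share a primary key by summing their non-key fields. A standalone sketch of that reduction (hypothetical class, not the table-store implementation):

```java
import java.util.Arrays;

public class SumMergeSketch {

    // Merge one incoming row into the accumulator by summing each value
    // field, mirroring how rows with the same primary key collapse under
    // the 'aggregation' merge engine in the tests above.
    static int[] merge(int[] acc, int[] row) {
        int[] out = acc.clone();
        for (int i = 0; i < row.length; i++) {
            out[i] += row[i];
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors testMergeRead: ('pk1',1,2), ('pk1',1,4), ('pk1',2,0) -> ('pk1',4,6)
        int[] acc = {1, 2};
        acc = merge(acc, new int[] {1, 4});
        acc = merge(acc, new int[] {2, 0});
        System.out.println(Arrays.toString(acc)); // [4, 6]
    }
}
```

Testing the same merge logic against tables with different structures then only varies the row shape fed into the reduction, which is the question raised in the PR comment above.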
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field
gyfora commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872488147 ## docs/content/docs/operations/configuration.md: ## @@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica ## Operator Configuration Reference {{< generated/kubernetes_operator_config_configuration >}} + +## Job Specific Configuration Reference +Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`. + +- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource. +- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster. +You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. +The session job level configuration will override the parent session cluster's Flink configuration. Review Comment: If we can enumerate the possible configuration options and keys we can simply add a method to the validator that checks that the user doesn't try to set something that won't take effect
[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #202: [FLINK-27483]Add session job config field
Aitozi commented on code in PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872450640 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java: ## @@ -32,12 +37,24 @@ public class HttpArtifactFetcher implements ArtifactFetcher { public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher(); @Override -public File fetch(String uri, File targetDir) throws Exception { +public File fetch(String uri, Configuration flinkConfiguration, File targetDir) +throws Exception { var start = System.currentTimeMillis(); URL url = new URL(uri); +HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + +Map clusterLevelHeader = Review Comment: we do not know whether this header is cluster level or job level here, so what about naming it `headers` directly? ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java: ## @@ -40,4 +42,7 @@ public abstract class AbstractFlinkSpec { * restart, change the number to anything other than the current value. */ private Long restartNonce; + +/** Flink configuration overrides for the Flink deployment. */ Review Comment: nit: the comment should be updated: overrides for the Flink deployment or session job. ## docs/content/docs/operations/configuration.md: ## @@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica ## Operator Configuration Reference {{< generated/kubernetes_operator_config_configuration >}} + +## Job Specific Configuration Reference +Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`. + +- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource. 
+- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster. +You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. +The session job level configuration will override the parent session cluster's Flink configuration. Review Comment: I think we should let the user know that the cluster level's flink config will not be overridden by the session job's flinkConfiguration actually (I mean the session cluster's config). Personally, I think what the session job's `spec.flinkConfiguration` defines is not the **flink** Configuration, just some custom configs. ## docs/content/docs/operations/configuration.md: ## @@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica ## Operator Configuration Reference {{< generated/kubernetes_operator_config_configuration >}} + +## Job Specific Configuration Reference +Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`. + +- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource. 
Review Comment: ditto ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java: ## @@ -83,6 +84,13 @@ public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws E Configuration deployedConfig = configManager.getObserveConfig(flinkDepOptional.get()); +// merge session job specific config +Map sessionJobFlinkConfiguration = Review Comment: nit: maybe we could move this to the `FlinkConfigManager` ## docs/content/docs/operations/configuration.md: ## @@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica ## Operator Configuration Reference {{< generated/kubernetes_operator_config_configuration >}} + +## Job Specific Configuration Reference +Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`. + +- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource. +- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster. +You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. Review Comment: A newline here ## docs/content/docs/operations/configuration.md: ## @@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica ## Operator Configuration
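The precedence discussed in this thread — job-level `flinkConfiguration` keys overriding the session cluster's config, while untouched cluster keys survive — amounts to a simple map merge. A hypothetical sketch (not the operator's `FlinkConfigManager` code):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConfigMergeSketch {

    // Start from the session cluster's config and let the session job's
    // spec.flinkConfiguration override matching keys; cluster-only keys
    // are left untouched.
    static Map<String, String> effectiveConfig(
            Map<String, String> clusterConf, Map<String, String> jobConf) {
        Map<String, String> merged = new HashMap<>(clusterConf);
        merged.putAll(jobConf); // job-level keys win
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> cluster = new HashMap<>();
        cluster.put("parallelism.default", "2");
        cluster.put("restart-strategy", "fixed-delay");

        Map<String, String> job =
                Collections.singletonMap("parallelism.default", "4");

        Map<String, String> merged = effectiveConfig(cluster, job);
        System.out.println(merged.get("parallelism.default")); // 4
        System.out.println(merged.get("restart-strategy")); // fixed-delay
    }
}
```

As the reviewers note, the merge only affects the job's view of the configuration; the running session cluster's own config is not rewritten.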
[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on PR #19372: URL: https://github.com/apache/flink/pull/19372#issuecomment-1126125435 @flinkbot run azure
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #203: [FLINK-27337] Prevent session cluster to be deleted when there are ru…
gyfora merged PR #203: URL: https://github.com/apache/flink-kubernetes-operator/pull/203
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #202: [FLINK-27483]Add session job config field
gyfora commented on PR #202: URL: https://github.com/apache/flink-kubernetes-operator/pull/202#issuecomment-1126082667 Thank you @FuyaoLi2017, I will try to review this on Monday at the latest, as I will be travelling during the weekend. @Aitozi if you have some spare capacity I would appreciate a review but we can also do this on Monday :)
[jira] [Assigned] (FLINK-27609) Tracking flink-version and flink-revision in FlinkDeploymentStatus
[ https://issues.apache.org/jira/browse/FLINK-27609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-27609: -- Assignee: Matyas Orhidi > Tracking flink-version and flink-revision in FlinkDeploymentStatus > -- > > Key: FLINK-27609 > URL: https://issues.apache.org/jira/browse/FLINK-27609 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-0.1.0 >Reporter: Matyas Orhidi >Assignee: Matyas Orhidi >Priority: Major > Fix For: kubernetes-operator-1.0.0 > > > The rest api can provide accurate versioning information through the config > endpoint: > [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#config] > The operator should propagate such fields in the status: > * flink-version > * flink-revision > This greatly improves the ability to identify malicious Flink versions (CVE > affected, deprecated, etc.) in managed environments. >
[jira] [Created] (FLINK-27609) Tracking flink-version and flink-revision in FlinkDeploymentStatus
Matyas Orhidi created FLINK-27609: - Summary: Tracking flink-version and flink-revision in FlinkDeploymentStatus Key: FLINK-27609 URL: https://issues.apache.org/jira/browse/FLINK-27609 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-0.1.0 Reporter: Matyas Orhidi Fix For: kubernetes-operator-1.0.0 The rest api can provide accurate versioning information through the config endpoint: [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#config] The operator should propagate such fields in the status: * flink-version * flink-revision This greatly improves the ability to identify malicious Flink versions (CVE affected, deprecated, etc.) in managed environments.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results
TanYuxin-tyx commented on code in PR #19653: URL: https://github.com/apache/flink/pull/19653#discussion_r872423445 ## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java: ## @@ -84,6 +86,33 @@ CompletableFuture registerPartitionWithProducer( PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor); +/** + * Returns all the shuffle descriptors for the partitions in the intermediate data set with the + * given id. + * + * @param intermediateDataSetID The id of hte intermediate data set. Review Comment: hte -> the?
[jira] [Closed] (FLINK-27337) Prevent session cluster to be deleted when there are running jobs
[ https://issues.apache.org/jira/browse/FLINK-27337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-27337. -- Resolution: Fixed merged to main 5239cd6a7ae0812e1c399949f0facde972dcfa78 > Prevent session cluster to be deleted when there are running jobs > - > > Key: FLINK-27337 > URL: https://issues.apache.org/jira/browse/FLINK-27337 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Yang Wang >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.0.0 > > > We should prevent the session cluster to be deleted when there are running > jobs.
[jira] [Updated] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than it's upstream task
[ https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zlzhang0122 updated FLINK-27608: Component/s: Runtime / Network > Flink may throw PartitionNotFound Exception if the downstream task reached > Running state earlier than it's upstream task > > > Key: FLINK-27608 > URL: https://issues.apache.org/jira/browse/FLINK-27608 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.2 >Reporter: zlzhang0122 >Priority: Major > Fix For: 1.16.0 > > > Flink streaming job deployment may throw PartitionNotFound Exception if the > downstream task reached Running state earlier than its upstream task and > after maximum backoff for partition requests passed. But the config of > taskmanager.network.request-backoff.max is not easy to decide. Can we use a > loop awaiting the upstream task's partition to be ready? >
[jira] [Created] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task
zlzhang0122 created FLINK-27608: --- Summary: Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task Key: FLINK-27608 URL: https://issues.apache.org/jira/browse/FLINK-27608 Project: Flink Issue Type: Bug Affects Versions: 1.14.2 Reporter: zlzhang0122 Fix For: 1.16.0 Flink streaming job deployment may throw a PartitionNotFound Exception if the downstream task reaches the Running state earlier than its upstream task and the maximum backoff for partition requests has passed. But the config taskmanager.network.request-backoff.max is not easy to decide. Can we use a loop that waits for the upstream task's partition to become ready?
[GitHub] [flink] flinkbot commented on pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5
flinkbot commented on PR #19716: URL: https://github.com/apache/flink/pull/19716#issuecomment-1126061144 ## CI report: * f8c3129f7ed0028a2a4002cab0ef26877b5f3653 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27607) [JUnit5 Migration] Module: flink-connector-files
[ https://issues.apache.org/jira/browse/FLINK-27607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27607: --- Labels: pull-request-available (was: ) > [JUnit5 Migration] Module: flink-connector-files > > > Key: FLINK-27607 > URL: https://issues.apache.org/jira/browse/FLINK-27607 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on PR #19372: URL: https://github.com/apache/flink/pull/19372#issuecomment-1126051696 @flinkbot run azure
[GitHub] [flink] snuyanzin opened a new pull request, #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5
snuyanzin opened a new pull request, #19716: URL: https://github.com/apache/flink/pull/19716 ## What is the purpose of the change Update the flink-connector-files module to AssertJ and JUnit 5 following the [JUnit 5 Migration Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit) ## Brief change log Use JUnit5 and AssertJ in tests instead of JUnit4 and Hamcrest ## Verifying this change This change is a code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): ( no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: ( no) - The runtime per-record code paths (performance sensitive): ( no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no) - The S3 file system connector: ( no) ## Documentation - Does this pull request introduce a new feature? ( no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-27607) [JUnit5 Migration] Module: flink-connector-files
Sergey Nuyanzin created FLINK-27607: --- Summary: [JUnit5 Migration] Module: flink-connector-files Key: FLINK-27607 URL: https://issues.apache.org/jira/browse/FLINK-27607 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Sergey Nuyanzin -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] alpreu commented on a diff in pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj
alpreu commented on code in PR #19660: URL: https://github.com/apache/flink/pull/19660#discussion_r872383249 ## flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java: ## @@ -723,11 +721,11 @@ public void testCassandraBatchPojoFormat() throws Exception { final List pojos = writePojosWithOutputFormat(annotatedPojoClass); ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); -Assert.assertEquals(20, rs.all().size()); +assertThat(rs.all()).hasSize(20); final List result = readPojosWithInputFormat(annotatedPojoClass); -Assert.assertEquals(20, result.size()); -assertThat(result, samePropertyValuesAs(pojos)); +assertThat(result).hasSize(20); +assertThat(result).satisfies(matching(samePropertyValuesAs(pojos))); Review Comment: I actually noticed just now that this method can also take a configuration that allows us to ignore the order :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #19715: [FLINK-27606][table] Fix code generator exception when using imperative udaf that is introduced for planner scala-free
flinkbot commented on PR #19715: URL: https://github.com/apache/flink/pull/19715#issuecomment-1126036539 ## CI report: * 7803b17f358cda070931874947ae9ed18030dcc1 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27606) Fix code generator exception when using imperative udaf that is introduced for planner scala-free
[ https://issues.apache.org/jira/browse/FLINK-27606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27606: --- Labels: pull-request-available (was: ) > Fix code generator exception when using imperative udaf that is introduced > for planner scala-free > - > > Key: FLINK-27606 > URL: https://issues.apache.org/jira/browse/FLINK-27606 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.0, 1.16.0 >Reporter: dalongliu >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.15.1 > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] lsyldliu opened a new pull request, #19715: [FLINK-27606][table] Fix code generator exception when using imperative udaf that is introduced for planner scala-free
lsyldliu opened a new pull request, #19715: URL: https://github.com/apache/flink/pull/19715 ## Brief change log - *Fix code generator exception when using imperative udaf that is introduced for planner scala-free* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] beyond1920 commented on a diff in pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would be thrown out when hash shuffle by a field with array type
beyond1920 commented on code in PR #19709: URL: https://github.com/apache/flink/pull/19709#discussion_r872348182 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala: ## @@ -75,6 +75,146 @@ object HashCodeGenerator { return $resultTerm; } +@Override +public int hashCode($ARRAY_DATA $inputTerm) { + ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")} +} + +@Override +public int hashCode($MAP_DATA $inputTerm) { + ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")} +} + +${ctx.reuseInnerClassDefinitionCode()} + } +""".stripMargin + +new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig) + } + + def generateArrayHash( + ctx: CodeGeneratorContext, + elementType: LogicalType, + name: String): GeneratedHashFunction = { +val className = newName(name) +val baseClass = classOf[HashFunction] +val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM + +val typeTerm = primitiveTypeTermForType(elementType) +val isNull = newName("isNull") +val fieldTerm = newName("fieldTerm") +val hashIntTerm = CodeGenUtils.newName("hashCode") +val i = newName("i") + +// Generate element hash code firstly +val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm) +val code = + j""" + public class $className implements ${baseClass.getCanonicalName} { + +${ctx.reuseMemberCode()} + +public $className(Object[] references) throws Exception { + ${ctx.reuseInitCode()} +} + +@Override +public int hashCode($ARRAY_DATA $inputTerm) { + int $hashIntTerm = 0; Review Comment: Did you forget to call `ctx.reuseLocalVariableCode()` here? 
## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala: ## @@ -75,6 +75,146 @@ object HashCodeGenerator { return $resultTerm; } +@Override +public int hashCode($ARRAY_DATA $inputTerm) { + ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")} +} + +@Override +public int hashCode($MAP_DATA $inputTerm) { + ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")} +} + +${ctx.reuseInnerClassDefinitionCode()} + } +""".stripMargin + +new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig) + } + + def generateArrayHash( + ctx: CodeGeneratorContext, + elementType: LogicalType, + name: String): GeneratedHashFunction = { +val className = newName(name) +val baseClass = classOf[HashFunction] +val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM + +val typeTerm = primitiveTypeTermForType(elementType) +val isNull = newName("isNull") +val fieldTerm = newName("fieldTerm") +val hashIntTerm = CodeGenUtils.newName("hashCode") +val i = newName("i") + +// Generate element hash code firstly +val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm) +val code = + j""" + public class $className implements ${baseClass.getCanonicalName} { + +${ctx.reuseMemberCode()} + +public $className(Object[] references) throws Exception { + ${ctx.reuseInitCode()} +} + +@Override +public int hashCode($ARRAY_DATA $inputTerm) { + int $hashIntTerm = 0; + for (int $i = 0; $i < $inputTerm.size(); $i++) { +boolean $isNull = $inputTerm.isNullAt($i); +if (!$isNull) { + $typeTerm $fieldTerm = ${rowFieldReadAccess(i, inputTerm, elementType)}; + $hashIntTerm += $elementHashBody; +} + } + + return $hashIntTerm; +} + +@Override +public int hashCode($ROW_DATA $inputTerm) { + ${genThrowException("ArrayData hash function doesn't support to generate hash code for RowData.")} +} + +@Override +public int hashCode($MAP_DATA $inputTerm) { + 
${genThrowException("ArrayData hash function doesn't support to generate hash code for MapData.")} +} + +${ctx.reuseInnerClassDefinitionCode()} + } +""".stripMargin + +new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig) + } + + def generateMapHash( + ctx: CodeGeneratorContext, + keyType: LogicalType, + valueType: LogicalType, + name: String): GeneratedHashFunction = { +val className = newName(name) +val baseClass = classOf[HashFunction] +val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM + +val keyTypeTerm =
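The review thread above concerns the generated `hashCode($ARRAY_DATA ...)` body. As a plain-Java sketch of the pattern the generator emits — accumulate per-element hashes while skipping null entries — consider this hand-written analogue (an illustration only, not the actual generated code, which is type-specialized per `elementType`):

```java
// Hand-written analogue of the generated array hash: sum the element hashes,
// skipping null entries, mirroring the loop emitted by generateArrayHash.
public class ArrayHashSketch {
    static int arrayHash(Integer[] elements) {
        int hash = 0;
        for (Integer e : elements) {
            if (e != null) { // null elements contribute nothing, as in the generated code
                hash += Integer.hashCode(e);
            }
        }
        return hash;
    }

    public static void main(String[] args) {
        // Integer.hashCode(v) == v for ints, so {1, null, 2, 3} hashes to 6.
        System.out.println(arrayHash(new Integer[] {1, null, 2, 3}));
    }
}
```

The generated variant reads elements via `rowFieldReadAccess` and inlines the element-type-specific hash body instead of calling `Integer.hashCode`, but the control flow is the same.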
[jira] [Created] (FLINK-27606) Fix code generator exception when using imperative udaf that is introduced for planner scala-free
dalongliu created FLINK-27606: - Summary: Fix code generator exception when using imperative udaf that is introduced for planner scala-free Key: FLINK-27606 URL: https://issues.apache.org/jira/browse/FLINK-27606 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.15.0, 1.16.0 Reporter: dalongliu Fix For: 1.16.0, 1.15.1 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results
gaoyunhaii commented on code in PR #19653: URL: https://github.com/apache/flink/pull/19653#discussion_r872318377 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java: ## @@ -142,6 +142,12 @@ public class JobVertex implements java.io.Serializable { */ private String resultOptimizerProperties; +/** + * Optional, the intermediateDataSetId of the cached intermediate dataset that the job vertex + * consumes. + */ +@Nullable private final IntermediateDataSetID intermediateDataSetID; Review Comment: Each JobVertex may consume multiple `IntermediateDataSet`s, thus I think it is not sufficient to bookkeep one id here? Perhaps for two-input tasks (join) or multiple-input tasks.
[GitHub] [flink-connector-elasticsearch] MartijnVisser merged pull request #18: [hotfix][archunit] Fix MiniClusterExtension archunit violations
MartijnVisser merged PR #18: URL: https://github.com/apache/flink-connector-elasticsearch/pull/18
[jira] [Updated] (FLINK-27603) Hive dialect supports "reload function"
[ https://issues.apache.org/jira/browse/FLINK-27603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-27603: - Description: In Hive, by "reload function", the user can use a function defined in another session. But in Flink, it will always get the function from the catalog, and if it's a Hive catalog, it'll load all the user-defined functions from the metastore. So, there's no need to "reload function" explicitly, as it is actually done implicitly. But when using the Hive dialect in Flink, it'll throw an unsupported exception for the statement "reload function". I'm wondering whether to keep the current behavior of throwing an exception or to consider it a 'NopOperation'. was: In Hive, by "reload function", the user can use a function defined in another session. But in Flink, it will always get the function from the catalog, and if it's a Hive catalog, it'll load all the user-defined functions from the metastore. So, there's no need to "reload function" explicitly, as it is actually done implicitly. But when using the Hive dialect in Flink, it'll throw an exception for the statement "reload function". I'm wondering whether to keep the current behavior of throwing an exception or to consider it a 'NopOperation'. > Hive dialect supports "reload function" > --- > > Key: FLINK-27603 > URL: https://issues.apache.org/jira/browse/FLINK-27603 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > In Hive, by "reload function", the user can use a function defined in another > session. > But in Flink, it will always get the function from the catalog, and if it's a > Hive catalog, it'll load all the user-defined functions from the metastore. > So, there's no need to "reload function" explicitly, as it is actually done > implicitly. > But when using the Hive dialect in Flink, it'll throw an unsupported exception for > the statement "reload function".
> I'm wondering whether to keep the current behavior of throwing an exception or to consider it > a 'NopOperation'.
[jira] [Updated] (FLINK-27603) Hive dialect supports "reload function"
[ https://issues.apache.org/jira/browse/FLINK-27603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-27603: - Description: In Hive, by "reload function", the user can use a function defined in another session. But in Flink, it will always get the function from the catalog, and if it's a Hive catalog, it'll load all the user-defined functions from the metastore. So, there's no need to "reload function" explicitly, as it is actually done implicitly. But when using the Hive dialect in Flink, it'll throw an exception for the statement "reload function". I'm wondering whether to keep the current behavior of throwing an exception or to consider it a 'NopOperation'. was:In Hive, by > Hive dialect supports "reload function" > --- > > Key: FLINK-27603 > URL: https://issues.apache.org/jira/browse/FLINK-27603 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > In Hive, by "reload function", the user can use a function defined in another > session. > But in Flink, it will always get the function from the catalog, and if it's a > Hive catalog, it'll load all the user-defined functions from the metastore. > So, there's no need to "reload function" explicitly, as it is actually done > implicitly. > But when using the Hive dialect in Flink, it'll throw an exception for the > statement "reload function". > I'm wondering whether to keep the current behavior of throwing an exception or to consider it > a 'NopOperation'.
[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results
gaoyunhaii commented on code in PR #19653: URL: https://github.com/apache/flink/pull/19653#discussion_r872337757 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java: ## @@ -299,6 +302,21 @@ private void handleTaskFailure( final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult( executionVertexId, error, timestamp); + +// Notify shuffle master that the cached intermediate dataset is corrupted. +if (failureHandlingResult.getError() instanceof CacheCorruptedException) { Review Comment: I think it might not be sufficient to only consider `DefaultScheduler`. We might also have AdaptiveBatchScheduler for batch jobs and AdaptiveScheduler for streaming jobs (if we also consider the case of starting a new streaming job with a cached result partition). ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java: ## @@ -142,6 +142,12 @@ public class JobVertex implements java.io.Serializable { */ private String resultOptimizerProperties; +/** + * Optional, the intermediateDataSetId of the cached intermediate dataset that the job vertex + * consumes. + */ +@Nullable private final IntermediateDataSetID intermediateDataSetID; Review Comment: Each JobVertex may consume multiple `IntermediateDataSet`s, thus I think it is not sufficient to bookkeep one id here? ## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java: ## @@ -45,6 +56,9 @@ public class NettyShuffleMaster implements ShuffleMaster private final int networkBufferSize; +private final Map> Review Comment: Previously, for TaskExecutor-managed partitions, the process of promoting should be implemented in `JobMasterPartitionTracker` directly, thus the `NettyShuffleMaster` should not need to manage the cluster partitions directly? Also I'm a bit wondering whether it is possible to unify the two processes?
## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java: ## @@ -84,6 +86,33 @@ CompletableFuture registerPartitionWithProducer( PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor); +/** + * Returns all the shuffle descriptors for the partitions in the intermediate data set with the + * given id. + * + * @param intermediateDataSetID The id of hte intermediate data set. + * @return all the shuffle descriptors for the partitions in the intermediate data set. Null if + * not exist. + */ +default Collection getClusterPartitionShuffleDescriptors( Review Comment: Might also cc @wsry for a double check of the proposed methods here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27603) Hive dialect supports "reload function"
[ https://issues.apache.org/jira/browse/FLINK-27603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-27603: - Description: In Hive, by > Hive dialect supports "reload function" > --- > > Key: FLINK-27603 > URL: https://issues.apache.org/jira/browse/FLINK-27603 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > In Hive, by -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #203: [FLINK-27337] Prevent session cluster to be deleted when there are ru…
Aitozi commented on PR #203: URL: https://github.com/apache/flink-kubernetes-operator/pull/203#issuecomment-1125982252 Force-pushed to resolve conflicts.
[GitHub] [flink] RyanSkraba commented on pull request #18635: [FLINK-25962][format-avro] Use namespaces for generated records
RyanSkraba commented on PR #18635: URL: https://github.com/apache/flink/pull/18635#issuecomment-1125974968 I've fixed the merge conflicts in this PR: for older PRs like this, does the Flink project prefer merging or rebasing?
[jira] [Updated] (FLINK-27605) Upgrade mockito to a version where static function mocking is possible
[ https://issues.apache.org/jira/browse/FLINK-27605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated FLINK-27605: -- Labels: pull-request-available (was: ) > Upgrade mockito to a version where static function mocking is possible > -- > > Key: FLINK-27605 > URL: https://issues.apache.org/jira/browse/FLINK-27605 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.16.0 >Reporter: Gabor Somogyi >Assignee: Gabor Somogyi >Priority: Major > Labels: pull-request-available > > There are some external APIs used by Flink which are designed in a way which > is super hard to test. > Such a pattern is when a class instance is only available through a static > function and no publicly available constructor exists. > A good example of this is the "UserGroupInformation" class in the Hadoop library. > We've listed many different possibilities for how to make it testable in the > following place: > https://github.com/apache/flink/pull/19372#discussion_r849691193 > After deep consideration we've agreed to use static function mocking in such > exceptional cases in order to make the code clean and maintainable. > It should be emphasized that the general direction has not changed in terms of > Mockito and Powermock usage. It should be avoided as much as possible.
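The static function mocking referred to here is Mockito's `MockedStatic` API (available since Mockito 3.4 with the inline mock maker, i.e. the `mockito-inline` artifact). A minimal, self-contained sketch of the pattern — note that `CurrentUser` below is a made-up stand-in for something like Hadoop's `UserGroupInformation`, not Flink or Hadoop code:

```java
import org.mockito.MockedStatic;
import org.mockito.Mockito;

// Stand-in (hypothetical) for a class only reachable through a static accessor,
// such as Hadoop's UserGroupInformation.getCurrentUser().
class CurrentUser {
    static String name() {
        return "real-user";
    }
}

public class StaticMockingSketch {
    public static void main(String[] args) {
        // Static mocking is scoped: it only applies inside the try-with-resources
        // block, so other tests are unaffected (requires mockito-inline on the classpath).
        try (MockedStatic<CurrentUser> mocked = Mockito.mockStatic(CurrentUser.class)) {
            mocked.when(CurrentUser::name).thenReturn("mocked-user");
            System.out.println(CurrentUser.name()); // mocked while in scope
        }
        System.out.println(CurrentUser.name()); // real behavior restored afterwards
    }
}
```

The scoping via try-with-resources is what makes this acceptable for the "exceptional cases" the issue describes: the mock cannot leak into unrelated tests.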
[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #209: [FLINK-27270] Add document of session job operations
Aitozi commented on PR #209: URL: https://github.com/apache/flink-kubernetes-operator/pull/209#issuecomment-1125972445 @morhidi Thanks for your suggestion, I have addressed your comments, PTAL again.
[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on PR #19372: URL: https://github.com/apache/flink/pull/19372#issuecomment-1125967453 I've done the following steps and I think every suggestion has been addressed: * Squashed all the commits * Added the last missing `@Nullable`s * Changed the PR description to reflect the Mockito-related changes * Linked this PR to FLINK-27605 Please let me know if I've missed anything.
[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on code in PR #19372: URL: https://github.com/apache/flink/pull/19372#discussion_r872305484 ## flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java: ## @@ -45,10 +56,25 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { private final Configuration configuration; +private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider; + @VisibleForTesting final Map delegationTokenProviders; -public KerberosDelegationTokenManager(Configuration configuration) { +private final ScheduledExecutor scheduledExecutor; + +private final ExecutorService ioExecutor; Review Comment: Added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-27605) Upgrade mockito to a version where static function mocking is possible
[ https://issues.apache.org/jira/browse/FLINK-27605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi reassigned FLINK-27605: - Assignee: Gabor Somogyi > Upgrade mockito to a version where static function mocking is possible > -- > > Key: FLINK-27605 > URL: https://issues.apache.org/jira/browse/FLINK-27605 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.16.0 >Reporter: Gabor Somogyi >Assignee: Gabor Somogyi >Priority: Major > > There are some external APIs used by Flink which are designed in a way which > is super hard to test. > Such a pattern is when a class instance is only available through a static > function and no publicly available constructor exists. > A good example of this is the "UserGroupInformation" class in the Hadoop library. > We've listed many different possibilities for how to make it testable in the > following place: > https://github.com/apache/flink/pull/19372#discussion_r849691193 > After deep consideration we've agreed to use static function mocking in such > exceptional cases in order to make the code clean and maintainable. > It should be emphasized that the general direction has not changed in terms of > Mockito and Powermock usage. It should be avoided as much as possible.
[GitHub] [flink] deadwind4 commented on a diff in pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API
deadwind4 commented on code in PR #19682: URL: https://github.com/apache/flink/pull/19682#discussion_r872299864

## flink-python/pyflink/datastream/connectors.py: ##

@@ -1449,6 +1455,314 @@ def build(self) -> 'PulsarSource':
         return PulsarSource(self._j_pulsar_source_builder.build())


+class DeliveryGuarantee(Enum):
+    """
+    DeliveryGuarantees that can be chosen. In general your pipeline can only offer the lowest
+    delivery guarantee which is supported by your sources and sinks.
+
+    :data: `EXACTLY_ONCE`:
+    Records are delivered exactly once, also under failover scenarios. Building a complete
+    exactly-once pipeline requires that the source and sink support exactly-once and are
+    properly configured.
+
+    :data: `AT_LEAST_ONCE`:
+    Records are guaranteed to be delivered, but the same record may be delivered multiple
+    times. Usually, this guarantee is faster than exactly-once delivery.
+
+    :data: `NONE`:
+    Records are delivered on a best-effort basis. This is often the fastest way to process
+    records, but records may be lost or duplicated.
+    """
+
+    EXACTLY_ONCE = 0
+    AT_LEAST_ONCE = 1
+    NONE = 2
+
+    def _to_j_delivery_guarantee(self):
+        JDeliveryGuarantee = get_gateway().jvm \
+            .org.apache.flink.connector.base.DeliveryGuarantee
+        return getattr(JDeliveryGuarantee, self.name)
+
+
+class PulsarSerializationSchema(object):
+    """
+    The serialization schema for how to serialize records into Pulsar.
+    """
+
+    def __init__(self, _j_pulsar_serialization_schema):
+        self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema
+
+    @staticmethod
+    def flink_schema(serialization_schema: SerializationSchema) \
+            -> 'PulsarSerializationSchema':
+        """
+        Create a PulsarSerializationSchema by using Flink's SerializationSchema. It
+        serializes the message into a byte array and sends it to Pulsar with Schema#BYTES.
+        """
+        JPulsarSerializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.sink.writer.serializer.PulsarSerializationSchema
+        _j_pulsar_serialization_schema = JPulsarSerializationSchema.flinkSchema(
+            serialization_schema._j_serialization_schema)
+        return PulsarSerializationSchema(_j_pulsar_serialization_schema)
+
+
+class TopicRoutingMode(Enum):
+    """
+    The routing policy for choosing the desired topic by the given message.
+
+    :data: `ROUND_ROBIN`:
+    The producer will publish messages across all partitions in a round-robin fashion to
+    achieve maximum throughput. Please note that round-robin is not done per individual
+    message; rather, it is applied per batching-delay boundary, to ensure batching is
+    effective.
+
+    :data: `MESSAGE_KEY_HASH`:
+    If no key is provided, the partitioned producer will randomly pick one single topic
+    partition and publish all the messages into that partition. If a key is provided on the
+    message, the partitioned producer will hash the key and assign the message to a
+    particular partition.
+
+    :data: `CUSTOM`:
+    Use a custom topic router implementation that will be called to determine the partition
+    for a particular message.
+    """
+
+    ROUND_ROBIN = 0
+    MESSAGE_KEY_HASH = 1
+    CUSTOM = 2
+
+    def _to_j_topic_routing_mode(self):
+        JTopicRoutingMode = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode
+        return getattr(JTopicRoutingMode, self.name)
+
+
+class MessageDelayer(object):
+    """
+    A delayer for the Pulsar broker passing the sent message to the downstream consumer.
+    This only works in the {@link SubscriptionType#Shared} subscription.
+
+    Read delayed message delivery
+    https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery
+    for a better understanding of this feature.
+    """
+
+    def __init__(self, _j_message_delayer):
+        self._j_message_delayer = _j_message_delayer
+
+    @staticmethod
+    def never() -> 'MessageDelayer':
+        """
+        All the messages should be consumed immediately.
+        """
+        JMessageDelayer = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer
+        return MessageDelayer(JMessageDelayer.never())
+
+    @staticmethod
+    def fixed(duration: Duration) -> 'MessageDelayer':
+        """
+        All the messages should be consumed after a fixed duration.
+        """
+        JMessageDelayer = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer
+        return MessageDelayer(JMessageDelayer.fixed(duration._j_duration))
+
+
+class PulsarSink(Sink):
+    """
+    The Sink implementation of Pulsar. Please use a PulsarSinkBuilder to construct a
+    PulsarSink. The following
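Each of the `_to_j_*` helpers in the diff above uses the same trick: the Python enum member's name is looked up as an attribute on the JVM-side enum class. A minimal, self-contained sketch of that pattern follows; `JDeliveryGuarantee` here is a hypothetical stand-in for the object normally reached through the Py4J gateway (`get_gateway().jvm...DeliveryGuarantee`), not the real JVM handle:

```python
from enum import Enum
from types import SimpleNamespace

# Hypothetical stand-in for the JVM-side enum class reached via Py4J.
JDeliveryGuarantee = SimpleNamespace(
    EXACTLY_ONCE="EXACTLY_ONCE (java)",
    AT_LEAST_ONCE="AT_LEAST_ONCE (java)",
    NONE="NONE (java)",
)


class DeliveryGuarantee(Enum):
    EXACTLY_ONCE = 0
    AT_LEAST_ONCE = 1
    NONE = 2

    def _to_j_delivery_guarantee(self):
        # The Python member's name doubles as the Java constant's name.
        return getattr(JDeliveryGuarantee, self.name)


print(DeliveryGuarantee.EXACTLY_ONCE._to_j_delivery_guarantee())
```

This works because the Python enum members are deliberately named identically to the Java constants, so no explicit mapping table is needed.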
[GitHub] [flink] MartijnVisser merged pull request #19710: [FLINK-24433][Tests][Buildsystem] Prevent running out of disk space
MartijnVisser merged PR #19710: URL: https://github.com/apache/flink/pull/19710 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gaborgsomogyi commented on pull request #19372: [FLINK-26043][runtime][security] Add periodic kerberos relogin to KerberosDelegationTokenManager
gaborgsomogyi commented on PR #19372: URL: https://github.com/apache/flink/pull/19372#issuecomment-1125930126 As suggested I've created FLINK-27605.
[jira] [Created] (FLINK-27605) Upgrade mockito to a version where static function mocking is possible
Gabor Somogyi created FLINK-27605:
-
Summary: Upgrade mockito to a version where static function mocking is possible
Key: FLINK-27605
URL: https://issues.apache.org/jira/browse/FLINK-27605
Project: Flink
Issue Type: Sub-task
Components: Tests
Affects Versions: 1.16.0
Reporter: Gabor Somogyi

There are some external APIs used by Flink which are designed in a way that is very hard to test. One such pattern is when a class instance is only available through a static function and no publicly available constructor exists. A good example of this is the "UserGroupInformation" class in the Hadoop library. We've listed many different possibilities for making it testable in the following place: https://github.com/apache/flink/pull/19372#discussion_r849691193 After deep consideration we've agreed to use static function mocking in such exceptional cases in order to keep the code clean and maintainable. It should be emphasized that the general direction has not changed in terms of Mockito and Powermock usage: it should be avoided as much as possible.

--
This message was sent by Atlassian Jira (v8.20.7#820007)
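In Java this is what Mockito's `MockedStatic` (via `Mockito.mockStatic`) is for. The same idea can be illustrated with Python's standard `unittest.mock`: patch the static factory itself rather than trying to construct an instance. The class and method names below are illustrative stand-ins, not Hadoop's actual API:

```python
from unittest import mock


class UserGroupInformation:
    """Hypothetical stand-in: instances are only reachable via a static factory."""

    def __init__(self):
        raise RuntimeError("no usable public constructor")

    @staticmethod
    def get_current_user():
        # In production this would contact Kerberos.
        raise RuntimeError("requires a real Kerberos environment")


def current_user():
    # Code under test: only reachable through the static function.
    return UserGroupInformation.get_current_user()


# The test patches the static function instead of constructing an instance.
with mock.patch.object(UserGroupInformation, "get_current_user", return_value="alice"):
    print(current_user())
```

The patch is scoped to the `with` block, so production behavior is restored afterwards, which is what makes this approach maintainable compared to global stubbing.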
[jira] [Closed] (FLINK-27578) Elasticsearch6SinkE2ECase.testScaleUp fails after "Exhausted retry attempts"
[ https://issues.apache.org/jira/browse/FLINK-27578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-27578. -- Resolution: Invalid This issue is resolved via FLINK-24433 > Elasticsearch6SinkE2ECase.testScaleUp fails after "Exhausted retry attempts" > > > Key: FLINK-27578 > URL: https://issues.apache.org/jira/browse/FLINK-27578 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Reporter: Martijn Visser >Assignee: Alexander Preuss >Priority: Major > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35588=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=117547 > {code:java} > > Test org.apache.flink.streaming.tests.Elasticsearch6SinkE2ECase.testScaleUp > failed with: > org.apache.flink.util.FlinkException: Exhausted retry attempts. > at > org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:173) > at > org.apache.flink.streaming.tests.ElasticsearchSinkE2ECaseBase.checkResultWithSemantic(ElasticsearchSinkE2ECaseBase.java:76) > at > org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.restartFromSavepoint(SinkTestSuiteBase.java:326) > at > org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.testScaleUp(SinkTestSuiteBase.java:201) > {code} > These test runs are also flooding the logs (see above), which causes all E2E > tests to fail due to running out of disk space on the Azure provided > machines. See > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35599=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=af184cdd-c6d8-5084-0b69-7e9c67b35f7a > or > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35573=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=af184cdd-c6d8-5084-0b69-7e9c67b35f7a > for example. > This issue most likely causes FLINK-24433 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Comment Edited] (FLINK-24433) "No space left on device" in Azure e2e tests
[ https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536562#comment-17536562 ] Martijn Visser edited comment on FLINK-24433 at 5/13/22 10:55 AM:
--
In the end, the tarball clean-up failed. While investigating further, I noticed that multiple e2e tests had debug logging (though labeled INFO) activated. I've disabled that to make sure the logs aren't flooding. This seems to have fixed the issue for now.

Fixed in master: 4c138a440f8de315470a663fd751b7293ff3ceb8
release-1.15: c57be81ce5d121523db26c86a48cff9222f688d2
release-1.14: 713f0e03a500852564c06241a2e64d141b31e4fe

was (Author: martijnvisser):
Fixed in master: 4c138a440f8de315470a663fd751b7293ff3ceb8
release-1.15: c57be81ce5d121523db26c86a48cff9222f688d2
release-1.14: 713f0e03a500852564c06241a2e64d141b31e4fe

> "No space left on device" in Azure e2e tests
>
> Key: FLINK-24433
> URL: https://issues.apache.org/jira/browse/FLINK-24433
> Project: Flink
> Issue Type: Bug
> Components: Build System / Azure Pipelines
> Affects Versions: 1.15.0
> Reporter: Dawid Wysakowicz
> Assignee: Martijn Visser
> Priority: Blocker
> Labels: auto-deprioritized-critical, pull-request-available, test-stability
> Fix For: 1.16.0, 1.14.5, 1.15.1
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24668=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=19772
> {code}
> Sep 30 17:08:42 Job has been submitted with JobID 5594c18e128a328ede39cfa59cb3cb07
> Sep 30 17:08:56 2021-09-30 17:08:56,809 main ERROR Recovering from StringBuilderEncoder.encode('2021-09-30 17:08:56,807 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An exception occurred when fetching query results
> Sep 30 17:08:56 java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., Sep 30 17:08:56 org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (5594c18e128a328ede39cfa59cb3cb07)
> Sep 30 17:08:56 at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:923)
> Sep 30 17:08:56 at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:937)
> Sep 30 17:08:56 at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordina2021-09-30T17:08:57.1584224Z ##[error]No space left on device
> {code}
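The fix described in the comment above amounts to turning down the level of the chatty test loggers. A hedged sketch of what such a change looks like in a log4j2 properties file (the logger name shown is illustrative, not the exact Flink e2e change):

```
# log4j2-test.properties (illustrative sketch)
rootLogger.level = INFO

# Quiet the e2e test package that was effectively logging at DEBUG
logger.e2etests.name = org.apache.flink.streaming.tests
logger.e2etests.level = INFO
```

Capping the per-package logger level keeps the CI logs bounded without touching the tests themselves.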
[jira] [Assigned] (FLINK-24433) "No space left on device" in Azure e2e tests
[ https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-24433: -- Assignee: Martijn Visser > "No space left on device" in Azure e2e tests > > > Key: FLINK-24433 > URL: https://issues.apache.org/jira/browse/FLINK-24433 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.15.0 >Reporter: Dawid Wysakowicz >Assignee: Martijn Visser >Priority: Blocker > Labels: auto-deprioritized-critical, pull-request-available, > test-stability > Fix For: 1.16.0, 1.14.5, 1.15.1 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24668=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=19772 > {code} > Sep 30 17:08:42 Job has been submitted with JobID > 5594c18e128a328ede39cfa59cb3cb07 > Sep 30 17:08:56 2021-09-30 17:08:56,809 main ERROR Recovering from > StringBuilderEncoder.encode('2021-09-30 17:08:56,807 WARN > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An > exception occurred when fetching query results > Sep 30 17:08:56 java.util.concurrent.ExecutionException: > org.apache.flink.runtime.rest.util.RestClientException: [Internal server > error., Sep 30 17:08:56 org.apache.flink.runtime.messages.FlinkJobNotFoundException: > Could not find Flink job (5594c18e128a328ede39cfa59cb3cb07) > Sep 30 17:08:56 at > org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:923) > Sep 30 17:08:56 at > org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:937) > Sep 30 17:08:56 at > org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordina2021-09-30T17:08:57.1584224Z > ##[error]No space left on device > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Closed] (FLINK-24433) "No space left on device" in Azure e2e tests
[ https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-24433. -- Fix Version/s: 1.16.0 1.14.5 1.15.1 Resolution: Fixed Fixed in master: 4c138a440f8de315470a663fd751b7293ff3ceb8 release-1.15: c57be81ce5d121523db26c86a48cff9222f688d2 release-1.14: 713f0e03a500852564c06241a2e64d141b31e4fe > "No space left on device" in Azure e2e tests > > > Key: FLINK-24433 > URL: https://issues.apache.org/jira/browse/FLINK-24433 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.15.0 >Reporter: Dawid Wysakowicz >Priority: Blocker > Labels: auto-deprioritized-critical, pull-request-available, > test-stability > Fix For: 1.16.0, 1.14.5, 1.15.1 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24668=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=19772 > {code} > Sep 30 17:08:42 Job has been submitted with JobID > 5594c18e128a328ede39cfa59cb3cb07 > Sep 30 17:08:56 2021-09-30 17:08:56,809 main ERROR Recovering from > StringBuilderEncoder.encode('2021-09-30 17:08:56,807 WARN > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An > exception occurred when fetching query results > Sep 30 17:08:56 java.util.concurrent.ExecutionException: > org.apache.flink.runtime.rest.util.RestClientException: [Internal server > error., Sep 30 17:08:56 org.apache.flink.runtime.messages.FlinkJobNotFoundException: > Could not find Flink job (5594c18e128a328ede39cfa59cb3cb07) > Sep 30 17:08:56 at > org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:923) > Sep 30 17:08:56 at > org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:937) > Sep 30 17:08:56 at > org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordina2021-09-30T17:08:57.1584224Z > ##[error]No space left on device > {code} -- 