[jira] [Commented] (FLINK-9231) Enable SO_REUSEADDR on listen sockets
[ https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527633#comment-16527633 ]

Triones Deng commented on FLINK-9231:
-------------------------------------

[~azagrebin] WebFrontendBootstrap is to be deprecated after FLIP-6, and RestServerEndpoint is going to be used instead, so maybe we can add this feature to RestServerEndpoint. Do we need to create a new JIRA and close this one, or update this one, or ...

> Enable SO_REUSEADDR on listen sockets
> -------------------------------------
>
>                 Key: FLINK-9231
>                 URL: https://issues.apache.org/jira/browse/FLINK-9231
>             Project: Flink
>          Issue Type: Improvement
>          Components: Webfrontend
>            Reporter: Ted Yu
>            Assignee: Triones Deng
>            Priority: Major
>              Labels: pull-request-available
>
> This allows sockets to be bound even if there are sockets from a previous
> application that are still pending closure.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9411) Support parquet rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519405#comment-16519405 ]

Triones Deng edited comment on FLINK-9411 at 6/21/18 2:29 PM:
--------------------------------------------------------------

[~StephanEwen] Sure, a design is necessary. I think there may be two ways to do it in production. Orc and Parquet files are popular because of their columnar storage. There are two ways to support a Parquet writer:

1. Just write a ParquetStreamWriter which is a subclass of StreamWriterBase, which looks the same as FLINK-9407.
2. Use an HdfsWriterWrapper which owns one delegate writer. When the end user wants to use a format, they simply specify the format, like orc or parquet, and let the wrapper create a suitable writer such as OrcStreamWriter or ParquetStreamWriter, and so on.

Sample code for the HdfsWriterWrapper:
{code:java}
public class HdfsWriterWrapper implements Writer {
    private Writer delegate;
    private String format;
    private Configuration configuration;
    private TableSchema tableSchema;

    public HdfsWriterWrapper(Configuration configuration, String format, Class tableClass, String[] columnFields) {
    }
}
{code}
Which one is better?


was (Author: triones):
[~StephanEwen] Sure, a design is necessary. I think there may be two ways to do it in production. Orc and Parquet files are popular because of their columnar storage. There are two ways to support a Parquet writer:

1. Just write a ParquetStreamWriter which is a subclass of StreamWriterBase, which looks the same as FLINK-9407.
2. Use an HdfsWriterWrapper which owns one delegate writer. When the end user wants to use a format, they simply specify the format, like orc or parquet, and let the wrapper create a suitable writer such as OrcStreamWriter or ParquetStreamWriter, and so on.

Sample code for the HdfsWriterWrapper:
{code:java}
public class HdfsWriterWrapper implements Writer {
    private Writer delegate;
    private String format;
    private Configuration configuration;
    private TableSchema tableSchema;

    public HdfsWriterWrapper(Configuration configuration, String format, Class tableClass, String[] columnFields) {
    }
}
{code}
What do you think will be better?

> Support parquet rolling sink writer
> -----------------------------------
>
>                 Key: FLINK-9411
>                 URL: https://issues.apache.org/jira/browse/FLINK-9411
>             Project: Flink
>          Issue Type: New Feature
>          Components: filesystem-connector
>            Reporter: mingleizhang
>            Assignee: Triones Deng
>            Priority: Major
>
> Like support orc rolling sink writer in FLINK-9407 , we should also support
> parquet rolling sink writer.
[jira] [Commented] (FLINK-9411) Support parquet rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519405#comment-16519405 ]

Triones Deng commented on FLINK-9411:
-------------------------------------

[~StephanEwen] Sure, a design is necessary. I think there may be two ways to do it in production. Orc and Parquet files are popular because of their columnar storage. There are two ways to support a Parquet writer:

1. Just write a ParquetStreamWriter which is a subclass of StreamWriterBase, which looks the same as FLINK-9407.
2. Use an HdfsWriterWrapper which owns one delegate writer. When the end user wants to use a format, they simply specify the format, like orc or parquet, and let the wrapper create a suitable writer such as OrcStreamWriter or ParquetStreamWriter, and so on.

Sample code for the HdfsWriterWrapper:
{code:java}
public class HdfsWriterWrapper implements Writer {
    private Writer delegate;
    private String format;
    private Configuration configuration;
    private TableSchema tableSchema;

    public HdfsWriterWrapper(Configuration configuration, String format, Class tableClass, String[] columnFields) {
    }
}
{code}
What do you think will be better?

> Support parquet rolling sink writer
> -----------------------------------
>
>                 Key: FLINK-9411
>                 URL: https://issues.apache.org/jira/browse/FLINK-9411
>             Project: Flink
>          Issue Type: New Feature
>          Components: filesystem-connector
>            Reporter: mingleizhang
>            Assignee: Triones Deng
>            Priority: Major
>
> Like support orc rolling sink writer in FLINK-9407 , we should also support
> parquet rolling sink writer.
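The second option above (a wrapper that owns one delegate and picks it from a format name) can be sketched roughly as follows. This is only an illustration under stated assumptions: `RecordWriter`, `OrcRecordWriter`, `ParquetRecordWriter` and `FormatWriterWrapper` are hypothetical stand-ins, not the connector's real `Writer`, `OrcStreamWriter` or `ParquetStreamWriter` classes, and the stub writers only buffer records in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical, simplified stand-in for the connector's Writer interface. */
interface RecordWriter {
    void write(String record);

    String formatName();
}

/** Placeholder for an ORC-backed writer; it only buffers records in memory. */
final class OrcRecordWriter implements RecordWriter {
    final List<String> buffer = new ArrayList<>();

    @Override
    public void write(String record) {
        buffer.add(record);
    }

    @Override
    public String formatName() {
        return "orc";
    }
}

/** Placeholder for a Parquet-backed writer; it only buffers records in memory. */
final class ParquetRecordWriter implements RecordWriter {
    final List<String> buffer = new ArrayList<>();

    @Override
    public void write(String record) {
        buffer.add(record);
    }

    @Override
    public String formatName() {
        return "parquet";
    }
}

/** Wrapper that owns exactly one delegate and forwards every call to it. */
final class FormatWriterWrapper implements RecordWriter {
    private final RecordWriter delegate;

    FormatWriterWrapper(String format) {
        this.delegate = createDelegate(format);
    }

    /** Chooses the delegate once, based on the user-supplied format name. */
    private static RecordWriter createDelegate(String format) {
        switch (format.toLowerCase(Locale.ROOT)) {
            case "orc":
                return new OrcRecordWriter();
            case "parquet":
                return new ParquetRecordWriter();
            default:
                throw new IllegalArgumentException("Unsupported format: " + format);
        }
    }

    @Override
    public void write(String record) {
        delegate.write(record);
    }

    @Override
    public String formatName() {
        return delegate.formatName();
    }
}
```

With this shape the user only names a format, e.g. `new FormatWriterWrapper("parquet")`, and adding a format means adding one writer class and one `case`. The first option (a dedicated subclass per format) avoids the extra indirection but leaves the choice of class to the user.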
[jira] [Commented] (FLINK-9390) Shutdown of KafkaProducer causes confusing log message
[ https://issues.apache.org/jira/browse/FLINK-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481438#comment-16481438 ]

Triones Deng commented on FLINK-9390:
-------------------------------------

[~StephanEwen] I noticed that Flink now uses TaskInterrupter to cancel a running task, so it is hard to tell whether an InterruptedException is caused by the cancellation or is a real exception. When the user cancels the application, this is called:
{code:java}
Task.cancelOrFailAndCancelInvokable()
{code}
Here we could give the user a hint, such as a log message saying that the InterruptedException is due to the cancellation, so the user can ignore the InterruptedException log below.

> Shutdown of KafkaProducer causes confusing log message
> ------------------------------------------------------
>
>                 Key: FLINK-9390
>                 URL: https://issues.apache.org/jira/browse/FLINK-9390
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Stefan Richter
>            Priority: Minor
>
> I found some logged exceptions in a user log that occurred during shutdown in
> the context of the Kafka Producer. Those exceptions are most certainly not a
> real problem, but can be confusing to users, so maybe we can get rid of them.
> {code}
> 2018-05-16 08:52:16,526 DEBUG org.apache.flink.streaming.api.operators.async.Emitter - Emitter thread got interrupted, shutting down.
> 2018-05-16 08:52:16,527 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:384)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Thread.join(Thread.java:1260)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>     ... 9 more
> {code}
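The hint suggested in the comment could look roughly like the sketch below: remember that a cancellation was requested, and use that flag to decide how an InterruptedException seen during shutdown is reported. The class `CancelAwareTask` and its methods are hypothetical and only illustrate the idea; they are not Flink's `Task` API, and the returned strings merely stand in for real logger calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical task wrapper that remembers whether a cancellation was requested. */
final class CancelAwareTask {
    private final AtomicBoolean canceled = new AtomicBoolean(false);

    /** Assumed to be called by the canceling thread before it interrupts the task thread. */
    void markCanceled() {
        canceled.set(true);
    }

    /** Builds the message to log for an InterruptedException seen while closing resources. */
    String describe(InterruptedException e) {
        if (canceled.get()) {
            // The interrupt is a side effect of the cancellation, so a low log level is enough.
            return "DEBUG: interrupted while closing because the task was canceled; safe to ignore";
        }
        // No cancellation was requested, so the interrupt is unexpected and worth reporting.
        return "ERROR: unexpected interrupt while closing: " + e;
    }
}
```

Setting the flag before the interrupt is sent matters here: otherwise the task thread could observe the interrupt first and still report it as an error.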
[jira] [Comment Edited] (FLINK-9390) Shutdown of KafkaProducer causes confusing log message
[ https://issues.apache.org/jira/browse/FLINK-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478931#comment-16478931 ]

Triones Deng edited comment on FLINK-9390 at 5/17/18 11:41 AM:
---------------------------------------------------------------

[~srichter] Could you please kindly let me know how to reproduce this issue? Also, please let me know the environment that causes the issue, like the Kafka version...


was (Author: triones):
[~srichter] Could you please kindly let me know how to reproduce this issue?

> Shutdown of KafkaProducer causes confusing log message
> ------------------------------------------------------
>
>                 Key: FLINK-9390
>                 URL: https://issues.apache.org/jira/browse/FLINK-9390
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Stefan Richter
>            Priority: Minor
>
> I found some logged exceptions in a user log that occurred during shutdown in
> the context of the Kafka Producer. Those exceptions are most certainly not a
> real problem, but can be confusing to users, so maybe we can get rid of them.
> {code}
> 2018-05-16 08:52:16,526 DEBUG org.apache.flink.streaming.api.operators.async.Emitter - Emitter thread got interrupted, shutting down.
> 2018-05-16 08:52:16,527 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:384)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Thread.join(Thread.java:1260)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>     ... 9 more
> {code}
[jira] [Commented] (FLINK-9390) Shutdown of KafkaProducer causes confusing log message
[ https://issues.apache.org/jira/browse/FLINK-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478931#comment-16478931 ]

Triones Deng commented on FLINK-9390:
-------------------------------------

[~srichter] Could you please kindly let me know how to reproduce this issue?

> Shutdown of KafkaProducer causes confusing log message
> ------------------------------------------------------
>
>                 Key: FLINK-9390
>                 URL: https://issues.apache.org/jira/browse/FLINK-9390
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Stefan Richter
>            Priority: Minor
>
> I found some logged exceptions in a user log that occurred during shutdown in
> the context of the Kafka Producer. Those exceptions are most certainly not a
> real problem, but can be confusing to users, so maybe we can get rid of them.
> {code}
> 2018-05-16 08:52:16,526 DEBUG org.apache.flink.streaming.api.operators.async.Emitter - Emitter thread got interrupted, shutting down.
> 2018-05-16 08:52:16,527 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:384)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Thread.join(Thread.java:1260)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>     ... 9 more
> {code}
[jira] [Commented] (FLINK-9340) ScheduleOrUpdateConsumersTest may fail with Address already in use
[ https://issues.apache.org/jira/browse/FLINK-9340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474387#comment-16474387 ]

Triones Deng commented on FLINK-9340:
-------------------------------------

[~yuzhih...@gmail.com] I am interested in this.

> ScheduleOrUpdateConsumersTest may fail with Address already in use
> ------------------------------------------------------------------
>
>                 Key: FLINK-9340
>                 URL: https://issues.apache.org/jira/browse/FLINK-9340
>             Project: Flink
>          Issue Type: Test
>            Reporter: Ted Yu
>            Priority: Minor
>
> When ScheduleOrUpdateConsumersTest is run in the test suite, I saw:
> {code}
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 8.034 sec <<< FAILURE! - in org.apache.flink.runtime.jobmanager.scheduler.ScheduleOrUpdateConsumersTest
> org.apache.flink.runtime.jobmanager.scheduler.ScheduleOrUpdateConsumersTest  Time elapsed: 8.034 sec  <<< ERROR!
> java.net.BindException: Address already in use
>     at sun.nio.ch.Net.bind0(Native Method)
>     at sun.nio.ch.Net.bind(Net.java:433)
>     at sun.nio.ch.Net.bind(Net.java:425)
>     at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
>     at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:125)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:485)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1081)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:502)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:487)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:904)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198)
>     at org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> {code}
> Seems there was address / port conflict.
[jira] [Commented] (FLINK-9284) Update CLI page
[ https://issues.apache.org/jira/browse/FLINK-9284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465962#comment-16465962 ]

Triones Deng commented on FLINK-9284:
-------------------------------------

[~Zentol] See the examples link. There are two places where the -m option is used, and only one of them specifies a port:
{code:java}
./bin/flink run -m myJMHost:6123 \
    ./examples/batch/WordCount.jar \
    --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
{code}
The Flink code now uses 8081 for both the HTTP port and the REST port, so I think we can change 6123 to 8081 here to submit the job. If anything is wrong, please feel free to correct me.

> Update CLI page
> ---------------
>
>                 Key: FLINK-9284
>                 URL: https://issues.apache.org/jira/browse/FLINK-9284
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client, Documentation
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Triones Deng
>            Priority: Critical
>             Fix For: 1.5.0
>
>
> The [CLI|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html]
> page must be updated for 1.5.
> The
> [examples|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#examples]
> using the {{-m}} option must be updated to use {{8081}}.
[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable
[ https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465923#comment-16465923 ]

Triones Deng commented on FLINK-9303:
-------------------------------------

[~tzulitai] I am interested in this issue. Regarding your JIRA, there are several conditions that can cause partitions to become unavailable:

1. A partition leader change makes the partition unavailable for a short time.
2. The topic is deleted, as mentioned on the ML.
3. The partition goes offline, e.g. when all replicas are down.

What's your idea here?

> Unassign partitions from Kafka client if partitions become unavailable
> ----------------------------------------------------------------------
>
>                 Key: FLINK-9303
>                 URL: https://issues.apache.org/jira/browse/FLINK-9303
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions
> at the moment, so statically assigned partitions to the Kafka client is never
> removed and is always continuously requested for records.
> This causes LOG noises as reported in the reported mail thread.
[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets
[ https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079 ]

Triones Deng edited comment on FLINK-9231 at 4/24/18 2:05 PM:
--------------------------------------------------------------

[~yuzhih...@gmail.com] I notice that there are four kinds of server sockets:
# JobManager and TaskManager create a server socket via NetUtils.createSocketFromPorts. This only looks for an available port for creating the ActorSystem. The relevant config item in akka-remote.jar is "tcp-reuse-addr = off-for-windows". The ServerSocket is closed as soon as an available port is found; see the code below. Here I think calling new ServerSocket(port) directly, without the backlog, would be OK. What do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port = if (socket == null) {
    throw new BindException(s"Unable to allocate port for TaskManager.")
  } else {
    try {
      socket.getLocalPort()
    } finally {
      socket.close()
    }
  }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}
# BlobServer uses ServerSocket or SSLContext to create its ServerSocket.
# Netty is used for I/O, e.g. NettyServer.
# WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to use SO_REUSEADDR, which "is useful if your server has been shut down, and then restarted right away while sockets are still active on its port. You should be aware that if any unexpected data comes in, it may confuse your server, but while this is possible, it is not likely" (see [https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), maybe we can set SO_REUSEADDR on the sockets when starting WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free to correct me.

Sample code for WebFrontendBootstrap.java:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() configures the listening channel; childOption() would only affect accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}


was (Author: triones):
[~yuzhih...@gmail.com] I notice that there are four kinds of server sockets:
# JobManager and TaskManager create a server socket via NetUtils.createSocketFromPorts. This only looks for an available port for creating the ActorSystem. The ServerSocket is closed as soon as an available port is found; see the code below. Here I think calling new ServerSocket(port) directly, without the backlog, would be OK. What do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port = if (socket == null) {
    throw new BindException(s"Unable to allocate port for TaskManager.")
  } else {
    try {
      socket.getLocalPort()
    } finally {
      socket.close()
    }
  }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}
# BlobServer uses ServerSocket or SSLContext to create its ServerSocket.
# Netty is used for I/O, e.g. NettyServer.
# WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to use SO_REUSEADDR, which "is useful if your server has been shut down, and then restarted right away while sockets are still active on its port. You should be aware that if any unexpected data comes in, it may confuse your server, but while this is possible, it is not likely" (see [https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), maybe we can set SO_REUSEADDR on the sockets when starting WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free to correct me.

Sample code for WebFrontendBootstrap.java:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() configures the listening channel; childOption() would only affect accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}

> Enable SO_REUSEADDR on listen sockets
> -------------------------------------
>
>                 Key: FLINK-9231
>                 URL: https://issues.apache.org/jira/browse/FLINK-9231
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ted Yu
>            Assignee: Triones Deng
>            Priority: Major
>
> This allows sockets to be bound even if there are
[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets
[ https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079 ]

Triones Deng edited comment on FLINK-9231 at 4/24/18 1:56 PM:
--------------------------------------------------------------

[~yuzhih...@gmail.com] I notice that there are four kinds of server sockets:
# JobManager and TaskManager create a server socket via NetUtils.createSocketFromPorts. This only looks for an available port for creating the ActorSystem. The ServerSocket is closed as soon as an available port is found; see the code below. Here I think calling new ServerSocket(port) directly, without the backlog, would be OK. What do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port = if (socket == null) {
    throw new BindException(s"Unable to allocate port for TaskManager.")
  } else {
    try {
      socket.getLocalPort()
    } finally {
      socket.close()
    }
  }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}
# BlobServer uses ServerSocket or SSLContext to create its ServerSocket.
# Netty is used for I/O, e.g. NettyServer.
# WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to use SO_REUSEADDR, which "is useful if your server has been shut down, and then restarted right away while sockets are still active on its port. You should be aware that if any unexpected data comes in, it may confuse your server, but while this is possible, it is not likely" (see [https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), maybe we can set SO_REUSEADDR on the sockets when starting WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free to correct me.

Sample code for WebFrontendBootstrap.java:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() configures the listening channel; childOption() would only affect accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}


was (Author: triones):
[~yuzhih...@gmail.com] I notice that there are three kinds of server sockets:
# JobManager and TaskManager create a server socket via NetUtils.createSocketFromPorts. This only looks for an available port for creating the ActorSystem. The ServerSocket is closed as soon as an available port is found; see the code below. Here I think calling new ServerSocket(port) directly, without the backlog, would be OK. What do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port = if (socket == null) {
    throw new BindException(s"Unable to allocate port for TaskManager.")
  } else {
    try {
      socket.getLocalPort()
    } finally {
      socket.close()
    }
  }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}
# Netty is used for I/O, e.g. NettyServer.
# BlobServer uses ServerSocket or SSLContext to create its ServerSocket.
# WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to use SO_REUSEADDR, which "is useful if your server has been shut down, and then restarted right away while sockets are still active on its port. You should be aware that if any unexpected data comes in, it may confuse your server, but while this is possible, it is not likely" (see [https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), maybe we can set SO_REUSEADDR on the sockets when starting WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free to correct me.

Sample code for WebFrontendBootstrap.java:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() configures the listening channel; childOption() would only affect accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}

> Enable SO_REUSEADDR on listen sockets
> -------------------------------------
>
>                 Key: FLINK-9231
>                 URL: https://issues.apache.org/jira/browse/FLINK-9231
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ted Yu
>            Assignee: Triones Deng
>            Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.
[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets
[ https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079 ]

Triones Deng edited comment on FLINK-9231 at 4/24/18 1:54 PM:
--------------------------------------------------------------

[~yuzhih...@gmail.com] I notice that there are three kinds of server sockets:
# JobManager and TaskManager create a server socket via NetUtils.createSocketFromPorts. This only looks for an available port for creating the ActorSystem. The ServerSocket is closed as soon as an available port is found; see the code below. Here I think calling new ServerSocket(port) directly, without the backlog, would be OK. What do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port = if (socket == null) {
    throw new BindException(s"Unable to allocate port for TaskManager.")
  } else {
    try {
      socket.getLocalPort()
    } finally {
      socket.close()
    }
  }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}
# Netty is used for I/O, e.g. NettyServer.
# BlobServer uses ServerSocket or SSLContext to create its ServerSocket.
# WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to use SO_REUSEADDR, which "is useful if your server has been shut down, and then restarted right away while sockets are still active on its port. You should be aware that if any unexpected data comes in, it may confuse your server, but while this is possible, it is not likely" (see [https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), maybe we can set SO_REUSEADDR on the sockets when starting WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free to correct me.

Sample code for WebFrontendBootstrap.java:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() configures the listening channel; childOption() would only affect accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}


was (Author: triones):
[~yuzhih...@gmail.com] I notice that there are two kinds of server sockets:
# Plain ServerSocket, as in JobManager, TaskManager, TaskManagerRunner and BlobServer; all of them create the server socket via NetUtils.createSocketFromPorts.
# Netty for I/O, e.g. NettyServer.

I think if we plan to use SO_REUSEADDR, which "is useful if your server has been shut down, and then restarted right away while sockets are still active on its port. You should be aware that if any unexpected data comes in, it may confuse your server, but while this is possible, it is not likely" (see [https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), maybe we can set SO_REUSEADDR on the sockets when starting JobManager, TaskManager and TaskManagerRunner. If anything is wrong, please correct me.

Sample code for JobManager.scala:
{code:java}
val socket = NetUtils.createSocketFromPorts(
  listeningPortRange,
  new NetUtils.SocketFactory {
    override def createSocket(port: Int): ServerSocket = {
      // Use the correct listening address, bound ports will only be
      // detected later by Akka.
      val serverSocket = new ServerSocket()
      serverSocket.setReuseAddress(true)
      serverSocket.bind(new InetSocketAddress(InetAddress.getByName(NetUtils.getWildcardIPAddress), port), 0)
      serverSocket
    }
  })
{code}

> Enable SO_REUSEADDR on listen sockets
> -------------------------------------
>
>                 Key: FLINK-9231
>                 URL: https://issues.apache.org/jira/browse/FLINK-9231
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ted Yu
>            Assignee: Triones Deng
>            Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.
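The port probing described in the first list item (bind a ServerSocket to each candidate port, read the port, close the socket at once) can be sketched in plain Java as below. `PortProbe` and `findAvailablePort` are hypothetical names, not Flink's `NetUtils` API, and the sketch binds to the loopback address rather than the wildcard address used in the quoted code.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical helper that probes a port range by briefly binding a ServerSocket. */
final class PortProbe {
    /** Returns the first port in [from, to] that can currently be bound, or -1 if none can. */
    static int findAvailablePort(int from, int to) {
        for (int port = from; port <= to; port++) {
            // try-with-resources closes the probe socket right after the port is read.
            try (ServerSocket socket = new ServerSocket(port, 0, InetAddress.getLoopbackAddress())) {
                return socket.getLocalPort();
            } catch (IOException e) {
                // The port is taken or cannot be bound; try the next candidate.
            }
        }
        return -1;
    }
}
```

Because the probe socket is closed before the real server binds, another process can still grab the port in between, which is presumably why the quoted code wraps the whole sequence in a retry on bind failures.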
[jira] [Commented] (FLINK-9231) Enable SO_REUSEADDR on listen sockets
[ https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079 ] Triones Deng commented on FLINK-9231: - [~yuzhih...@gmail.com] I notice that there are two kinds of server sockets: # those that use ServerSocket, like JobManager, TaskManager, TaskManagerRunner and BlobServer, all of which create the server socket through NetUtils.createSocketFromPorts # those that use Netty for I/O, like NettyServer. I think SO_REUSEADDR fits our case: "It is useful if your server has been shut down, and then restarted right away while sockets are still active on its port. You should be aware that if any unexpected data comes in, it may confuse your server, but while this is possible, it is not likely" (see https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1). Maybe we can set SO_REUSEADDR on the sockets when starting JobManager, TaskManager and TaskManagerRunner. If anything is wrong, please correct me. Sample code for JobManager.scala:
{code:java}
val socket = NetUtils.createSocketFromPorts(
  listeningPortRange,
  new NetUtils.SocketFactory {
    override def createSocket(port: Int): ServerSocket = {
      // Use the correct listening address, bound ports will only be
      // detected later by Akka.
      val serverSocket = new ServerSocket()
      serverSocket.setReuseAddress(true)
      serverSocket.bind(new InetSocketAddress(InetAddress.getByName(NetUtils.getWildcardIPAddress), port), 0)
      serverSocket
    }
  })
{code}
> Enable SO_REUSEADDR on listen sockets > - > > Key: FLINK-9231 > URL: https://issues.apache.org/jira/browse/FLINK-9231 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: Triones Deng >Priority: Major > > This allows sockets to be bound even if there are sockets > from a previous application that are still pending closure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
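For readers who want to try the ServerSocket path outside of Flink, here is a minimal plain-Java sketch of the same idea: create the socket unbound, enable SO_REUSEADDR, then bind. The class name ReuseAddrSketch and the method openReusable are made up for this illustration and are not part of Flink or of the proposed patch; only the java.net calls are real.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical helper for illustration only; not a Flink class.
final class ReuseAddrSketch {

    /** Opens a listen socket on the given port with SO_REUSEADDR enabled. */
    static ServerSocket openReusable(int port) throws IOException {
        // Create the socket unbound: the option has to be set before bind()
        // for its effect to be defined.
        ServerSocket serverSocket = new ServerSocket();
        try {
            serverSocket.setReuseAddress(true);
            // InetSocketAddress(int) uses the wildcard address; a backlog of 0
            // asks for the implementation default.
            serverSocket.bind(new InetSocketAddress(port), 0);
            return serverSocket;
        } catch (IOException e) {
            // Do not leak the socket if the option or the bind fails.
            serverSocket.close();
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 0 lets the OS pick a free port, so the sketch runs anywhere.
        try (ServerSocket socket = openReusable(0)) {
            System.out.println("bound to port " + socket.getLocalPort()
                + ", reuseAddress=" + socket.getReuseAddress());
        }
    }
}
```

On the Netty side, as far as I know, options for the listening channel are set with ServerBootstrap.option(...), while childOption(...) applies to the accepted connections, so that is worth double-checking for any Netty-based server.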
[jira] [Commented] (FLINK-9231) Enable SO_REUSEADDR on listen sockets
[ https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447086#comment-16447086 ] Triones Deng commented on FLINK-9231: - [~yuzhih...@gmail.com] I am interested in this issue. > Enable SO_REUSEADDR on listen sockets > - > > Key: FLINK-9231 > URL: https://issues.apache.org/jira/browse/FLINK-9231 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Priority: Major > > This allows sockets to be bound even if there are sockets > from a previous application that are still pending closure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16432385#comment-16432385 ] Triones Deng commented on FLINK-9087: - This PR is ready for another review. [~NicoK], [~yuzhih...@gmail.com], thank you. [~yuzhih...@gmail.com], do we need to change the description of this JIRA? > Return value of broadcastEvent should be closed in > StreamTask#performCheckpoint > --- > > Key: FLINK-9087 > URL: https://issues.apache.org/jira/browse/FLINK-9087 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Ted Yu >Priority: Minor > > {code} > for (StreamRecordWriter> > streamRecordWriter : streamRecordWriters) { > try { > streamRecordWriter.broadcastEvent(message); > {code} > The BufferConsumer returned by broadcastEvent() should be closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429743#comment-16429743 ] Triones Deng commented on FLINK-9087: - Thanks for your suggestions, I will follow them. > Return value of broadcastEvent should be closed in > StreamTask#performCheckpoint > --- > > Key: FLINK-9087 > URL: https://issues.apache.org/jira/browse/FLINK-9087 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Ted Yu >Priority: Minor > > {code} > for (StreamRecordWriter> > streamRecordWriter : streamRecordWriters) { > try { > streamRecordWriter.broadcastEvent(message); > {code} > The BufferConsumer returned by broadcastEvent() should be closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424169#comment-16424169 ] Triones Deng commented on FLINK-9087: - [~yuzhih...@gmail.com] When I ran the test, I found the following in broadcastEvent:
{code:java}
public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
    try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
        ...
            // retain the buffer so that it can be recycled by each channel of targetPartition
            targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
        }
        ...
        return eventBufferConsumer;
    }
}
{code}
This calls targetPartition.addBufferConsumer() with a copy of eventBufferConsumer, so all the BufferConsumers produced by copy() share the same buffer. Each copy calls AbstractReferenceCountedByteBuf.retain() (AbstractReferenceCountedByteBuf.java is a Netty class). Every targetPartition, such as AbstractCollectingResultPartitionWriter and ResultPartition, calls the close method of its BufferConsumer, so in the end the buffer in eventBufferConsumer is released. ResultPartition calls notifyDataAvailable, which consumes the data asynchronously. So we had better leave the return value alone. What do you think? Or should we just change the method signature to return void? Note that FLINK-7315 plans to use Flink's buffers in Netty, and its subtask FLINK-7518 has a solution. I am new here, any suggestions? > Return value of broadcastEvent should be closed in > StreamTask#performCheckpoint > --- > > Key: FLINK-9087 > URL: https://issues.apache.org/jira/browse/FLINK-9087 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > for (StreamRecordWriter> > streamRecordWriter : streamRecordWriters) { > try { > streamRecordWriter.broadcastEvent(message); > {code} > The BufferConsumer returned by broadcastEvent() should be closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
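The reference-counting argument above can be illustrated with a small stand-alone sketch. Everything in it (BroadcastSketch, SharedBuffer, Consumer) is a hypothetical stand-in written for this illustration and is neither Flink's BufferConsumer nor Netty's AbstractReferenceCountedByteBuf. It only shows the pattern under discussion: each copy retains the shared buffer, try-with-resources closes the original, and the buffer is recycled once every channel has closed its copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for illustration; not Flink or Netty classes.
final class BroadcastSketch {

    /** A buffer that counts how many handles still reference it. */
    static final class SharedBuffer {
        private final AtomicInteger refCount = new AtomicInteger(1);

        void retain() {
            refCount.incrementAndGet();
        }

        void release() {
            if (refCount.decrementAndGet() < 0) {
                throw new IllegalStateException("buffer released too often");
            }
        }

        int refCount() {
            return refCount.get();
        }

        boolean isRecycled() {
            return refCount.get() == 0;
        }
    }

    /** A handle on the buffer: copy() shares it, close() drops one reference. */
    static final class Consumer implements AutoCloseable {
        private final SharedBuffer buffer;
        private boolean closed;

        Consumer(SharedBuffer buffer) {
            // Takes over the initial reference the buffer was created with.
            this.buffer = buffer;
        }

        Consumer copy() {
            buffer.retain();
            return new Consumer(buffer);
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                buffer.release();
            }
        }
    }

    /** Hands one copy to each channel, closes the original, returns nothing. */
    static void broadcast(SharedBuffer buffer, List<Consumer> channels, int numChannels) {
        try (Consumer original = new Consumer(buffer)) {
            for (int i = 0; i < numChannels; i++) {
                channels.add(original.copy());
            }
        } // the original is closed here; the copies keep the buffer alive
    }

    public static void main(String[] args) {
        SharedBuffer buffer = new SharedBuffer();
        List<Consumer> channels = new ArrayList<>();
        broadcast(buffer, channels, 3);
        System.out.println("after broadcast: refCount=" + buffer.refCount());
        for (Consumer channel : channels) {
            channel.close();
        }
        System.out.println("after channels closed: recycled=" + buffer.isRecycled());
    }
}
```

In this sketch the caller never gets a handle back, which matches the "change the signature to void" option: a handle returned out of the try-with-resources block would already be closed by the time the caller sees it.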
[jira] [Comment Edited] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423367#comment-16423367 ] Triones Deng edited comment on FLINK-9087 at 4/3/18 1:59 AM: - [~yuzhih...@gmail.com] I notice that RecordWriter.broadcastEvent() is called in StreamTask, RecordWriterOutput, IterationIntermediateTask and IterationHeadTask. I also notice that no caller uses the BufferConsumer returned by broadcastEvent(), so I think the better way is to close the return value inside RecordWriter: change the method's return type from BufferConsumer to void and close the BufferConsumer at the end. Does this make sense? What's your idea? was (Author: triones): [~yuzhih...@gmail.com] I notice that RecordWriter.broadcastEvent() is called in StreamTask, RecordWriterOutput, IterationIntermediateTask and IterationHeadTask. I also notice that no caller uses the BufferConsumer returned by broadcastEvent(), so I think the better way is to close the return value inside RecordWriter: change the method's return type from BufferConsumer to void and close the BufferConsumer at the end. Does this make sense? What's your idea? > Return value of broadcastEvent should be closed in > StreamTask#performCheckpoint > --- > > Key: FLINK-9087 > URL: https://issues.apache.org/jira/browse/FLINK-9087 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > for (StreamRecordWriter> > streamRecordWriter : streamRecordWriters) { > try { > streamRecordWriter.broadcastEvent(message); > {code} > The BufferConsumer returned by broadcastEvent() should be closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423367#comment-16423367 ] Triones Deng commented on FLINK-9087: - [~yuzhih...@gmail.com] I notice that RecordWriter.broadcastEvent() is called in StreamTask, RecordWriterOutput, IterationIntermediateTask and IterationHeadTask. I also notice that no caller uses the BufferConsumer returned by broadcastEvent(), so I think the better way is to close the return value inside RecordWriter: change the method's return type from BufferConsumer to void and close the BufferConsumer at the end. Does this make sense? What's your idea? > Return value of broadcastEvent should be closed in > StreamTask#performCheckpoint > --- > > Key: FLINK-9087 > URL: https://issues.apache.org/jira/browse/FLINK-9087 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > for (StreamRecordWriter> > streamRecordWriter : streamRecordWriters) { > try { > streamRecordWriter.broadcastEvent(message); > {code} > The BufferConsumer returned by broadcastEvent() should be closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9056) Job submission fails with AskTimeoutException if not enough slots are available
[ https://issues.apache.org/jira/browse/FLINK-9056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421687#comment-16421687 ] Triones Deng commented on FLINK-9056: - [~fhueske] When the job parallelism exceeds the available slots, Flink would tell the user the reason, for example with a log message like "Job parallelism exceeds available slots, please check your available slots or decrease job parallelism", right? > Job submission fails with AskTimeoutException if not enough slots are > available > --- > > Key: FLINK-9056 > URL: https://issues.apache.org/jira/browse/FLINK-9056 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.5.0 > Environment: * FLIP-6 enabled > * Local Flink instance with fixed number of TMs > * Job parallelism exceeds available slots >Reporter: Fabian Hueske >Assignee: yuqi >Priority: Major > > The error message if a job submission fails due to lack of available slots is > not helpful: > {code:java} > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/8f0fabba-4021-45b6-a1f7-b8afd6627640#-574617182|#-574617182]] > after [30 ms]. Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".
> at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
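As a rough illustration of the kind of message asked about in the comment above, here is a tiny stand-alone sketch of a fail-fast check that names the cause instead of surfacing a timeout. SlotCheckSketch, checkSlots and NotEnoughSlotsException are invented for this example; they are not Flink APIs, and this is not how Flink's scheduler actually reports the condition.

```java
// Hypothetical illustration only; none of these names exist in Flink.
final class SlotCheckSketch {

    /** Made-up exception type that carries a descriptive message. */
    static final class NotEnoughSlotsException extends Exception {
        NotEnoughSlotsException(String message) {
            super(message);
        }
    }

    /** Fails with a descriptive message if the job needs more slots than exist. */
    static void checkSlots(int jobParallelism, int availableSlots)
            throws NotEnoughSlotsException {
        if (jobParallelism > availableSlots) {
            throw new NotEnoughSlotsException(
                "Job parallelism (" + jobParallelism + ") exceeds available slots ("
                    + availableSlots + "); please check your available slots"
                    + " or decrease job parallelism.");
        }
    }

    public static void main(String[] args) {
        try {
            checkSlots(8, 4);
        } catch (NotEnoughSlotsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```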
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421679#comment-16421679 ] Triones Deng commented on FLINK-9087: - Can anyone give me permission to contribute to this issue? Thank you very much. > Return value of broadcastEvent should be closed in > StreamTask#performCheckpoint > --- > > Key: FLINK-9087 > URL: https://issues.apache.org/jira/browse/FLINK-9087 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > for (StreamRecordWriter> > streamRecordWriter : streamRecordWriters) { > try { > streamRecordWriter.broadcastEvent(message); > {code} > The BufferConsumer returned by broadcastEvent() should be closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421677#comment-16421677 ] Triones Deng commented on FLINK-9087: - [~mingleizhang] Are you still working on this? If not, I would like to take the ticket. Thank you. > Return value of broadcastEvent should be closed in > StreamTask#performCheckpoint > --- > > Key: FLINK-9087 > URL: https://issues.apache.org/jira/browse/FLINK-9087 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > for (StreamRecordWriter> > streamRecordWriter : streamRecordWriters) { > try { > streamRecordWriter.broadcastEvent(message); > {code} > The BufferConsumer returned by broadcastEvent() should be closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)