[jira] [Commented] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-06-29 Thread Triones Deng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527633#comment-16527633
 ] 

Triones Deng commented on FLINK-9231:
-

[~azagrebin] WebFrontendBootstrap is to be deprecated after FLIP-6 and 
RestServerEndpoint is going to be used instead, so maybe we can add this 
feature to RestServerEndpoint. Should we create a new JIRA and close this 
one, or update this one, or ...
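
If we go that way, a minimal sketch of what I have in mind (assuming 
RestServerEndpoint configures a Netty ServerBootstrap much like 
WebFrontendBootstrap does; the group and initializer names are illustrative):
{code:java}
// Hypothetical placement inside the endpoint's start-up code. Note that
// option() applies to the listening socket itself, while childOption()
// would only affect the accepted connections.
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}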

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>  Labels: pull-request-available
>
> This allows sockets to be bound even if there are sockets from a previous 
> application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9411) Support parquet rolling sink writer

2018-06-21 Thread Triones Deng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519405#comment-16519405
 ] 

Triones Deng edited comment on FLINK-9411 at 6/21/18 2:29 PM:
--

[~StephanEwen] Sure, a design is necessary. In production, Orc and Parquet 
files are popular because of their columnar storage, and I think there are 
two ways to support a parquet writer:
1. Just write a ParquetStreamWriter as a subclass of StreamWriterBase, which 
looks the same as FLINK-9407.
2. Add a HdfsWriterWrapper that owns one delegate writer: when the end user 
wants a format, they simply specify it (e.g. orc or parquet) and the wrapper 
creates a suitable writer such as OrcStreamWriter or ParquetStreamWriter.
sample code for the HdfsWriterWrapper:

{code:java}
public class HdfsWriterWrapper implements Writer {

    private Writer delegate;
    private String format;
    private Configuration configuration;
    private TableSchema tableSchema;

    public HdfsWriterWrapper(Configuration configuration, String format,
            Class tableClass, String[] columnFields) {
        this.configuration = configuration;
        this.format = format;
        // pick the delegate from the requested format; the concrete writer
        // constructors are only sketched here
        if ("parquet".equalsIgnoreCase(format)) {
            this.delegate = new ParquetStreamWriter(tableClass, columnFields);
        } else if ("orc".equalsIgnoreCase(format)) {
            this.delegate = new OrcStreamWriter(tableClass, columnFields);
        }
    }
}
{code}
which one is better?
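
Hypothetical usage of option 2 (the format string and column names are 
purely illustrative):
{code:java}
// the end user only names the format; the wrapper creates the delegate
Writer writer = new HdfsWriterWrapper(hadoopConf, "parquet", Row.class,
    new String[]{"id", "name"});
{code}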


was (Author: triones):
[~StephanEwen] Sure, a design is necessary. In production, Orc and Parquet 
files are popular because of their columnar storage, and I think there are 
two ways to support a parquet writer:
1. Just write a ParquetStreamWriter as a subclass of StreamWriterBase, which 
looks the same as FLINK-9407.
2. Add a HdfsWriterWrapper that owns one delegate writer: when the end user 
wants a format, they simply specify it (e.g. orc or parquet) and the wrapper 
creates a suitable writer such as OrcStreamWriter or ParquetStreamWriter.
sample code for the HdfsWriterWrapper:

{code:java}
public class HdfsWriterWrapper implements Writer {

    private Writer delegate;
    private String format;
    private Configuration configuration;
    private TableSchema tableSchema;

    public HdfsWriterWrapper(Configuration configuration, String format,
            Class tableClass, String[] columnFields) {
        this.configuration = configuration;
        this.format = format;
        // pick the delegate from the requested format; the concrete writer
        // constructors are only sketched here
        if ("parquet".equalsIgnoreCase(format)) {
            this.delegate = new ParquetStreamWriter(tableClass, columnFields);
        } else if ("orc".equalsIgnoreCase(format)) {
            this.delegate = new OrcStreamWriter(tableClass, columnFields);
        }
    }
}
{code}
what do you think will be better?

> Support parquet rolling sink writer
> ---
>
> Key: FLINK-9411
> URL: https://issues.apache.org/jira/browse/FLINK-9411
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: mingleizhang
>Assignee: Triones Deng
>Priority: Major
>
> Like support orc rolling sink writer in FLINK-9407 , we should also support 
> parquet rolling sink writer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9411) Support parquet rolling sink writer

2018-06-21 Thread Triones Deng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519405#comment-16519405
 ] 

Triones Deng commented on FLINK-9411:
-

[~StephanEwen] Sure, a design is necessary. In production, Orc and Parquet 
files are popular because of their columnar storage, and I think there are 
two ways to support a parquet writer:
1. Just write a ParquetStreamWriter as a subclass of StreamWriterBase, which 
looks the same as FLINK-9407.
2. Add a HdfsWriterWrapper that owns one delegate writer: when the end user 
wants a format, they simply specify it (e.g. orc or parquet) and the wrapper 
creates a suitable writer such as OrcStreamWriter or ParquetStreamWriter.
sample code for the HdfsWriterWrapper:

{code:java}
public class HdfsWriterWrapper implements Writer {

    private Writer delegate;
    private String format;
    private Configuration configuration;
    private TableSchema tableSchema;

    public HdfsWriterWrapper(Configuration configuration, String format,
            Class tableClass, String[] columnFields) {
        this.configuration = configuration;
        this.format = format;
        // pick the delegate from the requested format; the concrete writer
        // constructors are only sketched here
        if ("parquet".equalsIgnoreCase(format)) {
            this.delegate = new ParquetStreamWriter(tableClass, columnFields);
        } else if ("orc".equalsIgnoreCase(format)) {
            this.delegate = new OrcStreamWriter(tableClass, columnFields);
        }
    }
}
{code}
what do you think will be better?

> Support parquet rolling sink writer
> ---
>
> Key: FLINK-9411
> URL: https://issues.apache.org/jira/browse/FLINK-9411
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: mingleizhang
>Assignee: Triones Deng
>Priority: Major
>
> Like support orc rolling sink writer in FLINK-9407 , we should also support 
> parquet rolling sink writer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9390) Shutdown of KafkaProducer causes confusing log message

2018-05-18 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481438#comment-16481438
 ] 

Triones Deng commented on FLINK-9390:
-

[~StephanEwen] I notice that Flink now uses TaskInterrupter to cancel a 
running task, so it is hard to tell whether an InterruptedException is due to 
the cancel or to a real failure. When the user tries to cancel the 
application, Flink will call
{code:java}
Task.cancelOrFailAndCancelInvokable()
{code}
There we can give the user a hint, e.g. a log message saying that the 
following InterruptedException is due to the cancel, so the user can ignore 
the InterruptedException log below it.
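
A minimal sketch of such a hint (the placement and field names are my 
assumption, not the actual Task internals):
{code:java}
// Hypothetical: log once when a user-requested cancel starts interrupting the
// invokable, so the InterruptedException stack traces that follow are expected.
if (cause == null) {
    // a null cause means this is a cancel, not a failure
    LOG.info("Task {} is being canceled; subsequent InterruptedExceptions are " +
        "expected during shutdown and can be ignored.", taskNameWithSubtask);
}
{code}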


> Shutdown of KafkaProducer causes confusing log message
> --
>
> Key: FLINK-9390
> URL: https://issues.apache.org/jira/browse/FLINK-9390
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Stefan Richter
>Priority: Minor
>
> I found some logged exceptions in a user log that occurred during shutdown in 
> the context of the Kafka Producer. Those exceptions are most certainly not a 
> real problem, but can be confusing to users, so maybe we can get rid of them.
> {code}
> 2018-05-16 08:52:16,526 DEBUG 
> org.apache.flink.streaming.api.operators.async.Emitter - Emitter thread got 
> interrupted, shutting down.
> 2018-05-16 08:52:16,527 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal 
> of stream operator.
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:384)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
>  at java.lang.Object.wait(Native Method)
>  at java.lang.Thread.join(Thread.java:1260)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>  ... 9 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9390) Shutdown of KafkaProducer causes confusing log message

2018-05-17 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478931#comment-16478931
 ] 

Triones Deng edited comment on FLINK-9390 at 5/17/18 11:41 AM:
---

[~srichter] Could you please kindly let me know how to reproduce this issue? 
Also, please let me know the environment that causes it, e.g. the Kafka version...


was (Author: triones):
[~srichter] Could you please kindly let me know how to reproduce this issue?

> Shutdown of KafkaProducer causes confusing log message
> --
>
> Key: FLINK-9390
> URL: https://issues.apache.org/jira/browse/FLINK-9390
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Stefan Richter
>Priority: Minor
>
> I found some logged exceptions in a user log that occurred during shutdown in 
> the context of the Kafka Producer. Those exceptions are most certainly not a 
> real problem, but can be confusing to users, so maybe we can get rid of them.
> {code}
> 2018-05-16 08:52:16,526 DEBUG 
> org.apache.flink.streaming.api.operators.async.Emitter - Emitter thread got 
> interrupted, shutting down.
> 2018-05-16 08:52:16,527 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal 
> of stream operator.
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:384)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
>  at java.lang.Object.wait(Native Method)
>  at java.lang.Thread.join(Thread.java:1260)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>  ... 9 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9390) Shutdown of KafkaProducer causes confusing log message

2018-05-17 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478931#comment-16478931
 ] 

Triones Deng commented on FLINK-9390:
-

[~srichter] Could you please kindly let me know how to reproduce this issue?

> Shutdown of KafkaProducer causes confusing log message
> --
>
> Key: FLINK-9390
> URL: https://issues.apache.org/jira/browse/FLINK-9390
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Stefan Richter
>Priority: Minor
>
> I found some logged exceptions in a user log that occurred during shutdown in 
> the context of the Kafka Producer. Those exceptions are most certainly not a 
> real problem, but can be confusing to users, so maybe we can get rid of them.
> {code}
> 2018-05-16 08:52:16,526 DEBUG 
> org.apache.flink.streaming.api.operators.async.Emitter - Emitter thread got 
> interrupted, shutting down.
> 2018-05-16 08:52:16,527 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal 
> of stream operator.
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:384)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
>  at java.lang.Object.wait(Native Method)
>  at java.lang.Thread.join(Thread.java:1260)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>  ... 9 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9340) ScheduleOrUpdateConsumersTest may fail with Address already in use

2018-05-14 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474387#comment-16474387
 ] 

Triones Deng commented on FLINK-9340:
-

[~yuzhih...@gmail.com] I am interested in this.

> ScheduleOrUpdateConsumersTest may fail with Address already in use
> --
>
> Key: FLINK-9340
> URL: https://issues.apache.org/jira/browse/FLINK-9340
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> When ScheduleOrUpdateConsumersTest is run in the test suite, I saw:
> {code}
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 8.034 sec <<< 
> FAILURE! - in 
> org.apache.flink.runtime.jobmanager.scheduler.ScheduleOrUpdateConsumersTest
> org.apache.flink.runtime.jobmanager.scheduler.ScheduleOrUpdateConsumersTest  
> Time elapsed: 8.034 sec  <<< ERROR!
> java.net.BindException: Address already in use
>   at sun.nio.ch.Net.bind0(Native Method)
>   at sun.nio.ch.Net.bind(Net.java:433)
>   at sun.nio.ch.Net.bind(Net.java:425)
>   at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
>   at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:125)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:485)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1081)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:502)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:487)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:904)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198)
>   at 
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> {code}
> Seems there was address / port conflict.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9284) Update CLI page

2018-05-07 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465962#comment-16465962
 ] 

Triones Deng commented on FLINK-9284:
-

[~Zentol] See the examples link: there are two places where the -m option is 
used, and only one of them specifies a port.
{code:java}
./bin/flink run -m myJMHost:6123 \
   ./examples/batch/WordCount.jar \
   --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
{code}
The Flink code now uses 8081 as both the HTTP port and the REST port, so I 
think we can change 6123 to 8081 here to submit the job. If anything is wrong, 
please feel free to correct me.
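
The updated example would then read (same command, only the port changed to 
the REST default):
{code:java}
./bin/flink run -m myJMHost:8081 \
   ./examples/batch/WordCount.jar \
   --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
{code}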

 

> Update CLI page
> ---
>
> Key: FLINK-9284
> URL: https://issues.apache.org/jira/browse/FLINK-9284
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, Documentation
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Triones Deng
>Priority: Critical
> Fix For: 1.5.0
>
>
> The [CLI|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html] 
> page must be updated for 1.5.
> The 
> [examples|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#examples]
>  using the {{-m}} option must be updated to use {{8081}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-07 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465923#comment-16465923
 ] 

Triones Deng commented on FLINK-9303:
-

[~tzulitai] I am interested in this issue. Per your JIRA, there are several 
conditions that can cause partitions to become unavailable:
1. A partition leader change makes the partition unavailable for a short time.
2. The topic is deleted, as the ML thread mentioned.
3. The partition goes offline, e.g. all of its replicas are down.

What's your idea here?
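
One possible direction, sketched with the plain Kafka consumer API (this is an 
assumption about the approach, not the connector's actual code; note that 
partitionsFor() may block or auto-create topics depending on client/broker 
settings):
{code:java}
// Hypothetical: keep only the assigned partitions whose topic metadata still
// lists them, and re-assign the consumer to that subset.
Set<TopicPartition> stillAvailable = new HashSet<>();
for (TopicPartition tp : consumer.assignment()) {
    List<PartitionInfo> infos = consumer.partitionsFor(tp.topic());
    if (infos != null && tp.partition() < infos.size()) {
        stillAvailable.add(tp);
    }
}
consumer.assign(stillAvailable);
{code}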

> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so statically assigned partitions to the Kafka client is never 
> removed and is always continuously requested for records.
> This causes LOG noises as reported in the reported mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079
 ] 

Triones Deng edited comment on FLINK-9231 at 4/24/18 2:05 PM:
--

[~yuzhih...@gmail.com] I notice that there are four kinds of server socket:
 # JobManager and TaskManager create their socket servers via 
NetUtils.createSocketFromPorts. This only probes for an available port to 
create the ActorSystem (in akka-remote.jar the config item is 
"tcp-reuse-addr = off-for-windows") and closes the ServerSocket immediately 
once a free port is found; code as below. I think we can directly call new 
ServerSocket(port) without a backlog here (see the probe sketch at the end of 
this comment); what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port =
    if (socket == null) {
      throw new BindException(s"Unable to allocate port for TaskManager.")
    } else {
      try {
        socket.getLocalPort()
      } finally {
        socket.close()
      }
    }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # BlobServer creates its ServerSocket directly or via SSLContext.
 # NettyServer uses Netty for I/O.
 # WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to make use of SO_REUSEADDR, which "is useful if your 
server has been shut down, and then restarted right away while sockets are 
still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), 
then maybe we can allow the socket to set SO_REUSEADDR when starting 
WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free 
to correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() applies to the listening socket; childOption() would only
    // affect the accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}
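
And for point 1, a minimal sketch of the probe without an explicit backlog (my 
assumption; since the socket is closed right away, the pending-connection 
queue never matters):
{code:java}
// Hypothetical Java equivalent of the port probe: wildcard address,
// default backlog, closed immediately after reading the bound port.
ServerSocket socket = new ServerSocket(port);
int freePort = socket.getLocalPort();
socket.close();
{code}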


was (Author: triones):
[~yuzhih...@gmail.com] I notice that there are four kinds of server socket:
 # JobManager and TaskManager create their socket servers via 
NetUtils.createSocketFromPorts. This only probes for an available port to 
create the ActorSystem and closes the ServerSocket immediately once a free 
port is found; code as below. I think we can directly call new 
ServerSocket(port) without a backlog here; what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port =
    if (socket == null) {
      throw new BindException(s"Unable to allocate port for TaskManager.")
    } else {
      try {
        socket.getLocalPort()
      } finally {
        socket.close()
      }
    }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # BlobServer creates its ServerSocket directly or via SSLContext.
 # NettyServer uses Netty for I/O.
 # WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to make use of SO_REUSEADDR, which "is useful if your 
server has been shut down, and then restarted right away while sockets are 
still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), 
then maybe we can allow the socket to set SO_REUSEADDR when starting 
WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free 
to correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() applies to the listening socket; childOption() would only
    // affect the accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.

[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079
 ] 

Triones Deng edited comment on FLINK-9231 at 4/24/18 1:56 PM:
--

[~yuzhih...@gmail.com] I notice that there are four kinds of server socket:
 # JobManager and TaskManager create their socket servers via 
NetUtils.createSocketFromPorts. This only probes for an available port to 
create the ActorSystem and closes the ServerSocket immediately once a free 
port is found; code as below. I think we can directly call new 
ServerSocket(port) without a backlog here; what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port =
    if (socket == null) {
      throw new BindException(s"Unable to allocate port for TaskManager.")
    } else {
      try {
        socket.getLocalPort()
      } finally {
        socket.close()
      }
    }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # BlobServer creates its ServerSocket directly or via SSLContext.
 # NettyServer uses Netty for I/O.
 # WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to make use of SO_REUSEADDR, which "is useful if your 
server has been shut down, and then restarted right away while sockets are 
still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), 
then maybe we can allow the socket to set SO_REUSEADDR when starting 
WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free 
to correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() applies to the listening socket; childOption() would only
    // affect the accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}


was (Author: triones):
[~yuzhih...@gmail.com] I notice that there are three kinds of server socket:
 # JobManager and TaskManager create their socket servers via 
NetUtils.createSocketFromPorts. This only probes for an available port to 
create the ActorSystem and closes the ServerSocket immediately once a free 
port is found; code as below. I think we can directly call new 
ServerSocket(port) without a backlog here; what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port =
    if (socket == null) {
      throw new BindException(s"Unable to allocate port for TaskManager.")
    } else {
      try {
        socket.getLocalPort()
      } finally {
        socket.close()
      }
    }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # NettyServer uses Netty for I/O.
 # BlobServer creates its ServerSocket directly or via SSLContext.
 # WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to make use of SO_REUSEADDR, which "is useful if your 
server has been shut down, and then restarted right away while sockets are 
still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), 
then maybe we can allow the socket to set SO_REUSEADDR when starting 
WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free 
to correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() applies to the listening socket; childOption() would only
    // affect the accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Comment Edited] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-24 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079
 ] 

Triones Deng edited comment on FLINK-9231 at 4/24/18 1:54 PM:
--

[~yuzhih...@gmail.com] I notice that there are three kinds of server socket:
 # JobManager and TaskManager create their socket servers via 
NetUtils.createSocketFromPorts. This only probes for an available port to 
create the ActorSystem and closes the ServerSocket immediately once a free 
port is found; code as below. I think we can directly call new 
ServerSocket(port) without a backlog here; what do you think?
{code:java}
val result = AkkaUtils.retryOnBindException({
  // Try all ports in the range until successful
  val socket = NetUtils.createSocketFromPorts(
    actorSystemPortRange,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(
        // Use the correct listening address, bound ports will only be
        // detected later by Akka.
        port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
    })

  val port =
    if (socket == null) {
      throw new BindException(s"Unable to allocate port for TaskManager.")
    } else {
      try {
        socket.getLocalPort()
      } finally {
        socket.close()
      }
    }
  ..
}, { !actorSystemPortRange.hasNext }, 5000)
{code}

 # NettyServer uses Netty for I/O.
 # BlobServer creates its ServerSocket directly or via SSLContext.
 # WebFrontendBootstrap uses Netty to create a ServerBootstrap.

I think if we plan to make use of SO_REUSEADDR, which "is useful if your 
server has been shut down, and then restarted right away while sockets are 
still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), 
then maybe we can allow the socket to set SO_REUSEADDR when starting 
WebFrontendBootstrap. What's your idea? If anything is wrong, please feel free 
to correct me.

sample code for WebFrontendBootstrap.java like:
{code:java}
this.bootstrap = new ServerBootstrap();
this.bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // option() applies to the listening socket; childOption() would only
    // affect the accepted connections
    .option(ChannelOption.SO_REUSEADDR, true)
    .childHandler(initializer);
{code}


was (Author: triones):
[~yuzhih...@gmail.com] I notice that there are two kinds of server socket:
 # Plain ServerSocket, as in JobManager, TaskManager, TaskManagerRunner and 
BlobServer; all of them create the server socket via 
NetUtils.createSocketFromPorts.
 # Netty for I/O, as in NettyServer.

I think if we plan to make use of SO_REUSEADDR, which "is useful if your 
server has been shut down, and then restarted right away while sockets are 
still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), 
then maybe we can allow the sockets to set SO_REUSEADDR when starting 
JobManager, TaskManager and TaskManagerRunner. If anything is wrong, please 
correct me.

sample code for JobManager.scala like:
{code:java}
val socket = NetUtils.createSocketFromPorts(
  listeningPortRange,
  new NetUtils.SocketFactory {
    override def createSocket(port: Int): ServerSocket = {
      // Use the correct listening address, bound ports will only be
      // detected later by Akka.
      val serverSocket = new ServerSocket()
      serverSocket.setReuseAddress(true)
      serverSocket.bind(
        new InetSocketAddress(InetAddress.getByName(NetUtils.getWildcardIPAddress), port), 0)
      serverSocket
    }
  })
{code}
{code}

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-23 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448079#comment-16448079
 ] 

Triones Deng commented on FLINK-9231:
-

[~yuzhih...@gmail.com] I notice that there are two kinds of server socket:
 # Plain ServerSocket, as in JobManager, TaskManager, TaskManagerRunner and 
BlobServer; all of them create the server socket via 
NetUtils.createSocketFromPorts.
 # Netty for I/O, as in NettyServer.

I think if we plan to make use of SO_REUSEADDR, which "is useful if your 
server has been shut down, and then restarted right away while sockets are 
still active on its port. You should be aware that if any unexpected data 
comes in, it may confuse your server, but while this is possible, it is not 
likely" (see 
[https://stackoverflow.com/questions/19960475/problems-related-to-so-reuseaddr?rq=1]), 
then maybe we can allow the sockets to set SO_REUSEADDR when starting 
JobManager, TaskManager and TaskManagerRunner. If anything is wrong, please 
correct me.

sample code for JobManager.scala like:
{code:java}
val socket = NetUtils.createSocketFromPorts(
  listeningPortRange,
  new NetUtils.SocketFactory {
    override def createSocket(port: Int): ServerSocket = {
      // Use the correct listening address, bound ports will only be
      // detected later by Akka.
      val serverSocket = new ServerSocket()
      serverSocket.setReuseAddress(true)
      serverSocket.bind(
        new InetSocketAddress(InetAddress.getByName(NetUtils.getWildcardIPAddress), port), 0)
      serverSocket
    }
  })
{code}

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-04-21 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447086#comment-16447086
 ] 

Triones Deng commented on FLINK-9231:
-

[~yuzhih...@gmail.com] I am interested in this issue.

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Major
>
> This allows sockets to be bound even if there are sockets
> from a previous application that are still pending closure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint

2018-04-10 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16432385#comment-16432385
 ] 

Triones Deng commented on FLINK-9087:
-

This PR is ready for another review, [~NicoK], [~yuzhih...@gmail.com]. 
Thank you.

[~yuzhih...@gmail.com], do we need to change the description of this JIRA?

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> for (StreamRecordWriter> 
> streamRecordWriter : streamRecordWriters) {
>   try {
> streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint

2018-04-08 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429743#comment-16429743
 ] 

Triones Deng commented on FLINK-9087:
-

Thanks for your suggestions, I will follow them.

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> for (StreamRecordWriter> 
> streamRecordWriter : streamRecordWriters) {
>   try {
> streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint

2018-04-03 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424169#comment-16424169
 ] 

Triones Deng commented on FLINK-9087:
-

[~yuzhih...@gmail.com] When I ran the test, I found that in
{code:java}
public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
    try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
        ...
        // retain the buffer so that it can be recycled by each channel of targetPartition
        targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
        ...
        return eventBufferConsumer;
    }
}
{code}
the call to targetPartition.addBufferConsumer() uses a copy of 
eventBufferConsumer, so all the BufferConsumers produced by copy() share the 
same buffer, and each copy calls AbstractReferenceCountedByteBuf.retain() 
(AbstractReferenceCountedByteBuf is a Netty class).

Every targetPartition, such as AbstractCollectingResultPartitionWriter and 
ResultPartition, calls the close() method of its BufferConsumer, so the buffer 
in eventBufferConsumer is released in the end. ResultPartition also calls 
notifyDataAvailable(), which consumes the data asynchronously. So we had 
better leave the return value alone; what do you think? Or should we just 
change the method signature to void?

Note that FLINK-7315 plans to use Flink's buffers in Netty, and its sub-task 
FLINK-7518 has a solution. I am new here; any suggestions?
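
If the return value is kept, a sketch of closing it at the call site in 
StreamTask#performCheckpoint could look like this (my assumption of the fix, 
using a wildcard in place of the generic parameter):
{code:java}
// Hypothetical: close the returned BufferConsumer so the writer-side reference
// is released; the per-channel copies keep the shared buffer alive until consumed.
for (StreamRecordWriter<?> streamRecordWriter : streamRecordWriters) {
    try (BufferConsumer eventBufferConsumer = streamRecordWriter.broadcastEvent(message)) {
        // nothing else to do here
    }
}
{code}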

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> for (StreamRecordWriter> 
> streamRecordWriter : streamRecordWriters) {
>   try {
> streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint

2018-04-02 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423367#comment-16423367
 ] 

Triones Deng edited comment on FLINK-9087 at 4/3/18 1:59 AM:
-

[~yuzhih...@gmail.com] I notice that RecordWriter.broadcastEvent() is called 
in StreamTask, RecordWriterOutput, IterationIntermediateTask and 
IterationHeadTask, and also that no caller makes use of the BufferConsumer 
returned by broadcastEvent(). So I think the better way is to close the return 
value inside RecordWriter: change the method signature from BufferConsumer to 
void, and close the BufferConsumer at the end. Does this make sense? What's 
your idea?
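
A minimal sketch of that change (assuming the try-with-resources structure 
quoted in my other comment; numChannels is illustrative):
{code:java}
// Hypothetical: nobody uses the returned BufferConsumer, so close it inside
// the writer and return nothing.
public void broadcastEvent(AbstractEvent event) throws IOException {
    try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
        for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
            // each channel retains its own copy of the shared buffer
            targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
        }
    } // eventBufferConsumer is closed here, releasing the writer's reference
}
{code}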


was (Author: triones):
[~yuzhih...@gmail.com] Notice that RecordWriter.broadcastEvent() is called in 
StreamTask, RecordWriterOutput, IterationIntermediateTask and 
IterationHeadTask, and also that no caller makes use of the BufferConsumer 
returned by broadcastEvent(). So I think the better way is to close the return 
value inside RecordWriter: change the method signature from BufferConsumer to 
void, and close the BufferConsumer at the end. Does this make sense? What's 
your idea?

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> for (StreamRecordWriter> 
> streamRecordWriter : streamRecordWriters) {
>   try {
> streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint

2018-04-02 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423367#comment-16423367
 ] 

Triones Deng commented on FLINK-9087:
-

[~yuzhih...@gmail.com] Notice that RecordWriter.broadcastEvent() is called in 
StreamTask, RecordWriterOutput, IterationIntermediateTask and 
IterationHeadTask, and also that no caller makes use of the BufferConsumer 
returned by broadcastEvent(). So I think the better way is to close the return 
value inside RecordWriter: change the method signature from BufferConsumer to 
void, and close the BufferConsumer at the end. Does this make sense? What's 
your idea?

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> for (StreamRecordWriter> 
> streamRecordWriter : streamRecordWriters) {
>   try {
> streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9056) Job submission fails with AskTimeoutException if not enough slots are available

2018-04-01 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421687#comment-16421687
 ] 

Triones Deng commented on FLINK-9056:
-

[~fhueske] When job parallelism exceeds the available slots, Flink should tell 
the user the reason, e.g. log "Job parallelism exceeds available slots, please 
check your available slots or decrease the job parallelism", right?
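
Purely as an illustration of the idea (the check location and names are 
assumptions, not the actual scheduler code):
{code:java}
// Hypothetical fail-fast check during slot allocation, instead of letting
// the submission die with an opaque AskTimeoutException.
if (requiredSlots > availableSlots) {
    throw new NoResourceAvailableException(String.format(
        "Job parallelism requires %d slots but only %d are available. " +
        "Please add TaskManagers or decrease the job parallelism.",
        requiredSlots, availableSlots));
}
{code}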

> Job submission fails with AskTimeoutException if not enough slots are 
> available
> ---
>
> Key: FLINK-9056
> URL: https://issues.apache.org/jira/browse/FLINK-9056
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.5.0
> Environment: * FLIP-6 enabled
>  * Local Flink instance with fixed number of TMs
>  * Job parallelism exceeds available slots
>Reporter: Fabian Hueske
>Assignee: yuqi
>Priority: Major
>
> The error message if a job submission fails due to lack of available slots is 
> not helpful:
> {code:java}
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/8f0fabba-4021-45b6-a1f7-b8afd6627640#-574617182|#-574617182]]
>  after [30 ms]. Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".
>      at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>      at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>      at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>      at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>      at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>      at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>      at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>      at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>      at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>      at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint

2018-04-01 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421679#comment-16421679
 ] 

Triones Deng commented on FLINK-9087:
-

Could anyone give me permission to contribute to this issue? Thank you very 
much.

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> for (StreamRecordWriter> 
> streamRecordWriter : streamRecordWriters) {
>   try {
> streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint

2018-04-01 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421677#comment-16421677
 ] 

Triones Deng commented on FLINK-9087:
-

[~mingleizhang] Are you still working on this? If not, I would like to take 
the ticket, thank you.

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> ---
>
> Key: FLINK-9087
> URL: https://issues.apache.org/jira/browse/FLINK-9087
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> for (StreamRecordWriter> 
> streamRecordWriter : streamRecordWriters) {
>   try {
> streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)