Why does BlobServer use ServerSocket instead of Netty's ServerBootstrap?

2018-06-10 Thread makeyang
After checking the code, I found that BlobServer uses ServerSocket instead of
Netty's ServerBootstrap.
I wonder why, and whether it is OK to migrate to ServerBootstrap.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Is there a config to make the TaskManager keep retrying to connect to the JobManager after being disassociated?

2018-06-05 Thread makeyang
Can anybody share any thoughts or insights about this issue?


Re: some default config questions

2018-06-04 Thread makeyang
Thanks.


Re: Why does the Flink release package prefer an uber jar over small jars?

2018-06-04 Thread makeyang
Thanks, Rong Rong, but it seems unrelated.


Why does the Flink release package prefer an uber jar over small jars?

2018-06-04 Thread makeyang
What benefit does an uber jar have that small jars don't?


Re: Is there a config to make the TaskManager keep retrying to connect to the JobManager after being disassociated?

2018-06-04 Thread makeyang
So is there a way or a config option to make the TaskManager keep trying to
connect to the JobManager?


Re: Is there a config to make the TaskManager keep retrying to connect to the JobManager after being disassociated?

2018-06-04 Thread makeyang
When I debug the JobManager, this is the error log in the TaskManager:
2018-06-04 17:16:33,295 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - The heartbeat of ResourceManager with id 35df0455efc2fb6fa3f2467f7f5d2ba1 timed out.
2018-06-04 17:16:33,297 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Close ResourceManager connection 35df0455efc2fb6fa3f2467f7f5d2ba1.
java.util.concurrent.TimeoutException: The heartbeat of ResourceManager with id 35df0455efc2fb6fa3f2467f7f5d2ba1 timed out.
at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1553)
at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerHeartbeatListener$$Lambda$26/1975100911.run(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:295)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$$Lambda$12/1732386307.apply(Unknown Source)
at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Is there a config to make the TaskManager keep retrying to connect to the JobManager after being disassociated?

2018-06-04 Thread makeyang
I deployed a standalone cluster with Flink 1.5, and when I try to restart
the only JobManager, the TaskManager prints the log below:
2018-06-04 12:06:35,882 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ipaddress:6123] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2018-06-04 12:07:17,580 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - The heartbeat of ResourceManager with id 6af9bbb514a6ddaeca95d6e52db6dbd5 timed out.
2018-06-04 12:07:17,580 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Close ResourceManager connection 6af9bbb514a6ddaeca95d6e52db6dbd5.
2018-06-04 12:07:17,611 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: /ipaddress:6123
2018-06-04 12:07:17,611 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ipaddress:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@ipaddress:6123]] Caused by: [Connection refused: /ipaddress:6123]

So I'd like to know: is there a config option to make the TaskManager keep
retrying to connect to the JobManager? Since I am restarting the JobManager,
it will come back later.


Re: Is it OK to open a PR just for code comment errors?

2018-06-01 Thread makeyang
Got it.
Thanks, guys.


some default config questions

2018-05-31 Thread makeyang
Why is the default value of jobmanager.execution.failover-strategy "full" and
not "individual"?
Why is the default value of taskmanager.jvm-exit-on-oom false and not true?
The code is Flink 1.5.


Is it OK to open a PR just for code comment errors?

2018-05-31 Thread makeyang
I am not sure whether it is OK to open a PR just for code comment errors,
since I found one while reading the code.


Re: Task did not exit gracefully and lost TaskManager

2018-05-30 Thread makeyang
I ran into the same problem in 1.4.
When I cancel the job, one of the TaskManagers keeps logging the exception.


Re: latency critical job

2018-05-28 Thread makeyang
Rong Rong:
My Flink version is 1.4.2.
Since we are running in a Docker environment with shared disk I/O, we have
observed that disk I/O spikes caused by other processes on the same physical
machine can make operator processing take a long time.


Re: latency critical job

2018-05-28 Thread makeyang
Timo:
Thanks for your suggestion.


Does Flink call a full GC to reclaim the direct memory mainly occupied by RocksDB?

2018-05-25 Thread makeyang
Each time a job is cancelled, does Flink call a full GC to reclaim the direct
memory mainly occupied by RocksDB?
If so, where does it do this?
If not, why not?


latency critical job

2018-05-25 Thread makeyang
Some jobs are latency critical, which means they can't tolerate latency above
a certain threshold.
So will Flink provide a timeout operator in the near future? That is, when an
operator times out, the JobManager would schedule a new operator that starts
from the previous state of the timed-out operator, keeps processing new events,
and discards the events that were being processed.


Question about chained operators with different parallelism

2018-05-18 Thread makeyang
someStream.filter(...).map(...).map(...);
These operators are supposed to be chained.
But what if they are set to different parallelism, like below:
someStream.filter(...).setParallelism(X).map(...).setParallelism(Y).map(...).setParallelism(Z);
X != Y != Z
What will happen?
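A rough model of the question above, assuming (as far as I understand Flink's StreamingJobGraphGenerator) that two adjacent operators are only chained when, among other conditions, they have the same parallelism and are connected by a forward partitioner, and that when the parallelism differs and no partitioner is set explicitly, records are redistributed round-robin (rebalance) instead. The plain-Java sketch below models only the parallelism condition; Op and chains are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainingSketch {
    // Hypothetical, simplified stand-in for one operator of a linear pipeline (not Flink API).
    static final class Op {
        final String name;
        final int parallelism;

        Op(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    // Groups a linear pipeline into chains, starting a new chain whenever the parallelism changes.
    // Only the parallelism condition is modeled; Flink checks further conditions as well.
    static List<List<String>> chains(List<Op> pipeline) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            if (previous != null && previous.parallelism != op.parallelism) {
                result.add(current); // parallelism changed: close the current chain
                current = new ArrayList<>();
            }
            current.add(op.name);
            previous = op;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Op> same = List.of(new Op("filter", 4), new Op("map1", 4), new Op("map2", 4));
        List<Op> mixed = List.of(new Op("filter", 2), new Op("map1", 3), new Op("map2", 4));
        System.out.println(chains(same));  // [[filter, map1, map2]]
        System.out.println(chains(mixed)); // [[filter], [map1], [map2]]
    }
}
```

Under this model, the pipeline with X != Y != Z would be split into three separate chains, so each record would cross a task boundary (serialization plus a network or local channel) between operators instead of being handed over by a direct method call.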


Re: Fwd: Decrease initial source read speed

2018-05-18 Thread makeyang
Andrei Shumanski:
Which source are you using?


Are there any ways to test the performance of the RocksDB state backend?

2018-05-17 Thread makeyang
I'd like to integrate a newer version of RocksDB with Flink. Are there existing
tools or ways to benchmark the performance of the RocksDB state backend, to see
whether performance improves or drops?


MaKeyang
TIG.JD.COM


In order to use epoll (taskmanager.network.netty.transport: epoll), is Linux version 4.0.16 or newer required?

2018-04-20 Thread makeyang
My Flink version is 1.4.2.
My JDK version is 1.8.0.20.
My Linux version is 3.10.0.

I tried to use epoll with the setting taskmanager.network.netty.transport: epoll,
but it throws an exception, which led me here:
https://github.com/apache/flink-shaded/issues/30
I followed the instructions, but the exception is still thrown.
Then I found the official Netty documentation on native transports:
http://netty.io/wiki/native-transports.html
It says:
Netty provides the following platform specific JNI transports:
Linux (since 4.0.16)

So my question is:
in order to use epoll, is Linux version 4.0.16 or newer required?


Need more logs when the TaskManager is shutting down

2018-04-19 Thread makeyang
One of my TaskManagers dropped out of the cluster. I checked its log and found
the following:
2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44).
2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task - Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager is shutting down.
at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:220)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:121)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:374)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Well:
Why is Flink "attempting to fail task externally" for Process?
Why is the TaskManager shutting down, and what triggered it?

This important information is not in the log, although it would be very useful.


Re: Slow flink checkpoint

2018-04-16 Thread makeyang
Since Flink Forward SF is over,
could you spend a few minutes looking at this issue and sharing your thoughts
on it? Could you help review or comment on my design, or give us a design so
that I can help implement it?

Thanks a lot.


heartbeat.timeout in the 1.4 documentation

2018-04-13 Thread makeyang
In the Flink 1.4 code, HeartbeatManagerOptions defines:
HEARTBEAT_TIMEOUT = key("heartbeat.timeout").defaultValue(5L);

but this option cannot be found in
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html


Re: Slow flink checkpoint

2018-04-04 Thread makeyang
The test results are very promising.
The synchronous part now takes a couple of milliseconds instead of a couple of
seconds, roughly a 1000x reduction (the overall time is not reduced, since the
work is just moved from the synchronous part to the asynchronous part).
Are you interested in this change?


Re: Slow flink checkpoint

2018-04-01 Thread makeyang
I have put a lot of effort into this issue and tried to resolve it:
1. Let me first describe the current snapshot path for timers:
a) For each key group, InternalTimeServiceManager.snapshotStateForKeyGroup is invoked.
b) InternalTimeServiceManager creates an InternalTimerServiceSerializationProxy to write the snapshot.
c) InternalTimerServiceSerializationProxy iterates over the tuples, writes the service name, calls snapshotTimersForKeyGroup, and then gets an InternalTimersSnapshotWriter to call writeTimersSnapshot.
d) In writeTimersSnapshot of InternalTimersSnapshotWriter, it first writes the key serializer and the namespace serializer, then gets the eventTimers and processingTimers of the InternalTimersSnapshot, each of which is a Set of InternalTimer, and serializes them.

2. My first attempt was to shallow-copy the tuples, then shallow-copy the
eventTimers and processingTimers, and then use another thread to snapshot them
without blocking the event processing thread. But it turned out that
shallow-copying the eventTimers and processingTimers is time-consuming, so this
solution failed.

3. Then I tried to borrow the idea of the CopyOnWriteStateTable data structure
and manage timers with it. But after digging further, I found an easier way to
snapshot timers asynchronously, based on one fact: InternalTimer is immutable.
Based on this fact, asynchronous snapshots can be achieved as follows:
a) Maintain a stateTableVersion and snapshotVersions in InternalTimeServiceManager,
exactly like the fields of the same names in CopyOnWriteStateTable. One more
thing: a read-write lock that protects snapshotVersions and stateTableVersion.
b) Add two more properties to each InternalTimer, a create version and a delete
version, besides the three existing ones: timestamp, key, and namespace. Each
time a timer is registered in the timer service, it is created with the current
stateTableVersion as its create version and -1 as its delete version. Each time
a timer is deleted in the timer service, it is marked as deleted by setting its
delete version to the current stateTableVersion, without physically removing it
from the timer service.
c) Each time timers are snapshotted, InternalTimeServiceManager increments its
stateTableVersion and adds it to snapshotVersions. These two operations are
protected by the write lock of InternalTimeServiceManager. The new
stateTableVersion becomes the snapshot version of this snapshot.
d) Shallow-copy the tuples.
e) Then use another thread to snapshot everything asynchronously: the key
serializer, the namespace serializer, and the timers. A timer that is not
deleted (delete version is -1) and whose create version is less than the
snapshot version is serialized. A timer whose delete version is not -1 and is
greater than or equal to the snapshot version is also serialized. Otherwise,
the timer is not serialized by this snapshot.
f) When everything is serialized, remove the snapshot version from
snapshotVersions. This still happens in the other thread and is guarded by the
write lock.
g) Last thing: physical deletion of timers. Timers are physically deleted in two
places. First, each time a timer is deleted in the timer service, it is marked
as deleted by setting its delete version to the current stateTableVersion,
without physically removing it. After that, under the read lock, check whether
the timer's delete version is less than the minimum value in snapshotVersions
(or snapshotVersions is empty), which means no active timer snapshot still
needs it; if so, physically delete it. Second, while iterating over the timers
during a snapshot: if a timer's delete version is less than the minimum value
in snapshotVersions, the timer is deleted and no running snapshot needs to keep
it, so it is physically removed.
h) One more addition: processingTimeTimers and eventTimeTimers for each key
group used to be HashSets; they are now ConcurrentHashMaps keyed by
key + namespace + timestamp.
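The versioning scheme in step 3 can be sketched in plain Java as below. This is a minimal model under several assumptions, not Flink code: VersionedTimerStore, VersionedTimer and all their methods are hypothetical; a timer is identified by a string built from key, namespace and timestamp; and "serializing" a timer is replaced by collecting it into a list. Two deliberate choices: for a deleted timer, the visibility check also requires createVersion < snapshotVersion (otherwise a timer created and deleted after the snapshot started would be included, which step e) as written does not rule out), and the read lock is taken per deleted timer during the iteration instead of across the whole loop. Re-registering a timer whose deleted copy is still retained for a running snapshot is not handled; the sketch simply replaces that copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class VersionedTimerStore {
    static final class VersionedTimer { // timestamp, key and namespace are encoded in the map key
        final long createVersion;
        volatile long deleteVersion = -1; // -1 means "not deleted"
        VersionedTimer(long createVersion) { this.createVersion = createVersion; }
    }
    private final Map<String, VersionedTimer> timers = new ConcurrentHashMap<>(); // step h
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();       // step a
    private final TreeSet<Long> snapshotVersions = new TreeSet<>();                 // guarded by lock
    private long stateTableVersion = 0;                                             // guarded by lock

    private static String id(long ts, String key, String ns) { return key + "|" + ns + "|" + ts; }

    public void register(long ts, String key, String ns) {
        lock.readLock().lock();
        try {
            long v = stateTableVersion; // step b: the current version becomes the create version
            // Simplification: a deleted copy still retained for a running snapshot gets replaced.
            timers.compute(id(ts, key, ns), (k, old) ->
                    old != null && old.deleteVersion == -1 ? old : new VersionedTimer(v));
        } finally {
            lock.readLock().unlock();
        }
    }

    public void delete(long ts, String key, String ns) {
        String k = id(ts, key, ns);
        lock.readLock().lock();
        try {
            VersionedTimer t = timers.get(k);
            if (t != null && t.deleteVersion == -1) {
                t.deleteVersion = stateTableVersion; // step b: logical deletion only
                purgeIfUnneeded(k, t);               // step g, first place
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    private void purgeIfUnneeded(String k, VersionedTimer t) {
        lock.readLock().lock(); // re-entrant when called from delete()
        try { // step g: drop a deleted timer whose delete version is below every running snapshot
            long d = t.deleteVersion;
            if (d != -1 && (snapshotVersions.isEmpty() || d < snapshotVersions.first())) {
                timers.remove(k, t);
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    public long startSnapshot() {
        lock.writeLock().lock();
        try {
            snapshotVersions.add(++stateTableVersion); // step c: bump version, mark snapshot running
            return stateTableVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public List<VersionedTimer> writeSnapshot(long s) { // steps e and f; may run on another thread
        List<VersionedTimer> visible = new ArrayList<>();
        for (Map.Entry<String, VersionedTimer> e : timers.entrySet()) {
            VersionedTimer t = e.getValue();
            long d = t.deleteVersion;
            if (t.createVersion < s && (d == -1 || d >= s)) {
                visible.add(t); // a real implementation would serialize the timer here
            } else if (d != -1) {
                purgeIfUnneeded(e.getKey(), t); // step g, second place
            }
        }
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(s); // step f
        } finally {
            lock.writeLock().unlock();
        }
        return visible;
    }
    public int size() { return timers.size(); }
    public static void main(String[] args) {
        VersionedTimerStore store = new VersionedTimerStore();
        store.register(10, "k", "ns");  // create version 0
        long s = store.startSnapshot(); // snapshot version 1
        store.delete(10, "k", "ns");    // delete version 1: still visible to snapshot 1
        store.register(30, "k", "ns");  // create version 1: not visible to snapshot 1
        System.out.println(store.writeSnapshot(s).size()); // 1 (timer 10)
        System.out.println(store.size());                  // 2 (timer 10 is purged by a later snapshot)
    }
}
```

In main, timer 10 is deleted after snapshot 1 started, so snapshot 1 still contains it, and it is only purged physically during a later snapshot's iteration; timer 30 is created after snapshot 1 started, so snapshot 1 does not contain it. Holding the read lock only briefly matters because ReentrantReadWriteLock, in its default non-fair mode, makes new readers wait behind a queued writer, so a read lock held across the whole iteration could stall a startSnapshot() call and every register() or delete() queued behind it.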

The code is done and the tests are still running. I am posting these comments
not only to hear your opinions, but also to figure out some more questions
about the current timer snapshot code path. My questions are below:
1. HeapInternalTimerService.onProcessingTime is invoked by another thread, the
one of ProcessingTimeService, and in that thread it removes timers from
HeapInternalTimerService. Yet in the current timer snapshot path, I haven't
found any shallow copy of processingTimeTimers and eventTimeTimers. How can
this not cause a ConcurrentModificationException?
2. Since onProcessingTime is triggered in another thread, while timers are
being snapshotted in the working thread, what if a timer fires and its
triggerTarget is processed, changing state, and then the asynchronous keyed
state snapshot is triggered? Won't this make the state inconsistent? Let's
imagine this case: all keyed state is changed only by timers. So add timer1,
timer2, timer3, timer4 and timer5 and since no timer is 

Re: Restart hook and checkpoint

2018-03-19 Thread makeyang
Currently, the only way to trigger a checkpoint is time-based. Based on this
discussion, I think Flink needs to introduce an event-based way to trigger
checkpoints; for example, restarting a TaskManager should count as an event.