Re: Checkpointing and save pointing

2019-05-07 Thread Congxian Qiu
Hi, Boris

The TaskManagers (TMs) will also need to write to the external volume, so it
has to be mounted on them as well.

Best, Congxian
On May 8, 2019, 03:56 +0800, Boris Lublinsky wrote:
> I am planning to use an external volume for this. My understanding is that it 
> needs to be mounted only to the job manager, not the task managers. Is this 
> correct, or does it need to be mounted to both?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>


Testing with ProcessingTime and FromElementsFunction with TestEnvironment

2019-05-07 Thread Steven Nelson
Hello!

I am trying to write a test that runs in the TestEnvironment. I create a
process that uses ProcessingTime, has a source constructed from a
FromElementsFunction and runs data through a Keyed Stream into
a ProcessingTimeSessionWindows.withGap().

The problem is that it appears that the env.execute method returns
immediately after the session closes, not allowing the events to be
released from the window before shutdown occurs. This used to work when I
used EventTime.

Thoughts?
-Steve


Checkpointing and save pointing

2019-05-07 Thread Boris Lublinsky
I am planning to use an external volume for this. My understanding is that it 
needs to be mounted only to the job manager, not the task managers. Is this 
correct, or does it need to be mounted to both?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/



Getting async function call terminated with an exception

2019-05-07 Thread Avi Levi
Hi,
We are using Flink 1.8.0 (but the following also happens in 1.7.2). I tried
a very simple unordered async call:

override def asyncInvoke(input: Foo, resultFuture: ResultFuture[ScoredFoo]): Unit = {
  val r = ScoredFoo(Foo("a"), 80)
  Future.successful(r)
}

Running this stream seems to get stuck in some kind of infinite loop until it
crashes with a timeout exception:

java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
    at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
    at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
    at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Async function call has timed out.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
    at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
    ... 2 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Async function call has timed out.
    at org.apache.flink.streaming.api.scala.async.AsyncFunction.timeout(AsyncFunction.scala:60)
    at org.apache.flink.streaming.api.scala.async.AsyncFunction.timeout$(AsyncFunction.scala:59)
    at com.lookalike.analytic.utils.LookalikeScoreEnrich.timeout(LookalikeScoreEnrich.scala:18)
    at org.apache.flink.streaming.api.scala.AsyncDataStream$$anon$3.timeout(AsyncDataStream.scala:301)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$1.onProcessingTime(AsyncWaitOperator.java:211)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    ... 1 common frames omitted

Please advise. Thanks,
Avi
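
A possible reading of the snippet above is that asyncInvoke builds a Scala
Future but never hands anything to resultFuture, so the operator can only wait
until its timeout fires. The sketch below models that with plain
java.util.concurrent classes. It is not Flink's API: the class and method names
are hypothetical, and the ":80" suffix only mimics the score in the post. In
Flink's Scala API the corresponding call would presumably be
resultFuture.complete(Iterable(r)), but that is an assumption to check against
the Async I/O documentation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model of an async callback contract; hypothetical names, not Flink's API. */
class AsyncCallbackSketch {

    // Mirrors the posted snippet: a result is produced, but the future the
    // caller is waiting on is never completed.
    static void invokeWithoutCompleting(String input, CompletableFuture<String> resultFuture) {
        CompletableFuture.completedFuture(input + ":80"); // value is discarded
    }

    // Hands the result back through the future the caller is waiting on.
    static void invokeAndComplete(String input, CompletableFuture<String> resultFuture) {
        resultFuture.complete(input + ":80");
    }

    // Waits up to timeoutMillis and returns "TIMEOUT" if nothing arrived.
    static String await(CompletableFuture<String> resultFuture, long timeoutMillis) {
        try {
            return resultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        invokeWithoutCompleting("a", neverCompleted);
        System.out.println(await(neverCompleted, 100));

        CompletableFuture<String> completed = new CompletableFuture<>();
        invokeAndComplete("a", completed);
        System.out.println(await(completed, 100));
    }
}
```

The first call can only time out because nothing ever completes the future; the
second returns as soon as complete() has been called.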


Re: I want to use MapState on an unkeyed stream

2019-05-07 Thread an0
But I only have one stream, nothing to connect it to.

On 2019/05/07 00:15:59, Averell  wrote: 
> From my understanding, having a fake keyBy (stream.keyBy(r => "dummyString"))
> means there would be only one slot handling the data.
> Would a broadcast function [1] work for your case?
> 
> Regards,
> Averell
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 
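
The point about the fake keyBy can be illustrated with a simplified model:
if every record carries the same key, every record lands on the same parallel
subtask. The sketch below assumes plain "hash modulo parallelism" routing,
which is a simplification; Flink's real assignment goes through key groups and
a different hash function. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified routing model; Flink's actual key-group assignment differs. */
class DummyKeySketch {

    // Stand-in for key-to-subtask assignment: hash code modulo parallelism.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts how many records each subtask would receive for the given keys.
    static Map<Integer, Integer> distribute(Iterable<?> keys, int parallelism) {
        Map<Integer, Integer> load = new HashMap<>();
        for (Object key : keys) {
            load.merge(subtaskFor(key, parallelism), 1, Integer::sum);
        }
        return load;
    }
}
```

With a constant key the returned map has a single entry holding all records,
whatever the parallelism; with varied keys the load spreads across subtasks.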


Migration from flink 1.7.2 to 1.8.0

2019-05-07 Thread Farouk
Hi

We are migrating our app to Flink 1.8.0.

We built a Docker image like this, since Hadoop is no longer bundled:

FROM myrepo:5/flink:1.8.0-scala_2.11-alpine

ADD --chown=flink:flink \
    https://my-artifactory-repo/artifactory/my-repo/org/apache/flink/flink-shaded-hadoop2-uber/2.8.3-1.8.0/flink-shaded-hadoop2-uber-2.8.3-1.8.0.jar \
    /opt/flink/lib

When running Flink, we get the stack trace below:

java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:284)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:360)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

Any idea what is happening?

I think it's a classloading problem.

With Flink 1.7.2, everything works fine.

Thanks
Farouk


Re: Class loader premature closure - NoClassDefFoundError: org/apache/kafka/clients/NetworkClient

2019-05-07 Thread Rohan Thimmappa
This is a blocker for exactly-once support in the Flink Kafka producer.

The issue was reported and closed, but it is still reproducible:
https://issues.apache.org/jira/browse/FLINK-10455

On Mon, May 6, 2019 at 10:20 AM Slotterback, Chris <
chris_slotterb...@comcast.com> wrote:

> Hey Flink users,
>
>
>
> We are currently using Flink 1.7.2 with a job that uses FlinkKafkaProducer
> with its write semantic set to Semantic.EXACTLY_ONCE. When there is a job
> failure and restart (in our case from a checkpoint timeout), the job enters a
> failure loop that requires a cancellation and resubmission to fix. The
> expected and desired outcome is that the job recovers from the failure and
> restarts successfully. Some digging revealed an issue where the class loader
> closes before the connection to Kafka is fully terminated, resulting in a
> NoClassDefFoundError. What is happening has already been described here:
> https://heap.io/blog/engineering/missing-scala-class-noclassdeffounderror,
> though we are experiencing this with Kafka, not Redis:
>
>
>
> 2019-05-03 15:14:18,780 ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread 'kafka-producer-network-thread | producer-80':
> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>     at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
>     at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>     at java.lang.Thread.run(Thread.java:748)
>
>
> Interestingly, this only happens when we extend the FlinkKafkaProducer for
> the purposes of setting the write semantic to EXACTLY_ONCE. When running
> with the default FlinkKafkaProducer (using Semantic.AT_LEAST_ONCE), the
> class loader has no issues disconnecting the kafka client on job failure,
> and the job recovers just fine. We are not doing anything particularly
> strange in our extended producer as far as I can tell:
>
>
>
> public class CustomFlinkKafkaProducer extends FlinkKafkaProducer {
>
>   public CustomFlinkKafkaProducer(Properties properties, String topicId,
>       AvroKeyedSerializer serializationSchema) {
>     super(
>         topicId,
>         serializationSchema,
>         properties,
>         Optional.of(new FlinkFixedPartitioner<>()),
>         Semantic.EXACTLY_ONCE,
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
>   }
>
>   public static Properties getPropertiesFromBrokerList(String brokerList) {
>     […]
>   }
> }
>


-- 
Thanks
Rohan


Re: Apache Flink - How to destroy global window and release it's resources

2019-05-07 Thread Aljoscha Krettek

Hi,

There should be no window or trigger object kept per window.

Aljoscha

> On 15. Apr 2019, at 10:22, Fabian Hueske wrote:
> 
> Hi,
> 
> Aljoscha knows the implementation best (since he implemented it). 
> 
> From my understanding (Aljoscha please correct me if I'm wrong), all Flink 
> managed state is removed (given that user-defined state is correctly cleaned 
> up).
> However, for each key, a window and a trigger object might be kept (this is 
> the part I'm not sure about). 
> This might cause memory issues if the keyspace is very large and "moving" 
> (keys which are only used for a short period of time, e.g., session keys).
> Eventually, a TM would fail causing job recovery. During recovery, only the 
> Flink managed state is restored and the TM would have free memory again.
> 
> Best, Fabian
> 
> 
> On Fri, Apr 12, 2019 at 7:58 PM, M Singh wrote:
> Hi Fabian/Guowei:  
> 
> Thanks for your pointers.   
> 
> Fabian, as you pointed out, a global window is never completely removed since 
> its end time is Long.MAX_VALUE, and that is my concern.  So, is there any 
> other way to clean up the now-purged global windows?
> 
> Thanks again.
> 
> 
> 
> On Thursday, April 11, 2019, 4:16:24 AM EDT, Fabian Hueske wrote:
> 
> 
> Hi,
> 
> As far as I know, a window is only completely removed when time (event or 
> processing time, depending on the window type) passes the window's end 
> timestamp.
> Since GlobalWindow's end timestamp is Long.MAX_VALUE, it is never completely 
> removed.
> I'm not 100% sure what state is kept around. It might not be keyed state, just 
> objects on the heap, but I'm not absolutely sure.
> 
> Aljoscha (in CC) should know the details here.
> 
> Best, Fabian
> 
> On Thu, Apr 11, 2019 at 8:07 AM, Guowei Ma wrote:
> Hi,
> I think you could return a proper TriggerResult from your trigger 
> implementation; it defines what happens to the window elements after the 
> window is computed. You can find the details in the docs [1].
> 
> 1. 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#fire-and-purge
>  
> 
> Best,
> Guowei
> 
> 
> On Thu, Apr 11, 2019 at 1:42 AM, M Singh <mans2si...@yahoo.com> wrote:
> Hi:
> 
> I have a use case where I need to create a global window and wait for an 
> unknown amount of time for certain events for a particular key.  I understand 
> that I can create a global window and use a custom trigger to initiate the 
> function computation.  But I am not sure how to destroy the window after the 
> triggering condition is satisfied and the events are purged.
> 
> If there is a better way of dealing with this situation, please let me know.
> 
> Thanks
> 
> Mans
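
The difference between firing and firing-and-purging, as discussed in this
thread, can be pictured with a toy model. The sketch below is not Flink code:
it borrows only the result names CONTINUE, FIRE and FIRE_AND_PURGE, and the
class, the per-key buffer map and the count threshold are all hypothetical. It
shows that after a purge nothing is retained for the key, which is the
behaviour the thread is asking about.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of per-key buffering with a count trigger; not Flink's API. */
class FireAndPurgeSketch {
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final Map<String, List<Integer>> buffers = new HashMap<>();
    private final int threshold;
    private final boolean purgeOnFire;

    FireAndPurgeSketch(int threshold, boolean purgeOnFire) {
        this.threshold = threshold;
        this.purgeOnFire = purgeOnFire;
    }

    // Trigger decision: fire once the buffer holds at least `threshold` elements.
    private Result onElement(List<Integer> buffer) {
        if (buffer.size() < threshold) {
            return Result.CONTINUE;
        }
        return purgeOnFire ? Result.FIRE_AND_PURGE : Result.FIRE;
    }

    // Buffers the value; returns the sum of the buffer when the trigger fires, else null.
    Integer add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        Result result = onElement(buffer);
        if (result == Result.CONTINUE) {
            return null;
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        if (result == Result.FIRE_AND_PURGE) {
            buffers.remove(key); // nothing is kept for this key after a purge
        }
        return sum;
    }

    // Number of keys that still hold buffered elements.
    int retainedKeys() {
        return buffers.size();
    }
}
```

With purgeOnFire set to true, retainedKeys() drops back to zero after each
firing; with false, the buffer for the key keeps growing.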



Flink on YARN: TaskManager heap auto-sizing?

2019-05-07 Thread Dylan Adams
In the Configuration section of the docs, the description for
"taskmanager.heap.size" contains: "On YARN setups, this value is
automatically configured to the size of the TaskManager's YARN container,
minus a certain tolerance value."

Does that functionality exist?

I don't see any documented method to specify the YARN container size for
the TaskManagers, nor could I find any logic in the Flink YARN integration
code that seemed to implement that behavior.

My understanding is that you need to manually calculate and specify
taskmanager.heap.size (and jobmanager.heap.size) based on your YARN setup.

Thanks,
Dylan
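
For reference, the "container size minus a tolerance" arithmetic quoted from
the docs can be sketched as below. This is an assumption-laden illustration,
not Flink's implementation: the cutoff ratio of 0.25 and the minimum cutoff of
600 MB in the usage note are assumed values to be checked against the
containerized.heap-cutoff-* options for the Flink version in use, and the class
and method names are hypothetical. Whether Flink applies this automatically on
a given YARN setup is exactly the open question in this message.

```java
/** Arithmetic sketch only; ratio and minimum are assumptions, not verified defaults. */
class YarnHeapSketch {

    // Memory left for the JVM after reserving a cutoff from the container size.
    // The cutoff is the larger of a fixed minimum and a fraction of the container.
    static long memoryAfterCutoffMb(long containerMb, double cutoffRatio, long minCutoffMb) {
        if (containerMb <= 0 || cutoffRatio < 0.0 || cutoffRatio >= 1.0 || minCutoffMb < 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        long cutoffMb = Math.max(minCutoffMb, (long) (containerMb * cutoffRatio));
        if (cutoffMb >= containerMb) {
            throw new IllegalArgumentException("cutoff leaves no memory for the JVM");
        }
        return containerMb - cutoffMb;
    }
}
```

Under those assumed values, a 4096 MB container would keep 4096 - 1024 = 3072 MB,
while a 1024 MB container would keep 1024 - 600 = 424 MB because the minimum
cutoff dominates.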


Re: flink 1.7 HA production setup going down completely

2019-05-07 Thread Manjusha Vuyyuru
I'm using 1.7.2.


On Tue, May 7, 2019 at 5:50 PM miki haiat  wrote:

> Which flink version are you using?
> I had similar  issues with 1.5.x


Re: flink 1.7 HA production setup going down completely

2019-05-07 Thread miki haiat
Which flink version are you using?
I had similar  issues with 1.5.x


flink 1.7 HA production setup going down completely

2019-05-07 Thread Manjusha Vuyyuru
Hello,

I have a flink setup with two job managers coordinated by zookeeper.

I see the below exception and both jobmanagers are going down:

2019-05-07 08:29:13,346 INFO org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Released locks of job graph f8eb1b482d8ec8c1d3e94c4d0f79df77 from ZooKeeper.
2019-05-07 08:29:13,346 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
java.lang.RuntimeException: org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /147dd022ec91f7381ad4ca3d290387e9. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:74)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /147dd022ec91f7381ad4ca3d290387e9. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
    at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:208)
    at org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:696)
    at org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:681)
    at org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobs(Dispatcher.java:662)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:821)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:72)
    ... 9 more


Can someone please help me understand in detail what is causing this
exception? I can see that the job graph cannot be retrieved via ZooKeeper.
What could be the reason for this?

This is the second time my setup has gone down with this exception. The first
time, I cleared the jobgraph folder in ZooKeeper and restarted; now I am facing
the same issue again.

Since this is a production setup, this kind of outage is not at all expected
:(. Can someone help me find a permanent fix for this issue?


Thanks,
Manju


Re: TM occasionally hang in deploying state in Flink 1.5

2019-05-07 Thread qi luo
Thanks Dawid, I’ve created an issue for this:
https://jira.apache.org/jira/browse/FLINK-12426
Although we’re using 1.5, this may affect later versions as well.

I’m still investigating the root cause but have no result yet. This happens 
occasionally and isn't easy to reproduce.

> On Apr 25, 2019, at 6:40 PM, Dawid Wysakowicz  wrote:
> 
> Hi,
> 
> Feel free to open a JIRA for this issue. By the way have you investigated 
> what is the root cause for it hanging?
> 
> Best,
> 
> Dawid
> 
> On 25/04/2019 08:55, qi luo wrote:
>> Hello,
>> 
>> This issue occurred again and we dumped the TM thread. It indeed hung on 
>> socket read to download jar from Blob server:
>> 
>> "DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) (1999/2000)" #72 prio=5 os_prio=0 tid=0x7fb9a1521000 nid=0xa0994 runnable [0x7fb97cfbf000]
>>    java.lang.Thread.State: RUNNABLE
>>     at java.net.SocketInputStream.socketRead0(Native Method)
>>     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>>     at java.net.SocketInputStream.read(SocketInputStream.java:171)
>>     at java.net.SocketInputStream.read(SocketInputStream.java:141)
>>     at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152)
>>     at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140)
>>     at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:170)
>>     at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>>     at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)
>>     at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>>     - locked <0x00078ab60ba8> (a java.lang.Object)
>>     at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:893)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>     at java.lang.Thread.run(Thread.java:748)
>> 
>> I checked the latest master code. There’s still no socket timeout in Blob 
>> client. Should I create an issue to add this timeout?
>> 
>> Regards,
>> Qi 
>> 
>>> On Apr 19, 2019, at 7:49 PM, qi luo wrote:
>>> 
>>> Hi all,
>>> 
>>> We use Flink 1.5 batch and start thousands of jobs per day. Occasionally we 
>>> observe stuck jobs, caused by a TM hanging in the “DEPLOYING” state. 
>>> 
>>> The TM log shows that it is stuck downloading jars in BlobClient:
>>> 
>>> 
>>> ...
>>> INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000).
>>> INFO  org.apache.flink.runtime.taskmanager.Task - DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) switched from CREATED to DEPLOYING.
>>> INFO  org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) [DEPLOYING]
>>> INFO  org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) [DEPLOYING].
>>> INFO  org.apache.flink.runtime.blob.BlobClient - Downloading 19e65c0caa41f264f9ffe4ca2a48a434/p-3ecd6341bf97d5512b14c93f6c9f51f682b6db26-37d5e69d156ee00a924c1ebff0c0d280 from some-host-ip-port
>>> 
>>> no more logs...
>>> 
>>> 
>>> It seems that the TM is calling BlobClient to download jars from the 
>>> JM/BlobServer. Under the hood it calls Socket.connect() and then 
>>> Socket.read() to retrieve the results. 
>>> 
>>> Should we add a timeout to the socket operations in BlobClient to resolve 
>>> this issue?
>>> 
>>> Thanks,
>>> Qi
>> 



How to export all not-null keyed ValueState

2019-05-07 Thread Averell
Hi,

I have a keyed value state that is set for only about 1% of the total
number of keys that I have. Is there any way to get the values of
all those states? 
I looked at the queryable state option, but it seems to support
querying by key only. 

Thanks and best regards,
Averell


