1. Okay, I understand. My code is similar to what you demonstrated. I have attached a snap of my job plan visualization.
3. Have attached the logs and exception raised (15min - configured akka timeout) after submitting the job.

Thanks,
Shailesh

On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> 1.
> I’m not sure what your code is. However, I have tested it and here is the example with multiple streams in one job:
> https://gist.github.com/pnowojski/63fb1c56f2938091769d8de6f513567f
> As expected, it created 5 source threads (checked in the debugger) and is printing 5 values to the output every second, so clearly those 5 sources are executed simultaneously.
>
> The number of operators is not related to the number of threads. The number of operator chains is. Simple pipelines like source -> map -> filter -> sink will be chained and executed in one thread; please refer to the documentation link in one of my earlier responses.
>
> Can you share your job code?
>
> 2. Good point, I forgot to mention that. The job in my example will have 5 operator chains, but because of task slot sharing, they will share one single task slot. In order to distribute such a job with parallelism 1 across the cluster, you have to define a different slot sharing group for each chain:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
> Just set it on the sources.
>
> 3. Can you show the logs from the job manager and task manager?
>
> 4. As long as you have enough heap memory to run your application/tasks, there is no upper limit on the number of task slots.
>
> Piotrek
>
> On 14 Nov 2017, at 07:26, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>
> Hi Piotrek,
>
> I tried out option 'a' mentioned above, but instead of separate jobs, I'm creating separate streams per device. Following is the test deployment configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):
>
> akka.client.timeout 15 min
> jobmanager.heap.mb 1024
> jobmanager.rpc.address localhost
> jobmanager.rpc.port 6123
> jobmanager.web.port 8081
> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporter.jmx.port 8789
> metrics.reporters jmx
> parallelism.default 1
> taskmanager.heap.mb 1024
> taskmanager.memory.preallocate false
> taskmanager.numberOfTaskSlots 4
>
> The number of operators per device stream is 4 (one sink function, 3 CEP operators).
>
> Observations (and questions):
>
> 1. The number of threads (captured through JMX) is almost the same as the total number of operators being created. This clears up my original question in this thread.
>
> 2. Even when the number of task slots is 4, the web UI shows 3 slots as free. Is this expected? Why are the subtasks not being distributed across slots?
>
> 3. Job deployment hangs (never switches to RUNNING) when the number of devices is greater than 5. Increasing the akka client timeout does not help. Will deploying separate jobs per device, instead of separate streams, help here?
>
> 4. Is there an upper limit on the number of task slots which can be configured? I know that my operator state size at any given point in time would not be very high, so it looks OK to deploy independent jobs which can be deployed on the same task manager across slots.
>
> Thanks,
> Shailesh
>
> On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
>> Sure, let us know if you have other questions or encounter some issues.
>>
>> Thanks, Piotrek
>>
>> On 13 Nov 2017, at 14:49, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>
>> Thanks, Piotr. I'll try it out and will get back in case of any further questions.
>>
>> Shailesh
>>
>> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>>> 1. It’s a little bit more complicated than that. Each operator chain/task will be executed in a separate thread (parallelism multiplies that). You can check in the web UI how your job was split into tasks.
>>>
>>> 3. Yes, that’s true, this is an issue. To preserve the individual watermarks/latencies (assuming that you have some way to calculate them individually per device), you could either:
>>>
>>> a) have separate jobs per device with parallelism 1. Pros: independent failures/checkpoints. Cons: resource usage (number of threads increases with number of devices, there are also other resources consumed by each job), efficiency,
>>> b) have one job with multiple data streams. Cons: resource usage (threads)
>>> c) ignore Flink’s watermarks, and implement your own code in place of it. You could read all of your data in a single data stream, keyBy partition/device and manually handle the watermark logic. You could either try to wrap CEP/Window operators or copy/paste and modify them to suit your needs.
>>>
>>> I would start by trying out a). If it works for your cluster/scale then that’s fine. If not, try b) (it would share most of the code with a), and as a last resort try c).
>>>
>>> Kostas, would you like to add something?
>>>
>>> Piotrek
>>>
>>> On 9 Nov 2017, at 19:16, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>>
>>> On 1. - is it tied specifically to the number of source operators or to the number of Datastream objects created? I mean, does the answer change if I read all the data from a single Kafka topic, get a Datastream of all events, and then apply N filters to create N individual streams?
>>>
>>> On 3. - the problem with partitions is that watermarks cannot be different per partition, and since in this use case each stream is from a device, the latency could be different (but order will be correct almost always) and there are high chances of losing out on events on operators like Patterns which work with windows. Any ideas for workarounds here?
>>>
>>> Thanks,
>>> Shailesh
>>>
>>> On 09-Nov-2017 8:48 PM, "Piotr Nowojski" <pi...@data-artisans.com> wrote:
>>>
>>> Hi,
>>>
>>> 1.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>>>
>>> The number of threads executing would be, roughly speaking, equal to the number of input data streams multiplied by the parallelism.
>>>
>>> 2.
>>> Yes, you could dynamically create more data streams at the job startup.
>>>
>>> 3.
>>> Running 10000 independent data streams on a small cluster (couple of nodes) will definitely be an issue, since even with parallelism set to 1, there would be quite a lot of unnecessary threads.
>>>
>>> It would be much better to treat your data as a single data input stream with multiple partitions. You could assign partitions between source instances based on parallelism. For example with parallelism 6:
>>> - source 0 could get partitions 0, 6, 12, 18
>>> - source 1 could get partitions 1, 7, …
>>> …
>>> - source 5 could get partitions 5, 11, ...
>>>
>>> Piotrek
>>>
>>> On 9 Nov 2017, at 10:18, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to understand the runtime aspect of Flink when dealing with multiple data streams and multiple operators per data stream.
>>>
>>> Use case: N data streams in a single Flink job (each data stream representing 1 device - with different time latencies), and each of these data streams gets split into two streams, of which one goes into a bunch of CEP operators, and one into a process function.
>>>
>>> Questions:
>>> 1. At runtime, will the engine create one thread per data stream? Or one thread per operator?
>>> 2. Is it possible to dynamically create a data stream at runtime when the job starts? (i.e. if N is read from a file when the job starts and corresponding N streams need to be created)
>>> 3. Are there any specific performance impacts when a large number of streams (N ~ 10000) are created, as opposed to N partitions within a single stream?
>>>
>>> Are there any internal (design) documents which can help in understanding the implementation details? Any references to the source will also be really helpful.
>>>
>>> Thanks in advance.
>>>
>>> Shailesh
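
As a rough illustration of the partition assignment Piotrek suggests in the quoted thread above (with parallelism 6, source 0 reads partitions 0, 6, 12, 18 and so on), here is a minimal plain-Java sketch. It does not use any Flink API. The class name `PartitionAssignment`, the method `partitionsFor` and the total of 24 partitions are assumptions made purely for illustration; the thread does not say how many partitions exist or how a real source would discover them.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignment {

    /**
     * Returns the partitions a given source instance would read when
     * partitions are dealt out round-robin: instance i gets every
     * partition p with p % parallelism == i.
     * (Hypothetical helper, not part of Flink.)
     */
    public static List<Integer> partitionsFor(int sourceIndex, int parallelism, int totalPartitions) {
        if (parallelism <= 0 || sourceIndex < 0 || sourceIndex >= parallelism) {
            throw new IllegalArgumentException("sourceIndex must be in [0, parallelism)");
        }
        List<Integer> assigned = new ArrayList<>();
        // Start at the instance's own index and step by the parallelism.
        for (int p = sourceIndex; p < totalPartitions; p += parallelism) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int parallelism = 6;       // value used in the thread's example
        int totalPartitions = 24;  // assumed for illustration only
        for (int i = 0; i < parallelism; i++) {
            System.out.println("source " + i + " -> " + partitionsFor(i, parallelism, totalPartitions));
        }
    }
}
```

With this scheme the number of source threads depends only on the parallelism, not on the number of devices, which is the point of treating the devices as partitions of one stream. It does not address the per-device watermark concern raised in the thread.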
2017-11-14 15:44:09,867 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2017-11-14 15:44:09,932 INFO org.apache.flink.runtime.jobmanager.JobManager - -------------------------------------------------------------------------------- 2017-11-14 15:44:09,932 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC) 2017-11-14 15:44:09,932 INFO org.apache.flink.runtime.jobmanager.JobManager - Current user: shailesh 2017-11-14 15:44:09,932 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.144-b01 2017-11-14 15:44:09,932 INFO org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 1963 MiBytes 2017-11-14 15:44:09,932 INFO org.apache.flink.runtime.jobmanager.JobManager - JAVA_HOME: (not set) 2017-11-14 15:44:09,934 INFO org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.7.2 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options: 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms2048m 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx2048m 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog.file=/home/shailesh/workspace/flink/flink-1.3.2/log/flink-shailesh-jobmanager-0-shailesh.log 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog4j.configuration=file:/home/shailesh/workspace/flink/flink-1.3.2/conf/log4j.properties 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlogback.configurationFile=file:/home/shailesh/workspace/flink/flink-1.3.2/conf/logback.xml 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - Program Arguments: 2017-11-14 15:44:09,935 INFO 
org.apache.flink.runtime.jobmanager.JobManager - --configDir 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - /home/shailesh/workspace/flink/flink-1.3.2/conf 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - --executionMode 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - local 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - Classpath: /home/shailesh/workspace/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/home/shailesh/workspace/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/home/shailesh/workspace/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/home/shailesh/workspace/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/home/shailesh/workspace/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar::: 2017-11-14 15:44:09,935 INFO org.apache.flink.runtime.jobmanager.JobManager - -------------------------------------------------------------------------------- 2017-11-14 15:44:09,936 INFO org.apache.flink.runtime.jobmanager.JobManager - Registered UNIX signal handlers for [TERM, HUP, INT] 2017-11-14 15:44:10,049 INFO org.apache.flink.runtime.jobmanager.JobManager - Loading configuration from /home/shailesh/workspace/flink/flink-1.3.2/conf 2017-11-14 15:44:10,052 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost 2017-11-14 15:44:10,052 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: akka.client.timeout, 15 min 2017-11-14 15:44:10,052 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123 2017-11-14 15:44:10,052 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 1024 2017-11-14 15:44:10,052 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 1024 2017-11-14 15:44:10,052 
INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4 2017-11-14 15:44:10,052 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.preallocate, false 2017-11-14 15:44:10,052 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1 2017-11-14 15:44:10,053 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 8081 2017-11-14 15:44:10,053 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporters, jmx 2017-11-14 15:44:10,053 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporter.jmx.class, org.apache.flink.metrics.jmx.JMXReporter 2017-11-14 15:44:10,053 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporter.jmx.port, 8789 2017-11-14 15:44:10,058 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager without high-availability 2017-11-14 15:44:10,061 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager on localhost:6123 with execution mode LOCAL 2017-11-14 15:44:10,067 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost 2017-11-14 15:44:10,067 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: akka.client.timeout, 15 min 2017-11-14 15:44:10,067 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123 2017-11-14 15:44:10,067 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 1024 2017-11-14 15:44:10,067 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 1024 
2017-11-14 15:44:10,068 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4 2017-11-14 15:44:10,068 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.preallocate, false 2017-11-14 15:44:10,068 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1 2017-11-14 15:44:10,068 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 8081 2017-11-14 15:44:10,068 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporters, jmx 2017-11-14 15:44:10,068 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporter.jmx.class, org.apache.flink.metrics.jmx.JMXReporter 2017-11-14 15:44:10,068 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporter.jmx.port, 8789 2017-11-14 15:44:10,096 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to shailesh (auth:SIMPLE) 2017-11-14 15:44:10,153 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager actor system reachable at localhost:6123 2017-11-14 15:44:10,371 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 2017-11-14 15:44:10,405 INFO Remoting - Starting remoting 2017-11-14 15:44:10,552 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://flink@localhost:6123] 2017-11-14 15:44:10,566 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager web frontend 2017-11-14 15:44:10,573 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of JobManager log file: /home/shailesh/workspace/flink/flink-1.3.2/log/flink-shailesh-jobmanager-0-shailesh.log 2017-11-14 15:44:10,574 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of 
JobManager stdout file: /home/shailesh/workspace/flink/flink-1.3.2/log/flink-shailesh-jobmanager-0-shailesh.out 2017-11-14 15:44:10,574 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Using directory /tmp/flink-web-3c4c002f-2c02-4349-852a-fc92ef06c64d for the web interface files 2017-11-14 15:44:10,574 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Using directory /tmp/flink-web-d25e9b7d-d26b-47f8-b457-fbe0421d4f7f for web frontend JAR file uploads 2017-11-14 15:44:10,778 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend listening at 0:0:0:0:0:0:0:0:8081 2017-11-14 15:44:10,779 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager actor 2017-11-14 15:44:10,791 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-3d640612-b368-4ad6-a64f-c1bd50f642b0 2017-11-14 15:44:10,792 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:45259 - max concurrent requests: 50 - max backlog: 1000 2017-11-14 15:44:10,873 INFO org.apache.flink.runtime.metrics.MetricRegistry - Configuring JMXReporter with {port=8789, class=org.apache.flink.metrics.jmx.JMXReporter}. 2017-11-14 15:44:10,945 INFO org.apache.flink.metrics.jmx.JMXReporter - Started JMX server on port 8789. 2017-11-14 15:44:10,945 INFO org.apache.flink.metrics.jmx.JMXReporter - Configured JMXReporter with {port:8789} 2017-11-14 15:44:10,946 INFO org.apache.flink.runtime.metrics.MetricRegistry - Reporting metrics for reporter jmx of type org.apache.flink.metrics.jmx.JMXReporter. 
2017-11-14 15:44:10,952 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting embedded TaskManager for JobManager's LOCAL execution mode 2017-11-14 15:44:10,953 INFO org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/archive 2017-11-14 15:44:10,960 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka.tcp://flink@localhost:6123/user/jobmanager. 2017-11-14 15:44:10,973 INFO org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig [server address: localhost/127.0.0.1, server port: 0, ssl enabled: false, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 4 (manual), number of client threads: 4 (manual), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)] 2017-11-14 15:44:10,976 INFO org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration - Messages have a max timeout of 10000 ms 2017-11-14 15:44:10,988 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka.tcp://flink@localhost:6123/user/jobmanager was granted leadership with leader session ID Some(00000000-0000-0000-0000-000000000000). 2017-11-14 15:44:10,991 INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file directory '/tmp': total 908 GB, usable 700 GB (77.09% usable) 2017-11-14 15:44:11,140 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 194 MB for network buffer pool (number of memory segments: 6214, bytes per segment: 32768). 2017-11-14 15:44:11,167 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Starting the network environment and its components. 2017-11-14 15:44:11,175 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful initialization (took 3 ms). 2017-11-14 15:44:11,178 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful initialization (took 3 ms). 
Listening on SocketAddress /127.0.0.1:45799. 2017-11-14 15:44:11,334 INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting managed memory to 0.7 of the currently free heap space (1224 MB), memory will be allocated lazily. 2017-11-14 15:44:11,338 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-f0e24f73-3307-4b0e-9ea5-593dfa8c3a03 for spill files. 2017-11-14 15:44:11,341 INFO org.apache.flink.runtime.metrics.MetricRegistry - Configuring JMXReporter with {port=8789, class=org.apache.flink.metrics.jmx.JMXReporter}. 2017-11-14 15:44:11,342 ERROR org.apache.flink.runtime.metrics.MetricRegistry - Could not instantiate metrics reporter jmx. Metrics might not be exposed/reported. java.lang.RuntimeException: Could not start JMX server on any configured port. Ports: 8789 at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.java:127) at org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:129) at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:188) at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1921) at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2322) at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2053) at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2052) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2139) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2117) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2117) at scala.util.Try$.apply(Try.scala:192) at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2172) at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2117) at 
org.apache.flink.runtime.jobmanager.JobManager$$anon$10.call(JobManager.scala:1992) at org.apache.flink.runtime.jobmanager.JobManager$$anon$10.call(JobManager.scala:1990) at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40) at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1990) at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala) 2017-11-14 15:44:11,352 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-b55e868e-04f1-46a6-95ed-5b42bfbb4537 2017-11-14 15:44:11,360 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-f695cd11-66c0-4d3b-966e-bbafea598731 2017-11-14 15:44:11,361 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with JobManager akka.tcp://flink@localhost:6123/user/jobmanager on port 8081 2017-11-14 15:44:11,361 INFO org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader reachable under akka.tcp://flink@localhost:6123/user/jobmanager:00000000-0000-0000-0000-000000000000. 2017-11-14 15:44:11,368 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#654603563. 2017-11-14 15:44:11,368 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: 6bfb4783db44eec5977ff0f2442824d6 @ localhost (dataPort=45799) 2017-11-14 15:44:11,369 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 4 task slot(s). 
2017-11-14 15:44:11,370 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 246/1963/1963 MB, NON HEAP: 37/38/-1 MB (used/committed/max)] 2017-11-14 15:44:11,373 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds) 2017-11-14 15:44:11,380 INFO org.apache.flink.runtime.jobmanager.JobManager - Task Manager Registration but not connected to ResourceManager 2017-11-14 15:44:11,381 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka.tcp://flink@localhost:6123/user/jobmanager 2017-11-14 15:44:11,384 INFO org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at localhost (akka://flink/user/taskmanager) as 5330f5aefccc9bde6de3f362fde0d0ff. Current number of registered hosts is 1. Current number of alive task slots is 4. 2017-11-14 15:44:11,391 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka://flink/user/jobmanager), starting network stack and library cache. 2017-11-14 15:44:11,393 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:45259. Starting BLOB cache. 2017-11-14 15:44:11,394 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-3812eb71-5af6-4981-8603-c855362a8868 2017-11-14 15:44:11,398 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager#1260996229] - leader session 00000000-0000-0000-0000-000000000000 2017-11-14 15:44:11,398 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Received TaskManagers that were registered at the leader JobManager. Trying to consolidate. 
2017-11-14 15:44:11,398 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Consolidated 1 TaskManagers 2017-11-14 15:54:13,366 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-e7f6e1a0-ddf8-492b-9425-1b664312502f 2017-11-14 15:54:13,379 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 0eef8a2d0eed7dd82362416f599a8df0f8d98af2 from localhost/127.0.0.1:45259 2017-11-14 15:54:29,715 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 925195c4e83fa0ad05f34ccf902709bab3af9765 from localhost/127.0.0.1:45259 2017-11-14 15:54:32,670 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 695767a41bd520df1183c747dd9b76663c5728d7 from localhost/127.0.0.1:45259 2017-11-14 15:54:42,891 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 9a62ca72fd9ef29744c78066c57ccc27a3926f3e from localhost/127.0.0.1:45259 2017-11-14 15:54:47,684 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 269d4aeb713d430feb326aff64969100cd961b79 from localhost/127.0.0.1:45259 2017-11-14 15:54:51,518 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 2a24d2bbe7a6d266aa84a1f2d74ef0e0fceb818a from localhost/127.0.0.1:45259 2017-11-14 15:54:53,682 INFO org.apache.flink.runtime.blob.BlobCache - Downloading e46082b19eeed26cb8112df2373145fe6219a1b9 from localhost/127.0.0.1:45259 2017-11-14 15:55:26,680 INFO org.apache.flink.runtime.blob.BlobCache - Downloading d3cc0e8e94fbcaf558461e8bba39cde136f2dce5 from localhost/127.0.0.1:45259 2017-11-14 15:55:30,075 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 46a66c74e33353d93d7b17c1979362b706c05336 from localhost/127.0.0.1:45259 2017-11-14 15:56:16,645 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 66b448a3dece4aa92d348ecab3bb44e2d21883a7 from localhost/127.0.0.1:45259 2017-11-14 15:56:32,517 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 5f3f61a3d4fa5fe8b6b1fe069d3d1b5408c95076 from 
localhost/127.0.0.1:45259 2017-11-14 15:56:35,734 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 0c08d1200cf9970f5ad96aad3fd9e07cad578278 from localhost/127.0.0.1:45259 2017-11-14 15:56:55,763 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 350856c2756bf4e42b4f84bf64fdd25e66b2778d from localhost/127.0.0.1:45259
[Attachment: Exception (binary data, not shown)]