ElasticsearchSink Exception
Hi All,

I'm getting the exception below when I start my Flink job. I have verified the Elasticsearch host and it seems to be working well. I have also tried adding the dependencies below to my project, but nothing works. I need some help. Thanks.

compile group: 'org.apache.lucene', name: 'lucene-core', version: '5.5.0'
compile group: 'org.elasticsearch', name: 'elasticsearch', version: '2.3.5'

*Sink Code:*

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName(hostName), 9300));

output.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction() {
    // process(...) implementation omitted in the original post
}));

*Exception:*

java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
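Because the sink opens its client on the TaskManagers, checking the host from a developer machine is not quite the same check. A quick way to confirm that a TaskManager host can reach the transport port is a plain TCP probe. This is a minimal, JDK-only sketch; `TransportPortProbe` is a hypothetical helper name. Note that a successful TCP connection only shows the port is open: the transport client may still report no connected nodes if, for example, the `cluster.name` in the sink config does not match the cluster, or the client and server Elasticsearch versions are incompatible.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical TCP reachability probe; run it on a TaskManager host, not only on your workstation.
class TransportPortProbe {

    /** Returns true if a plain TCP connection to host:port is accepted within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        // 9300 is the Elasticsearch transport port the sink above connects to.
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9300;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

Usage: `java TransportPortProbe <es-host> 9300` from each TaskManager machine (or container).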
Re: ElasticsearchSink Exception
Hi Flavio,

I tried both the HTTP port 9200 and the TCP port 9300, and I see incoming connections on the Elasticsearch node. I also see the errors below in the TaskManager .out logs. Below are the dependencies in my Gradle project. Am I missing something?

Exception in thread "elasticsearch[Madame Menace][generic][T#2]" java.lang.NoClassDefFoundError: org/elasticsearch/index/mapper/MapperParsingException
    at org.elasticsearch.ElasticsearchException.(ElasticsearchException.java:597)
    at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.index.mapper.MapperParsingException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 5 more

Exception in thread "elasticsearch[Saint Elmo][generic][T#2]" java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.transport.NodeDisconnectedException
    at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

compile group: 'org.apache.flink', name: 'flink-core', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-java', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-connector-elasticsearch2_2.10', version: '1.2.0'

On Sat, Feb 25, 2017 at 1:26 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> Are you sure that in elasticsearch.yml you've enabled ES to listen to the
> http port 9300?
>
> On 25 Feb 2017 08:58, "Govindarajan Srinivasaraghavan" <govindragh...@gmail.com> wrote:
> [...]
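A `NoClassDefFoundError` for an Elasticsearch class inside the TaskManager usually points to a classpath problem: the class is not visible to the classloader running the job (Flink's connector documentation notes that connectors are not part of the binary distribution and have to be packaged with the job), or conflicting versions are present. As a hedged diagnostic, the sketch below reports which jar a class is actually loaded from. `ClassOrigin` is a hypothetical helper; it uses its own classloader, so call it from code that runs on the TaskManager (for example a rich function's `open()`) to see the job's classpath rather than the client's.

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic: where does a class come from, if it can be loaded at all?
class ClassOrigin {

    /**
     * Returns the location (usually a jar URL) that className was loaded from, "bootstrap" for
     * JDK classes without a code source, or empty if the class cannot be loaded.
     */
    static Optional<String> locate(String className) {
        try {
            // initialize=false: only load the class, do not run its static initializer.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return Optional.of("bootstrap");
            }
            return Optional.of(source.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.elasticsearch.index.mapper.MapperParsingException",
            "org.elasticsearch.transport.NodeDisconnectedException",
            "org.apache.lucene.util.Version"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name).orElse("NOT FOUND"));
        }
    }
}
```

If the classes resolve to different jars, or are missing entirely, the fat-jar packaging is the first thing to check.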
Flink Job Exception
Hi All,

I'm trying to run a streaming job on Flink 1.2 with 3 TaskManagers and 12 task slots. Irrespective of the parallelism I set, it always fails with the error below, and I found a JIRA issue corresponding to it. Is there an estimate of when it will be resolved? I'm not able to run any job in my current environment. Thanks.

https://issues.apache.org/jira/browse/FLINK-5773

java.lang.ClassCastException: Cannot cast scala.util.Failure to org.apache.flink.runtime.messages.Acknowledge
    at java.lang.Class.cast(Class.java:3369)
    at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Success.map(Try.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Flink: How to handle external app configuration changes in flink
Hi,

My requirement is to stream millions of records a day, and the processing depends heavily on external configuration parameters. For example, a user can change a setting in the web application at any time, and after the change is made, the streaming has to continue with the new application configuration. These are app-level configurations, and we also have some dynamic exclude parameters that every record has to be checked against and filtered by.

I see that Flink doesn't have global state that is shared across all TaskManagers and subtasks. A centralized cache is an option, but I would have to read every parameter from the cache, which would increase latency. Please advise on a better approach to handle these kinds of scenarios and on how other applications handle them. Thanks.
Questions on flink
Hi,

I'm working with Apache Flink for data streaming and I have a few questions. Any help is greatly appreciated. Thanks.

1) Are there any restrictions on creating tumbling windows? For example, if I create a 2-second tumbling window per user ID and I have more than 10 million user IDs, would that be a problem? (I'm using keyBy on the user ID and then creating a 2-second timeWindow.) How are these windows maintained internally in Flink?

2) I looked at rebalance for round-robin partitioning. Let's say I have a cluster set up, my source has a parallelism of 1, and I do a rebalance: will my data be shuffled across machines to improve performance? If so, is there a specific port over which the data is transferred to other nodes in the cluster?

3) Are there any limitations on state maintenance? I'm planning to maintain some user-ID-related data that could grow very large. I read that Flink can use RocksDB to maintain state. I just wanted to check whether there are any limits on how much data can be maintained.

4) Also, where is the state maintained if the amount of data is small? (I guess in JVM memory.) If I have several machines in my cluster, can every node get the current version of the state?

5) I need a way to send external configuration changes to Flink. Let's say there is a new parameter that has to be added, or an external change that has to be applied to Flink's state. How can this be done?

Thanks
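For question 1, a rough mental model (a simplified sketch, not Flink's actual window operator): with `keyBy(userId).timeWindow(...)`, each element is assigned to an epoch-aligned window `[start, start + size)`. For non-negative timestamps the start below equals `timestamp - timestamp % size`, which is how Flink's tumbling assigners align windows by default. Window contents are kept per (key, window) pair and emitted when the window ends, so state exists only for users that actually sent data in a window: 10 million keys means up to 10 million small window states at once, not 10 million pre-created windows. The class and method names are hypothetical, and a `HashMap` stands in for Flink's keyed state.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of keyed tumbling windows: state is created lazily per (key, window).
class TumblingWindowSketch {
    private final long sizeMillis;
    // key -> (window start -> element count); only keys with data in an open window appear here.
    private final Map<String, Map<Long, Long>> state = new HashMap<>();

    TumblingWindowSketch(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /** Start of the epoch-aligned tumbling window that contains timestamp. */
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp + sizeMillis) % sizeMillis;
    }

    void add(String key, long timestamp) {
        state.computeIfAbsent(key, k -> new HashMap<>())
             .merge(windowStart(timestamp, sizeMillis), 1L, Long::sum);
    }

    /** Emits "key@windowStart" -> count for every window ending at or before time, then drops that state. */
    Map<String, Long> fire(long time) {
        Map<String, Long> results = new TreeMap<>();
        Iterator<Map.Entry<String, Map<Long, Long>>> keys = state.entrySet().iterator();
        while (keys.hasNext()) {
            Map.Entry<String, Map<Long, Long>> keyEntry = keys.next();
            Iterator<Map.Entry<Long, Long>> windows = keyEntry.getValue().entrySet().iterator();
            while (windows.hasNext()) {
                Map.Entry<Long, Long> window = windows.next();
                if (window.getKey() + sizeMillis <= time) {
                    results.put(keyEntry.getKey() + "@" + window.getKey(), window.getValue());
                    windows.remove();
                }
            }
            if (keyEntry.getValue().isEmpty()) {
                keys.remove(); // no open windows left for this key, so nothing is kept for it
            }
        }
        return results;
    }

    int keysWithState() {
        return state.size();
    }
}
```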
Using Flink
Hi,

I have a few questions on how I should model my use case in Flink. Please advise. Thanks for the help.

- I'm currently running my Flink program on 1.2-SNAPSHOT with a Kafka source and checkpointing enabled. When I look at the consumer offsets in Kafka, they appear to be stagnant and there is a huge lag. But from the JMX metrics and outputs I can see that my Flink program keeps pace with the Kafka source. Is there a way to identify why the offsets are not committed to Kafka?
- In my current application we use custom loggers for debugging purposes. Let's say we want to find out what's happening for a particular user: we fire an API request to add a custom logger for that user and use it for logging along the data path. Is there a way to achieve this in Flink? Are there any global mutable parameters I can use to achieve this functionality?
- Can I pass state between operators? If I need the state stored in a previous operator, how can I fetch it?

Thanks
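On the offsets question: as far as I know, when checkpointing is enabled, Flink's Kafka consumer keeps its offsets in checkpointed state and commits them back to Kafka only when a checkpoint completes. The offsets visible in Kafka can therefore lag by up to one checkpoint interval, and they stay stagnant if checkpoints keep failing or timing out, which is worth checking in the checkpoint statistics of the web UI. The model below is a simplified, hypothetical illustration of that behavior for a single partition, not the connector's code.

```java
import java.util.TreeMap;

// Hypothetical model: offsets are captured when a checkpoint is taken and committed only when it completes.
class CheckpointOffsetCommitModel {
    private final TreeMap<Long, Long> pendingByCheckpoint = new TreeMap<>(); // checkpointId -> offset
    private long currentOffset = -1;   // last offset processed by the source
    private long committedOffset = -1; // what an external lag monitor would see in Kafka

    void consume(long offset) {
        currentOffset = offset;
    }

    /** Called when a checkpoint is taken at the source. */
    void snapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, currentOffset);
    }

    /** Called only after the whole checkpoint has completed successfully. */
    void notifyCheckpointComplete(long checkpointId) {
        Long offset = pendingByCheckpoint.get(checkpointId);
        if (offset == null) {
            return; // unknown or already subsumed checkpoint
        }
        committedOffset = offset;
        // This checkpoint and all older pending ones are now obsolete.
        pendingByCheckpoint.headMap(checkpointId, true).clear();
    }

    long committedOffset() {
        return committedOffset;
    }
}
```

In this model, a job that processes data quickly but never completes a checkpoint shows exactly the symptom described: the committed offset never moves.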
Re: Flink: How to handle external app configuration changes in flink
Hi Jamie,

Thanks a lot for the response. I appreciate your help.

Regards,
Govind

On Mon, Sep 26, 2016 at 3:26 AM, Jamie Grier <ja...@data-artisans.com> wrote:

> Hi Govindarajan,
>
> Typically the way people do this is to create a stream of configuration
> changes and consume it like any other stream. For the specific case of
> filtering, for example, you may have a data stream and a stream of filters
> that you want to run the data through. The typical approach in the Flink
> API would then be:
>
>     val dataStream = env.addSource(dataSource).keyBy("userId")
>     val filterStream = env.addSource(filterSource).keyBy("userId")
>     val connectedStream = dataStream
>       .connect(filterStream)
>       .flatMap(yourFilterFunction)
>
> You would maintain your filters as state in your filter function. Notice
> that in this example both streams are keyed the same way.
>
> If it is not possible to distribute the configuration by key (it really
> depends on your use case), you can instead "broadcast" that state so that
> each instance of yourFilterFunction sees the same configuration messages
> and ends up building the same state. For example:
>
>     val dataStream = env.addSource(dataSource).keyBy("userId")
>     val filterStream = env.addSource(filterSource).broadcast()
>     val connectedStream = dataStream
>       .connect(filterStream)
>       .flatMap(yourFilterFunction)
>
> I hope that helps.
>
> -Jamie
>
> On Mon, Sep 26, 2016 at 4:34 AM, Govindarajan Srinivasaraghavan <govindragh...@gmail.com> wrote:
> > [...]
>
> --
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
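To make Jamie's suggestion concrete without a cluster, here is a plain-Java sketch of the function that would be passed to `flatMap` on the connected streams. The method names mirror Flink's `CoFlatMapFunction` (`flatMap1` for the first input, here the data; `flatMap2` for the second, here the filters), but the class is hypothetical: a `HashMap` stands in for keyed state and a `List` stands in for the `Collector`. Because the two inputs are consumed independently, Flink does not guarantee that a filter update is processed before data that logically follows it, so records can briefly be filtered with the old configuration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a CoFlatMapFunction over connected (data, filter) streams keyed by userId.
class UserFilterFunction {
    // userId -> values to exclude; in Flink this would be keyed state scoped to the current userId.
    private final Map<String, Set<String>> excludedByUser = new HashMap<>();

    /** Data input: forward the record unless its value is currently excluded for that user. */
    void flatMap1(String userId, String value, List<String> out) {
        Set<String> excluded = excludedByUser.getOrDefault(userId, Collections.emptySet());
        if (!excluded.contains(value)) {
            out.add(userId + ":" + value);
        }
    }

    /** Filter input: replace the user's exclusion set; emits nothing. */
    void flatMap2(String userId, Set<String> excludedValues) {
        excludedByUser.put(userId, new HashSet<>(excludedValues));
    }
}
```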
Flink Metrics
Hi,

I am currently using Flink 1.2-SNAPSHOT and instrumenting my pipeline with Flink metrics. One small suggestion: currently the Meter interface only supports getRate(), which is always the one-minute rate. It would be great if all the rates (1 min, 5 min and 15 min) were exposed to give a better picture of performance. Also, is there any reason why timers are not part of Flink metrics core?

Regards,
Govind
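For reference, the 1-, 5- and 15-minute rates in libraries such as Dropwizard Metrics are exponentially weighted moving averages updated on a fixed tick (5 seconds there), each with its own smoothing factor `alpha = 1 - exp(-tick / window)`. As far as I know, Flink's optional `flink-metrics-dropwizard` module can wrap a Dropwizard meter, but through Flink's `Meter` interface you still only get `getRate()` and `getCount()`. Below is a minimal, non-thread-safe sketch of computing all three rates yourself; `MultiRateMeter` is a hypothetical class, not a Flink API.

```java
// Hypothetical meter exposing EWMA rates over 1, 5 and 15 minutes (Dropwizard-style); not thread-safe.
class MultiRateMeter {
    private static final double TICK_SECONDS = 5.0;
    private static final double[] WINDOW_MINUTES = {1, 5, 15};

    private final double[] alphas = new double[WINDOW_MINUTES.length];
    private final double[] rates = new double[WINDOW_MINUTES.length]; // events per second
    private boolean initialized = false;
    private long uncounted = 0;
    private long count = 0;

    MultiRateMeter() {
        for (int i = 0; i < WINDOW_MINUTES.length; i++) {
            // Larger alpha for shorter windows, so they react faster to changes.
            alphas[i] = 1 - Math.exp(-TICK_SECONDS / (60.0 * WINDOW_MINUTES[i]));
        }
    }

    void markEvent(long n) {
        uncounted += n;
        count += n;
    }

    /** Must be called every TICK_SECONDS, e.g. from a ScheduledExecutorService. */
    void tick() {
        double instantRate = uncounted / TICK_SECONDS;
        uncounted = 0;
        for (int i = 0; i < rates.length; i++) {
            // The first tick seeds every average with the observed rate.
            rates[i] = initialized ? rates[i] + alphas[i] * (instantRate - rates[i]) : instantRate;
        }
        initialized = true;
    }

    double oneMinuteRate() { return rates[0]; }
    double fiveMinuteRate() { return rates[1]; }
    double fifteenMinuteRate() { return rates[2]; }
    long getCount() { return count; }
}
```

The three averages share one event counter; only the decay differs, which is why the 1-minute rate drops fastest when traffic stops.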
Stream Iterations
Hi All,

I have a use case for which I need some suggestions. It's a streaming application with a Kafka source, followed by groupBy and keyBy, and then some calculations. The output of each calculation has to be a side input for the next calculation and also needs to be sent to a sink. Right now I'm achieving this by storing the result state in memory and also saving it in a Redis cache.

I was looking at delta iterations in the Flink documentation. It would be great if someone could help me understand whether I can achieve this using iterations or any other API. Thanks in advance.

Regards,
Govind
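Delta iterations belong to Flink's DataSet (batch) API; the streaming API's counterpart is `DataStream.iterate()`. If the "side input" for the next calculation is simply the previous result for the same key, one alternative to iterations (an option, not necessarily the best fit) is to keep that result in keyed state, for example a `ValueState`, inside the operator that computes it; it is then checkpointed with the job, which may remove the need for a separate in-memory copy plus Redis. The plain-Java sketch below models that idea: the `HashMap` stands in for keyed state, the `List` for the sink, and the averaging formula is a hypothetical placeholder for the real calculation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each calculation uses the previous result for the same key as its side input.
class ChainedCalculation {
    // key -> last result; stands in for Flink keyed ValueState.
    private final Map<String, Double> lastResult = new HashMap<>();

    double process(String key, double input, List<String> sink) {
        Double previous = lastResult.get(key);
        // Placeholder calculation: average of the new input and the previous result.
        double result = previous == null ? input : (previous + input) / 2.0;
        lastResult.put(key, result);
        sink.add(key + "=" + result); // every result also goes to the sink
        return result;
    }
}
```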
Re: Dynamic Scaling
Hi All,

It would be great if someone could help me with my questions. I appreciate all the help. Thanks.

> On Dec 23, 2016, at 12:11 PM, Govindarajan Srinivasaraghavan <govindragh...@gmail.com> wrote:
>
> Hi,
>
> We have a computation-heavy streaming Flink job that processes around 100 million messages at peak time and around 1 million messages at non-peak time. We need the ability to scale dynamically, so that the computation operator can scale up during high workloads and down during low workloads without restarting the job, in order to lower machine costs.
>
> Is there an ETA for the feature that rescales a single operator without a restart?
>
> Is it possible to auto-scale one of the operators with Docker Swarm or Amazon ECS auto scaling based on Kafka consumer lag or CPU consumption? If so, can I get some documentation or steps to achieve this behaviour?
>
> Also, is there any document describing the tasks of a JobManager apart from scheduling and reporting status?
>
> Since there is just one JobManager, we wanted to check whether there would be any potential scaling limitations as the processing capacity increases.
>
> Thanks
> Govind
Streaming Exception
Hi All,

I see the error below after running my streaming job for a while, when the load increases. After a while the TaskManager becomes completely dead and the job keeps restarting. Also, when I checked for back pressure in the UI, it kept saying "sampling in progress" and no results were displayed. Is there an API that can provide the back pressure details?

2017-03-10 01:40:58,793 WARN  org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while emitting latency marker.
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:426)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
    at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:152)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:117)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:690)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:423)
    ... 10 more
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:114)
    ... 14 more
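On the API question, a hedged pointer: according to Flink's documentation, back pressure monitoring repeatedly samples the stack traces of a task's threads and reports the fraction of samples in which a thread is stuck requesting a network buffer, classified as OK (ratio at most 0.10), LOW (at most 0.5) or HIGH. The web UI reads this from the JobManager's monitoring REST API, which in Flink 1.x serves it under a path like `/jobs/<jobid>/vertices/<vertexid>/backpressure`; as far as I know the first request only triggers sampling, which would explain "sampling in progress" until a later refresh (please verify the path against your version's docs). The `LocalBufferPool.requestBufferBlocking` frame in the `InterruptedException` above is the kind of frame that sampling counts. A hypothetical plain-Java model of the ratio calculation:

```java
import java.util.List;

// Hypothetical model of back pressure sampling: fraction of samples blocked on buffer requests.
class BackPressureRatio {

    /** True if the sampled stack contains a frame blocked in LocalBufferPool.requestBufferBlocking. */
    static boolean isBlockedOnBuffer(StackTraceElement[] trace) {
        for (StackTraceElement frame : trace) {
            if (frame.getClassName().endsWith(".LocalBufferPool")
                    && frame.getMethodName().equals("requestBufferBlocking")) {
                return true;
            }
        }
        return false;
    }

    static double ratio(List<StackTraceElement[]> samples) {
        if (samples.isEmpty()) {
            return 0.0;
        }
        long blocked = samples.stream().filter(BackPressureRatio::isBlockedOnBuffer).count();
        return (double) blocked / samples.size();
    }

    /** Thresholds as described in Flink's back pressure documentation. */
    static String level(double ratio) {
        if (ratio <= 0.10) {
            return "OK";
        }
        return ratio <= 0.5 ? "LOW" : "HIGH";
    }
}
```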
Connect more than two streams
Hi,

I have two streams reading from Kafka, one for data and the other for control. The data stream is split by type, and there are around six types. Each type has its own processing logic, and finally everything has to be merged to get the collective state per device. I was thinking I could connect multiple streams, process them and maintain state, but connect only supports two streams. Is there some way to achieve the functionality I want? By the way, after the split and some processing, all of them are keyed streams.
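One common workaround, sketched under assumptions: map each processed stream to one shared wrapper type that carries a type tag, combine them with `union()` (which, unlike `connect()`, accepts any number of streams as long as they have the same type), `keyBy` the device id, and keep the collective per-device state in a single function; the control stream can then be connected to that one unioned stream. The plain-Java sketch below models only the merge step. `DeviceEvent`, the type names and the per-type logic are hypothetical, and the `HashMap` stands in for keyed state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Common wrapper type that every per-type stream would be mapped to before union().
class DeviceEvent {
    final String deviceId;
    final String type;
    final double value;

    DeviceEvent(String deviceId, String type, double value) {
        this.deviceId = deviceId;
        this.type = type;
        this.value = value;
    }
}

// Stand-in for the keyed function after union(...).keyBy(deviceId).
class DeviceStateMerger {
    // deviceId -> collective state; in Flink this would be keyed state.
    private final Map<String, Map<String, Double>> stateByDevice = new HashMap<>();

    /** Applies type-specific logic and returns a snapshot of the device's merged state. */
    Map<String, Double> process(DeviceEvent event) {
        Map<String, Double> state = stateByDevice.computeIfAbsent(event.deviceId, id -> new TreeMap<>());
        switch (event.type) {
            case "temperature": // hypothetical type: keep the latest reading
                state.put("temperature", event.value);
                break;
            case "counter": // hypothetical type: accumulate
                state.merge("counter", event.value, Double::sum);
                break;
            default: // any other type: keep the latest value under its own name
                state.put(event.type, event.value);
        }
        return new TreeMap<>(state);
    }
}
```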
Increasing Disk Read Throughput and IOPS
Hi All,

We are running Flink in AWS and we are observing a strange behavior. We are using Docker containers, EBS for storage and the RocksDB state backend. We have a few map and value states, with checkpointing every 30 seconds and incremental checkpointing turned on. The issue we are noticing is that the read IOPS and read throughput gradually increase over time and keep growing. The write throughput and write bytes do not increase as much as the reads. The checkpoints are written to durable NFS storage. We are not sure what is causing this constant increase in read throughput, but because of it we are running out of EBS burst balance and need to restart the job every once in a while. Attached are the EBS read and write metrics.

Has anyone encountered this issue, and what could be a possible solution? We have also tried setting the RocksDB options below, but they didn't help.

DBOptions:
currentOptions.setMaxBackgroundCompactions(4)
    .setMaxManifestFileSize(1048576)
    .setMaxLogFileSize(1048576);

ColumnFamilyOptions:
currentOptions.setOptimizeFiltersForHits(true)
    .setWriteBufferSize(536870912)
    .setMaxWriteBufferNumber(5)
    .setMinWriteBufferNumberToMerge(2);

Thanks.
Too many open files
Hi,

We have a streaming job that runs on Flink in Docker, with checkpointing every 10 seconds. After several starts and cancellations, we are facing this issue with file handles. The job reads data from Kafka, processes it and writes it back to Kafka, and we are using the RocksDB state backend. For now we have increased the number of file handles to resolve the problem, but we would like to know whether this is expected or whether it is an issue. Thanks.

java.io.FileNotFoundException: /tmp/flink-io-b3043cd6-50c8-446a-8c25-fade1b1862c0/cb317fc2578db72b3046468948fa00f2f17039b6104e72fb8c58938e5869cfbc.0.buffer (Too many open files)
    at java.io.RandomAccessFile.open0(Native Method)
    at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
    at org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:259)
    at org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:120)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:149)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.<init>(StreamTwoInputProcessor.java:147)
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.init(TwoInputStreamTask.java:79)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:235)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

Regards,
Govind
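The file in the exception is a `BufferSpiller` spill file in Flink's temporary I/O directory; judging by the stack trace, it is opened when the two-input task's `BarrierBuffer` (used for checkpoint barrier alignment) is initialized. To tell whether descriptors accumulate across start/cancel cycles (which would suggest a leak) or only peak under load, one hedged option is to log the TaskManager JVM's descriptor count periodically. The sketch below uses the JDK's `com.sun.management.UnixOperatingSystemMXBean`, which is available on HotSpot/OpenJDK on Unix-like systems; `OpenFileMonitor` is a hypothetical helper. On Linux, `ls /proc/<pid>/fd | wc -l` gives the same number from outside the JVM.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.OptionalLong;

// Hypothetical helper: reports this JVM's open and maximum file descriptors where the platform exposes them.
class OpenFileMonitor {

    private static com.sun.management.UnixOperatingSystemMXBean unixBean() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        return os instanceof com.sun.management.UnixOperatingSystemMXBean
                ? (com.sun.management.UnixOperatingSystemMXBean) os
                : null; // e.g. on Windows
    }

    static OptionalLong openFileDescriptors() {
        com.sun.management.UnixOperatingSystemMXBean bean = unixBean();
        return bean == null ? OptionalLong.empty() : OptionalLong.of(bean.getOpenFileDescriptorCount());
    }

    static OptionalLong maxFileDescriptors() {
        com.sun.management.UnixOperatingSystemMXBean bean = unixBean();
        return bean == null ? OptionalLong.empty() : OptionalLong.of(bean.getMaxFileDescriptorCount());
    }

    public static void main(String[] args) {
        System.out.println("open fds: " + openFileDescriptors() + ", max fds: " + maxFileDescriptors());
    }
}
```

Logging this once per checkpoint interval and comparing the values after each start/cancel cycle shows whether the count keeps climbing.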