ElasticsearchSink Exception

2017-02-24 Thread Govindarajan Srinivasaraghavan
Hi All,

I'm getting the below exception when I start my flink job. I have verified
the Elasticsearch host and it seems to be working fine. I have also tried
including the below dependencies in my project, but nothing works. Need some
help. Thanks.

compile group: 'org.apache.lucene', name: 'lucene-core', version: '5.5.0'
compile group: 'org.elasticsearch', name: 'elasticsearch', version: '2.3.5'


*Sink Code:*

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName(hostName), 9300));

output.addSink(new ElasticsearchSink<>(config, transportAddresses,
        new ElasticsearchSinkFunction() {
            // process() implementation omitted
        }));
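
For context, here is a minimal self-contained sketch of how an elasticsearch2 sink is typically wired up (package names as in the Flink 1.2 elasticsearch2 connector; the cluster name, index/type names and the String/JSON element type are assumptions, not taken from the original post). The "Client is not connected to any Elasticsearch nodes!" error thrown in open() usually means the transport client reached the host but was rejected, most often because "cluster.name" in the config map does not match the cluster, or because the client and server Elasticsearch versions differ.

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class EsSinkSketch {

    public static void addEsSink(DataStream<String> output, String hostName) throws Exception {
        Map<String, String> config = new HashMap<>();
        // Must match the cluster name configured in elasticsearch.yml,
        // otherwise the TransportClient rejects the node.
        config.put("cluster.name", "my-cluster");       // assumption
        config.put("bulk.flush.max.actions", "1");

        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName(hostName), 9300));

        output.addSink(new ElasticsearchSink<>(config, transportAddresses,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        // Index each element as a JSON document.
                        IndexRequest request = Requests.indexRequest()
                                .index("my-index")       // assumption
                                .type("my-type")         // assumption
                                .source(element);
                        indexer.add(request);
                    }
                }));
    }
}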


*Exception:*

java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)


Re: ElasticsearchSink Exception

2017-02-25 Thread Govindarajan Srinivasaraghavan
Hi Flavio,

I tried with both the http port 9200 and the tcp port 9300, and I can see incoming
connections on the elasticsearch node. I also see the below errors in the
taskmanager out logs. Below are the dependencies I have in my gradle
project. Am I missing something?

Exception in thread "elasticsearch[Madame Menace][generic][T#2]" java.lang.NoClassDefFoundError: org/elasticsearch/index/mapper/MapperParsingException
at org.elasticsearch.ElasticsearchException.<init>(ElasticsearchException.java:597)
at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.index.mapper.MapperParsingException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more


Exception in thread "elasticsearch[Saint Elmo][generic][T#2]" java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.transport.NodeDisconnectedException
at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


compile group: 'org.apache.flink', name: 'flink-core', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-java', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.2.0'

compile group: 'org.apache.flink', name: 'flink-connector-elasticsearch2_2.10', version: '1.2.0'


On Sat, Feb 25, 2017 at 1:26 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Are you sure that in elasticsearch.yml you've enabled ES to listen to the
> http port 9300?
>
> On 25 Feb 2017 08:58, "Govindarajan Srinivasaraghavan" <
> govindragh...@gmail.com> wrote:
>
> Hi All,
>
> I'm getting the below exception when I start my flink job. I have verified
> the elastic search host and it seems to be working well. I have also tried
> including the below dependecies to my project but nothing works. Need some
> help. Thanks.
>
> compile group: 'org.apache.lucene', name: 'lucene-core', version: '5.5.0'
> compile group: 'org.elasticsearch', name: 'elasticsearch', version: '2.3.5'
>
>
> *Sink Code:*
>
> List transportAddresses = new ArrayList<>();
> transportAddresses.add(new
> InetSocketAddress(InetAddress.getByName(*hostName*), 9300));
>
> output.addSink(new ElasticsearchSink<>(config, transportAddresses, new
> ElasticsearchSinkFunction() {
>
> }
>
>
> *Exception:*
>
> java.lang.RuntimeException: Client is not connected to any Elasticsearch
> nodes!
> at org.apache.flink.streaming.connectors.elasticsearch2.
> ElasticsearchSink.open(ElasticsearchSink.java:172)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.
> AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:386)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:262)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>


Flink Job Exception

2017-02-15 Thread Govindarajan Srinivasaraghavan
Hi All,

I'm trying to run a streaming job with flink 1.2, and there are 3
task managers with 12 task slots. Irrespective of the parallelism that I
give, it always fails with the below error, and I found a JIRA issue
corresponding to it. Could I know when this will be resolved, since
I'm not able to run any job in my current environment? Thanks.

https://issues.apache.org/jira/browse/FLINK-5773

java.lang.ClassCastException: Cannot cast scala.util.Failure to org.apache.flink.runtime.messages.Acknowledge
at java.lang.Class.cast(Class.java:3369)
at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Flink: How to handle external app configuration changes in flink

2016-09-25 Thread Govindarajan Srinivasaraghavan
Hi,

My requirement is to stream millions of records a day, and the processing has a
heavy dependency on external configuration parameters. For example, a user can
go and change the required settings anytime in the web application, and after
the change is made, the streaming has to continue with the new application
config parameters. These are app-level configurations, and we also have some
dynamic exclude parameters that each record has to be passed through and
filtered against.

I see that flink doesn't have global state that is shared across all task
managers and subtasks. Having a centralized cache is an option, but for each
parameter I would have to read it from the cache, which will increase the
latency. Please advise on a better approach to handle these kinds of
scenarios and how other applications are handling it. Thanks.


Questions on flink

2016-09-24 Thread Govindarajan Srinivasaraghavan
Hi,

I'm working on apache flink for data streaming and I have few questions.
Any help is greatly appreciated. Thanks.

1) Are there any restrictions on creating tumbling windows? For example, if
I want to create a tumbling window per user id for 2 secs, and let's say I
have more than 10 million user ids, would that be a problem? (I'm using
keyBy on the user id and then creating a timeWindow of 2 secs; a minimal
sketch follows after these questions.) How are these windows maintained
internally in flink?

2) I looked at rebalance for round-robin partitioning. Let's say I have a
cluster set up and a parallelism of 1 for the source; if I do a rebalance,
will my data be shuffled across machines to improve performance? If so, is
there a specific port over which the data is transferred to other nodes in
the cluster?

3) Are there any limitations on state maintenance? I'm planning to maintain
some user id related data which could grow very large. I read about flink
using RocksDB to maintain the state. I just wanted to check whether there are
any limitations on how much data can be maintained.

4) Also, where is the state maintained if the amount of data is small? (I
guess in JVM memory.) If I have several machines in my cluster, can every
node get the current state version?

5) I need a way to send external configuration changes to flink. Let's say
there is a new parameter that has to be added, or an external change that has
to be updated inside flink's state; how can this be done?
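
Regarding question 1, here is a minimal sketch of the keyBy plus tumbling-window pattern mentioned there (the Event POJO, its fields and the reduce logic are assumptions for illustration, not taken from the original mail). Flink keeps one window pane per key per window in the configured state backend, so very high key cardinality mainly translates into state size rather than a hard limit:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowSketch {

    // Hypothetical event type; public fields make it a Flink POJO.
    public static class Event {
        public String userId;
        public long count;

        public Event() {}

        public Event(String userId, long count) {
            this.userId = userId;
            this.count = count;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env.fromElements(
                new Event("user-1", 1), new Event("user-1", 2), new Event("user-2", 5));

        // One tumbling 2-second (processing-time) window per userId.
        events.keyBy("userId")
              .timeWindow(Time.seconds(2))
              .reduce(new ReduceFunction<Event>() {
                  @Override
                  public Event reduce(Event a, Event b) {
                      return new Event(a.userId, a.count + b.count);
                  }
              })
              .print();

        env.execute("tumbling-window-sketch");
    }
}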

Thanks


Using Flink

2016-10-03 Thread Govindarajan Srinivasaraghavan
Hi,



I have a few questions on how I need to model my use case in flink. Please
advise. Thanks for the help.



- I'm currently running my flink program on a 1.2 SNAPSHOT with a kafka source
and I have checkpointing enabled. When I look at the consumer offsets in kafka,
they appear to be stagnant and there is a huge lag. But I can see from the JMX
metrics and the outputs that my flink program is keeping pace with the kafka
source. Is there a way to identify why the offsets are not committed to kafka?



- In my current application we use custom loggers for debugging purposes. Let's
say we want to find what's happening for a particular user: we fire an api
request to add a custom logger for that particular user and use it for
logging along the data path. Is there a way to achieve this in flink? Are
there any global mutable parameters that I can use to achieve this
functionality?



- Can I pass state between operators? If I need the state stored in
previous operators, how can I fetch it?



Thanks


Re: Flink: How to handle external app configuration changes in flink

2016-09-26 Thread Govindarajan Srinivasaraghavan
Hi Jamie,

Thanks a lot for the response. Appreciate your help.

Regards,
Govind

On Mon, Sep 26, 2016 at 3:26 AM, Jamie Grier <ja...@data-artisans.com>
wrote:

> Hi Govindarajan,
>
> Typically the way people do this is to create a stream of configuration
> changes and consume this like any other stream.  For the specific case of
> filtering for example you may have a data stream and a stream of filters
> that you want to run the data through.  The typical approach in the Flink
> API would then be:
>
> val dataStream = env.addSource(dataSource).keyBy("userId")
> val filterStream = env.addSource(filterSource).keyBy("userId")
> val connectedStream = dataStream
>   .connect(filterStream)
>   .flatMap(yourFilterFunction)
>
> You would maintain your filters as state in your filter function.  Notice
> that in this example both streams are keyed the same way.
>
> If it is not possible to distribute the configuration by key (it really
> depends on your use case) you can instead "broadcast" that state so that
> each instance of yourFilterFunction sees the same configuration messages
> and will end up building the same state.  For example:
>
> val dataStream = env.addSource(dataSource).keyBy("userId")
> val filterStream = env.addSource(filterSource).broadcast()
> val connectedStream = dataStream
>   .connect(filterStream)
>   .flatMap(yourFilterFunction)
>
> I hope that helps.
>
> -Jamie
>
>
>
>
> On Mon, Sep 26, 2016 at 4:34 AM, Govindarajan Srinivasaraghavan <
> govindragh...@gmail.com> wrote:
>
> > Hi,
> >
> > My requirement is to stream millions of records in a day and it has huge
> > dependency on external configuration parameters. For example, a user can
> go
> > and change the required setting anytime in the web application and after
> > the change is made, the streaming has to happen with the new application
> > config parameters. These are app level configurations and we also have
> some
> > dynamic exclude parameters which each data has to be passed through and
> > filtered.
> >
> > I see that flink doesn’t have global state which is shared across all
> task
> > managers and subtasks. Having a centralized cache is an option but for
> each
> > parameter I would have to read it from cache which will increase the
> > latency. Please advise on the better approach to handle these kind of
> > scenarios and how other applications are handling it. Thanks.
> >
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
>
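
For reference, a rough Java sketch of the connect pattern described above (the Event and FilterRule types, their fields and the exclusion logic are assumptions for illustration, not part of the original reply):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConfigStreamSketch {

    // Hypothetical data record.
    public static class Event {
        public String userId;
        public String payload;
    }

    // Hypothetical configuration/filter message.
    public static class FilterRule {
        public String userId;
        public boolean exclude;
    }

    public static DataStream<Event> applyFilters(DataStream<Event> events,
                                                 DataStream<FilterRule> rules) {
        return events
                .keyBy("userId")
                .connect(rules.keyBy("userId"))
                .flatMap(new RichCoFlatMapFunction<Event, FilterRule, Event>() {

                    // Simplified in-memory view of the rules; in a real job this
                    // would be kept in Flink keyed state so it survives failures.
                    private final Set<String> excludedUsers = new HashSet<>();

                    @Override
                    public void flatMap1(Event event, Collector<Event> out) {
                        // Data path: drop events for excluded users.
                        if (!excludedUsers.contains(event.userId)) {
                            out.collect(event);
                        }
                    }

                    @Override
                    public void flatMap2(FilterRule rule, Collector<Event> out) {
                        // Control path: update the rule set.
                        if (rule.exclude) {
                            excludedUsers.add(rule.userId);
                        } else {
                            excludedUsers.remove(rule.userId);
                        }
                    }
                });
    }
}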


Flink Metrics

2016-10-16 Thread Govindarajan Srinivasaraghavan
Hi,

I am currently using a flink 1.2 snapshot and instrumenting my pipeline with
flink metrics. One small suggestion I have: currently the Meter interface
only supports getRate(), which is always the one-minute rate.

It would be great if all the rates (1 min, 5 min & 15 min) were exposed, to get
a better picture in terms of performance.

Also, is there any reason why timers are not part of flink metrics core?
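
As a stop-gap sketch for getting the longer-window rates today (assuming the flink-metrics-dropwizard module is on the classpath; the metric name and the surrounding function are made up for illustration), one can register a Codahale meter through DropwizardMeterWrapper and read the 5- and 15-minute rates from the underlying Codahale meter directly:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;

public class MeterRatesSketch extends RichMapFunction<String, String> {

    private transient com.codahale.metrics.Meter codahaleMeter;
    private transient Meter flinkMeter;

    @Override
    public void open(Configuration parameters) {
        // The wrapper exposes the Codahale meter to Flink's reporters, while the
        // Codahale meter itself still provides the 5- and 15-minute rates.
        codahaleMeter = new com.codahale.metrics.Meter();
        flinkMeter = getRuntimeContext()
                .getMetricGroup()
                .meter("recordsPerSecond", new DropwizardMeterWrapper(codahaleMeter));
    }

    @Override
    public String map(String value) {
        flinkMeter.markEvent();
        // codahaleMeter.getFiveMinuteRate() / getFifteenMinuteRate() can be read here
        // if the job itself needs the longer-window rates.
        return value;
    }
}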

Regards,
Govind


Stream Iterations

2016-12-18 Thread Govindarajan Srinivasaraghavan
Hi All,

I have a use case for which I need some suggestions. It's a streaming
application with a kafka source, followed by groupBy, keyBy and some
calculations. The output of each calculation has to be a side input for the
next calculation, and it also needs to be sent to a sink.

Right now I'm achieving this by storing the result state in memory and also
saving it in a redis cache. I was looking at delta iterations in the flink
documentation. It would be great if someone could help me understand whether
I can achieve this using iterations or any other api. Thanks in advance.
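
For reference, delta iterations belong to the DataSet (batch) API; on the DataStream API the feedback loop looks like the sketch below (the numeric example is made up purely for illustration). Note that an iteration feeds results back to the head of the same loop rather than handing state sideways to a different downstream operator, so it may or may not fit this use case:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> input = env.generateSequence(1, 10);

        // Open the feedback loop.
        IterativeStream<Long> iteration = input.iterate();

        // The "calculation"; here it simply decrements the value.
        DataStream<Long> body = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value - 1;
            }
        });

        // Values that still need work are fed back to the head of the loop...
        iteration.closeWith(body.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) {
                return value > 0;
            }
        }));

        // ...and finished values continue downstream to the sink.
        body.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) {
                return value <= 0;
            }
        }).print();

        env.execute("iteration-sketch");
    }
}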

Regards,
Govind


Re: Dynamic Scaling

2016-12-26 Thread Govindarajan Srinivasaraghavan
Hi All,

It would be great if someone could help me with my questions. I appreciate all
the help.

Thanks.

> On Dec 23, 2016, at 12:11 PM, Govindarajan Srinivasaraghavan 
> <govindragh...@gmail.com> wrote:
> 
> Hi,
> 
> We have a computation-heavy streaming flink job which will be processing 
> around 100 million messages at peak time and around 1 million messages at non-peak 
> time. We need the capability to dynamically scale so that the 
> computation operator can scale up and down during high or low workloads 
> respectively, without restarting the job, in order to lower the machine costs.
> 
> Is there an ETA on when the rescaling a single operator without restart 
> feature will be released?
> 
> Is it possible to auto scale one of the operators with docker swarm or Amazon 
> ECS auto scaling based on kafka consumer lag or cpu consumption? If so can I 
> get some documentation or steps to achieve this behaviour.
> 
> Also is there any document on what are the tasks of a job manager apart from 
> scheduling and reporting status? 
> 
> Since there is just one job manager we just wanted to check if there would be 
> any potential scaling limitations as the processing capacity increases. 
> 
> Thanks
> Govind
> 


Streaming Exception

2017-03-09 Thread Govindarajan Srinivasaraghavan
Hi All,

I see the below error after running my streaming job for a while, once the
load increases. After a while the task manager becomes completely dead and the
job keeps restarting.

Also, when I checked for back pressure in the UI, it kept saying sampling in
progress and no results were displayed. Is there an API which can provide the
back pressure details?

2017-03-10 01:40:58,793 WARN  org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error while emitting latency marker.
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:426)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:152)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:117)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:690)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:423)
... 10 more
Caused by: java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:114)
... 14 more


Connect more than two streams

2017-07-24 Thread Govindarajan Srinivasaraghavan
Hi,


I have two streams reading from kafka, one for data and the other for control.
The data stream is split by type, and there are around six types. Each type
has its own processing logic, and finally everything has to be merged to get
the collective state per device. I was thinking I could connect multiple
streams, process them and maintain state, but connect only supports two
streams. Is there some way to achieve my desired functionality?


By the way, after the split and some processing, all of them are keyed streams.
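
One common workaround is to map each typed stream onto a common record type and union them before keying by device, since union (unlike connect) accepts any number of streams of the same type. A rough sketch, where the DeviceUpdate envelope and the three per-type streams are assumptions for illustration:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class MergeStreamsSketch {

    // Hypothetical common envelope that every per-type result is mapped into.
    public static class DeviceUpdate {
        public String deviceId;
        public String type;
        public String payload;
    }

    public static KeyedStream<DeviceUpdate, ?> mergePerDevice(DataStream<DeviceUpdate> typeA,
                                                              DataStream<DeviceUpdate> typeB,
                                                              DataStream<DeviceUpdate> typeC) {
        // union() takes a varargs list, so all six per-type result streams can be
        // merged in one call; a stateful function keyed by deviceId can then build
        // the collective per-device state.
        return typeA.union(typeB, typeC).keyBy("deviceId");
    }
}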


Increasing Disk Read Throughput and IOPS

2018-05-22 Thread Govindarajan Srinivasaraghavan
Hi All,

We are running flink in AWS and we are observing some strange behavior. We are
using docker containers, EBS for storage and the RocksDB state backend. We
have a few map and value states, with checkpointing every 30 seconds and
incremental checkpointing turned on. The issue we are noticing is that the read
IOPS and read throughput gradually increase over time and keep constantly
growing. The write throughput and write bytes are not increasing as much as the
reads. The checkpoints are written to durable NFS storage. We are not sure
what is causing this constant increase in read throughput, but because of it
we are running out of EBS burst balance and need to restart the job every once
in a while. I have attached the EBS read and write metrics. Has anyone
encountered this issue, and what could be the possible solution?


We have also tried setting the below rocksdb options, but it didn't help.


DBOptions:

currentOptions.setOptimizeFiltersForHits(true)
.setWriteBufferSize(536870912)
.setMaxWriteBufferNumber(5)
.setMinWriteBufferNumberToMerge(2);

ColumnFamilyOptions:

currentOptions.setMaxBackgroundCompactions(4)
.setMaxManifestFileSize(1048576)
.setMaxLogFileSize(1048576);
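
For reference, this is roughly how such options are usually wired into the job through the RocksDB state backend's OptionsFactory (the checkpoint path and the specific values here are assumptions, not a recommendation). Note that in the RocksDB Java API the write-buffer and filter settings belong to ColumnFamilyOptions, while the background-compaction and manifest/log file-size settings belong to DBOptions:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class RocksDbOptionsSketch {

    public static RocksDBStateBackend buildBackend() throws Exception {
        // Hypothetical NFS checkpoint path; the second argument enables
        // incremental checkpoints.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("file:///mnt/nfs/flink-checkpoints", true);

        // Start from one of the predefined profiles, then override selectively.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                return currentOptions.setMaxBackgroundCompactions(4);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions
                        .setOptimizeFiltersForHits(true)
                        .setWriteBufferSize(512 * 1024 * 1024)
                        .setMaxWriteBufferNumber(5)
                        .setMinWriteBufferNumberToMerge(2);
            }
        });

        return backend;
    }
}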



Thanks.


Too many open files

2018-03-20 Thread Govindarajan Srinivasaraghavan
Hi,

We have a streaming job that runs on flink in docker, and checkpointing
happens every 10 seconds. After several starts and cancellations we are
facing this issue with file handles.

The job reads data from kafka, processes it and writes it back to kafka, and
we are using the RocksDB state backend. For now we have increased the number
of file handles to work around the problem, but we would like to know whether
this is expected or is an issue. Thanks.

java.io.FileNotFoundException: /tmp/flink-io-b3043cd6-50c8-446a-8c25-fade1b1862c0/cb317fc2578db72b3046468948fa00f2f17039b6104e72fb8c58938e5869cfbc.0.buffer (Too many open files)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:259)
at org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:120)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:149)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.<init>(StreamTwoInputProcessor.java:147)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.init(TwoInputStreamTask.java:79)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:235)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

Regards,
Govind