Flink equivalent to Samza's bootstrap stream?

2017-06-21 Thread Jakob Homan
Hey all- I'm using Managed Keyed State to store data in a map. On initial job startup (triggered by a config), I would like that state to be populated before processing begins. This can either be from another stream or from a file. In Samza, one would do this with bootstrap streams

Re: Related datastream

2017-06-21 Thread nragon
The reason I'm doing it on a stream is because I can have many records in memory and I want to execute this on an ordinary laptop. With streaming I can achieve this. So I set my links between a and c with 0..4, meaning each record from a can have between 0 and 4 records, same for b. But for now leta

Re: Related datastream

2017-06-21 Thread Jonas Gröger
Hi nragon, apparently I didn't read the P.S. since I assumed it's not important. Silly me. So you are trying to join stream A and B to stream C, with stream A and B being keyed. Alright. How often do matching elements (matched by primary key) from A and B arrive on your operator

Re: Related datastream

2017-06-21 Thread Jonas Gröger
Hey nragon! Do the two streams A and B have some sort of id or key, or how do you plan on joining them? Do you just want to join A and B with elements a and b as they arrive (one in state and join with the next arriving one from the other stream)? From what you are asking, this should be no

Re: Combining streams with static data and using REST API as a sink

2017-06-21 Thread Nancy Estrada
Hi Josh, I have a use case similar to yours. I need to join a stream with data from a database to which I have access via a REST API. Since the side inputs API is still ongoing work, I am wondering how you approached it. Did you use the rich function, updating it periodically?

HA Standalone Cluster configuration

2017-06-21 Thread Edward Buck
Questions about standalone cluster configuration: 1. Is it considered bad practice to have standby JobManagers co-located on the same machines as TaskManagers? 2. Is it considered bad practice to have ZooKeeper installed on the same machines as the JobManager leader and standby machines?

Re: Add custom configuration files to TMs classpath on YARN

2017-06-21 Thread Mikhail Pryakhin
Hi Nico! Sounds great, will give it a try and report back with results soon. Thank you so much for your help!! Kind Regards, Mike Pryakhin

Re: Add custom configuration files to TMs classpath on YARN

2017-06-21 Thread Nico Kruber
A workaround may be to use the DistributedCache. It apparently is not documented much but the JavaDoc mentions roughly how to use it: https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954 /** * Registers a file at the

Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

2017-06-21 Thread Andrea Spina
Hi Gordon, sadly no news since the last message. In the end I worked around the issue; I was not able to solve it. I'll try to provide a runnable example asap. Thank you. Andrea

Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread sohimankotia
Cool. Thanks. Closing thread.

Re: Kafka and Flink integration

2017-06-21 Thread Greg Hogan
If the concrete type cannot be known then a proper TypeInformation cannot be created and Kryo must be used. There may be a few cases where the TypeInformation can be deduced by the developer but not by TypeExtractor and the returns TypeInformation must be explicitly given to prevent the use of

Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread Chesnay Schepler
Exactly, you register the gauge once in open(), and modify the code so that this gauge returns different values.

Re: Kafka and Flink integration

2017-06-21 Thread Ted Yu
Greg: Can you clarify the last part? Should it be: the concrete type cannot be known?

Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread sohimankotia
Basically, every time I call the add metric method it is just registering the gauge. I can register this gauge in the open method and then update the value of the gauge in flatMap. Right?

Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread Chesnay Schepler
The reason the gauge value is not updating is that you are not actually updating the gauge, but registering a new gauge under the same name. The subsequent registrations are ignored, and should've logged a warning. I suggest making your gauge stateful by adding a field for the opTimeInSec
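
The pattern described in this reply can be sketched in plain Java. This is only an illustration under stated assumptions, not the poster's code and not Flink's implementation: the `Gauge` interface below is a stand-in that mirrors the shape of Flink's `org.apache.flink.metrics.Gauge` (a single `getValue()` method), while `Registry`, `TimingOperator` and `process` are hypothetical names invented for the example. The registry keeps the first gauge registered under a name and ignores later ones, which is the behaviour the reply describes.

```java
import java.util.HashMap;
import java.util.Map;

public class GaugeSketch {

    /** Stand-in with the same shape as Flink's Gauge interface (assumption). */
    interface Gauge<T> {
        T getValue();
    }

    /** Hypothetical registry: the first gauge under a name wins, later ones are ignored. */
    static class Registry {
        private final Map<String, Gauge<?>> gauges = new HashMap<>();

        void gauge(String name, Gauge<?> gauge) {
            gauges.putIfAbsent(name, gauge);
        }

        Object read(String name) {
            return gauges.get(name).getValue();
        }
    }

    /** Hypothetical operator: registers the gauge once and only updates a field afterwards. */
    static class TimingOperator {
        // The field makes the gauge stateful; the gauge reads it on every poll.
        private volatile long opTimeInSec;

        /** Called once, like open() in a rich function. */
        void open(Registry registry) {
            Gauge<Long> gauge = () -> opTimeInSec;
            registry.gauge("opTimeInSec", gauge);
        }

        /** Called per record; updates the field instead of registering a new gauge. */
        void process(long startTimeSec, long endTimeSec) {
            opTimeInSec = endTimeSec - startTimeSec;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        TimingOperator op = new TimingOperator();
        op.open(registry);

        op.process(10, 13);
        System.out.println(registry.read("opTimeInSec")); // prints 3

        op.process(20, 27);
        System.out.println(registry.read("opTimeInSec")); // prints 7
    }
}
```

In an actual Flink job the registration would go through the operator's metric group in open() rather than this stand-in registry; the point here is only that the gauge object is created once and reads a field that the per-record code keeps up to date.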

Re: Kafka and Flink integration

2017-06-21 Thread Greg Hogan
The recommendation has been to avoid Kryo where possible. General data exchange: Avro or Thrift. Flink internal data exchange: POJO (or Tuple, which are slightly faster though less readable, and there is an outstanding PR to narrow or close the performance gap). Kryo is useful for types which

Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread sohimankotia
Here it is : import com.codahale.metrics.SlidingWindowReservoir; import in.dailyhunt.cis.enrichments.datatype.BasicInfoTuple; import in.dailyhunt.cis.enrichments.datatype.SinkTuple; import org.apache.flink.api.common.accumulators.LongCounter; import

Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread Chesnay Schepler
Can you provide more of your code (you can also send it to me directly)? I'm interested in where the startTime/endTime arguments are defined.

Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread sohimankotia
I ran the job and monitored it for approx. 20 mins. I tried with meter, accumulators, histogram and gauge. Out of those, only the meter and accumulators were updating values; the others showed only a constant value the whole time.

Re: Kafka and Flink integration

2017-06-21 Thread nragon
So, serialization between producer application -> Kafka -> Flink Kafka consumer will use Avro, Thrift or Kryo, right? From there, the remaining pipeline can just use standard POJO serialization, which would be better?