Re: Silly keyBy() error

2016-03-12 Thread Ron Crocker
Thanks Stefano -

That helped, but just led to different pain. I think I need to reconsider how I 
treat these things. Alas, the subject of a different thread.

Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835

> On Mar 12, 2016, at 12:11 PM, Stefano Baghino  
> wrote:
> 
> Hi Ron,
> 
> not every class can be used to `keyBy` a stream. For your case in
> particular, it looks like you have to implement Comparable so that Flink can
> correctly key your stream based on AggregatableTimesliceImpl.
> 
> Take a look at the first slides here for more information on keying: 
> http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html 
> 
> 
> Hope I helped.
> 
> On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker  > wrote:
> I’m sure this should work, but I’m missing something… I searched the archive 
> first, but didn’t have much luck finding any insights there.
> 
> TL;DR: org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType<AggregatableTimeslice>) cannot be used as key.
> 
> I’m just getting started with a 1.0 implementation of a new task. It’s a 
> pretty straightforward reduce job, but I’m running into a snag with creating 
> a KeyedStream.
> 
> Here’s the graph:
> StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
> DataStream<TimesliceData> dataStream = see.addSource(new
> FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
>     new TimesliceDeserializer(), kafkaConsumerProperties));
> 
> SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
> .assignTimestampsAndWatermarks(new 
> TimesliceTimestampExtractor())
> .flatMap(new TimesliceMapper());
> 
> flattenedDataStream
> .keyBy("accountId", "agentId", "wideMetricId")
> .timeWindow(Time.seconds(60))
> .reduce(AggregatableTimeslice::aggregateWith)
> .print();
> 
> This fails on keyBy() with the message: 
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType<AggregatableTimeslice>) cannot be used as key.
> TimesliceMapper is a concrete implementation of
> FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely
> public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
>     @Override
>     public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
>         for (Timeslice timeslice : value.getTimeslices()) {
>             out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
>         }
>     }
> }
> AggregatableTimesliceImpl is a simple concrete implementation of the 
> AggregatableTimeslice interface:
> public interface AggregatableTimeslice {
> int getAccountId();
> int getAgentId();
> long getWideMetricId();
> AggregatableTimesliceStats getTimesliceStats();
> }
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com 
> M: +1 630 363 8835 
> 
> 
> 
> -- 
> BR,
> Stefano Baghino
> 
> Software Engineer @ Radicalbit



Re: Silly keyBy() error

2016-03-12 Thread Stefano Baghino
Hi Ron,

not every class can be used to `keyBy` a stream. For your case in
particular, it looks like you have to implement Comparable so that Flink
can correctly key your stream based on AggregatableTimesliceImpl.

Take a look at the first slides here for more information on keying:
http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html

Hope I helped.
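
As an alternative to field-expression keys, you could also key the stream with
an explicit KeySelector, so Flink does not have to analyze your class as a
POJO. A rough, untested sketch based only on the getters in the interface you
posted (keying by a Tuple3 is my assumption; imports would be
org.apache.flink.api.java.functions.KeySelector and
org.apache.flink.api.java.tuple.Tuple3):

flattenedDataStream
    .keyBy(new KeySelector<AggregatableTimeslice, Tuple3<Integer, Integer, Long>>() {
        @Override
        public Tuple3<Integer, Integer, Long> getKey(AggregatableTimeslice t) {
            // build the key from the same three fields used in the field-expression keyBy
            return new Tuple3<>(t.getAccountId(), t.getAgentId(), t.getWideMetricId());
        }
    })
    .timeWindow(Time.seconds(60))
    .reduce(AggregatableTimeslice::aggregateWith)
    .print();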

On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker  wrote:

> I’m sure this should work, but I’m missing something… I searched the
> archive first, but didn’t have much luck finding any insights there.
>
> TL;DR: org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType<AggregatableTimeslice>) cannot be used as key.
>
> I’m just getting started with a 1.0 implementation of a new task. It’s a
> pretty straightforward reduce job, but I’m running into a snag with
> creating a KeyedStream.
>
> Here’s the graph:
> StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
> see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> DataStream<TimesliceData> dataStream = see.addSource(new
> FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
>     new TimesliceDeserializer(), kafkaConsumerProperties));
>
> SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
> .assignTimestampsAndWatermarks(new
> TimesliceTimestampExtractor())
> .flatMap(new TimesliceMapper());
>
> flattenedDataStream
> .keyBy("accountId", "agentId", "wideMetricId")
> .timeWindow(Time.seconds(60))
> .reduce(AggregatableTimeslice::aggregateWith)
> .print();
>
> This fails on keyBy() with the message:
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType<AggregatableTimeslice>) cannot be used as key.
>
> TimesliceMapper is a concrete implementation of
> FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely
>
> public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
>     @Override
>     public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
>         for (Timeslice timeslice : value.getTimeslices()) {
>             out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
>         }
>     }
> }
>
> AggregatableTimesliceImpl is a simple concrete implementation of the 
> AggregatableTimeslice interface:
>
> public interface AggregatableTimeslice {
> int getAccountId();
> int getAgentId();
> long getWideMetricId();
> AggregatableTimesliceStats getTimesliceStats();
> }
>
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Silly keyBy() error

2016-03-12 Thread Ron Crocker
I’m sure this should work, but I’m missing something… I searched the archive 
first, but didn’t have much luck finding any insights there.

TL;DR: org.apache.flink.api.common.InvalidProgramException: This type
(GenericType<AggregatableTimeslice>) cannot be used as key.

I’m just getting started with a 1.0 implementation of a new task. It’s a pretty 
straightforward reduce job, but I’m running into a snag with creating a 
KeyedStream.

Here’s the graph:
StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<TimesliceData> dataStream = see.addSource(new
FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
    new TimesliceDeserializer(), kafkaConsumerProperties));

SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
.assignTimestampsAndWatermarks(new 
TimesliceTimestampExtractor())
.flatMap(new TimesliceMapper());

flattenedDataStream
.keyBy("accountId", "agentId", "wideMetricId")
.timeWindow(Time.seconds(60))
.reduce(AggregatableTimeslice::aggregateWith)
.print();

This fails on keyBy() with the message: 
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
This type (GenericType<AggregatableTimeslice>) cannot be used as key.
TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely
public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
    @Override
    public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
        for (Timeslice timeslice : value.getTimeslices()) {
            out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
        }
    }
}
AggregatableTimesliceImpl is a simple concrete implementation of the 
AggregatableTimeslice interface:
public interface AggregatableTimeslice {
int getAccountId();
int getAgentId();
long getWideMetricId();
AggregatableTimesliceStats getTimesliceStats();
}
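
One variant I'm also wondering about (a rough, untested sketch, assuming
AggregatableTimesliceImpl can be made a proper POJO with a public no-arg
constructor and getters/setters for accountId, agentId and wideMetricId):
have the flatMap declare the concrete type instead of the interface, so the
field-expression keyBy has a POJO type to analyze.

public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimesliceImpl> {
    @Override
    public void flatMap(TimesliceData value, Collector<AggregatableTimesliceImpl> out) throws Exception {
        for (Timeslice timeslice : value.getTimeslices()) {
            // emitting the concrete POJO type (assumption: it exposes the three keyed fields)
            out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
        }
    }
}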
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835



Re: SourceFunction Scala

2016-03-12 Thread Stefano Baghino
Hi Ankur,

I'm catching up with this week's mailing list right now; I hope you already
solved the issue, but if you haven't, this kind of problem happens when you
use a version of Scala that your Flink dependencies have not been compiled
for. Make sure you append the correct Scala version suffix to the Flink
dependencies you're using (e.g. flink-streaming-scala_2.11), matching the
Scala version of your project.

You can find more details here:
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version

On Mon, Mar 7, 2016 at 1:19 PM, Ankur Sharma 
wrote:

> Hi,
>
>
> I am getting following error while executing the fat jar of project: Any
> help?
>
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/util/serialization/DeserializationSchema
> at org.mpi.debs.Main.main(Main.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.util.serialization.DeserializationSchema
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 1 more
>
>
> Main.scala:
>
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>
>
> object Main {
>   def main(args: Array[String]) {
> val env = StreamExecutionEnvironment.createLocalEnvironment(1)
> val stream = env.addSource(new RMQSource[String]("localhost","query-one", 
> new SimpleStringSchema))
> stream.addSink(new SinkFunction[String] {
>   override def invoke(value: String) = {
> println(value)
>   }
> })
> env.execute("QueryOneExecutor")
>   }
> }
>
> Best,
> *Ankur Sharma*
>
> On 06 Mar 2016, at 20:34, Márton Balassi  wrote:
>
> Hey Ankur,
>
> Add the following line to your imports, and have a look at the referenced
> FAQ. [1]
>
> import org.apache.flink.streaming.api.scala._
>
> [1]
> https://flink.apache.org/faq.html#in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters
>
> Best,
>
> Marton
>
> On Sun, Mar 6, 2016 at 8:17 PM, Ankur Sharma 
> wrote:
>
>> Hello,
>>
>> I am trying to use a custom source function (declaration given below) for
>> DataStream.
>> if I add the source to stream using add source:
>>
>> val stream = env.addSource(new QueryOneSource(args))
>>
>> *I get the following error. Any explanations or help?*
>>
>>
>> Error:(14, 31) could not find implicit value for evidence parameter of type 
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]
>>
>> val stream = env.addSource(new QueryOneSource(args))
>>
>>   ^
>>
>> Error:(14, 31) not enough arguments for method addSource: (implicit 
>> evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple], implicit 
>> evidence$16: 
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple])org.apache.flink.streaming.api.scala.DataStream[org.mpi.debs.Tuple].
>>
>> Unspecified value parameter evidence$16.
>>
>> val stream = env.addSource(new QueryOneSource(args))
>>
>>   ^
>>
>>
>> class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
>>
>>   var nextTuple: Tuple = _ // case class Tuple(id: Long, value: Int)
>>
>>   override def run(ctx: SourceContext[Tuple]) = {
>>     while (true) {
>>       nextRecord()
>>       ctx.collect(this.nextTuple)
>>     }
>>   }
>>
>>   override def cancel() = { }
>>
>>   def nextRecord() = {
>>     // reads the next record from the input files and updates nextTuple
>>   }
>> }
>>
>> Best,
>> *Ankur Sharma*
>> *Information Systems Group*
>> *3.15 E1.1 Universität des Saarlandes*
>> *66123, Saarbrücken Germany*
>> *Email: ankur.sha...@mpi-inf.mpg.de  *
>> *an...@stud.uni-saarland.de *
>>
>>
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Flink packaging makes life hard for SBT fat jar's

2016-03-12 Thread Till Rohrmann
Great to hear Shikhar :-)

Cheers, Till
On Mar 4, 2016 3:51 AM, "shikhar"  wrote:

> Thanks Till. I can confirm that things are looking good with RC5.
> sbt-assembly works well with the flink-kafka connector dependency not
> marked
> as "provided".
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-packaging-makes-life-hard-for-SBT-fat-jar-s-tp4897p5292.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Behavior of SlidingProcessingTimeWindow with CountTrigger

2016-03-12 Thread Vishnu Viswanath
Hi All,


I have the below code


val sev = StreamExecutionEnvironment.getExecutionEnvironment
val socTextStream = sev.socketTextStream("localhost",)

val counts = socTextStream.flatMap{_.split("\\s")}
  .map { (_, 1) }
  .keyBy(0)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
  .trigger(CountTrigger.of(5))
  .sum(1)

counts.print()
sev.execute()

I am sending messages to the port using nc -lk.
This is my sample input:

a
a
a
a
a
b
b
b
b
b
c
c
c
c
c
d
d
d
d
d
e
e
e
e
e

I am sending 5 of each letter since I have a CountTrigger of 5. I was
expecting that for every 5 occurrences of a letter, the code would print a
count of 5, i.e., (a,5), (b,5), etc. But the output I am getting is a little
confusing.
Output:

1> (a,5)
1> (a,5)
1> (b,5)
2> (c,5)
2> (c,5)
1> (d,5)
1> (e,5)
1> (e,5)

As you can see, for some characters the count is printed twice (a, c, e) and
for some characters it is printed only once (b, d). I am not able to figure
out what is going on. I think it may have something to do with the
SlidingProcessingTimeWindow but I am not sure.
Can someone explain to me what is going on?


Thanks and Regards,
Vishnu Viswanath
www.vishnuviswanath.com


Re: Flink and YARN ship folder

2016-03-12 Thread Andrea Sella
Hi Ufuk,

I'm trying to execute the WordCount batch example with input and output on
Alluxio. I followed the "Running Flink on Alluxio" guide and added the library
to the lib folder. Do I have to replicate this operation on the slaves, or
does YARN manage that so that I only need the library on the machine where I
launch the job?

Thanks,
Andrea

2016-03-11 19:23 GMT+01:00 Ufuk Celebi :

> Everything in the lib folder should be added to the classpath. Can you
> check the YARN client logs that the files are uploaded? Furthermore,
> you can check the classpath of the JVM in the YARN logs of the
> JobManager/TaskManager processes.
>
> – Ufuk
>
>
> On Fri, Mar 11, 2016 at 5:33 PM, Andrea Sella
>  wrote:
> > Hi,
> >
> > Is there a way to add external dependencies to a Flink job running on YARN
> > without using HADOOP_CLASSPATH?
> > I am looking for something similar to the lib folder used in standalone mode.
> >
> > BR,
> > Andrea
>


Re: External DB as sink - with processing guarantees

2016-03-12 Thread Josh
Hi Fabian,

Thanks, that's very helpful. Actually most of my writes will be idempotent, so I
guess that means I'll get the exactly-once guarantee using the Hadoop output
format!

Thanks,
Josh

> On 12 Mar 2016, at 09:14, Fabian Hueske  wrote:
> 
> Hi Josh,
> 
> Flink can guarantee exactly-once processing within its data flow given that
> the data sources allow replaying data from a specific position in the stream.
> For example, Flink's Kafka Consumer supports exactly-once.
> 
> Flink achieves exactly-once processing by resetting operator state to a 
> consistent state and replaying data. This means that data might actually be 
> processed more than once, but the operator state will reflect exactly-once 
> semantics because it was reset. Ensuring exactly-once end-to-end is
> difficult, because Flink does not control (and cannot reset) the state of the
> sinks. By default, data can be sent more than once to a sink resulting in 
> at-least-once semantics at the sink.
> 
> This issue can be addressed, if the sink provides transactional writes 
> (previous writes can be undone) or if the writes are idempotent (applying 
> them several times does not change the result). Transactional support would 
> need to be integrated with Flink's SinkFunction. This is not the case for 
> Hadoop OutputFormats. I am not familiar with the details of DynamoDB, but you 
> would need to implement a SinkFunction with transactional support or use 
> idempotent writes if you want to achieve exactly-once results.
> 
> Best, Fabian
> 
> 2016-03-12 9:57 GMT+01:00 Josh :
>> Thanks Nick, that sounds good. I would still like to have an understanding 
>> of what determines the processing guarantee though. Say I use a DynamoDB 
>> Hadoop OutputFormat with Flink, how do I know what guarantee I have? And if 
>> it's at-least-once, is there a way to adapt it to achieve exactly-once?
>> 
>> Thanks,
>> Josh
>> 
>>> On 12 Mar 2016, at 02:46, Nick Dimiduk  wrote:
>>> 
>>> Pretty much anything you can write to from a Hadoop MapReduce program can 
>>> be a Flink destination. Just plug in the OutputFormat and go.
>>> 
>>> Re: output semantics, your mileage may vary. Flink should do you fine for 
>>> at least once.
>>> 
 On Friday, March 11, 2016, Josh  wrote:
 Hi all,
 
 I want to use an external data store (DynamoDB) as a sink with Flink. It 
 looks like there's no connector for Dynamo at the moment, so I have two 
 questions:
 
 1. Is it easy to write my own sink for Flink and are there any docs around 
 how to do this?
 2. If I do this, will I still be able to have Flink's processing 
 guarantees? I.e. Can I be sure that every tuple has contributed to the 
 DynamoDB state either at-least-once or exactly-once?
 
 Thanks for any advice,
 Josh
> 
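
For what it's worth, the kind of idempotent sink I have in mind looks roughly
like this. This is only a sketch along the lines Fabian describes: MyEvent and
DynamoClient are hypothetical placeholders (not real AWS SDK classes), and the
only Flink pieces assumed are RichSinkFunction and Configuration from
org.apache.flink.streaming.api.functions.sink and org.apache.flink.configuration.

public class IdempotentDynamoSink extends RichSinkFunction<MyEvent> {
    // DynamoClient is a hypothetical wrapper around whatever real client is used
    private transient DynamoClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // one client per parallel sink instance
        client = DynamoClient.connect();
    }

    @Override
    public void invoke(MyEvent event) throws Exception {
        // idempotent write: the item key is derived deterministically from the event,
        // so a replay after a failure overwrites the same item instead of adding a duplicate
        client.putItem(event.deterministicKey(), event.toAttributes());
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}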


Re: External DB as sink - with processing guarantees

2016-03-12 Thread Josh
Thanks Nick, that sounds good. I would still like to have an understanding of
what determines the processing guarantee, though. Say I use a DynamoDB Hadoop
OutputFormat with Flink: how do I know what guarantee I have? And if it's
at-least-once, is there a way to adapt it to achieve exactly-once?

Thanks,
Josh

> On 12 Mar 2016, at 02:46, Nick Dimiduk  wrote:
> 
> Pretty much anything you can write to from a Hadoop MapReduce program can be 
> a Flink destination. Just plug in the OutputFormat and go.
> 
> Re: output semantics, your mileage may vary. Flink should do you fine for at 
> least once.
> 
>> On Friday, March 11, 2016, Josh  wrote:
>> Hi all,
>> 
>> I want to use an external data store (DynamoDB) as a sink with Flink. It 
>> looks like there's no connector for Dynamo at the moment, so I have two 
>> questions:
>> 
>> 1. Is it easy to write my own sink for Flink and are there any docs around 
>> how to do this?
>> 2. If I do this, will I still be able to have Flink's processing guarantees? 
>> I.e. Can I be sure that every tuple has contributed to the DynamoDB state 
>> either at-least-once or exactly-once?
>> 
>> Thanks for any advice,
>> Josh