Re: Flink 1.2 Proper Packaging of flink-table with SBT

2017-03-07 Thread Justin Yan
Of course, 15 minutes after I give up and decide to email the mailing list,
I figure it out - my Flink app was using a CollectionEnvironment and not
the proper RemoteEnvironment.

It is still the case, however, that the `flink-table` JAR built by the
standard commands doesn't include the dependencies it requires, and so I'd
be curious to hear what the proper procedure is for linking against
`flink-table` if you want to avoid the bug I highlighted in the
aforementioned JIRA.
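
For reference, the dependency layout we ended up with looks roughly like this (a
sketch; it assumes the flink-table JAR and janino are dropped into the cluster's
`flink/lib` directory as described below):

```
// build.sbt sketch: everything Flink-side marked "provided" because it is
// served from flink/lib on the cluster; janino also lives in flink/lib rather
// than being shaded into the fat JAR.
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"   % "1.2.0" % "provided",
  "org.apache.flink" %% "flink-clients" % "1.2.0" % "provided",
  "org.apache.flink" %% "flink-table"   % "1.2.0" % "provided"
)
```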

Thank you and sorry for the extra noise!

Justin

On Tue, Mar 7, 2017 at 7:21 PM, Justin Yan  wrote:

> Hello!
>
> We are attempting to use the Flink Table API, but are running into a few
> issues.
>
> We initially started with our dependencies looking something like this:
>
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.2.0" % "provided",
>   "org.apache.flink" %% "flink-clients" % "1.2.0" % "provided",
>   "org.apache.flink" %% "flink-table" % "1.2.0",
>   Libraries.specs2,
>   ...)
>
> However, this is mildly annoying since flink-table declares dependencies
> on the flink core modules, and thus brings everything in *anyway*.  On
> top of that, we saw this JIRA: https://issues.apache.org/jira/browse/FLINK-5227,
> which we found concerning, so we decided to
> follow the advice - we downloaded and built Flink-1.2 from source (java 7,
> mvn 3.3) using the following (We're also using the Kinesis connector):
>
> tools/change-scala-version.sh 2.11
> mvn clean install -Pinclude-kinesis -DskipTests
> cd flink-dist
> mvn clean install -Pinclude-kinesis -DskipTests
>
> Once this was done, we took the JAR in "/flink-libraries/flink-table/target/"
> and copied it over to the taskManager "/lib" directory.  Finally, we marked
> our `flink-table` dependency as "provided".  Everything compiles, but when
> I try to actually run a simple job, I get the following error at runtime:
>
> java.lang.NoClassDefFoundError: org/codehaus/commons/compiler/
> CompileException
>
> Indeed, when I peek inside of the `flink-table` JAR, I can't find that
> particular package (and similarly, it isn't in the flink-dist JAR either)
>
> $ jar tf flink-table_2.11-1.2.0.jar | grep codehaus
> $
>
> I then attempted to include this library in my user code by adding:
>
> "org.codehaus.janino" % "janino" % "3.0.6",
>
> to my list of dependencies.  When I run a `jar tf myjar.jar | grep
> CompileException` - I see the class. However, when I run my flink
> application in this fat JAR, I *continue to get the same error*, even
> though I am positive this class is included in the fat JAR.  I eventually
> got around this by placing this jar in the `flink/lib` directory, but I am
> very confused as to how this class cannot be found when I have included it
> in the fat JAR that I am submitting with the Flink CLI to a YARN cluster.
> I partly wanted to mention this in case it is a bug, but mostly to see if
> anyone else has had trouble with the Table API, and if not, whether I have
> structured my project incorrectly and caused these troubles.
>
> Thanks!
>
> Justin
>
>
>
>


Flink 1.2 Proper Packaging of flink-table with SBT

2017-03-07 Thread Justin Yan
Hello!

We are attempting to use the Flink Table API, but are running into a few
issues.

We initially started with our dependencies looking something like this:

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.2.0" % "provided",
  "org.apache.flink" %% "flink-clients" % "1.2.0" % "provided",
  "org.apache.flink" %% "flink-table" % "1.2.0",
  Libraries.specs2,
  ...)

However, this is mildly annoying since flink-table declares dependencies on
the flink core modules, and thus brings everything in *anyway*.  On top of
that, we saw this JIRA: https://issues.apache.org/jira/browse/FLINK-5227,
which we found concerning, so we decided to follow the advice - we
downloaded and built Flink-1.2 from source (java 7, mvn 3.3) using the
following (We're also using the Kinesis connector):

tools/change-scala-version.sh 2.11
mvn clean install -Pinclude-kinesis -DskipTests
cd flink-dist
mvn clean install -Pinclude-kinesis -DskipTests

Once this was done, we took the JAR in
"/flink-libraries/flink-table/target/" and copied it over to the
taskManager "/lib" directory.  Finally, we marked our `flink-table`
dependency as "provided".  Everything compiles, but when I try to actually
run a simple job, I get the following error at runtime:

java.lang.NoClassDefFoundError:
org/codehaus/commons/compiler/CompileException

Indeed, when I peek inside of the `flink-table` JAR, I can't find that
particular package (and similarly, it isn't in the flink-dist JAR either)

$ jar tf flink-table_2.11-1.2.0.jar | grep codehaus
$

I then attempted to include this library in my user code by adding:

"org.codehaus.janino" % "janino" % "3.0.6",

to my list of dependencies.  When I run a `jar tf myjar.jar | grep
CompileException` - I see the class. However, when I run my flink
application in this fat JAR, I *continue to get the same error*, even
though I am positive this class is included in the fat JAR.  I eventually
got around this by placing this jar in the `flink/lib` directory, but I am
very confused as to how this class cannot be found when I have included it
in the fat JAR that I am submitting with the Flink CLI to a YARN cluster.
I partly wanted to mention this in case it is a bug, but mostly to see if
anyone else has had trouble with the Table API, and if not, whether I have
structured my project incorrectly and caused these troubles.

Thanks!

Justin


Re: Issues with Event Time and Kafka

2017-03-07 Thread Dawid Wysakowicz
Hi Ethan,

I believe it is then because the Watermarks and Timestamps in your
implementation are uncorrelated. A Watermark is really a marker that says
there will be no more elements with a timestamp smaller than the value of
that watermark. For more info on the concept see [1].

In your case, since you say that events can "lag" by 30 minutes, you should
try the BoundedOutOfOrdernessTimestampExtractor. It is designed exactly for
a case like yours.
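
A minimal sketch of what that could look like (assuming Row is your own class and
minTime holds the event timestamp in epoch milliseconds):

```
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// stand-in for your Row class; only the minTime field matters here
case class Row(minTime: Long)

// The watermark then trails the maximum event timestamp seen so far by 30
// minutes, instead of following the wall clock.
class RowTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[Row](Time.minutes(30)) {
  override def extractTimestamp(element: Row): Long = element.minTime
}

// usage: stream.assignTimestampsAndWatermarks(new RowTimestampExtractor)
```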

Regards,
Dawid

2017-03-07 22:33 GMT+01:00 ext.eformichella 
:

> Hi Dawid, I'm working with Max on the project
> Our code for the TimestampAndWatermarkAssigner is:
> ```
> class TimestampAndWatermarkAssigner(val maxLateness: Long) extends
> AssignerWithPeriodicWatermarks[Row] {
>
>   override def extractTimestamp(element: Row, previousElementTimestamp:
> Long): Long = {
> element.minTime
>   }
>
>   override def getCurrentWatermark(): Watermark = {
> new Watermark(System.currentTimeMillis() - maxLateness)
>   }
> }
> ```
>
> Where Row is a class representing the incoming JSON object coming from
> Kafka, which includes the timestamp
>
> Thanks,
> -Ethan
>
>
>
>


flink/cancel & shutdown hooks

2017-03-07 Thread Dominik Safaric
Hi all,

I would appreciate any help or advice regarding default Java runtime
shutdown hooks and canceling Flink jobs.

Namely, as part of my Flink application I am using a Kafka interceptor class that
defines a shutdown hook thread. When stopping the Flink streaming job on my
local machine the shutdown hook gets executed; however, I do not see the same
behaviour when stopping the Flink application using bin/flink cancel .
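
For context, the hook is registered along these lines (a simplified illustration,
not the actual interceptor code):

```
// Simplified sketch of a JVM shutdown hook as registered by the interceptor;
// the question is why this thread runs on a local stop but apparently not
// when the job is cancelled via bin/flink cancel.
object ShutdownHookExample {
  def register(): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      override def run(): Unit = {
        // flush buffers / close clients here
        println("shutdown hook executed")
      }
    }))
  }
}
```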

Considering there are no exceptions thrown from the shutdown thread, what could 
the root cause of this be?   

Thanks,
Dominik

Re: Issues with Event Time and Kafka

2017-03-07 Thread ext.eformichella
Hi Dawid, I'm working with Max on the project
Our code for the TimestampAndWatermarkAssigner is:
```
class TimestampAndWatermarkAssigner(val maxLateness: Long) extends
AssignerWithPeriodicWatermarks[Row] {

  override def extractTimestamp(element: Row, previousElementTimestamp:
Long): Long = {
element.minTime
  }

  override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis() - maxLateness)
  }
}
```

Where Row is a class representing the incoming JSON object coming from
Kafka, which includes the timestamp

Thanks,
-Ethan





Re: How to use 'dynamic' state

2017-03-07 Thread Steve Jerman
Thanks for your reply. It makes things much clearer for me. I think you are
right - Side Inputs are probably the right way long term (I looked at the Team
definition), but I think I can construct something in the meantime.

Steve

On Mar 7, 2017, at 6:11 AM, Aljoscha Krettek 
> wrote:

Hi Steve,
I think Gordon already gave a pretty good answer, I'll just try and go into the 
specifics a bit.

You mentioned that there would be different kinds of operators required for the 
rules (you hinted at FlatMap and maybe a tumbling window). Do you know each of 
those types before starting your program? If yes, you could have several of 
these "primitive" operations in your pipeline and each of them only listens to 
rule changes (on a second input) that is relevant to their operation.

Side inputs would be very good for this but I think you can also get the same 
level of functionality by using a CoFlatMap (for the window case you would use 
a CoFlatMap chained to a window operation).
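
A rough sketch of that wiring (Event, Rule and Alert are just placeholder types;
note that the rules kept in the CoFlatMap below live in a plain field, so they are
not part of checkpointed state):

```
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

case class Event(id: String, value: Double)        // placeholder
case class Rule(name: String, threshold: Double)   // placeholder
case class Alert(rule: String, event: Event)       // placeholder

class RuleEvaluator extends CoFlatMapFunction[Event, Rule, Alert] {
  // rules seen so far on this parallel instance (the rule stream is broadcast,
  // so every instance sees every rule update)
  private var rules = Map.empty[String, Rule]

  override def flatMap1(event: Event, out: Collector[Alert]): Unit =
    rules.values
      .filter(rule => event.value > rule.threshold)
      .foreach(rule => out.collect(Alert(rule.name, event)))

  override def flatMap2(rule: Rule, out: Collector[Alert]): Unit =
    rules += rule.name -> rule
}

// wiring: events.connect(ruleUpdates.broadcast).flatMap(new RuleEvaluator)
```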

Does that help? I'm sure we can figure something out together.

Best,
Aljoscha


On Tue, Mar 7, 2017, at 07:44, Tzu-Li (Gordon) Tai wrote:
Hi Steve,

I’ll try to provide some input for the approaches you’re currently looking into 
(I’ll comment on your email below):

* API based stop and restart of job … ugly.

Yes, indeed ;) I think this is absolutely not necessary.

* Use a co-map function with the rules as one stream and the events as the
other. This seems better; however, I would like to have several ‘trigger’
functions changed together .. e.g. a tumbling window for one type of criteria 
and a flat map for a different sort … So I’m not sure how to hook this up for 
more than a simple co-map/flatmap. I did see this suggested in one answer and

Do you mean that operators listen only to certain rules / criteria settings
changes? You could either have separate stream sources for each kind of
criteria / rule trigger event, or have a single source and split it
afterwards. Then, you broadcast each of them to the corresponding co-map /
flat-maps.

* Use broadcast state : this seems reasonable however I couldn’t tell if the 
broadcast state would be available to all of the processing functions. Is it 
generally available?

From the context of your description, I think what you want is that the 
injected rules stream can be seen by all operators (instead of “broadcast 
state”, which in Flink streaming refers to something else).

Aljoscha recently consolidated a FLIP for Side Inputs [1], which I think is 
targeted exactly for what you have in mind here. Perhaps you can take a look at 
that and see if it makes sense for your use case? But of course, this isn’t yet 
available as it is still under discussion. I think Side Inputs may be an ideal 
solution for what you have in mind here, as the rule triggers I assume should 
be slowly changing.

I’ve CCed Aljoscha so that he can probably provide more insights, as he has 
worked a lot on the stuff mentioned here.

Cheers,
Gordon

[1] FLIP-17: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API




On March 7, 2017 at 5:05:04 AM, Steve Jerman 
(st...@kloudspot.com) wrote:


I’ve been reading the code/user group/SO and haven’t really found a great answer
to this… so I thought I’d ask.

I have a UI that allows the user to edit rules which include specific criteria,
for example: trigger an event if X many people are present for over a minute.

I would like to have a flink job that processes an event stream and triggers on 
these rules.

The catch is that I don’t want to have to restart the job if the rules change… 
(it would be easy otherwise :))

So I found four ways to proceed:

* API based stop and restart of job … ugly.

* Use a co-map function with the rules as one stream and the events as the
other. This seems better; however, I would like to have several ‘trigger’
functions changed together .. e.g. a tumbling window for one type of criteria 
and a flat map for a different sort … So I’m not sure how to hook this up for 
more than a simple co-map/flatmap. I did see this suggested in one answer and

* Use broadcast state : this seems reasonable however I couldn’t tell if the 
broadcast state would be available to all of the processing functions. Is it 
generally available?

* Implement my own operators… seems complicated ;)

Are there other approaches?

Thanks for any advice
Steve





Re: Issues with Event Time and Kafka

2017-03-07 Thread Dawid Wysakowicz
Hi Max,
How do you assign timestamps to your events (in the event-time case)? Could you
post the whole code of your TimestampAndWatermarkAssigner?

Regards,
Dawid

2017-03-07 20:59 GMT+01:00 ext.mwalker :

> Hi Stephan,
>
> The right number of events seems to leave the source and enter the windows,
> but it shows that 0 exit the windows.
>
> Also, I have tried 30 minutes and not setting the watermark interval; I am
> not sure what I am supposed to put there, as the docs seem vague about that.
>
> Best,
>
> Max
>
> On Tue, Mar 7, 2017 at 1:54 PM, Stephan Ewen [via Apache Flink User
> Mailing List archive.] <[hidden email]
> > wrote:
>
>> Hi!
>>
>> At a first glance, your code looks correct to assign the Watermarks. What
>> is your watermark interval in the config?
>>
>> Can you check with the Flink metrics (if you are using 1.2) to see how
>> many rows leave the source, how many enter/leave the window operators, etc?
>>
>> That should help figuring out why there are so few result rows...
>>
>> Stephan
>>
>>
>> On Mon, Mar 6, 2017 at 8:57 PM, ext.mwalker <[hidden email]
>> > wrote:
>>
>>> Hi Folks,
>>>
>>> We are working on a Flink job to process a large amount of data coming in
>>> from a Kafka stream.
>>>
>>> We selected Flink because the data is sometimes out of order or late, and
>>> we need to roll up the data into 30-minute event time windows, after which
>>> we are writing it back out to an s3 bucket.
>>>
>>> We have hit a couple issues:
>>>
>>> 1) The job works fine using processing time, but when we switch to event
>>> time (almost) nothing seems to be written out.
>>> Our watermark code looks like this:
>>> ```
>>>   override def getCurrentWatermark(): Watermark = {
>>> new Watermark(System.currentTimeMillis() - maxLateness);
>>>   }
>>> ```
>>> And we are doing this:
>>> ```
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> ```
>>> and this:
>>> ```
>>> .assignTimestampsAndWatermarks(new
>>> TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
>>> ```
>>>
>>> However, even though we get millions of records per hour (the vast
>>> majority of which are no more than 30 minutes late), we get like 2 - 10
>>> records per hour written out to the s3 bucket.
>>> We are using a custom BucketingFileSink Bucketer if folks believe that is
>>> the issue I would be happy to provide that code here as well.
>>>
>>> 2) On top of all this, we would really prefer to write the records
>>> directly
>>> to Aurora in RDS rather than to an intermediate s3 bucket, but it seems
>>> that
>>> the JDBC sink connector is unsupported / doesn't exist.
>>> If this is not the case we would love to know.
>>>
>>> Thanks in advance for all the help / insight on this,
>>>
>>> Max Walker
>>>
>>>
>>>
>>>
>>
>>
>>
>


Re: Issues with Event Time and Kafka

2017-03-07 Thread ext.mwalker
Hi Stephan,

The right number of events seems to leave the source and enter the windows,
but it shows that 0 exit the windows.

Also, I have tried 30 minutes and not setting the watermark interval; I am
not sure what I am supposed to put there, as the docs seem vague about that.

Best,

Max

On Tue, Mar 7, 2017 at 1:54 PM, Stephan Ewen [via Apache Flink User Mailing
List archive.]  wrote:

> Hi!
>
> At a first glance, your code looks correct to assign the Watermarks. What
> is your watermark interval in the config?
>
> Can you check with the Flink metrics (if you are using 1.2) to see how
> many rows leave the source, how many enter/leave the window operators, etc?
>
> That should help figuring out why there are so few result rows...
>
> Stephan
>
>
> On Mon, Mar 6, 2017 at 8:57 PM, ext.mwalker <[hidden email]
> > wrote:
>
>> Hi Folks,
>>
>> We are working on a Flink job to process a large amount of data coming in
>> from a Kafka stream.
>>
>> We selected Flink because the data is sometimes out of order or late, and
>> we need to roll up the data into 30-minute event time windows, after which
>> we are writing it back out to an s3 bucket.
>>
>> We have hit a couple issues:
>>
>> 1) The job works fine using processing time, but when we switch to event
>> time (almost) nothing seems to be written out.
>> Our watermark code looks like this:
>> ```
>>   override def getCurrentWatermark(): Watermark = {
>> new Watermark(System.currentTimeMillis() - maxLateness);
>>   }
>> ```
>> And we are doing this:
>> ```
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> ```
>> and this:
>> ```
>> .assignTimestampsAndWatermarks(new
>> TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
>> ```
>>
>> However, even though we get millions of records per hour (the vast majority
>> of which are no more than 30 minutes late), we get like 2 - 10 records per
>> hour written out to the s3 bucket.
>> We are using a custom BucketingFileSink Bucketer if folks believe that is
>> the issue I would be happy to provide that code here as well.
>>
>> 2) On top of all this, we would really prefer to write the records
>> directly
>> to Aurora in RDS rather than to an intermediate s3 bucket, but it seems
>> that
>> the JDBC sink connector is unsupported / doesn't exist.
>> If this is not the case we would love to know.
>>
>> Thanks in advance for all the help / insight on this,
>>
>> Max Walker
>>
>>
>>
>>
>
>
>
>





Re: Data stream to write to multiple rds instances

2017-03-07 Thread Till Rohrmann
Hi Sathi,

if you read data from Kinesis, then Flink can offer you exactly-once
processing guarantees. However, what you see written out to your database
depends a little bit on the implementation of your custom sink. If you have
a synchronous JDBC client which does not lose data and you fail your job
whenever you see an error, then you should achieve at-least-once.
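
A rough sketch of such a custom sink (the table, columns and the Measurement type
below are just placeholders for your own schema):

```
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

case class Measurement(deviceId: String, value: Double)  // placeholder

// Synchronous JDBC writes: any SQLException propagates out of invoke(), which
// fails the job and lets it fall back to the last checkpoint (at-least-once).
class JdbcSink(url: String, user: String, password: String)
    extends RichSinkFunction[Measurement] {

  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(url, user, password)
    stmt = conn.prepareStatement(
      "INSERT INTO measurements (device_id, value) VALUES (?, ?)")
  }

  override def invoke(m: Measurement): Unit = {
    stmt.setString(1, m.deviceId)
    stmt.setDouble(2, m.value)
    stmt.executeUpdate()
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```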

Cheers,
Till

On Thu, Mar 2, 2017 at 4:49 PM, Sathi Chowdhury <
sathi.chowdh...@elliemae.com> wrote:

> Hi Till,
> Thanks for your reply. I guess I will have to write a custom sink function
> that will use JdbcOutputFormat. I have a question about checkpointing
> support though: if I am reading a stream from Kinesis, streamA, and it is
> transformed to streamB, and that is written to the db, then since streamB is
> checkpointed, when the program recovers will it start from streamB's
> checkpointed offset? In that case checkpointing the JDBC side is maybe not so
> important.
> Thanks
> Sathi
>
>
> On Mar 2, 2017, at 5:58 AM, Till Rohrmann  wrote:
>
> Hi Sathi,
>
> you can split, select, or filter your data stream based on the field's
> value. Then you are able to obtain multiple data streams which you can
> output using a JDBCOutputFormat for each data stream. Be aware, however,
> that the JDBCOutputFormat does not give you any processing guarantees since
> it does not take part in Flink's checkpointing mechanism. Unfortunately,
> Flink does not have a streaming JDBC connector, yet.
>
> Cheers,
> Till
>
> On Thu, Mar 2, 2017 at 7:21 AM, Sathi Chowdhury <
> sathi.chowdh...@elliemae.com> wrote:
>
>> Hi All,
>> Is there any preferred way to manage multiple JDBC connections from
>> Flink? I am new to Flink and looking for some guidance around the right
>> pattern and APIs to do this. The use case needs to route a stream to a
>> particular JDBC connection depending on a field value, so the records are
>> written to multiple destination dbs.
>> Thanks
>> Sathi
>>
>> On 02/07/2017 04:12 PM, Robert Metzger wrote:
>>
>> Currently, there is no streaming JDBC connector.
>> Check out this thread from last year: http://apache-flink-mail
>> ing-list-archive.1008284.n3.nabble.com/JDBC-Streaming-Conn
>> ector-td10508.html
>> 
>>
>> Sent from my iPhone
>>
>> On Feb 8, 2017, at 1:49 AM, Punit Tandel 
>> wrote:
>>
>> Hi Chesnay
>>
>> Currently that is what I have done: reading the schema from the database in
>> order to create a new table in the JDBC database and writing the rows coming
>> from the JDBCInputFormat.
>>
>> Overall I am trying to implement a solution which reads streaming
>> data from one source, which could be coming from Kafka, JDBC, Hive, or
>> HDFS, and writes that streaming data to an output source, which again could
>> be any of those.
>>
>> For a simple use case I have just taken one scenario using JDBC in and
>> JDBC out. Since the JDBC input source returns a DataStream of Row, to
>> write them into a JDBC database we have to create a table, which requires a
>> schema.
>>
>> Thanks
>> Punit
>>
>>
>>
>> On 02/08/2017 08:22 AM, Chesnay Schepler wrote:
>>
>> Hello,
>>
>> I don't understand why you explicitly need the schema since the batch
>> JDBCInput-/Outputformats don't require it.
>> That's kind of the nice thing about Rows.
>>
>> Would be cool if you could tell us what you're planning to do with the
>> schema :)
>>
>> In any case, to get the schema within the plan you will have to
>> query the DB and build it yourself. Note that this
>> is executed on the client.
>>
>> Regards,
>> Chesnay
>>
>> On 08.02.2017 00:39, Punit Tandel wrote:
>>
>> Hi Robert
>>
>> Thanks for the response. So, is this functionality going to be implemented
>> in a near-future Flink release?
>>
>> Thanks
>> On 02/07/2017 04:12 PM, Robert Metzger wrote:
>>
>> Currently, there is no streaming JDBC connector.
>> Check out this thread from last year: http://apache-flink-mail
>> ing-list-archive.1008284.n3.nabble.com/JDBC-Streaming-Conn
>> ector-td10508.html
>> 
>>
>>
>>
>> On Mon, Feb 6, 2017 at 5:00 PM, Ufuk Celebi  wrote:
>>
>>> I'm not sure how well this works for the streaming API. Looping in
>>> Chesnay, who worked on this.
>>>
>>> On Mon, Feb 6, 2017 at 11:09 AM, Punit Tandel 
>>> wrote:
>>> > Hi ,
>>> >
>>> > I was looking into flink streaming api and trying to implement the
>>> 

Re: AWS exception serialization problem

2017-03-07 Thread Shannon Carey
> is there some shading logic involved in the dependencies, concerning the AWS 
> libraries?

Not that I am aware of. The AWS code is included in the job's fat jar as-is.



Re: Issues with Event Time and Kafka

2017-03-07 Thread Stephan Ewen
Hi!

At a first glance, your code looks correct to assign the Watermarks. What
is your watermark interval in the config?

Can you check with the Flink metrics (if you are using 1.2) to see how many
rows leave the source, how many enter/leave the window operators, etc?

That should help figuring out why there are so few result rows...

Stephan


On Mon, Mar 6, 2017 at 8:57 PM, ext.mwalker 
wrote:

> Hi Folks,
>
> We are working on a Flink job to process a large amount of data coming in
> from a Kafka stream.
>
> We selected Flink because the data is sometimes out of order or late, and
> we need to roll up the data into 30-minute event time windows, after which
> we are writing it back out to an s3 bucket.
>
> We have hit a couple issues:
>
> 1) The job works fine using processing time, but when we switch to event
> time (almost) nothing seems to be written out.
> Our watermark code looks like this:
> ```
>   override def getCurrentWatermark(): Watermark = {
> new Watermark(System.currentTimeMillis() - maxLateness);
>   }
> ```
> And we are doing this:
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> ```
> and this:
> ```
> .assignTimestampsAndWatermarks(new
> TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
> ```
>
> However, even though we get millions of records per hour (the vast majority
> of which are no more than 30 minutes late), we get like 2 - 10 records per
> hour written out to the s3 bucket.
> We are using a custom BucketingFileSink Bucketer if folks believe that is
> the issue I would be happy to provide that code here as well.
>
> 2) On top of all this, we would really prefer to write the records directly
> to Aurora in RDS rather than to an intermediate s3 bucket, but it seems
> that
> the JDBC sink connector is unsupported / doesn't exist.
> If this is not the case we would love to know.
>
> Thanks in advance for all the help / insight on this,
>
> Max Walker
>
>
>
>


Appropriate State to use to buffer events in ProcessFunction

2017-03-07 Thread Yassine MARZOUGUI
Hi all,

I want to label events in a stream based on a condition on some future
events.
For example, my stream contains events of type A and B, and I would like
to assign a label 1 to an event E of type A if an event of type B happens
within a duration x of E. I am using event time and my events can be out of
order.
For this I'm using a ProcessFunction, which looks suitable for my use case. In
order to handle out-of-order events, I'm keeping events of type A in
state, and once an event of type B is received, I fire an event time timer
in which I loop through the events of type A in the state having a timestamp <
timer.timestamp, label them, and remove them from the state.
Currently the state is simply a value state containing a TreeMap. I'm keeping
events sorted in order to efficiently get events older than the timer timestamp.
I wonder if that's the appropriate data structure to use in the value state
to buffer events and be able to handle out-of-orderness, or if there is a
more effective implementation, especially since the state may grow to reach
~100 GB sometimes?
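
To make the idea concrete, here is a rough sketch of the buffering part (EventA and
LabeledA are stand-ins for my actual types, and in the real job the timer is
registered when an event of type B arrives, not on every A):

```
import java.util.{TreeMap => JTreeMap}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class EventA(key: String, timestamp: Long)   // stand-in
case class LabeledA(event: EventA, label: Int)    // stand-in

class LabelingFunction extends ProcessFunction[EventA, LabeledA] {

  // the whole sorted buffer lives in a single value state
  @transient private var buffer: ValueState[JTreeMap[java.lang.Long, EventA]] = _

  override def open(parameters: Configuration): Unit = {
    buffer = getRuntimeContext.getState(new ValueStateDescriptor(
      "buffered-a", classOf[JTreeMap[java.lang.Long, EventA]]))
  }

  override def processElement(e: EventA,
      ctx: ProcessFunction[EventA, LabeledA]#Context,
      out: Collector[LabeledA]): Unit = {
    val map = Option(buffer.value()).getOrElse(new JTreeMap[java.lang.Long, EventA]())
    map.put(e.timestamp, e)
    buffer.update(map)
    // simplified: register the timer here; in the real job this happens for B events
    ctx.timerService().registerEventTimeTimer(e.timestamp)
  }

  override def onTimer(timestamp: Long,
      ctx: ProcessFunction[EventA, LabeledA]#OnTimerContext,
      out: Collector[LabeledA]): Unit = {
    val map = buffer.value()
    if (map != null) {
      val ready = map.headMap(timestamp, true)  // entries not newer than the timer
      val it = ready.values().iterator()
      while (it.hasNext) out.collect(LabeledA(it.next(), label = 1))
      ready.clear()                             // removes them from the buffer
      buffer.update(map)
    }
  }
}
```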

Any insight is appreciated.

Thanks,
Yassine


Re: Serialization performance

2017-03-07 Thread Stephan Ewen
I'll try and add more details in a bit.

If you have some suggestions on how to make the serialization stack more
extensible, please let us know!

Some hooks exist, like TypeInfoFactories:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html#defining-type-information-using-a-factory

But I think that hook does not work for Avro...


On Tue, Mar 7, 2017 at 1:25 PM, Newport, Billy  wrote:

> I need more details; Flink does not appear to be really designed to add in
> serializers in a 'nice' way as far as I can tell, it's kind of hardcoded
> for Kryo right now.
>
>
>
>
>
> *From:* Stephan Ewen [mailto:se...@apache.org]
> *Sent:* Tuesday, March 07, 2017 6:21 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Serialization performance
>
>
>
> Hi Billy!
>
>
>
> Out of curiosity: Were you able to hack some direct Avro support as I
> described in the brief writeup, or do you need some more details?
>
>
>
> Stephan
>
>
>
> On Fri, Mar 3, 2017 at 2:38 PM, Aljoscha Krettek 
> wrote:
>
> Hi Billy,
>
> on the Beam side, you probably have looked into writing your own Coder
> (the equivalent of a TypeSerializer in Flink). If yes, did that not work
> out for you? And if yes, why?
>
>
>
> Best,
>
> Aljoscha
>
>
>
>
>
> On Thu, Mar 2, 2017, at 22:02, Stephan Ewen wrote:
>
> Hi!
>
>
>
> I can write some more details later, here the short answer:
>
>
>
>   - Your serializer would go into something like the AvroSerializer
>
>
>
>   - When you instantiate the AvroSerializer in GenericTypeInfo.
> createSerializer(ExecutionConfig), you pre-register the type of the
> generic type, plus all types in "ExecutionConfig.getRegisteredKryoTypes()"
>
> (we abuse the Kryo type registration here as an Avro type registration
> initially, would have to polish that later)
>
>
>
>   - The registered types are classes, but since they are Avro types, you
> should be able to get their schema (for Reflect Data or so)
>
>
>
> That way, Flink would internally forward all the registrations for you
> (similar as it forwards Kryo registrations) and you don't have to manually
> do that.
>
>
>
> Stephan
>
>
>
>
>
> On Thu, Mar 2, 2017 at 9:31 PM, Newport, Billy 
> wrote:
>
> This is what we’re using as our serializer:
>
>
>
> Somewhere:
>
>
>
> env.addDefaultKryoSerializer(Record.class, GRKryoSerializer.class);
>
> then
>
> public class GRKryoSerializer extends Serializer
> {
>     static class AvroStuff
>     {
>         Schema schema;
>         String comment;
>         long key;
>         DatumReader reader;
>         DatumWriter writer;
>     }
>
>     static Map schemaMap = new ConcurrentHashMap<>();
>     static Map schemaToFingerprintMap = new ConcurrentHashMap<>();
>     static Logger log = Logger.getLogger(GRKryoSerializer.class.getName());
>
>     static public void preregisterSchema(String comment, Schema schema)
>     {
>         if (!schemaToFingerprintMap.containsKey(schema)) {
>             long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>             AvroStuff stuff = new AvroStuff();
>             stuff.schema = schema;
>             stuff.comment = comment;
>             stuff.key = fingerprint;
>             stuff.reader = new GenericDatumReader<>(schema);
>             stuff.writer = new GenericDatumWriter<>(schema);
>             log.info(String.format("Preregistering schema for %s with fingerprint %d", comment, fingerprint));
>             schemaMap.put(fingerprint, stuff);
>             schemaToFingerprintMap.put(schema, fingerprint);
>         }
>     }
>
>     public GRKryoSerializer() {
>     }
>
>     static public void clearSchemaCache()
>     {
>         schemaToFingerprintMap.clear();
>         schemaMap.clear();
>     }
>
>     static public AvroStuff getStuffFor(GenericRecord o)
>     {
>         return getStuffFor(o.getSchema());
>     }
>
>     static public AvroStuff getStuffFor(Schema schema)
>     {
>         Long fingerprint = schemaToFingerprintMap.get(schema);
>         if (fingerprint == null)
>         {
>             fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>             log.info(String.format("No fingerprint. Generated %d for schema %s", fingerprint, schema.toString(true)));
>             schemaToFingerprintMap.put(schema, fingerprint);
>
>             throw new RuntimeException("Unknown schema " + schema.toString(true));
>         }
>         return schemaMap.get(fingerprint);
>     }
>
>     @Override
>     public void write(Kryo 

Re: Flink checkpointing gets stuck

2017-03-07 Thread Stephan Ewen
Great to hear it!

What do you think about adding a section to the Flink docs about deployment
on Azure (there is already AWS and GCE, so Azure would make the
cloud-trinity complete) that explains how to set this up and avoid such
pitfalls.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/

If there is anything you think can be improved in the dependencies to help
there, please let us know!


On Tue, Mar 7, 2017 at 5:31 PM, Avihai Berkovitz <
avihai.berkov...@microsoft.com> wrote:

> For anyone seeing this thread in the future, we managed to solve the
> issue. The problem was in the Azure storage SDK.
>
> Flink is using Hadoop 2.7, so we used version 2.7.3 of the hadoop-azure
> package. This package uses version 2.0.0 of the azure-storage package,
> dating from 2014. It has several bugs that have since been fixed, specifically
> one where the socket timeout was infinite. We updated this package to
> version 5.0.0 and everything is working smoothly now.
>
>
>
> *From:* Stephan Ewen [mailto:se...@apache.org]
> *Sent:* Sunday, February 26, 2017 4:47 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink checkpointing gets stuck
>
>
>
> Thanks!
>
>
>
> This looks like a bigger example, involving MongoDB, etc.
>
> Are you able to reproduce this issue with a smaller example?
>
>
>
> It would also help to understand the problem better if we knew the
> topology a bit better.
>
> The stack traces look like "phase 1&2" want to send data (but are back
> pressured) and "phase 3&4&5" wait for input data.
>
>
>
> Stephan
>
>
>
>
>
> On Sun, Feb 26, 2017 at 12:30 PM, Shai Kaplan 
> wrote:
>
> Running jstack on one of the Task Managers:
>
>
>
> 2017-02-26 10:06:27
>
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.111-b14 mixed mode):
>
>
>
> "Attach Listener" #6414 daemon prio=9 os_prio=0 tid=0x7f3c8c089000
> nid=0xe692 waiting on condition [0x]
>
>java.lang.Thread.State: RUNNABLE
>
>
>
> "Async calls on Sink: phase 5 (32/48)" #2337 daemon prio=5 os_prio=0
> tid=0x7f3b942fc000 nid=0xb0d5 waiting on condition [0x7f3adf0af000]
>
>java.lang.Thread.State: WAITING (parking)
>
>   at sun.misc.Unsafe.park(Native Method)
>
>   - parking to wait for  <0x7f3d9d000620> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.
> java:175)
>
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>
>   at java.util.concurrent.LinkedBlockingQueue.take(
> LinkedBlockingQueue.java:442)
>
>   at java.util.concurrent.ThreadPoolExecutor.getTask(
> ThreadPoolExecutor.java:1067)
>
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1127)
>
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
> "Async calls on Sink: phase 5 (31/48)" #2336 daemon prio=5 os_prio=0
> tid=0x7f3b942fb000 nid=0xb0d4 waiting on condition [0x7f3adf1b]
>
>java.lang.Thread.State: WAITING (parking)
>
>   at sun.misc.Unsafe.park(Native Method)
>
>   - parking to wait for  <0x7f3d9fbd7e70> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.
> java:175)
>
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>
>   at java.util.concurrent.LinkedBlockingQueue.take(
> LinkedBlockingQueue.java:442)
>
>   at java.util.concurrent.ThreadPoolExecutor.getTask(
> ThreadPoolExecutor.java:1067)
>
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1127)
>
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
> "Async calls on Sink: phase 5 (30/48)" #2335 daemon prio=5 os_prio=0
> tid=0x7f3b942f9800 nid=0xb0d3 waiting on condition [0x7f3adf2b1000]
>
>java.lang.Thread.State: WAITING (parking)
>
>   at sun.misc.Unsafe.park(Native Method)
>
>   - parking to wait for  <0x7f3da07cdde8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.
> java:175)
>
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>
>   at java.util.concurrent.LinkedBlockingQueue.take(
> LinkedBlockingQueue.java:442)
>
>   at java.util.concurrent.ThreadPoolExecutor.getTask(
> ThreadPoolExecutor.java:1067)
>
>  

Re: AWS exception serialization problem

2017-03-07 Thread Stephan Ewen
@Shannon @Gordon - is there some shading logic involved in the
dependencies, concerning the AWS libraries?


On Tue, Mar 7, 2017 at 5:50 PM, Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I just had a quick look on this, but the Kafka fetcher thread’s context
> classloader doesn’t seem to be the issue (at least for 1.1.4).
>
> In Flink 1.1.4, a separate thread from the task thread is created to run
> the fetcher, but since the task thread sets the user code classloader as
> its context classloader, shouldn’t any threads created from it (i.e., the
> fetcher thread) use it also?
>
> A quickly checked the context classloader the Kafka09Fetcher thread in
> 1.1.4 was using, and it’s `FlinkUserCodeClassLoader`.
>
>
> On March 7, 2017 at 7:32:35 PM, Stephan Ewen (se...@apache.org) wrote:
>
> Ah, I see...
>
> The issue is that the Kafka fetcher threads apparently do not have the
> user-code class loader set as the context class loader. Kryo relies on that
> for class resolution.
>
> What Flink version are you on? I think that actual processing and
> forwarding does not happen in the Kafka Fetchers any more as of 1.2, so
> only Flink 1.1 should be affected...
>
>
> On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey  wrote:
>
>> I think my previous guess was wrong. From what I can tell, when Kryo
>> tries to copy the exception object, it does that by serializing and
>> deserializing it. For subclasses of RuntimeException, it doesn't know how
>> to do it so it delegates serialization to Java. However, it doesn't use a
>> custom ObjectInputStream to override resolveClass() and provide classes
>> from the user code classloader… such as happens in RocksDBStateBackend's
>> use of InstantiationUtil.deserializeObject(). Instead, it uses
>> ObjectInputStream$latestUserDefinedLoader() which is the
>> Launcher$AppClassLoader which definitely doesn't have the user code in it.
>>
>> Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being
>> configured?
>>
>> Thanks,
>> Shannon
>>
>>
>> From: Shannon Carey 
>> Date: Monday, March 6, 2017 at 7:09 PM
>> To: "user@flink.apache.org" 
>> Subject: Re: AWS exception serialization problem
>>
>> This happened when running Flink with bin/run-local.sh I notice that
>> there only appears to be one Java process. The job manager and the task
>> manager run in the same JVM, right? I notice, however, that there are two
>> blob store folders on disk. Could the problem be caused by two different
>> FlinkUserCodeClassLoader objects pointing to the two different JARs?
>>
>>
>> From: Shannon Carey 
>> Date: Monday, March 6, 2017 at 6:39 PM
>> To: "user@flink.apache.org" 
>> Subject: AWS exception serialization problem
>>
>> Has anyone encountered this or know what might be causing it?
>>
>>
>> java.lang.RuntimeException: Could not forward element to next operator
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
>> at 
>> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
>> at 
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>> at 
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
>> deserialization.
>> at 
>> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
>> at 
>> org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
>> at 
>> org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
>> ... 7 more
>> Caused by: java.lang.ClassNotFoundException: 
>> com.amazonaws.services.s3.model.AmazonS3Exception
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at 

Re: Integrate Flink with S3 on EMR cluster

2017-03-07 Thread Shannon Carey
Generally, using the S3 filesystem in EMR with Flink has worked pretty well for me
in Flink < 1.2 (unless you run out of connections in your HTTP pool). When you
say "using the Hadoop File System class", what do you mean? In my experience, it's
sufficient to just use the "s3://" filesystem protocol, and Flink's Hadoop
integration (plus the S3 filesystem classes provided by EMR) will do the right
thing.
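
For example, something along these lines is usually all that's needed (bucket and
paths below are placeholders):

```
import org.apache.flink.api.scala._

// Minimal batch sketch: on EMR the s3:// scheme is resolved by the Hadoop/EMRFS
// classes already on the classpath, so the job itself needs no extra setup.
object S3Sketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.readTextFile("s3://my-bucket/input/")
      .flatMap(line => line.split("\\s+").toSeq)
      .map(word => (word, 1))
      .groupBy(0)
      .sum(1)
      .writeAsCsv("s3://my-bucket/output/")
    env.execute("s3 sketch")
  }
}
```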

-Shannon


Re: FlinkKafkaConsumer010 - creating a data stream of type DataStream<ConsumerRecord<K,V>>

2017-03-07 Thread Dominik Safaric
Hi Gordon,

Thanks for the advice. Following it I’ve implemented the 
Keyed(De)SerializationSchema and am able to further emit the metadata to 
downstream operators. 
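
In case it helps anyone else, the schema ended up looking roughly like this (a
simplified sketch; KafkaRecord is just an example wrapper and the value is kept as
a raw UTF-8 string):

```
import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

case class KafkaRecord(topic: String, partition: Int, offset: Long, value: String)

class RecordWithMetadataSchema extends KeyedDeserializationSchema[KafkaRecord] {

  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                           topic: String, partition: Int, offset: Long): KafkaRecord =
    KafkaRecord(topic, partition, offset,
      if (message == null) null else new String(message, StandardCharsets.UTF_8))

  override def isEndOfStream(nextElement: KafkaRecord): Boolean = false

  override def getProducedType: TypeInformation[KafkaRecord] =
    createTypeInformation[KafkaRecord]
}

// usage: new FlinkKafkaConsumer010[KafkaRecord]("topic", new RecordWithMetadataSchema, props)
```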

Regards,
Dominik

> On 7 Mar 2017, at 07:08, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Dominik,
> 
> I would recommend implementing a `KeyedDeserializationSchema` and supplying it to 
> the constructor when initializing your FlinkKafkaConsumer.
> 
> The `KeyedDeserializationSchema` exposes the metadata of the record such as 
> offset, partition, and key. In the schema, you can implement your own logic 
> of turning the binary data from Kafka into your own data types that carry the 
> metadata information along with the record value, e.g. POJOs or Tuples.
> 
> Some links for more info on this:
> 1. 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#the-deserializationschema
>  
> 
> 2. 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#flinks-typeinformation-class
>  
> 
> 
> The metadata `KeyedDeserializationSchema` exposes is extracted from 
> `ConsumerRecord`s within the Kafka connector, so it doesn’t make sense to 
> wrap it up again into a `ConsumerRecord`. The schema interface exposes all 
> available metadata of the record, so it should be sufficient.
> 
> Cheers,
> Gordon
> 
> On March 7, 2017 at 3:51:59 AM, Dominik Safaric (dominiksafa...@gmail.com 
> ) wrote:
> 
>> Hi, 
>> 
>> Unfortunately I cannot find the option of using raw ConsumerRecord 
>> instances when creating a Kafka data stream.  
>> 
>> In general, I would like to use an instance of the mentioned type because 
>> our use case requires certain metadata such as record offset and partition. 
>> 
>> So far I’ve examined the source code of the Kafka connector and checked the 
>> docs, but unfortunately I could not find the option of creating a data 
>> stream of the type DataStream<ConsumerRecord<K, V>>.  
>> 
>> Am I missing something, or do I have to implement it myself and build Flink 
>> from source in order to have this ability?  
>> 
>> Thanks in advance, 
>> Dominik 



Re: AWS exception serialization problem

2017-03-07 Thread Tzu-Li (Gordon) Tai
Hi,

I just had a quick look on this, but the Kafka fetcher thread’s context 
classloader doesn’t seem to be the issue (at least for 1.1.4).

In Flink 1.1.4, a separate thread from the task thread is created to run the 
fetcher, but since the task thread sets the user code classloader as its 
context classloader, shouldn’t any threads created from it (i.e., the fetcher 
thread) use it also?

I quickly checked the context classloader that the Kafka09Fetcher thread in 1.1.4 
was using, and it’s `FlinkUserCodeClassLoader`.


On March 7, 2017 at 7:32:35 PM, Stephan Ewen (se...@apache.org) wrote:

Ah, I see...

The issue is that the Kafka fetcher threads apparently do not have the user-code 
class loader set as the context class loader. Kryo relies on that for class 
resolution.

What Flink version are you on? I think that actual processing and forwarding 
does not happen in the Kafka Fetchers any more as of 1.2, so only Flink 1.1 
should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey  wrote:
I think my previous guess was wrong. From what I can tell, when Kryo tries to 
copy the exception object, it does that by serializing and deserializing it. 
For subclasses of RuntimeException, it doesn't know how to do it so it 
delegates serialization to Java. However, it doesn't use a custom 
ObjectInputStream to override resolveClass() and provide classes from the user 
code classloader… such as happens in RocksDBStateBackend's use of 
InstantiationUtil.deserializeObject(). Instead, it uses 
ObjectInputStream$latestUserDefinedLoader() which is the 
Launcher$AppClassLoader which definitely doesn't have the user code in it.

Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being 
configured?

Thanks,
Shannon


From: Shannon Carey 
Date: Monday, March 6, 2017 at 7:09 PM
To: "user@flink.apache.org" 
Subject: Re: AWS exception serialization problem

This happened when running Flink with bin/run-local.sh I notice that there only 
appears to be one Java process. The job manager and the task manager run in the 
same JVM, right? I notice, however, that there are two blob store folders on 
disk. Could the problem be caused by two different FlinkUserCodeClassLoader 
objects pointing to the two different JARs?


From: Shannon Carey 
Date: Monday, March 6, 2017 at 6:39 PM
To: "user@flink.apache.org" 
Subject: AWS exception serialization problem

Has anyone encountered this or know what might be causing it?


java.lang.RuntimeException: Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
at 
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
deserialization.
at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
... 7 more
Caused by: java.lang.ClassNotFoundException: 
com.amazonaws.services.s3.model.AmazonS3Exception
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at 

Re: Any good ideas for online/offline detection of devices that send events?

2017-03-07 Thread Bruno Aranda
Hi Gordon,

Many thanks for your helpful ideas. We tried the CEP approach yesterday,
but could not figure it out. The ProcessFunction one looks more promising,
and we are investigating it, though we are fighting some issues
related to event time, where so far we cannot see the timer triggered
at the right event time. We are using ascending timestamps, but at the
moment we see the timers firing too late. Investigating more.
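
For reference, the shape of what we are trying looks roughly like this (a sketch
with assumed event types and field names, on a stream keyed by device id; the
event-time timers only fire once the watermark passes them, which seems to be
exactly the part we are still debugging):

```
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class DeviceEvent(deviceId: String, timestamp: Long)   // assumed shape
case class StatusChange(deviceId: String, online: Boolean)  // assumed shape

// used as: events.keyBy(_.deviceId).process(new OfflineDetector(5 * 60 * 1000L))
class OfflineDetector(timeoutMs: Long)
    extends ProcessFunction[DeviceEvent, StatusChange] {

  @transient private var lastEvent: ValueState[DeviceEvent] = _

  override def open(parameters: Configuration): Unit = {
    lastEvent = getRuntimeContext.getState(
      new ValueStateDescriptor("last-event", classOf[DeviceEvent]))
  }

  override def processElement(event: DeviceEvent,
      ctx: ProcessFunction[DeviceEvent, StatusChange]#Context,
      out: Collector[StatusChange]): Unit = {
    if (lastEvent.value() == null) {
      // first event ever, or first event after the device was declared offline
      out.collect(StatusChange(event.deviceId, online = true))
    }
    lastEvent.update(event)
    // fires once the watermark passes the event timestamp plus the timeout
    ctx.timerService().registerEventTimeTimer(event.timestamp + timeoutMs)
  }

  override def onTimer(timestamp: Long,
      ctx: ProcessFunction[DeviceEvent, StatusChange]#OnTimerContext,
      out: Collector[StatusChange]): Unit = {
    val last = lastEvent.value()
    // only the timer belonging to the most recent event declares the device offline
    if (last != null && last.timestamp + timeoutMs <= timestamp) {
      out.collect(StatusChange(last.deviceId, online = false))
      lastEvent.clear()
    }
  }
}
```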

Thanks,

Bruno

On Tue, 7 Mar 2017 at 07:49 Tzu-Li (Gordon) Tai  wrote:

> Some more input:
>
> Right now, you can also use the `ProcessFunction` [1] available in Flink
> 1.2 to simulate state TTL.
> The `ProcessFunction` should allow you to keep device state and simulate
> the online / offline detection by registering processing timers. In the
> `onTimer` callback, you can emit the “offline” marker event downstream, and
> in the `processElement` method, you can emit the “online” marker event if
> the case is the device has sent an event after it was determined to be
> offline.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>
>
> On March 6, 2017 at 9:40:28 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org)
> wrote:
>
> Hi Bruno!
>
> The Flink CEP library also seems like an option you can look into to see
> if it can easily realize what you have in mind.
>
> Basically, the pattern you are detecting is a timeout of 5 minutes after
> the last event. Once that pattern is detected, you emit a “device offline”
> event downstream.
> With this, you can also extend the pattern output stream to detect whether
> a device has become online again.
>
> Here are some materials for you to take a look at Flink CEP:
> 1. http://flink.apache.org/news/2016/04/06/cep-monitoring.html
> 2.
> https://www.slideshare.net/FlinkForward/fabian-huesketill-rohrmann-declarative-stream-processing-with-streamsql-and-cep?qid=3c13eb7d-ed39-4eae-9b74-a6c94e8b08a3==_search=4
>
> The CEP parts in the slides in 2. also provides some good examples of
> timeout detection using CEP.
>
> Hope this helps!
>
> Cheers,
> Gordon
>
> On March 4, 2017 at 1:27:51 AM, Bruno Aranda (bara...@apache.org) wrote:
>
> Hi all,
>
> We are trying to write an online/offline detector for devices that keep
> streaming data through Flink. We know how often roughly to expect events
> from those devices and we want to be able to detect when any of them stops
> (goes offline) or starts again (comes back online) sending events through
> the pipeline. For instance, if 5 minutes have passed since the last event
> of a device, we would fire an event to indicate that the device is offline.
>
> The data from the devices comes through Kafka, with their own event time.
> The devices events are in order in the partitions and each devices goes to
> a specific partition, so in theory, we should not have out of order when
> looking at one partition.
>
> We are assuming a good way to do this is by using sliding windows that are
> big enough, so we can see the relevant gap before/after the events for each
> specific device.
>
> We were wondering if there are other ideas on how to solve this.
>
> Many thanks!
>
> Bruno
>
>


Re: Integrate Flink with S3 on EMR cluster

2017-03-07 Thread vinay patil
Hi Guys,

Has anyone gotten this error before? If yes, have you found any other
solution apart from copying the JAR files to the Flink lib folder?

Regards,
Vinay Patil

On Mon, Mar 6, 2017 at 8:21 PM, vinay patil [via Apache Flink User Mailing
List archive.]  wrote:

> Hi Guys,
>
> I am getting the same exception:
> EMRFileSystem not Found
>
> I am trying to read an encrypted S3 file using the Hadoop FileSystem class
> (using Flink 1.2.0).
> When I copy all the libs from /usr/share/aws/emrfs/lib and /usr/lib/hadoop
> to the Flink lib folder, it works.
>
> However I see that all these libs are already included in the Hadoop
> classpath.
>
> Is there any other way I can make this work ?
>
>





Re: AWS exception serialization problem

2017-03-07 Thread Stephan Ewen
Ah, I see...

The issue is that the Kafka fetcher threads apparently do not have the
user-code class loader set as the context class loader. Kryo relies on that
for class resolution.

What Flink version are you on? I think that actual processing and
forwarding does not happen in the Kafka Fetchers any more as of 1.2, so
only Flink 1.1 should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey  wrote:

> I think my previous guess was wrong. From what I can tell, when Kryo tries
> to copy the exception object, it does that by serializing and deserializing
> it. For subclasses of RuntimeException, it doesn't know how to do it so it
> delegates serialization to Java. However, it doesn't use a
> custom ObjectInputStream to override resolveClass() and provide classes
> from the user code classloader… such as happens in RocksDBStateBackend's
> use of InstantiationUtil.deserializeObject(). Instead, it uses
> ObjectInputStream$latestUserDefinedLoader() which is the
> Launcher$AppClassLoader which definitely doesn't have the user code in it.
>
> Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being
> configured?
>
> Thanks,
> Shannon
>
>
> From: Shannon Carey 
> Date: Monday, March 6, 2017 at 7:09 PM
> To: "user@flink.apache.org" 
> Subject: Re: AWS exception serialization problem
>
> This happened when running Flink with bin/run-local.sh I notice that there
> only appears to be one Java process. The job manager and the task manager
> run in the same JVM, right? I notice, however, that there are two blob
> store folders on disk. Could the problem be caused by two different
> FlinkUserCodeClassLoader objects pointing to the two different JARs?
>
>
> From: Shannon Carey 
> Date: Monday, March 6, 2017 at 6:39 PM
> To: "user@flink.apache.org" 
> Subject: AWS exception serialization problem
>
> Has anyone encountered this or know what might be causing it?
>
>
> java.lang.RuntimeException: Could not forward element to next operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
> deserialization.
>   at 
> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
>   at 
> org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
>   at 
> org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
>   ... 7 more
> Caused by: java.lang.ClassNotFoundException: 
> com.amazonaws.services.s3.model.AmazonS3Exception
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>   at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
>   at java.lang.Throwable.readObject(Throwable.java:914)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
>