Re: CoProcess() VS union.Process()

2018-02-09 Thread Xingcan Cui
Hi Max,

If I understood correctly, instead of joining three streams, you are actually
performing two separate joins, say S1 JOIN S3 and S2 JOIN S3, right? Your plan
"(S1 UNION S2) JOIN S3" seems to be identical to "(S1 JOIN S3) UNION (S2 JOIN S3)",
and if that is what you need, I think your pipeline should be feasible.

However, if you want to join the three streams with each other, you may first join
S1 with S2 to produce S12 with a CoProcessFunction, and then use another
CoProcessFunction to join S12 with S3.
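
A rough sketch of that two-step join (untested; the element types, the key field
and the join/state logic inside each CoProcessFunction are placeholders you would
fill in for your semantics):

import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Placeholder element types; adapt them to your schema.
case class E1(key: String, v1: Int)
case class E2(key: String, v2: Int)
case class E3(key: String, v3: Int)
case class E12(key: String, v1: Int, v2: Int)

val s1: DataStream[E1] = ??? // your sources
val s2: DataStream[E2] = ???
val s3: DataStream[E3] = ???

// Step 1: join S1 with S2 into an intermediate stream S12.
val s12: DataStream[E12] = s1.keyBy(_.key)
  .connect(s2.keyBy(_.key))
  .process(new CoProcessFunction[E1, E2, E12] {
    // keep per-key state of the other side and emit matches
    override def processElement1(e: E1, ctx: CoProcessFunction[E1, E2, E12]#Context,
                                 out: Collector[E12]): Unit = { /* buffer e, probe S2 state */ }
    override def processElement2(e: E2, ctx: CoProcessFunction[E1, E2, E12]#Context,
                                 out: Collector[E12]): Unit = { /* buffer e, probe S1 state */ }
  })

// Step 2: join the intermediate S12 with S3 in a second CoProcessFunction.
val result: DataStream[String] = s12.keyBy(_.key)
  .connect(s3.keyBy(_.key))
  .process(new CoProcessFunction[E12, E3, String] {
    override def processElement1(e: E12, ctx: CoProcessFunction[E12, E3, String]#Context,
                                 out: Collector[String]): Unit = { /* join with buffered S3 state */ }
    override def processElement2(e: E3, ctx: CoProcessFunction[E12, E3, String]#Context,
                                 out: Collector[String]): Unit = { /* join with buffered S12 state */ }
  })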

Hope that helps.

Best,
Xingcan

> On 10 Feb 2018, at 1:06 PM, m@xi  wrote:
> 
> Hello Flinkers,
> 
> I would like to discuss something that has been bothering me. I have two
> streams that I want to join, plus a third stream whose data I want to
> consult from time to time to trigger decisions.
> 
> Essentially, this boils down to coProcessing 3 streams together instead of
> 2, which to the best of my knowledge is not possible.
> 
> I thought of appending an extra field to the 2 streams I want to join. Say
> S1 and S2 are the streams with tuples t1 and t2. After adding the extra
> field, which is the stream id (1 or 2), the tuples become (1, t1) and
> (2, t2), resulting in S1' and S2'.
> 
> Then I do S1'.union(S2'), which gives me a single data stream. This I can
> then connect with the 3rd stream and do the processing with a
> CoProcessFunction.
> 
> However, whenever I process an element from the unioned stream I need an
> if-then-else to check which stream the tuple belongs to, and process it
> and update the S1' or S2' state accordingly.
> 
> Do you think this is a good idea, in terms of efficiency, compared with
> having two dedicated methods for this, namely processElement1() and
> processElement2() of the CoProcessFunction, as I would if I only had two
> streams?
> 
> And if the aforementioned scheme is feasible, then I guess this is
> currently the only way of joining more than 2 streams. Am I right?
> 
> Thanks in advance for your help.
> 
> Best,
> Max
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



CoProcess() VS union.Process()

2018-02-09 Thread m@xi
Hello Flinkers,

I would like to discuss something that has been bothering me. I have two
streams that I want to join, plus a third stream whose data I want to
consult from time to time to trigger decisions.

Essentially, this boils down to coProcessing 3 streams together instead of
2, which to the best of my knowledge is not possible.

I thought of appending an extra field to the 2 streams I want to join. Say
S1 and S2 are the streams with tuples t1 and t2. After adding the extra
field, which is the stream id (1 or 2), the tuples become (1, t1) and
(2, t2), resulting in S1' and S2'.

Then I do S1'.union(S2'), which gives me a single data stream. This I can
then connect with the 3rd stream and do the processing with a
CoProcessFunction.

However, whenever I process an element from the unioned stream I need an
if-then-else to check which stream the tuple belongs to, and process it
and update the S1' or S2' state accordingly.
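
A rough sketch of what I have in mind (Scala, untested; the element types, the
key and all the state handling are just placeholders):

import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical element types; t1/t2 become the payload of a tagged wrapper.
case class Tagged(streamId: Int, key: String, payload: String)
case class Decision(key: String, threshold: Double)

val s1Tagged: DataStream[Tagged] = ??? // S1' = S1 mapped to Tagged(1, ...)
val s2Tagged: DataStream[Tagged] = ??? // S2' = S2 mapped to Tagged(2, ...)
val s3: DataStream[Decision] = ???     // the 3rd stream consulted from time to time

val united = s1Tagged.union(s2Tagged)

val result = united.keyBy(_.key)
  .connect(s3.keyBy(_.key))
  .process(new CoProcessFunction[Tagged, Decision, String] {
    override def processElement1(t: Tagged, ctx: CoProcessFunction[Tagged, Decision, String]#Context,
                                 out: Collector[String]): Unit = {
      if (t.streamId == 1) { /* update S1' state, probe S2' and decision state */ }
      else                 { /* update S2' state, probe S1' and decision state */ }
    }
    override def processElement2(d: Decision, ctx: CoProcessFunction[Tagged, Decision, String]#Context,
                                 out: Collector[String]): Unit = { /* update the decision state from S3 */ }
  })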

Do you think this is a good idea, in terms of efficiency, compared with
having two dedicated methods for this, namely processElement1() and
processElement2() of the CoProcessFunction, as I would if I only had two
streams?

And if the aforementioned scheme is feasible, then I guess this is
currently the only way of joining more than 2 streams. Am I right?

Thanks in advance for your help.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Share Spring Application context among operators

2018-02-09 Thread Swapnil Dharane
 Hello,

Is there any way I can pass my Spring ApplicationContext object as a
parameter to Flink operators? I understand I need to serialize this
object. Is there any existing serialization mechanism that I can use?

Thanks in advance.


Optimizing multiple aggregate queries on a CEP using Flink

2018-02-09 Thread Sahil Arora
Hi there,
We have been working on a project with the title "Optimizing Multiple
Aggregate Queries over a Complex Event Processing Engine". The aim is to
optimize a group of queries. For example, "how many cars passed the post
in the past 1 minute" and "how many cars passed the post in the past 2
minutes" are 2 queries, and the naive and inefficient method is to answer
both of these queries independently, one by one. The optimal way, however,
would be to minimize the computation by reusing the answer of query 1 when
answering query 2. This is basically what our aim is: to minimize the
computation cost when we have multiple aggregate queries in a CEP.
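
For instance, here is a minimal Scala sketch of the kind of sharing we have in
mind, assuming partial counts are kept per 1-minute bucket (the numbers are made up):

// Per-minute partial counts of cars, keyed by a minute index; a toy stand-in
// for the partial aggregates an engine could maintain.
val carsPerMinute: Map[Long, Long] = Map(100L -> 12L, 101L -> 7L, 102L -> 9L)

def carsInLastNMinutes(now: Long, n: Int): Long =
  (0 until n).map(i => carsPerMinute.getOrElse(now - i, 0L)).sum

val lastOneMinute  = carsInLastNMinutes(102L, 1)                        // query 1: 9
val lastTwoMinutes = lastOneMinute + carsPerMinute.getOrElse(101L, 0L)  // query 2 reuses query 1: 9 + 7 = 16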

We have been searching for some platform which supports CEP, and Flink is
probably one of them. Hence, it would be very helpful if we could get some
answers to the following questions:

1. Does Flink already have some method of optimizing multiple aggregate
queries?
2. Is it possible for us to implement / test such an algorithm in Flink
which considers multiple queries in a CEP, e.g., having a database of SQL
queries and testing an algorithm of our choice?

Any other inputs which may help us with solving the problem would be highly
welcome.

Thanks a lot.
-- 
Sahil Arora
Final year B.Tech Undergrad | Indian Institute of Technology Mandi
Web: https://sahilarora535.github.io
LinkedIn: sahilarora535 
Ph: +91-8130506047


Re: Regarding BucketingSink

2018-02-09 Thread Vishal Santoshi
Without --allowNonRestoredState, on a suspend/resume we do see the
valid-length file along with the finalized file (finalized during resume):

-rw-r--r--   3 root hadoop 10 2018-02-09 13:57
/vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

That makes much more sense.

I guess we should document --allowNonRestoredState better? It seems it
actually drops state?



On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi 
wrote:

> This is 1.4 BTW.  I am not sure that I am reading this correctly but the
> lifecycle of cancel/resume is 2 steps
>
>
>
> 1. Cancel job with SP
>
>
> closeCurrentPartFile
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>
> is called from close()
>
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>
>
> and that moves files to pending state.  That I would presume is called
> when one does a cancel.
>
>
>
> 2. The restore on resume
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>
> calls
>
> handleRestoredBucketState
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>
> clears the pending files from state without finalizing them?
>
>
>
> That does not seem to be right. I must be reading the code totally wrong ?
>
> I am also not sure whether --allowNonRestoredState is skipping restoring
> the state. At least
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
> is not exactly clear about what it does if we add an operator (GDF I think
> will add a new operator to the DAG without state, even if stateful; in my
> case the Map operator is not even stateful).
>
>
> Thanks and please bear with me if this is all something pretty simple.
>
> Vishal
>
>
>
>
>
>
>
>
>
>
>
>
> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> What should be the behavior of BucketingSink vis-a-vis state (pending,
>> in-progress and finalization) when we suspend and resume?
>>
>> So I did this
>>
>> * I had a pipe writing to hdfs suspend and resume using
>>
>> --allowNonRestoredState as in I had added a harmless MapOperator (
>> stateless ).
>>
>>
>> * I see that the file on hdfs that was being written to (before the
>> cancel with savepoint) goes into a pending state: _part-0-21.pending
>>
>>
>> * I see a new file being written to in the resumed pipe
>> _part-0-22.in-progress.
>>
>>
>> What I do not see is the file in _part-0-21.pending being finalized (as
>> in renamed to just part-0-21). I would have assumed that would be the
>> case in this controlled suspend/resume circumstance. Further, it is a rename,
>> and an hdfs mv is not an expensive operation.
>>
>>
>>
>> Am I understanding the process correctly, and if yes, any pointers?
>>
>>
>> Regards,
>>
>>
>> Vishal
>>
>
>


Re: Regarding BucketingSink

2018-02-09 Thread Vishal Santoshi
This is 1.4 BTW.  I am not sure that I am reading this correctly but the
lifecycle of cancel/resume is 2 steps



1. Cancel job with SP


closeCurrentPartFile

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549

is called from close()


https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416


and that moves files to pending state.  That I would presume is called when
one does a cancel.



2. The restore on resume

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369

calls

handleRestoredBucketState

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704

clears the pending files from state without finalizing them?



That does not seem to be right. I must be reading the code totally wrong ?

I am also not sure whether --allowNonRestoredState is skipping restoring the
state. At least
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
is not exactly clear about what it does if we add an operator (GDF I think will
add a new operator to the DAG without state, even if stateful; in my case
the Map operator is not even stateful).


Thanks and please bear with me if this is all something pretty simple.

Vishal












On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi 
wrote:

> What should be the behavior of BucketingSink vis-a-vis state (pending,
> in-progress and finalization) when we suspend and resume?
>
> So I did this
>
> * I had a pipe writing to hdfs suspend and resume using
>
> --allowNonRestoredState as in I had added a harmless MapOperator (
> stateless ).
>
>
> * I see that the file on hdfs that was being written to (before the cancel
> with savepoint) goes into a pending state: _part-0-21.pending
>
>
> * I see a new file being written to in the resumed pipe
> _part-0-22.in-progress.
>
>
> What I do not see is the file in _part-0-21.pending being finalized (as
> in renamed to just part-0-21). I would have assumed that would be the
> case in this controlled suspend/resume circumstance. Further, it is a rename,
> and an hdfs mv is not an expensive operation.
>
>
>
> Am I understanding the process correctly, and if yes, any pointers?
>
>
> Regards,
>
>
> Vishal
>


Regarding BucketingSink

2018-02-09 Thread Vishal Santoshi
What should be the behavior of BucketingSink vis-a-vis state (pending,
in-progress and finalization) when we suspend and resume?

So I did this

* I had a pipe writing to hdfs that I suspended and resumed using
--allowNonRestoredState, as I had added a harmless MapOperator
(stateless).


* I see that the file on hdfs that was being written to (before the cancel
with savepoint) goes into a pending state: _part-0-21.pending


* I see a new file being written to in the resumed pipe
_part-0-22.in-progress.


What I do not see is the file in _part-0-21.pending being finalized (as
in renamed to just part-0-21). I would have assumed that would be the case
in this controlled suspend/resume circumstance. Further, it is a rename, and
an hdfs mv is not an expensive operation.



Am I understanding the process correctly, and if yes, any pointers?


Regards,


Vishal


Re: dataset sort

2018-02-09 Thread Fabian Hueske
The reasons why this isn't working in Flink are:

* a file can only be written by a single process
* Flink does not support merging of sorted network partitions but reads
round-robin from incoming network channels.

I think if you sort the historic data in parallel (without range
partitioning, i.e., randomly partitioned) and write it out in multiple
files, you could implement a source function that reads all files in
parallel and generates ascending watermarks.
It would be important that you have as many parallel source tasks as you
have files to ensure that watermarks are properly generated. Apart from
that, this should result in a nicely sorted stream.
The watermark handling of the DataStream API will take care of "merging" the
sorted files.
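
A minimal sketch of such a source for a single file (untested; it assumes the
file is sorted by an epoch-millisecond timestamp in the first CSV column, and
that you create one such source per file, or move the logic into a parallel
source function that assigns files to subtasks):

import scala.io.Source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

// Emits (timestamp, value) records from one pre-sorted file and an ascending
// watermark after each record.
class SortedFileSource(path: String) extends SourceFunction[(Long, String)] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
    val src = Source.fromFile(path)
    try {
      val lines = src.getLines()
      while (running && lines.hasNext) {
        val Array(ts, value) = lines.next().split(",", 2)
        val t = ts.toLong
        ctx.collectWithTimestamp((t, value), t) // records are in ascending time order,
        ctx.emitWatermark(new Watermark(t))     // so the watermark can follow directly
      }
    } finally {
      src.close()
    }
  }

  override def cancel(): Unit = { running = false }
}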

Best, Fabian


2018-02-09 16:23 GMT+01:00 david westwood :

> Thanks.
>
> I have to stream in the historical data and its out-of-boundedness >>
> real-time data. I thought there was some elegant way using mapPartition
> that I wasn't seeing.
>
> On Fri, Feb 9, 2018 at 5:10 AM, Fabian Hueske  wrote:
>
>> You can also partition by range and sort and write each partition. Once
>> all partitions have been written to files, you can concatenate the files.
>> As Till said it is not possible to sort in parallel and write in order to
>> a single file.
>>
>> Best, Fabian
>>
>> 2018-02-09 10:35 GMT+01:00 Till Rohrmann :
>>
>>> Hi David,
>>>
>>> Flink only supports sorting within partitions. Thus, if you want to
>>> write out a globally sorted dataset you should set the parallelism to 1
>>> which effectively results in a single partition. Decreasing the
>>> parallelism of an operator will cause the individual partitions to lose its
>>> sort order because the individual partitions are read in a non
>>> deterministic order.
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Thu, Feb 8, 2018 at 8:07 PM, david westwood <
>>> david.d.westw...@gmail.com> wrote:
>>>
 Hi:

 I would like to sort historical data using the dataset api.

 env.setParallelism(10)

 val dataset = [(Long, String)] ..
 .partitionByRange(_._1)
 .sortPartition(_._1, Order.ASCENDING)
 .writeAsCsv("mydata.csv").setParallelism(1)

 the data is out of order (in local order)
 but
 .print()
 prints the data in to correct order. I have run a small toy sample
 multiple times.

 Is there a way to sort the entire dataset with parallelism > 1 and
 write it to a single file in ascending order?

>>>
>>>
>>
>


Re: [EXTERNAL] Re: Flink REST API

2018-02-09 Thread Raja . Aravapalli
I am issuing a GET call to list the running jobs on the Flink session.

Another quick question: is there a way to check the port on which my Flink YARN
session is exposing the REST API?

Because I could not figure out, either in the YARN Resource Manager UI or in the
Flink Web UI of the YARN session, the port number that I received from the YARN
client using the command "yarn application -list".

Thanks a lot.

Regards,
Raja.

From: Gary Yao 
Date: Friday, February 9, 2018 at 9:25 AM
To: Raja Aravapalli 
Cc: "user@flink.apache.org" 
Subject: Re: [EXTERNAL] Re: Flink REST API

Hi Raja,

Can you tell me the API call that you are trying to issue? If it is not a GET
request, it could be that you are suffering from this bug:

  https://issues.apache.org/jira/browse/YARN-2031

In my case the tracking url shown on the resource manager ui is indeed one that
targets the YARN proxy, i.e., POST/DELETE/PUT requests are not supported.
However, the tracking url obtained from the YARN cli directly refers to the
jobmanager.

Best,

Gary

On Fri, Feb 9, 2018 at 3:54 PM, Raja.Aravapalli wrote:

Hi Gary,

Thanks a lot. I am able to use REST API now.

As you suggested, I am able to query the REST API by capturing the tracking URL I
get using the command "yarn application -list".

However, I am not able to query using the tracking URL I see in the YARN Resource
Manager UI. Not sure if the Hadoop environment I am on is using some proxies or
the like. Do you have any thoughts on why this behaves differently between the
YARN command line and the YARN Resource Manager?


Thanks a lot again.


Regards,
Raja.

From: Gary Yao
Date: Friday, February 2, 2018 at 10:20 AM
To: Raja Aravapalli
Cc: "user@flink.apache.org"
Subject: [EXTERNAL] Re: Flink REST API

Hi Raja,

The registered tracking URL of the YARN application can be used to issue HTTP
requests against the REST API. You can retrieve the URL by using the YARN
client:

  yarn application -list

In the output, the rightmost column shows the URL, e.g.,

  Application-Id  ...   Tracking-URL
  application_1517585200464_0001 ...
  http://ip-172-31-40-165.eu-central-1.compute.internal:38045

You can also find the tracking URL in the Web UI of the YARN resource manager.

Best,

Gary



On Thu, Feb 1, 2018 at 9:29 PM, Raja.Aravapalli wrote:
Hi,

I have triggered a Flink YARN session on Hadoop YARN.

While I was able to trigger applications and run them, I wish to find the URL
of the REST API for the Flink YARN session I launched.

Can someone please help me point out how to find the REST API URL for
Flink on YARN?

Thanks a lot.


Regards,
Raja.




Re: [EXTERNAL] Re: Flink REST API

2018-02-09 Thread Gary Yao
Hi Raja,

Can you tell me the API call that you are trying to issue? If it is not a
GET
request, it could be that you are suffering from this bug:

  https://issues.apache.org/jira/browse/YARN-2031

In my case the tracking url shown on the resource manager ui is indeed one
that
targets the YARN proxy, i.e., POST/DELETE/PUT requests are not supported.
However, the tracking url obtained from the YARN cli directly refers to the
jobmanager.

Best,

Gary

On Fri, Feb 9, 2018 at 3:54 PM, Raja.Aravapalli 
wrote:

>
>
> Hi Gary,
>
>
>
> Thanks a lot. I am able to use REST API now.
>
>
>
> As you informed, I am able to query REST API, by capturing the
> tracking-url, I get by using the command *“yarn application -list”*
>
>
>
> But, however as I observe in the YARN Resource manager UI, I am not able
> to query using the tracking url I am observing in YARN Resource Manager.
> Not sure, If the Hadoop environment I am on, is using some proxies or etc.
>
> Do you have any thoughts on why this is happening different in YARN
> command line Vs YARN Resource Manager ?
>
>
>
>
>
> Thanks a lot again.
>
>
>
>
>
> Regards,
>
> Raja.
>
>
>
> *From: *Gary Yao 
> *Date: *Friday, February 2, 2018 at 10:20 AM
> *To: *Raja Aravapalli 
> *Cc: *"user@flink.apache.org" 
> *Subject: *[EXTERNAL] Re: Flink REST API
>
>
>
> Hi Raja,
>
>
>
> The registered tracking URL of the YARN application can be used to issue
> HTTP
>
> requests against the REST API. You can retrieve the URL by using the YARN
>
> client:
>
>
>
>   yarn application -list
>
>
>
> In the output, the rightmost column shows the URL, e.g.,
>
>
>
>   Application-Id  ...   Tracking-URL
>
>   application_1517585200464_0001 ...
>
>   http://ip-172-31-40-165.eu-central-1.compute.internal:38045
>
>
>
> You can also find the tracking URL in the Web UI of the YARN resource
> manager.
>
>
>
> Best,
>
>
>
> Gary
>
>
>
>
>
>
>
> On Thu, Feb 1, 2018 at 9:29 PM, Raja.Aravapalli <
> raja.aravapa...@target.com> wrote:
>
> Hi,
>
>
>
> I have triggered a Flink YARN session on Hadoop YARN.
>
>
>
> While I was able to trigger applications and run them, I wish to find the
> URL of the REST API for the Flink YARN session I launched.
>
>
>
> Can someone please help me point out how to find the REST API URL for
> Flink on YARN?
>
>
>
> Thanks a lot.
>
>
>
>
>
> Regards,
>
> Raja.
>
>
>


Re: dataset sort

2018-02-09 Thread david westwood
Thanks.

I have to stream in the historical data, and its out-of-boundedness is >> that
of the real-time data. I thought there was some elegant way using mapPartition
that I wasn't seeing.

On Fri, Feb 9, 2018 at 5:10 AM, Fabian Hueske  wrote:

> You can also partition by range and sort and write each partition. Once
> all partitions have been written to files, you can concatenate the files.
> As Till said it is not possible to sort in parallel and write in order to
> a single file.
>
> Best, Fabian
>
> 2018-02-09 10:35 GMT+01:00 Till Rohrmann :
>
>> Hi David,
>>
>> Flink only supports sorting within partitions. Thus, if you want to write
>> out a globally sorted dataset you should set the parallelism to 1 which
>> effectively results in a single partition. Decreasing the parallelism of
>> an operator will cause the individual partitions to lose its sort order
>> because the individual partitions are read in a non deterministic order.
>>
>> Cheers,
>> Till
>>
>>
>> On Thu, Feb 8, 2018 at 8:07 PM, david westwood <
>> david.d.westw...@gmail.com> wrote:
>>
>>> Hi:
>>>
>>> I would like to sort historical data using the dataset api.
>>>
>>> env.setParallelism(10)
>>>
>>> val dataset = [(Long, String)] ..
>> .partitionByRange(_._1)
>> .sortPartition(_._1, Order.ASCENDING)
>>> .writeAsCsv("mydata.csv").setParallelism(1)
>>>
>>> the data is out of order (in local order)
>>> but
>>> .print()
>>> prints the data in to correct order. I have run a small toy sample
>>> multiple times.
>>>
>>> Is there a way to sort the entire dataset with parallelism > 1 and write
>>> it to a single file in ascending order?
>>>
>>
>>
>


Re: [EXTERNAL] Re: Flink REST API

2018-02-09 Thread Raja . Aravapalli

Hi Gary,

Thanks a lot. I am able to use REST API now.

As you suggested, I am able to query the REST API by capturing the tracking URL I
get using the command "yarn application -list".

However, I am not able to query using the tracking URL I see in the YARN Resource
Manager UI. Not sure if the Hadoop environment I am on is using some proxies or
the like. Do you have any thoughts on why this behaves differently between the
YARN command line and the YARN Resource Manager?


Thanks a lot again.


Regards,
Raja.

From: Gary Yao 
Date: Friday, February 2, 2018 at 10:20 AM
To: Raja Aravapalli 
Cc: "user@flink.apache.org" 
Subject: [EXTERNAL] Re: Flink REST API

Hi Raja,

The registered tracking URL of the YARN application can be used to issue HTTP
requests against the REST API. You can retrieve the URL by using the YARN
client:

  yarn application -list

In the output, the rightmost column shows the URL, e.g.,

  Application-Id  ...   Tracking-URL
  application_1517585200464_0001 ...
  http://ip-172-31-40-165.eu-central-1.compute.internal:38045

You can also find the tracking URL in the Web UI of the YARN resource manager.

Best,

Gary



On Thu, Feb 1, 2018 at 9:29 PM, Raja.Aravapalli wrote:
Hi,

I have triggered a Flink YARN session on Hadoop YARN.

While I was able to trigger applications and run them, I wish to find the URL
of the REST API for the Flink YARN session I launched.

Can someone please help me point out how to find the REST API URL for
Flink on YARN?

Thanks a lot.


Regards,
Raja.



RE: CEP for time series in csv-file

2018-02-09 Thread Esa Heikkinen
Hi

Thanks for the hints, but I am still very interested in a simple working
example with this combination: an sbt project, Scala, csv-file reading and CEP
processing. I did not exactly find something like that. It would help me a
lot.
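
To make it concrete, this is roughly the shape of program I am trying to put
together (an untested sketch: the file name, column layout and the pattern are
placeholders, and it assumes the first column is an epoch-millisecond timestamp
and the rows are already in time order):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern

case class Event(ts: Long, name: String, value: Double)

object CsvCepExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1) read the csv file and 2) convert each row into an event with a timestamp
    val events: DataStream[Event] = env
      .readTextFile("events.csv")
      .map { line =>
        val cols = line.split(",")
        Event(cols(0).toLong, cols(1), cols(2).toDouble)
      }
      .assignAscendingTimestamps(_.ts) // first column = epoch millis, rows sorted by time

    // 3) set a pattern: an "alarm" event directly after a "warning" event
    val pattern = Pattern.begin[Event]("warning").where(_.name == "warning")
      .next("alarm").where(_.name == "alarm")

    // 4) run CEP and 5) write the results
    CEP.pattern(events, pattern)
      .select(m => m.toString)
      .writeAsText("matches.txt")

    env.execute("csv cep example")
  }
}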

It takes a lot of time to learn and test many possible code combinations...
too many "moving" parts.
For example, there is a "huge" amount of different "imports"; where can I find
how to use them and so on?
I did not find a strict "reference" guide, for example for readCsvFile(). Or
should I look it up in the code?

By the way, which is better to use, Maven or sbt? It seems most of the examples
use Maven, but I haven't got Maven to work properly (yet).

Best Regards
Esa

From: Timo Walther [mailto:twal...@apache.org]
Sent: Thursday, February 8, 2018 7:23 PM
To: user@flink.apache.org
Subject: Re: CEP for time series in csv-file

You can also take a look at the Flink training from data Artisans and the code 
examples there. They also use CEP and basically read also from a file:

http://training.data-artisans.com/exercises/CEP.html

Regards,
Timo


Am 2/8/18 um 6:09 PM schrieb Kostas Kloudas:
Hi Esa,

I think the best place to start is the documentation available at the flink 
website.

Some pointers are the following:

CEP documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html

Blog post with CEP example: 
https://data-artisans.com/blog/complex-event-processing-flink-cep-update

Cheers,
Kostas


On Feb 8, 2018, at 4:28 PM, Esa Heikkinen wrote:

Hi

I have csv-file(s) that contain an event in every row, and the first column is
the timestamp of the event. The rest of the columns are data and "attributes" of
the event.

I’d want to write simple Scala code that: 1) reads data of csv-file 2) converts 
data of csv-file compatible for CEP 3) sets pattern for CEP 4) Runs CEP  5) 
writes results

Do you have any hints or examples how to do that ?

By the way, what kind of time stamp should be in csv-file ?





Re: Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-09 Thread Edward
I applied the change in the pull request associated with that Kafka bug, and
unfortunately it didn't resolve anything. It doesn't unregister any
additional MBeans which are created by Kafka's JmxReporter; it is just a
fix to remove MBeans from Kafka's cache of MBeans (i.e. it is doing
cleanup within the job's ChildFirst classloader, not the bootstrap/app
classloader where the strong reference exists).



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strata San Jose

2018-02-09 Thread ashish pok
Awesome, I will send a note from my work email :) 

-- Ashish

On Fri, Feb 9, 2018 at 5:12 AM, Fabian Hueske wrote:

Hi Ashish,

I'll be at Strata San Jose and give two talks.

Just ping me and we can meet there :-)

Cheers, Fabian

2018-02-09 0:53 GMT+01:00 ashish pok :

Wondering if any of the core Flink team members are planning to be at the
conference? It would be great to meet in person.
Thanks,

-- Ashish

  


Re: No Job is getting Submitted through Flink 1.4.0 UI

2018-02-09 Thread Puneet Kinra
I am unable to submit the job in Flink from the UI. Is any specific port opening
required?

On Fri, Feb 9, 2018 at 5:10 PM, Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> I am unable to submit the job in flink from UI
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


-- 
Cheers,

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


No Job is getting Submitted through Flink 1.4.0 UI

2018-02-09 Thread Puneet Kinra
I am unable to submit the job in Flink from the UI.

-- 
Cheers,

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


Re: Strata San Jose

2018-02-09 Thread Fabian Hueske
Hi Ashish,

I'll be at Strata San Jose and give two talks.

Just ping me and we can meet there :-)

Cheers, Fabian

2018-02-09 0:53 GMT+01:00 ashish pok :

> Wondering if any of the core Flink team members are planning to be at the
> conference? It would be great to meet in person.
>
> Thanks,
>
> -- Ashish
>


Re: dataset sort

2018-02-09 Thread Fabian Hueske
You can also partition by range and sort and write each partition. Once all
partitions have been written to files, you can concatenate the files.
As Till said it is not possible to sort in parallel and write in order to a
single file.

Best, Fabian

2018-02-09 10:35 GMT+01:00 Till Rohrmann :

> Hi David,
>
> Flink only supports sorting within partitions. Thus, if you want to write
> out a globally sorted dataset you should set the parallelism to 1 which
> effectively results in a single partition. Decreasing the parallelism of
> an operator will cause the individual partitions to lose its sort order
> because the individual partitions are read in a non deterministic order.
>
> Cheers,
> Till
>
>
> On Thu, Feb 8, 2018 at 8:07 PM, david westwood wrote:
>
>> Hi:
>>
>> I would like to sort historical data using the dataset api.
>>
>> env.setParallelism(10)
>>
>> val dataset = [(Long, String)] ..
>> .partitionByRange(_._1)
>> .sortPartition(_._1, Order.ASCENDING)
>> .writeAsCsv("mydata.csv").setParallelism(1)
>>
>> the data is out of order (in local order)
>> but
>> .print()
>> prints the data in to correct order. I have run a small toy sample
>> multiple times.
>>
>> Is there a way to sort the entire dataset with parallelism > 1 and write
>> it to a single file in ascending order?
>>
>
>


Re: Batch Cascade application

2018-02-09 Thread Puneet Kinra
Hi Till

I have 2 kinds of data:

a) Read the data from the database and put it into memory and a NoSQL
database, so I have 1 source & 1 custom sink operator:
Job1 --> Source --> NoSQL Sink --> status

b) Once the data is loaded into memory I need to run the second job, so I am
checking the status returned by the first job. There I have another source
where some historical data comes in, so 1 source & 1 flatmap
and an output sink:

Job2 --> Source2 --> check in cache built from first job --> if exists,
execute rules --> dump to file

so:
if (Job1 == true)
{
  execute Job2
}


On Fri, Feb 9, 2018 at 3:23 PM, Till Rohrmann  wrote:

> Hi Puneet,
>
> without more information about the job you're running (ideally code), it's
> hard to help you.
>
> Cheers,
> Till
>
> On Fri, Feb 9, 2018 at 10:43 AM, Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Hi
>>
>> I am working on batch application i which once the data is get loaded
>> into the Memory
>> second job should only run once first job is finished.
>>
>>
>>
>> boolean contactHistoryLoading=bonusPointBatch.contactHistoryLoading(
>> jsonFileReader,cache);
>> if(contactHistoryLoading)
>> {
>> bonusPointBatch.transcationLoading(jsonFileReader, cache);
>> }
>>
>>
>> but in the plan it is show only two operators ideally it show 4 operators
>> 2 from each job.
>>
>>
>> [image: Inline image 1]
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 <+91%2088001%2067808> | Skype :
>> puneet.ki...@customercentria.com *
>>
>> *e-mail :puneet.ki...@customercentria.com
>> *
>>
>>
>>
>


-- 
Cheers,

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


Re: Batch Cascade application

2018-02-09 Thread Till Rohrmann
Hi Puneet,

without more information about the job you're running (ideally code), it's
hard to help you.

Cheers,
Till

On Fri, Feb 9, 2018 at 10:43 AM, Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi
>
> I am working on batch application i which once the data is get loaded into
> the Memory
> second job should only run once first job is finished.
>
>
>
> boolean contactHistoryLoading=bonusPointBatch.contactHistoryLoading(
> jsonFileReader,cache);
> if(contactHistoryLoading)
> {
> bonusPointBatch.transcationLoading(jsonFileReader, cache);
> }
>
>
> but in the plan it is show only two operators ideally it show 4 operators
> 2 from each job.
>
>
> [image: Inline image 1]
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 <+91%2088001%2067808> | Skype :
> puneet.ki...@customercentria.com *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Batch Cascade application

2018-02-09 Thread Puneet Kinra
Hi

I am working on a batch application in which, once the data gets loaded into
memory, the second job should only run after the first job is finished.



boolean
contactHistoryLoading=bonusPointBatch.contactHistoryLoading(jsonFileReader,cache);
if(contactHistoryLoading)
{
bonusPointBatch.transcationLoading(jsonFileReader, cache);
}


but the plan shows only two operators; ideally it should show 4 operators,
2 from each job.


[image: Inline image 1]

-- 
Cheers,

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


Re: dataset sort

2018-02-09 Thread Till Rohrmann
Hi David,

Flink only supports sorting within partitions. Thus, if you want to write
out a globally sorted dataset you should set the parallelism to 1, which
effectively results in a single partition. Decreasing the parallelism of an
operator will cause the individual partitions to lose their sort order
because the individual partitions are read in a non-deterministic order.
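
For example, a minimal sketch based on your snippet (untested):

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val dataset: DataSet[(Long, String)] = ??? // your data

// A single sort partition (parallelism 1) yields a globally sorted result.
dataset
  .sortPartition(_._1, Order.ASCENDING).setParallelism(1)
  .writeAsCsv("mydata.csv").setParallelism(1)

env.execute()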

Cheers,
Till


On Thu, Feb 8, 2018 at 8:07 PM, david westwood 
wrote:

> Hi:
>
> I would like to sort historical data using the dataset api.
>
> env.setParallelism(10)
>
> val dataset = [(Long, String)] ..
> .partitionByRange(_._1)
> .sortPartition(_._1, Order.ASCENDING)
> .writeAsCsv("mydata.csv").setParallelism(1)
>
> the data is out of order (in local order)
> but
> .print()
> prints the data in to correct order. I have run a small toy sample
> multiple times.
>
> Is there a way to sort the entire dataset with parallelism > 1 and write
> it to a single file in ascending order?
>


Re: Developing and running Flink applications in Linux through Windows editors or IDE's ?

2018-02-09 Thread Till Rohrmann
Hi Esa,

welcome to the community :-). For the development of Flink it does not
really matter how you code. In general, contributors pick what suits their
needs best and so should you. Here is a link for general remarks for
setting up IntelliJ and Eclipse [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/ide_setup.html

Cheers,
Till

On Thu, Feb 8, 2018 at 1:49 PM, Esa Heikkinen 
wrote:

> Hello
>
>
>
> I am newbie with Flink.
>
>
>
> I’d want to develop my Flink scala-application in Windows IDE (for
> examples IntelliJ IDEA) and run them in Linux (Ubuntu).
>
> Is that good or bad idea ? Or is it some remote use possible ?
>
>
>
> At this moment there are no graphical interface (GUI) in Linux. Or would
> it be better to be ?
>
>
>
> I have ssh-tunneled the port 8081 to Windows and I can run (or monitor) my
> Flink applications in browser (Firefox) of Windows.
>
>
>
> Or is it better to use a simple editor like nano in Linux and forgot all
> “smart” GUI editors and IDE’s in Windows-side ?
>
>
>
> Do you have recommendations how to do that ?
>
>
>
> ---
>
>
>
>
>
>
>
>
>
>
>


Re: Two issues when deploying Flink on DC/OS

2018-02-09 Thread Till Rohrmann
Hi,

"java.io.IOException: Connection reset by peer" is usually thrown if the
remote peer terminates the connection. So the interesting bit would be
who's requesting static files from Flink. So far we serve the web frontend
and the log and stdout files via the StaticFileServerHandler. Maybe it's
DC/OS itself for some reason? This error should, however, not influence
Flink itself.

Cheers,
Till

On Thu, Feb 8, 2018 at 12:36 PM, Lasse Nedergaard  wrote:

> And We see the same too
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 8. feb. 2018 kl. 11.58 skrev Stavros Kontopoulos <
> st.kontopou...@gmail.com>:
>
> We see the same issue here (2):
> 2018-02-08 10:55:11,447 ERROR 
> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler
>  - Caught exception
> java.io.IOException: Connection reset by peer
>
> Stavros
>
> On Sat, Jan 13, 2018 at 9:59 PM, Eron Wright  wrote:
>
>> Hello Dongwon,
>>
>> Flink doesn't support a 'unique host' constraint at this time; it simply
>> accepts adequate offers without any such consideration.   Flink does
>> support a 'host attributes' constraint to filter certain hosts, but that's
>> not applicable here.
>>
>> Under the hood, Flink uses a library called Netflix Fenzo to optimize
>> placement, and a uniqueness constraint could be added by more deeply
>> leveraging Fenzo's constraint system.   You mentioned that you're trying to
>> make good use of your GPU resources, which could also be achieved by
>> treating GPU as a scalar resource (similar to how memory and cores are
>> treated).   Mesos does support that, but Fenzo may require some
>> enhancement.   So, these are two potential ways to enhance Flink to support
>> your scenario.  I'm happy to help; reach out to me.
>>
>> The obvious, ugly workaround is to configure your TMs to be large enough
>> to consume the whole host.
>>
>> Eron
>>
>>
>>
>>
>>
>> On Thu, Jan 11, 2018 at 7:18 AM, Gary Yao  wrote:
>>
>>> Hi Dongwon,
>>>
>>> I am not familiar with the deployment on DC/OS. However, Eron Wright and
>>> Jörg
>>> Schad (cc'd), who have worked on the Mesos integration, might be able to
>>> help
>>> you.
>>>
>>> Best,
>>> Gary
>>>
>>> On Tue, Jan 9, 2018 at 10:29 AM, Dongwon Kim 
>>> wrote:
>>>
 Hi,

 I've launched JobManager and TaskManager on DC/OS successfully.
 Now I have two new issues:

 1) All TaskManagers are scheduled on a single node.
 - Is it intended to maximize data locality and minimize network
 communication cost?
 - Is there an option in Flink to adjust the behavior of JobManager when
 it considers multiple resource offers from different Mesos agents?
 - I want to schedule TaskManager processes on different GPU servers so
 that each TaskManger process can use its own GPU cards exclusively.
 - Below is a part of JobManager log that is occurring while JobManager
 is negotiating resources with the Mesos master:

 2018-01-09 07:34:54,872 INFO  
 org.apache.flink.mesos.runtime.clusterframework.MesosJobManager  - 
 JobManager akka.tcp://flink@dnn-g08-233:18026/user/jobmanager was granted 
 leadership with leader session ID 
 Some(----).
 2018-01-09 07:34:55,889 INFO  
 org.apache.flink.mesos.scheduler.ConnectionMonitor- Connecting 
 to Mesos...
 2018-01-09 07:34:55,962 INFO  
 org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  
 - Trying to associate with JobManager leader 
 akka.tcp://flink@dnn-g08-233:18026/user/jobmanager
 2018-01-09 07:34:55,977 INFO  
 org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  
 - Resource Manager associating with leading JobManager 
 Actor[akka://flink/user/jobmanager#-1481183359] - leader session 
 ----
 2018-01-09 07:34:56,479 INFO  
 org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  
 - Scheduling Mesos task taskmanager-1 with (10240.0 MB, 8.0 cpus).
 2018-01-09 07:34:56,481 INFO  
 org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  
 - Scheduling Mesos task taskmanager-2 with (10240.0 MB, 8.0 cpus).
 2018-01-09 07:34:56,481 INFO  
 org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  
 - Scheduling Mesos task taskmanager-3 with (10240.0 MB, 8.0 cpus).
 2018-01-09 07:34:56,481 INFO  
 org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  
 - Scheduling Mesos task taskmanager-4 with (10240.0 MB, 8.0 cpus).
 2018-01-09 07:34:56,481 INFO  
 org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  
 - Scheduling Mesos task taskmanager-5 with (10240.0 MB, 8.0 cpus).
 2018-01-09 07:34:56,483 INFO