Re: Create Tuple Array dynamically for a DS

2019-07-23 Thread Caizhi Weng
Hi Andres,

In that case you should use the `flatMap` method instead of the `map` method.
`flatMap` allows you to emit multiple elements per input and collect them
all into one DS, so it also works when a single element of your DS contains
multiple records.

public static void main(String[] args) throws Exception {
   StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment();

   DataStream<String> dsString =
         env.fromElements("1,a,1.1|2,b,2.2,-2", "3,c|4,d,4.4");
   DataStream<Tuple> dsTuple = dsString.flatMap(new FlatMapFunction<String, Tuple>() {
      @Override
      public void flatMap(String value, Collector<Tuple> out) throws Exception {
         for (String record : value.split("\\|")) {
            String[] split = record.split(",");
            if (split.length == 2) {
               out.collect(new Tuple2<>(Integer.valueOf(split[0]), split[1]));
            } else if (split.length == 3) {
               out.collect(new Tuple3<>(Integer.valueOf(split[0]),
                     split[1], Double.valueOf(split[2])));
            } else {
               out.collect(new Tuple4<>(Integer.valueOf(split[0]),
                     split[1], Double.valueOf(split[2]), Long.valueOf(split[3])));
            }
         }
      }
   });

   dsTuple.print();
   env.execute();
}


Andres Angel wrote on Wed, Jul 24, 2019 at 11:47 AM:

> Hello Weng,
>
> This definitely helps a lot,  however I know my initial DS has a single
> row content then I would in theory just create a DS which is what I need.
> That is why I need to know how to create a new environment DS within a map
> function.
>
> thanks so much
>
> On Tue, Jul 23, 2019 at 11:41 PM Caizhi Weng  wrote:
>
>> Hi Andres,
>>
>> Thanks for the detailed explanation.
>>
>> but apparently I can't create a new DS within a map function
>>
>>
>> If you create a new DS within the map function, then you'll create as
>> many DSs as the number of elements in the old DS which... doesn't seem to
>> be your desired situation? I suppose you want to create a DS from
>> DS. If that is the case you can write something like this:
>>
>> public static void main(String[] args) throws Exception {
>>StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>DataStream dsString = env.fromElements("1,a,1.1", "2,b,2.2,-2", 
>> "3,c", "4,d,4.4");
>>DataStream dsTuple = dsString.map(s -> {
>>   String[] split = s.split(",");
>>   if (split.length == 2) {
>>  return new Tuple2<>(Integer.valueOf(split[0]), split[1]);
>>   } else if (split.length == 3) {
>>  return new Tuple3<>(Integer.valueOf(split[0]), split[1], 
>> Double.valueOf(split[2]));
>>   } else {
>>  return new Tuple4<>(Integer.valueOf(split[0]), split[1], 
>> Double.valueOf(split[2]), Long.valueOf(split[3]));
>>   }
>>});
>>
>>dsTuple.print();
>>env.execute();
>> }
>>
>>
>> How dynamically create the DS
>>
>>
>> As you can see in the above code, I did not create a DS but a
>> DS, because Tuple can't be directly used. It seems that you want to
>> turn this new DS into a table, but if different records have different
>> number of columns this is not a good practice as the schema of each record
>> is not the same (but as a workaround, you can fill the columns with null if
>> some record doesn't have this column).
>>
>> Hope this solves your problem. If you have any other problems feel free
>> to write back.
>>
>> Andres Angel wrote on Wed, Jul 24, 2019 at 10:50 AM:
>>
>>> Hello,
>>>
>>> Let me list properly the questions I have:
>>>
>>> * How to catch into a string the content of a DataStream? about this
>>> point basically I have a DS , the only way how I can use the
>>> content is within a map function , print , store the content somewhere or
>>> SQL queries. The point is that I need the content because depending on that
>>> content I need to create another DS and later register it as a Table
>>> environment, which means I need the value content but likewise the headers
>>> content and the whole info is within the DS. The first option I had
>>> was play with the map function but apparently I can't create a new DS
>>> within a map function and less register it as a new table environment.
>>>
>>> My second option in this point could be create a sort of public variable
>>> to store the DS content and then create my UDF, but sadly this is
>>> neither allowed. My options in this case would be either somehow store
>>> public the content of the DS into a new variable, turn the
>>> DS as String or store the content in a file and read the file and
>>> start over to parse the content to serve the header and content for the new
>>> DS.
>>>
>>> * How dynamically create the DS: well basically after parse the
>>> point above I might end up with an array of fields sometimes 4,3,2 doesnt
>>> matter then I might need to swing between different tuples or turn my
>>> content into Row to create a DS.
>>>
>>> I'm looking forward to reading your comments.
>>>
>>> thanks so much
>>>
>>> On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng 
>>> wrote:
>>>
 Hi 

Re: Memory constrains running Flink on Kubernetes

2019-07-23 Thread Xintong Song
Hi,

Flink acquires these 'Status_JVM_Memory' metrics through the MXBean
library. According to the MXBean documentation, non-heap is memory that "the
Java virtual machine manages ... other than the heap (referred as non-heap
memory)". I'm not sure whether that is equivalent to the metaspace. If
'-XX:MaxMetaspaceSize' is set, it should trigger metaspace clean-up when the
limit is reached.
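
For readers unfamiliar with that API, here is a minimal, self-contained sketch
(an editor's addition, not part of the original reply) of reading heap and
non-heap usage through the standard MemoryMXBean that these Flink metrics are
based on:

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

public class MemoryProbe {
    public static void main(String[] args) {
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();

        // Heap: memory used for object allocation, bounded by -Xmx.
        MemoryUsage heap = memoryBean.getHeapMemoryUsage();
        // Non-heap: metaspace, code cache and other JVM-managed areas outside the heap.
        // Note: getMax() may return -1 if no limit is configured.
        MemoryUsage nonHeap = memoryBean.getNonHeapMemoryUsage();

        System.out.printf("heap     used=%d MB, max=%d MB%n",
                heap.getUsed() >> 20, heap.getMax() >> 20);
        System.out.printf("non-heap used=%d MB, max=%d MB%n",
                nonHeap.getUsed() >> 20, nonHeap.getMax() >> 20);
    }
}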

As for RocksDB, it mainly uses non-Java memory. Heap, non-heap and direct
memory can be considered Java memory (or at least memory allocated through
the Java process). That means RocksDB is actually using memory that is
accounted for in the total K8s container memory, but not in Java heap,
non-heap or direct memory, which in your case is the unaccounted 1GB. To
leave more memory for RocksDB, you need to either configure more memory for
the K8s containers, or configure less Java memory through the config option
'taskmanager.heap.size'.

The config option 'taskmanager.heap.size', despite the 'heap' in its key,
also accounts for network memory (which uses direct buffers). Currently,
memory configuration in Flink is quite complicated and confusing. The
community is aware of this and is planning an overall improvement.

To my understanding, once you set '-XX:MaxMetaspaceSize' there are limits on
heap, non-heap and direct memory in the JVM. From the Java OOM error message
you should be able to tell which part requires more memory than its limit.
If there is no Java OOM but a K8s container OOM, then it should be non-Java
memory used by RocksDB.

[1]
https://docs.oracle.com/javase/8/docs/api/java/lang/management/MemoryMXBean.html

Thank you~

Xintong Song



On Tue, Jul 23, 2019 at 8:42 PM wvl  wrote:

> Hi,
>
> We're running a relatively simple Flink application that uses a bunch of
> state in RocksDB on Kubernetes.
> During the course of development and going to production, we found that we
> were often running into memory issues made apparent by Kubernetes OOMKilled
> and Java OOM log events.
>
> In order to tackle these, we're trying to account for all the memory used
> in the container, to allow proper tuning.
> Metric-wise we have:
> - container_memory_working_set_bytes = 6,5GB
> - flink_taskmanager_Status_JVM_Memory_Heap_Max =  4,7GB
> - flink_taskmanager_Status_JVM_Memory_NonHeap_Used = 325MB
> - flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed = 500MB
>
> This is my understanding based on all the documentation and observations:
> container_memory_working_set_bytes will be the total amount of memory in
> use, disregarding OS page & block cache.
> Heap will be heap.
> NonHeap is mostly the metaspace.
> Direct_Memory is mostly network buffers.
>
> Running the numbers I have 1 GB unaccounted for. I'm also uncertain as to
> RocksDB. According to the docs RocksDB has a "Column Family Write Buffer"
> where "You need to budget for 2 x your worst case memory use".
> We have 17 ValueStateDescriptors (ignoring state for windows), each of which I'm
> assuming corresponds to a "Column Family" in RocksDB. Meaning our budget
> should be around 2GB.
> Is this accounted for in one of the flink_taskmanager metrics above? We've
> also enabled various rocksdb metrics, but it's unclear where this Write
> Buffer memory would be represented.
>
> Finally, we've seen that when our job has issues and is restarted rapidly,
> NonHeap_Used grows from an initial 50Mb to 700MB, before our containers are
> killed. We're assuming this is due
> to no form of cleanup in the metaspace as classes get (re)loaded.
>
> These are our taskmanager JVM settings: -XX:+UseG1GC
> -XX:MaxDirectMemorySize=1G -XX:+UnlockExperimentalVMOptions
> -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2
> With flink config:
>   taskmanager.heap.size: 5000m
>   state.backend: rocksdb
>   state.backend.incremental: true
>   state.backend.rocksdb.timer-service.factory: ROCKSDB
>
> Based on what we've observed we're thinking about setting
> -XX:MaxMetaspaceSize to a reasonable value, so that we at least get an
> error message which can easily be traced back to the behavior we're seeing.
>
> Okay, all that said let's sum up what we're asking here:
> - Is there any more insight into how memory is accounted for than our
> current metrics?
> - Which metric, if any, accounts for RocksDB memory usage?
> - What's going on with the Metaspace growth we're seeing during job
> restarts, is there something we can do about this such as setting
> -XX:MaxMetaspaceSize?
> - Any other tips to improve reliability running in resource constrained
> environments such as Kubernetes?
>
> Thanks,
>
> William
>
>


Re: Create Tuple Array dynamically for a DS

2019-07-23 Thread Andres Angel
Hello Weng,

This definitely helps a lot. However, I know my initial DS has a single row
of content, so in theory I would just create one DS, which is what I need.
That is why I need to know how to create a new environment DS within a map
function.

thanks so much

On Tue, Jul 23, 2019 at 11:41 PM Caizhi Weng  wrote:

> Hi Andres,
>
> Thanks for the detailed explanation.
>
> but apparently I can't create a new DS within a map function
>
>
> If you create a new DS within the map function, then you'll create as many
> DSs as the number of elements in the old DS which... doesn't seem to be
> your desired situation? I suppose you want to create a DS from
> DS. If that is the case you can write something like this:
>
> public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>DataStream dsString = env.fromElements("1,a,1.1", "2,b,2.2,-2", 
> "3,c", "4,d,4.4");
>DataStream dsTuple = dsString.map(s -> {
>   String[] split = s.split(",");
>   if (split.length == 2) {
>  return new Tuple2<>(Integer.valueOf(split[0]), split[1]);
>   } else if (split.length == 3) {
>  return new Tuple3<>(Integer.valueOf(split[0]), split[1], 
> Double.valueOf(split[2]));
>   } else {
>  return new Tuple4<>(Integer.valueOf(split[0]), split[1], 
> Double.valueOf(split[2]), Long.valueOf(split[3]));
>   }
>});
>
>dsTuple.print();
>env.execute();
> }
>
>
> How dynamically create the DS
>
>
> As you can see in the above code, I did not create a DS but a
> DS, because Tuple can't be directly used. It seems that you want to
> turn this new DS into a table, but if different records have different
> number of columns this is not a good practice as the schema of each record
> is not the same (but as a workaround, you can fill the columns with null if
> some record doesn't have this column).
>
> Hope this solves your problem. If you have any other problems feel free to
> write back.
>
> Andres Angel wrote on Wed, Jul 24, 2019 at 10:50 AM:
>
>> Hello,
>>
>> Let me list properly the questions I have:
>>
>> * How to catch into a string the content of a DataStream? about this
>> point basically I have a DS , the only way how I can use the
>> content is within a map function , print , store the content somewhere or
>> SQL queries. The point is that I need the content because depending on that
>> content I need to create another DS and later register it as a Table
>> environment, which means I need the value content but likewise the headers
>> content and the whole info is within the DS. The first option I had
>> was play with the map function but apparently I can't create a new DS
>> within a map function and less register it as a new table environment.
>>
>> My second option in this point could be create a sort of public variable
>> to store the DS content and then create my UDF, but sadly this is
>> neither allowed. My options in this case would be either somehow store
>> public the content of the DS into a new variable, turn the
>> DS as String or store the content in a file and read the file and
>> start over to parse the content to serve the header and content for the new
>> DS.
>>
>> * How dynamically create the DS: well basically after parse the
>> point above I might end up with an array of fields sometimes 4,3,2 doesnt
>> matter then I might need to swing between different tuples or turn my
>> content into Row to create a DS.
>>
>> I'm looking forward to reading your comments.
>>
>> thanks so much
>>
>> On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng 
>> wrote:
>>
>>> Hi Andres,
>>>
>>> Sorry I can't quite get your question... Do you mean that how to spilt
>>> the string into fields?
>>>
>>> There is a `split` method in java. You can give it a regexp and it will
>>> return an array containing all the split fields.
>>>
>>> Andres Angel wrote on Wed, Jul 24, 2019 at 10:28 AM:
>>>
 Hello Weng,

 thanks for your reply, however I'm struggling to somehow read the
 content of my DS with the payload that defines how many fields the message
 contains into a String. That is the reason why I thought into a map
 function for that DS.

 The Tuple part can change overtime can even pass from 3 or 4 to 2 then
 it can change the whole time. How could I approach this challenge?

 thanks so much

 On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng 
 wrote:

> Hi Andres,
>
> Are the payloads strings? If yes, one method is that you can store
> them as strings and process it further with user defined functions when 
> you
> need to use them.
>
> Another method is that you can store them into arrays.
>
> Also, if the type of the first 3 fields are the same for the first and
> second payload, you can use a Tuple4<> and set the last element as null 
> for
> the first payload.
>
> Andres Angel wrote on Wed, Jul 24, 2019 at 10:09 AM:
>
>> 

Re: Create Tuple Array dynamically for a DS

2019-07-23 Thread Caizhi Weng
Hi Andres,

Thanks for the detailed explanation.

but apparently I can't create a new DS within a map function


If you create a new DS within the map function, then you'll create as many
DSs as there are elements in the old DS, which... doesn't seem to be what
you want? I suppose you want to create a DS of tuples from your DS of
strings. If that is the case you can write something like this:

public static void main(String[] args) throws Exception {
   StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment();

   DataStream<String> dsString = env.fromElements("1,a,1.1",
         "2,b,2.2,-2", "3,c", "4,d,4.4");
   DataStream<Object> dsTuple = dsString.map(s -> {
      String[] split = s.split(",");
      if (split.length == 2) {
         return new Tuple2<>(Integer.valueOf(split[0]), split[1]);
      } else if (split.length == 3) {
         return new Tuple3<>(Integer.valueOf(split[0]), split[1],
               Double.valueOf(split[2]));
      } else {
         return new Tuple4<>(Integer.valueOf(split[0]), split[1],
               Double.valueOf(split[2]), Long.valueOf(split[3]));
      }
   });

   dsTuple.print();
   env.execute();
}


How dynamically create the DS


As you can see in the above code, I did not create a DS of Tuple but a
DS of Object, because Tuple can't be directly used. It seems that you want to
turn this new DS into a table, but if different records have different
numbers of columns this is not good practice, as the schema is not the same
for every record (as a workaround, you can fill the missing columns with
null).

Hope this solves your problem. If you have any other problems feel free to
write back.
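
For completeness, here is a minimal sketch (an editor's addition, not part of
the original reply) of that null-padding workaround, padding every record to
a fixed four-column Row and declaring the field types explicitly; the four
column types are assumptions taken from the examples above:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class RowPadding {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("1,a,1.1", "2,b,2.2,-2", "3,c");

        // Pad every record to a fixed 4-column schema, filling missing fields with null.
        DataStream<Row> rows = input
            .map(s -> {
                String[] f = s.split(",");
                Row row = new Row(4);
                row.setField(0, Integer.valueOf(f[0]));
                row.setField(1, f[1]);
                row.setField(2, f.length > 2 ? Double.valueOf(f[2]) : null);
                row.setField(3, f.length > 3 ? Long.valueOf(f[3]) : null);
                return row;
            })
            // Row cannot be analyzed reflectively, so declare the field types explicitly.
            .returns(Types.ROW(Types.INT, Types.STRING, Types.DOUBLE, Types.LONG));

        rows.print();
        env.execute();
    }
}

From there, a StreamTableEnvironment's registerDataStream can expose the
stream as a table with a fixed schema, which seems to be the end goal here.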

Andres Angel wrote on Wed, Jul 24, 2019 at 10:50 AM:

> Hello,
>
> Let me list properly the questions I have:
>
> * How to catch into a string the content of a DataStream? about this point
> basically I have a DS , the only way how I can use the content is
> within a map function , print , store the content somewhere or SQL queries.
> The point is that I need the content because depending on that content I
> need to create another DS and later register it as a Table environment,
> which means I need the value content but likewise the headers content and
> the whole info is within the DS. The first option I had was play
> with the map function but apparently I can't create a new DS within a map
> function and less register it as a new table environment.
>
> My second option in this point could be create a sort of public variable
> to store the DS content and then create my UDF, but sadly this is
> neither allowed. My options in this case would be either somehow store
> public the content of the DS into a new variable, turn the
> DS as String or store the content in a file and read the file and
> start over to parse the content to serve the header and content for the new
> DS.
>
> * How dynamically create the DS: well basically after parse the
> point above I might end up with an array of fields sometimes 4,3,2 doesnt
> matter then I might need to swing between different tuples or turn my
> content into Row to create a DS.
>
> I'm looking forward to reading your comments.
>
> thanks so much
>
> On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng  wrote:
>
>> Hi Andres,
>>
>> Sorry I can't quite get your question... Do you mean that how to spilt
>> the string into fields?
>>
>> There is a `split` method in java. You can give it a regexp and it will
>> return an array containing all the split fields.
>>
>> Andres Angel wrote on Wed, Jul 24, 2019 at 10:28 AM:
>>
>>> Hello Weng,
>>>
>>> thanks for your reply, however I'm struggling to somehow read the
>>> content of my DS with the payload that defines how many fields the message
>>> contains into a String. That is the reason why I thought into a map
>>> function for that DS.
>>>
>>> The Tuple part can change overtime can even pass from 3 or 4 to 2 then
>>> it can change the whole time. How could I approach this challenge?
>>>
>>> thanks so much
>>>
>>> On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng 
>>> wrote:
>>>
 Hi Andres,

 Are the payloads strings? If yes, one method is that you can store them
 as strings and process it further with user defined functions when you need
 to use them.

 Another method is that you can store them into arrays.

 Also, if the type of the first 3 fields are the same for the first and
 second payload, you can use a Tuple4<> and set the last element as null for
 the first payload.

 Andres Angel wrote on Wed, Jul 24, 2019 at 10:09 AM:

> Hello everyone,
>
> I need to create dynamically the size of my Tuple that feeds a DS, let
> me explain it better. Let's assume the first payload I read has this 
> format
> "filed1,field2,field3", then this might require a Tuple3<> but my payload
> later can be "field1,field2,field3,field4" then my Tuple might need to be
> refine it on the flight and now be Tuple4<>.
>
> How could I create this dynamically, any idea?
>
> Thanks so much
>



subscribe

2019-07-23 Thread maybe love



Re: Create Tuple Array dynamically for a DS

2019-07-23 Thread Andres Angel
Hello,

Let me list properly the questions I have:

* How can I capture the content of a DataStream into a string? Basically I
have a DS, and the only ways I can use its content are within a map function,
by printing it, by storing it somewhere, or through SQL queries. The point is
that I need the content because, depending on it, I need to create another DS
and later register it as a table environment, which means I need the value
content as well as the header content, and all of that information lives
inside the DS. My first option was to play with a map function, but
apparently I can't create a new DS within a map function, much less register
it as a new table environment.

My second option here would be to create a sort of public variable to store
the DS content and then create my UDF, but sadly this is not allowed either.
My remaining options would be to somehow store the content of the DS in a
public variable, turn the DS into a String, or store the content in a file,
read the file back, and parse it to provide the header and content for the
new DS.

* How to dynamically create the DS: basically, after parsing the point above
I might end up with an array of fields (sometimes 4, 3 or 2, it doesn't
matter), so I might need to switch between different Tuple sizes or turn my
content into Row to create a DS.

I'm looking forward to reading your comments.

thanks so much

On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng  wrote:

> Hi Andres,
>
> Sorry I can't quite get your question... Do you mean that how to spilt the
> string into fields?
>
> There is a `split` method in java. You can give it a regexp and it will
> return an array containing all the split fields.
>
> Andres Angel wrote on Wed, Jul 24, 2019 at 10:28 AM:
>
>> Hello Weng,
>>
>> thanks for your reply, however I'm struggling to somehow read the content
>> of my DS with the payload that defines how many fields the message contains
>> into a String. That is the reason why I thought into a map function for
>> that DS.
>>
>> The Tuple part can change overtime can even pass from 3 or 4 to 2 then it
>> can change the whole time. How could I approach this challenge?
>>
>> thanks so much
>>
>> On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng 
>> wrote:
>>
>>> Hi Andres,
>>>
>>> Are the payloads strings? If yes, one method is that you can store them
>>> as strings and process it further with user defined functions when you need
>>> to use them.
>>>
>>> Another method is that you can store them into arrays.
>>>
>>> Also, if the type of the first 3 fields are the same for the first and
>>> second payload, you can use a Tuple4<> and set the last element as null for
>>> the first payload.
>>>
>>> Andres Angel wrote on Wed, Jul 24, 2019 at 10:09 AM:
>>>
 Hello everyone,

 I need to create dynamically the size of my Tuple that feeds a DS, let
 me explain it better. Let's assume the first payload I read has this format
 "filed1,field2,field3", then this might require a Tuple3<> but my payload
 later can be "field1,field2,field3,field4" then my Tuple might need to be
 refine it on the flight and now be Tuple4<>.

 How could I create this dynamically, any idea?

 Thanks so much

>>>


Re: Create Tuple Array dynamically for a DS

2019-07-23 Thread Caizhi Weng
Hi Andres,

Sorry, I can't quite get your question... Do you mean how to split the
string into fields?

There is a `split` method in Java. You can give it a regexp and it will
return an array containing all the split fields.
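
For instance (an editor's illustration, not from the original message):

public class SplitDemo {
    public static void main(String[] args) {
        // String#split takes a regular expression, so "," splits on commas
        // and "\\|" would split on a literal pipe character.
        String[] fields = "1,a,1.1".split(",");
        for (String field : fields) {
            System.out.println(field); // prints 1, a and 1.1 on separate lines
        }
    }
}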

Andres Angel wrote on Wed, Jul 24, 2019 at 10:28 AM:

> Hello Weng,
>
> thanks for your reply, however I'm struggling to somehow read the content
> of my DS with the payload that defines how many fields the message contains
> into a String. That is the reason why I thought into a map function for
> that DS.
>
> The Tuple part can change overtime can even pass from 3 or 4 to 2 then it
> can change the whole time. How could I approach this challenge?
>
> thanks so much
>
> On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng  wrote:
>
>> Hi Andres,
>>
>> Are the payloads strings? If yes, one method is that you can store them
>> as strings and process it further with user defined functions when you need
>> to use them.
>>
>> Another method is that you can store them into arrays.
>>
>> Also, if the type of the first 3 fields are the same for the first and
>> second payload, you can use a Tuple4<> and set the last element as null for
>> the first payload.
>>
>> Andres Angel wrote on Wed, Jul 24, 2019 at 10:09 AM:
>>
>>> Hello everyone,
>>>
>>> I need to create dynamically the size of my Tuple that feeds a DS, let
>>> me explain it better. Let's assume the first payload I read has this format
>>> "filed1,field2,field3", then this might require a Tuple3<> but my payload
>>> later can be "field1,field2,field3,field4" then my Tuple might need to be
>>> refine it on the flight and now be Tuple4<>.
>>>
>>> How could I create this dynamically, any idea?
>>>
>>> Thanks so much
>>>
>>


Re: Create Tuple Array dynamically for a DS

2019-07-23 Thread Andres Angel
Hello Weng,

thanks for your reply. However, I'm struggling to read the content of my DS,
whose payload defines how many fields the message contains, into a String.
That is the reason why I thought of using a map function on that DS.

The Tuple size can change over time; it can even go from 3 or 4 down to 2,
and it can keep changing. How could I approach this challenge?

thanks so much

On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng  wrote:

> Hi Andres,
>
> Are the payloads strings? If yes, one method is that you can store them as
> strings and process it further with user defined functions when you need to
> use them.
>
> Another method is that you can store them into arrays.
>
> Also, if the type of the first 3 fields are the same for the first and
> second payload, you can use a Tuple4<> and set the last element as null for
> the first payload.
>
> Andres Angel wrote on Wed, Jul 24, 2019 at 10:09 AM:
>
>> Hello everyone,
>>
>> I need to create dynamically the size of my Tuple that feeds a DS, let me
>> explain it better. Let's assume the first payload I read has this format
>> "filed1,field2,field3", then this might require a Tuple3<> but my payload
>> later can be "field1,field2,field3,field4" then my Tuple might need to be
>> refine it on the flight and now be Tuple4<>.
>>
>> How could I create this dynamically, any idea?
>>
>> Thanks so much
>>
>


Re: Create Tuple Array dynamically for a DS

2019-07-23 Thread Caizhi Weng
Hi Andres,

Are the payloads strings? If yes, one option is to store them as strings and
process them further with user-defined functions when you need to use them.

Another option is to store them in arrays.

Also, if the types of the first 3 fields are the same for the first and
second payload, you can use a Tuple4<> and set the last element to null for
the first payload.
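
As a rough sketch of that last suggestion (an editor's addition; the concrete
field types Integer/String/Double/Long are assumptions based on the examples
later in this thread):

import org.apache.flink.api.java.tuple.Tuple4;

public class PadToTuple4 {
    // Parse "1,a,1.1" or "2,b,2.2,-2" into a fixed-arity Tuple4,
    // leaving the last field null when the payload only has three fields.
    static Tuple4<Integer, String, Double, Long> parse(String payload) {
        String[] f = payload.split(",");
        Long last = f.length > 3 ? Long.valueOf(f[3]) : null;
        return new Tuple4<>(Integer.valueOf(f[0]), f[1], Double.valueOf(f[2]), last);
    }

    public static void main(String[] args) {
        System.out.println(parse("1,a,1.1"));    // (1,a,1.1,null)
        System.out.println(parse("2,b,2.2,-2")); // (2,b,2.2,-2)
    }
}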

Andres Angel wrote on Wed, Jul 24, 2019 at 10:09 AM:

> Hello everyone,
>
> I need to create dynamically the size of my Tuple that feeds a DS, let me
> explain it better. Let's assume the first payload I read has this format
> "filed1,field2,field3", then this might require a Tuple3<> but my payload
> later can be "field1,field2,field3,field4" then my Tuple might need to be
> refine it on the flight and now be Tuple4<>.
>
> How could I create this dynamically, any idea?
>
> Thanks so much
>


Re: add laplace to k means

2019-07-23 Thread Yun Gao
Hi alaa, 

In the KMeans example, the new centers are computed in each iteration in a
map-reduce pattern. Each task maintains a part of the points and first picks
the closest center for each point; then sum(point) and num(point) are
aggregated per center in the CentroidAccumulator, and the new center is
computed in the CentroidAverager as sum(point) / num(point). Therefore, I
think you may change the implementation of CentroidAverager to add the noise.
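
As a rough sketch of that suggestion (an editor's addition, not code from the
linked example: it uses a plain double[] instead of the example's
Point/Centroid classes, and assumes Lap(ε) means a zero-mean Laplace sample
with scale 1/ε):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;

// Replaces the plain sum/count division of the example's CentroidAverager
// with the noisy update u' = (sum + Lap(eps)) / (num + Lap(eps)).
public class NoisyCentroidAverager
        implements MapFunction<Tuple3<Integer, double[], Long>, Tuple3<Integer, double[], Long>> {

    private final double epsilon;

    public NoisyCentroidAverager(double epsilon) {
        this.epsilon = epsilon;
    }

    // Sample from Laplace(0, 1/epsilon) via the inverse CDF.
    private double laplace() {
        double u = ThreadLocalRandom.current().nextDouble() - 0.5;
        return -(1.0 / epsilon) * Math.signum(u) * Math.log(1 - 2 * Math.abs(u));
    }

    @Override
    public Tuple3<Integer, double[], Long> map(Tuple3<Integer, double[], Long> value) {
        long count = value.f2;
        double noisyCount = count + laplace();
        double[] newCenter = new double[value.f1.length];
        for (int i = 0; i < newCenter.length; i++) {
            // Add independent noise to each coordinate of the per-cluster sum.
            newCenter[i] = (value.f1[i] + laplace()) / noisyCount;
        }
        return new Tuple3<>(value.f0, newCenter, count);
    }
}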

Best, 
Yun



--
From:alaa 
Send Time:2019 Jul. 23 (Tue.) 21:06
To:user 
Subject:add laplace to k means

Hello,

I have used this k-means code on Flink:

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java

and I would like to add noise that follows a Laplace distribution to the sum
of data items and to the number of data items when calculating a new cluster
center in each iteration:

for j = 1 --> p do
    u' = (sum + Lap(ε)) / (num + Lap(ε))

I have already written a Laplace function, but I don't know how to add it to
the k-means code and in which line I should write it.

Thank you




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: GroupBy result delay

2019-07-23 Thread Hequn Cheng
Hi Fanbin,

Fabian is right, it should be a watermark problem. Probably, some tasks of
the source don't have enough data to advance the watermark. Furthermore,
you can also monitor event time through the Flink web interface.
I have answered a similar question on Stack Overflow; see more details
here [1].

Best, Hequn

[1]
https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
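
For reference, a minimal sketch (an editor's addition; the Event class and
its createdAt field are placeholders) of Flink's built-in
bounded-out-of-orderness assigner, which keeps the watermark a fixed delay
behind the highest timestamp seen so far:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkExample {
    // Hypothetical event type carrying an epoch-millis timestamp.
    public static class Event {
        public long createdAt;
    }

    public static DataStream<Event> withTimestamps(DataStream<Event> events) {
        // Emit watermarks that trail the max seen timestamp by 10 seconds,
        // so event-time windows fire once the watermark passes their end.
        return events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Event element) {
                        return element.createdAt;
                    }
                });
    }
}

As noted above, even with such an assigner the overall watermark will not
advance if some parallel source instances never receive data.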

On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu  wrote:

> If I use proctime, the groupBy happens without any delay.
>
> On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu  wrote:
>
>> not sure whether this is related:
>>
>> public SingleOutputStreamOperator assignTimestampsAndWatermarks(
>>   AssignerWithPeriodicWatermarks timestampAndWatermarkAssigner) {
>>
>>// match parallelism to input, otherwise dop=1 sources could lead to some 
>> strange
>>// behaviour: the watermark will creep along very slowly because the 
>> elements
>>// from the source go to each extraction operator round robin.
>>final int inputParallelism = getTransformation().getParallelism();
>>final AssignerWithPeriodicWatermarks cleanedAssigner = 
>> clean(timestampAndWatermarkAssigner);
>>
>>TimestampsAndPeriodicWatermarksOperator operator =
>>  new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>>
>>return transform("Timestamps/Watermarks", 
>> getTransformation().getOutputType(), operator)
>>  .setParallelism(inputParallelism);
>> }
>>
>> parallelism is set to 32
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> env.setParallelism(32)
>>
>> and the command for launching the job is
>>
>> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>>
>>
>>
>>
>> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu  wrote:
>>
>>> Thanks Fabian for the prompt reply. I just started using Flink and this
>>> is a great community.
>>> The watermark setting is only accounting for 10 sec delay. Besides that,
>>> the local job on IntelliJ is running fine without issues.
>>>
>>> Here is the code:
>>>
>>> class EventTimestampExtractor(slack: Long = 0L) extends 
>>> AssignerWithPeriodicWatermarks[T] {
>>>
>>>   var currentMaxTimestamp: Long = _
>>>
>>>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>>> val elemTs = e.created_at
>>> currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>>> elemTs
>>>   }
>>>
>>>   override def getCurrentWatermark(): Watermark = {
>>>   new Watermark(currentMaxTimestamp)
>>>   }
>>> }
>>>
>>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(1))
>>>
>>> Are there any other things I should be aware of?
>>>
>>> Thanks again for you kind help!
>>>
>>> Fanbin
>>>
>>>
>>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske  wrote:
>>>
 Hi Fanbin,

 The delay is most likely caused by the watermark delay.
 A window is computed when the watermark passes the end of the window.
 If you configured the watermark to be 10 minutes before the current max
 timestamp (probably to account for out of order data), then the window will
 be computed with approx. 10 minute delay.

 Best, Fabian

 Am Di., 23. Juli 2019 um 02:00 Uhr schrieb Fanbin Bu <
 fanbin...@coinbase.com>:

> Hi,
> I have a Flink sql streaming job defined by:
>
> SELECT
>   user_id
>   , hop_end(created_at, interval '30' second, interval '1' minute) as 
> bucket_ts
>   , count(name) as count
> FROM event
> WHERE name = 'signin'
> GROUP BY
>   user_id
>   , hop(created_at, interval '30' second, interval '1' minute)
>
>
> there is a noticeably delay of the groupBy operator. For example, I
> only see the record sent out 10 min later after the record received in. 
> see
> the attached pic.
>
> [image: image.png]
>
> I m expecting to see the group by result after one minute since the
> sliding window size is 1 min and the slide is 30 sec.
>
> There is no such issue if I run the job locally in IntelliJ. However,
> I ran into the above issue if I run the job on EMR (flink version = 1.7)
>
> Can anybody give a clue of what could be wrong?
> Thanks,
>
> Fanbin
>



Create within a map function of a DS a new register DS

2019-07-23 Thread Andres Angel
Hello everyone,

I need to read an element from my DS and, according to its content, create a
new DS on the fly and register it as a new environment table.

I'm using the map function on my input DS; however, when I try to use the
env variable (the environment, in my case a StreamExecutionEnvironment), I
apparently can't access the global job environment (I declare my env as
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);).

Apparently the same thing is going on with the TENV (TableEnvironment) when
I register the new table environment from within the DS's map function.

Please correct me if I'm wrong, but is what I'm trying to do really
possible? If not, how could I parse the DS content as a String so that I can
create it in a separate method?

thanks so much


Re: Transform from Table to DS

2019-07-23 Thread Andres Angel
This has been fixed now. Something weird: according to the documentation, I
should include around 4 Maven packages to work properly with the Table/SQL
API: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/

However, I solved my issue working without most of them; I just leave:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.6.1</version>
</dependency>


Thanks so much

On Tue, Jul 23, 2019 at 9:54 PM Caizhi Weng  wrote:

> Hi Andres,
>
> Can you print your entire code (including the import section) in this
> post? It might be that this Exception has something to do with your import.
> If you are coding in a Java environment then you should import
> StreamTableEnvironment.java not StreamTableEnvironment.scala.
>
> Andres Angel wrote on Wed, Jul 24, 2019 at 12:01 AM:
>
>> Hello guys I'm working on Java environment and I have a sample code as:
>>
>> Table schemafit = tenv.sqlQuery("Here is my query");
>>
>> I need to turn this into a DS to print and any other transformation then
>> I doing a sort of:
>>
>> DataStream resultSet = tenv.toAppendStream(schemafit, Row.class);
>>
>> resultSet.print();
>>
>> However, (please any help) I'm getting the error:
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/flink/api/scala/typeutils/CaseClassTypeInfo
>> at
>> org.apache.flink.table.codegen.CodeGenUtils$.fieldAccessorFor(CodeGenUtils.scala:246)
>> at
>> org.apache.flink.table.codegen.CodeGenerator.generateFieldAccess(CodeGenerator.scala:1217)
>> at
>> org.apache.flink.table.codegen.CodeGenerator.generateInputAccess(CodeGenerator.scala:1165)
>> at
>> org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:286)
>> at
>> org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:269)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
>> at
>> org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
>> at
>> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:111)
>> at
>> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
>> at
>> org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:39)
>> at
>> org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:82)
>> at
>> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:103)
>> at
>> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:969)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
>> at
>> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202)
>> at
>> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156)
>> at consumer.trconsumer.main(trconsumer.java:116)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 24 more
>>
>


Create Tuple Array dynamically for a DS

2019-07-23 Thread Andres Angel
Hello everyone,

I need to size the Tuple that feeds a DS dynamically; let me explain it
better. Let's assume the first payload I read has the format
"field1,field2,field3", which might require a Tuple3<>; but a later payload
can be "field1,field2,field3,field4", so my Tuple might need to be refined
on the fly and become a Tuple4<>.

How could I create this dynamically, any idea?

Thanks so much


Re: Transform from Table to DS

2019-07-23 Thread Caizhi Weng
Hi Andres,

Can you post your entire code (including the import section) in this post?
It might be that this Exception has something to do with your imports. If
you are coding in a Java environment, then you should import the Java
StreamTableEnvironment, not the Scala StreamTableEnvironment.

Andres Angel wrote on Wed, Jul 24, 2019 at 12:01 AM:

> Hello guys I'm working on Java environment and I have a sample code as:
>
> Table schemafit = tenv.sqlQuery("Here is my query");
>
> I need to turn this into a DS to print and any other transformation then I
> doing a sort of:
>
> DataStream resultSet = tenv.toAppendStream(schemafit, Row.class);
>
> resultSet.print();
>
> However, (please any help) I'm getting the error:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/flink/api/scala/typeutils/CaseClassTypeInfo
> at
> org.apache.flink.table.codegen.CodeGenUtils$.fieldAccessorFor(CodeGenUtils.scala:246)
> at
> org.apache.flink.table.codegen.CodeGenerator.generateFieldAccess(CodeGenerator.scala:1217)
> at
> org.apache.flink.table.codegen.CodeGenerator.generateInputAccess(CodeGenerator.scala:1165)
> at
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:286)
> at
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:269)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
> at
> org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
> at
> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:111)
> at
> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:39)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:82)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:103)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:969)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
> at
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202)
> at
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156)
> at consumer.trconsumer.main(trconsumer.java:116)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 24 more
>


Re: Flink 1.8 run options are different

2019-07-23 Thread Zili Chen
Hi, you can check whether the logs under the log/ directory contain a section like this:

2019-07-24 09:34:36,507 WARN  org.apache.flink.client.cli.CliFrontend
- Could not load CLI class
org.apache.flink.yarn.cli.FlinkYarnSessionCli.

java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:264)

at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)

at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)

Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 5 more


If it does: Flink now releases the core distribution and the pre-bundled
Hadoop package separately, so you need to download the pre-bundled Hadoop jar
yourself and put it into the lib/ folder.


Specifically, please carefully read the text right above the "Apache Flink
1.8.1" entry on the download page [1].

Best,
tison.

[1] https://flink.apache.org/downloads.html


王佩 wrote on Wed, Jul 24, 2019 at 9:30 AM:

> With the Flink 1.8 I downloaded earlier, running bin/flink run --help showed some options for yarn-cluster mode, like these:
> Options for yarn-cluster mode:
>  -d,--detachedIf present, runs the job in
> detached
>   mode
>  -m,--jobmanager Address of the JobManager
> (master) to
>   which to connect. Use this flag
> to
>   connect to a different JobManager
> than
>   the one specified in the
>   configuration.
>  -sae,--shutdownOnAttachedExitIf the job is submitted in
> attached
>   mode, perform a best-effort
> cluster
>   shutdown when the CLI is
> terminated
>   abruptly, e.g., in response to a
> user
>   interrupt, such as typing Ctrl +
> C.
>  -yD  use value for given property
>  -yd,--yarndetached   If present, runs the job in
> detached
>   mode (deprecated; use non-YARN
>   specific option instead)
>  -yh,--yarnhelp   Help for the Yarn session CLI.
>  -yid,--yarnapplicationIdAttach to running YARN session
>  -yj,--yarnjar   Path to Flink jar file
>  -yjm,--yarnjobManagerMemory Memory for JobManager Container
> with
>   optional unit (default: MB)
>  -yn,--yarncontainer Number of YARN container to
> allocate
>   (=Number of Task Managers)
>  -ynl,--yarnnodeLabelSpecify YARN node label for the
> YARN
>   application
>  -ynm,--yarnname Set a custom name for the
> application
>   on YARN
>  -yq,--yarnquery  Display available YARN resources
>   (memory, cores)
>  -yqu,--yarnqueueSpecify YARN queue.
>  -ys,--yarnslots Number of slots per TaskManager
>  -yst,--yarnstreaming Start Flink in streaming mode
>  -yt,--yarnship  Ship files in the specified
> directory
>   (t for transfer)
>  -ytm,--yarntaskManagerMemoryMemory per TaskManager Container
> with
>   optional unit (default: MB)
>  -yz,--yarnzookeeperNamespaceNamespace to create the Zookeeper
>   sub-paths for high availability
> mode
>  -z,--zookeeperNamespace Namespace to create the Zookeeper
>   sub-paths for high availability
> mode
>
>
> With the Flink 1.8 I downloaded now, running bin/flink run --help only shows the following options; the yarn-cluster options are missing:
> Action "run" compiles and runs a program.
>
>   Syntax: run [OPTIONS]  
>   "run" action options:
>  -c,--classClass with the program entry
> point
>   ("main" method or "getPlan()"
> method.
>   Only needed if the JAR file does
> not
>   specify the class in its
> manifest.
>  -C,--classpath  Adds a URL to each user code
>   classloader  on all nodes in the
>   cluster. The paths must specify a
>   protocol (e.g. file://) 

Flink 1.8 run options are different

2019-07-23 Thread 王佩
With the Flink 1.8 I downloaded earlier, running bin/flink run --help showed some options for yarn-cluster mode, like these:
Options for yarn-cluster mode:
 -d,--detachedIf present, runs the job in
detached
  mode
 -m,--jobmanager Address of the JobManager
(master) to
  which to connect. Use this flag to
  connect to a different JobManager
than
  the one specified in the
  configuration.
 -sae,--shutdownOnAttachedExitIf the job is submitted in
attached
  mode, perform a best-effort
cluster
  shutdown when the CLI is
terminated
  abruptly, e.g., in response to a
user
  interrupt, such as typing Ctrl +
C.
 -yD  use value for given property
 -yd,--yarndetached   If present, runs the job in
detached
  mode (deprecated; use non-YARN
  specific option instead)
 -yh,--yarnhelp   Help for the Yarn session CLI.
 -yid,--yarnapplicationIdAttach to running YARN session
 -yj,--yarnjar   Path to Flink jar file
 -yjm,--yarnjobManagerMemory Memory for JobManager Container
with
  optional unit (default: MB)
 -yn,--yarncontainer Number of YARN container to
allocate
  (=Number of Task Managers)
 -ynl,--yarnnodeLabelSpecify YARN node label for the
YARN
  application
 -ynm,--yarnname Set a custom name for the
application
  on YARN
 -yq,--yarnquery  Display available YARN resources
  (memory, cores)
 -yqu,--yarnqueueSpecify YARN queue.
 -ys,--yarnslots Number of slots per TaskManager
 -yst,--yarnstreaming Start Flink in streaming mode
 -yt,--yarnship  Ship files in the specified
directory
  (t for transfer)
 -ytm,--yarntaskManagerMemoryMemory per TaskManager Container
with
  optional unit (default: MB)
 -yz,--yarnzookeeperNamespaceNamespace to create the Zookeeper
  sub-paths for high availability
mode
 -z,--zookeeperNamespace Namespace to create the Zookeeper
  sub-paths for high availability
mode


With the Flink 1.8 I downloaded now, running bin/flink run --help only shows the following options; the yarn-cluster options are missing:
Action "run" compiles and runs a program.

  Syntax: run [OPTIONS]  
  "run" action options:
 -c,--classClass with the program entry point
  ("main" method or "getPlan()"
method.
  Only needed if the JAR file does
not
  specify the class in its manifest.
 -C,--classpath  Adds a URL to each user code
  classloader  on all nodes in the
  cluster. The paths must specify a
  protocol (e.g. file://) and be
  accessible on all nodes (e.g. by
means
  of a NFS share). You can use this
  option multiple times for
specifying
  more than one URL. The protocol
must
  be supported by the {@link
  java.net.URLClassLoader}.
 -d,--detachedIf present, runs the job in
detached
  mode
 -n,--allowNonRestoredState   Allow to skip savepoint state that
  cannot be restored. You need to
allow
  this if you removed an operator
from
  your program that was part of the
  program when the savepoint was
  triggered.
 -p,--parallelismThe parallelism with which to run
the
  program. Optional flag to
override the
  default value specified in the
  configuration.
 -q,--sysoutLogging   If present, 

Re: GroupBy result delay

2019-07-23 Thread Fanbin Bu
If I use proctime, the groupBy happens without any delay.

On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu  wrote:

> not sure whether this is related:
>
> public SingleOutputStreamOperator assignTimestampsAndWatermarks(
>   AssignerWithPeriodicWatermarks timestampAndWatermarkAssigner) {
>
>// match parallelism to input, otherwise dop=1 sources could lead to some 
> strange
>// behaviour: the watermark will creep along very slowly because the 
> elements
>// from the source go to each extraction operator round robin.
>final int inputParallelism = getTransformation().getParallelism();
>final AssignerWithPeriodicWatermarks cleanedAssigner = 
> clean(timestampAndWatermarkAssigner);
>
>TimestampsAndPeriodicWatermarksOperator operator =
>  new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>
>return transform("Timestamps/Watermarks", 
> getTransformation().getOutputType(), operator)
>  .setParallelism(inputParallelism);
> }
>
> parallelism is set to 32
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setParallelism(32)
>
> and the command for launching the job is
>
> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>
>
>
>
> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu  wrote:
>
>> Thanks Fabian for the prompt reply. I just started using Flink and this
>> is a great community.
>> The watermark setting is only accounting for 10 sec delay. Besides that,
>> the local job on IntelliJ is running fine without issues.
>>
>> Here is the code:
>>
>> class EventTimestampExtractor(slack: Long = 0L) extends 
>> AssignerWithPeriodicWatermarks[T] {
>>
>>   var currentMaxTimestamp: Long = _
>>
>>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>> val elemTs = e.created_at
>> currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>> elemTs
>>   }
>>
>>   override def getCurrentWatermark(): Watermark = {
>>   new Watermark(currentMaxTimestamp)
>>   }
>> }
>>
>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(1))
>>
>> Are there any other things I should be aware of?
>>
>> Thanks again for you kind help!
>>
>> Fanbin
>>
>>
>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske  wrote:
>>
>>> Hi Fanbin,
>>>
>>> The delay is most likely caused by the watermark delay.
>>> A window is computed when the watermark passes the end of the window. If
>>> you configured the watermark to be 10 minutes before the current max
>>> timestamp (probably to account for out of order data), then the window will
>>> be computed with approx. 10 minute delay.
>>>
>>> Best, Fabian
>>>
>>> Am Di., 23. Juli 2019 um 02:00 Uhr schrieb Fanbin Bu <
>>> fanbin...@coinbase.com>:
>>>
 Hi,
 I have a Flink sql streaming job defined by:

 SELECT
   user_id
   , hop_end(created_at, interval '30' second, interval '1' minute) as 
 bucket_ts
   , count(name) as count
 FROM event
 WHERE name = 'signin'
 GROUP BY
   user_id
   , hop(created_at, interval '30' second, interval '1' minute)


 there is a noticeably delay of the groupBy operator. For example, I
 only see the record sent out 10 min later after the record received in. see
 the attached pic.

 [image: image.png]

 I m expecting to see the group by result after one minute since the
 sliding window size is 1 min and the slide is 30 sec.

 There is no such issue if I run the job locally in IntelliJ. However, I
 ran into the above issue if I run the job on EMR (flink version = 1.7)

 Can anybody give a clue of what could be wrong?
 Thanks,

 Fanbin

>>>


Re: GroupBy result delay

2019-07-23 Thread Fanbin Bu
not sure whether this is related:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
      AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

   // match parallelism to input, otherwise dop=1 sources could lead to some strange
   // behaviour: the watermark will creep along very slowly because the elements
   // from the source go to each extraction operator round robin.
   final int inputParallelism = getTransformation().getParallelism();
   final AssignerWithPeriodicWatermarks<T> cleanedAssigner =
         clean(timestampAndWatermarkAssigner);

   TimestampsAndPeriodicWatermarksOperator<T> operator =
         new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

   return transform("Timestamps/Watermarks",
         getTransformation().getOutputType(), operator)
         .setParallelism(inputParallelism);
}

parallelism is set to 32

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(32)

and the command for launching the job is

flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS




On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu  wrote:

> Thanks Fabian for the prompt reply. I just started using Flink and this is
> a great community.
> The watermark setting is only accounting for 10 sec delay. Besides that,
> the local job on IntelliJ is running fine without issues.
>
> Here is the code:
>
> class EventTimestampExtractor(slack: Long = 0L) extends 
> AssignerWithPeriodicWatermarks[T] {
>
>   var currentMaxTimestamp: Long = _
>
>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
> val elemTs = e.created_at
> currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
> elemTs
>   }
>
>   override def getCurrentWatermark(): Watermark = {
>   new Watermark(currentMaxTimestamp)
>   }
> }
>
> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(1))
>
> Are there any other things I should be aware of?
>
> Thanks again for you kind help!
>
> Fanbin
>
>
> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske  wrote:
>
>> Hi Fanbin,
>>
>> The delay is most likely caused by the watermark delay.
>> A window is computed when the watermark passes the end of the window. If
>> you configured the watermark to be 10 minutes before the current max
>> timestamp (probably to account for out of order data), then the window will
>> be computed with approx. 10 minute delay.
>>
>> Best, Fabian
>>
>> Am Di., 23. Juli 2019 um 02:00 Uhr schrieb Fanbin Bu <
>> fanbin...@coinbase.com>:
>>
>>> Hi,
>>> I have a Flink sql streaming job defined by:
>>>
>>> SELECT
>>>   user_id
>>>   , hop_end(created_at, interval '30' second, interval '1' minute) as 
>>> bucket_ts
>>>   , count(name) as count
>>> FROM event
>>> WHERE name = 'signin'
>>> GROUP BY
>>>   user_id
>>>   , hop(created_at, interval '30' second, interval '1' minute)
>>>
>>>
>>> there is a noticeably delay of the groupBy operator. For example, I only
>>> see the record sent out 10 min later after the record received in. see the
>>> attached pic.
>>>
>>> [image: image.png]
>>>
>>> I m expecting to see the group by result after one minute since the
>>> sliding window size is 1 min and the slide is 30 sec.
>>>
>>> There is no such issue if I run the job locally in IntelliJ. However, I
>>> ran into the above issue if I run the job on EMR (flink version = 1.7)
>>>
>>> Can anybody give a clue of what could be wrong?
>>> Thanks,
>>>
>>> Fanbin
>>>
>>


Re: GroupBy result delay

2019-07-23 Thread Fanbin Bu
Thanks Fabian for the prompt reply. I just started using Flink and this is
a great community.
The watermark setting only accounts for a 10-second delay. Apart from that,
the local job in IntelliJ runs fine without issues.

Here is the code:

class EventTimestampExtractor(slack: Long = 0L) extends
AssignerWithPeriodicWatermarks[T] {

  var currentMaxTimestamp: Long = _

  override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
val elemTs = e.created_at
currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
elemTs
  }

  override def getCurrentWatermark(): Watermark = {
  new Watermark(currentMaxTimestamp)
  }
}

events.assignTimestampsAndWatermarks(new EventTimestampExtractor(1))

Are there any other things I should be aware of?

Thanks again for your kind help!

Fanbin


On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske  wrote:

> Hi Fanbin,
>
> The delay is most likely caused by the watermark delay.
> A window is computed when the watermark passes the end of the window. If
> you configured the watermark to be 10 minutes before the current max
> timestamp (probably to account for out of order data), then the window will
> be computed with approx. 10 minute delay.
>
> Best, Fabian
>
> Am Di., 23. Juli 2019 um 02:00 Uhr schrieb Fanbin Bu <
> fanbin...@coinbase.com>:
>
>> Hi,
>> I have a Flink sql streaming job defined by:
>>
>> SELECT
>>   user_id
>>   , hop_end(created_at, interval '30' second, interval '1' minute) as 
>> bucket_ts
>>   , count(name) as count
>> FROM event
>> WHERE name = 'signin'
>> GROUP BY
>>   user_id
>>   , hop(created_at, interval '30' second, interval '1' minute)
>>
>>
>> there is a noticeably delay of the groupBy operator. For example, I only
>> see the record sent out 10 min later after the record received in. see the
>> attached pic.
>>
>> [image: image.png]
>>
>> I m expecting to see the group by result after one minute since the
>> sliding window size is 1 min and the slide is 30 sec.
>>
>> There is no such issue if I run the job locally in IntelliJ. However, I
>> ran into the above issue if I run the job on EMR (flink version = 1.7)
>>
>> Can anybody give a clue of what could be wrong?
>> Thanks,
>>
>> Fanbin
>>
>


Transform from Table to DS

2019-07-23 Thread Andres Angel
Hello guys, I'm working in a Java environment and I have sample code like:

Table schemafit = tenv.sqlQuery("Here is my query");

I need to turn this into a DS to print it and apply other transformations, so I'm
doing something like:

DataStream<Row> resultSet = tenv.toAppendStream(schemafit, Row.class);

resultSet.print();

However (please, any help is appreciated), I'm getting the error:

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/flink/api/scala/typeutils/CaseClassTypeInfo
at
org.apache.flink.table.codegen.CodeGenUtils$.fieldAccessorFor(CodeGenUtils.scala:246)
at
org.apache.flink.table.codegen.CodeGenerator.generateFieldAccess(CodeGenerator.scala:1217)
at
org.apache.flink.table.codegen.CodeGenerator.generateInputAccess(CodeGenerator.scala:1165)
at
org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:286)
at
org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:269)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
at
org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
at
org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:111)
at
org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
at
org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:39)
at
org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:82)
at
org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:103)
at
org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
at
org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:969)
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896)
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
at
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202)
at
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156)
at consumer.trconsumer.main(trconsumer.java:116)
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 24 more


Re: Checkpoints timing out for no apparent reason

2019-07-23 Thread spoganshev
Looks like this is the issue:
https://issues.apache.org/jira/browse/FLINK-11164

We'll try switching to 1.8 and see if it helps.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: AsyncDataStream on key of KeyedStream

2019-07-23 Thread Fabian Hueske
Sure:

   /--> AsyncIO --\
STREAM --> ProcessFunc  --  -- Union -- WindowFunc
  \--/

ProcessFunc keeps track of the unique keys per window duration and emits
each distinct key just once to the AsyncIO function via a side output.
Through the main output it sends all values it receives.
AsyncIO queries the external store for each key it receives.
Union just unions both streams (possibly using an Either type).
WindowFunction compute the window and includes the information that was
fetched by the AsyncIO function.

Cheers,
Fabian
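
For readers following the thread, here is a rough, self-contained Java sketch of
that topology; the Tuple2 event shape, the de-duplication logic and the fake REST
lookup are illustrative assumptions, not code from this thread:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ForkUnionSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // events are (key, value) pairs
    DataStream<Tuple2<String, Long>> events =
        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

    // side output that carries each distinct key once
    final OutputTag<String> distinctKeys = new OutputTag<String>("distinct-keys") {};

    SingleOutputStreamOperator<Tuple2<String, Long>> main = events
        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
          @Override
          public String getKey(Tuple2<String, Long> value) {
            return value.f0;
          }
        })
        .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
          // simplified de-duplication; a real job would track and clear keys per window
          private transient Set<String> seen;

          @Override
          public void open(Configuration parameters) {
            seen = new HashSet<>();
          }

          @Override
          public void processElement(Tuple2<String, Long> value, Context ctx,
                                     Collector<Tuple2<String, Long>> out) {
            if (seen.add(value.f0)) {
              ctx.output(distinctKeys, value.f0); // each key goes to AsyncIO once
            }
            out.collect(value); // every value continues on the main output
          }
        });

    // async enrichment: one external lookup per distinct key
    DataStream<Tuple2<String, String>> enriched = AsyncDataStream.unorderedWait(
        main.getSideOutput(distinctKeys),
        new RichAsyncFunction<String, Tuple2<String, String>>() {
          @Override
          public void asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> "status-of-" + key) // placeholder for the REST call
                .thenAccept(status ->
                    resultFuture.complete(Collections.singleton(Tuple2.of(key, status))));
          }
        },
        5, TimeUnit.SECONDS, 100);

    // union both streams (here encoded as (key, payload) pairs); a window operator
    // downstream could then combine values and enrichment per key
    DataStream<Tuple2<String, String>> unioned = main
        .map(t -> Tuple2.of(t.f0, "value:" + t.f1))
        .returns(Types.TUPLE(Types.STRING, Types.STRING))
        .union(enriched);

    unioned.print();
    env.execute("fork-union sketch");
  }
}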

Am Di., 23. Juli 2019 um 17:25 Uhr schrieb Flavio Pompermaier <
pomperma...@okkam.it>:

> For each key I need to call an external REST service to get the current
> status and this is why I'd like to use Async IO. At the moment I do this in
> a process function but I'd like a cleaner solution (if possible).
> Do you think your proposal of forking could be a better option?
> Could you provide a simple snippet/peudo-code of it? I'm not sure I've
> fully undestand your suggestion..
>


Re: Timestamp(timezone) conversion bug in non blink Table/SQL runtime

2019-07-23 Thread Rong Rong
Hi Shuyi,

I think there were some discussions in the mailing list [1,2] and JIRA
tickets [3,4] that might be related.
Since the blink table planner doesn't produce such an error, I think this
problem is valid and should be fixed.

Thanks,
Rong

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/event-time-timezone-is-not-correct-tt26457.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeZone-shift-problem-in-Flink-SQL-td25666.html#a25739
[3] https://issues.apache.org/jira/browse/FLINK-8353
[4] https://issues.apache.org/jira/browse/FLINK-8169
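
For anyone who wants to reproduce the shift that Shuyi describes below, a minimal
sketch (assuming Calcite is on the classpath; the timezone and the printed values
follow the example in the quoted message, and the exact Calcite internals may
differ in detail):

import org.apache.calcite.runtime.SqlFunctions;
import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampDemo {
  public static void main(String[] args) {
    // pretend the JVM runs in UTC-8, as in the example below
    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));

    Timestamp ts = SqlFunctions.internalToTimestamp(0L);

    // 0 is interpreted as 1970-01-01 00:00:00 in the *local* timezone,
    // so the resulting Timestamp is shifted by 8 hours:
    System.out.println(ts.getTime()); // expected: 28800000
    System.out.println(ts);           // renders as 1970-01-01 00:00:00.0 locally
  }
}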

On Mon, Jul 22, 2019 at 1:49 PM Shuyi Chen  wrote:

> Hi Lasse,
>
> Thanks for the reply. If your input is in epoch time, you are not getting
> local time, instead, you are getting a wrong time that does not make sense.
> For example,  if the user input value is 0 (which means 00:00:00 UTC on 1
> January 1970), and your local timezone is UTC-8, converting 00:00:00 UTC on
> 1 January 1970 to your local timezone should yield 16:00:00 Dec 31, 1969.
> But actually, you will be getting 08:00:00 UTC on 1 January 1970 from the
> Table/SQL runtime, which is 00:00:00 on 1 January 1970 in your local timezone
> (UTC-8). Your input time just gets shifted by 8 hours in the output.
>
> Shuyi
>
> On Mon, Jul 22, 2019 at 12:49 PM Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi.
>>
>> I have encountered the same problem when you input epoch time to window
>> table function and then use window.start and window.end the out doesn’t
>> output in epoch but local time and I located the problem to the same
>> internal function as you.
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>
>> Den 22. jul. 2019 kl. 20.46 skrev Shuyi Chen :
>>
>> Hi all,
>>
>> Currently, in the non-blink table/SQL runtime, Flink used
>> SqlFunctions.internalToTimestamp(long v) from Calcite to convert event time
>> (in long) to java.sql.Timestamp. However, as discussed in the recent
>> Calcite mailing list (Jul. 19, 2019), SqlFunctions.internalToTimestamp()
>> assumes the input timestamp value is in the current JVM’s default timezone
>> (which is unusual), NOT milliseconds since epoch. And
>> SqlFunctions.internalToTimestamp() is used to convert timestamp value in
>> the current JVM’s default timezone to milliseconds since epoch, which
>> java.sql.Timestamp constructor takes. Therefore, the results will not only
>> be wrong, but change if the job runs in machines on different timezones as
>> well. (The only exception is that all your production machines uses UTC
>> timezone.)
>>
>> Here is an example, if the user input value is 0 (00:00:00 UTC on 1
>> January 1970), and the table/SQL runtime runs in a machine in PST (UTC-8),
>> the output sql.Timestamp after SqlFunctions.internalToTimestamp() will
>> become 28800000 millisec since epoch (08:00:00 UTC on 1 January 1970); and
>> with the same input, if the table/SQL runtime runs again in a different
>> machine in EST (UTC-5), the output sql.Timestamp after
>> SqlFunctions.internalToTimestamp() will become 18000000 millisec since
>> epoch (05:00:00 UTC on 1 January 1970).
>>
>> More details are captured in
>> https://issues.apache.org/jira/browse/FLINK-13372. Please let me know
>> your thoughts and correct me if I am wrong. Thanks a lot.
>>
>> Shuyi
>>
>>


Re: Re: MiniClusterResource class not found using AbstractTestBase

2019-07-23 Thread Juan Rodríguez Hortalá
Using that classifier worked, the code builds fine now, thanks a lot. I'm
using 1.8.0 by the way

Greetings,

Juan

On Tue, Jul 23, 2019 at 5:06 AM Haibo Sun  wrote:

> Hi,  Juan
>
> It is dependent on "flink-runtime-*-tests.jar", so build.sbt should be
> modified as follows:
>
> scalaVersion := "2.11.0"
>
> val flinkVersion = "1.8.1"
>
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
>   "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests"
> )
>
> Best,
> Haibo
>
> At 2019-07-23 17:51:23, "Fabian Hueske"  wrote:
>
> Hi Juan,
>
> Which Flink version do you use?
>
> Best, Fabian
>
> Am Di., 23. Juli 2019 um 06:49 Uhr schrieb Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com>:
>
>> Hi,
>>
>> I'm trying to use AbstractTestBase in a test in order to use the mini
>> cluster. I'm using specs2 with Scala, so I cannot extend AbstractTestBase
>> because I also have to extend org.specs2.Specification, so I'm trying to
>> access the mini cluster directly using Specs2 BeforeAll to initialize it as
>> follows
>>
>> private val miniClusterResource = AbstractTestBase.miniClusterResource
>> miniClusterResource.before()
>>
>>
>> The problem is that the code doesn't even compile, because it fails to
>> locate `org.apache.flink.runtime.testutils.MiniClusterResource`
>>
>> ```
>> [warn] Class org.apache.flink.runtime.testutils.MiniClusterResource not
>> found - continuing with a stub.
>> [warn] Class
>> org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration not
>> found - continuing with a stub.
>> [error] Class org.apache.flink.runtime.testutils.MiniClusterResource not
>> found - continuing with a stub.
>> [warn] two warnings found
>> [error] one error found
>> [error] (Compile / compileIncremental) Compilation failed
>> [error] Total time: 1 s, completed Jul 22, 2019 9:38:49 PM
>> ```
>>
>> I'm importing the following libraries in build.sbt
>>
>> "org.apache.flink" %% "flink-test-utils"  % flinkVersion,
>> "org.apache.flink" %% "flink-runtime"  % flinkVersion
>>
>>
>> Am I missing some additional library?
>>
>> Thanks,
>>
>> Juan
>>
>


Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-23 Thread Rong Rong
Hi Dongwon,

Sorry for the late reply. I did try some experiments and it seems like you are
right: setting the `.returns()` type actually alters the underlying type of the
DataStream from a GenericType into a specific RowTypeInfo. Please see the
JIRA ticket [1] for more info.

Regarding the approach, yes, I think you cannot access the timer service
from the Table/SQL API at this moment, so that might be the best approach.
And as Fabian suggested, I don't think there's much of a problem if you are
not changing the underlying type info of your DataStream. I will follow up
on this in the JIRA ticket.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-13389
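
As a side note for other readers, here is a rough sketch of the conversion
pattern discussed in this thread (appending the current watermark as an extra
Row field and hinting the result type); it assumes a StreamTableEnvironment
tEnv and an input Table sourceTable with a single rowtime column, and the
field names/types are illustrative:

DataStream<Row> withWatermark = tEnv
    .toAppendStream(sourceTable, Row.class)
    .process(new ProcessFunction<Row, Row>() {
      @Override
      public void processElement(Row value, Context ctx, Collector<Row> out) {
        // copy the original fields and append the operator's current watermark
        Row result = new Row(value.getArity() + 1);
        for (int i = 0; i < value.getArity(); i++) {
          result.setField(i, value.getField(i));
        }
        result.setField(value.getArity(), ctx.timerService().currentWatermark());
        out.collect(result);
      }
    })
    // hint the widened row type so the stream does not fall back to GenericTypeInfo
    .returns(Types.ROW(Types.SQL_TIMESTAMP, Types.LONG));

Table withWatermarkTable = tEnv.fromDataStream(withWatermark, "time1, wm");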

On Tue, Jul 23, 2019 at 6:30 AM Dongwon Kim  wrote:

> Hi Fabian,
>
> Thanks for clarification :-)
> I could convert back and forth without worrying about it as I keep using
> Row type during the conversion (even though fields are added).
>
> Best,
>
> Dongwon
>
>
>
> On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske  wrote:
>
>> Hi Dongwon,
>>
>> regarding the question about the conversion: If you keep using the Row
>> type and not adding/removing fields, the conversion is pretty much for free
>> right now.
>> It will be a MapFunction (sometimes even not function at all) that should
>> be chained with the other operators. Hence, it should boil down to a
>> function call.
>>
>> Best, Fabian
>>
>> Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <
>> eastcirc...@gmail.com>:
>>
>>> Hi Rong,
>>>
>>> I have to dig deeper into the code to reproduce this error. This seems
 to be a bug to me and will update once I find anything.
>>>
>>> Thanks a lot for spending your time on this.
>>>
>>> However from what you explained, if I understand correctly you can do
 all of your processing within the TableAPI scope without converting it back
 and forth to DataStream.
 E.g. if your "map(a -> a)" placeholder represents some sort of map
 function that's simple enough, you can implement and connect with the table
 API via UserDefinedFunction[1].
 As TableAPI becoming the first class citizen [2,3,4], this would be
 much cleaner implementation from my perspective.
>>>
>>> I also agree with you in that the first class citizen Table API will
>>> make everything not only easier but also a lot cleaner.
>>> We however contain some corner cases that force us to covert Table from
>>> and to DataStream.
>>> One such case is to append to Table a column showing the current
>>> watermark of each record; there's no other way but to do that as
>>> ScalarFunction doesn't allow us to get the runtime context information as
>>> ProcessFunction does.
>>>
>>> I have a question regarding the conversion.
>>> Do I have to worry about runtime performance penalty in case that I
>>> cannot help but convert back and fourth to DataStream?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong  wrote:
>>>
 Hi Dongwon,

 I have to dig deeper into the code to reproduce this error. This seems
 to be a bug to me and will update once I find anything.

 However from what you explained, if I understand correctly you can do
 all of your processing within the TableAPI scope without converting it back
 and forth to DataStream.
 E.g. if your "map(a -> a)" placeholder represents some sort of map
 function that's simple enough, you can implement and connect with the table
 API via UserDefinedFunction[1].
 As TableAPI becoming the first class citizen [2,3,4], this would be
 much cleaner implementation from my perspective.

 --
 Rong

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
 [2]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
 [3]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
 [4]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html


 On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim 
 wrote:

> Hi Rong,
>
> Thank you for reply :-)
>
> which Flink version are you using?
>
> I'm using Flink-1.8.0.
>
> what is the "sourceTable.getSchema().toRowType()" return?
>
> Row(time1: TimeIndicatorTypeInfo(rowtime))
>
> what is the line *".map(a -> a)" *do and can you remove it?
>
> *".map(a->a)"* is just to illustrate a problem.
> My actual code contains a process function (instead of .map() in the
> snippet) which appends a new field containing watermark to a row.
> If there were ways to get watermark inside a scalar UDF, I wouldn't
> convert table to datastream and vice versa.
>
> if I am understanding correctly, you are also using "time1" as the
>> rowtime, is that want your intension is 

Re: Execution environments for testing: local vs collection vs mini cluster

2019-07-23 Thread Juan Rodríguez Hortalá
Hi Bao,

Thanks for your answer.

1. Integration tests for my project.
2. Both data stream and data sets
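
For reference, a minimal JUnit-style sketch of using the mini cluster for an
integration test (Flink 1.8 APIs; the job logic and sizes are just illustrative):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class MiniClusterExampleTest {

  @ClassRule
  public static final MiniClusterWithClientResource MINI_CLUSTER =
      new MiniClusterWithClientResource(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberTaskManagers(1)
              .setNumberSlotsPerTaskManager(2)
              .build());

  @Test
  public void runsPipelineOnMiniCluster() throws Exception {
    // getExecutionEnvironment() picks up the mini cluster registered by the rule
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3).map(i -> i * 2).print();
    env.execute();
  }
}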



On Mon, Jul 22, 2019 at 11:44 PM Biao Liu  wrote:

> Hi Juan,
>
> I'm not sure what you really want. Before giving some suggestions, could
> you answer the questions below first?
>
> 1. Do you want to write a unit test (or integration test) case for your
> project or for Flink? Or just want to run your job locally?
> 2. Which mode do you want to test? DataStream or DataSet?
>
>
>
> Juan Rodríguez Hortalá  于2019年7月23日周二
> 下午1:12写道:
>
>> Hi,
>>
>> In
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/local_execution.html
>> and
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html
>> I see there are 3 ways to create an execution environment for testing:
>>
>>- StreamExecutionEnvironment.createLocalEnvironment and
>>ExecutionEnvironment.createLocalEnvironment create an execution 
>> environment
>>running on a single JVM using different threads.
>>- CollectionEnvironment runs on a single JVM on a single thread.
>>- I haven't found not much documentation on the Mini Cluster, but it
>>sounds similar to the Hadoop MiniCluster
>>
>> .
>>If that is then case, then it would run on many local JVMs, each of them
>>running multiple threads.
>>
>> Am I correct about the Mini Cluster? Is there any additional
>> documentation about it? I discovered it looking at the source code of
>> AbstractTestBase, that is mentioned on
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/testing.html#integration-testing.
>> Also, it looks like launching the mini cluster registers it somewhere, so
>> subsequent calls to `StreamExecutionEnvironment.getExecutionEnvironment`
>> return an environment that uses the mini cluster. Is that performed by
>> `executionEnvironment.setAsContext()` in
>> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java#L56
>> ? Is that execution environment registration process documented anywhere?
>>
>> Which test execution environment is recommended for each test use case?
>> For example I don't see why would I use CollectionEnvironment when I have
>> the local environment available and running on several threads, what is a
>> good use case for CollectionEnvironment?
>>
>> Are all these 3 environments supported equality, or maybe some of them is
>> expected to be deprecated?
>>
>> Are there any additional execution environments that could be useful for
>> testing on a single host?
>>
>> Thanks,
>>
>> Juan
>>
>>
>>


Re: AsyncDataStream on key of KeyedStream

2019-07-23 Thread Flavio Pompermaier
For each key I need to call an external REST service to get the current
status, and this is why I'd like to use Async IO. At the moment I do this in
a process function, but I'd like a cleaner solution (if possible).
Do you think your proposal of forking could be a better option?
Could you provide a simple snippet/pseudo-code of it? I'm not sure I've
fully understood your suggestion.


Re: CEP Pattern limit

2019-07-23 Thread Fabian Hueske
Hi Pedro,

each pattern gets translated into one or more Flink operators. With around
1000 patterns, your Flink program therefore becomes *very* large and requires
much more time to be deployed, hence the timeout.

I'd try to limit the size of your job by grouping your patterns and creating
a separate job for each group.
You can also increase the timeout threshold in the Flink configuration [1],
but I'd still recommend having smaller jobs.

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#distributed-coordination-via-akka

Am Do., 11. Juli 2019 um 15:21 Uhr schrieb Pedro Saraiva <
pedro.a34...@gmail.com>:

> Hello,
>
> I'm using CEP to match a stream against around 1000 different patterns.
>
> To do this I create de patterns and then iterate and call CEP.pattern()
> for each. Later on, I merge the PatternStreams into one using
> datastream.union().
>
> The problem is that i'm getting this exception:
> AskTimeoutException: Ask timed out on Actor... after 1ms. Sender null
> sent message of type LocalRpcInvocation.
>
> I noticed that this exception is thrown when I reach around 500 patterns.
>
> Is there a way to overcome this limit?
>
> Kind regards,
>
> Pedro Saraiva
>


Re: Flink and CDC

2019-07-23 Thread Flavio Pompermaier
Indeed Kafka Connect is perfect, but I think Flink could easily do the same
without much work. This is what I'm asking: has anybody ever thought about
doing it?


Re: 1.9 Release Timeline

2019-07-23 Thread Oytun Tez
Thank you for responding! I'll subscribe to dev@

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Tue, Jul 23, 2019 at 10:25 AM Timo Walther  wrote:

> Hi Oytun,
>
> the community is working hard to release 1.9. You can see the progress
> here [1] and on the dev@ mailing list.
>
> [1]
> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=328=detail
>
> Regards,
> Timo
>
> Am 23.07.19 um 15:52 schrieb Oytun Tez:
>
> Ping, any estimates?
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Thu, Jul 18, 2019 at 11:07 AM Oytun Tez  wrote:
>
>> Hi team,
>>
>> 1.9 is bringing very exciting updates, State Processor API and MapState
>> migrations being two of them. Thank you for all the hard work!
>>
>> I checked the burndown board [1], do you have an estimated timeline for
>> the GA release of 1.9?
>>
>>
>>
>> [1]
>> https://issues.apache.org/jira/secure/RapidBoard.jspa?projectKey=FLINK=328
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>
>


Re: 1.9 Release Timeline

2019-07-23 Thread Timo Walther

Hi Oytun,

the community is working hard to release 1.9. You can see the progress 
here [1] and on the dev@ mailing list.


[1] 
https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=328=detail


Regards,
Timo

Am 23.07.19 um 15:52 schrieb Oytun Tez:

Ping, any estimates?

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com  — www.motaword.com 




On Thu, Jul 18, 2019 at 11:07 AM Oytun Tez > wrote:


Hi team,

1.9 is bringing very exciting updates, State Processor API and
MapState migrations being two of them. Thank you for all the hard
work!

I checked the burndown board [1], do you have an estimated
timeline for the GA release of 1.9?



[1]

https://issues.apache.org/jira/secure/RapidBoard.jspa?projectKey=FLINK=328

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com  — www.motaword.com






Re: Checkpoints timing out for no apparent reason

2019-07-23 Thread spoganshev
I've looked into this problem a little bit more, and it looks like it is
caused by some problem with the Kinesis sink. There is an exception in the
logs at the moment the job gets restored after being stalled for about 15
minutes:

Encountered an unexpected expired iterator
AAGzsd7J/muyVo6McROAzdW+UByN+g4ttJjFS/LkswyZHprdlBxsH6B7UI/8DIJu6hj/Vph9OQ6Oz7Rhxg9Dj64w58osOSwf05lX/N+c8EUVRIQY/yZnwjtlmZw1HAKWSBIblfkGIMmmWFPu/UpQqzX7RliA2XWeDvkLAdOcogGmRgceI95rOMEUIWYP7z2PmiQ7TlL4MOG+q/NYEiLgyuoVw7bkm+igE+34caD7peXuZA==
for shard StreamShardHandle{streamName='staging-datalake-struct',
shard='{ShardId: shardId-0005,ParentShardId:
shardId-0001,HashKeyRange: {StartingHashKey:
255211775190703847597530955573826158592,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49591208977124932291714633368622679061889586376843722834,}}'}; refreshing
the iterator ...

It's logged by
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: 1.9 Release Timeline

2019-07-23 Thread Oytun Tez
Ping, any estimates?

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Thu, Jul 18, 2019 at 11:07 AM Oytun Tez  wrote:

> Hi team,
>
> 1.9 is bringing very exciting updates, State Processor API and MapState
> migrations being two of them. Thank you for all the hard work!
>
> I checked the burndown board [1], do you have an estimated timeline for
> the GA release of 1.9?
>
>
>
> [1]
> https://issues.apache.org/jira/secure/RapidBoard.jspa?projectKey=FLINK=328
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>


Re: Extending REST API with new endpoints

2019-07-23 Thread Oytun Tez
Ping, any ideas?

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com
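
(For context on the quoted discussion below, a minimal sketch of querying a
queryable ValueState from an external client; host, port, job id, state name,
key and types are illustrative placeholders, and 9069 is the default queryable
state proxy port:)

QueryableStateClient client =
    new QueryableStateClient("taskmanager-host", 9069); // may throw UnknownHostException

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Types.LONG);

CompletableFuture<ValueState<Long>> result = client.getKvState(
    JobID.fromHexString("<job id of the running job>"), // placeholder
    "query-name",   // name passed to .asQueryableState(...) in the job
    "some-key",     // the key to look up
    Types.STRING,   // key type
    descriptor);

result.thenAccept(state -> {
  try {
    System.out.println("current value: " + state.value());
  } catch (Exception e) {
    e.printStackTrace();
  }
});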


On Mon, Jul 22, 2019 at 9:39 AM Oytun Tez  wrote:

> I did take a look at it, but things got out of hand very quickly from
> there on :D
>
> I see that WebSubmissionExtension implements WebMonitorExtension, but
> then WebSubmissionExtension was used in DispatcherRestEndpoint, which I
> couldn't know how to manipulate/extend...
>
> How can I plug my Extension into the dispatcher?
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Mon, Jul 22, 2019 at 9:37 AM Seth Wiesman  wrote:
>
>> Would the `WebMonitorExtension` work?
>>
>> [1]
>> https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorExtension.java
>>
>> On Mon, Jul 22, 2019 at 8:35 AM Oytun Tez  wrote:
>>
>>> I simply want to open up endpoints to query QueryableStates. What I had
>>> in mind was to give operators an interface to implement their own
>>> QueryableState controllers, e.g. serializers etc.
>>>
>>> We are trying to use Flink in more of an "application framework"
>>> fashion, so extensibility helps a lot. As there already is a http server in
>>> this codebase, we'd like to attach to that instead. Especially queryable
>>> state is tightly coupled with Flink code, so it doesn't make much sense to
>>> host another http application to bridge into Flink.
>>>
>>> ---
>>> Oytun Tez
>>>
>>> *M O T A W O R D*
>>> The World's Fastest Human Translation Platform.
>>> oy...@motaword.com — www.motaword.com
>>>
>>>
>>> On Mon, Jul 22, 2019 at 4:38 AM Biao Liu  wrote:
>>>
 Hi,

 As far as I know, the RESTful handler is not pluggable. And I don't see
 a strong reason from your description to do so.
 Could you explain more about your requirement?


 Oytun Tez  于2019年7月20日周六 上午4:36写道:

> Yep, I scanned all of the issues in Jira and the codebase, I couldn't
> find a way to plug my new endpoint in.
>
> I am basically trying to open up an endpoint for queryable state
> client. I also read somewhere that this may cause some issues due to SSL
> communication within the cluster.
>
> Any pointers?
>
>
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Fri, Jul 19, 2019 at 3:53 PM Oytun Tez  wrote:
>
>> Hi there,
>>
>> I am trying to add a new endpoint to the REST API, by
>> extending AbstractRestHandler. But this new handler needs to be added
>> in WebMonitorEndpoint, which has no interface for outside.
>>
>> Can I do this with 1.8? Any other way or plans to make this possible?
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>
>>
>> --
>>
>> Seth Wiesman | Solutions Architect
>>
>> +1 314 387 1463
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>
>


Re: [DISCUSS] Create a Flink ecosystem website

2019-07-23 Thread Oytun Tez
I agree with Robert – localization (this is what we do at MotaWord) is
maintenance work. If it is not maintained as well as the mainstream site, it
will only hurt and alienate the devs that use those localized websites.

Re: comments, I don't think people will really discuss furiously. But we at
least need a system where we understand the popularity of a package; it
helps to pick among similar packages, if any. Something like the popularity
figure here that we can fetch from somewhere:
https://mvnrepository.com/popular

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Tue, Jul 23, 2019 at 5:07 AM Robert Metzger  wrote:

> Thanks a lot Marta for offering to write a blog post about the community
> site!
>
> I'm not sure if multi-language support for the site is a good idea. I see
> the packages site as something similar to GitHub or Jira. The page itself
> contains very view things we could actually translate. The package owners
> usually are only able to provide one or two languages for their package
> description.
> For the comments, we don't want disjoint discussions to happen.
>
> I've also kicked off a discussion with Apache legal on the initiative [1].
> We might not be able to host this at Apache, but let's see where the
> discussion goes.
>
>
> [1]
> https://lists.apache.org/thread.html/ee76a02257b51292ab61f6ac8d3d69307e83cc569cdeebde80596207@%3Clegal-discuss.apache.org%3E
>
>
> On Sat, Jul 20, 2019 at 5:25 AM Becket Qin  wrote:
>
>> [Sorry for the incomplete message. Clicked send by mistake...]
>>
>> I agree with Marta that it might be good to have multi-language support
>> as a mid-term goal.
>>
>> Jiangjie (Becket) Qin
>>
>> On Sat, Jul 20, 2019 at 11:22 AM Becket Qin  wrote:
>>
>>> The website is awesome! I really like its conciseness and yet fairly
>>> useful information and functionalities. I cannot think of much to improve
>>> at the moment. Just one thought, do we need an "others" category, just in
>>> case a package does not fit into any of the current given categories?
>>>
>>> Thanks Robert and Daryl for the great effort. Looking forward to seeing
>>> this get published soon!!
>>>
>>> I agree with Marta that
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Sat, Jul 20, 2019 at 1:34 AM Marta Paes Moreira 
>>> wrote:
>>>
 Hey, Robert.

 I will keep an eye on the overall progress and get started on the blog
 post
 to make the community announcement. Are there (mid-term) plans to
 translate/localize this website as well? It might be a point worth
 mentioning in the blogpost.

 Hats off to you and Daryl — this turned out amazing!

 Marta

 On Thu, Jul 18, 2019 at 10:57 AM Congxian Qiu 
 wrote:

 > Robert and Daryl, thanks for the great work, I tried the website and
 filed
 > some issues on Github.
 > Best,
 > Congxian
 >
 >
 > Robert Metzger  于2019年7月17日周三 下午11:28写道:
 >
 >> Hey all,
 >>
 >> Daryl and I have great news to share. We are about to finish adding
 the
 >> basic features to the ecosystem page.
 >> We are at a stage where it is ready to be reviewed and made public.
 >>
 >> You can either check out a development instance of the ecosystem page
 >> here: https://flink-ecosystem-demo.flink-resources.org/
 >> Or you run it locally, with the instructions from the README.md:
 >> https://github.com/sorahn/flink-ecosystem
 >>
 >> Please report all issues you find here:
 >> https://github.com/sorahn/flink-ecosystem/issues or in this thread.
 >>
 >> The next steps in this project are the following:
 >> - We fix all issues reported through this testing
 >> - We set up the site on the INFRA resources Becket has secured [1],
 do
 >> some further testing (including email notifications) and pre-fill
 the page
 >> with some packages.
 >> - We set up a packages.flink.apache.org or flink.apache.org/packages
 >> domain
 >> - We announce the packages through a short blog post
 >>
 >> Happy testing!
 >>
 >> Best,
 >> Robert
 >>
 >> [1] https://issues.apache.org/jira/browse/INFRA-18010
 >>
 >>
 >> On Thu, Apr 25, 2019 at 6:23 AM Becket Qin 
 wrote:
 >>
 >>> Thanks for the update, Robert. Looking forward to the website. If
 there
 >>> is already a list of software we need to run the website, we can
 ask Apache
 >>> infra team to prepare the VM for us, as that may also take some
 time.
 >>>
 >>> On Wed, Apr 24, 2019 at 11:57 PM Robert Metzger <
 rmetz...@apache.org>
 >>> wrote:
 >>>
  Hey all,
 
  quick update on this project: The frontend and backend code have
 been
  put together into this repository:
  https://github.com/sorahn/flink-ecosystem
  We also just agreed on an API specification, and will now work on
  

Re: Flink SinkFunction for WebSockets

2019-07-23 Thread Oytun Tez
Hi Tim,

I think this might be a useful sink for small interactions with the outside
world. Are you planning to open source this? If yes, can you try to make it
agnostic so that people can plug in their own WebSocket protocol (STOMP,
etc.)? :) We can publish this on the upcoming community website as an
extension/plugin/library.
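
(As a starting point, a rough generic sketch of such a sink, not WAMP-specific
and assuming Java 11's built-in java.net.http WebSocket client; the endpoint URI
and message format are placeholders. Fabian's caveat below applies: invoke() may
be called again for the same logical result after a recovery, so the receiver
should tolerate duplicates:)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;

public class WebSocketSink extends RichSinkFunction<String> {

  private final String uri;
  private transient WebSocket webSocket;

  public WebSocketSink(String uri) {
    this.uri = uri;
  }

  @Override
  public void open(Configuration parameters) {
    // open one connection per parallel sink instance
    webSocket = HttpClient.newHttpClient()
        .newWebSocketBuilder()
        .buildAsync(URI.create(uri), new WebSocket.Listener() {})
        .join();
  }

  @Override
  public void invoke(String value, Context context) {
    // block until the frame is handed off; keeps the sketch simple and avoids
    // overlapping sendText calls, which java.net.http.WebSocket does not allow
    webSocket.sendText(value, true).join();
  }

  @Override
  public void close() {
    if (webSocket != null) {
      webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "done").join();
    }
  }
}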




---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Tue, Jul 23, 2019 at 7:07 AM Fabian Hueske  wrote:

> Hi Tim,
>
> One thing that might be interesting is that Flink might emit results more
> than once when a job recovers from a failure.
> It is up to the receiver to deal with that.
> Depending on the type of results this might be easy (idempotent updates)
> or impossible.
>
> Best, Fabian
>
>
>
> Am Fr., 19. Juli 2019 um 00:23 Uhr schrieb Timothy Victor <
> vict...@gmail.com>:
>
>> Hi
>>
>> I'm looking to write a sink function for writing to websockets, in
>> particular ones that speak the WAMP protocol (
>> https://wamp-proto.org/index.html).
>>
>> Before going down that path, I wanted to ask if
>>
>> a) anyone has done something like that already so I dont reinvent stuff
>>
>> b) any caveats or warnings before I try this.
>>
>> Any advise would be appreciated.
>>
>> Thanks
>>
>> Tim
>>
>


Re: AsyncDataStream on key of KeyedStream

2019-07-23 Thread Fabian Hueske
OK, I see. What information will be sent out via the async request?
Maybe you can fork off a separate stream with the info that needs to be sent
to the external service and later union the result with the main stream
before the window operator?



Am Di., 23. Juli 2019 um 14:12 Uhr schrieb Flavio Pompermaier <
pomperma...@okkam.it>:

> The problem of bundling all records together within a window is that this
> solution doesn't scale (in the case of large time windows and number of
> events)..my requirement could be fulfilled by a keyed ProcessFunction but I
> think AsyncDataStream should provide a first-class support to keyed streams
> (and thus perform a single call per key and window..). What do you think?
>
> On Tue, Jul 23, 2019 at 12:56 PM Fabian Hueske  wrote:
>
>> Hi Flavio,
>>
>> Not sure I understood the requirements correctly.
>> Couldn't you just collect and bundle all records with a regular window
>> operator and forward one record for each key-window to an AsyncIO operator?
>>
>> Best, Fabian
>>
>> Am Do., 18. Juli 2019 um 12:20 Uhr schrieb Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> Hi to all,
>>> I'm trying to exploit async IO in my Flink job.
>>> In my use case I use keyed tumbling windows and I'd like to execute the
>>> async action only once per key and window (while
>>> the AsyncDataStream.unorderedWait execute the async call for every element
>>> of my stream) ..is there an easy way to do that apart from using a process
>>> function (that basically will lose the asynchronicity)?
>>>
>>> Best,
>>> Flavio
>>>
>>
>
>


Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-23 Thread Dongwon Kim
Hi Fabian,

Thanks for clarification :-)
I could convert back and forth without worrying about it as I keep using
Row type during the conversion (even though fields are added).

Best,

Dongwon



On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske  wrote:

> Hi Dongwon,
>
> regarding the question about the conversion: If you keep using the Row
> type and not adding/removing fields, the conversion is pretty much for free
> right now.
> It will be a MapFunction (sometimes even not function at all) that should
> be chained with the other operators. Hence, it should boil down to a
> function call.
>
> Best, Fabian
>
> Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <
> eastcirc...@gmail.com>:
>
>> Hi Rong,
>>
>> I have to dig deeper into the code to reproduce this error. This seems to
>>> be a bug to me and will update once I find anything.
>>
>> Thanks a lot for spending your time on this.
>>
>> However from what you explained, if I understand correctly you can do all
>>> of your processing within the TableAPI scope without converting it back and
>>> forth to DataStream.
>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement and connect with the table
>>> API via UserDefinedFunction[1].
>>> As TableAPI becoming the first class citizen [2,3,4], this would be much
>>> cleaner implementation from my perspective.
>>
>> I also agree with you in that the first class citizen Table API will make
>> everything not only easier but also a lot cleaner.
>> We however contain some corner cases that force us to covert Table from
>> and to DataStream.
>> One such case is to append to Table a column showing the current
>> watermark of each record; there's no other way but to do that as
>> ScalarFunction doesn't allow us to get the runtime context information as
>> ProcessFunction does.
>>
>> I have a question regarding the conversion.
>> Do I have to worry about runtime performance penalty in case that I
>> cannot help but convert back and fourth to DataStream?
>>
>> Best,
>>
>> Dongwon
>>
>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong  wrote:
>>
>>> Hi Dongwon,
>>>
>>> I have to dig deeper into the code to reproduce this error. This seems
>>> to be a bug to me and will update once I find anything.
>>>
>>> However from what you explained, if I understand correctly you can do
>>> all of your processing within the TableAPI scope without converting it back
>>> and forth to DataStream.
>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement and connect with the table
>>> API via UserDefinedFunction[1].
>>> As TableAPI becoming the first class citizen [2,3,4], this would be much
>>> cleaner implementation from my perspective.
>>>
>>> --
>>> Rong
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>> [4]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>
>>>
>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim 
>>> wrote:
>>>
 Hi Rong,

 Thank you for reply :-)

 which Flink version are you using?

 I'm using Flink-1.8.0.

 what is the "sourceTable.getSchema().toRowType()" return?

 Row(time1: TimeIndicatorTypeInfo(rowtime))

 what is the line *".map(a -> a)" *do and can you remove it?

 *".map(a->a)"* is just to illustrate a problem.
 My actual code contains a process function (instead of .map() in the
 snippet) which appends a new field containing watermark to a row.
 If there were ways to get watermark inside a scalar UDF, I wouldn't
 convert table to datastream and vice versa.

 if I am understanding correctly, you are also using "time1" as the
> rowtime, is that want your intension is to use it later as well?

 yup :-)

 As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine automatically[1].

 The reason why I specify
 *".returns(sourceTable.getSchema().toRowType());"* is to give a type
 information hint as you said.
 That is needed later when I need to make another table like
"*Table anotherTable = tEnv.fromDataStream(stream);"*,
 Without the type information hint, I've got an error
"*An input of GenericTypeInfo cannot be converted to Table.
 Please specify the type of the input with a RowTypeInfo."*
 That's why I give a type information hint in that way.

 Best,

 Dongwon


Re: Flink on Mesos

2019-07-23 Thread Till Rohrmann
I'll take a look.

Cheers,
Till

On Tue, Jul 23, 2019 at 3:07 PM Oleksandr Nitavskyi 
wrote:

> Hey guys.
>
>
>
> We have also made implementation in Flink on Mesos component in order to
> support network bandwidth configuration.
>
>
>
> Will somebody be able to have a look on our PR: 
> *https://github.com/apache/flink/pull/8652
> *
>
> There are for sure some details to clarify.
>
>
>
> Cheers
>
> Oleksandr
>
>
>
> *From: *Till Rohrmann 
> *Date: *Friday 5 April 2019 at 16:46
> *To: *Juan Gentile 
> *Cc: *"user@flink.apache.org" , Oleksandr
> Nitavskyi 
> *Subject: *Re: Flink on Mesos
>
>
>
> Hi Juan,
>
>
>
> thanks for reporting this issue. If you could open an issue and also
> provide a fix for it, then this would be awesome. Please let me know the
> ticket number so that I can monitor it and give your PR a review.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Apr 5, 2019 at 5:34 AM Juan Gentile  wrote:
>
> Hello!
>
>
>
> We are having a small problem while trying to deploy Flink on Mesos using
> marathon. In our set up of Mesos we are required to specify the amount of
> disk space we want to have for the applications we deploy there.
>
> The current default value in Flink is 0 and it’s currently is not
> parameterizable. This means that we ask 0 disk space for our instances so
> Flink can’t work.
>
> I’d appreciate suggestions if you have any. Otherwise and since this is
> causing some problems on our side, I’d like to know if I can create a
> ticket on Flink and work on it; looks like the fix should be quite easy to
> implement.
>
>
>
> Thank you,
>
> Juan.
>
>


Re: Flink Zookeeper HA: FileNotFoundException blob - Jobmanager not starting up

2019-07-23 Thread Till Rohrmann
Hi Richard,

it looks as if the zNode of a completed job has not been properly removed.
Without the logs of the respective JobMaster, it is hard to debug any
further. However, I suspect that this is an instance of FLINK-11665. I am
currently working on a fix for it.

[1] https://issues.apache.org/jira/browse/FLINK-11665

Cheers,
Till

On Tue, Jul 23, 2019 at 2:38 PM Richard Deurwaarder  wrote:

> Hi Fabian,
>
> I followed the advice of another flink user who mailed me directly, he has
> the same problem and told me to use something like: rmr zgrep 
> /flink/hunch/jobgraphs/1dccee15d84e1d2cededf89758ac2482
> which allowed us to start the job again.
>
> It might be nice to investigate what went wrong as it didn't feel good to
> have our production clustered crippled like this.
>
> Richard
>
> On Tue, Jul 23, 2019 at 12:47 PM Fabian Hueske  wrote:
>
>> Hi Richard,
>>
>> I hope you could resolve the problem in the meantime.
>>
>> Nonetheless, maybe Till (in CC) has an idea what could have gone wrong.
>>
>> Best, Fabian
>>
>> Am Mi., 17. Juli 2019 um 19:50 Uhr schrieb Richard Deurwaarder <
>> rich...@xeli.eu>:
>>
>>> Hello,
>>>
>>> I've got a problem with our flink cluster where the jobmanager is not
>>> starting up anymore, because it tries to download non existant (blob) file
>>> from the zookeeper storage dir.
>>>
>>> We're running flink 1.8.0 on a kubernetes cluster and use the google
>>> storage connector [1] to store checkpoints, savepoints and zookeeper data.
>>>
>>> When I noticed the jobmanager was having problems, it was in a crashloop
>>> throwing file not found exceptions [2]
>>> Caused by: java.io.FileNotFoundException: Item not found:
>>> some-project-flink-state/recovery/hunch/blob/job_e6ad857af7f09b56594e95fe273e9eff/blob_p-486d68fa98fa05665f341d79302c40566b81034e-306d493f5aa810b5f4f7d8d63f5b18b5.
>>> If you enabled STRICT generation consistency, it is possible that the live
>>> version is still available but the intended generation is deleted.
>>>
>>> I looked in the blob directory and I can only find:
>>> /recovery/hunch/blob/job_1dccee15d84e1d2cededf89758ac2482 I've tried to
>>> fiddle around in zookeeper to see if I could find anything [3], but I do
>>> not really know what to look for.
>>>
>>> How could this have happened and how should I recover the job from this
>>> situation?
>>>
>>> Thanks,
>>>
>>> Richard
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/connectors.html#using-hadoop-file-system-implementations
>>> [2] https://gist.github.com/Xeli/0321031655e47006f00d38fc4bc08e16
>>> [3] https://gist.github.com/Xeli/04f6d861c5478071521ac6d2c582832a
>>>
>>


Re: Flink Zookeeper HA: FileNotFoundException blob - Jobmanager not starting up

2019-07-23 Thread Fabian Hueske
Good to know that you were able to fix the issue!

I definitely agree that it would be good to know why this situation
occurred.

Am Di., 23. Juli 2019 um 14:38 Uhr schrieb Richard Deurwaarder <
rich...@xeli.eu>:

> Hi Fabian,
>
> I followed the advice of another flink user who mailed me directly, he has
> the same problem and told me to use something like: rmr zgrep 
> /flink/hunch/jobgraphs/1dccee15d84e1d2cededf89758ac2482
> which allowed us to start the job again.
>
> It might be nice to investigate what went wrong as it didn't feel good to
> have our production clustered crippled like this.
>
> Richard
>
> On Tue, Jul 23, 2019 at 12:47 PM Fabian Hueske  wrote:
>
>> Hi Richard,
>>
>> I hope you could resolve the problem in the meantime.
>>
>> Nonetheless, maybe Till (in CC) has an idea what could have gone wrong.
>>
>> Best, Fabian
>>
>> Am Mi., 17. Juli 2019 um 19:50 Uhr schrieb Richard Deurwaarder <
>> rich...@xeli.eu>:
>>
>>> Hello,
>>>
>>> I've got a problem with our flink cluster where the jobmanager is not
>>> starting up anymore, because it tries to download non existant (blob) file
>>> from the zookeeper storage dir.
>>>
>>> We're running flink 1.8.0 on a kubernetes cluster and use the google
>>> storage connector [1] to store checkpoints, savepoints and zookeeper data.
>>>
>>> When I noticed the jobmanager was having problems, it was in a crashloop
>>> throwing file not found exceptions [2]
>>> Caused by: java.io.FileNotFoundException: Item not found:
>>> some-project-flink-state/recovery/hunch/blob/job_e6ad857af7f09b56594e95fe273e9eff/blob_p-486d68fa98fa05665f341d79302c40566b81034e-306d493f5aa810b5f4f7d8d63f5b18b5.
>>> If you enabled STRICT generation consistency, it is possible that the live
>>> version is still available but the intended generation is deleted.
>>>
>>> I looked in the blob directory and I can only find:
>>> /recovery/hunch/blob/job_1dccee15d84e1d2cededf89758ac2482 I've tried to
>>> fiddle around in zookeeper to see if I could find anything [3], but I do
>>> not really know what to look for.
>>>
>>> How could this have happened and how should I recover the job from this
>>> situation?
>>>
>>> Thanks,
>>>
>>> Richard
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/connectors.html#using-hadoop-file-system-implementations
>>> [2] https://gist.github.com/Xeli/0321031655e47006f00d38fc4bc08e16
>>> [3] https://gist.github.com/Xeli/04f6d861c5478071521ac6d2c582832a
>>>
>>


Re: Flink on Mesos

2019-07-23 Thread Oleksandr Nitavskyi
Hey guys.

We have also made an implementation in the Flink on Mesos component in order
to support network bandwidth configuration.

Would somebody be able to have a look at our PR:
https://github.com/apache/flink/pull/8652
There are for sure some details to clarify.

Cheers
Oleksandr

From: Till Rohrmann 
Date: Friday 5 April 2019 at 16:46
To: Juan Gentile 
Cc: "user@flink.apache.org" , Oleksandr Nitavskyi 

Subject: Re: Flink on Mesos

Hi Juan,

thanks for reporting this issue. If you could open an issue and also provide a 
fix for it, then this would be awesome. Please let me know the ticket number so 
that I can monitor it and give your PR a review.

Cheers,
Till

On Fri, Apr 5, 2019 at 5:34 AM Juan Gentile 
mailto:j.gent...@criteo.com>> wrote:
Hello!

We are having a small problem while trying to deploy Flink on Mesos using 
marathon. In our set up of Mesos we are required to specify the amount of disk 
space we want to have for the applications we deploy there.
The current default value in Flink is 0 and it’s currently is not 
parameterizable. This means that we ask 0 disk space for our instances so Flink 
can’t work.
I’d appreciate suggestions if you have any. Otherwise and since this is causing 
some problems on our side, I’d like to know if I can create a ticket on Flink 
and work on it; looks like the fix should be quite easy to implement.

Thank you,
Juan.


add laplace to k means

2019-07-23 Thread alaa
Hello,

I have used this k-means code in Flink:

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java

and I would like to add noise that follows a Laplace distribution to the sum of
the data items and to the number of data items when calculating a new cluster
center in each iteration:

for j = 1 ... p do
    u' = (sum + Lap(ε)) / (num + Lap(ε))

I have already written the Laplace function, but I don't know how to add it to
the k-means code and in which line I should write it.

Thank you
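
(A rough sketch of where the noise could go, based on the linked example: the
map step that turns the accumulated (id, point sum, count) tuples into new
centers. The class below would replace CentroidAverager, and laplace(ε) stands
for your own sampling function; the inverse-CDF sampler shown is only a
placeholder:)

public static final class NoisyCentroidAverager
    implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {

  private final double epsilon;

  public NoisyCentroidAverager(double epsilon) {
    this.epsilon = epsilon;
  }

  @Override
  public Centroid map(Tuple3<Integer, Point, Long> value) {
    // value.f1 is the coordinate sum of the cluster, value.f2 the number of points
    double noisyX = value.f1.x + laplace(epsilon);
    double noisyY = value.f1.y + laplace(epsilon);
    double noisyCount = value.f2 + laplace(epsilon);
    return new Centroid(value.f0, noisyX / noisyCount, noisyY / noisyCount);
  }

  // placeholder: inverse-CDF sampling of a Laplace(0, 1/epsilon) variable
  private double laplace(double epsilon) {
    double u = Math.random() - 0.5;
    return -Math.signum(u) * Math.log(1 - 2 * Math.abs(u)) / epsilon;
  }
}

// in the iteration, use the new averager instead of CentroidAverager:
// .map(new NoisyCentroidAverager(epsilon));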




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Memory constraints running Flink on Kubernetes

2019-07-23 Thread wvl
Hi,

We're running a relatively simple Flink application that uses a bunch of
state in RocksDB on Kubernetes.
During the course of development and going to production, we found that we
were often running into memory issues made apparent by Kubernetes OOMKilled
and Java OOM log events.

In order to tackle these, we're trying to account for all the memory used
in the container, to allow proper tuning.
Metric-wise we have:
- container_memory_working_set_bytes = 6,5GB
- flink_taskmanager_Status_JVM_Memory_Heap_Max =  4,7GB
- flink_taskmanager_Status_JVM_Memory_NonHeap_Used = 325MB
- flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed = 500MB

This is my understanding based on all the documentation and observations:
container_memory_working_set_bytes will be the total amount of memory in
use, disregarding OS page & block cache.
Heap will be heap.
NonHeap is mostly the metaspace.
Direct_Memory is mostly network buffers.

Running the numbers I have 1 GB unaccounted for. I'm also uncertain as to
RocksDB. According to the docs RocksDB has a "Column Family Write Buffer"
where "You need to budget for 2 x your worst case memory use".
We have 17 ValueStateDescriptors (ignoring state for windows) which I'm
assuming corresponds to a "Column Family" in RockDB. Meaning our budget
should be around 2GB.
Is this accounted for in one of the flink_taskmanager metrics above? We've
also enabled various rocksdb metrics, but it's unclear where this Write
Buffer memory would be represented.

Finally, we've seen that when our job has issues and is restarted rapidly,
NonHeap_Used grows from an initial 50Mb to 700MB, before our containers are
killed. We're assuming this is due
to no form of cleanup in the metaspace as classes get (re)loaded.

These are our taskmanager JVM settings: -XX:+UseG1GC
-XX:MaxDirectMemorySize=1G -XX:+UnlockExperimentalVMOptions
-XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2
With flink config:
  taskmanager.heap.size: 5000m
  state.backend: rocksdb
  state.backend.incremental: true
  state.backend.rocksdb.timer-service.factory: ROCKSDB

Based on what we've observed we're thinking about setting
-XX:MaxMetaspaceSize to a reasonable value, so that we at least get an
error message which can easily be traced back to the behavior we're seeing.

Okay, all that said let's sum up what we're asking here:
- Is there any more insight into how memory is accounted for than our
current metrics?
- Which metric, if any accounts for RocksDB memory usage?
- What's going on with the Metaspace growth we're seeing during job
restarts, is there something we can do about this such as setting
-XX:MaxMetaspaceSize?
- Any other tips to improve reliability running in resource constrained
environments such as Kubernetes?

Thanks,

William


Re: Flink Zookeeper HA: FileNotFoundException blob - Jobmanager not starting up

2019-07-23 Thread Richard Deurwaarder
Hi Fabian,

I followed the advice of another flink user who mailed me directly, he has
the same problem and told me to use something like: rmr zgrep
/flink/hunch/jobgraphs/1dccee15d84e1d2cededf89758ac2482
which allowed us to start the job again.

It might be nice to investigate what went wrong as it didn't feel good to
have our production clustered crippled like this.

Richard

On Tue, Jul 23, 2019 at 12:47 PM Fabian Hueske  wrote:

> Hi Richard,
>
> I hope you could resolve the problem in the meantime.
>
> Nonetheless, maybe Till (in CC) has an idea what could have gone wrong.
>
> Best, Fabian
>
> Am Mi., 17. Juli 2019 um 19:50 Uhr schrieb Richard Deurwaarder <
> rich...@xeli.eu>:
>
>> Hello,
>>
>> I've got a problem with our flink cluster where the jobmanager is not
>> starting up anymore, because it tries to download non existant (blob) file
>> from the zookeeper storage dir.
>>
>> We're running flink 1.8.0 on a kubernetes cluster and use the google
>> storage connector [1] to store checkpoints, savepoints and zookeeper data.
>>
>> When I noticed the jobmanager was having problems, it was in a crashloop
>> throwing file not found exceptions [2]
>> Caused by: java.io.FileNotFoundException: Item not found:
>> some-project-flink-state/recovery/hunch/blob/job_e6ad857af7f09b56594e95fe273e9eff/blob_p-486d68fa98fa05665f341d79302c40566b81034e-306d493f5aa810b5f4f7d8d63f5b18b5.
>> If you enabled STRICT generation consistency, it is possible that the live
>> version is still available but the intended generation is deleted.
>>
>> I looked in the blob directory and I can only find:
>> /recovery/hunch/blob/job_1dccee15d84e1d2cededf89758ac2482 I've tried to
>> fiddle around in zookeeper to see if I could find anything [3], but I do
>> not really know what to look for.
>>
>> How could this have happened and how should I recover the job from this
>> situation?
>>
>> Thanks,
>>
>> Richard
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/connectors.html#using-hadoop-file-system-implementations
>> [2] https://gist.github.com/Xeli/0321031655e47006f00d38fc4bc08e16
>> [3] https://gist.github.com/Xeli/04f6d861c5478071521ac6d2c582832a
>>
>


Re: Flink and CDC

2019-07-23 Thread Flavio Pompermaier
Anyone else having experience on this topic that could provide additional
feedback here?

On Thu, Jul 18, 2019 at 1:18 PM Flavio Pompermaier 
wrote:

> I think that using Kafka to get CDC events is fine. The problem, in my
> case, is really about how to proceed:
>  1) do I need to create Flink tables before reading CDC events or is there
> a way to automatically creating Flink tables when they gets created via a
> DDL event (assuming a filter on the name of the tables?
>  2) How to handle changes in the table structure (adding or removing
> columns)...? Is Flink able to react to this?
> 3) CDC is a common use case (IMHO) and it's perfect for migrating to or
> testing an event-driven architecture. So I expect Flink to be able to easily
> allow querying dynamic tables coming from a db (via Debezium) without
> implementing the logic to handle insert/delete/update statements
>
> What do you think?
>
> Il Gio 18 Lug 2019, 13:17 miki haiat  ha scritto:
>
>> I actually thinking   about this option as well .
>> Im assuming that the correct way to implement it ,  is to integrate
>> debezium embedded   to source function ?
>>
>>
>>
>> [1] https://github.com/debezium/debezium/tree/master/debezium-embedded
>>
>>
>> On Wed, Jul 17, 2019 at 7:08 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> I'd like to know whether it exists or not an example about how to
>>> leverage Debezium as a CDC source and to feed a Flink Table (From MySQL for
>>> example).
>>>
>>> Best,
>>> Flavio
>>>
>>


Re: AsyncDataStream on key of KeyedStream

2019-07-23 Thread Flavio Pompermaier
The problem of bundling all records together within a window is that this
solution doesn't scale (in the case of large time windows and numbers of
events). My requirement could be fulfilled by a keyed ProcessFunction, but I
think AsyncDataStream should provide first-class support for keyed streams
(and thus perform a single call per key and window). What do you think?

On Tue, Jul 23, 2019 at 12:56 PM Fabian Hueske  wrote:

> Hi Flavio,
>
> Not sure I understood the requirements correctly.
> Couldn't you just collect and bundle all records with a regular window
> operator and forward one record for each key-window to an AsyncIO operator?
>
> Best, Fabian
>
> Am Do., 18. Juli 2019 um 12:20 Uhr schrieb Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> Hi to all,
>> I'm trying to exploit async IO in my Flink job.
>> In my use case I use keyed tumbling windows and I'd like to execute the
>> async action only once per key and window (while
>> the AsyncDataStream.unorderedWait execute the async call for every element
>> of my stream) ..is there an easy way to do that apart from using a process
>> function (that basically will lose the asynchronicity)?
>>
>> Best,
>> Flavio
>>
>
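
For reference, a minimal sketch of the approach Fabian describes above (Flink 1.8-era API): pre-aggregate per key and window with a regular window operator so that the async operator only sees one record per key and window. The external call is simulated with a CompletableFuture; a real async client would go in its place.

DataStream<Tuple2<String, Long>> events = ...; // (key, value), timestamps/watermarks assigned upstream

DataStream<Tuple2<String, Long>> onePerKeyAndWindow = events
   .keyBy(0)
   .window(TumblingEventTimeWindows.of(Time.minutes(1)))
   .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1)); // one record per key and window

DataStream<String> enriched = AsyncDataStream.unorderedWait(
   onePerKeyAndWindow,
   new AsyncFunction<Tuple2<String, Long>, String>() {
      @Override
      public void asyncInvoke(Tuple2<String, Long> in, ResultFuture<String> out) {
         CompletableFuture
            .supplyAsync(() -> in.f0 + " -> " + in.f1) // replace with a real async client call
            .thenAccept(r -> out.complete(Collections.singleton(r)));
      }
   },
   30, TimeUnit.SECONDS, 100); // timeout and max in-flight requests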


Re:Re: MiniClusterResource class not found using AbstractTestBase

2019-07-23 Thread Haibo Sun
Hi Juan,

It is dependent on "flink-runtime-*-tests.jar", so build.sbt should be modified
as follows:

scalaVersion := "2.11.0"

val flinkVersion = "1.8.1"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests"
)

Best,
Haibo

At 2019-07-23 17:51:23, "Fabian Hueske"  wrote:

Hi Juan,


Which Flink version do you use?


Best, Fabian



Am Di., 23. Juli 2019 um 06:49 Uhr schrieb Juan Rodríguez Hortalá 
:

Hi,



I'm trying to use AbstractTestBase in a test in order to use the mini cluster. 
I'm using specs2 with Scala, so I cannot extend AbstractTestBase because I also 
have to extend org.specs2.Specification, so I'm trying to access the mini 
cluster directly using Specs2 BeforeAll to initialize it as follows



private val miniClusterResource = AbstractTestBase.miniClusterResource
miniClusterResource.before()


The problem is that the code doesn't even compile, because it fails to locate 
`org.apache.flink.runtime.testutils.MiniClusterResource`


```

[warn] Class org.apache.flink.runtime.testutils.MiniClusterResource not found - 
continuing with a stub.
[warn] Class 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration not found - 
continuing with a stub.
[error] Class org.apache.flink.runtime.testutils.MiniClusterResource not found 
- continuing with a stub.
[warn] two warnings found
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 22, 2019 9:38:49 PM
```



I'm importing the following libraries in build.sbt


"org.apache.flink" %% "flink-test-utils"  % flinkVersion,
"org.apache.flink" %% "flink-runtime"  % flinkVersion


Am I missing some additional library?


Thanks,



Juan


Re: Use batch and stream environment in a single pipeline

2019-07-23 Thread Fabian Hueske
Hi,

Right now it is not possible to mix batch and streaming environments in a
job.
You would need to implement the batch logic via the streaming API which is
not always straightforward.

However, the Flink community is spending a lot of effort on unifying batch
and stream processing. So this will be much easier in a future version of
Flink.

Best, Fabian

Am Mo., 22. Juli 2019 um 17:04 Uhr schrieb Andres Angel <
ingenieroandresan...@gmail.com>:

> Hello everyone,
>
> I need to create a table from a stream environment and, thinking of a pure
> SQL approach, I was wondering if I can create a few of the enrichment tables
> in a batch environment and only the streaming payload as a streaming table
> environment.
>
> I tried to create a batch table environment together with a streaming
> environment, but it broke; so I don't know whether within the same flow I
> can have two different environments, one as batch and the other as streaming.
>
> thanks so much
> AU
>
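
For reference, a minimal sketch of keeping everything in a single streaming environment, with the (small, static) enrichment data registered as a second table. It assumes the Flink 1.8-era Java Table API (on older versions, TableEnvironment.getTableEnvironment(env) plays the role of StreamTableEnvironment.create(env)); the field names and data are illustrative.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// streaming payload (would normally come from Kafka, Kinesis, ...)
DataStream<Tuple2<Integer, Double>> payload =
      env.fromElements(Tuple2.of(1, 10.5), Tuple2.of(2, 20.0));
tEnv.registerDataStream("payload", payload, "id, amount");

// "batch-like" enrichment data, read once into the same streaming environment
DataStream<Tuple2<Integer, String>> dims =
      env.fromElements(Tuple2.of(1, "EUR"), Tuple2.of(2, "USD"));
tEnv.registerDataStream("dims", dims, "id, currency");

Table joined = tEnv.sqlQuery(
      "SELECT p.id, p.amount, d.currency FROM payload p JOIN dims d ON p.id = d.id");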


Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-23 Thread Fabian Hueske
Hi Dongwon,

Regarding the question about the conversion: if you keep using the Row type
and don't add or remove fields, the conversion is pretty much free right now.
It will be a MapFunction (sometimes even no function at all) that should be
chained with the other operators. Hence, it should boil down to a simple
function call.

Best, Fabian
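
For illustration, a rough sketch of the round trip discussed in this thread: convert the table to an append stream of Rows, append the current watermark in a ProcessFunction, and register the result as a table again with an explicit RowTypeInfo. The schema (a single rowtime column "time1" plus the appended BIGINT watermark) follows the example below; adjust the types for a wider schema.

DataStream<Row> in = tEnv.toAppendStream(sourceTable, Row.class);

DataStream<Row> withWatermark = in
   .process(new ProcessFunction<Row, Row>() {
      @Override
      public void processElement(Row value, Context ctx, Collector<Row> out) {
         Row r = new Row(value.getArity() + 1);
         for (int i = 0; i < value.getArity(); i++) {
            r.setField(i, value.getField(i));
         }
         // append the operator's current watermark as an extra column
         r.setField(value.getArity(), ctx.timerService().currentWatermark());
         out.collect(r);
      }
   })
   // original fields plus the appended watermark column
   .returns(new RowTypeInfo(Types.SQL_TIMESTAMP, Types.LONG));

Table withWatermarkTable = tEnv.fromDataStream(withWatermark, "time1, wm");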

Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <
eastcirc...@gmail.com>:

> Hi Rong,
>
> I have to dig deeper into the code to reproduce this error. This seems to
>> be a bug to me and will update once I find anything.
>
> Thanks a lot for spending your time on this.
>
> However from what you explained, if I understand correctly you can do all
>> of your processing within the TableAPI scope without converting it back and
>> forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement and connect with the table
>> API via UserDefinedFunction[1].
>> As TableAPI becoming the first class citizen [2,3,4], this would be much
>> cleaner implementation from my perspective.
>
> I also agree with you in that the first class citizen Table API will make
> everything not only easier but also a lot cleaner.
> We do, however, have some corner cases that force us to convert Table from
> and to DataStream.
> One such case is to append to a Table a column showing the current watermark
> of each record; there's no other way to do that, as a ScalarFunction
> doesn't give us access to the runtime context information the way a
> ProcessFunction does.
>
> I have a question regarding the conversion.
> Do I have to worry about a runtime performance penalty in case I cannot
> help but convert back and forth to DataStream?
>
> Best,
>
> Dongwon
>
> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong  wrote:
>
>> Hi Dongwon,
>>
>> I have to dig deeper into the code to reproduce this error. This seems to
>> be a bug to me and will update once I find anything.
>>
>> However from what you explained, if I understand correctly you can do all
>> of your processing within the TableAPI scope without converting it back and
>> forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement and connect with the table
>> API via UserDefinedFunction[1].
>> As TableAPI becoming the first class citizen [2,3,4], this would be much
>> cleaner implementation from my perspective.
>>
>> --
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>> [4]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>
>>
>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim 
>> wrote:
>>
>>> Hi Rong,
>>>
>>> Thank you for reply :-)
>>>
>>> which Flink version are you using?
>>>
>>> I'm using Flink-1.8.0.
>>>
>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>
>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>
>>> what is the line *".map(a -> a)" *do and can you remove it?
>>>
>>> *".map(a->a)"* is just to illustrate a problem.
>>> My actual code contains a process function (instead of .map() in the
>>> snippet) which appends a new field containing the watermark to each row.
>>> If there were a way to get the watermark inside a scalar UDF, I wouldn't
>>> convert the table to a DataStream and vice versa.
>>>
>>> if I am understanding correctly, you are also using "time1" as the
 rowtime, is that want your intension is to use it later as well?
>>>
>>> yup :-)
>>>
>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
 adds a type information hint about the return type of this operator. It is
 used in cases where Flink cannot determine automatically[1].
>>>
>>> The reason why I specify
>>> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
>>> information hint as you said.
>>> That is needed later when I need to make another table like
>>>"*Table anotherTable = tEnv.fromDataStream(stream);"*,
>>> Without the type information hint, I've got an error
>>>"*An input of GenericTypeInfo cannot be converted to Table.
>>> Please specify the type of the input with a RowTypeInfo."*
>>> That's why I give a type information hint in that way.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong  wrote:
>>>
 Hi Dongwon,

 Can you provide a bit more information:
 which Flink version are you using?
 what is the "sourceTable.getSchema().toRowType()" return?
 what is the line *".map(a -> a)" *do and can you remove it?
 if I am understanding correctly, you are also using "time1" as the
 rowtime, is 

Re: Flink SinkFunction for WebSockets

2019-07-23 Thread Fabian Hueske
Hi Tim,

One thing that might be interesting is that Flink might emit results more
than once when a job recovers from a failure.
It is up to the receiver to deal with that.
Depending on the type of results this might be easy (idempotent updates) or
impossible.

Best, Fabian



Am Fr., 19. Juli 2019 um 00:23 Uhr schrieb Timothy Victor :

> Hi
>
> I'm looking to write a sink function for writing to websockets, in
> particular ones that speak the WAMP protocol (
> https://wamp-proto.org/index.html).
>
> Before going down that path, I wanted to ask if
>
> a) anyone has done something like that already so I dont reinvent stuff
>
> b) any caveats or warnings before I try this.
>
> Any advise would be appreciated.
>
> Thanks
>
> Tim
>
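
For reference, a bare-bones skeleton of such a sink. The WAMP/WebSocket client calls are only indicated in comments (no particular client library is assumed), and, following Fabian's point above, the published messages should be idempotent where possible because records may be replayed after a recovery.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class WampPublishSink<T> extends RichSinkFunction<T> {

   @Override
   public void open(Configuration parameters) throws Exception {
      // open the WebSocket connection / WAMP session here (client library of your choice)
   }

   @Override
   public void invoke(T value, Context context) throws Exception {
      // publish `value` to a topic; ideally keyed so that a replayed record
      // after a failure overwrites the previous message instead of duplicating it
   }

   @Override
   public void close() throws Exception {
      // close the session / connection
   }
}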


Re: From Kafka Stream to Flink

2019-07-23 Thread Maatary Okouya
I would like to have a KTable, or maybe in Flink terms a dynamic table, that
only contains the latest value for each keyed record. This would allow me to
perform aggregations and joins based on the latest state of every record, as
opposed to every record over time, or over a period of time.

On Sun, Jul 21, 2019 at 8:21 AM miki haiat  wrote:

> Can you elaborate more  about your use case .
>
>
> On Sat, Jul 20, 2019 at 1:04 AM Maatary Okouya 
> wrote:
>
>> Hi,
>>
>> I am a user of Kafka Streams so far. However, I have been faced with
>> several limitations, in particular when performing joins on KTables.
>>
>> I was wondering what the approach in Flink is to achieve (1) the concept
>> of a KTable, i.e. a table that represents a changelog, i.e. only the latest
>> version of all keyed records, and (2) joining those.
>>
>> There are currently a lot of limitations around that in Kafka Streams, and
>> I need that for performing some ETL processes, where I need to mirror entire
>> databases in Kafka and then do some joins on the tables to emit the logical
>> entities in Kafka topics. I was hoping that somehow I could achieve that by
>> using Flink as an intermediary.
>>
>> I can see that you support any kind of join, but I just don't see the
>> notion of a KTable.
>>
>>
>>
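
For reference, a minimal sketch of the "latest value per key" idea on the DataStream API: keep the most recent value per key in keyed state and emit an update for every change, which is roughly the changelog view a KTable provides. Types and the `updates` stream are illustrative.

DataStream<Tuple2<String, String>> updates = ...; // (key, value) change stream

DataStream<Tuple2<String, String>> changelog = updates
   .keyBy(0)
   .process(new KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, String>>() {

      private transient ValueState<String> latest;

      @Override
      public void open(Configuration parameters) {
         latest = getRuntimeContext().getState(
               new ValueStateDescriptor<>("latest", String.class));
      }

      @Override
      public void processElement(Tuple2<String, String> in, Context ctx,
                                 Collector<Tuple2<String, String>> out) throws Exception {
         latest.update(in.f1);                 // overwrite the previous value for this key
         out.collect(Tuple2.of(in.f0, in.f1)); // downstream only ever sees the latest state
      }
   });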


Re: AsyncDataStream on key of KeyedStream

2019-07-23 Thread Fabian Hueske
Hi Flavio,

Not sure I understood the requirements correctly.
Couldn't you just collect and bundle all records with a regular window
operator and forward one record for each key-window to an AsyncIO operator?

Best, Fabian

Am Do., 18. Juli 2019 um 12:20 Uhr schrieb Flavio Pompermaier <
pomperma...@okkam.it>:

> Hi to all,
> I'm trying to exploit async IO in my Flink job.
> In my use case I use keyed tumbling windows and I'd like to execute the
> async action only once per key and window (while
> the AsyncDataStream.unorderedWait execute the async call for every element
> of my stream) ..is there an easy way to do that apart from using a process
> function (that basically will lose the asynchronicity)?
>
> Best,
> Flavio
>


Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-23 Thread Flavio Pompermaier
I agree, but you have to know in which jar a job is contained. When you
upload the jar in our application you immediately know the qualified name
of the job class and which jar it belongs to. I think that when you
upload a jar to Flink, Flink should list all available jobs inside it
(IMHO). It could be a single main class (as it is now) or multiple
classes.

On Tue, Jul 23, 2019 at 12:13 PM Jeff Zhang  wrote:

> IIUC the list of jobs contained in the jar means the jobs you defined in the
> pipeline. Then I don't think it is Flink's responsibility to maintain the
> job list info; it is the job scheduler that defines the pipeline. So the job
> scheduler should maintain the job list.
>
>
>
> Flavio Pompermaier  于2019年7月23日周二 下午5:23写道:
>
>> The jobs are somehow related to each other in the sense that we have a
>> configurable pipeline where there are optional steps you can enable/disable
>> (and thus we create a single big jar).
>> Because of this, we have our application REST service that actually works
>> also as a job scheduler and uses the job server as a proxy towards Flink:
>> when one step ends (this is what is signalled back after the env.execute()
>> from Flink to the application REST service) our application tells the job
>> server to execute the next job of the pipeline on the cluster.
>> Of course this is a "dirty" solution (because we should use a workflow
>> scheduler like Airflow or Luigi or similar) but we wanted to keep things as
>> simple as possible for the moment.
>> In the future, if our customers ever want to improve this part, we will
>> integrate our application with a dedicated job scheduler like the ones
>> listed before (probably). I don't know if any of them are nowadays already
>> integrated with Flink; when we started coding our frontend application (2
>> years ago) none of them were using it.
>>
>> Best,
>> Flavio
>>
>> On Tue, Jul 23, 2019 at 10:40 AM Jeff Zhang  wrote:
>>
>>> Thanks Flavio,
>>>
>>> I get most of your points except one
>>>
>>>- Get the list of jobs contained in jar (ideally this is is true for
>>>every engine beyond Spark or Flink)
>>>
>>> Just curious to know how you submit job via rest api, if there're
>>> multiple jobs in one jar, then do you need to submit jar one time and
>>> submit jobs multiple times ?
>>> And is there any relationship between these jobs in the same jar ?
>>>
>>>
>>>
>>> Flavio Pompermaier  于2019年7月23日周二 下午4:01写道:
>>>
 Hi Jeff, the thing about the manifest is really about to have a way to
 list multiple main classes in the jart (without the need to inspect every
 Java class or forcing a 1-to-1 between jar and job like it is now).
 My requirements were driven by the UI we're using in our framework:

- Get the list of jobs contained in jar (ideally this is is true
for every engine beyond Spark or Flink)
- Get the list of required/optional parameters for each job
- Besides the optionality of a parameter, each parameter should
include an help description, a type (to validate the input param), a
default value and a set of choices (when there's a limited number of
options available)
- obviously the job serve should be able to
submit/run/cancel/monitor a job and upload/delete the uploaded jars
- the job server should not depend on any target platform
dependency (Spark or Flink) beyond the rest client: at the moment the 
 rest
client requires a lot of core libs (indeed because it needs to submit 
 the
job graph/plan)
- in our vision, the flink client should be something like Apache
Livy (https://livy.apache.org/)
- One of the biggest  limitations we face when running a Flink job
from the REST API is the fact that the job can't do anything after
env.execute() while we need to call an external service to signal that 
 the
job has ended + some other details

 Best,
 Flavio

 On Tue, Jul 23, 2019 at 3:44 AM Jeff Zhang  wrote:

> Hi Flavio,
>
> Based on the discussion in the tickets you mentioned above, the
> program-class attribute was a mistake and community is intended to use
> main-class to replace it.
>
> Deprecating Program interface is a part of work of flink new client
> api.
> IIUC, your requirements are not so complicated. We can implement that
> in the new flink client api. How about listing your requirement, and let's
> discuss how we can make it in the new flink client api. BTW, I guess most
> of your requirements are based on your flink job server, It would be
> helpful if you could provide more info about your flink job server. Thanks
>
>
>
> Flavio Pompermaier  于2019年7月22日周一 下午8:59写道:
>
>> Hi Tison,
>> we use a modified version of the Program interface to enable a web UI
>> do properly detect and run Flink jobs contained in a jar + 

Re: Flink Zookeeper HA: FileNotFoundException blob - Jobmanager not starting up

2019-07-23 Thread Fabian Hueske
Hi Richard,

I hope you could resolve the problem in the meantime.

Nonetheless, maybe Till (in CC) has an idea what could have gone wrong.

Best, Fabian

Am Mi., 17. Juli 2019 um 19:50 Uhr schrieb Richard Deurwaarder <
rich...@xeli.eu>:

> Hello,
>
> I've got a problem with our flink cluster where the jobmanager is not
> starting up anymore, because it tries to download non existant (blob) file
> from the zookeeper storage dir.
>
> We're running flink 1.8.0 on a kubernetes cluster and use the google
> storage connector [1] to store checkpoints, savepoints and zookeeper data.
>
> When I noticed the jobmanager was having problems, it was in a crashloop
> throwing file not found exceptions [2]
> Caused by: java.io.FileNotFoundException: Item not found:
> some-project-flink-state/recovery/hunch/blob/job_e6ad857af7f09b56594e95fe273e9eff/blob_p-486d68fa98fa05665f341d79302c40566b81034e-306d493f5aa810b5f4f7d8d63f5b18b5.
> If you enabled STRICT generation consistency, it is possible that the live
> version is still available but the intended generation is deleted.
>
> I looked in the blob directory and I can only find:
> /recovery/hunch/blob/job_1dccee15d84e1d2cededf89758ac2482 I've tried to
> fiddle around in zookeeper to see if I could find anything [3], but I do
> not really know what to look for.
>
> How could this have happened and how should I recover the job from this
> situation?
>
> Thanks,
>
> Richard
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/connectors.html#using-hadoop-file-system-implementations
> [2] https://gist.github.com/Xeli/0321031655e47006f00d38fc4bc08e16
> [3] https://gist.github.com/Xeli/04f6d861c5478071521ac6d2c582832a
>


Re: Does Flink support raw generic types in a merged stream?

2019-07-23 Thread Fabian Hueske
Hi John,

You could implement your own n-ary Either type.
It's a bit of work because you'd also need a custom TypeInfo & Serializer,
but it's rather straightforward if you follow the implementation of Either.

Best,
Fabian
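
For illustration, a minimal hand-rolled three-way "Either" along the lines Fabian suggests. Each stream's elements would be wrapped before the union, and the downstream operator branches on the tag. Without a dedicated TypeInformation and serializer, Flink falls back to Kryo for this class, which works but is slower.

public class Either3<A, B, C> {

   private final A a;
   private final B b;
   private final C c;
   private final int tag; // 0 = A, 1 = B, 2 = C

   private Either3(A a, B b, C c, int tag) {
      this.a = a;
      this.b = b;
      this.c = c;
      this.tag = tag;
   }

   public static <A, B, C> Either3<A, B, C> first(A a)  { return new Either3<>(a, null, null, 0); }
   public static <A, B, C> Either3<A, B, C> second(B b) { return new Either3<>(null, b, null, 1); }
   public static <A, B, C> Either3<A, B, C> third(C c)  { return new Either3<>(null, null, c, 2); }

   public boolean isFirst()  { return tag == 0; }
   public boolean isSecond() { return tag == 1; }
   public boolean isThird()  { return tag == 2; }

   public A getFirst()  { return a; }
   public B getSecond() { return b; }
   public C getThird()  { return c; }
}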

Am Mi., 17. Juli 2019 um 16:28 Uhr schrieb John Tipper <
john_tip...@hotmail.com>:

> Hi Chesnay,
>
> Yes, but the actual use case needs to support more than 2 streams, so if I
> go down the Either route then I have arbitrarily sized nested Eithers, i.e.
> Either<Either<A, B>, C> etc., which gets pretty messy very quickly.
>
> Many thanks,
>
> John
>
> Sent from my iPhone
>
> On 17 Jul 2019, at 13:29, Chesnay Schepler  wrote:
>
> Have you looked at org.apache.flink.types.Either? If you'd wrap all
> elements in both streams before the union you should be able to join them
> properly.
>
> On 17/07/2019 14:18, John Tipper wrote:
>
> Hi All,
>
> Can I union/join 2 streams containing generic classes, where each stream
> has a different parameterised type? I'd like to process the combined stream
> of values as a single raw type, casting to a specific type for detailed
> processing, based on some information in the type that will allow me to
> safely cast to the specific type.
>
> I can't share my exact code, but the below example shows the sort of thing
> I want to do.
>
> So, as an example, given the following generic type:
>
> class MyGenericContainer extends Tuple3 {
> ...
> private final String myString;
> private final IN value;
> private final Class clazz; // created by constructor
> private SomeOtherClass someOtherClass;
> ...
> }
>
> and 2 streams, I'd like to be able to do something like:
>
> DataStream> stream1 = ...
> DataStream> stream2 = ...
> DataStream<...> merged = stream1.union(stream2).process(new 
> MyProcessFunction());
> // within an operator, such as a MyProcessFunction:
> MyGenericContainer container = raw generic container passed to function;
> Object rawValue = container.getValue();
> performProcessing((container.getClazz())rawValue); // safely cast rawValue
>
> However, I get an error when I do this:
>
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type 
> of TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be 
> determined. This is most likely a type erasure problem. The type extraction 
> currently supports types with generic variables only in cases where all 
> variables in the return type can be deduced from the input type(s). Otherwise 
> the type has to be specified explicitly using type information.
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
> at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
> at 
> org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)
>
> If I try to add a returns() to the code, like this:
>
> DataStream<...> merged = stream1.union(stream2)
> .process(...)
> .returns(new TypeHint() {})
>
> then I get a different exception:
>
> Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The 
> TypeHint is using a generic variable.This is not supported, generic types 
> must be fully specified for the TypeHint.
>
> Is this sort of thing supported or is there another way of joining
> multiple streams into a single stream, where each stream object will have a
> specific type of a common generic type?
>
>
> Many thanks,
>
> John
>
>
>


Re: Union of streams performance issue (10x)

2019-07-23 Thread Fabian Hueske
Hi Peter,

The performance drop is probably due to de/serialization.
When tasks are chained, records are simply forwarded as Java objects via
method calls.
When a task chain is broken into multiple tasks, the records (Java
objects) are serialized by the sending task, possibly shipped over the
network, and deserialized by the receiving task.
Depending on the logic of the tasks, this can cause a performance drop.

Two tasks can only be chained if both have the same parallelism, the sender
sends to a single task, and the receiver receives from a single task.
The union receives from two tasks, which breaks the chain.

Best, Fabian
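
For reference, one more way to inspect where chains are broken: the streaming plan can be dumped as JSON and pasted into the Flink plan visualizer. The two sources below are placeholders for the real Kafka/HDFS sources.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kafka = env.addSource(new MyKafkaSource()); // placeholder
DataStream<String> hdfs  = env.addSource(new MyHdfsSource());  // placeholder

kafka.union(hdfs)
     .map(String::toUpperCase)
     .print();

// prints the JSON job graph, including which operators end up in which chain
System.out.println(env.getExecutionPlan());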

Am Sa., 13. Juli 2019 um 15:04 Uhr schrieb Peter Zende <
peter.ze...@gmail.com>:

> Hi all
>
> We have a pipeline (runs on YARN, Flink v1.7.1) which consumes a union of
> Kafka and
> HDFS sources. We remarked that the throughput is 10 times higher if only
> one of these sources is consumed.  While trying to identify the problem I
> implemented a no-op source which was unioned with one of the real sources:
>
>   class NoOpSourceFunction extends ParallelSourceFunction[GenericRecord] {
>
> override def run(ctx: SourceContext[GenericRecord]): Unit = {}
> override def cancel(): Unit = {}
>   }
>
>   mainStream.union(env.addSource(new NoOpSourceFunction()))
>
> I remarked that whenever I use a union with any sources like above or
> union the stream with itself, I get the same performance hit.
> When I compare the job graph on the Flink UI the only difference is that
> in case of a union the two sources aren't chained to the subsequent
> downstream operators (transformation steps), both are connected to them
> with  ship_strategy: FORWARD.
> When only one source is present, that one is chained to the transformation
> steps.
>
> To avoid union (and/or forward partitioning) I tried to connect streams
> with CoFlatMapFunction to get the same result but without any gain in
> performance. I was thinking about to read the HDFS stream parallel and use
> Iterate function to feed it back to a previous operator.
>
> After some trial and error I'd like to ask for your advice. What is
> the best practice here?  Which options / tools are there to analyze the
> execution plan apart from the Flink plan visualizer and the provided web UI?
>
> Thanks
> Peter
>
>


Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-23 Thread Jeff Zhang
IIUC the list of jobs contained in the jar means the jobs you defined in the
pipeline. Then I don't think it is Flink's responsibility to maintain the
job list info; it is the job scheduler that defines the pipeline. So the job
scheduler should maintain the job list.



Flavio Pompermaier  于2019年7月23日周二 下午5:23写道:

> The jobs are somehow related to each other in the sense that we have a
> configurable pipeline where there are optional steps you can enable/disable
> (and thus we create a single big jar).
> Because of this, we have our application REST service that actually works
> also as a job scheduler and uses the job server as a proxy towards Flink:
> when one step ends (this is what is signalled back after the env.execute()
> from Flink to the application REST service) our application tells the job
> server to execute the next job of the pipeline on the cluster.
> Of course this is a "dirty" solution (because we should use a workflow
> scheduler like Airflow or Luigi or similar) but we wanted to keep things as
> simple as possible for the moment.
> In the future, if our customers ever want to improve this part, we will
> integrate our application with a dedicated job scheduler like the ones
> listed before (probably). I don't know if any of them are nowadays already
> integrated with Flink; when we started coding our frontend application (2
> years ago) none of them were using it.
>
> Best,
> Flavio
>
> On Tue, Jul 23, 2019 at 10:40 AM Jeff Zhang  wrote:
>
>> Thanks Flavio,
>>
>> I get most of your points except one
>>
>>- Get the list of jobs contained in jar (ideally this is is true for
>>every engine beyond Spark or Flink)
>>
>> Just curious to know how you submit job via rest api, if there're
>> multiple jobs in one jar, then do you need to submit jar one time and
>> submit jobs multiple times ?
>> And is there any relationship between these jobs in the same jar ?
>>
>>
>>
>> Flavio Pompermaier  于2019年7月23日周二 下午4:01写道:
>>
>>> Hi Jeff, the thing about the manifest is really about to have a way to
>>> list multiple main classes in the jart (without the need to inspect every
>>> Java class or forcing a 1-to-1 between jar and job like it is now).
>>> My requirements were driven by the UI we're using in our framework:
>>>
>>>- Get the list of jobs contained in jar (ideally this is is true for
>>>every engine beyond Spark or Flink)
>>>- Get the list of required/optional parameters for each job
>>>- Besides the optionality of a parameter, each parameter should
>>>include an help description, a type (to validate the input param), a
>>>default value and a set of choices (when there's a limited number of
>>>options available)
>>>- obviously the job serve should be able to
>>>submit/run/cancel/monitor a job and upload/delete the uploaded jars
>>>- the job server should not depend on any target platform dependency
>>>(Spark or Flink) beyond the rest client: at the moment the rest client
>>>requires a lot of core libs (indeed because it needs to submit the job
>>>graph/plan)
>>>- in our vision, the flink client should be something like Apache
>>>Livy (https://livy.apache.org/)
>>>- One of the biggest  limitations we face when running a Flink job
>>>from the REST API is the fact that the job can't do anything after
>>>env.execute() while we need to call an external service to signal that 
>>> the
>>>job has ended + some other details
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Jul 23, 2019 at 3:44 AM Jeff Zhang  wrote:
>>>
 Hi Flavio,

 Based on the discussion in the tickets you mentioned above, the
 program-class attribute was a mistake and community is intended to use
 main-class to replace it.

 Deprecating Program interface is a part of work of flink new client
 api.
 IIUC, your requirements are not so complicated. We can implement that
 in the new flink client api. How about listing your requirement, and let's
 discuss how we can make it in the new flink client api. BTW, I guess most
 of your requirements are based on your flink job server, It would be
 helpful if you could provide more info about your flink job server. Thanks



 Flavio Pompermaier  于2019年7月22日周一 下午8:59写道:

> Hi Tison,
> we use a modified version of the Program interface to enable a web UI
> do properly detect and run Flink jobs contained in a jar + their 
> parameters.
> As stated in [1], we dected multiple Main classes per jar by handling
> an extra comma-separeted Manifest entry (i.e. 'Main-classes').
>
> As mentioned on the discussion on the dev ML, our revised Program
> interface looks like this:
>
> public interface FlinkJob {
>   String getDescription();
>   List getParameters();
>   boolean isStreamingOrBatch();
> }
>
> public class FlinkJobParameter {
>   private String paramName;
>   private String paramType = 

Re: timeout exception when consuming from kafka

2019-07-23 Thread Fabian Hueske
Hi Yitzchak,

Thanks for reaching out.
I'm not an expert on the Kafka consumer, but I think the number of
partitions and the number of source tasks might be interesting to know.

Maybe Gordon (in CC) has an idea of what's going wrong here.

Best, Fabian

Am Di., 23. Juli 2019 um 08:50 Uhr schrieb Yitzchak Lieberman <
yitzch...@sentinelone.com>:

> Hi.
>
> Another question - what will happen during a triggered checkpoint if one
> of the kafka brokers is unavailable?
> Will appreciate your insights.
>
> Thanks.
>
> On Mon, Jul 22, 2019 at 12:42 PM Yitzchak Lieberman <
> yitzch...@sentinelone.com> wrote:
>
>> Hi.
>>
>> I'm running a Flink application (version 1.8.0) that
>> uses FlinkKafkaConsumer to fetch topic data and perform transformation on
>> the data, with state backend as below:
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);
>> env.setStateBackend((StateBackend) new
>> FsStateBackend("file:///test"));
>> env.getCheckpointConfig().setCheckpointTimeout(30_000);
>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>>
>> My problem is with the kafka brokers, where in the cluster there are 3
>> operating brokers and 2 are down - total 5 brokers.
>> I was able to consume the data, but when the checkpoint triggered it
>> throws this exception:
>>
>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
>> o.a.f.r.c.CheckpointCoordinator - Decline checkpoint 6 by task
>> 457b1f801fee89d6f9544409877e29d8 of job 1c46aa5719bac1f0bea436d460b79db1.
>> [INFO ] 2019-07-22 12:29:14.636 [flink-akka.actor.default-dispatcher-28]
>> o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution
>> state FAILED to JobManager for task Source: Custom Source -> Sink: Print to
>> Std. Out 457b1f801fee89d6f9544409877e29d8.
>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
>> o.a.f.r.c.CheckpointCoordinator - Discarding checkpoint 6 of job
>> 1c46aa5719bac1f0bea436d460b79db1.
>> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
>> Task Source: Custom Source -> Sink: Print to Std. Out (2/4) was not running
>> at
>> org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1198)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:700)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_201]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:502)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.Actor$class.aroundReceive(Actor.scala)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [scala-library-2.11.12.jar:?]
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [scala-library-2.11.12.jar:?]
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [scala-library-2.11.12.jar:?]
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [scala-library-2.11.12.jar:?]
>> [INFO ] 2019-07-22 12:29:14.637 [flink-akka.actor.default-dispatcher-28]
>> o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out
>> (2/4) (457b1f801fee89d6f9544409877e29d8) switched from RUNNING to FAILED.
>> 

Re: MiniClusterResource class not found using AbstractTestBase

2019-07-23 Thread Fabian Hueske
Hi Juan,

Which Flink version do you use?

Best, Fabian

Am Di., 23. Juli 2019 um 06:49 Uhr schrieb Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi,
>
> I'm trying to use AbstractTestBase in a test in order to use the mini
> cluster. I'm using specs2 with Scala, so I cannot extend AbstractTestBase
> because I also have to extend org.specs2.Specification, so I'm trying to
> access the mini cluster directly using Specs2 BeforeAll to initialize it as
> follows
>
> private val miniClusterResource = AbstractTestBase.miniClusterResource
> miniClusterResource.before()
>
>
> The problem is that the code doesn't even compile, because it fails to
> locate `org.apache.flink.runtime.testutils.MiniClusterResource`
>
> ```
> [warn] Class org.apache.flink.runtime.testutils.MiniClusterResource not
> found - continuing with a stub.
> [warn] Class
> org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration not
> found - continuing with a stub.
> [error] Class org.apache.flink.runtime.testutils.MiniClusterResource not
> found - continuing with a stub.
> [warn] two warnings found
> [error] one error found
> [error] (Compile / compileIncremental) Compilation failed
> [error] Total time: 1 s, completed Jul 22, 2019 9:38:49 PM
> ```
>
> I'm importing the following libraries in build.sbt
>
> "org.apache.flink" %% "flink-test-utils"  % flinkVersion,
> "org.apache.flink" %% "flink-runtime"  % flinkVersion
>
>
> Am I missing some additional library?
>
> Thanks,
>
> Juan
>


Re: GroupBy result delay

2019-07-23 Thread Fabian Hueske
Hi Fanbin,

The delay is most likely caused by the watermark delay.
A window is computed when the watermark passes the end of the window. If
you configured the watermark to be 10 minutes before the current max
timestamp (probably to account for out-of-order data), then the window will
be computed with an approximately 10-minute delay.

Best, Fabian
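
For illustration, a sketch of where that delay is configured (Flink 1.8-era API). The out-of-orderness bound passed to the extractor is, roughly, the extra time to wait before a window result is emitted; Event and getCreatedAt are illustrative names.

DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
   new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
      @Override
      public long extractTimestamp(Event e) {
         return e.getCreatedAt(); // event time in epoch millis
      }
   });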

Am Di., 23. Juli 2019 um 02:00 Uhr schrieb Fanbin Bu :

> Hi,
> I have a Flink sql streaming job defined by:
>
> SELECT
>   user_id
>   , hop_end(created_at, interval '30' second, interval '1' minute) as 
> bucket_ts
>   , count(name) as count
> FROM event
> WHERE name = 'signin'
> GROUP BY
>   user_id
>   , hop(created_at, interval '30' second, interval '1' minute)
>
>
> there is a noticeable delay in the groupBy operator. For example, I only
> see the record sent out 10 minutes after the record is received; see the
> attached picture.
>
> [image: image.png]
>
> I'm expecting to see the group-by result after one minute since the
> sliding window size is 1 min and the slide is 30 sec.
>
> There is no such issue if I run the job locally in IntelliJ. However, I
> ran into the above issue if I run the job on EMR (flink version = 1.7)
>
> Can anybody give a clue of what could be wrong?
> Thanks,
>
> Fanbin
>


Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-23 Thread Flavio Pompermaier
The jobs are somehow related to each other in the sense that we have a
configurable pipeline where there are optional steps you can enable/disable
(and thus we create a single big jar).
Because of this, we have our application REST service that actually works
also as a job scheduler and uses the job server as a proxy towards Flink:
when one step ends (this is what is signalled back after the env.execute()
from Flink to the application REST service) our application tells the job
server to execute the next job of the pipeline on the cluster.
Of course this is a "dirty" solution (because we should use a workflow
scheduler like Airflow or Luigi or similar) but we wanted to keep things as
simple as possible for the moment.
In the future, if our customers ever want to improve this part, we will
integrate our application with a dedicated job scheduler like the ones
listed before (probably). I don't know if any of them are nowadays already
integrated with Flink; when we started coding our frontend application (2
years ago) none of them were using it.

Best,
Flavio

On Tue, Jul 23, 2019 at 10:40 AM Jeff Zhang  wrote:

> Thanks Flavio,
>
> I get most of your points except one
>
>- Get the list of jobs contained in jar (ideally this is is true for
>every engine beyond Spark or Flink)
>
> Just curious to know how you submit job via rest api, if there're multiple
> jobs in one jar, then do you need to submit jar one time and submit jobs
> multiple times ?
> And is there any relationship between these jobs in the same jar ?
>
>
>
> Flavio Pompermaier  于2019年7月23日周二 下午4:01写道:
>
>> Hi Jeff, the thing about the manifest is really about to have a way to
>> list multiple main classes in the jart (without the need to inspect every
>> Java class or forcing a 1-to-1 between jar and job like it is now).
>> My requirements were driven by the UI we're using in our framework:
>>
>>- Get the list of jobs contained in jar (ideally this is is true for
>>every engine beyond Spark or Flink)
>>- Get the list of required/optional parameters for each job
>>- Besides the optionality of a parameter, each parameter should
>>include an help description, a type (to validate the input param), a
>>default value and a set of choices (when there's a limited number of
>>options available)
>>- obviously the job serve should be able to submit/run/cancel/monitor
>>a job and upload/delete the uploaded jars
>>- the job server should not depend on any target platform dependency
>>(Spark or Flink) beyond the rest client: at the moment the rest client
>>requires a lot of core libs (indeed because it needs to submit the job
>>graph/plan)
>>- in our vision, the flink client should be something like Apache
>>Livy (https://livy.apache.org/)
>>- One of the biggest  limitations we face when running a Flink job
>>from the REST API is the fact that the job can't do anything after
>>env.execute() while we need to call an external service to signal that the
>>job has ended + some other details
>>
>> Best,
>> Flavio
>>
>> On Tue, Jul 23, 2019 at 3:44 AM Jeff Zhang  wrote:
>>
>>> Hi Flavio,
>>>
>>> Based on the discussion in the tickets you mentioned above, the
>>> program-class attribute was a mistake and community is intended to use
>>> main-class to replace it.
>>>
>>> Deprecating Program interface is a part of work of flink new client api.
>>> IIUC, your requirements are not so complicated. We can implement that in
>>> the new flink client api. How about listing your requirement, and let's
>>> discuss how we can make it in the new flink client api. BTW, I guess most
>>> of your requirements are based on your flink job server, It would be
>>> helpful if you could provide more info about your flink job server. Thanks
>>>
>>>
>>>
>>> Flavio Pompermaier  于2019年7月22日周一 下午8:59写道:
>>>
 Hi Tison,
 we use a modified version of the Program interface to enable a web UI
 do properly detect and run Flink jobs contained in a jar + their 
 parameters.
 As stated in [1], we dected multiple Main classes per jar by handling
 an extra comma-separeted Manifest entry (i.e. 'Main-classes').

 As mentioned on the discussion on the dev ML, our revised Program
 interface looks like this:

 public interface FlinkJob {
   String getDescription();
   List getParameters();
   boolean isStreamingOrBatch();
 }

 public class FlinkJobParameter {
   private String paramName;
   private String paramType = "string";
   private String paramDesc;
   private String paramDefaultValue;
   private Set choices;
   private boolean mandatory;
 }

 I've also opened some JIRA issues related to this topic:

 [1] https://issues.apache.org/jira/browse/FLINK-10864
 [2] https://issues.apache.org/jira/browse/FLINK-10862
 [3] https://issues.apache.org/jira/browse/FLINK-10879.

 Best,
 Flavio



Re: [DISCUSS] Create a Flink ecosystem website

2019-07-23 Thread Robert Metzger
Thanks a lot Marta for offering to write a blog post about the community
site!

I'm not sure if multi-language support for the site is a good idea. I see
the packages site as something similar to GitHub or Jira. The page itself
contains very view things we could actually translate. The package owners
usually are only able to provide one or two languages for their package
description.
For the comments, we don't want disjoint discussions to happen.

I've also kicked off a discussion with Apache legal on the initiative [1].
We might not be able to host this at Apache, but let's see where the
discussion goes.


[1]
https://lists.apache.org/thread.html/ee76a02257b51292ab61f6ac8d3d69307e83cc569cdeebde80596207@%3Clegal-discuss.apache.org%3E


On Sat, Jul 20, 2019 at 5:25 AM Becket Qin  wrote:

> [Sorry for the incomplete message. Clicked send by mistake...]
>
> I agree with Marta that it might be good to have multi-language support as
> a mid-term goal.
>
> Jiangjie (Becket) Qin
>
> On Sat, Jul 20, 2019 at 11:22 AM Becket Qin  wrote:
>
>> The website is awesome! I really like its conciseness and yet fairly
>> useful information and functionalities. I cannot think of much to improve
>> at the moment. Just one thought, do we need an "others" category, just in
>> case a package does not fit into any of the current given categories?
>>
>> Thanks Robert and Daryl for the great effort. Looking forward to seeing
>> this get published soon!!
>>
>> I agree with Marta that
>>
>> Jiangjie (Becket) Qin
>>
>> On Sat, Jul 20, 2019 at 1:34 AM Marta Paes Moreira 
>> wrote:
>>
>>> Hey, Robert.
>>>
>>> I will keep an eye on the overall progress and get started on the blog
>>> post
>>> to make the community announcement. Are there (mid-term) plans to
>>> translate/localize this website as well? It might be a point worth
>>> mentioning in the blogpost.
>>>
>>> Hats off to you and Daryl — this turned out amazing!
>>>
>>> Marta
>>>
>>> On Thu, Jul 18, 2019 at 10:57 AM Congxian Qiu 
>>> wrote:
>>>
>>> > Robert and Daryl, thanks for the great work, I tried the website and
>>> filed
>>> > some issues on Github.
>>> > Best,
>>> > Congxian
>>> >
>>> >
>>> > Robert Metzger  于2019年7月17日周三 下午11:28写道:
>>> >
>>> >> Hey all,
>>> >>
>>> >> Daryl and I have great news to share. We are about to finish adding
>>> the
>>> >> basic features to the ecosystem page.
>>> >> We are at a stage where it is ready to be reviewed and made public.
>>> >>
>>> >> You can either check out a development instance of the ecosystem page
>>> >> here: https://flink-ecosystem-demo.flink-resources.org/
>>> >> Or you run it locally, with the instructions from the README.md:
>>> >> https://github.com/sorahn/flink-ecosystem
>>> >>
>>> >> Please report all issues you find here:
>>> >> https://github.com/sorahn/flink-ecosystem/issues or in this thread.
>>> >>
>>> >> The next steps in this project are the following:
>>> >> - We fix all issues reported through this testing
>>> >> - We set up the site on the INFRA resources Becket has secured [1], do
>>> >> some further testing (including email notifications) and pre-fill the
>>> page
>>> >> with some packages.
>>> >> - We set up a packages.flink.apache.org or flink.apache.org/packages
>>> >> domain
>>> >> - We announce the packages through a short blog post
>>> >>
>>> >> Happy testing!
>>> >>
>>> >> Best,
>>> >> Robert
>>> >>
>>> >> [1] https://issues.apache.org/jira/browse/INFRA-18010
>>> >>
>>> >>
>>> >> On Thu, Apr 25, 2019 at 6:23 AM Becket Qin 
>>> wrote:
>>> >>
>>> >>> Thanks for the update, Robert. Looking forward to the website. If
>>> there
>>> >>> is already a list of software we need to run the website, we can ask
>>> Apache
>>> >>> infra team to prepare the VM for us, as that may also take some time.
>>> >>>
>>> >>> On Wed, Apr 24, 2019 at 11:57 PM Robert Metzger >> >
>>> >>> wrote:
>>> >>>
>>>  Hey all,
>>> 
>>>  quick update on this project: The frontend and backend code have
>>> been
>>>  put together into this repository:
>>>  https://github.com/sorahn/flink-ecosystem
>>>  We also just agreed on an API specification, and will now work on
>>>  finishing the backend.
>>> 
>>>  It will probably take a few more weeks for this to finish, but we
>>> are
>>>  making progress :)
>>> 
>>>  Best,
>>>  Robert
>>> 
>>> 
>>>  On Mon, Apr 15, 2019 at 11:18 AM Robert Metzger <
>>> rmetz...@apache.org>
>>>  wrote:
>>> 
>>> > Hey Daryl,
>>> >
>>> > thanks a lot for posting a link to this first prototype on the
>>> mailing
>>> > list! I really like it!
>>> >
>>> > Becket: Our plan forward is that Congxian is implementing the
>>> backend
>>> > for the website. He has already started with the work, but needs
>>> at least
>>> > one more week.
>>> >
>>> >
>>> > [Re-sending this email because the first one was blocked on
>>> dev@f.a.o]
>>> >
>>> >
>>> > On Mon, Apr 15, 2019 at 7:59 AM Becket Qin 
>>> > 

Re: apache flink: Why checkpoint coordinator takes long time to get completion

2019-07-23 Thread Xiangyu Su
Hi Zili,

Here are the release notes for 1.8.1:
https://flink.apache.org/news/2019/07/02/release-1.8.1.html
But I could not find any ticket related to the unexpected time consumption.
I have just tested our application with both versions; the issue is
reproducible every time with version 1.8.0, and it has not happened with
version 1.8.1 so far.

Best regards
Xiangyu

On Tue, 23 Jul 2019 at 08:49, Zili Chen  wrote:

> Hi Xiangyu,
>
> Could you share the corresponding JIRA that fixed this issue?
>
> Best,
> tison.
>
>
> Xiangyu Su  于2019年7月19日周五 下午8:47写道:
>
>> btw. it seems like this issue has been fixed in 1.8.1
>>
>> On Fri, 19 Jul 2019 at 12:21, Xiangyu Su  wrote:
>>
>>> Ok, thanks.
>>>
>>> And so far this delay always happens after the 3rd checkpoint, and the
>>> unexpected time consumption was always consistent (~4 min at under
>>> 4 GB/min of incoming traffic).
>>>
>>> On Fri, 19 Jul 2019 at 11:06, Biao Liu  wrote:
>>>
 Hi Xiangyu,

 Just took a glance at the relevant codes. There is a gap between
 calculating the duration and logging it out. I guess the checkpoint 4 is
 finished in 1 minute, but there is an unexpected time-consuming operation
 during that time. But I can't tell which part it is.


 Xiangyu Su  于2019年7月19日周五 下午4:14写道:

> Dear flink community,
>
> We are doing a POC of Flink (1.8) to process data in real time, using global
> checkpointing (S3) and local checkpointing (EBS), with the cluster deployed
> on EKS. Our application is consuming data from Kinesis.
>
> For my test e.g I am using checkpointing interval 5min. and minimum
> pause 2min.
>
> The issue we saw is: it seems like the Flink checkpointing process
> is idle for 3-4 min before the job manager gets the completion notification.
>
> here is some logging from job manager:
>
> 2019-07-10 11:59:03,893 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
> Triggering checkpoint 4 @ 1562759941082 for job 
> e7a97014f5799458f1c656135712813d.
> 2019-07-10 12:05:01,836 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 4 for job e7a97014f5799458f1c656135712813d (22387207650 bytes 
> in 58645 ms).
>
> As I understand the logging above, the
> completedCheckpoint (CheckpointCoordinator)
> object was completed in 58645 ms, but the whole checkpointing process
> took ~6 min.
>
> This logging is for the 4th checkpoint, but the first 3 checkpoints
> finished on time.
> Could you please tell me why the Flink checkpointing in my test was
> "idle" for a few minutes after the 3rd checkpoint?
>
> Best Regards
> --
> Xiangyu Su
> Java Developer
> xian...@smaato.com
>
> Smaato Inc.
> San Francisco - New York - Hamburg - Singapore
> www.smaato.com
>
> Germany:
> Valentinskamp 70, Emporio, 19th Floor
> 20355 Hamburg
> M 0049(176)22943076
>
> The information contained in this communication may be CONFIDENTIAL
> and is intended only for the use of the recipient(s) named above. If you
> are not the intended recipient, you are hereby notified that any
> dissemination, distribution, or copying of this communication, or any of
> its contents, is strictly prohibited. If you have received this
> communication in error, please notify the sender and delete/destroy the
> original message and any copy of it from your computer or paper files.
>

>>>
>>> --
>>> Xiangyu Su
>>> Java Developer
>>> xian...@smaato.com
>>>
>>> Smaato Inc.
>>> San Francisco - New York - Hamburg - Singapore
>>> www.smaato.com
>>>
>>> Germany:
>>> Valentinskamp 70, Emporio, 19th Floor
>>> 20355 Hamburg
>>> M 0049(176)22943076
>>>
>>> The information contained in this communication may be CONFIDENTIAL and
>>> is intended only for the use of the recipient(s) named above. If you are
>>> not the intended recipient, you are hereby notified that any dissemination,
>>> distribution, or copying of this communication, or any of its contents, is
>>> strictly prohibited. If you have received this communication in error,
>>> please notify the sender and delete/destroy the original message and any
>>> copy of it from your computer or paper files.
>>>
>>
>>
>> --
>> Xiangyu Su
>> Java Developer
>> xian...@smaato.com
>>
>> Smaato Inc.
>> San Francisco - New York - Hamburg - Singapore
>> www.smaato.com
>>
>> Germany:
>> Valentinskamp 70, Emporio, 19th Floor
>> 20355 Hamburg
>> M 0049(176)22943076
>>
>> The information contained in this communication may be CONFIDENTIAL and
>> is intended only for the use of the recipient(s) named above. If you are
>> not the intended recipient, you are hereby notified that any dissemination,
>> distribution, or copying of this communication, or any of its contents, is
>> strictly prohibited. If you have received this communication in 

Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-23 Thread Jeff Zhang
Thanks Flavio,

I get most of your points except one

   - Get the list of jobs contained in jar (ideally this is is true for
   every engine beyond Spark or Flink)

Just curious to know how you submit job via rest api, if there're multiple
jobs in one jar, then do you need to submit jar one time and submit jobs
multiple times ?
And is there any relationship between these jobs in the same jar ?



Flavio Pompermaier  于2019年7月23日周二 下午4:01写道:

> Hi Jeff, the thing about the manifest is really about to have a way to
> list multiple main classes in the jart (without the need to inspect every
> Java class or forcing a 1-to-1 between jar and job like it is now).
> My requirements were driven by the UI we're using in our framework:
>
>- Get the list of jobs contained in jar (ideally this is is true for
>every engine beyond Spark or Flink)
>- Get the list of required/optional parameters for each job
>- Besides the optionality of a parameter, each parameter should
>include an help description, a type (to validate the input param), a
>default value and a set of choices (when there's a limited number of
>options available)
>- obviously the job serve should be able to submit/run/cancel/monitor
>a job and upload/delete the uploaded jars
>- the job server should not depend on any target platform dependency
>(Spark or Flink) beyond the rest client: at the moment the rest client
>requires a lot of core libs (indeed because it needs to submit the job
>graph/plan)
>- in our vision, the flink client should be something like Apache Livy
>(https://livy.apache.org/)
>- One of the biggest  limitations we face when running a Flink job
>from the REST API is the fact that the job can't do anything after
>env.execute() while we need to call an external service to signal that the
>job has ended + some other details
>
> Best,
> Flavio
>
> On Tue, Jul 23, 2019 at 3:44 AM Jeff Zhang  wrote:
>
>> Hi Flavio,
>>
>> Based on the discussion in the tickets you mentioned above, the
>> program-class attribute was a mistake and community is intended to use
>> main-class to replace it.
>>
>> Deprecating Program interface is a part of work of flink new client api.
>> IIUC, your requirements are not so complicated. We can implement that in
>> the new flink client api. How about listing your requirement, and let's
>> discuss how we can make it in the new flink client api. BTW, I guess most
>> of your requirements are based on your flink job server, It would be
>> helpful if you could provide more info about your flink job server. Thanks
>>
>>
>>
>> Flavio Pompermaier  于2019年7月22日周一 下午8:59写道:
>>
>>> Hi Tison,
>>> we use a modified version of the Program interface to enable a web UI do
>>> properly detect and run Flink jobs contained in a jar + their parameters.
>>> As stated in [1], we dected multiple Main classes per jar by handling an
>>> extra comma-separeted Manifest entry (i.e. 'Main-classes').
>>>
>>> As mentioned on the discussion on the dev ML, our revised Program
>>> interface looks like this:
>>>
>>> public interface FlinkJob {
>>>   String getDescription();
>>>   List<FlinkJobParameter> getParameters();
>>>   boolean isStreamingOrBatch();
>>> }
>>>
>>> public class FlinkJobParameter {
>>>   private String paramName;
>>>   private String paramType = "string";
>>>   private String paramDesc;
>>>   private String paramDefaultValue;
>>>   private Set<String> choices;
>>>   private boolean mandatory;
>>> }
>>>
>>> I've also opened some JIRA issues related to this topic:
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-10864
>>> [2] https://issues.apache.org/jira/browse/FLINK-10862
>>> [3] https://issues.apache.org/jira/browse/FLINK-10879.
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>> On Mon, Jul 22, 2019 at 1:46 PM Zili Chen  wrote:
>>>
 Hi guys,

 We want to have an accurate idea of how many people are implementing
 Flink job based on the interface Program, and how they actually
 implement it.

 The reason I ask for the survey is from this thread[1] where we notice
 this codepath is stale and less useful than it should be. As it is an
 interface marked as @PublicEvolving it is originally aimed at serving
 as user interface. Thus before doing deprecation or dropping, we'd like
 to see if there are users implementing their job based on this
 interface(org.apache.flink.api.common.Program) and if there is any,
 we are curious about how it is used.

 If little or none of Flink user based on this interface, we would
 propose deprecating or dropping it.

 I really appreciate your time and your insight.

 Best,
 tison.

 [1]
 https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E

>>>
>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: FsStateBackend,hdfs rpc api too much,FileCreated and FileDeleted is for what?

2019-07-23 Thread Yun Tang
Hi Andrew,

FilesCreated = CreateFileOps + FsDirMkdirOp. Please refer to [1] and [2] for
the meaning of this metric.


[1] 
https://github.com/apache/hadoop/blob/377f95bbe8d2d171b5d7b0bfa7559e67ca4aae46/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java#L178
[2] 
https://github.com/apache/hadoop/blob/377f95bbe8d2d171b5d7b0bfa7559e67ca4aae46/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java#L799

Best
Yun Tang



From: 陈Darling 
Sent: Tuesday, July 23, 2019 11:32
To: qcx978132...@gmail.com 
Cc: user@flink.apache.org ; myas...@live.com 

Subject: Fwd: FsStateBackend,hdfs rpc api too much,FileCreated and FileDeleted 
is for what?

Hi Yun Tang

Your suggestion is very, very important to us.
According to your suggestion, we have suggested that users increase the 
checkpoint interval (1 to 5 minutes) and set state.backend.fs.memory-threshold=10k.
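
Concretely, the job-side settings we are recommending to users look roughly like 
this (a minimal sketch; the 5-minute interval is just one value within the 
suggested 1 to 5 minute range, and state.backend.fs.memory-threshold: 10k itself 
goes into flink-conf.yaml):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettingsSketch {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // Checkpoint every 5 minutes instead of every few seconds, so far fewer
      // checkpoint files are created and deleted on HDFS.
      env.enableCheckpointing(5 * 60 * 1000);

      // Keep some breathing room between consecutive checkpoints.
      env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);

      // state.backend.fs.memory-threshold: 10k (set in flink-conf.yaml) keeps small
      // state inline in the checkpoint metadata instead of writing many tiny files.

      // ... define sources/sinks and call env.execute() as usual ...
   }
}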

But we only have one HDFS cluster, so we are trying to reduce HDFS API calls. I don't know 
if there is any possibility of further optimization.

Thank you very much for your patience and help.


Darling
Andrew D.Lin



Below is the forwarded message:

From: Congxian Qiu <qcx978132...@gmail.com>
Subject: Re: FsStateBackend,hdfs rpc api too much,FileCreated and FileDeleted is for 
what?
Date: July 23, 2019, 9:48:05 AM GMT+8
To: 陈Darling <chendonglin...@gmail.com>
Cc: user <user@flink.apache.org>

Hi Andrew

These API calls are for checkpoint file created/deleted, and there is an 
ongoing issue[1] which want to reduce the number.
[1] https://issues.apache.org/jira/browse/FLINK-11696

Best,
Congxian


陈Darling <chendonglin...@gmail.com> wrote on Mon, Jul 22, 2019 at 11:22 PM:

Hi

We use 'FsStateBackend' as our state backend!


The following figure shows the frequency of the HDFS API calls.

I don't understand what FilesCreated and FilesDeleted are for. Are all of these 
necessary?

Is it possible to reduce some of the unnecessary ones?





[attached image: chart of HDFS API call frequency]






Darling
Andrew D.Lin



Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-23 Thread Flavio Pompermaier
Hi Jeff, the thing about the manifest is really about having a way to list
multiple main classes in the jar (without the need to inspect every Java
class or forcing a 1-to-1 mapping between jar and job like it is now).
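
Just to make the manifest idea concrete, this is roughly how the job entry points 
can be discovered from the jar (a rough sketch; the 'Main-classes' attribute name 
is our own Manifest convention, and the class/method names here are made up for 
illustration):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class MainClassesLister {
   // Reads the comma-separated 'Main-classes' Manifest attribute and returns
   // the fully qualified names of the jobs packaged in the jar.
   public static List<String> listJobClasses(String jarPath) throws Exception {
      try (JarFile jar = new JarFile(jarPath)) {
         Manifest manifest = jar.getManifest();
         String attr = manifest == null
               ? null
               : manifest.getMainAttributes().getValue("Main-classes");
         return attr == null ? Collections.emptyList() : Arrays.asList(attr.split(","));
      }
   }
}
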
My requirements were driven by the UI we're using in our framework:

   - Get the list of jobs contained in a jar (ideally this is true for
   every engine, not just Spark or Flink)
   - Get the list of required/optional parameters for each job
   - Besides the optionality of a parameter, each parameter should include
   a help description, a type (to validate the input param), a default value
   and a set of choices (when there's a limited number of options available)
   - obviously, the job server should be able to submit/run/cancel/monitor a
   job and upload/delete the uploaded jars
   - the job server should not depend on any target-platform dependency
   (Spark or Flink) beyond the REST client: at the moment the REST client
   requires a lot of core libs (indeed because it needs to submit the job
   graph/plan)
   - in our vision, the Flink client should be something like Apache Livy (
   https://livy.apache.org/)
   - One of the biggest limitations we face when running a Flink job from
   the REST API is the fact that the job can't do anything after env.execute(),
   while we need to call an external service to signal that the job has
   ended, plus some other details (see the sketch below)

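To make that last point concrete, this is the kind of pattern we would like to be 
able to run even when the job is submitted through the REST API (just a sketch; 
the pipeline and the notification helper below are made up for illustration and 
are not Flink or job-server APIs):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobWithCompletionCallback {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.fromElements(1, 2, 3).print(); // placeholder pipeline
      JobExecutionResult result = env.execute("job-with-callback");

      // This is the part that never runs when the job is submitted through the
      // REST API: the submission only extracts the job graph, so the code after
      // env.execute() is never reached.
      notifyExternalService(result.getJobID().toString(), result.getNetRuntime());
   }

   // Hypothetical callback to our own tracking service (not a Flink API).
   private static void notifyExternalService(String jobId, long runtimeMillis) {
      System.out.println("Job " + jobId + " finished in " + runtimeMillis + " ms");
   }
}
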
Best,
Flavio

On Tue, Jul 23, 2019 at 3:44 AM Jeff Zhang  wrote:

> Hi Flavio,
>
> Based on the discussion in the tickets you mentioned above, the
> program-class attribute was a mistake and the community intends to use
> main-class to replace it.
>
> Deprecating the Program interface is part of the work on the new Flink client API.
> IIUC, your requirements are not so complicated. We can implement them in
> the new Flink client API. How about listing your requirements, and let's
> discuss how we can support them in the new Flink client API. BTW, I guess most
> of your requirements are based on your Flink job server, so it would be
> helpful if you could provide more info about your Flink job server. Thanks
>
>
>
> Flavio Pompermaier  wrote on Mon, Jul 22, 2019 at 8:59 PM:
>
>> Hi Tison,
>> we use a modified version of the Program interface to enable a web UI to
>> properly detect and run the Flink jobs contained in a jar + their parameters.
>> As stated in [1], we detect multiple Main classes per jar by handling an
>> extra comma-separated Manifest entry (i.e. 'Main-classes').
>>
>> As mentioned on the discussion on the dev ML, our revised Program
>> interface looks like this:
>>
>> public interface FlinkJob {
>>   String getDescription();
>>   List<FlinkJobParameter> getParameters();
>>   boolean isStreamingOrBatch();
>> }
>>
>> public class FlinkJobParameter {
>>   private String paramName;
>>   private String paramType = "string";
>>   private String paramDesc;
>>   private String paramDefaultValue;
>>   private Set<String> choices;
>>   private boolean mandatory;
>> }
>>
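For completeness, a concrete job class under the quoted interface would look 
roughly like this (a sketch only; the class name and description are invented, 
and the List<FlinkJobParameter> return type is our assumption):

import java.util.Collections;
import java.util.List;

public class WordCountJob implements FlinkJob {

   @Override
   public String getDescription() {
      return "Counts words read from a text source";
   }

   @Override
   public List<FlinkJobParameter> getParameters() {
      // Each parameter would carry its name, type, description, default value
      // and choices; an empty list keeps the sketch minimal.
      return Collections.emptyList();
   }

   @Override
   public boolean isStreamingOrBatch() {
      return true; // streaming job
   }
}
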
>> I've also opened some JIRA issues related to this topic:
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10864
>> [2] https://issues.apache.org/jira/browse/FLINK-10862
>> [3] https://issues.apache.org/jira/browse/FLINK-10879.
>>
>> Best,
>> Flavio
>>
>>
>> On Mon, Jul 22, 2019 at 1:46 PM Zili Chen  wrote:
>>
>>> Hi guys,
>>>
>>> We want to have an accurate idea of how many people are implementing
>>> Flink jobs based on the interface Program, and how they actually
>>> implement it.
>>>
>>> The reason I ask for this survey comes from this thread [1], where we noticed
>>> that this code path is stale and less useful than it should be. As an
>>> interface marked as @PublicEvolving, it was originally aimed at serving
>>> as a user interface. Thus, before deprecating or dropping it, we'd like
>>> to see if there are users implementing their jobs based on this
>>> interface (org.apache.flink.api.common.Program), and if there are any,
>>> we are curious about how it is used.
>>>
>>> If few or no Flink users rely on this interface, we would
>>> propose deprecating or dropping it.
>>>
>>> I really appreciate your time and your insight.
>>>
>>> Best,
>>> tison.
>>>
>>> [1]
>>> https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
>>>
>>
>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: timeout exception when consuming from kafka

2019-07-23 Thread Yitzchak Lieberman
Hi.

Another question: what will happen during a triggered checkpoint if one of
the Kafka brokers is unavailable?
I'd appreciate your insights.

Thanks.

On Mon, Jul 22, 2019 at 12:42 PM Yitzchak Lieberman <
yitzch...@sentinelone.com> wrote:

> Hi.
>
> I'm running a Flink application (version 1.8.0) that
> uses FlinkKafkaConsumer to fetch topic data and perform transformations on
> the data, with the state backend configured as below:
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);
> env.setStateBackend((StateBackend) new
> FsStateBackend("file:///test"));
> env.getCheckpointConfig().setCheckpointTimeout(30_000);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>
> My problem is with the Kafka brokers: in the cluster there are 3
> operating brokers and 2 are down (5 brokers in total).
> I was able to consume the data, but when the checkpoint is triggered it
> throws this exception:
>
> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
> o.a.f.r.c.CheckpointCoordinator - Decline checkpoint 6 by task
> 457b1f801fee89d6f9544409877e29d8 of job 1c46aa5719bac1f0bea436d460b79db1.
> [INFO ] 2019-07-22 12:29:14.636 [flink-akka.actor.default-dispatcher-28]
> o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution
> state FAILED to JobManager for task Source: Custom Source -> Sink: Print to
> Std. Out 457b1f801fee89d6f9544409877e29d8.
> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
> o.a.f.r.c.CheckpointCoordinator - Discarding checkpoint 6 of job
> 1c46aa5719bac1f0bea436d460b79db1.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
> Task Source: Custom Source -> Sink: Print to Std. Out (2/4) was not running
> at
> org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1198)
> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:700)
> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_201]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> ~[akka-actor_2.11-2.4.20.jar:?]
> at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:502)
> ~[akka-actor_2.11-2.4.20.jar:?]
> at akka.actor.Actor$class.aroundReceive(Actor.scala)
> ~[akka-actor_2.11-2.4.20.jar:?]
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> ~[akka-actor_2.11-2.4.20.jar:?]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> ~[akka-actor_2.11-2.4.20.jar:?]
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> ~[akka-actor_2.11-2.4.20.jar:?]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> ~[akka-actor_2.11-2.4.20.jar:?]
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> ~[akka-actor_2.11-2.4.20.jar:?]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> ~[akka-actor_2.11-2.4.20.jar:?]
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [scala-library-2.11.12.jar:?]
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [scala-library-2.11.12.jar:?]
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [scala-library-2.11.12.jar:?]
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [scala-library-2.11.12.jar:?]
> [INFO ] 2019-07-22 12:29:14.637 [flink-akka.actor.default-dispatcher-28]
> o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out
> (2/4) (457b1f801fee89d6f9544409877e29d8) switched from RUNNING to FAILED.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
> fetching topic metadata
>
> My question is (as this seems to be what the checkpoint tries to do): why is it
> trying to fetch topic metadata from the brokers that are down?
>
> Thanks,
> Yitzchak.
>


Re: apache flink: Why checkpoint coordinator takes long time to get completion

2019-07-23 Thread Zili Chen
Hi Xiangyu,

Could you share the corresponding JIRA that fixed this issue?

Best,
tison.


Xiangyu Su  wrote on Fri, Jul 19, 2019 at 8:47 PM:

> btw. it seems like this issue has been fixed in 1.8.1
>
> On Fri, 19 Jul 2019 at 12:21, Xiangyu Su  wrote:
>
>> Ok, thanks.
>>
>> So far this time consumption always happens after the 3rd checkpoint,
>> and this unexpected time consumption was always consistent (~4 min under
>> ~4 GB/min of incoming traffic).
>>
>> On Fri, 19 Jul 2019 at 11:06, Biao Liu  wrote:
>>
>>> Hi Xiangyu,
>>>
>>> Just took a glance at the relevant code. There is a gap between
>>> calculating the duration and logging it out. I guess checkpoint 4
>>> finished within 1 minute, but there was an unexpected time-consuming operation
>>> during that gap. I can't tell which part it is, though.
>>>
>>>
>>> Xiangyu Su  wrote on Fri, Jul 19, 2019 at 4:14 PM:
>>>
 Dear flink community,

 We are doing a POC with Flink (1.8) to process data in real time, using global
 checkpointing (S3) and local checkpointing (EBS), with the cluster deployed on EKS. Our
 application consumes data from Kinesis.

 For my test I am using a checkpointing interval of 5 min and a minimum
 pause of 2 min.

 The issue we saw is that the Flink checkpointing process seems to be
 idle for 3-4 min before the job manager gets the completion notification.

 here is some logging from job manager:

 2019-07-10 11:59:03,893 INFO  
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
 checkpoint 4 @ 1562759941082 for job e7a97014f5799458f1c656135712813d.
 2019-07-10 12:05:01,836 INFO  
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
 checkpoint 4 for job e7a97014f5799458f1c656135712813d (22387207650 bytes 
 in 58645 ms).

 As I understand the logging above, the completedCheckpoint (CheckpointCoordinator)
 object was completed in 58645 ms, but the whole checkpointing process
 took ~6 min (triggered at 11:59:03, completion logged at 12:05:01).

 This logging is for the 4th checkpoint, but the first 3 checkpoints
 finished on time.
 Could you please tell me why, in my test, Flink checkpointing started
 sitting "idle" for a few minutes after the first 3 checkpoints?

 Best Regards
 --
 Xiangyu Su
 Java Developer
 xian...@smaato.com

 Smaato Inc.
 San Francisco - New York - Hamburg - Singapore
 www.smaato.com

 Germany:
 Valentinskamp 70, Emporio, 19th Floor
 20355 Hamburg
 M 0049(176)22943076

 The information contained in this communication may be CONFIDENTIAL and
 is intended only for the use of the recipient(s) named above. If you are
 not the intended recipient, you are hereby notified that any dissemination,
 distribution, or copying of this communication, or any of its contents, is
 strictly prohibited. If you have received this communication in error,
 please notify the sender and delete/destroy the original message and any
 copy of it from your computer or paper files.

>>>
>>
>> --
>> Xiangyu Su
>> Java Developer
>> xian...@smaato.com
>>
>> Smaato Inc.
>> San Francisco - New York - Hamburg - Singapore
>> www.smaato.com
>>
>> Germany:
>> Valentinskamp 70, Emporio, 19th Floor
>> 20355 Hamburg
>> M 0049(176)22943076
>>
>> The information contained in this communication may be CONFIDENTIAL and
>> is intended only for the use of the recipient(s) named above. If you are
>> not the intended recipient, you are hereby notified that any dissemination,
>> distribution, or copying of this communication, or any of its contents, is
>> strictly prohibited. If you have received this communication in error,
>> please notify the sender and delete/destroy the original message and any
>> copy of it from your computer or paper files.
>>
>
>
> --
> Xiangyu Su
> Java Developer
> xian...@smaato.com
>
> Smaato Inc.
> San Francisco - New York - Hamburg - Singapore
> www.smaato.com
>
> Germany:
> Valentinskamp 70, Emporio, 19th Floor
> 20355 Hamburg
> M 0049(176)22943076
>
> The information contained in this communication may be CONFIDENTIAL and is
> intended only for the use of the recipient(s) named above. If you are not
> the intended recipient, you are hereby notified that any dissemination,
> distribution, or copying of this communication, or any of its contents, is
> strictly prohibited. If you have received this communication in error,
> please notify the sender and delete/destroy the original message and any
> copy of it from your computer or paper files.
>


Re: Execution environments for testing: local vs collection vs mini cluster

2019-07-23 Thread Biao Liu
Hi Juan,

I'm not sure what you really want. Before giving some suggestions, could
you answer the questions below first?

1. Do you want to write a unit test (or integration test) case for your
project or for Flink? Or just want to run your job locally?
2. Which mode do you want to test? DataStream or DataSet?



Juan Rodríguez Hortalá  wrote on Tue, Jul 23, 2019 at 1:12 PM:

> Hi,
>
> In
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/local_execution.html
> and
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html
> I see there are 3 ways to create an execution environment for testing:
>
>- StreamExecutionEnvironment.createLocalEnvironment and
>ExecutionEnvironment.createLocalEnvironment create an execution environment
>running on a single JVM using different threads.
>- CollectionEnvironment runs on a single JVM on a single thread.
>- I haven't found much documentation on the Mini Cluster, but it
>sounds similar to the Hadoop MiniCluster.
>If that is the case, then it would run on many local JVMs, each of them
>running multiple threads.
>
> Am I correct about the Mini Cluster? Is there any additional documentation
> about it? I discovered it looking at the source code of AbstractTestBase,
> which is mentioned in
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/testing.html#integration-testing.
> Also, it looks like launching the mini cluster registers it somewhere, so
> subsequent calls to `StreamExecutionEnvironment.getExecutionEnvironment`
> return an environment that uses the mini cluster. Is that performed by
> `executionEnvironment.setAsContext()` in
> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java#L56
> ? Is that execution environment registration process documented anywhere?
>
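For reference, the pattern used with MiniClusterWithClientResource in tests looks 
roughly like this (a sketch, assuming flink-test-utils is on the test classpath; 
the parallelism numbers are arbitrary):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class MyJobITCase {

   // Starts a local mini cluster once per test class and registers it as the
   // context, so getExecutionEnvironment() below runs against that cluster.
   @ClassRule
   public static final MiniClusterWithClientResource MINI_CLUSTER =
         new MiniClusterWithClientResource(
               new MiniClusterResourceConfiguration.Builder()
                     .setNumberTaskManagers(1)
                     .setNumberSlotsPerTaskManager(2)
                     .build());

   @Test
   public void runsOnMiniCluster() throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(2);
      env.fromElements(1, 2, 3).map(i -> i * 2).print();
      env.execute();
   }
}
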
> Which test execution environment is recommended for each test use case?
> For example, I don't see why I would use CollectionEnvironment when I have
> the local environment available and running on several threads; what is a
> good use case for CollectionEnvironment?
>
> Are all these 3 environments supported equally, or are some of them
> expected to be deprecated?
>
> Are there any additional execution environments that could be useful for
> testing on a single host?
>
> Thanks,
>
> Juan
>
>
>