Re: Flink web UI authentication

2018-03-19 Thread Sampath Bhat
Hi Nico

Flink does not support username/password authentication as of now, but
we can do something like this:
Here is a guide from DigitalOcean about protecting a URL
with basic authentication:
https://www.digitalocean.com/community/tutorials/how-to-set-up-password-authentication-with-nginx-on-ubuntu-14-04

More importantly, how can this "web.access-control-allow-origin" parameter
help with authentication?
It is used for cross-origin resource sharing. So can I use this
parameter to specify a few hostnames so that only those hostnames can access
the Flink web UI? I don't think so. It is mainly used when a page from another
domain tries to access the Flink web UI.

Let me know if I'm missing something.



On Mon, Mar 19, 2018 at 11:20 PM, Nico Kruber 
wrote:

> Hi Sampath,
> aside from allowing only certain origins via the configuration parameter
> "web.access-control-allow-origin", I am not aware of anything like
> username/password authentication. Chesnay (cc'd) may know more about
> future plans.
> You can, however, wrap a proxy like squid around the web UI if you need
> this.
>
>
> Regards
> Nico
>
> On 13/03/18 11:16, Sampath Bhat wrote:
> > Hello
> >
> > I would like to know if flink supports any user level authentication
> > like username/password for flink web ui.
> >
> > Regards
> > Sampath S
> >
>
>


Re: Restart hook and checkpoint

2018-03-19 Thread makeyang
Currently there is only a time-based way to trigger a checkpoint. Based on this
discussion, I think Flink needs to introduce an event-based way to trigger
checkpoints; for example, the restart of a task manager should count as such an event.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strange Kafka consumer behaviour

2018-03-19 Thread Tzu-Li (Gordon) Tai
Hi Gyula,

Are you using Flink 1.4.x, and do you have partition discovery enabled?
If yes, then both the partitions restored from state (for the previously
existing topics) and the partitions of the newly specified topics will be consumed.

Cheers,
Gordon
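
(For reference, partition discovery in 1.4 is switched on via a consumer
property; a minimal sketch, where the broker address, group id, topic and
interval are only placeholders:)

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")  // placeholder
props.setProperty("group.id", "my-group")                 // placeholder
// enables periodic topic/partition discovery (interval in milliseconds)
props.setProperty("flink.partition-discovery.interval-millis", "30000")

val consumer = new FlinkKafkaConsumer010[String](
  "my-topic", new SimpleStringSchema(), props)            // placeholder topic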

On Tue, Mar 20, 2018 at 6:01 AM, Ankit Chaudhary  wrote:

> Did you change the Kafka group id?
>
> On Mon, Mar 19, 2018 at 9:54 PM, Aljoscha Krettek 
> wrote:
>
>> I think it's because it's just taking the state for the existing topics
>> and starts reading from that.
>>
>> A workaround could be to change the uid of the operator to avoid reading
>> the old state.
>>
>> Aljoscha
>>
>> > On 21. Feb 2018, at 02:21, Gyula Fóra  wrote:
>> >
>> > Hi,
>> > I have observed a weird behaviour when changing kafka topics when
>> restoring from a checkpoint.
>> >
>> > It seems that the job started consuming both the topics from the state,
>> and the new topic that I assigned. This happened while changing from kafka
>> 08 to kafka 10.
>> >
>> > Is this expected?
>> >
>> > Thanks,
>> > Gyula
>>
>>
>


Re: Timezone offset on daily time window grouping using SQL

2018-03-19 Thread LiYue
Hi,
Thanks for the tip.
My team will try to read the Flink Table source code and maybe we could contribute
later.


> On 19 Mar 2018, at 18:06, Fabian Hueske wrote:
> 
> Hi,
> 
> Calcite's parser supports this syntax TUMBLE(z, INTERVAL '0.004' SECOND, 
> TIME '08:00:00') but unfortunately, Flink SQL doesn't support it yet.
> 
> Best, Fabian
> 
> 2018-03-19 4:45 GMT+01:00 LiYue  >:
> Hello team,
> In DataStream API , a time offset can be specified like this:
> 
> input
> .keyBy()
> .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
> 
> Is there any way to specify a time offset in SQL?
> My sql looks like:
> 
> SELECT * from table
> GROUP BY TUMBLE(`timestamp`, INTERVAL '1' DAY)
> 
> Thanks !
> LiYue
> tig.jd.com 



Let BucketingSink roll file on each checkpoint

2018-03-19 Thread XilangYan
The behavior of BucketingSink is not exactly what we want.
If I understood correctly, when a checkpoint is requested, BucketingSink will
flush the writer to make sure data is not lost, but it will not close the file,
nor roll a new file after the checkpoint.
In the case of HDFS, if the file length is not updated on the name node (through
closing the file or updating the file length explicitly), MR and other data analysis
tools will not read the new data. This is not what we desire.
I also want to open a new file for each checkpoint period to make sure the HDFS
file is persisted, because we have met some bugs in the flush/append HDFS file use
case.

Is there any way to let BucketingSink roll the file on each checkpoint? Thanks in
advance.
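
(Editorial aside: as of 1.4 there is no built-in switch to roll on every
checkpoint; the closest existing knobs are size- and inactivity-based rolling.
A minimal sketch, with placeholder path and values:)

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

val sink = new BucketingSink[String]("hdfs:///path/to/output")  // placeholder path
  .setBatchSize(128L * 1024 * 1024)            // roll once a part file reaches 128 MB
  .setInactiveBucketCheckInterval(60 * 1000L)  // check for inactive buckets every minute
  .setInactiveBucketThreshold(5 * 60 * 1000L)  // close files not written to for 5 minutes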




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink SSL Setup on a standalone cluster

2018-03-19 Thread Vinay Patil
Hi,

When I set ssl.verify.hostname to true, the job fails with an SSL handshake
exception where it tries to match the IP address instead of the hostname
in the certificates. Everything works when I set this to false. The
keystore is created with the FQDN.
The solution of adding all the hostnames and IP addresses to the SAN list was
rejected by the company.

And a security concern has been raised about setting this parameter to false. I see
that https://issues.apache.org/jira/browse/FLINK-5030 is in the Unresolved state.
How does Flink support hostname verification?

@Chesnay : It would be helpful to know the answer to my previous mail

Regards,
Vinay Patil
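
(For completeness, a minimal sketch of passing the SSL settings explicitly to a
remote environment, using the key names from the 1.4 security documentation;
host, port, paths and passwords are placeholders. The Java
StreamExecutionEnvironment is used here because it has a createRemoteEnvironment
variant that accepts a client Configuration:)

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val conf = new Configuration()
conf.setBoolean("security.ssl.enabled", true)
conf.setString("security.ssl.keystore", "/path/to/keystore.jks")
conf.setString("security.ssl.keystore-password", "***")
conf.setString("security.ssl.key-password", "***")
conf.setString("security.ssl.truststore", "/path/to/truststore.jks")
conf.setString("security.ssl.truststore-password", "***")
conf.setBoolean("security.ssl.verify-hostname", true)  // the setting under discussion

val env = StreamExecutionEnvironment.createRemoteEnvironment(
  "jobmanager-fqdn", 6123, conf, "/path/to/job.jar")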

On Fri, Mar 16, 2018 at 10:15 AM, Vinay Patil 
wrote:

> Hi Chesnay,
>
> After setting the configurations for the Remote Execution Environment the job
> gets submitted, but I had to set ssl-verify-hostname to false.
> However, I don't understand why there is a need to do it. I am running the
> job from the master node itself and providing all the configurations in
> flink-conf.yaml while creating the cluster. So why do I have to copy the
> same stuff in code?
>
> Regards,
> Vinay Patil
>
> On Fri, Mar 16, 2018 at 8:23 AM, Vinay Patil 
> wrote:
>
>> Hi,
>>
>> No, I am not passing any config to the remote execution environment. I am
>> running the job from the master node itself. I have provided the SSL configs in
>> flink-conf.yaml.
>>
>> Do I need to specify any SSL config as part of the Remote Execution env?
>>
>> If yes, can you please provide me an example.
>>
>>
>>
>> On Mar 16, 2018 1:56 AM, "Chesnay Schepler [via Apache Flink User Mailing
>> List archive.]"  wrote:
>>
>> How are you creating the remote environment? In particular, are you passing a
>> configuration to the RemoteEnvironment?
>> Have you set the SSL options in the config?
>>
>>
>> On 15.03.2018 22:46, Vinay Patil wrote:
>>
>> Hi,
>>
>> Even tried with ip-address for JobManager.host.name property, but did
>> not work. When I tried netstat -anp | grep 6123 , I see 3 TM connection
>> state as established, however when I submit the job , I see two more
>> entries with state as TIME_WAIT and after some time these entries are gone
>> and I get a Lost to Job Manager Exception.
>>
>> This only happens when SSL is enabled.
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Mar 15, 2018 at 10:28 AM, Vinay Patil <[hidden email]
>> > wrote:
>>
>>> Just an update,  I am submitting the job from the master node, not using
>>> the normal flink run command to submit the job , but using Remote Execution
>>> Environment in code to do this.
>>>
>>> And in that I am passing the hostname which is same as provided in
>>> flink-conf.yaml
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Mar 15, 2018 at 7:57 AM, Vinay Patil <[hidden email]
>>> > wrote:
>>>
 Hi Guys,

 Any suggestions here

 Regards,
 Vinay Patil

 On Wed, Mar 14, 2018 at 8:08 PM, Vinay Patil <[hidden email]
 > wrote:

> Hi,
>
> After waiting for some time I got the exception as Lost Connection to
> Job Manager. Message: Could not retrieve the JobExecutionResult from Job
> Manager
>
> I am submitting the job as remote execution environment. I have
> specified the exact hostname of JobManager and port as 6123.
>
> Please let me know if any other configurations are needed.
>
> Regards,
> Vinay Patil
>
> On Wed, Mar 14, 2018 at 11:48 AM, Vinay Patil <[hidden email]
> > wrote:
>
>> Hi Timo,
>>
>> Not getting any exception , it just says waiting for job completion
>> with a Job ID printed.
>>
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Mar 14, 2018 at 11:34 AM, Timo Walther [via Apache Flink User
>> Mailing List archive.] <[hidden email]
>> > wrote:
>>
>>> Hi Vinay,
>>>
>>> do you have any exception or log entry that describes the failure?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 14.03.18 um 15:51 schrieb Vinay Patil:
>>>
>>> Hi,
>>>
>>> I have keystore for each of the 4 nodes in cluster and respective
>>> trustore. The cluster is configured correctly with SSL , verified this 
>>> by
>>> accessing job manager using https and also see the TM path as 
>>> akka.ssl.tcp,
>>> however the job is not getting submitted to the cluster.
>>>
>>> I am not allowed to import the certificate to the java default
>>> trustore, so I have provided the trustore and keystore as jvm args to 
>>> the
>>> job.
>>>
>>> Is there any other configuration I should do so that the job is
>>> submitted
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>

ListCheckpointed function - what happens prior to restoreState() being called?

2018-03-19 Thread Ken Krugler
Hi all,

If I implement ListCheckpointed in a function, is there a guarantee that open() 
is called before restoreState()?

Asking because it doesn’t seem to be the case, and I didn’t notice this being 
described here:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/task_lifecycle.html
 


If so, then is it normally best practice to also implement the
CheckpointedFunction interface, so that the initializeState() method is called
before the restore?

In Flink test code, I don’t see this typically being done.
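
(For reference, a minimal sketch of the CheckpointedFunction variant in
question; the state name and element types are illustrative only:)

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.util.Collector

class CountingFlatMap extends FlatMapFunction[String, String] with CheckpointedFunction {
  @transient private var countState: ListState[java.lang.Long] = _
  private var count: Long = 0L

  // called before open() and before any elements are processed,
  // on both a fresh start and a restore
  override def initializeState(context: FunctionInitializationContext): Unit = {
    countState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
    if (context.isRestored) {
      val it = countState.get().iterator()
      while (it.hasNext) count += it.next()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    countState.clear()
    countState.add(count)
  }

  override def flatMap(value: String, out: Collector[String]): Unit = {
    count += 1
    out.collect(value)
  }
}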

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: CsvSink

2018-03-19 Thread karim amer
Hi Nico,

I tried to reproduce your code but registerDataStream keeps failing to
register the fields even though I am following your code and the docs.
here is the error
[error]  found   : Symbol
[error]  required: org.apache.flink.table.expressions.Expression
[error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
[error]
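
(Editorial note: that particular error usually means the implicit conversion
from Scala symbols to Table API expressions is not in scope; it is provided by

import org.apache.flink.table.api.scala._

which is missing from the import list in the snippet below.)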
I think my code snippet was misleading. Here is the full snippet. Changing
the name from "table" didn't fix it for me.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row



object datastreamtotableapi {

  case class Calls(a: String,
   b: String,
   c: String,
   d: String,
   e: String,
   f: String,
   g: String,
   h: String,
   i: String,
   j: String,
   k: String,
   l: String,
   m: String,
   n: String,
   p: String,
   q: String,
   r: String,
   s: String,
   t: String,
   v: String,
   w: String)


  def main(args: Array[String]) {

val params = ParameterTool.fromArgs(args)
val input = params.getRequired("input")


val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val tableEnv = TableEnvironment.getTableEnvironment(env)

val dataStream = env.readTextFile(input)

val namedStream = dataStream.map((value:String) => {

  val columns = value.split(",")
  Calls(columns(0), columns(1),columns(2),columns(3), columns(4),columns(5),
columns(6), columns(7),columns(8),columns(9), columns(10), columns(11),
columns(12), columns(13),columns(14),columns(15), columns(16),
columns(17),
columns(18),columns(19), columns(20)
  )
})


   val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")

   val watermarkedStream =
cleanedStream.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
 override def extractTimestamp(element: Calls): Long =
(element.j.concat(element.k)).toLong
   })



tableEnv.registerDataStream("CDRS", watermarkedStream)
val results = tableEnv.sqlQuery( """
  |SELECT
  | a
  | FROM CDRS
   """.stripMargin)


val result: Table = results

val path = "file:///Users/test/1.txt"
val sink :TableSink[Row]=   new CsvTableSink(
  path, // output path
  fieldDelim = "|", // optional: delimit files by '|'
  numFiles = 1, // optional: write to a single file
  writeMode = WriteMode.OVERWRITE)

result.writeToSink(sink)


env.execute("this job")

  }
}




On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber  wrote:

> Hi Karim,
> when I was trying to reproduce your code, I got an exception with the
> name 'table' being used - by replacing it and completing the job with
> some input, I did see the csv file popping up. Also, the job was
> crashing when the file 1.txt already existed.
>
> The code I used (running Flink 1.5-SNAPSHOT):
>
>   def main(args: Array[String]) {
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = TableEnvironment.getTableEnvironment(env)
>
> val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
>   .assignAscendingTimestamps(_._2)
> tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>
> val results = tableEnv.sqlQuery( """
>|SELECT
>| A,C
>| FROM mytable
>  """.stripMargin)
>
> val result: Table = results
>
> val path = "file:///tmp/test/1.txt"
> val sink :TableSink[Row]=   new CsvTableSink(
>   path, // output path
>   fieldDelim = "|", // optional: delimit files by '|'
>   numFiles = 1, // optional: write to a single file
>   

Re: Strange Kafka consumer behaviour

2018-03-19 Thread Ankit Chaudhary
Did you change the Kafka group id?

On Mon, Mar 19, 2018 at 9:54 PM, Aljoscha Krettek 
wrote:

> I think it's because it's just taking the state for the existing topics
> and starts reading from that.
>
> A workaround could be to change the uid of the operator to avoid reading
> the old state.
>
> Aljoscha
>
> > On 21. Feb 2018, at 02:21, Gyula Fóra  wrote:
> >
> > Hi,
> > I have observed a weird behaviour when changing kafka topics when
> restoring from a checkpoint.
> >
> > It seems that the job started consuming both the topics from the state,
> and the new topic that I assigned. This happened while changing from kafka
> 08 to kafka 10.
> >
> > Is this expected?
> >
> > Thanks,
> > Gyula
>
>


Running FlinkML ALS with more than two features

2018-03-19 Thread Banias H
Hello Flink experts,

I am new to FlinkML and currently playing around with using ALS in a
recommender system. In our dataset, we have more than 2 features. When I
tried running the example towards the bottom of this page:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/ml/als.html,
I got a *method not implemented error* in fit(). Here is how I set up
inputDS:

val inputDS: DataSet[(Int, Int, Int, Int, Double)] = env.readCsvFile[(Int,
Int, Int, Int, Double)](
  pathToTrainingFile)
...
als.fit(inputDS, parameters)

However, when I used only 2 features (i.e. passing DataSet[(Int, Int, Double)] to
fit()), it ran successfully. Is it a limitation of ALS in general, or is it a
configuration issue?
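
(Editorial note: FlinkML's ALS models a user-item rating matrix, so fit()
expects a DataSet[(Int, Int, Double)] of (user, item, rating) rather than a
wider tuple. A sketch of reducing the 5-tuple above to that shape, building on
the snippet in this mail; the choice of columns is only an assumption:)

import org.apache.flink.api.scala._

// keep the user id, item id and rating column; drop the extra features
val ratings: DataSet[(Int, Int, Double)] = inputDS.map(t => (t._1, t._2, t._5))
als.fit(ratings, parameters)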

I would appreciate any info on this. Thanks.

Regards,
BH


Re: Custom Processing per window

2018-03-19 Thread Dhruv Kumar
Is there a way I can leverage OperatorState (instead of KeyState) to solve my 
issue?
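
(Editorial sketch of one possible direction: if a single LRU per window is
acceptable at parallelism 1, a non-keyed windowAll with a
ProcessAllWindowFunction can hold the LRU in a plain ordered map, without keyed
or operator state. The record shape (key, value, arrivalTime) is an assumption
based on the description quoted further down:)

import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable

class LruWindowEmitter(maxSize: Int)
    extends ProcessAllWindowFunction[(String, Long, Long), (String, Long), TimeWindow] {

  override def process(context: Context,
                       elements: Iterable[(String, Long, Long)],
                       out: Collector[(String, Long)]): Unit = {
    // key -> (aggregated value, latest arrival time); insertion order == recency order
    val cache = mutable.LinkedHashMap[String, (Long, Long)]()

    for ((key, value, time) <- elements) {
      cache.remove(key) match {
        case Some((agg, _)) =>
          cache.put(key, (agg + value, time))       // re-insert to refresh recency
        case None =>
          if (cache.size >= maxSize) {
            val (lruKey, (lruAgg, _)) = cache.head  // least recently used entry
            cache.remove(lruKey)
            out.collect((lruKey, lruAgg))           // emit the evicted key
          }
          cache.put(key, (value, time))
      }
    }
    // end of the window: emit everything still cached
    for ((key, (agg, _)) <- cache) out.collect((key, agg))
  }
}

This would be attached with something like
stream.windowAll(TumblingEventTimeWindows.of(Time.days(1))).process(new LruWindowEmitter(1000)).
The price is that the windowAll operator runs with parallelism 1 (which is what
makes comparing keys against each other possible), and evictions are computed
when the window fires, not eagerly as elements arrive.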


> On Mar 19, 2018, at 09:00, Fabian Hueske  wrote:
> 
> Hi,
> 
> Data is partitioned by key across machines and state is kept per key. It is 
> not possible to interact with two keys at the same time.
> 
> Best, Fabian
> 
> 2018-03-19 14:47 GMT+01:00 Dhruv Kumar  >:
> In other words, while using the Flink streaming APIs, is it possible to take 
> a decision on emitting a particular key based on the state of some other key 
> present in the same window?
> 
> Thanks!
> --
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me 
> 
>> On Mar 19, 2018, at 05:11, Dhruv Kumar > > wrote:
>> 
>> Task 1: I implemented it using a custom Trigger (see attached file). Looks 
>> like it is doing what I want it to. I copied the code from 
>> EventTimeTrigger.java and overwrote the onElement method. 
>> 
>> Task 2: I will need to maintain the state (this will be the LRU cache) for 
>> multiple keys in the same data structure. But it looks like that the Keyed 
>> states are on a per key basis. Should I use OperatorState in some way? Can I 
>> use a data structure not directly managed by Flink? What will happen in the 
>> case of keys across multiple machines?
>> 
>> 
>> 
>> 
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me 
>> 
>>> On Mar 19, 2018, at 02:04, Jörn Franke >> > wrote:
>>> 
>>> How would you start implementing it? Where are you stuck?
>>> 
>>> Did you already try to implement this?
>>> 
>>> On 18. Mar 2018, at 04:10, Dhruv Kumar >> > wrote:
>>> 
 Hi
 
 I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for 
 implementing some very specific use-cases: (They may not seem relevant but 
 I need to implement them or I at least need to know if it is possible to 
 implement them in Flink)
 
 Assumptions:
 1. Data stream is of the form (key, value). We achieve this by the .key 
 operation provided by Flink API.
 2. By emitting a key, I mean sending/outputting its aggregated value to 
 any data sink. 
 
 1. For each Tumbling window in the Event Time space, for each key, I would 
 like to aggregate its value until it crosses a particular threshold (same 
 threshold for all the keys). As soon as the key’s aggregated value crosses 
 this threshold, I would like to emit this key. At the end of every 
 tumbling window, all the (key, value) aggregated pairs  would be emitted 
 irrespective of whether they have crossed the threshold or not.
 
 2. For each Tumbling window in the event time space, I would like to 
 maintain a LRU cache which stores the keys along with their aggregated 
 values and their latest arrival time. The least recently used (LRU) key 
 would be the key whose latest arrival time is earlier than the latest 
 arrival times of all the other keys present in the LRU cache. The LRU 
 cache is of a limited size. So, it is possible that the number of unique 
 keys in a particular window is greater than the size of LRU cache. 
 Whenever any (key, value) pair arrives, if the key already exists, its 
 aggregated value is updated with the value of the newly arrived value and 
 its latest arrival time is updated with the current event time. If the key 
 does not exist and there is some free slot in the LRU cache, it is added 
 into the LRU. As soon as the LRU cache gets occupied fully and a new key 
 comes in which does not exist in the LRU cache, we would like to emit the 
 least recently used key to accommodate the newly arrived key. As in the 
 case of 1, at the end of every tumbling window, all the (key, value) 
 aggregated pairs in the LRU cache would be emitted.  
 
 Would like to know how can we implement these algorithms using Flink. Any 
 help would be greatly appreciated.
 
 Dhruv Kumar
 PhD Candidate
 Department of Computer Science and Engineering
 University of Minnesota
 www.dhruvkumar.me 
>> 
> 
> 



Re: Out of the blue: "Cannot use split/select with side outputs"

2018-03-19 Thread Julio Biason
Update:

Even weirder, I stopped Flink (jobmanager and taskmanager) to increase the
number of slots and, upon restart, it crashed again and then processed
everything just fine.

On Mon, Mar 19, 2018 at 3:01 PM, Julio Biason 
wrote:

> Hey guys,
>
> I got a weird problem with my pipeline.
>
> The pipeline processes lines from our logs and generates different metrics
> based on them (I mean, quite the standard procedure). It uses side outputs
> for dead letter queues, in case it finds something wrong with the logs and
> a metric can't be generated. At the end, because some metrics differ in
> content, we send them to different sinks (based on their class, which is
> quite simple); for this, we split the pipeline (based, again, on the
> metrics class) and each split goes to a different sink.
>
> Everything worked fine so far. I spent some time writing tests for
> some weird results in generating the metrics, but right now, when I try to
> run the whole process, I get a "Cannot use split/select with side outputs".
>
> This is really weird to me, because a couple of weeks ago, the combination of
> splits and side outputs worked like a charm.
>
> We didn't update Flink (still running 1.4.0) so I'm really confused on
> what's going on here.
>
> Any ideas?
>
> --
> *Julio Biason*, Software Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>



-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


Re: Incremental checkpointing performance

2018-03-19 Thread Miyuru Dayarathna
Hi Nico,
Thanks for the detailed explanation. The only change I have made in my 
flink-conf.yaml file is the following.
state.backend.fs.checkpointdir: file:///home/ubuntu/tmp-flink-rocksdb
The default "state.backend" value is set to filesystem. Removing the 
env.setStateBackend() method code or changing the "state.backend" property to 
rocksdb does not change the state backend to RocksDB. I got this verified by 
looking at the Flink log files. I have mentioned a sample of the log file for 
your reference.
---carbon-5th:38631/user/taskmanager)
 as 1ac63dfb481eab3d3165a965084115f3. Current number of registered hosts is 1. 
Current number of alive task slots is 1.
2018-03-19 23:10:11,606 INFO  org.apache.flink.runtime.client.JobClient 
    - Checking and uploading JAR files
2018-03-19 23:10:11,618 INFO  org.apache.flink.runtime.jobmanager.JobManager
    - Submitting job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink 
application.).
2018-03-19 23:10:11,623 INFO  org.apache.flink.runtime.jobmanager.JobManager
    - Using restart strategy 
FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
delayBetweenRestartAttempts=1) for 7c19a14f4e75149ffaa064fac7e2bf29.
2018-03-19 23:10:11,636 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - Job recovers 
via failover strategy: full graph restart
2018-03-19 23:10:11,648 INFO  org.apache.flink.runtime.jobmanager.JobManager
    - Running initialization on master for job Flink application. 
(7c19a14f4e75149ffaa064fac7e2bf29).
2018-03-19 23:10:11,648 INFO  org.apache.flink.runtime.jobmanager.JobManager
    - Successfully ran initialization on master in 0 ms.
2018-03-19 23:10:11,664 INFO  org.apache.flink.runtime.jobmanager.JobManager
    - Using application-defined state backend for checkpoint/savepoint 
metadata: RocksDB State Backend {isInitialized=false, 
configuredDbBasePaths=null, initializedDbBasePaths=null, 
checkpointStreamBackend=File State Backend @ 
file:/home/ubuntu/tmp-flink-rocksdb}.
2018-03-19 23:10:11,685 INFO  org.apache.flink.runtime.jobmanager.JobManager
    - Scheduling job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink 
application.).
2018-03-19 23:10:11,685 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - Job Flink 
application. (7c19a14f4e75149ffaa064fac7e2bf29) switched from state CREATED to 
RUNNING.
2018-03-19 23:10:11,692 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - Source: 
inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from 
CREATED to SCHEDULED.
2018-03-19 23:10:11,698 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
 CountTrigger(1), 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, 
WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) 
(796fcd9c38c87b6efb6f512e78e626e9) switched from CREATED to SCHEDULED.
2018-03-19 23:10:11,706 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - Source: 
inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from 
SCHEDULED to DEPLOYING.
2018-03-19 23:10:11,707 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - Deploying 
Source: inputStream -> Filter (1/1) (attempt #0) to computer1
2018-03-19 23:10:11,712 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
 CountTrigger(1), 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, 
WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) 
(796fcd9c38c87b6efb6f512e78e626e9) switched from SCHEDULED to DEPLOYING.
2018-03-19 23:10:11,712 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - Deploying 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
 CountTrigger(1), 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, 
WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (attempt 
#0) to computer1
2018-03-19 23:10:12,004 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - Source: 
inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from 
DEPLOYING to RUNNING.
2018-03-19 23:10:12,011 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
 CountTrigger(1), 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, 
WindowedStream.reduce(WindowedStream.java:241)) -> 

Out of the blue: "Cannot use split/select with side outputs"

2018-03-19 Thread Julio Biason
Hey guys,

I got a weird problem with my pipeline.

The pipeline processes lines from our logs and generates different metrics
based on them (I mean, quite the standard procedure). It uses side outputs
for dead letter queues, in case it finds something wrong with the logs and
a metric can't be generated. At the end, because some metrics differ in
content, we send them to different sinks (based on their class, which is
quite simple); for this, we split the pipeline (based, again, on the
metrics class) and each split goes to a different sink.

Everything worked fine so far. I spent some time writing tests for some
weird results in generating the metrics, but right now, when I try to run
the whole process, I get a "Cannot use split/select with side outputs".

This is really weird to me, because a couple of weeks ago, the combination of
splits and side outputs worked like a charm.

We didn't update Flink (still running 1.4.0) so I'm really confused on
what's going on here.

Any ideas?

-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


Re: Flink web UI authentication

2018-03-19 Thread Nico Kruber
Hi Sampath,
aside from allowing only certain origins via the configuration parameter
"web.access-control-allow-origin", I am not aware of anything like
username/password authentication. Chesnay (cc'd) may know more about
future plans.
You can, however, wrap a proxy like squid around the web UI if you need
this.


Regards
Nico

On 13/03/18 11:16, Sampath Bhat wrote:
> Hello
> 
> I would like to know if flink supports any user level authentication
> like username/password for flink web ui.
> 
> Regards
> Sampath S
> 




Re: Flink kafka connector with JAAS configurations crashed

2018-03-19 Thread Nico Kruber
Hi,
I'm no expert on Kafka here, but as the tasks are run on the worker
nodes (where the TaskManagers are run), please double-check whether the
file under /data/apps/spark/kafka_client_jaas.conf on these nodes also
contains the same configuration as on the node running the JobManager,
i.e. an appropriate entry for 'KafkaClient'.


Regards
Nico
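
One more thing worth checking, assuming the System.setProperty() call below
runs in the job's main() method: that property would then only be set in the
client/JobManager JVM, not in the TaskManager JVMs where the consumer is
actually opened. A possible workaround is to set it for every Flink JVM via
flink-conf.yaml, e.g.

env.java.opts: -Djava.security.auth.login.config=/data/apps/spark/kafka_client_jaas.conf

and restart the cluster.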

On 13/03/18 08:42, sundy wrote:
> 
> Hi all,
> 
> I use the code below to set the Kafka JAAS config; the
> serverConfig.jasspath is /data/apps/spark/kafka_client_jaas.conf, but
> on a Flink standalone deployment it crashes. I am sure the
> kafka_client_jaas.conf is valid, because other applications (Spark
> Streaming) are still working fine with it. So I think it may not be a
> problem caused by the Kafka 0.10 client.
> 
> System.setProperty("java.security.auth.login.config", serverConfig.jasspath);
> properties.setProperty("security.protocol", "SASL_PLAINTEXT");
> properties.setProperty("sasl.mechanism", "PLAIN");
> 
> 
> Exceptions msgs are:
> 
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:56)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:91)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:422)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: 
> java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in 
> the JAAS configuration. System property 'java.security.auth.login.config' is 
> /data/apps/spark/kafka_client_jaas.conf
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
>   at 
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
>   ... 11 more
> Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' 
> entry in the JAAS configuration. System property 
> 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
>   at 
> org.apache.kafka.common.security.JaasUtils.defaultJaasConfig(JaasUtils.java:85)
>   at 
> org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:67)
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:85)
>   ... 15 more
> 
> 
> 
> File content looks like below:
> 
> KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule
> required username="admin" password=“xxx"; };
> 
> It seems like the kafka_client_jaas.conf file has been read, but the
> KafkaClient entry could not be resolved. That's very strange; other
> applications with the same config are working fine. And I wrote a simple
> Java program to test the file, and it works fine too.
> 
> 
> public static void main(String[] args) {
>   Map<String, Object> maps = new HashMap<>();
>   System.setProperty("java.security.auth.login.config",
> "/data/apps/spark/kafka_client_jaas.conf");
>   Configuration jassConfig = JaasUtils.jaasConfig(LoginType.CLIENT, maps);
>   AppConfigurationEntry object[] =
> jassConfig.getAppConfigurationEntry("KafkaClient");
>   for(AppConfigurationEntry entry : object){
>     System.out.println(entry.getOptions());
>   }
> }
> 
> 
> 
> 
>  
> 
> 
> 




Re: Submiting jobs via UI/Rest API

2018-03-19 Thread Nico Kruber
Thanks for reporting these issues,

1. This behaviour is actually intended since we do not spawn any thread
that is waiting for the job completion (which may or may not occur
eventually). Therefore, the web UI always submits jobs in detached mode
and you could not wait for job completion anyway. Any call after
env.execute() may thus not give you any more data than you already had
before. As a safety precaution, we stop the execution of the main()
method after env.execute().
If there was a possibility to wait for job completion, you would be able
to block the whole web UI with it.

2. This seems to be solved: I tried to submit this skeleton to Flink
1.5-SNAPSHOT and only got a failure message like this:
{"errors":["The main method caused an error."]}
The code I tried was
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.execute("Socket Window WordCount");

I also tried with a StreamTransformation only but got the same message.



Regards
Nico

On 09/03/18 14:33, eSKa wrote:
> Hi guys,
> 
> We were trying to use UI's "Submit new job" functionality (and later REST
> endpoints for that).
> There were few problems we found:
> 1. When we ran a job that had additional code after the env execution (or any
> sink), that code was not executed. E.g. our job was calculating some data,
> writing it to a temp location in a sink and, when everything was successful,
> moving the files to the proper location on HDFS. Running the job using Java's
> YARNClusterClient API worked fine.
> 2. We wanted to test a job using the "Show Plan" option, but it seems that running
> this option for a job that did not have anything to run (e.g. the calculated input
> paths list was empty) results in killing the container on YARN. I didn't find
> any suspicious logs in the jobManager:
> 
> 
> /2018-03-09 14:13:53,979 INFO  com.my_job.CustomFlinkJob  
> - All job done :)
> 2018-03-09 14:13:53,996 INFO 
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web
> dashboard root cache directory
> /tmp/flink-web-1fe30b99-9ad1-4531-b14b-143ea6c3d9ed
> 2018-03-09 14:13:54,004 INFO  org.apache.flink.runtime.blob.BlobServer
>  
> - Stopped BLOB server at 0.0.0.0:60727
> 2018-03-09 14:13:54,007 INFO 
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web
> dashboard jar upload directory
> /tmp/flink-web-8d7b68fc-1ef7-4869-91c1-5bebb370b529
> /
> 
> We are using Flink version 1.3.1; next week we will play with 1.4.1.
> Any chance of fixing these bugs in the next versions?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 




Re: Queryable State

2018-03-19 Thread Vishal Santoshi
Thank you. These do look like show stoppers for us.  But again thank you.

On Mon, Mar 19, 2018 at 10:31 AM, Fabian Hueske  wrote:

> AFAIK, there have been discussions to replicate state among TMs to speed
> up recovery (and improve availability).
> However, I'm not aware of plans to implement that.
>
> I don't think serving state while a job is down has been considered yet.
>
> 2018-03-19 15:17 GMT+01:00 Vishal Santoshi :
>
>> Are there plans to address all or few of the above apart from the  "JM LB
>> not possible" which seems understandable ?
>>
>> On Mon, Mar 19, 2018 at 9:58 AM, Fabian Hueske  wrote:
>>
>>> Queryable state is "just" an additional feature on regular keyed state.
>>> i.e., the only difference is that you can read the state from an outside
>>> application.
>>> Besides that it behaves exactly like regular application state
>>>
>>> Queryable state is (at the moment) designed to be accessible if a job
>>> runs.
>>> If the job fails (and recovers) or is manually taken down for
>>> maintenance, the state cannot be queried.
>>> It's not possible to put a load balancer in front of a JobManager. Only
>>> one JM is the active master that maintains a running job.
>>> State is also not replicated.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2018-03-19 14:24 GMT+01:00 Vishal Santoshi :
>>>
 Those are understandable. I am more interested in a few things ( and
 may be more that could be added )

 * As far as I can understand JM is the SPOF. Does HA become a necessity
 ?
 * If there are 2 or more JM could we theoretically have a LB fronting
 them ? Thus it is a peer to peer access ( Cassandra ) or a master slave
 setup for JM HA specifically for Queryable access ( For  flink jobs it is
 master slave )
 * Do we replicate state to other TMs for read optimization (
 specifically to avoid Hot Node issues ) ?
 * If the job goes down it seems the state is not accessible. What plans
 to we have to "separate concerns" for Queryable state.

 We consider Queryable State a significant feature that Flink provides and
 would do the necessary leg work if there are certain gaps in it being
 truly considered a Highly Available Key Value store.

 Regards.





 On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske 
 wrote:

> Hi Vishal,
>
> In general, Queryable State should be ready to use.
> There are a few things to consider though:
>
> - State queries are not synchronized with the application code, i.e.,
> they can happen at the same time. Therefore, the Flink application should
> not modify objects that have been put into or read from the state if you
> are not using the RocksDBStatebackend (which creates copies by
> deserialization).
> - State will be rolled back after a failure. Hence, you can read
> writes that are not "committed by a checkpoint".
>
> @Kostas, did I forget something?
>
> Best, Fabian
>
>
>
> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi 
> :
>
>> To be more precise, is anything thing similar to
>> https://engineering.linkedin.com/blog/2018/03/air-traffic
>> -controller--member-first-notifications-at-linkedin . done in Samza,
>> can be replicated with production level guarantees with Flink Queryable
>> state ( as it stands currently version 1.5 )  ?
>>
>> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> We are making few decisions on use cases where  Queryable state is a
>>> natural fit https://ci.apache.org/projects
>>> /flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
>>>
>>> Is Queryable state production ready ? We will go to 1.5 flnk if that
>>> helps to make the case for the usage.
>>>
>>
>>
>

>>>
>>
>


Re: Calling close() on Failure

2018-03-19 Thread Nico Kruber
Hi Gregory,
I tried to reproduce the behaviour you described but in my case (Flink
1.5-SNAPSHOT, using the SocketWindowWordCount adapted to let the first
flatmap be a RichFlatMapFunction with a close() method), the close()
method was actually called on the task manager I did not kill. Since the
close() actually comes from the RichFunction, the handling compared to a
ProcessFunction should not be different.

Can you give more details on your program and why you think it was not
called?


Regards
Nico
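
(Roughly the shape of the test described above; a sketch, not the exact code used:)

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

class TokenizingFlatMap extends RichFlatMapFunction[String, String] {
  override def flatMap(line: String, out: Collector[String]): Unit =
    line.split("\\s+").foreach(token => out.collect(token))

  // invoked when the task shuts down, including the cancel-and-restart
  // that happens on the surviving task managers during a failover
  override def close(): Unit =
    println(s"close() called on subtask ${getRuntimeContext.getIndexOfThisSubtask}")
}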

On 15/03/18 21:16, Gregory Fee wrote:
> Hello! I had a program lose a task manager the other day. The fail over
> back to a checkpoint and recovery worked like a charm. However, on one
> of my ProcessFunctions I defined a close() method and I noticed that it
> did not get called. To be clear, the task manager that failed was
> running that ProcessFunction. It makes sense to me that close() might
> not be callable in that case. But I had parallelism at 24 and I know
> that other instances of that ProcessFunction were running on machines
> that were gracefully shutdown yet zero close() functions were invoked.
> It seems like close() should get called on operators that are shutdown
> gracefully even in a failure condition. Is that how Flink is supposed to
> work? Am I missing something?
> 
> -- 
> *Gregory Fee*
> 
> Engineer
> 425.830.4734 
> Lyft 




Re: CsvSink

2018-03-19 Thread Nico Kruber
Hi Karim,
when I was trying to reproduce your code, I got an exception with the
name 'table' being used - by replacing it and completing the job with
some input, I did see the csv file popping up. Also, the job was
crashing when the file 1.txt already existed.

The code I used (running Flink 1.5-SNAPSHOT):

  def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
  .assignAscendingTimestamps(_._2)
tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

val results = tableEnv.sqlQuery( """
   |SELECT
   | A,C
   | FROM mytable
 """.stripMargin)

val result: Table = results

val path = "file:///tmp/test/1.txt"
val sink :TableSink[Row]=   new CsvTableSink(
  path, // output path
  fieldDelim = "|", // optional: delimit files by '|'
  numFiles = 1, // optional: write to a single file
  writeMode = WriteMode.NO_OVERWRITE)

result.writeToSink(sink)

env.execute("this job")
  }

  def get3TupleDataStream(env: StreamExecutionEnvironment):
DataStream[(Int, Long, String)] = {
val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))
data.+=((5, 3L, "I am fine."))
data.+=((6, 3L, "Luke Skywalker"))
env.fromCollection(data)
  }


Nico

On 16/03/18 22:50, karim amer wrote:
> Hi There,
> 
>  I am trying to write a CSVsink to disk but it's not getting written. I
> think the file is getting overwritten or truncated once The Stream
> process finishes. Does anyone know why the file is getting overwritten
> or truncated and how can i fix this ?
> 
> 
> tableEnv.registerDataStream("table", watermarkedStream)
> 
> val results = tableEnv.sqlQuery( """
> |SELECT
> | A
> | FROM table
> """.stripMargin)
> 
> 
> 
> val result: Table = results
> 
> val path = "file:///path/test/1.txt"
> val sink :TableSink[Row]=   new CsvTableSink(
>   path, // output path
> fieldDelim = "|", // optional: delimit files by '|'
> numFiles = 1, // optional: write to a single file
> writeMode = WriteMode.NO_OVERWRITE)
> 
> result.writeToSink(sink)
> 
> env.execute("this job")
> 
> 
> 
> 
> Thanks




Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread simone

Hi Fabian,

This simple code reproduces the behavior -> 
https://github.com/xseris/Flink-test-union


Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:

Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is 
replicated, filtered, reduced, and unioned.
There is nothing in between the filter and reduce, that could cause 
incorrect behavior.


The good thing is, the optimizer seems to be fine. The bad thing is, 
it is either the Flink runtime code or your functions.
Given that one plan produces good results, it might be the Flink 
runtime code.


Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske >:


Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske >:

HI Simone,

Looking at the plan, I don't see why this should be happening.
The pseudo code looks fine as well.
Any chance that you can create a minimal program to reproduce
the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone >:

Hi Fabian,

reuse is not enabled. I attach the plan of the execution.

Thanks,
Simone


On 19/03/2018 11:36, Fabian Hueske wrote:

Hi,

Union is actually a very simple operator (not even an
operator in Flink terms). It just merges two inputs. There
is no additional logic involved.
Therefore, it should also not emit records before either
of both ReduceFunctions sorted its data.
Once the data has been sorted for the ReduceFunction, the
data is reduced and emitted in a pipelined fashion, i.e.,
once the first record is reduced, it is forwarded into
the MapFunction (passing the unioned inputs).
So it is not unexpected that Map starts processing before
the ReduceFunction terminated.

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse objects,
you have to be careful in how you implement your functions.
If no, can you share the plan
(ExecutionEnvironment.getExecutionPlan()) that was
generated for the program?

Thanks,
Fabian

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions





2018-03-19 9:51 GMT+01:00 Flavio Pompermaier
>:

Any help on this? This thing is very strange..the
"manual" union of the output of the 2 datasets is
different than the flink-union of them..
Could it be a problem of the flink optimizer?

Best,
Flavio

On Fri, Mar 16, 2018 at 4:01 PM, simone
> wrote:

Sorry, I translated the code into pseudocode too
fast. That is indeed an equals.


On 16/03/2018 15:58, Kien Truong wrote:


Hi,

Just a guess, but string comparison in Java should
be done using the equals method, not the == operator.

Regards,

Kien


On 3/16/2018 9:47 PM, simone wrote:

subject.getField("field1") == "";














Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Fabian Hueske
Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is replicated,
filtered, reduced, and unioned.
There is nothing in between the filter and reduce, that could cause
incorrect behavior.

The good thing is, the optimizer seems to be fine. The bad thing is, it is
either the Flink runtime code or your functions.
Given that one plan produces good results, it might be the Flink runtime
code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske :

> Ah, thanks for the update!
> I'll have a look at that.
>
> 2018-03-19 15:13 GMT+01:00 Fabian Hueske :
>
>> HI Simone,
>>
>> Looking at the plan, I don't see why this should be happening. The pseudo
>> code looks fine as well.
>> Any chance that you can create a minimal program to reproduce the problem?
>>
>> Thanks,
>> Fabian
>>
>> 2018-03-19 12:04 GMT+01:00 simone :
>>
>>> Hi Fabian,
>>>
>>> reuse is not enabled. I attach the plan of the execution.
>>>
>>> Thanks,
>>> Simone
>>>
>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>
>>> Hi,
>>>
>>> Union is actually a very simple operator (not even an operator in Flink
>>> terms). It just merges two inputs. There is no additional logic involved.
>>> Therefore, it should also not emit records before either of both
>>> ReduceFunctions sorted its data.
>>> Once the data has been sorted for the ReduceFunction, the data is
>>> reduced and emitted in a pipelined fashion, i.e., once the first record is
>>> reduced, it is forwarded into the MapFunction (passing the unioned inputs).
>>> So it is not unexpected that Map starts processing before the
>>> ReduceFunction terminated.
>>>
>>> Did you enable object reuse [1]?
>>> If yes, try to disable it. If you want to reuse objects, you have to be
>>> careful in how you implement your functions.
>>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>> that was generated for the program?
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/batch/index.html#operating-on-data-objects-in-functions
>>>
>>>
>>>
>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>>>
 Any help on this? This thing is very strange..the "manual" union of the
 output of the 2 datasets is different than the flink-union of them..
 Could it be a problem of the flink optimizer?

 Best,
 Flavio

 On Fri, Mar 16, 2018 at 4:01 PM, simone 
 wrote:

> Sorry, I translated the code into pseudocode too fast. That is indeed
> an equals.
>
> On 16/03/2018 15:58, Kien Truong wrote:
>
> Hi,
>
> Just a guess, but string comparison in Java should be done using the equals
> method, not the == operator.
>
> Regards,
>
> Kien
>
>
> On 3/16/2018 9:47 PM, simone wrote:
>
> *subject.getField("field1") == "";*
>
>
>


>>>
>>>
>>
>


Re: Queryable State

2018-03-19 Thread Fabian Hueske
AFAIK, there have been discussions to replicate state among TMs to speed up
recovery (and improve availability).
However, I'm not aware of plans to implement that.

I don't think serving state while a job is down has been considered yet.

2018-03-19 15:17 GMT+01:00 Vishal Santoshi :

> Are there plans to address all or few of the above apart from the  "JM LB
> not possible" which seems understandable ?
>
> On Mon, Mar 19, 2018 at 9:58 AM, Fabian Hueske  wrote:
>
>> Queryable state is "just" an additional feature on regular keyed state.
>> i.e., the only difference is that you can read the state from an outside
>> application.
>> Besides that it behaves exactly like regular application state
>>
>> Queryable state is (at the moment) designed to be accessible if a job
>> runs.
>> If the job fails (and recovers) or is manually taken down for
>> maintenance, the state cannot be queried.
>> It's not possible to put a load balancer in front of a JobManager. Only
>> one JM is the active master that maintains a running job.
>> State is also not replicated.
>>
>> Best, Fabian
>>
>>
>> 2018-03-19 14:24 GMT+01:00 Vishal Santoshi :
>>
>>> Those are understandable. I am more interested in a few things ( and may
>>> be more that could be added )
>>>
>>> * As far as I can understand JM is the SPOF. Does HA become a necessity ?
>>> * If there are 2 or more JM could we theoretically have a LB fronting
>>> them ? Thus it is a peer to peer access ( Cassandra ) or a master slave
>>> setup for JM HA specifically for Queryable access ( For  flink jobs it is
>>> master slave )
>>> * Do we replicate state to other TMs for read optimization (
>>> specifically to avoid Hot Node issues ) ?
>>> * If the job goes down it seems the state is not accessible. What plans
>>> to we have to "separate concerns" for Queryable state.
>>>
>>> We consider Queryable State a significant feature that Flink provides and
>>> would do the necessary leg work if there are certain gaps in it being
>>> truly considered a Highly Available Key Value store.
>>>
>>> Regards.
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske 
>>> wrote:
>>>
 Hi Vishal,

 In general, Queryable State should be ready to use.
 There are a few things to consider though:

 - State queries are not synchronized with the application code, i.e.,
 they can happen at the same time. Therefore, the Flink application should
 not modify objects that have been put into or read from the state if you
 are not using the RocksDBStatebackend (which creates copies by
 deserialization).
 - State will be rolled back after a failure. Hence, you can read writes
 that are not "committed by a checkpoint".

 @Kostas, did I forget something?

 Best, Fabian



 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :

> To be more precise, is anything thing similar to
> https://engineering.linkedin.com/blog/2018/03/air-traffic
> -controller--member-first-notifications-at-linkedin . done in Samza,
> can be replicated with production level guarantees with Flink Queryable
> state ( as it stands currently version 1.5 )  ?
>
> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> We are making few decisions on use cases where  Queryable state is a
>> natural fit https://ci.apache.org/projects
>> /flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
>>
>> Is Queryable state production ready ? We will go to 1.5 flnk if that
>> helps to make the case for the usage.
>>
>
>

>>>
>>
>


Re: flink on mesos

2018-03-19 Thread Nico Kruber
Can you elaborate a bit more on what is not working? (please provide a
log file or the standard output/error).
Also, can you try a newer Flink checkout? The start scripts have been
merged into a single one for 'flip6' and 'old' - I guess,
mesos-appmaster.sh should be the right script for you now.


Regards
Nico

On 18/03/18 17:06, miki haiat wrote:
> I think  that you can use the catalog option only if you install dc/os ?
>  
> 
>  iv  installed  mesos and marathon  
> 
> 
> 
> 
> On Sun, Mar 18, 2018 at 5:59 PM, Lasse Nedergaard
> > wrote:
> 
> Hi. 
> Go to Catalog, Search for Flink and click deploy
> 
> Med venlig hilsen / Best regards
> Lasse Nedergaard
> 
> 
> On 18 Mar 2018, at 16:18, miki haiat wrote:
> 
>>
>> Hi,
>>
>> I'm trying to run Flink on Mesos. I've installed Mesos and Marathon
>> successfully but I'm unable to create the Flink job/task managers.
>>
>> I'm running this command but Mesos won't start any task:
>>
>> ./mesos-appmaster-flip6-session.sh  -n 1
>>
>>
>>
>> I can't figure out the proper way to run Flink on Mesos.
>>
>>
> 




Re: Queryable State

2018-03-19 Thread Vishal Santoshi
Are there plans to address all or few of the above apart from the  "JM LB
not possible" which seems understandable ?

On Mon, Mar 19, 2018 at 9:58 AM, Fabian Hueske  wrote:

> Queryable state is "just" an additional feature on regular keyed state.
> i.e., the only difference is that you can read the state from an outside
> application.
> Besides that it behaves exactly like regular application state
>
> Queryable state is (at the moment) designed to be accessible if a job runs.
> If the job fails (and recovers) or is manually taken down for maintenance,
> the state cannot be queried.
> It's not possible to put a load balancer in front of a JobManager. Only
> one JM is the active master that maintains a running job.
> State is also not replicated.
>
> Best, Fabian
>
>
> 2018-03-19 14:24 GMT+01:00 Vishal Santoshi :
>
>> Those are understandable. I am more interested in a few things ( and may
>> be more that could be added )
>>
>> * As far as I can understand JM is the SPOF. Does HA become a necessity ?
>> * If there are 2 or more JM could we theoretically have a LB fronting
>> them ? Thus it is a peer to peer access ( Cassandra ) or a master slave
>> setup for JM HA specifically for Queryable access ( For  flink jobs it is
>> master slave )
>> * Do we replicate state to other TMs for read optimization ( specifically
>> to avoid Hot Node issues ) ?
>> * If the job goes down it seems the state is not accessible. What plans
>> to we have to "separate concerns" for Queryable state.
>>
>> We consider Queryable State a significant feature that Flink provides and
>> would do the necessary leg work if there are certain gaps in it being
>> truly considered a Highly Available Key Value store.
>>
>> Regards.
>>
>>
>>
>>
>>
>> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske  wrote:
>>
>>> Hi Vishal,
>>>
>>> In general, Queryable State should be ready to use.
>>> There are a few things to consider though:
>>>
>>> - State queries are not synchronized with the application code, i.e.,
>>> they can happen at the same time. Therefore, the Flink application should
>>> not modify objects that have been put into or read from the state if you
>>> are not using the RocksDBStatebackend (which creates copies by
>>> deserialization).
>>> - State will be rolled back after a failure. Hence, you can read writes
>>> that are not "committed by a checkpoint".
>>>
>>> @Kostas, did I forget something?
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :
>>>
 To be more precise, is anything thing similar to
 https://engineering.linkedin.com/blog/2018/03/air-traffic
 -controller--member-first-notifications-at-linkedin . done in Samza,
 can be replicated with production level guarantees with Flink Queryable
 state ( as it stands currently version 1.5 )  ?

 On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> We are making few decisions on use cases where  Queryable state is a
> natural fit https://ci.apache.org/projects
> /flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
>
> Is Queryable state production ready ? We will go to 1.5 flnk if that
> helps to make the case for the usage.
>


>>>
>>
>


Re: Incremental checkpointing performance

2018-03-19 Thread Nico Kruber
Hi Miyuru,
Indeed, the behaviour you observed sounds strange and kind of goes against
the results Stefan presented in [1]. To see what is going on, can you
also share your changes to Flink's configuration, i.e. flink-conf.yaml?

Let's first make sure you're really comparing RocksDBStateBackend with
vs without incremental checkpoints:
- if you remove this from the code:
env.setStateBackend(new RocksDBStateBackend(
   new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
   true));
then you will end up with the state backend configured via the
"state.backend" property. Was this set to "rocksdb"? Alternatively, you
can set the second parameter to the RocksDBStateBackend constructor to
false to get the right back-end.
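For reference, the two runs should differ only in that boolean flag; a minimal sketch reusing the path from your program (wrapped in the same try/catch as in your code):

// Incremental checkpoints (the run you already did):
env.setStateBackend(new RocksDBStateBackend(
        new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"), true));

// Full RocksDB checkpoints, for the comparison run:
env.setStateBackend(new RocksDBStateBackend(
        new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"), false));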

You can also verify the values you see from the web interface by looking
into the logs (at INFO level). There, you should see reports like this:
"Asynchronous RocksDB snapshot (..., asynchronous part) in thread ...
took ... ms." and "Asynchronous RocksDB snapshot (..., synchronous part)
in thread ... took ... ms."

Other than that, from what I know about it (Stefan (cc'd), correct me if
I'm wrong), incremental checkpoints only hard-link the changed sst files
locally and then copy their data to the checkpoint
store (the path you gave). A full checkpoint must copy all current data.
If, between two checkpoints, you write more data than the contents of
the database, e.g. by updating a key multiple times, you may indeed have
more data to store. Judging from the state sizes you gave, this is
probably not the case.


Let's get started with this and see whether there is anything unusual.


Regards,
Nico


[1]
https://berlin.flink-forward.org/kb_sessions/a-look-at-flinks-internal-data-structures-and-algorithms-for-efficient-checkpointing/

On 19/03/18 05:25, Miyuru Dayarathna wrote:
> Hi,
> 
> We did a performance test of Flink's incremental checkpointing to
> measure the average time it takes to create a checkpoint and the average
> checkpoint file size. We did this test on a single computer in order to
> avoid the latencies introduced by network communication. The computer
> had Intel® Core™ i7-7600U CPU @ 2.80GHz, 4 cores, 16GB RAM, 450GB HDD,
> 101GB free SSD space. The computer was running on Ubuntu 16.04
> LTS, Apache Flink 1.4.1, Java version "1.8.0_152", Java(TM) SE Runtime
> Environment (build 1.8.0_152-b16), Java HotSpot(TM) 64-Bit Server VM
> (build 25.152-b16, mixed mode). We used the JVM flags -Xmx8g -Xms8g. The
> test was run for 40 minutes.
> 
> The Flink application we used is as follows,
> //-
> public class LengthWindowIncrementalCheckpointing {
>     private static DataStream>
> inputStream = null;
>     private static final int PARALLELISM = 1;
>     private static final int timeoutMillis = 10;
>     private static final int WINDOWLENGTH = 1;
>     private static final int SLIDELENGTH = 1;
>     private static Logger logger =
> LoggerFactory.getLogger(LengthWindowIncrementalCheckpointing.class);
> 
>     public static void main(String[] args) {
>     StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
>     // start a checkpoint every 1000 ms
>     env.enableCheckpointing(1000);
>     try {
>     env.setStateBackend(new RocksDBStateBackend(
>     new
> FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>     true));
>     } catch (IOException e) {
>     e.printStackTrace();
>     }
> 
>     env.setBufferTimeout(timeoutMillis);
>     inputStream = env.addSource(new
> MicrobenchSourceFunction()).setParallelism(PARALLELISM).name("inputStream");
> 
>     DataStream>
> incrementStream2 =
>     inputStream.filter(new FilterFunction Float, Integer, String>>() {
>     @Override
>     public boolean filter(Tuple4 String> tuple) throws Exception {
>     if (tuple.f1 > 10) {
>     return true;
>     }
>     return false;
>     }
>     }).keyBy(1).countWindow(WINDOWLENGTH, SLIDELENGTH).sum(2);
>     incrementStream2.writeUsingOutputFormat(new
> DiscardingOutputFormat     String>>());
> 
>     try {
>     env.execute("Flink application.");
>     } catch (Exception e) {
>     logger.error("Error in starting the Flink stream
> application: " + e.getMessage(), e);
>     }
>     }
> }
> 
> //-
> 
> I have attached two charts (Average_latencies.jpg and
> Average_state_sizes.jpg) 

Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Fabian Hueske
Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske :

> HI Simone,
>
> Looking at the plan, I don't see why this should be happening. The pseudo
> code looks fine as well.
> Any chance that you can create a minimal program to reproduce the problem?
>
> Thanks,
> Fabian
>
> 2018-03-19 12:04 GMT+01:00 simone :
>
>> Hi Fabian,
>>
>> reuse is not enabled. I attach the plan of the execution.
>>
>> Thanks,
>> Simone
>>
>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>
>> Hi,
>>
>> Union is actually a very simple operator (not even an operator in Flink
>> terms). It just merges two inputs. There is no additional logic involved.
>> Therefore, it should also not emit records before either of both
>> ReduceFunctions sorted its data.
>> Once the data has been sorted for the ReduceFunction, the data is reduced
>> and emitted in a pipelined fashion, i.e., once the first record is reduced,
>> it is forwarded into the MapFunction (passing the unioned inputs).
>> So it is not unexpected that Map starts processing before the
>> ReduceFunction terminated.
>>
>> Did you enable object reuse [1]?
>> If yes, try to disable it. If you want to reuse objects, you have to be
>> careful in how you implement your functions.
>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>> that was generated for the program?
>>
>> Thanks,
>> Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/batch/index.html#operating-on-data-objects-in-functions
>>
>>
>>
>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>>
>>> Any help on this? This thing is very strange..the "manual" union of the
>>> output of the 2 datasets is different than the flink-union of them..
>>> Could it be a problem of the flink optimizer?
>>>
>>> Best,
>>> Flavio
>>>
>>> On Fri, Mar 16, 2018 at 4:01 PM, simone 
>>> wrote:
>>>
 Sorry, I translated the code into pseudocode too fast. That is indeed
 an equals.

 On 16/03/2018 15:58, Kien Truong wrote:

 Hi,

 Just a guess, but string compare in Java should be using the equals method,
 not the == operator.

 Regards,

 Kien


 On 3/16/2018 9:47 PM, simone wrote:

 *subject.getField("field1") == "";*



>>>
>>>
>>
>>
>


Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Fabian Hueske
HI Simone,

Looking at the plan, I don't see why this should be happening. The pseudo
code looks fine as well.
Any chance that you can create a minimal program to reproduce the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone :

> Hi Fabian,
>
> reuse is not enabled. I attach the plan of the execution.
>
> Thanks,
> Simone
>
> On 19/03/2018 11:36, Fabian Hueske wrote:
>
> Hi,
>
> Union is actually a very simple operator (not even an operator in Flink
> terms). It just merges two inputs. There is no additional logic involved.
> Therefore, it should also not emit records before either of both
> ReduceFunctions sorted its data.
> Once the data has been sorted for the ReduceFunction, the data is reduced
> and emitted in a pipelined fashion, i.e., once the first record is reduced,
> it is forwarded into the MapFunction (passing the unioned inputs).
> So it is not unexpected that Map starts processing before the
> ReduceFunction terminated.
>
> Did you enable object reuse [1]?
> If yes, try to disable it. If you want to reuse objects, you have to be
> careful in how you implement your functions.
> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
> that was generated for the program?
>
> Thanks,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> dev/batch/index.html#operating-on-data-objects-in-functions
>
>
>
> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>
>> Any help on this? This thing is very strange..the "manual" union of the
>> output of the 2 datasets is different than the flink-union of them..
>> Could it be a problem of the flink optimizer?
>>
>> Best,
>> Flavio
>>
>> On Fri, Mar 16, 2018 at 4:01 PM, simone 
>> wrote:
>>
>>> Sorry, I translated the code into pseudocode too fast. That is indeed an
>>> equals.
>>>
>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>
>>> Hi,
>>>
>>> Just a guess, but string compare in Java should be using the equals method,
>>> not the == operator.
>>>
>>> Regards,
>>>
>>> Kien
>>>
>>>
>>> On 3/16/2018 9:47 PM, simone wrote:
>>>
>>> *subject.getField("field1") == "";*
>>>
>>>
>>>
>>
>>
>
>


Re: HDFS data locality and distribution

2018-03-19 Thread Reinier Kip
Hi Chesnay,


Thanks for responding.


I managed to resolve the problem last Friday; I had a single datasource for 
each file, instead of one big datasource for all the files. The reading of the 
one or two HDFS blocks within each datasource was then distributed to a small 
percentage of slots (let's say ~10%). Some Beam runner-specific knowledge for 
Flink I did not yet have.


> the function after the groupBy() should still make full use of the 
> parallelism of the cluster
> Do note that data skew can affect how much data is distributed to each node


I do not remember seeing this behaviour; instead, I remember data was
redistributed only among the slots that did the reading, but I cannot verify this
at this point. Also, I do not know exactly how Beam operators map to Flink's. 
Key distribution is in the millions and quite uniform.


Reinier


From: Chesnay Schepler 
Sent: 13 March 2018 12:40:02
To: user@flink.apache.org
Subject: Re: HDFS data locality and distribution

Hello,

You said that "data is distributed very badly across slots"; do you mean that 
only a small number of subtasks is reading from HDFS, or that the keyed data is 
only processed by a few subtasks?

Flink does prioritize data locality over data distribution when reading the
files, but the function after the groupBy() should still make full use of the 
parallelism of the cluster. Do note that data skew can affect how much data is 
distributed to each node, i.e. if 80% of your data has the same key (or rather 
hash), they will all end up on the same node.

On 12.03.2018 13:49, Reinier Kip wrote:

Relevant versions: Beam 2.1, Flink 1.3.


From: Reinier Kip 
Sent: 12 March 2018 13:45:47
To: user@flink.apache.org
Subject: HDFS data locality and distribution


Hey all,


I'm trying to batch-process 30-ish files from HDFS, but I see that data is 
distributed very badly across slots. 4 out of 32 slots get 4/5ths of the data, 
another 3 slots get about 1/5th and a last slot just a few records. This 
probably triggers disk spillover on these slots and slows down the job 
immensely. The data has many many unique keys and processing could be done in a 
highly parallel manner. From what I understand, HDFS data locality governs 
which splits are assigned to which subtask.


  *   I'm running a Beam on Flink on YARN pipeline.
  *   I'm reading 30-ish files, whose records are later grouped by their 
millions of unique keys.
  *   For now, I have 8 task managers by 4 slots. Beam sets all subtasks to 
have 32 parallelism.
  *   Data seems to be localised to 9 out of the 32 slots, 3 out of the 8 task 
managers.


Does the statement of input split assignment ring true? Is the fact that data 
isn't redistributed an effort from Flink to have high data locality, even if 
this means disk spillover for a few slots/tms and idleness for others? Is there 
any use for parallelism if work isn't distributed anyway?


Thanks for your time, Reinier



Re: Custom Processing per window

2018-03-19 Thread Fabian Hueske
Hi,

Data is partitioned by key across machines and state is kept per key. It is
not possible to interact with two keys at the same time.

Best, Fabian
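For illustration only (not something prescribed above): the per-key part of Task 1 maps naturally onto keyed state, e.g. a ValueState holding the running aggregate of the current key. A minimal sketch, with the event type Tuple2<String, Long>, the threshold, and the class name assumed; the end-of-window flush is left out and imports from flink-streaming-java are omitted:

public class ThresholdEmitter extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long THRESHOLD = 100L;        // assumed threshold
    private transient ValueState<Long> runningSum;     // scoped to the current key by Flink

    @Override
    public void open(Configuration parameters) {
        runningSum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("running-sum", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> in, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = runningSum.value();
        long sum = (current == null ? 0L : current) + in.f1;
        runningSum.update(sum);
        if (sum >= THRESHOLD) {
            // emit early for this key only; other keys' state is not visible here
            out.collect(Tuple2.of(in.f0, sum));
        }
    }
}

It would be applied as stream.keyBy(0).process(new ThresholdEmitter()).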

2018-03-19 14:47 GMT+01:00 Dhruv Kumar :

> In other words, while using the Flink streaming APIs, is it possible to
> take a decision on emitting a particular key based on the state of some
> other key present in the same window?
>
> Thanks!
> --
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
> On Mar 19, 2018, at 05:11, Dhruv Kumar  wrote:
>
> Task 1: I implemented it using a custom Trigger (see attached file). Looks
> like it is doing what I want it to. I copied the code from
> EventTimeTrigger.java and overwrote the *onElement* method.
>
> Task 2: I will need to maintain the state (this will be the LRU cache) for
> multiple keys in the same data structure. But it looks like that the Keyed
> states are on a per key basis. Should I use OperatorState in some way? Can
> I use a data structure not directly managed by Flink? What will happen in
> the case of keys across multiple machines?
>
> 
>
>
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
> On Mar 19, 2018, at 02:04, Jörn Franke  wrote:
>
> How would you start implementing it? Where are you stuck?
>
> Did you already try to implement this?
>
> On 18. Mar 2018, at 04:10, Dhruv Kumar  wrote:
>
> Hi
>
> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for
> implementing some very specific use-cases: (They may not seem relevant but
> I need to implement them or I at least need to know if it is possible to
> implement them in Flink)
>
> Assumptions:
> 1. Data stream is of the form (key, value). We achieve this by the *.key*
> operation provided by Flink API.
> 2. By emitting a key, I mean sending/outputting its aggregated value to
> any data sink.
>
> 1. For each Tumbling window in the Event Time space, for each key, I would
> like to aggregate its value until it crosses a particular threshold (same
> threshold for all the keys). As soon as the key’s aggregated value crosses
> this threshold, I would like to emit this key. At the end of every tumbling
> window, all the (key, value) aggregated pairs  would be emitted
> irrespective of whether they have crossed the threshold or not.
>
> 2. For each Tumbling window in the event time space, I would like to
> maintain a LRU cache which stores the keys along with their aggregated
> values and their latest arrival time. The least recently used (LRU) key
> would be the key whose latest arrival time is earlier than the latest
> arrival times of all the other keys present in the LRU cache. The LRU cache
> is of a limited size. So, it is possible that the number of unique keys in
> a particular window is greater than the size of LRU cache. Whenever any
> (key, value) pair arrives, if the key already exists, its aggregated value
> is updated with the value of the newly arrived value and its latest arrival
> time is updated with the current event time. If the key does not exist and
> there is some free slot in the LRU cache, it is added into the LRU. As soon
> as the LRU cache gets occupied fully and a new key comes in which does not
> exist in the LRU cache, we would like to emit the least recently used key
> to accommodate the newly arrived key. As in the case of 1, at the end of
> every tumbling window, all the (key, value) aggregated pairs in the LRU
> cache would be emitted.
>
> Would like to know how can we implement these algorithms using Flink. Any
> help would be greatly appreciated.
>
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
>
>
>


Re: Queryable State

2018-03-19 Thread Fabian Hueske
Queryable state is "just" an additional feature on regular keyed state.
i.e., the only difference is that you can read the state from an outside
application.
Besides that it behaves exactly like regular application state

Queryable state is (at the moment) designed to be accessible if a job runs.
If the job fails (and recovers) or is manually taken down for maintenance,
the state cannot be queried.
It's not possible to put a load balancer in front of a JobManager. Only one
JM is the active master that maintains a running job.
State is also not replicated.

Best, Fabian


2018-03-19 14:24 GMT+01:00 Vishal Santoshi :

> Those are understandable. I am more interested in a few things ( and may
> be more that could be added )
>
> * As far as I can understand JM is the SPOF. Does HA become a necessity ?
> * If there are 2 or more JM could we theoretically have a LB fronting them
> ? Thus it is a peer to peer access ( Cassandra ) or a master slave setup
> for JM HA specifically for Queryable access ( For  flink jobs it is master
> slave )
> * Do we replicate state to other TMs for read optimization ( specifically
> to avoid Hot Node issues ) ?
> * If the job goes down it seems the state is not accessible. What plans do
> we have to "separate concerns" for Queryable state?
>
> We consider Queryable State a significant feature Flink provides and would
> do the necessary legwork if there are certain gaps in it being truly
> considered a Highly Available Key Value store.
>
> Regards.
>
>
>
>
>
> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske  wrote:
>
>> Hi Vishal,
>>
>> In general, Queryable State should be ready to use.
>> There are a few things to consider though:
>>
>> - State queries are not synchronized with the application code, i.e.,
>> they can happen at the same time. Therefore, the Flink application should
>> not modify objects that have been put into or read from the state if you
>> are not using the RocksDBStatebackend (which creates copies by
>> deserialization).
>> - State will be rolled back after a failure. Hence, you can read writes
>> that are not "committed by a checkpoint".
>>
>> @Kostas, did I forget something?
>>
>> Best, Fabian
>>
>>
>>
>> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :
>>
>>> To be more precise, is anything thing similar to
>>> https://engineering.linkedin.com/blog/2018/03/air-traffic
>>> -controller--member-first-notifications-at-linkedin . done in Samza,
>>> can be replicated with production level guarantees with Flink Queryable
>>> state ( as it stands currently version 1.5 )  ?
>>>
>>> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 We are making few decisions on use cases where  Queryable state is a
 natural fit https://ci.apache.org/projects
 /flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html

 Is Queryable state production ready ? We will go to 1.5 flnk if that
 helps to make the case for the usage.

>>>
>>>
>>
>


Re: How to correct use timeWindow() with DataStream?

2018-03-19 Thread Fabian Hueske
If you don't want to partition by key, i.e., have a single result for each
time window, you should not use keyBy but an allWindow instead.
However, this will only be executed with a parallelism of 1.
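A minimal sketch of the non-keyed variant, reusing the 5s/1s sliding window from the snippet quoted below (the output type and counting logic are assumptions):

// All LogLines of a window are handed to a single function call (parallelism 1).
DataStream<Tuple2<Long, Integer>> counts = dataStream
        .timeWindowAll(Time.seconds(5), Time.seconds(1))
        .apply(new AllWindowFunction<LogLine, Tuple2<Long, Integer>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<LogLine> values,
                              Collector<Tuple2<Long, Integer>> out) {
                int count = 0;
                for (LogLine line : values) {
                    count++;
                }
                // one result per window: (window end timestamp, number of lines)
                out.collect(Tuple2.of(window.getEnd(), count));
            }
        });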

2018-03-19 13:54 GMT+01:00 Felipe Gutierrez :

> thanks a lot Fabian,
>
> It clarified my way of developing. I am using keyBy, timeWindow, and the apply
> operator in EventTimeStreamExampleJava now. I am generating dates in order,
> with a bit of out-of-orderness, in LogSourceFunction, and only using Date as
> my key in the LogLine object.
>
> If I understood watermarks well, my program should combine all the lines
> that are inside the same watermark when I set ".timeWindow(Time.seconds(5),
> Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it
> is still not happening because I didn't use a good key ".keyBy(lineLog ->
> lineLog.getTime())" and my key at the LogLineCounterFunction class is still
> the Date.
>
> public static class LogLineCounterFunction implements WindowFunction<
> LogLine, // input
> Tuple3, // output
> Date, // key
> TimeWindow> { // window type
>
> What should I use as a key in my case?
>
> My output is combining only the lines with the same key (Date). I want to
> combine the dates between the watermarks ".timeWindow(Time.seconds(5),
> Time.seconds(1))"...
>
> 3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534
> | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
> 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 |
> 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
> 16:31:08.534},107151667,9)
> 3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184
> | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15
> 16:31:04.184},107151667,4)
> 3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884
> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
> 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
> 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
> 16:31:00.884},107151667,12)
> 3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15
> 16:31:03.784},107151667,1)
> 3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334
> | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15
> 16:31:06.334},107151667,4)
>
>
>
>
> On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> The timestamps of the stream records should be increasing (strict
>> monotonicity is not required, a bit out of orderness can be handled due to
>> watermarks).
>> So, the events should also be generated with increasing timestamps. It
>> looks like your generator generates random dates. I'd also generate data
>> with millisecond precision, not just days.
>>
>> Also, a timestamp in Flink is the number of milliseconds since
>> 1970-01-01-00:00:00.
>> However, your timestamp extractor only returns the number of seconds
>> since last minute (i.e., from 0 to 60). You should use Date.getTime()
>> instead of Date.getSeconds().
>>
>> Best, Fabian
>>
>> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez > >:
>>
>>> Hi all,
>>>
>>> I am building an example with DataStream using Flink that has a fake
>>> source generator of LogLine(Date d, String line). I want to work with
>>> Watermarks on it so I created a class that implements 
>>> AssignerWithPeriodicWatermarks.
>>> If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream
>>> I can group by second and concatenate the lines. When I use
>>> ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I
>>> misunderstood something when I was reading about Event Time. Could anyone
>>> help me please? My source code is as follow.
>>>
>>> Thanks for the ideas. Kind Regards,  Felipe
>>>
>>> package flink.example.streaming;
>>>
>>> import flink.util.LogLine;
>>> import flink.util.LogSourceFunction;
>>> import flink.util.UtilDate;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEn
>>> vironment;
>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodi
>>> cWatermarks;
>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>> import 

Re: Custom Processing per window

2018-03-19 Thread Dhruv Kumar
In other words, while using the Flink streaming APIs, is it possible to take a 
decision on emitting a particular key based on the state of some other key 
present in the same window?

Thanks!
--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Mar 19, 2018, at 05:11, Dhruv Kumar  wrote:
> 
> Task 1: I implemented it using a custom Trigger (see attached file). Looks 
> like it is doing what I want it to. I copied the code from 
> EventTimeTrigger.java and overwrote the onElement method. 
> 
> Task 2: I will need to maintain the state (this will be the LRU cache) for 
> multiple keys in the same data structure. But it looks like that the Keyed 
> states are on a per key basis. Should I use OperatorState in some way? Can I 
> use a data structure not directly managed by Flink? What will happen in the 
> case of keys across multiple machines?
> 
> 
> 
> 
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me 
> 
>> On Mar 19, 2018, at 02:04, Jörn Franke > > wrote:
>> 
>> How would you start implementing it? Where are you stuck?
>> 
>> Did you already try to implement this?
>> 
>> On 18. Mar 2018, at 04:10, Dhruv Kumar > > wrote:
>> 
>>> Hi
>>> 
>>> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for 
>>> implementing some very specific use-cases: (They may not seem relevant but 
>>> I need to implement them or I at least need to know if it is possible to 
>>> implement them in Flink)
>>> 
>>> Assumptions:
>>> 1. Data stream is of the form (key, value). We achieve this by the .key 
>>> operation provided by Flink API.
>>> 2. By emitting a key, I mean sending/outputting its aggregated value to any 
>>> data sink. 
>>> 
>>> 1. For each Tumbling window in the Event Time space, for each key, I would 
>>> like to aggregate its value until it crosses a particular threshold (same 
>>> threshold for all the keys). As soon as the key’s aggregated value crosses 
>>> this threshold, I would like to emit this key. At the end of every tumbling 
>>> window, all the (key, value) aggregated pairs  would be emitted 
>>> irrespective of whether they have crossed the threshold or not.
>>> 
>>> 2. For each Tumbling window in the event time space, I would like to 
>>> maintain a LRU cache which stores the keys along with their aggregated 
>>> values and their latest arrival time. The least recently used (LRU) key 
>>> would be the key whose latest arrival time is earlier than the latest 
>>> arrival times of all the other keys present in the LRU cache. The LRU cache 
>>> is of a limited size. So, it is possible that the number of unique keys in 
>>> a particular window is greater than the size of LRU cache. Whenever any 
>>> (key, value) pair arrives, if the key already exists, its aggregated value 
>>> is updated with the value of the newly arrived value and its latest arrival 
>>> time is updated with the current event time. If the key does not exist and 
>>> there is some free slot in the LRU cache, it is added into the LRU. As soon 
>>> as the LRU cache gets occupied fully and a new key comes in which does not 
>>> exist in the LRU cache, we would like to emit the least recently used key 
>>> to accommodate the newly arrived key. As in the case of 1, at the end of 
>>> every tumbling window, all the (key, value) aggregated pairs in the LRU 
>>> cache would be emitted.  
>>> 
>>> Would like to know how can we implement these algorithms using Flink. Any 
>>> help would be greatly appreciated.
>>> 
>>> Dhruv Kumar
>>> PhD Candidate
>>> Department of Computer Science and Engineering
>>> University of Minnesota
>>> www.dhruvkumar.me 
> 



Re: Queryable State

2018-03-19 Thread Vishal Santoshi
Those are understandable. I am more interested in a few things ( and may be
more that could be added )

* As far as I can understand JM is the SPOF. Does HA become a necessity ?
* If there are 2 or more JM could we theoretically have a LB fronting them
? Thus it is a peer to peer access ( Cassandra ) or a master slave setup
for JM HA specifically for Queryable access ( For  flink jobs it is master
slave )
* Do we replicate state to other TMs for read optimization ( specifically
to avoid Hot Node issues ) ?
* If the job goes down it seems the state is not accessible. What plans do
we have to "separate concerns" for Queryable state?

We consider Queryable State a significant feature Flink provides and would
do the necessary legwork if there are certain gaps in it being truly
considered a Highly Available Key Value store.

Regards.





On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske  wrote:

> Hi Vishal,
>
> In general, Queryable State should be ready to use.
> There are a few things to consider though:
>
> - State queries are not synchronized with the application code, i.e., they
> can happen at the same time. Therefore, the Flink application should not
> modify objects that have been put into or read from the state if you are
> not using the RocksDBStatebackend (which creates copies by deserialization).
> - State will be rolled back after a failure. Hence, you can read writes
> that are not "committed by a checkpoint".
>
> @Kostas, did I forget something?
>
> Best, Fabian
>
>
>
> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :
>
>> To be more precise, is anything thing similar to
>> https://engineering.linkedin.com/blog/2018/03/air-traffic
>> -controller--member-first-notifications-at-linkedin . done in Samza, can
>> be replicated with production level guarantees with Flink Queryable state (
>> as it stands currently version 1.5 )  ?
>>
>> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> We are making few decisions on use cases where  Queryable state is a
>>> natural fit https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>> dev/stream/state/queryable_state.html
>>>
>>> Is Queryable state production ready ? We will go to 1.5 flnk if that
>>> helps to make the case for the usage.
>>>
>>
>>
>


Re: How to correct use timeWindow() with DataStream?

2018-03-19 Thread Felipe Gutierrez
thanks a lot Fabian,

It clarified my way of developing. I am using keyBy, timeWindow, and the apply
operator in EventTimeStreamExampleJava now. I am generating dates in order,
with a bit of out-of-orderness, in LogSourceFunction, and only using Date as
my key in the LogLine object.

If I understood watermarks well, my program should combine all the lines
that are inside the same watermark when I set ".timeWindow(Time.seconds(5),
Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it
is still not happening because I didn't use a good key ".keyBy(lineLog ->
lineLog.getTime())" and my key at the LogLineCounterFunction class is still
the Date.

public static class LogLineCounterFunction implements WindowFunction<
LogLine, // input
Tuple3, // output
Date, // key
TimeWindow> { // window type

What should I use as a key in my case?

My output is combining only the lines with the same key (Date). I want to
combine the dates between the watermarks ".timeWindow(Time.seconds(5),
Time.seconds(1))"...

3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534
| 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 |
2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
16:31:08.534},107151667,9)
3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184
| 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15
16:31:04.184},107151667,4)
3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884
| 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
| 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
16:31:00.884},107151667,12)
3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15
16:31:03.784},107151667,1)
3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334
| 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15
16:31:06.334},107151667,4)




On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske  wrote:

> Hi,
>
> The timestamps of the stream records should be increasing (strict
> monotonicity is not required, a bit out of orderness can be handled due to
> watermarks).
> So, the events should also be generated with increasing timestamps. It
> looks like your generator generates random dates. I'd also generate data
> with millisecond precision, not just days.
>
> Also, a timestamp in Flink is the number of milliseconds since
> 1970-01-01-00:00:00.
> However, your timestamp extractor only returns the number of seconds since
> last minute (i.e., from 0 to 60). You should use Date.getTime() instead of
> Date.getSeconds().
>
> Best, Fabian
>
> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez 
> :
>
>> Hi all,
>>
>> I am building an example with DataStream using Flink that has a fake
>> source generator of LogLine(Date d, String line). I want to work with
>> Watermarks on it so I created a class that implements 
>> AssignerWithPeriodicWatermarks.
>> If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream
>> I can group by second and concatenate the lines. When I use
>> ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I
>> misunderstood something when I was reading about Event Time. Could anyone
>> help me please? My source code is as follow.
>>
>> Thanks for the ideas. Kind Regards,  Felipe
>>
>> package flink.example.streaming;
>>
>> import flink.util.LogLine;
>> import flink.util.LogSourceFunction;
>> import flink.util.UtilDate;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEn
>> vironment;
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodi
>> cWatermarks;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> import javax.annotation.Nullable;
>> import java.util.Date;
>>
>> public class EventTimeStreamExampleJava {
>> public static void main(String[] args) throws Exception {
>>
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> 

Re: Checkpoint is not triggering as per configuration

2018-03-19 Thread Piotr Nowojski
Hi,

Please analyse what was going on in the TaskManager and JobManager before this
“task is not being executed at the moment”. What is the reason why it is not 
being executed? Was there some exception? Depending on your setup, you might 
need to check your stdout/stderr files (if your code is printing some errors).

Another issue might be that your operators/functions are initialising very slowly
or are stuck somewhere.

Thanks, Piotrek

> On 19 Mar 2018, at 10:14, ms110400027 Syed Muhammad Abrar Akber 
>  wrote:
> 
> Dear Piotrek;
> The log for task manager shows the following message
> 2018-03-19 17:07:58,000 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Custom File Source (1/1) is not being executed at the 
> moment. Aborting checkpoint.
> I don't know how to fix this issue. I would highly appreciate your support
> in fixing it.
> 
> 
> Further, please guide me where I can find the resources which are helpful for 
> beginners like me to fix such issues.
> Thank you for your support.
> 
> Regards;
> Syed Muhamamd Abrar Akber
> MS110400027
> 
> On Mon, Feb 5, 2018 at 5:33 PM, Piotr Nowojski  > wrote:
> Hi,
> 
> Did you check task manager and job manager logs for any problems?
> 
> Piotrek
> 
> > On 5 Feb 2018, at 03:19, syed  > > wrote:
> >
> > Hi
> > I am new to the flink world, and trying to understand. Currently, I am using
> > Flink 1.3.2 on a small cluster of 4 nodes,
> > I have configured checkpoint directory at HDFS, and run streaming word count
> > example with my own custom input file of 63M entries,
> > I enabled checkpoint every one second {/env.enableCheckpointing(1000)/}
> >
> > The problem I am facing is checkpoint is only triggered once after 1 second,
> > but no checkpoint afterwards, I run application for more than 5 minutes, but
> > checkpoint history shows only 1 checkpoint triggered and was successful. I
> > don't know why checkpoint not triggering after every second?
> > Please suggest me what is wrong?
> > Thanks in anticipation.
> >
> >
> >
> >
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> > 
> 
> 



Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Fabian Hueske
Hi,

Union is actually a very simple operator (not even an operator in Flink
terms). It just merges two inputs. There is no additional logic involved.
Therefore, it should also not emit records before either of both
ReduceFunctions sorted its data.
Once the data has been sorted for the ReduceFunction, the data is reduced
and emitted in a pipelined fashion, i.e., once the first record is reduced,
it is forwarded into the MapFunction (passing the unioned inputs).
So it is not unexpected that Map starts processing before the
ReduceFunction terminated.

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse objects, you have to be
careful in how you implement your functions.
If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
that was generated for the program?

Thanks,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-
release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
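For reference, object reuse is toggled on the ExecutionConfig; a minimal sketch:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Default: object reuse disabled; Flink hands each function its own object instances.
env.getConfig().disableObjectReuse();

// Only enable this if your functions never cache or modify incoming records:
// env.getConfig().enableObjectReuse();

// The plan mentioned above can be printed with env.getExecutionPlan()
// (call it before execute(); it declares 'throws Exception').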



2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :

> Any help on this? This thing is very strange..the "manual" union of the
> output of the 2 datasets is different than the flink-union of them..
> Could it be a problem of the flink optimizer?
>
> Best,
> Flavio
>
> On Fri, Mar 16, 2018 at 4:01 PM, simone 
> wrote:
>
>> Sorry, I translated the code into pseudocode too fast. That is indeed an
>> equals.
>>
>> On 16/03/2018 15:58, Kien Truong wrote:
>>
>> Hi,
>>
>> Just a guess, but string compare in Java should be using the equals method,
>> not the == operator.
>>
>> Regards,
>>
>> Kien
>>
>>
>> On 3/16/2018 9:47 PM, simone wrote:
>>
>> *subject.getField("field1") == "";*
>>
>>
>>
>
>


Re: Custom Processing per window

2018-03-19 Thread Dhruv Kumar
Task 1: I implemented it using a custom Trigger (see attached file). Looks like it is doing what I want it to. I copied the code from EventTimeTrigger.java and overwrote the onElement method.

Task 2: I will need to maintain the state (this will be the LRU cache) for multiple keys in the same data structure. But it looks like that the Keyed states are on a per key basis. Should I use OperatorState in some way? Can I use a data structure not directly managed by Flink? What will happen in the case of keys across multiple machines?

LazyAlgoTrigger.java
Description: Binary data

Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me


On Mar 19, 2018, at 02:04, Jörn Franke wrote:

How would you start implementing it? Where are you stuck?

Did you already try to implement this?

On 18. Mar 2018, at 04:10, Dhruv Kumar wrote:

Hi

I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for implementing some very specific use-cases: (They may not seem relevant but I need to implement them or I at least need to know if it is possible to implement them in Flink)

Assumptions:
1. Data stream is of the form (key, value). We achieve this by the .key operation provided by Flink API.
2. By emitting a key, I mean sending/outputting its aggregated value to any data sink.

1. For each Tumbling window in the Event Time space, for each key, I would like to aggregate its value until it crosses a particular threshold (same threshold for all the keys). As soon as the key’s aggregated value crosses this threshold, I would like to emit this key. At the end of every tumbling window, all the (key, value) aggregated pairs would be emitted irrespective of whether they have crossed the threshold or not.

2. For each Tumbling window in the event time space, I would like to maintain a LRU cache which stores the keys along with their aggregated values and their latest arrival time. The least recently used (LRU) key would be the key whose latest arrival time is earlier than the latest arrival times of all the other keys present in the LRU cache. The LRU cache is of a limited size. So, it is possible that the number of unique keys in a particular window is greater than the size of LRU cache. Whenever any (key, value) pair arrives, if the key already exists, its aggregated value is updated with the value of the newly arrived value and its latest arrival time is updated with the current event time. If the key does not exist and there is some free slot in the LRU cache, it is added into the LRU. As soon as the LRU cache gets occupied fully and a new key comes in which does not exist in the LRU cache, we would like to emit the least recently used key to accommodate the newly arrived key. As in the case of 1, at the end of every tumbling window, all the (key, value) aggregated pairs in the LRU cache would be emitted.

Would like to know how can we implement these algorithms using Flink. Any help would be greatly appreciated.

Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me





Re: Partial aggregation result sink

2018-03-19 Thread Fabian Hueske
Hi,

Yes there are plans to support early results and control the result latency
/ result completeness trade off.
However, I cannot say when these features will be available.

Best, Fabian
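Until those land, early firings of the kind discussed below are usually done on the DataStream API with a continuous trigger; a minimal sketch (the input type Tuple2<String, Long> and the 10-minute interval are assumptions):

// Emit the current partial sum of the daily event-time window every 10 minutes.
DataStream<Tuple2<String, Long>> earlyResults = input
        .keyBy(0)                                                  // "other_column"
        .window(TumblingEventTimeWindows.of(Time.days(1)))
        .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10)))
        .sum(1);   // every firing re-emits the updated partial aggregate for that key

The trigger FIREs without purging, so each firing is an updated (not incremental) result for the window.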

2018-03-19 8:14 GMT+01:00 LiYue :

> Hi ,
>
> Is there any plan to add these features to Flink SQL?
>
> Thanks
> LiYue
> tig.jd.com
>
>
>
> On Mar 14, 2018, at 7:48 AM, Fabian Hueske wrote:
>
> Hi,
>
> Chesnay is right.
> SQL and Table API do not support early window results and no allowed
> lateness to update results with late arriving data.
> If you need such features, you should use the DataStream API.
>
> Best, Fabian
>
>
> 2018-03-13 12:10 GMT+01:00 Chesnay Schepler :
>
>> I don't think you can specify custom triggers when using purer SQL, but
>> maybe Fabian or Timo know a SQL way of implementing your goal.
>>
>>
>> On 12.03.2018 13:16, 李玥 wrote:
>>
>> Hi Chirag,
>> Thanks for your reply!
>> I found that the provided ContinuousEventTimeTrigger should work in my
>> situation.
>> Most examples are based on Table API like 
>> ‘ds.keyBy(0).window().trigger(MyTrigger.of())…’,
>> But how to apply the trigger to a pure Flink SQL Application ?
>>
>>
>>
>>
>>
>>
>> On Mar 12, 2018, at 5:20 PM, Chirag Dewan wrote:
>>
>> Hi LiYue,
>>
>> This should help : Apache Flink 1.5-SNAPSHOT Documentation: Windows
>> 
>>
>> So basically you need to register a processing time trigger at every 10
>> minutes and on callback, you can FIRE the window result like this:
>>
>>   @Override
>> public TriggerResult onProcessingTime(long time, TimeWindow window,
>> TriggerContext ctx) throws Exception {
>>   // schedule next timer
>>   ctx.registerProcessingTimeTimer(System.currentTimeMillis() +
>> 1000L);
>>   return TriggerResult.FIRE;
>> }
>>
>>
>> I hope it helps.
>>
>> Chirag
>>
>> On Monday, 12 March, 2018, 2:10:25 PM IST, 李玥 
>> wrote:
>>
>>
>> Hi,team
>> I’m working on a event-time based aggregation application with flink
>> SQL.  Is there any way to keep sinking partial aggregation result BEFORE
>> time window closed?
>> For example, My SQL:
>> select …
>> from my_table
>> GROUP BY TUMBLE(`timestamp`, INTERVAL '1’ DAY),other_column;
>> Usually, Flink sink agg result after time-window closed, Is there any way
>> to keep sinking TODAY’s partial aggregation result every 10 miniutes so we
>> can see today’s performance on my chart.
>>
>> Thanks!
>> LiYue
>>
>>
>>
>>
>
>


Re: Queryable State

2018-03-19 Thread Fabian Hueske
Hi Vishal,

In general, Queryable State should be ready to use.
There are a few things to consider though:

- State queries are not synchronized with the application code, i.e., they
can happen at the same time. Therefore, the Flink application should not
modify objects that have been put into or read from the state if you are
not using the RocksDBStatebackend (which creates copies by deserialization).
- State will be rolled back after a failure. Hence, you can read writes
that are not "committed by a checkpoint".

@Kostas, did I forget something?

Best, Fabian
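For reference, a minimal sketch of how a keyed state is exposed and then queried from an external process using the separate queryable-state client dependency (job ID, state name, host, and the default proxy port 9069 are assumptions here):

// In the Flink job: make a keyed ValueState queryable under "count-query".
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
descriptor.setQueryable("count-query");

// In the external application (assume this runs in a method declaring 'throws Exception'):
QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);
CompletableFuture<ValueState<Long>> future = client.getKvState(
        JobID.fromHexString("<running job id>"),   // the job must be running
        "count-query",
        "some-key",                                // the key to look up
        BasicTypeInfo.STRING_TYPE_INFO,            // type information of the key
        descriptor);
Long value = future.get().value();

As noted above, a value read this way may still be rolled back if the job fails before the next checkpoint completes.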



2018-03-18 16:50 GMT+01:00 Vishal Santoshi :

> To be more precise, is anything thing similar to https://engineering.
> linkedin.com/blog/2018/03/air-traffic-controller--member-
> first-notifications-at-linkedin . done in Samza, can be replicated with
> production level guarantees with Flink Queryable state ( as it stands
> currently version 1.5 )  ?
>
> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> We are making few decisions on use cases where  Queryable state is a
>> natural fit https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/stream/state/queryable_state.html
>>
>> Is Queryable state production ready ? We will go to 1.5 flnk if that
>> helps to make the case for the usage.
>>
>
>


Re: How to correct use timeWindow() with DataStream?

2018-03-19 Thread Fabian Hueske
Hi,

The timestamps of the stream records should be increasing (strict
monotonicity is not required, a bit out of orderness can be handled due to
watermarks).
So, the events should also be generated with increasing timestamps. It
looks like your generator generates random dates. I'd also generate data
with millisecond precision, not just days.

Also, a timestamp in Flink is the number of milliseconds since
1970-01-01-00:00:00.
However, your timestamp extractor only returns the number of seconds since
last minute (i.e., from 0 to 60). You should use Date.getTime() instead of
Date.getSeconds().

Best, Fabian
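Concretely, the corrected extractor could look like this (class and field names as in the code quoted below):

@Override
public long extractTimestamp(LogLine element, long previousElementTimestamp) {
    // epoch milliseconds, not the seconds-of-minute value
    long timestamp = element.getTime().getTime();
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
    return timestamp;
}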

2018-03-16 18:08 GMT+01:00 Felipe Gutierrez :

> Hi all,
>
> I am building an example with DataStream using Flink that has a fake
> source generator of LogLine(Date d, String line). I want to work with
> Watermarks on it so I created a class that implements
> AssignerWithPeriodicWatermarks. If I don't use the monad
> ".timeWindow(Time.seconds(2))" on the data stream I can group by second and
> concatenate the lines. When I use ".timeWindow(Time.seconds(2))" nothing is
> shown on the output. I guess I misunderstood something when I was reading
> about Event Time. Could anyone help me please? My source code is as follow.
>
> Thanks for the ideas. Kind Regards,  Felipe
>
> package flink.example.streaming;
>
> import flink.util.LogLine;
> import flink.util.LogSourceFunction;
> import flink.util.UtilDate;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.
> AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> import javax.annotation.Nullable;
> import java.util.Date;
>
> public class EventTimeStreamExampleJava {
> public static void main(String[] args) throws Exception {
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> DataStream<LogLine> dataStream = env
> .addSource(new LogSourceFunction())
> .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessGenerator())
> .keyBy(lineLog -> lineLog.getSec())
> // .timeWindow(Time.seconds(2))
> .reduce((log1, log2) -> new LogLine(log1.getTime(),
> log1.getLine() + " | " + log2.getLine()))
> ;
>
> dataStream.print();
>
> env.execute("Window LogRead");
> }
>
> public static class BoundedOutOfOrdernessGenerator implements
> AssignerWithPeriodicWatermarks<LogLine> {
>
> private final long maxOutOfOrderness = 3500; // 3.5 seconds
>
> private long currentMaxTimestamp;
>
> @Override
> public long extractTimestamp(LogLine element, long
> previousElementTimestamp) {
> long timestamp = element.getTime().getSeconds();
> currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
> return timestamp;
> }
>
> @Nullable
> @Override
> public Watermark getCurrentWatermark() {
> // return the watermark as current highest timestamp minus the
> out-of-orderness bound
> return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
> }
> }
> }
>
> package flink.util;
>
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public class LogSourceFunction implements SourceFunction<LogLine> {
>
> private volatile boolean isRunning = true;
>
> @Override
> public void run(SourceContext<LogLine> ctx) throws Exception {
> while (isRunning) {
> ctx.collect(new LogLine(UtilDate.getRandomSec(),
> UtilDate.getRandomString()));
> }
> }
>
> @Override
> public void cancel() {
> isRunning = false;
> }
> }
>
> package flink.util;
>
> import java.util.Date;
> import java.util.Objects;
>
> public class LogLine {
>
> private Date time;
> private int sec;
> private String line;
>
> public LogLine() {
> }
>
> public LogLine(Date time, String line) {
> this.sec = time.getSeconds();
> this.time = time;
> this.line = line;
> }
>
> public LogLine(int sec, String line) {
> this.sec = sec;
> this.time = UtilDate.getRandomDate(sec);
> this.line = line;
> }
>
> public int getSec() {
> return sec;
> }
>
> public void setSec(int sec) {
> this.sec = sec;
> }
>
> public Date getTime() {
> return time;
> }
>
> public String getLine() {
> return line;
> }
>
> public void setTime(Date time) {
> this.time = time;
> }

Re: Slow flink checkpoint

2018-03-19 Thread Fabian Hueske
Hi,

Yes, you cannot start a separate thread to cleanup the state.
State is managed by Flink and can only be accessed at certain points in
time when the user code is called.

If you are using event time, another trick you could play is to only
register all timers on (currentWatermark + 1).
That will cause the trigger to fire whenever the watermark advances. You
could store all relevant timestamps in a ListState and act on all timers
that are less than the currentWatermark.
Also, since there is only a single timer per timestamp (currentWM + 1)
there will be only one timer per key.

Best, Fabian
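A minimal sketch of that timer-coalescing pattern, applied after a keyBy (the 'Event' type, its getTimestamp(), and what happens on expiration are assumptions):

// Imports: o.a.f.streaming.api.functions.ProcessFunction, o.a.f.api.common.state.*,
//          o.a.f.configuration.Configuration, o.a.f.util.Collector, java.util.*.
public class CoalescedTimers extends ProcessFunction<Event, Event> {

    private transient ListState<Long> pendingTimestamps;  // timestamps we still need to act on

    @Override
    public void open(Configuration parameters) {
        pendingTimestamps = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-ts", Long.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        pendingTimestamps.add(value.getTimestamp());
        // single coalesced timer per key: fires as soon as the watermark advances
        ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        long watermark = ctx.timerService().currentWatermark();
        List<Long> stillPending = new ArrayList<>();
        Iterable<Long> pending = pendingTimestamps.get();
        if (pending != null) {
            for (Long ts : pending) {
                if (ts <= watermark) {
                    // act on the expired timestamp here, e.g. clean up the state it guards
                } else {
                    stillPending.add(ts);
                }
            }
        }
        pendingTimestamps.clear();
        for (Long ts : stillPending) {
            pendingTimestamps.add(ts);
        }
        if (!stillPending.isEmpty()) {
            ctx.timerService().registerEventTimeTimer(watermark + 1);
        }
    }
}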

2018-03-16 13:56 GMT+01:00 林德强 :

> Hi Fabian ,
>Reducing the number of timers is a good idea.
>But in my application the timer is different for each key registered
> following the keyBy, so maybe it can't work with an upper and lower bound.
>
>I tried modifying the Flink source and starting a thread to clean the
> expired keyed state, but it doesn't work well because of concurrency issues.
>
>
>
>
>
>
> Best,
> Deqiang
>
> 2018-03-16 16:03 GMT+08:00 Fabian Hueske :
>
>> Hi,
>>
>> AFAIK, that's not possible.
>> The only "solution" is to reduce the number of timers. Whether that's
>> possible or not, depends on the application.
>> For example, if you use timers to clean up state, you can work with an
>> upper and lower bound and only register one timer for each (upper - lower)
>> interval.
>>
>> Best, Fabian
>>
>> 2018-03-16 3:11 GMT+01:00 林德强 :
>>
>>> Hi,
>>>
>>> I'm running a job on Flink streaming. I found that with the increase in the
>>> number of 'InternalTimer' objects, the checkpoint gets slower and slower. Is
>>> there any way to solve this problem? Such as making the
>>> "timeServiceManager" snapshot async.
>>>
>>>
>>>
>>>
>>>
>>> Thanks
>>>
>>
>>
>


Re: cannot see monitor metrics

2018-03-19 Thread Chesnay Schepler
This is currently expected behavior. We do not measure incoming records 
for sources, and outgoing records for sinks as we can't do this in a 
generalized fashion.


See FLINK-7286.

On 19.03.2018 02:37, ?? wrote:
I run Flink on YARN. I'm sure my program is OK, but I just cannot see
the monitor metrics.

my ENV: Flink 1.4, Kafka 0.10, CDH 5.11.1





Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Flavio Pompermaier
Any help on this? This thing is very strange: the "manual" union of the
output of the 2 datasets is different from the Flink union of them.
Could it be a problem of the Flink optimizer?

Best,
Flavio

On Fri, Mar 16, 2018 at 4:01 PM, simone  wrote:

> Sorry, I translated the code into pseudocode too fast. That is indeed an
> equals.
>
> On 16/03/2018 15:58, Kien Truong wrote:
>
> Hi,
>
> Just a guess, but string compare in Java should be using the equals method,
> not the == operator.
>
> Regards,
>
> Kien
>
>
> On 3/16/2018 9:47 PM, simone wrote:
>
> *subject.getField("field1") == "";*
>
>
>


Re: Partial aggregation result sink

2018-03-19 Thread LiYue
Hi,

Is there any plan to add these features to Flink SQL?

Thanks 
LiYue
tig.jd.com



> On Mar 14, 2018, at 7:48 AM, Fabian Hueske wrote:
> 
> Hi,
> 
> Chesnay is right. 
> SQL and Table API do not support early window results and no allowed lateness 
> to update results with late arriving data.
> If you need such features, you should use the DataStream API.
> 
> Best, Fabian
> 
> 
> 2018-03-13 12:10 GMT+01:00 Chesnay Schepler  >:
> I don't think you can specify custom triggers when using purer SQL, but maybe 
> Fabian or Timo know a SQL way of implementing your goal.
> 
> 
> On 12.03.2018 13:16, 李玥 wrote:
>> Hi Chirag,
>> Thanks for your reply!
>> I found that the provided ContinuousEventTimeTrigger should work in my
>> situation.
>> Most examples are based on Table API like 
>> ‘ds.keyBy(0).window().trigger(MyTrigger.of())…’, But how to apply the 
>> trigger to a pure Flink SQL Application ? 
>> 
>> 
>> 
>> 
>> 
>> 
>>> On Mar 12, 2018, at 5:20 PM, Chirag Dewan wrote:
>>> 
>>> Hi LiYue,
>>> 
>>> This should help : Apache Flink 1.5-SNAPSHOT Documentation: Windows 
>>> So basically you need to register a processing time trigger at every 10 
>>> minutes and on callback, you can FIRE the window result like this:
>>> 
>>>   @Override
>>> public TriggerResult onProcessingTime(long time, TimeWindow window, 
>>> TriggerContext ctx) throws Exception {
>>>   // schedule next timer
>>>   ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 1000L);
>>>   return TriggerResult.FIRE;
>>> }
>>> 
>>> 
>>> I hope it helps.
>>> 
>>> Chirag
>>> 
>>> On Monday, 12 March, 2018, 2:10:25 PM IST, 李玥 >> > wrote:
>>> 
>>> 
>>> Hi,team
>>> I’m working on a event-time based aggregation application with flink 
>>> SQL.  Is there any way to keep sinking partial aggregation result BEFORE 
>>> time window closed?
>>> For example, My SQL:
>>> select …
>>> from my_table
>>> GROUP BY TUMBLE(`timestamp`, INTERVAL '1’ DAY),other_column;
>>> Usually, Flink sink agg result after time-window closed, Is there any way 
>>> to keep sinking TODAY’s partial aggregation result every 10 miniutes so we 
>>> can see today’s performance on my chart.
>>> 
>>> Thanks!
>>> LiYue
>>> 
>> 
> 
> 



Re: Custom Processing per window

2018-03-19 Thread Jörn Franke
How would you start implementing it? Where are you stuck?

Did you already try to implement this?

> On 18. Mar 2018, at 04:10, Dhruv Kumar  wrote:
> 
> Hi
> 
> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for 
> implementing some very specific use-cases: (They may not seem relevant but I 
> need to implement them or I at least need to know if it is possible to 
> implement them in Flink)
> 
> Assumptions:
> 1. Data stream is of the form (key, value). We achieve this by the .key 
> operation provided by Flink API.
> 2. By emitting a key, I mean sending/outputting its aggregated value to any 
> data sink. 
> 
> 1. For each Tumbling window in the Event Time space, for each key, I would 
> like to aggregate its value until it crosses a particular threshold (same 
> threshold for all the keys). As soon as the key’s aggregated value crosses 
> this threshold, I would like to emit this key. At the end of every tumbling 
> window, all the (key, value) aggregated pairs  would be emitted irrespective 
> of whether they have crossed the threshold or not.
> 
> 2. For each Tumbling window in the event time space, I would like to maintain 
> a LRU cache which stores the keys along with their aggregated values and 
> their latest arrival time. The least recently used (LRU) key would be the key 
> whose latest arrival time is earlier than the latest arrival times of all the 
> other keys present in the LRU cache. The LRU cache is of a limited size. So, 
> it is possible that the number of unique keys in a particular window is 
> greater than the size of LRU cache. Whenever any (key, value) pair arrives, 
> if the key already exists, its aggregated value is updated with the value of 
> the newly arrived value and its latest arrival time is updated with the 
> current event time. If the key does not exist and there is some free slot in 
> the LRU cache, it is added into the LRU. As soon as the LRU cache gets 
> occupied fully and a new key comes in which does not exist in the LRU cache, 
> we would like to emit the least recently used key to accommodate the newly 
> arrived key. As in the case of 1, at the end of every tumbling window, all 
> the (key, value) aggregated pairs in the LRU cache would be emitted.  
> 
> Would like to know how can we implement these algorithms using Flink. Any 
> help would be greatly appreciated.
> 
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>