Re: env.setStateBackend deprecated in 1.5 for FsStateBackend

2018-06-26 Thread Rong Rong
Hmm.

If you have a wrapper function like this, it will not report a deprecation
warning:
def getFsStateBackend(path: String): StateBackend = new FsStateBackend(path)

This works because AbstractStateBackend implements StateBackend, and
def setStateBackend(backend: StateBackend): StreamExecutionEnvironment is
not deprecated.

This seems like a bug to me though.
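
For reference, a minimal Java sketch of the same workaround; the helper name
and the HDFS path are illustrative, and the point is simply that the
StateBackend return type makes the call bind to the non-deprecated
setStateBackend(StateBackend) overload:

import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendWorkaround {

   // Returning the interface type selects the non-deprecated
   // setStateBackend(StateBackend) overload at the call site.
   private static StateBackend getFsStateBackend(String path) {
      return new FsStateBackend(path);
   }

   public static void main(String[] args) {
      StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStateBackend(getFsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
   }
}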

--
Rong

On Tue, Jun 26, 2018 at 7:40 PM zhangminglei <18717838...@163.com> wrote:

> At the moment, it seems you cannot. FsStateBackend extends
> AbstractFileStateBackend, which in turn extends AbstractStateBackend, the
> deprecated parameter type of setStateBackend. I think you can do what you
> want like below for now, but it is very bad.
>
> env.setStateBackend(new StateBackend() {
>   @Override
>   public CompletedCheckpointStorageLocation resolveCheckpoint(String 
> externalPointer) throws IOException {
> return null;
>   }
>
>   @Override
>   public CheckpointStorage createCheckpointStorage(JobID jobId) throws 
> IOException {
> return null;
>   }
>
>   @Override
>   public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment 
> env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, 
> int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry 
> kvStateRegistry) throws Exception {
> return null;
>   }
>
>   @Override
>   public OperatorStateBackend createOperatorStateBackend(Environment env, 
> String operatorIdentifier) throws Exception {
> return null;
>   }
> });
>
> Cheers
> Minglei
>
>
> 在 2018年6月27日,上午9:38,chrisr123  写道:
>
>
> I upgraded from Flink 1.4 to 1.5 and now this call is being flagged as
> deprecated.
> What should I change this code to for 1.5 to get rid of the deprecation
> warning?
> Thanks
>
> // deprecated
> env.setStateBackend(new
> FsStateBackend("hdfs://myhdfsmachine:9000/flink/checkpoints"));
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: Over Window Not Processing Messages

2018-06-26 Thread Rong Rong
Hi Greg,

Based on a quick test I cannot reproduce the issue; it is emitting messages
correctly in the ITCase environment.
Can you share more information? Does the same problem happen if you use
proctime?
I am guessing this could be highly correlated with the watermark strategy of
your input streams "user_things" and "user_stuff".
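
If the query uses rowtime, both inputs need timestamps and watermarks
assigned before the union, and the OVER window only emits a row once the
watermark has passed its rowtime. A minimal sketch of one common assigner
(the event type, timestamp field, and out-of-orderness bound are
illustrative assumptions, not taken from your job):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkExample {

   // Illustrative stand-in for the records behind "user_things".
   public static class UserThing {
      public long eventTimeMillis;
      public String userId;
      public String thing;
   }

   // Assign timestamps/watermarks before registering the stream as a table,
   // so the rowtime attribute and the OVER window can advance.
   public static DataStream<UserThing> withWatermarks(DataStream<UserThing> userThings) {
      return userThings.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<UserThing>(Time.minutes(1)) {
            @Override
            public long extractTimestamp(UserThing element) {
               return element.eventTimeMillis;
            }
         });
   }
}

Also note that the watermark of a union is the minimum of its inputs'
watermarks, so a slow or idle input can hold the results back for a long
time.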

--
Rong

On Tue, Jun 26, 2018 at 6:37 PM Gregory Fee  wrote:

> Hello User Community!
>
> I am running some streaming SQL that involves a union all into an over
> window similar to the below:
>
> SELECT user_id, count(action) OVER (PARTITION BY user_id ORDER BY rowtime
> RANGE INTERVAL '14' DAY PRECEDING) action_count, rowtime
> FROM
> (SELECT rowtime, user_id, thing as action FROM user_things
>  UNION ALL SELECT rowtime, user_id, stuff as action FROM user_stuff)
>
> The SQL generates three operators. There are two operators that process
> the 'from' part of the clause that feed into an 'over' operator. I notice
> that messages flow into the 'over' operator and just buffer there for a
> long time (hours in some cases). Eventually something happens and the data
> starts to flush through to the downstream operators. Can anyone help me
> understand what is causing that behavior? I want the data to flow through
> more consistently.
>
> Thanks!
>
>
>
> --
> *Gregory Fee*
> Engineer
> 425.830.4734 <+14258304734>
>


Re: env.setStateBackend deprecated in 1.5 for FsStateBackend

2018-06-26 Thread zhangminglei
At the moment, it seems you cannot. FsStateBackend extends
AbstractFileStateBackend, which in turn extends AbstractStateBackend, the
deprecated parameter type of setStateBackend. I think you can do what you
want like below for now, but it is very bad.
env.setStateBackend(new StateBackend() {
  @Override
  public CompletedCheckpointStorageLocation resolveCheckpoint(String 
externalPointer) throws IOException {
return null;
  }

  @Override
  public CheckpointStorage createCheckpointStorage(JobID jobId) throws 
IOException {
return null;
  }

  @Override
  public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment 
env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, 
int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry 
kvStateRegistry) throws Exception {
return null;
  }

  @Override
  public OperatorStateBackend createOperatorStateBackend(Environment env, 
String operatorIdentifier) throws Exception {
return null;
  }
});
Cheers
Minglei


> 在 2018年6月27日,上午9:38,chrisr123  写道:
> 
> 
> I upgraded from Flink 1.4 to 1.5 and now this call is being flagged as
> deprecated.
> What should I change this code to for 1.5 to get rid of the deprecation
> warning?
> Thanks
> 
> // deprecated
> env.setStateBackend(new
> FsStateBackend("hdfs://myhdfsmachine:9000/flink/checkpoints"));
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



env.setStateBackend deprecated in 1.5 for FsStateBackend

2018-06-26 Thread chrisr123


I upgraded from Flink 1.4 to 1.5 and now this call is being flagged as
deprecated.
What should I change this code to for 1.5 to get rid of the deprecation
warning?
Thanks

// deprecated
env.setStateBackend(new
FsStateBackend("hdfs://myhdfsmachine:9000/flink/checkpoints"));



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Over Window Not Processing Messages

2018-06-26 Thread Gregory Fee
Hello User Community!

I am running some streaming SQL that involves a union all into an over
window similar to the below:

SELECT user_id, count(action) OVER (PARTITION BY user_id ORDER BY rowtime
RANGE INTERVAL '14' DAY PRECEDING) action_count, rowtime
FROM
(SELECT rowtime, user_id, thing as action FROM user_things
 UNION ALL SELECT rowtime, user_id, stuff as action FROM user_stuff)

The SQL generates three operators. There are two operators that process the
'from' part of the clause that feed into an 'over' operator. I notice that
messages flow into the 'over' operator and just buffer there for a long
time (hours in some cases). Eventually something happens and the data
starts to flush through to the downstream operators. Can anyone help me
understand what is causing that behavior? I want the data to flow through
more consistently.

Thanks!



-- 
*Gregory Fee*
Engineer
425.830.4734 <+14258304734>


Re: Measure Latency from source to sink

2018-06-26 Thread Hequn Cheng
Hi Antonio, latency is exposed via a metric. You can find each operator's
latency through the Flink UI (Overview -> Task Metrics -> select the task,
for example the sink -> Add metric -> find the latency metric).
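
For the manual per-transaction measurement described in the messages below
(attach a start time at the source and compute the difference at the sink),
a minimal sketch could look as follows; the event type, field names, and the
printing sink are illustrative assumptions:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class LatencyMeasurementSketch {

   // Illustrative event carrying its own ingestion timestamp.
   public static class Txn {
      public String payload;
      public long ingestedAtMillis;
   }

   public static void wire(DataStream<Txn> fromKafka) {
      fromKafka
         // Stamp each record as close to the source as possible.
         .map(new MapFunction<Txn, Txn>() {
            @Override
            public Txn map(Txn txn) {
               txn.ingestedAtMillis = System.currentTimeMillis();
               return txn;
            }
         })
         // ... the transformations under test go here ...
         .addSink(new SinkFunction<Txn>() {
            @Override
            public void invoke(Txn txn) {
               long latencyMillis = System.currentTimeMillis() - txn.ingestedAtMillis;
               System.out.println("source-to-sink latency ms: " + latencyMillis);
            }
         });
   }
}

Note that the built-in latency tracking (setLatencyTrackingInterval) sends
latency markers between operators and reports them only as metrics; it does
not annotate individual records.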

On Tue, Jun 26, 2018 at 11:18 PM, antonio saldivar 
wrote:

> Hello thank you
>
> I was also trying to use the Flink UI metrics on version 1.4.2 with
> env.getConfig().setLatencyTrackingInterval(1000L),
> but it looks like nothing is displayed.
>
> El mar., 26 jun. 2018 a las 10:45, zhangminglei (<18717838...@163.com>)
> escribió:
>
>> Hi, you can do that, but it does not make much sense in general. You can
>> do it with Flink, Storm, Spark Streaming or Structured Streaming, and then
>> compare the latency across the different frameworks.
>>
>> Cheers
>> Minglei
>>
>> 在 2018年6月26日,下午9:36,antonio saldivar  写道:
>>
>> Hello Thank you for the feedback,
>>
>> Well, for now I just want to measure the time each transaction takes from
>> source to sink, adding the start and end times in milliseconds.
>>
>>
>>
>> El mar., 26 jun. 2018 a las 5:19, zhangminglei (<18717838...@163.com>)
>> escribió:
>>
>>> Hi,Antonio
>>>
>>> Usually, measuring delay for a specific business case is more meaningful.
>>> From my experience, latency is data preparation time plus query
>>> calculation time, i.e. an end-to-end latency test, rather than the latency
>>> of Flink itself. Hope this helps.
>>>
>>> Cheers
>>> Minglei
>>>
>>>
>>> > 在 2018年6月26日,上午5:23,antonio saldivar  写道:
>>> >
>>> > Hello
>>> >
>>> > I am trying to measure the latency of each transaction traveling
>>> across the system. As a data source I have a Kafka consumer, and I would
>>> like to measure the time it takes from source to sink. Does anyone have an
>>> example?
>>> >
>>> > Thank you
>>> > Best Regards
>>>
>>>
>>>
>>


[ANNOUNCE] Weekly community update #26

2018-06-26 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #26. Please post any news and
updates you want to share with the community to this thread.

# New Flink community website

The new community website [1] has been launched. Big kudos to Fabian for
driving this effort. The new structure should make it easier to find
relevant content for newcomers as well as experienced Flink users alike.

# New Flink committers

The Flink community welcomes two new committers: Piotr Nowojski [2] and
Sihua Zhou [3]. They both contributed new features and many fixes to Flink.
Moreover, they helped the community with reviewing PRs, answering user
questions and driving dev discussions.

[1] flink.apache.org
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-committer-Piotr-Nowojski-td22918.html#a22944
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-committer-Sihua-Zhou-td22902.html#a22909

Cheers,
Till


Re: [Flink-9407] Question about proposed ORC Sink !

2018-06-26 Thread sagar loke
@zhangminglei,

Question about the schema for ORC format:

1. Does it always need to be of complex type "struct<...>" ?

2. Or can it be created with individual data types directly ?
eg. "name:string, age:int" ?


Thanks,
Sagar

On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <18717838...@163.com> wrote:

> Yes, it should be exit. Thanks, Ted Yu, that is exactly right!
>
> Cheers
> Zhangminglei
>
> 在 2018年6月23日,下午12:40,Ted Yu  写道:
>
> For #1, the word exist should be exit, right ?
> Thanks
>
>  Original message 
> From: zhangminglei <18717838...@163.com>
> Date: 6/23/18 10:12 AM (GMT+08:00)
> To: sagar loke 
> Cc: dev , user 
> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
>
> Hi, Sagar.
>
> 1. It solves the issue partially meaning files which have finished
> checkpointing don't show .pending status but the files which were in
> progress
> when the program exists are still in .pending state.
>
>
> Ans:
>
> Yes. If the program exits while a checkpoint has not finished, the files
> stay in .pending state. Under normal circumstances, programs running in a
> production environment are never stopped or exited if everything is fine.
>
> 2. Ideally, writer should work with default settings correct ? Meaning we
> don't have to explicitly set these parameters to make it work.
> Is this assumption correct ?
>
>
> Ans:
>
> Yes. The writer should work with the default settings.
> Yes. We do not have to explicitly set these parameters to make it work.
> Yes. The assumption is correct indeed.
>
> However, Flink is a real-time streaming framework, so under normal
> circumstances you do not really use the default settings for a specific
> business case, especially when working together with an offline end (like
> Hadoop MapReduce). In that case, you need to tell the offline end when a
> bucket is closed and when the data for that bucket is ready. So you can
> take a look at https://issues.apache.org/jira/browse/FLINK-9609.
>
> Cheers
> Zhangminglei
>
>
> 在 2018年6月23日,上午8:23,sagar loke  写道:
>
> Hi Zhangminglei,
>
> Thanks for the reply.
>
> 1. It solves the issue partially meaning files which have finished
> checkpointing don't show .pending status but the files which were in
> progress
> when the program exists are still in .pending state.
>
> 2. Ideally, writer should work with default settings correct ? Meaning we
> don't have to explicitly set these parameters to make it work.
> Is this assumption correct ?
>
> Thanks,
> Sagar
>
> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com> wrote:
>
>> Hi, Sagar. Please use the code below and you will see the part file
>> status go from _part-0-107.in-progress to _part-0-107.pending and finally
>> to part-0-107 (as an example); you need to run the program for a while.
>> However, we need to set some parameters, like the following, and
>> enableCheckpointing is also needed. I know why you always see the
>> .pending file: the default value of the parameters below is 60 seconds
>> even if you enable checkpointing, so you cannot see the finished file
>> status until 60 seconds have passed.
>>
>> Attached is the ending on my end, and you will see what you want!
>>
>> Please let me know if you still have the problem.
>>
>> Cheers
>> Zhangminglei
>>
>> setInactiveBucketCheckInterval(2000)
>> .setInactiveBucketThreshold(2000);
>>
>>
>> public class TestOrc {
>>public static void main(String[] args) throws Exception {
>>   StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>   env.setParallelism(1);
>>   env.enableCheckpointing(1000);
>>   env.setStateBackend(new MemoryStateBackend());
>>
>>   String orcSchemaString = "struct";
>>   String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>
>>   BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>
>>   bucketingSink
>>  .setWriter(new OrcFileWriter<>(orcSchemaString))
>>  .setInactiveBucketCheckInterval(2000)
>>  .setInactiveBucketThreshold(2000);
>>
>>   DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>
>>   dataStream.addSink(bucketingSink);
>>
>>   env.execute();
>>}
>>
>>public static class ManGenerator implements SourceFunction<Row> {
>>
>>   @Override
>>   public void run(SourceContext<Row> ctx) throws Exception {
>>  for (int i = 0; i < 2147483000; i++) {
>> Row row = new Row(3);
>> row.setField(0, "Sagar");
>> row.setField(1, 26 + i);
>> row.setField(2, false);
>> ctx.collect(row);
>>  }
>>   }
>>
>>   @Override
>>   public void cancel() {
>>
>>   }
>>}
>> }
>>
>> 
>>
>>
>>
>> 在 2018年6月22日,上午11:14,sagar loke  写道:
>>
>> Sure, we can solve it together :)
>>
>> Are you able to reproduce it ?
>>
>> Thanks,
>> Sagar
>>
>> On Thu, Jun 21, 2018 at 7:28 PM 

How to deploy Flink in a geo-distributed environment

2018-06-26 Thread Stephen
Hi,
Can Flink be deployed in a geo-distributed environment instead of in local
clusters?
As far as I know, raw data has to be moved to a local cloud environment or
local cluster before Flink handles it. Consider a situation where the data
sources are in different regions, possibly in different countries, so that
moving the data over the wide-area network is slow and expensive. How can
this problem be solved? Is there a solution for this now?

Thanks.


Re: How to partition within same physical node in Flink

2018-06-26 Thread Vijay Balakrishnan
Hi Fabian,
Thanks once again for your reply. I need to get the data from each
cam/camera into one partition/slot and avoid moving the gigantic video data
around while I perform other operations on it. For example, I can get seq#1
and seq#2 for cam1 in the cam1 partition/slot and then run combine, split,
parse, stitch, etc. operations on it in multiple threads within the same
cam1 partition.

I have the CameraKey defined as cam,seq# and then keyBy(cam) to get it in 1
partition(eg: cam1). The idea is to then work within the cam1 partition
with various seq#'s 1,2 etc on various threads within the same
slot/partition of TaskManager.
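
A minimal sketch of the keying being discussed (the CameraWithCube stand-in,
its fields, and the composite-key variant are illustrative assumptions based
on the code later in this thread):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class CameraKeying {

   // Illustrative stand-in for the CameraWithCube type used in this thread.
   public static class CameraWithCube {
      public String cam;
      public String seq;
      public String getCam() { return cam; }
      public String getSeq() { return seq; }
   }

   // keyBy(cam): all records of one camera go to the same parallel instance.
   public static KeyedStream<CameraWithCube, String> byCam(DataStream<CameraWithCube> in) {
      return in.keyBy(new KeySelector<CameraWithCube, String>() {
         @Override
         public String getKey(CameraWithCube c) {
            return c.getCam();
         }
      });
   }

   // keyBy(cam, seq#): records with the same (cam, seq#) pair go to the same
   // parallel instance, but Flink does not guarantee which slot or node each
   // (cam, seq#) group lands on.
   public static KeyedStream<CameraWithCube, Tuple2<String, String>> byCamAndSeq(
         DataStream<CameraWithCube> in) {
      return in.keyBy(new KeySelector<CameraWithCube, Tuple2<String, String>>() {
         @Override
         public Tuple2<String, String> getKey(CameraWithCube c) {
            return Tuple2.of(c.getCam(), c.getSeq());
         }
      });
   }
}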

The data is stored in EFS keyed based on seq#/cam# folder structure.

Our actual problem is managing network bandwidth as a resource in each
partition. We want to make sure that the processing of 1 camera(split into
multiple seq# tasks) is not running on the same node as the processing of
another camera as in that case, the required network bandwidth for storing
the output of the process running in the partition would exceed the network
bandwidth of the hardware. Camera processing is expected to run on the same
hardware as the video decode step which is an earlier sequential process in
the same Dataflow pipeline.

I guess I might have to use a ThreadPool within each Slot(cam partition) to
work on each seq# ??

TIA

On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske  wrote:

> Hi,
>
> keyBy() does not work hierarchically. Each keyBy() overrides the previous
> partitioning.
> You can keyBy(cam, seq#) which guarantees that all records with the same
> (cam, seq#) are processed by the same parallel instance.
> However, Flink does not give any guarantees about how the (cam, seq#)
> partitions are distributed across slots (or even physical nodes).
>
> Btw. why is it important that all records of the same cam are processed by
> the same physical node?
>
> Fabian
>
> 2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan :
>
>> I see a .slotSharingGroup for SingleOutputStreamOperator, which can put
>> parallel instances of operations in the same TM slot.
>> I also see a CoLocationGroup, but there is no .coLocationGroup for
>> SingleOutputStreamOperator to put a task on the same slot. It seems
>> CoLocationGroup is defined at the JobVertex level and has nothing to do
>> with SingleOutputStreamOperator.
>> A TaskManager has many slots. Slots have many threads within them.
>> I want to be able to put the cam1 partition (keyBy(cam)) in one slot and
>> then use a keyBy(seq#) to run on many threads within that cam1 slot.
>>
>> Vijay
>>
>> On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Thanks, Fabian.
>>> Been reading your excellent book on Flink Streaming.Can't wait for more
>>> chapters.
>>> Attached a pic.
>>>
>>> [image: partition-by-cam-ts.jpg]
>>>
>>> I have records with seq# 1 and cam1 and cam2. I also have records with
>>> varying seq#'s.
>>> By partitioning on cam field first(keyBy(cam)), I can get cam1 partition
>>> on the same task manager instance/slot/vCore(???)
>>> Can I then have seq# 1 and seq# 2 for cam1 partition run in different
>>> slots/threads on the same Task Manager instance(aka cam1 partition) using
>>> keyBy(seq#) & setParallelism() ? Can *forward* Strategy be used to
>>> achieve this ?
>>>
>>> TIA
>>>
>>>
>>> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske  wrote:
>>>
 Hi,

 Flink distributes task instances to slots and does not expose physical
 machines.
 Records are partitioned to task instances by hash partitioning. It is
 also not possible to guarantee that the records in two different operators
 are send to the same slot.
 Sharing information by side-passing it (e.g., via a file on a machine
 or in a static object) is an anti-pattern and should be avoided.

 Best, Fabian

 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan :

> Hi,
>
> Need to partition by cameraWithCube.getCam() 1st using
> parallelCamTasks(passed in as args).
>
> Then within each partition, need to partition again by
> cameraWithCube.getTs() but need to make sure each of the 2nd partition by
> getTS() runs on the same physical node ?
>
> How do I achieve that ?
>
> DataStream<CameraWithCube> cameraWithCubeDataStream = env
> .addSource(new Source())
> .keyBy((cameraWithCube) -> cameraWithCube.getCam())
> .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
> public void processElement(CameraWithCube cameraWithCube, 
> Context context, Collector<CameraWithCube> collector) throws Exception {
> //do nothing
> }

Re: Few question about upgrade from 1.4 to 1.5 flink ( some very basic )

2018-06-26 Thread Vishal Santoshi
Ok, I will check.

On Tue, Jun 26, 2018, 12:39 PM Gary Yao  wrote:

> Hi Vishal,
>
> You should check the contents of znode /flink_test/[...]/rest_server_lock
> to see
> if the URL is correct.
>
> The host and port should be logged by the RestClient [1]. If you do not
> see the
> message "Sending request of class [...]]" on DEBUG level, probably the
> client is
> not able to get the address from ZK in the first place.
>
> Lastly, the stacktrace you posted seems to be cut off. Can you attach the
> complete client logs?
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/release-1.5.0-rc6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java#L156
>
> On Tue, Jun 26, 2018 at 4:09 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> The leader znode is the right one  ( it is a binary )
>>
>> get
>> /flink_test/da_15/leader//job_manager_lock
>>
>> wFDakka.tcp://
>> fl...@flink-9edd15d7.bf2.tumblr.net:22161/user/jobmanagersrjava.util.UUIDm/J
>>
>>
>> leastSigBitsJ
>>
>>
>> mostSigBitsxpHv
>>
>>
>> So it does ( I think ) resolve the right leader of the HA, but from there
>> ( the logs do not help as DEBUG logs do not expose what server it hits
>> sadly ) .
>>
>>
>> On Tue, Jun 26, 2018 at 9:57 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> OK few things
>>>
>>> 2018-06-26 13:31:29 INFO  CliFrontend:282 -  Starting Command Line
>>> Client (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
>>>
>>> ...
>>>
>>> 2018-06-26 13:31:31 INFO  ClientCnxn:876 - Socket connection
>>> established to zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181,
>>> initiating session
>>>
>>> 2018-06-26 13:31:31 DEBUG ClientCnxn:949 - Session establishment request
>>> sent on zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181
>>>
>>> 2018-06-26 13:31:31 INFO  ClientCnxn:1299 - Session establishment
>>> complete on server zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181,
>>> sessionid = 0x35add547801ea07, negotiated timeout = 4
>>>
>>> 2018-06-26 13:31:31 INFO  RestClient:119 - Rest client endpoint started.
>>>
>>> 2018-06-26 13:31:31 INFO  ZooKeeperLeaderRetrievalService:100 -
>>> Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>>>
>>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>>> finished:false header:: 1,3  replyHeader:: 1,60416530560,0  request::
>>> '/flink_test,F  response::
>>> s{47265479496,47265479496,1489163688703,1489163688703,0,2,0,0,0,2,60416492885}
>>>
>>>
>>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>>> finished:false header:: 2,3  replyHeader:: 2,60416530560,0  request::
>>> '/flink_test/da_15,F  response::
>>> s{60416492885,60416492885,1529755199131,1529755199131,0,5,0,0,0,5,60416521584}
>>>
>>>
>>> 2018-06-26 13:31:31 INFO  ZooKeeperLeaderRetrievalService:100 -
>>> Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>>>
>>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>>> finished:false header:: 3,3  replyHeader:: 3,60416530560,0  request::
>>> '/flink_test,F  response::
>>> s{47265479496,47265479496,1489163688703,1489163688703,0,2,0,0,0,2,60416492885}
>>>
>>>
>>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>>> finished:false header:: 4,3  replyHeader:: 4,60416530560,0  request::
>>> '/flink_test/da_15,F  response::
>>> s{60416492885,60416492885,1529755199131,1529755199131,0,5,0,0,0,5,60416521584}
>>>
>>>
>>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>>> finished:false header:: 5,3  replyHeader:: 5,60416530560,0  request::
>>> '/flink_test/da_15/leader,F  response::
>>> s{60416492887,60416492887,1529755199191,1529755199191,0,1,0,0,0,1,60416492888}
>>>
>>>
>>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>>> sessionid:0x35add547801ea07, packet::
>>> clientPath:/flink_test/da_15/leader/rest_server_lock
>>> serverPath:/flink_test/da_15/leader/rest_server_lock finished:false
>>> header:: 6,3  replyHeader:: 6,60416530560,-101  request::
>>> '/flink_test/da_15/leader/rest_server_lock,T  response::
>>>
>>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>>> finished:false header:: 7,3  replyHeader:: 7,60416530560,0  request::
>>> '/flink_test,F  response::
>>> s{47265479496,47265479496,1489163688703,1489163688703,0,2,0,0,0,2,60416492885}
>>>
>>>
>>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>>> finished:false header:: 8,3  

Re: TaskIOMetricGroup metrics not unregistered in prometheus on job failure ?

2018-06-26 Thread Chesnay Schepler

Great work on debugging this, you're exactly right.

The children we add to the collector have to be removed individually 
when a metric is unregistered.


If the collector is a io.prometheus.client.Gauge we can use the 
#remove() method. For histograms we will have to modify our 
HistogramSummaryProxy class to allow removing individual histograms.


I've filed FLINK-9665 .
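
A minimal sketch of the per-child removal described above, using the plain
Prometheus client API (metric and label names are illustrative; this is not
the actual Flink reporter code):

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;

public class PrometheusChildRemovalSketch {

   public static void main(String[] args) {
      Gauge gauge = Gauge.build()
         .name("flink_taskmanager_job_task_buffers_inputQueueLength")
         .help("illustrative example")
         .labelNames("job_id", "task_id")   // label names are illustrative
         .register(CollectorRegistry.defaultRegistry);

      // Registering a Flink metric adds a child for a concrete label combination.
      gauge.labels("jobA", "task0").set(42);

      // On unregistration only this child should be dropped; the collector
      // itself stays registered while other jobs still share it.
      gauge.remove("jobA", "task0");
   }
}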

On 26.06.2018 17:28, jelmer wrote:
Hi Chesnay, sorry for the late reply. I did not have time to look into 
this sooner


I did what you suggested. Added some logging to the PrometheusReporter 
like this :


https://github.com/jelmerk/flink/commit/58779ee60a8c3961f3eb2c487c603c33822bba8a

And deployed a custom build of the reporter to our test environment.

I managed to reproduce the issue like this

1. Deploy job A : it lands on worker 1
2. Deploy job B : it lands on worker 1, take note of the job id
3. Redeploy job b by canceling it from a savepoint and deploying it 
again from the savepoint : it lands on worker 3
4. Execute curl -s http://localhost:9249/metrics | grep "job id from 
step 2" on worker 1. The metrics are still exposed even though the job 
is canceled


I attached a piece of the log to the email. What I notice is that the 
two jobs register metrics with the same scoped metric name. In this 
case flink_taskmanager_job_task_buffers_inputQueueLength.


The Prometheus exporter seems to use reference counting for the 
metrics, and a metric is only removed when the count reaches 0. Canceling 
job B lowers the counter by 5, but because job A is still deployed the 
count does not reach 0, so the metric never gets unregistered.


Canceling job A will remove the lingering metrics from the old job B

It seems to me that this is a bug and that the children that are being 
added in notifyOfAddedMetric 
should be removed in notifyOfRemovedMetric.


Can you confirm this ?


--Jelmer



On Fri, 15 Jun 2018 at 18:01, Chesnay Schepler > wrote:


I remember that another user reported something similar, but he
wasn't using the PrometheusReporter. see

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JVM-metrics-disappearing-after-job-crash-restart-tt20420.html

We couldn't find the cause, but my suspicion was FLINK-8946 which
will be fixed in 1.4.3 .
You could cherry-pick 8b046fafb6ee77a86e360f6b792e7f73399239bd and
see whether this actually caused it.

Alternatively, if you can reproduce this it would be immensely
helpful if you could modify the PrometheusReporter and log all
notifications about added or removed metrics.

On 15.06.2018 15:42, Till Rohrmann wrote:

Hi,

this sounds very strange. I just tried it out locally with with a
standard metric and the Prometheus metrics seem to be
unregistered after the job has reached a terminal state. Thus, it
looks as if the standard metrics are properly removed from
`CollectorRegistry.defaultRegistry`. Could you check the log
files whether they contain anything suspicious about a failed
metric deregistration a la `There was a problem unregistering
metric`?

I've also pulled in Chesnay who knows more about the metric
reporters.

Cheers,
Till

On Thu, Jun 14, 2018 at 11:34 PM jelmer mailto:jkupe...@gmail.com>> wrote:

Hi

We are using flink-metrics-prometheus for reporting on apache
flink 1.4.2

And I am looking into an issue where it seems that somehow in
some cases the metrics registered
by org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup
(flink_taskmanager_job_task_buffers_outPoolUsage etc)  are
not being unregistered in prometheus in case of a job restart

Eventually this seems to cause
a java.lang.NoClassDefFoundError:
org/apache/kafka/common/metrics/stats/Rate$1 error when a new
version of the job is deployed  because the jar file
in /tmp/blobStore-foo/job_bar/blob_p-baz-qux has been removed
upon deployment of the new job but the url classloader still
points to it and it cannot find stats/Rate$1 (some
synthetically generated code generated by the java compiler
because its a switch on an enum)

Has anybody come across this issue ? Has it possibly been
fixed in 1.5 ? Can somebody any pointers as to where to look
to tackle this ?

Attached screenshot shows what classloader that cannot be
garbage collected with the gc root







Re: Few question about upgrade from 1.4 to 1.5 flink ( some very basic )

2018-06-26 Thread Gary Yao
Hi Vishal,

You should check the contents of znode /flink_test/[...]/rest_server_lock
to see
if the URL is correct.

The host and port should be logged by the RestClient [1]. If you do not see
the
message "Sending request of class [...]]" on DEBUG level, probably the
client is
not able to get the address from ZK in the first place.

Lastly, the stacktrace you posted seems to be cut off. Can you attach the
complete client logs?

Best,
Gary

[1]
https://github.com/apache/flink/blob/release-1.5.0-rc6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java#L156

On Tue, Jun 26, 2018 at 4:09 PM, Vishal Santoshi 
wrote:

> The leader znode is the right one  ( it is a binary )
>
> get /flink_test/da_15/leader/00
> 00/job_manager_lock
>
> wFDakka.tcp://fl...@flink-9edd15d7.bf2.tumblr.net:22161/
> user/jobmanagersrjava.util.UUIDm/J
>
>
>   leastSigBitsJ
>
>
> mostSigBitsxpHv
>
>
> So it does ( I think ) resolve the right leader of the HA, but from there
> ( the logs do not help as DEBUG logs do not expose what server it hits
> sadly ) .
>
>
> On Tue, Jun 26, 2018 at 9:57 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> OK few things
>>
>> 2018-06-26 13:31:29 INFO  CliFrontend:282 -  Starting Command Line
>> Client (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
>>
>> ...
>>
>> 2018-06-26 13:31:31 INFO  ClientCnxn:876 - Socket connection established
>> to zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181, initiating session
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:949 - Session establishment request
>> sent on zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181
>>
>> 2018-06-26 13:31:31 INFO  ClientCnxn:1299 - Session establishment
>> complete on server zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181,
>> sessionid = 0x35add547801ea07, negotiated timeout = 4
>>
>> 2018-06-26 13:31:31 INFO  RestClient:119 - Rest client endpoint started.
>>
>> 2018-06-26 13:31:31 INFO  ZooKeeperLeaderRetrievalService:100 - Starting
>> ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>> finished:false header:: 1,3  replyHeader:: 1,60416530560,0  request::
>> '/flink_test,F  response:: s{47265479496,47265479496,1489
>> 163688703,1489163688703,0,2,0,0,0,2,60416492885}
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>> finished:false header:: 2,3  replyHeader:: 2,60416530560,0  request::
>> '/flink_test/da_15,F  response:: s{60416492885,60416492885,1529
>> 755199131,1529755199131,0,5,0,0,0,5,60416521584}
>>
>> 2018-06-26 13:31:31 INFO  ZooKeeperLeaderRetrievalService:100 - Starting
>> ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>> finished:false header:: 3,3  replyHeader:: 3,60416530560,0  request::
>> '/flink_test,F  response:: s{47265479496,47265479496,1489
>> 163688703,1489163688703,0,2,0,0,0,2,60416492885}
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>> finished:false header:: 4,3  replyHeader:: 4,60416530560,0  request::
>> '/flink_test/da_15,F  response:: s{60416492885,60416492885,1529
>> 755199131,1529755199131,0,5,0,0,0,5,60416521584}
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>> finished:false header:: 5,3  replyHeader:: 5,60416530560,0  request::
>> '/flink_test/da_15/leader,F  response:: s{60416492887,60416492887,1529
>> 755199191,1529755199191,0,1,0,0,0,1,60416492888}
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>> sessionid:0x35add547801ea07, packet:: 
>> clientPath:/flink_test/da_15/leader/rest_server_lock
>> serverPath:/flink_test/da_15/leader/rest_server_lock finished:false
>> header:: 6,3  replyHeader:: 6,60416530560,-101  request::
>> '/flink_test/da_15/leader/rest_server_lock,T  response::
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>> finished:false header:: 7,3  replyHeader:: 7,60416530560,0  request::
>> '/flink_test,F  response:: s{47265479496,47265479496,1489
>> 163688703,1489163688703,0,2,0,0,0,2,60416492885}
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
>> finished:false header:: 8,3  replyHeader:: 8,60416530560,0  request::
>> '/flink_test/da_15,F  response:: s{60416492885,60416492885,1529
>> 755199131,1529755199131,0,5,0,0,0,5,60416521584}
>>
>> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
>> sessionid:0x35add547801ea07, 

SANSA 0.4 (Scalable Semantic Analytics Stack using Spark/Flink) Released

2018-06-26 Thread Jens Lehmann


Dear all,

The Smart Data Analytics group [1] is happy to announce SANSA 0.4 - the
fourth release of the Scalable Semantic Analytics Stack. SANSA employs
distributed computing via Apache Spark and Flink in order to allow
scalable machine learning, inference and querying capabilities for large
knowledge graphs.

Website:   http://sansa-stack.net
GitHub:https://github.com/SANSA-Stack
Download:  https://github.com/SANSA-Stack/SANSA-Stack/releases

You can find the FAQ and usage examples at http://sansa-stack.net/faq/.

The following features are currently supported by SANSA:

* Reading and writing RDF files in N-Triples, Turtle, RDF/XML, N-Quad
  format
* Reading OWL files in various standard formats
* Support for multiple data partitioning techniques
* SPARQL querying via Sparqlify
* Graph-parallel querying of RDF using SPARQL (1.0) via GraphX
  traversals (experimental)
* RDFS, RDFS Simple, OWL-Horst, EL (experimental) forward chaining
  inference
* Automatic inference plan creation (experimental)
* RDF graph clustering with different algorithms
* Terminological decision trees (experimental)
* Anomaly detection (beta)
* Knowledge graph embedding approaches: TransE (beta), DistMult (beta)

Noteworthy changes or updates since the previous release are:

* Parser performance has been improved significantly [2] e.g. DBpedia
  2016-10 can be loaded in <100 seconds on a 7 node cluster
* Support for a wider range of data partitioning strategies
* A better unified API across data representations (RDD, DataFrame,
  DataSet, Graph) for triple operations
* Improved unit test coverage
* Improved distributed statistics calculation (see ISWC paper [3])
* Initial scalability tests on 6 billion triple Ethereum blockchain data
  on a 100 node cluster [4]
* New SPARQL-to-GraphX rewriter aiming at providing better performance
  for queries exploiting graph locality
* Numeric outlier detection tested on DBpedia (en)
* Improved clustering tested on 20 GB RDF data sets

Deployment and getting started:

* There are template projects for SBT and Maven for Apache Spark as well
  as for Apache Flink available [5] to get started.
* The SANSA jar files are in Maven Central i.e. in most IDEs you can
  just search for “sansa” to include the dependencies in Maven projects.
* Example code is available for various tasks [6].
* We provide interactive notebooks for running and testing code [7] via
  Docker.

We want to thank everyone who helped to create this release, in
particular the projects supporting us [8]: Big Data Europe, HOBBIT,
SAKE, Big Data Ocean, SLIPO, QROWD, BETTER, BOOST and SPECIAL.

View this announcement on Twitter and the SANSA blog:
  http://sansa-stack.net/sansa-0-4/
  https://twitter.com/SANSA_Stack/status/1011633150257188864

Kind regards,

The SANSA Development Team
(http://sansa-stack.net/community/#Contributors)

 [1] http://sda.tech
 [2] http://sansa-stack.net/sansa-parser-performance-improved/
 [3] http://jens-lehmann.org/files/2018/iswc_distlodstats.pdf
 [4]
https://media.consensys.net/alethio-links-up-with-sansa-semantic-analytics-stack-to-analyze-ethereum-at-new-scales-b26055540167
 [5] http://sansa-stack.net/downloads-usage/
 [6] https://github.com/SANSA-Stack/SANSA-Examples
 [7] https://github.com/SANSA-Stack/SANSA-Notebooks
 [8] http://sansa-stack.net/powered-by/


-- 
Prof. Dr. Jens Lehmann
http://jens-lehmann.org
http://sda.tech
Computer Science Institute   Knowledge Discovery Department
University of Bonn   Fraunhofer IAIS
http://www.cs.uni-bonn.dehttp://www.iais.fraunhofer.de
lehm...@uni-bonn.de  jens.lehm...@iais.fraunhofer.de



Re: TaskIOMetricGroup metrics not unregistered in prometheus on job failure ?

2018-06-26 Thread jelmer
Hi Chesnay, sorry for the late reply. I did not have time to look into this
sooner

I did what you suggested. Added some logging to the PrometheusReporter like
this :

https://github.com/jelmerk/flink/commit/58779ee60a8c3961f3eb2c487c603c33822bba8a

And deployed a custom build of the reporter to our test environment.

I managed to reproduce the issue like this

1. Deploy job A : it lands on worker 1
2. Deploy job B : it lands on worker 1, take note of the job id
3. Redeploy job b by canceling it from a savepoint and deploying it again
from the savepoint : it lands on worker 3
4. Execute curl -s http://localhost:9249/metrics | grep "job id from step
2" on worker 1. The metrics are still exposed even though the job is
canceled

I attached a piece of the log to the email. What I notice is that the two
jobs register metrics with the same scoped metric name. In this case
flink_taskmanager_job_task_buffers_inputQueueLength.

The Prometheus exporter seems to use reference counting for the metrics, and
a metric is only removed when the count reaches 0. Canceling job B lowers the
counter by 5, but because job A is still deployed the count does not reach 0,
so the metric never gets unregistered.

Canceling job A will remove the lingering metrics from the old job B

It seems to me that this is a bug and that the children that are being added
in notifyOfAddedMetric
should be removed in notifyOfRemovedMetric.

Can you confirm this ?


--Jelmer



On Fri, 15 Jun 2018 at 18:01, Chesnay Schepler  wrote:

> I remember that another user reported something similar, but he wasn't
> using the PrometheusReporter. see
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JVM-metrics-disappearing-after-job-crash-restart-tt20420.html
>
> We couldn't find the cause, but my suspicion was FLINK-8946 which will be
> fixed in 1.4.3 .
> You could cherry-pick 8b046fafb6ee77a86e360f6b792e7f73399239bd and see
> whether this actually caused it.
>
> Alternatively, if you can reproduce this it would be immensely helpful if
> you could modify the PrometheusReporter and log all notifications about
> added or removed metrics.
>
> On 15.06.2018 15:42, Till Rohrmann wrote:
>
> Hi,
>
> this sounds very strange. I just tried it out locally with with a standard
> metric and the Prometheus metrics seem to be unregistered after the job has
> reached a terminal state. Thus, it looks as if the standard metrics are
> properly removed from `CollectorRegistry.defaultRegistry`. Could you check
> the log files whether they contain anything suspicious about a failed
> metric deregistration a la `There was a problem unregistering metric`?
>
> I've also pulled in Chesnay who knows more about the metric reporters.
>
> Cheers,
> Till
>
> On Thu, Jun 14, 2018 at 11:34 PM jelmer  wrote:
>
>> Hi
>>
>> We are using flink-metrics-prometheus for reporting on apache flink 1.4.2
>>
>> And I am looking into an issue where it seems that somehow in some cases
>> the metrics registered
>> by org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup
>> (flink_taskmanager_job_task_buffers_outPoolUsage etc)  are not being
>> unregistered in prometheus in case of a job restart
>>
>> Eventually this seems to cause a java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/metrics/stats/Rate$1 error when a new version of
>> the job is deployed  because the jar file
>> in /tmp/blobStore-foo/job_bar/blob_p-baz-qux has been removed upon
>> deployment of the new job but the url classloader still points to it and it
>> cannot find stats/Rate$1 (some synthetically generated code generated by
>> the java compiler because its a switch on an enum)
>>
>> Has anybody come across this issue ? Has it possibly been fixed in 1.5 ?
>> Can somebody any pointers as to where to look to tackle this ?
>>
>> Attached screenshot shows what classloader that cannot be garbage
>> collected with the gc root
>>
>>
>
2018-06-26 15:21:41.602 [marshalled-with-bot-header-for-dev -> Sink: 
kafka-sink-for-dev (1/1)] INFO  
org.apache.flink.metrics.prometheus.PrometheusReporter  - Metric added with 
metricName inputQueueLength and scopedMetricName 
flink_taskmanager_job_task_buffers_inputQueueLength and metricGroup scope 
components 
flinkworker001,taskmanager,odin-router-v2,marshalled-with-bot-header-for-dev -> 
Sink: kafka-sink-for-dev,0,buffers and metric group variables 
{=58f009823d425ec30a95fcab9b91929f, 
=cc2765d0460902a818200cb341f809c2, 
=d00c5dc5565d99849c4d996057cdf99f, =flinkworker001, 
=marshalled-with-bot-header-for-dev -> Sink: kafka-sink-for-dev, 
=0, =odin-router-v2, 
=9fce323ec41b9537e85d6ed52c38cdb2, =0}. Count is 0
2018-06-26 15:21:41.605 [bots-for-dev -> Map -> Sink: bots-sink-for-dev (1/1)] 
INFO  org.apache.flink.metrics.prometheus.PrometheusReporter  - Metric added 
with metricName inputQueueLength and scopedMetricName 

Re: Measure Latency from source to sink

2018-06-26 Thread antonio saldivar
Hello, thank you.

I was also trying to use the Flink UI metrics on version 1.4.2 with
env.getConfig().setLatencyTrackingInterval(1000L),
but it looks like nothing is displayed.

El mar., 26 jun. 2018 a las 10:45, zhangminglei (<18717838...@163.com>)
escribió:

> Hi, you can do that, but it does not make much sense in general. You can do
> it with Flink, Storm, Spark Streaming or Structured Streaming, and then
> compare the latency across the different frameworks.
>
> Cheers
> Minglei
>
> 在 2018年6月26日,下午9:36,antonio saldivar  写道:
>
> Hello Thank you for the feedback,
>
> Well, for now I just want to measure the time each transaction takes from
> source to sink, adding the start and end times in milliseconds.
>
>
>
> El mar., 26 jun. 2018 a las 5:19, zhangminglei (<18717838...@163.com>)
> escribió:
>
>> Hi,Antonio
>>
>> Usually, measuring delay for a specific business case is more meaningful.
>> From my experience, latency is data preparation time plus query
>> calculation time, i.e. an end-to-end latency test, rather than the latency
>> of Flink itself. Hope this helps.
>>
>> Cheers
>> Minglei
>>
>>
>> > 在 2018年6月26日,上午5:23,antonio saldivar  写道:
>> >
>> > Hello
>> >
>> > I am trying to measure the latency of each transaction traveling across
>> the system. As a data source I have a Kafka consumer, and I would like to
>> measure the time it takes from source to sink. Does anyone have an
>> example?
>> >
>> > Thank you
>> > Best Regards
>>
>>
>>
>


Re: Measure Latency from source to sink

2018-06-26 Thread zhangminglei
Hi, you can do that, but it does not make much sense in general. You can do it
with Flink, Storm, Spark Streaming or Structured Streaming, and then compare
the latency across the different frameworks.

Cheers
Minglei

> 在 2018年6月26日,下午9:36,antonio saldivar  写道:
> 
> Hello Thank you for the feedback,
> 
> Well, for now I just want to measure the time each transaction takes from 
> source to sink, adding the start and end times in milliseconds.
> 
> 
> 
> El mar., 26 jun. 2018 a las 5:19, zhangminglei (<18717838...@163.com 
> >) escribió:
> Hi,Antonio
> 
> Usually, measuring delay for a specific business case is more meaningful. From 
> my experience, latency is data preparation time plus query calculation time, 
> i.e. an end-to-end latency test, rather than the latency of Flink itself. Hope 
> this helps.
> 
> Cheers
> Minglei
> 
> 
> > 在 2018年6月26日,上午5:23,antonio saldivar  > > 写道:
> > 
> > Hello
> > 
> > I am trying to measure the latency of each transaction traveling across the 
> > system. As a data source I have a Kafka consumer, and I would like to measure 
> > the time it takes from source to sink. Does anyone have an example?
> > 
> > Thank you
> > Best Regards
> 
> 



Re: Few question about upgrade from 1.4 to 1.5 flink ( some very basic )

2018-06-26 Thread Vishal Santoshi
The leader znode is the right one  ( it is a binary )

get
/flink_test/da_15/leader//job_manager_lock

wFDakka.tcp://
fl...@flink-9edd15d7.bf2.tumblr.net:22161/user/jobmanagersrjava.util.UUIDm/J


  leastSigBitsJ


  mostSigBitsxpHv


So it does ( I think ) resolve the right leader of the HA, but from there (
the logs do not help as DEBUG logs do not expose what server it hits sadly
) .


On Tue, Jun 26, 2018 at 9:57 AM, Vishal Santoshi 
wrote:

> OK few things
>
> 2018-06-26 13:31:29 INFO  CliFrontend:282 -  Starting Command Line Client
> (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
>
> ...
>
> 2018-06-26 13:31:31 INFO  ClientCnxn:876 - Socket connection established
> to zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181, initiating session
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:949 - Session establishment request
> sent on zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181
>
> 2018-06-26 13:31:31 INFO  ClientCnxn:1299 - Session establishment
> complete on server zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181,
> sessionid = 0x35add547801ea07, negotiated timeout = 4
>
> 2018-06-26 13:31:31 INFO  RestClient:119 - Rest client endpoint started.
>
> 2018-06-26 13:31:31 INFO  ZooKeeperLeaderRetrievalService:100 - Starting
> ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
> finished:false header:: 1,3  replyHeader:: 1,60416530560,0  request::
> '/flink_test,F  response:: s{47265479496,47265479496,
> 1489163688703,1489163688703,0,2,0,0,0,2,60416492885}
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
> finished:false header:: 2,3  replyHeader:: 2,60416530560,0  request::
> '/flink_test/da_15,F  response:: s{60416492885,60416492885,
> 1529755199131,1529755199131,0,5,0,0,0,5,60416521584}
>
> 2018-06-26 13:31:31 INFO  ZooKeeperLeaderRetrievalService:100 - Starting
> ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
> finished:false header:: 3,3  replyHeader:: 3,60416530560,0  request::
> '/flink_test,F  response:: s{47265479496,47265479496,
> 1489163688703,1489163688703,0,2,0,0,0,2,60416492885}
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
> finished:false header:: 4,3  replyHeader:: 4,60416530560,0  request::
> '/flink_test/da_15,F  response:: s{60416492885,60416492885,
> 1529755199131,1529755199131,0,5,0,0,0,5,60416521584}
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
> finished:false header:: 5,3  replyHeader:: 5,60416530560,0  request::
> '/flink_test/da_15/leader,F  response:: s{60416492887,60416492887,
> 1529755199191,1529755199191,0,1,0,0,0,1,60416492888}
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: 
> clientPath:/flink_test/da_15/leader/rest_server_lock
> serverPath:/flink_test/da_15/leader/rest_server_lock finished:false
> header:: 6,3  replyHeader:: 6,60416530560,-101  request::
> '/flink_test/da_15/leader/rest_server_lock,T  response::
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
> finished:false header:: 7,3  replyHeader:: 7,60416530560,0  request::
> '/flink_test,F  response:: s{47265479496,47265479496,
> 1489163688703,1489163688703,0,2,0,0,0,2,60416492885}
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
> finished:false header:: 8,3  replyHeader:: 8,60416530560,0  request::
> '/flink_test/da_15,F  response:: s{60416492885,60416492885,
> 1529755199131,1529755199131,0,5,0,0,0,5,60416521584}
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
> finished:false header:: 9,3  replyHeader:: 9,60416530560,0  request::
> '/flink_test/da_15/leader,F  response:: s{60416492887,60416492887,
> 1529755199191,1529755199191,0,1,0,0,0,1,60416492888}
>
> 2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
> sessionid:0x35add547801ea07, packet:: 
> clientPath:/flink_test/da_15/leader/dispatcher_lock
> serverPath:/flink_test/da_15/leader/dispatcher_lock finished:false
> header:: 10,3  replyHeader:: 10,60416530560,-101  request::
> '/flink_test/da_15/leader/dispatcher_lock,T  response::
>
> 2018-06-26 13:31:31 INFO  CliFrontend:914 - Waiting for response...
>
> Waiting for response...
>
> 2018-06-26 13:31:44 DEBUG ClientCnxn:742 - Got ping response for
> sessionid: 0x35add547801ea07 after 

Re: Few question about upgrade from 1.4 to 1.5 flink ( some very basic )

2018-06-26 Thread Vishal Santoshi
OK few things

2018-06-26 13:31:29 INFO  CliFrontend:282 -  Starting Command Line Client
(Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)

...

2018-06-26 13:31:31 INFO  ClientCnxn:876 - Socket connection established to
zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181, initiating session

2018-06-26 13:31:31 DEBUG ClientCnxn:949 - Session establishment request
sent on zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181

2018-06-26 13:31:31 INFO  ClientCnxn:1299 - Session establishment complete
on server zk-f1fb95b9.bf2.tumblr.net/10.246.218.17:2181, sessionid =
0x35add547801ea07, negotiated timeout = 4

2018-06-26 13:31:31 INFO  RestClient:119 - Rest client endpoint started.

2018-06-26 13:31:31 INFO  ZooKeeperLeaderRetrievalService:100 - Starting
ZooKeeperLeaderRetrievalService /leader/rest_server_lock.

2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
finished:false header:: 1,3  replyHeader:: 1,60416530560,0  request::
'/flink_test,F  response::
s{47265479496,47265479496,1489163688703,1489163688703,0,2,0,0,0,2,60416492885}


2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
finished:false header:: 2,3  replyHeader:: 2,60416530560,0  request::
'/flink_test/da_15,F  response::
s{60416492885,60416492885,1529755199131,1529755199131,0,5,0,0,0,5,60416521584}


2018-06-26 13:31:31 INFO  ZooKeeperLeaderRetrievalService:100 - Starting
ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.

2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
finished:false header:: 3,3  replyHeader:: 3,60416530560,0  request::
'/flink_test,F  response::
s{47265479496,47265479496,1489163688703,1489163688703,0,2,0,0,0,2,60416492885}


2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
finished:false header:: 4,3  replyHeader:: 4,60416530560,0  request::
'/flink_test/da_15,F  response::
s{60416492885,60416492885,1529755199131,1529755199131,0,5,0,0,0,5,60416521584}


2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
finished:false header:: 5,3  replyHeader:: 5,60416530560,0  request::
'/flink_test/da_15/leader,F  response::
s{60416492887,60416492887,1529755199191,1529755199191,0,1,0,0,0,1,60416492888}


2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet::
clientPath:/flink_test/da_15/leader/rest_server_lock
serverPath:/flink_test/da_15/leader/rest_server_lock finished:false
header:: 6,3  replyHeader:: 6,60416530560,-101  request::
'/flink_test/da_15/leader/rest_server_lock,T  response::

2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
finished:false header:: 7,3  replyHeader:: 7,60416530560,0  request::
'/flink_test,F  response::
s{47265479496,47265479496,1489163688703,1489163688703,0,2,0,0,0,2,60416492885}


2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
finished:false header:: 8,3  replyHeader:: 8,60416530560,0  request::
'/flink_test/da_15,F  response::
s{60416492885,60416492885,1529755199131,1529755199131,0,5,0,0,0,5,60416521584}


2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet:: clientPath:null serverPath:null
finished:false header:: 9,3  replyHeader:: 9,60416530560,0  request::
'/flink_test/da_15/leader,F  response::
s{60416492887,60416492887,1529755199191,1529755199191,0,1,0,0,0,1,60416492888}


2018-06-26 13:31:31 DEBUG ClientCnxn:843 - Reading reply
sessionid:0x35add547801ea07, packet::
clientPath:/flink_test/da_15/leader/dispatcher_lock
serverPath:/flink_test/da_15/leader/dispatcher_lock finished:false header::
10,3  replyHeader:: 10,60416530560,-101  request::
'/flink_test/da_15/leader/dispatcher_lock,T  response::

2018-06-26 13:31:31 INFO  CliFrontend:914 - Waiting for response...

Waiting for response...

2018-06-26 13:31:44 DEBUG ClientCnxn:742 - Got ping response for sessionid:
0x35add547801ea07 after 0ms

2018-06-26 13:31:58 DEBUG ClientCnxn:742 - Got ping response for sessionid:
0x35add547801ea07 after 0ms

2018-06-26 13:32:01 INFO  RestClient:123 - Shutting down rest endpoint.

2018-06-26 13:32:01 INFO  RestClient:140 - Rest endpoint shutdown complete.

2018-06-26 13:32:01 INFO  ZooKeeperLeaderRetrievalService:117 - Stopping
ZooKeeperLeaderRetrievalService /leader/rest_server_lock.

2018-06-26 13:32:01 INFO  ZooKeeperLeaderRetrievalService:117 - Stopping
ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.

2018-06-26 13:32:01 DEBUG CuratorFrameworkImpl:282 - Closing

2018-06-26 13:32:01 INFO  CuratorFrameworkImpl:821 -
backgroundOperationsLoop exiting

2018-06-26 13:32:01 DEBUG 

Re: Measure Latency from source to sink

2018-06-26 Thread antonio saldivar
Hello, thank you for the feedback.

Well, for now I just want to measure the time each transaction takes from
source to sink, adding the start and end times in milliseconds.



El mar., 26 jun. 2018 a las 5:19, zhangminglei (<18717838...@163.com>)
escribió:

> Hi,Antonio
>
> Usually, measuring delay for a specific business case is more meaningful.
> From my experience, latency is data preparation time plus query calculation
> time, i.e. an end-to-end latency test, rather than the latency of Flink
> itself. Hope this helps.
>
> Cheers
> Minglei
>
>
> > 在 2018年6月26日,上午5:23,antonio saldivar  写道:
> >
> > Hello
> >
> > I am trying to measure the latency of each transaction traveling across
> the system. As a data source I have a Kafka consumer, and I would like to
> measure the time it takes from source to sink. Does anyone have an
> example?
> >
> > Thank you
> > Best Regards
>
>
>


Re: Restore state from save point with add new flink sql

2018-06-26 Thread Hequn Cheng
Hi

I'm not sure about the answer. I have a feeling that if we only add new
code below the old code (i.e., append new code after the old code), the uids
will not change.
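
For the DataStream parts of such a job (not the operators the SQL planner
generates), one way to keep state assignment stable across code changes is
to set explicit uids on the hand-written operators; a minimal sketch with
illustrative names:

import org.apache.flink.streaming.api.datastream.DataStream;

public class ExplicitUids {

   // Stable uids let the hand-written operators' state be matched to the
   // savepoint even if surrounding code is added later.
   public static void wire(DataStream<String> detectionResults) {
      detectionResults
         .filter(value -> value.length() > 5).uid("violation-filter")
         .print().uid("result-sink");
   }
}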

On Tue, Jun 26, 2018 at 3:06 PM, Till Rohrmann  wrote:

> I think so. Maybe Fabian or Timo can correct me if I'm wrong here.
>
> On Mon, Jun 25, 2018 at 9:17 AM James (Jian Wu) [FDS Data Platform] <
> james...@coupang.com> wrote:
>
>> Hi Till:
>>
>>
>>
>> Thanks for your answer. So if I just add new SQL and do not modify the old
>> SQL, then restarting the job with the `-n`/`--allowNonRestoredState` option
>> can resume the old SQL state from the savepoint?
>>
>>
>>
>> Regards
>>
>>
>>
>> James
>>
>>
>>
>> *From: *Till Rohrmann 
>> *Date: *Friday, June 15, 2018 at 8:13 PM
>> *To: *"James (Jian Wu) [FDS Data Platform]" 
>> *Cc: *user , Fabian Hueske ,
>> Timo Walther 
>> *Subject: *Re: Restore state from save point with add new flink sql
>>
>>
>>
>> Hi James,
>>
>>
>>
>> as long as you do not change anything for `sql1`, it should work to
>> recover from a savepoint if you pass the `-n`/`--allowNonRestoredState`
>> option to the CLI when resuming your program from the savepoint. The reason
>> is that an operators generated uid depends on the operator and on its
>> inputs.
>>
>>
>>
>> I've also pulled in Fabian and Timo who will be able to tell you a little
>> bit more about the job modification support for stream SQL.
>>
>>
>>
>> Cheers,
>> Till
>>
>>
>>
>> On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <
>> james...@coupang.com> wrote:
>>
>> *Hi:*
>>
>>
>>
>> *   My application use flink sql, I want to add new sql to the
>> application, *
>>
>>
>>
>> *For example first version is*
>>
>>
>>
>> DataStream paymentCompleteStream = 
>> *getKafkaStream*(env,
>> bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>> .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap()).
>> assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
>> .returns(TypeInformation.*of*(AggregatedOrderItems.class));
>>
>> tableEnv.registerDataStream("AggregatedOrderItems",
>> paymentCompleteStream, *concatFieldsName*(AggregatedOrderItems.class,
>> true, "eventTs"));
>>
>> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>>
>> Table resultTable = tableEnv.sqlQuery(*sql1*);
>> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>> .flatMap(new E5FlatmapFunction(resultSampleRate)).
>> setParallelism(30)
>> .filter(new FilterFunction() {
>> @Override
>> public boolean filter(DetectionResult value) throws Exception
>> {
>>return (value.getViolationCount() >= 5);
>> }
>> }).addSink(new DetectionResultMySqlSink());
>>
>>
>>
>> *Then second version, I add new sql*
>>
>>
>>
>> Table resultTable2 = tableEnv.sqlQuery(*sql2*);
>> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>> .flatMap(new A2FlatmapFunction(resultSampleRate)).
>> setParallelism(30)
>> .filter(new FilterFunction() {
>> @Override
>> public boolean filter(DetectionResult value) throws Exception
>> {
>> return (value.getViolationCount() >= 5);
>> }
>> }).addSink(new DetectionResultMySqlSink());
>>
>>
>>
>> *After restart job with savepoints, whether the original flink sql can be
>> restore success? Whether the flink will assign a new UID to original sql
>> operator? (I will not change the original sql)*
>>
>>
>>
>> *Regards*
>>
>>
>>
>> *James*
>>
>>
>>
>>


Re: Storing Streaming Data into Static source

2018-06-26 Thread Rad Rad
Thanks, Stefan.

I subscribe to the streaming data from Kafka and run some queries on it with Flink.
I need to store some of the results in a static data store.

So, which data store is better to write to from the Flink API for this Kafka-sourced stream: MongoDB or PostgreSQL?
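
For whichever store ends up being chosen, writing from Flink typically means a custom sink function. A rough sketch only: the connection URL, credentials, table, columns and the Result type below are invented for illustration, and it assumes the PostgreSQL JDBC driver is on the job classpath (a MongoDB sink would look similar, using the Mongo client instead of JDBC):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PostgresResultSink extends RichSinkFunction<Result> {

    private transient Connection connection;
    private transient PreparedStatement insert;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection per parallel sink instance.
        connection = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/results_db", "flink", "secret");
        insert = connection.prepareStatement(
                "INSERT INTO query_results (result_key, result_value) VALUES (?, ?)");
    }

    @Override
    public void invoke(Result r) throws Exception {
        insert.setString(1, r.getKey());
        insert.setDouble(2, r.getValue());
        insert.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (insert != null) { insert.close(); }
        if (connection != null) { connection.close(); }
    }
}

Attached with resultStream.addSink(new PostgresResultSink()). Note that a sink like this gives at-least-once behaviour on restarts, so the insert (or the table) should tolerate duplicates.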


Thanks again. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Storing Streaming Data into Static source

2018-06-26 Thread Stefan Richter
Hi,

I think this is not really a Flink-related question. In any case, you might want to specify a bit more what you mean by "better", because usually there is no strict "better", only trade-offs, and what is "better" for somebody else might not be "better" for you.

Best,
Stefan

> On 26.06.2018 at 12:54, Rad Rad wrote:
> 
> Hi all, 
> 
> 
> Kindly, I want to save streaming data which subscribed from Kafka into a
> static data source. Which is better /MongoDB or PostgreSQL. 
> 
> 
> Radhya. 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Logback doesn't receive logs from job

2018-06-26 Thread Guilherme Nobre
Hi all,

I have a Flink cluster (1.4.0) built from Flink's Docker image, with 1 job
manager and 2 task managers. I'm trying to use logback instead of log4j, and
as far as the cluster configuration goes, it seems alright.

Following
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html
I have removed log4j-1.2.xx.jar and slf4j-log4j12-xxx.jar from lib/ and
added logback-classic.jar, logback-core.jar and log4j-over-slf4j.jar. This
is a snippet from the logs:

*2018-06-26 11:02:47.760 [main] DEBUG
org.apache.hadoop.security.UserGroupInformation  - hadoop login*
*2018-06-26 11:02:47.761 [main] DEBUG
org.apache.hadoop.security.UserGroupInformation  - hadoop login commit*
*2018-06-26 11:02:47.764 [main] DEBUG
org.apache.hadoop.security.UserGroupInformation  - using local
user:UnixPrincipal: flink*
*2018-06-26 11:02:47.764 [main] DEBUG
org.apache.hadoop.security.UserGroupInformation  - Using user:
"UnixPrincipal: flink" with name flink*
*2018-06-26 11:02:47.765 [main] DEBUG
org.apache.hadoop.security.UserGroupInformation  - User entry: "flink"*
*2018-06-26 11:02:47.765 [main] DEBUG
org.apache.hadoop.security.UserGroupInformation  - Assuming keytab is
managed externally since logged in from subject.*
*2018-06-26 11:02:47.766 [main] DEBUG
org.apache.hadoop.security.UserGroupInformation  - UGI loginUser:flink
(auth:SIMPLE)*
*2018-06-26 11:02:47.766 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -  Current Hadoop/Kerberos
user: flink*
*2018-06-26 11:02:47.766 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -  JVM: OpenJDK 64-Bit
Server VM - Oracle Corporation - 1.8/25.151-b12*
*2018-06-26 11:02:47.766 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -  Maximum heap size: 981
MiBytes*
*2018-06-26 11:02:47.766 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -  JAVA_HOME:
/docker-java-home/jre*
*2018-06-26 11:02:47.771 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -  Hadoop version: 2.8.0*
*2018-06-26 11:02:47.771 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -  JVM Options:*
*2018-06-26 11:02:47.771 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  - -Xms1024m*
*2018-06-26 11:02:47.771 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  - -Xmx1024m*
*2018-06-26 11:02:47.771 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -
 -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties*
*2018-06-26 11:02:47.772 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -
 -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml*
*2018-06-26 11:02:47.772 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -  Program Arguments:*
*2018-06-26 11:02:47.772 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  - --configDir*
*2018-06-26 11:02:47.772 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  - /opt/flink/conf*
*2018-06-26 11:02:47.772 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  - --executionMode*
*2018-06-26 11:02:47.772 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  - cluster*
*2018-06-26 11:02:47.772 [main] INFO
org.apache.flink.runtime.jobmanager.JobManager  -  Classpath:
/opt/flink/lib/flink-python_2.11-1.4.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.4.0.jar:/opt/flink/lib/log4j-over-slf4j.jar:/opt/flink/lib/logback-classic.jar:/opt/flink/lib/logback-core.jar:/opt/flink/lib/flink-dist_2.11-1.4.0.jar:::*

Above you can see the logback jars are properly in the classpath and that
it is using logback-console.xml as the configuration file. This is the
content of logback-console.xml:

[The XML markup of logback-console.xml was stripped by the mail archive; what remains readable is the console appender's encoder pattern
%d{-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
plus the now-empty appender and root-logger elements.]

Which justifies the DEBUG logs seen above as well; setting the level to DEBUG
is working fine. Now comes the weirdest part. This is part of the pipeline:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Pipeline {

    private static final Logger LOGGER = LoggerFactory.getLogger(Pipeline.class);

    public static void main(String[] args) throws Exception {

        LOGGER.info(" INFO");
        LOGGER.warn(" WARN");
        LOGGER.error(" ERROR");
        LOGGER.debug(" DEBUG");
        LOGGER.trace(" TRACE");

As you can see, I'm importing the slf4j Logger and LoggerFactory and
instantiating the logger. Then, I just log at all 5 levels.

I can't see ANY of these logs in the job manager / task manager logs. They
work fine locally though:

*13:16:11.030 [main] INFO Pipeline -
 INFO*
*13:16:11.035 [main] WARN Pipeline -

Storing Streaming Data into Static source

2018-06-26 Thread Rad Rad
Hi all, 


Kindly, I want to save streaming data subscribed from Kafka into a
static data store. Which is better, MongoDB or PostgreSQL?


Radhya. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Few question about upgrade from 1.4 to 1.5 flink ( some very basic )

2018-06-26 Thread zhangminglei
By the way, this is in an HA setup.

> On Jun 26, 2018, at 5:39 PM, zhangminglei <18717838...@163.com> wrote:
> 
> Hi, Gary Yao
> 
> Once I discovered that there was a change in the ip address[ 
> jobmanager.rpc.address ]. From 10.208.73.129 to localhost. I think that will 
> cause the issue. What do you think ?
> 
> Cheers
> Minglei
> 
>> On Jun 26, 2018, at 4:53 PM, Gary Yao wrote:
>> 
>> Hi Vishal,
>> 
>> Could it be that you are not using the 1.5.0 client? The stacktrace you 
>> posted
>> does not reference valid lines of code in the release-1.5.0-rc6 tag. 
>> 
>> If you have a HA setup, the host and port of the leading JM will be looked up
>> from ZooKeeper before job submission. Therefore, the flink-conf.yaml used by 
>> the
>> client must have the same ZooKeeper configuration as used by the Flink 
>> cluster.
>> 
>> Best,
>> Gary
>> 
>> On Mon, Jun 25, 2018 at 5:32 PM, Vishal Santoshi > > wrote:
>> I think all I need to add is 
>> 
>> web.port: 8081
>> rest.port: 8081
>> 
>> to the JM flink conf ? 
>> 
>> On Mon, Jun 25, 2018 at 10:46 AM, Vishal Santoshi > > wrote:
>> Another issue I saw with flink cli...
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The program 
>> execution failed: JobManager did not respond within 12 ms
>>  at 
>> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
>>  at 
>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
>>  at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
>>  at 
>> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
>>  at org.apach
>> 
>> This was a simple submission  and it does succeed through the UI. 
>> 
>> Has there been a regression on CLI... I could not find any documentation 
>> around it. 
>> 
>> I have a HA JM setup.
>> 
>> 
>> 
>> 
>> On Mon, Jun 25, 2018 at 10:22 AM, Chesnay Schepler > > wrote:
>> The watermark issue is known and will be fixed in 1.5.1
>> 
>> 
>> On 25.06.2018 15:03, Vishal Santoshi wrote:
>>> Thank you  
>>> 
>>> One addition
>>> 
>>> I do not see WM info on the UI  ( Attached ) 
>>> 
>>> Is this a known issue? The same pipe on our production has the WM ( In fact 
>>> never had an issue with  Watermarks not appearing ) . Am I missing 
>>> something ?
>>> 
>>> On Mon, Jun 25, 2018 at 4:15 AM, Fabian Hueske >> > wrote:
>>> Hi Vishal,
>>> 
>>> 1. I don't think a rolling update is possible. Flink 1.5.0 changed the 
>>> process orchestration and how they communicate. IMO, the way to go is to 
>>> start a Flink 1.5.0 cluster, take a savepoint on the running job, start 
>>> from the savepoint on the new cluster and shut the old job down.
>>> 2. Savepoints should be compatible.
>>> 3. You can keep the slot configuration as before.
>>> 4. As I said before, mixing 1.5 and 1.4 processes does not work (or at 
>>> least, it was not considered a design goal and nobody paid attention that 
>>> it is possible).
>>> 
>>> Best, Fabian
>>> 
>>> 
>>> 2018-06-23 13:38 GMT+02:00 Vishal Santoshi >> >:
>>> 
>>> 1.  
>>> Can or has any one  done  a rolling upgrade from 1.4 to 1.5 ?  I am not 
>>> sure we can. It seems that JM cannot recover jobs with this exception
>>> 
>>> Caused by: java.io.InvalidClassException: 
>>> org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; 
>>> local class incompatible: stream classdesc serialVersionUID = 
>>> -647384516034982626, local class serialVersionUID = 2
>>> 
>>> 
>>> 
>>> 2. 
>>> Does SP on 1.4, resume on 1.5 ( pretty basic but no harm asking ) ?
>>> 
>>> 
>>> 
>>> 3. 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html#update-configuration-for-reworked-job-deployment
>>>  
>>> 
>>>  The taskmanager.numberOfTaskSlots: What would be the desired setting in a 
>>> stand alone ( non mesos/yarn ) cluster ?
>>> 
>>> 
>>> 4. I suspend all jobs and establish 1.5 on the JM ( the TMs are still 
>>> running with 1.4 ) . JM refuse to start  with 
>>> 
>>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net 
>>>  docker[3395]: 2018-06-23 11:34:23 
>>> ERROR JobManager:116 - Failed to recover job 
>>> 454cd84a519f3b50e88bcb378d8a1330.
>>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net 
>>>  docker[3395]: 
>>> java.lang.InstantiationError: org.apache.flink.runtime.blob.BlobKey
>>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net 
>>>  docker[3395]: at 
>>> sun.reflect.GeneratedSerializationConstructorAccessor51.newInstance(Unknown 
>>> Source)
>>> Jun 23 07:34:23 

Re: Few question about upgrade from 1.4 to 1.5 flink ( some very basic )

2018-06-26 Thread zhangminglei
Hi, Gary Yao

I noticed that there was a change in the IP address [jobmanager.rpc.address],
from 10.208.73.129 to localhost. I think that will cause the issue. What do you think?

Cheers
Minglei

> On Jun 26, 2018, at 4:53 PM, Gary Yao wrote:
> 
> Hi Vishal,
> 
> Could it be that you are not using the 1.5.0 client? The stacktrace you posted
> does not reference valid lines of code in the release-1.5.0-rc6 tag. 
> 
> If you have a HA setup, the host and port of the leading JM will be looked up
> from ZooKeeper before job submission. Therefore, the flink-conf.yaml used by 
> the
> client must have the same ZooKeeper configuration as used by the Flink 
> cluster.
> 
> Best,
> Gary
> 
> On Mon, Jun 25, 2018 at 5:32 PM, Vishal Santoshi  > wrote:
> I think all I need to add is 
> 
> web.port: 8081
> rest.port: 8081
> 
> to the JM flink conf ? 
> 
> On Mon, Jun 25, 2018 at 10:46 AM, Vishal Santoshi  > wrote:
> Another issue I saw with flink cli...
> 
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: JobManager did not respond within 12 ms
>   at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
>   at 
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
>   at org.apach
> 
> This was a simple submission  and it does succeed through the UI. 
> 
> Has there been a regression on CLI... I could not find any documentation 
> around it. 
> 
> I have a HA JM setup.
> 
> 
> 
> 
> On Mon, Jun 25, 2018 at 10:22 AM, Chesnay Schepler  > wrote:
> The watermark issue is known and will be fixed in 1.5.1
> 
> 
> On 25.06.2018 15:03, Vishal Santoshi wrote:
>> Thank you  
>> 
>> One addition
>> 
>> I do not see WM info on the UI  ( Attached ) 
>> 
>> Is this a known issue? The same pipe on our production has the WM ( In fact 
>> never had an issue with  Watermarks not appearing ) . Am I missing something 
>> ?
>> 
>> On Mon, Jun 25, 2018 at 4:15 AM, Fabian Hueske > > wrote:
>> Hi Vishal,
>> 
>> 1. I don't think a rolling update is possible. Flink 1.5.0 changed the 
>> process orchestration and how they communicate. IMO, the way to go is to 
>> start a Flink 1.5.0 cluster, take a savepoint on the running job, start from 
>> the savepoint on the new cluster and shut the old job down.
>> 2. Savepoints should be compatible.
>> 3. You can keep the slot configuration as before.
>> 4. As I said before, mixing 1.5 and 1.4 processes does not work (or at 
>> least, it was not considered a design goal and nobody paid attention that it 
>> is possible).
>> 
>> Best, Fabian
>> 
>> 
>> 2018-06-23 13:38 GMT+02:00 Vishal Santoshi > >:
>> 
>> 1.  
>> Can or has any one  done  a rolling upgrade from 1.4 to 1.5 ?  I am not sure 
>> we can. It seems that JM cannot recover jobs with this exception
>> 
>> Caused by: java.io.InvalidClassException: 
>> org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; 
>> local class incompatible: stream classdesc serialVersionUID = 
>> -647384516034982626, local class serialVersionUID = 2
>> 
>> 
>> 
>> 2. 
>> Does SP on 1.4, resume on 1.5 ( pretty basic but no harm asking ) ?
>> 
>> 
>> 
>> 3. 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html#update-configuration-for-reworked-job-deployment
>>  
>> 
>>  The taskmanager.numberOfTaskSlots: What would be the desired setting in a 
>> stand alone ( non mesos/yarn ) cluster ?
>> 
>> 
>> 4. I suspend all jobs and establish 1.5 on the JM ( the TMs are still 
>> running with 1.4 ) . JM refuse to start  with 
>> 
>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net 
>>  docker[3395]: 2018-06-23 11:34:23 
>> ERROR JobManager:116 - Failed to recover job 
>> 454cd84a519f3b50e88bcb378d8a1330.
>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net 
>>  docker[3395]: 
>> java.lang.InstantiationError: org.apache.flink.runtime.blob.BlobKey
>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net 
>>  docker[3395]: at 
>> sun.reflect.GeneratedSerializationConstructorAccessor51.newInstance(Unknown 
>> Source)
>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net 
>>  docker[3395]: at 
>> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net 
>>  docker[3395]: at 

Re: Measure Latency from source to sink

2018-06-26 Thread zhangminglei
Hi, Antonio

Usually, I think it is more reasonable to measure delay for a specific business case. What I understand by latency, from my experience, is data preparation time plus query calculation time, i.e. an end-to-end latency test, rather than the latency of Flink itself. Hope this helps you.

Cheers
Minglei


> On Jun 26, 2018, at 5:23 AM, antonio saldivar wrote:
> 
> Hello
> 
> I am trying to measure the latency of each transaction traveling across the 
> system as a DataSource I have a Kafka consumer and I would like to measure 
> the time that takes from the Source to Sink. Does any one has an example?.
> 
> Thank you
> Best Regards




Re: Few question about upgrade from 1.4 to 1.5 flink ( some very basic )

2018-06-26 Thread Gary Yao
Hi Vishal,

Could it be that you are not using the 1.5.0 client? The stacktrace you
posted
does not reference valid lines of code in the release-1.5.0-rc6 tag.

If you have a HA setup, the host and port of the leading JM will be looked
up
from ZooKeeper before job submission. Therefore, the flink-conf.yaml used
by the
client must have the same ZooKeeper configuration as used by the Flink
cluster.
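
Concretely, the client-side flink-conf.yaml needs at least the same ZooKeeper-related entries as the cluster, along these lines (the quorum, root path and cluster id below are illustrative values, not defaults to copy):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /my-cluster-id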

Best,
Gary

On Mon, Jun 25, 2018 at 5:32 PM, Vishal Santoshi 
wrote:

> I think all I need to add is
>
> web.port: 8081
>
> rest.port: 8081
>
> to the JM flink conf ?
>
> On Mon, Jun 25, 2018 at 10:46 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Another issue I saw with flink cli...
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: JobManager did not respond within 12 ms
>>
>> at org.apache.flink.client.program.ClusterClient.runDetached(
>> ClusterClient.java:524)
>>
>> at org.apache.flink.client.program.StandaloneClusterClient.subm
>> itJob(StandaloneClusterClient.java:103)
>>
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:456)
>>
>> at org.apache.flink.client.program.DetachedEnvironment.finalize
>> Execute(DetachedEnvironment.java:77)
>>
>> at org.apach
>>
>> This was a simple submission  and it does succeed through the UI.
>>
>> Has there been a regression on CLI... I could not find any documentation
>> around it.
>>
>>
>> I have a HA JM setup.
>>
>>
>>
>>
>>
>> On Mon, Jun 25, 2018 at 10:22 AM, Chesnay Schepler 
>> wrote:
>>
>>> The watermark issue is known and will be fixed in 1.5.1
>>>
>>>
>>> On 25.06.2018 15:03, Vishal Santoshi wrote:
>>>
>>> Thank you
>>>
>>> One addition
>>>
>>> I do not see WM info on the UI  ( Attached )
>>>
>>> Is this a known issue? The same pipe on our production has the WM ( In
>>> fact never had an issue with  Watermarks not appearing ) . Am I missing
>>> something ?
>>>
>>> On Mon, Jun 25, 2018 at 4:15 AM, Fabian Hueske 
>>> wrote:
>>>
 Hi Vishal,

 1. I don't think a rolling update is possible. Flink 1.5.0 changed the
 process orchestration and how they communicate. IMO, the way to go is to
 start a Flink 1.5.0 cluster, take a savepoint on the running job, start
 from the savepoint on the new cluster and shut the old job down.
 2. Savepoints should be compatible.
 3. You can keep the slot configuration as before.
 4. As I said before, mixing 1.5 and 1.4 processes does not work (or at
 least, it was not considered a design goal and nobody paid attention that
 it is possible).

 Best, Fabian


 2018-06-23 13:38 GMT+02:00 Vishal Santoshi :

>
> 1.
> Can or has any one  done  a rolling upgrade from 1.4 to 1.5 ?  I am
> not sure we can. It seems that JM cannot recover jobs with this exception
>
> Caused by: java.io.InvalidClassException:
> org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
> local class incompatible: stream classdesc serialVersionUID =
> -647384516034982626, local class serialVersionUID = 2
>
>
>
>
> 2.
> Does SP on 1.4, resume on 1.5 ( pretty basic but no harm asking ) ?
>
>
>
> 3.
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/
> release-notes/flink-1.5.html#update-configuration-for-rework
> ed-job-deployment The taskmanager.numberOfTaskSlots: What would be
> the desired setting in a stand alone ( non mesos/yarn ) cluster ?
>
>
> 4. I suspend all jobs and establish 1.5 on the JM ( the TMs are still
> running with 1.4 ) . JM refuse to start  with
>
> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]:
> 2018-06-23 11:34:23 ERROR JobManager:116 - Failed to recover job
> 454cd84a519f3b50e88bcb378d8a1330.
>
> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]:
> java.lang.InstantiationError: org.apache.flink.runtime.blob.BlobKey
>
> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: at
> sun.reflect.GeneratedSerializationConstructorAccessor51.newInstance(Unknown
> Source)
>
> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>
> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: at
> java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:1079)
>
> Jun
> .
>
>
>
> Any feedback would be highly appreciated...
>
>

>>>
>>>
>>
>


Re: String Interning

2018-06-26 Thread Stefan Richter
Hi,

you can enable object reuse via the execution config [1]: „By default, objects 
are not reused in Flink. Enabling the object reuse mode will instruct the 
runtime to reuse user objects for better performance. Keep in mind that this 
can lead to bugs when the user-code function of an operation is not aware of 
this behavior.“.
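
A small sketch of both pieces, enabling object reuse and re-interning the key after deserialization (the Event type and its getKey/setKey accessors are stand-ins for the poster's message class):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

env.getConfig().enableObjectReuse();

// Interning done in the constructor is lost whenever a record is serialized and
// deserialized between operators, so re-intern where the duplication hurts,
// e.g. right after the Kafka source:
DataStream<Event> interned = kafkaStream.map(new MapFunction<Event, Event>() {
    @Override
    public Event map(Event e) {
        e.setKey(e.getKey().intern());
        return e;
    }
});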

Best,
Stefan

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/execution_configuration.html
 


> On 22.06.2018 at 20:09, Martin, Nick wrote:
> 
> I have a job where I read data from Kafka, do some processing on it, and 
> write it to a database. When I read data out of Kafka, I put it into an 
> object that has a String field based on the Kafka message key. The possible 
> values for the message key are tightly constrained so there are fewer than 
> 100 possible unique key values. Profiling of the Flink job shows millions of 
> in flight stream elements, with an equal number of Strings, but I know all 
> the strings are duplicates of a small number of unique values.  So it’s an 
> ideal usecase for String interning. I’ve tried to use interning in the 
> constructors for the message elements, but I suspect that I need to do 
> something to preserve the interning when Flink serializes/deserializes 
> objects when passing them between operators. What’s the best way to 
> accomplish that?
>  
>  
>  
> 



Re: Measure Latency from source to sink

2018-06-26 Thread Fabian Hueske
Hi,

Measuring latency is tricky and you have to be careful about what you
measure.
Aggregations like window operators make things even more difficult because
you need to decide which timestamp(s) to forward (smallest? largest? all?).
Depending on the operation, the measurement code might even add to the
overall latency.
Also, the clock of the nodes in your cluster might not be totally in sync.
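
(If, despite those caveats, the built-in latency markers from option 1 in Hequn's reply below are also wanted, they are enabled per job via the ExecutionConfig; the interval value here is arbitrary:)

// Emit a latency marker from each source every 1000 ms.
env.getConfig().setLatencyTrackingInterval(1000L);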

Best, Fabian

2018-06-26 4:00 GMT+02:00 antonio saldivar :

> Thank you very much
>
> I already did #2, but at the moment I print the output, since I am using a
> trigger alert and evaluating the window, it replaces the toString values with
> null or 0 and only prints the ones saved in my accumulator and the keyBy
> value.
>
> On Mon, Jun 25, 2018, 9:22 PM Hequn Cheng  wrote:
>
>> Hi antonio,
>>
>> I see two options to solve your problem.
>> 1. Enable the latency tracking[1]. But you have to pay attention to it's
>> mechanism, for example, a) the sources only *periodically* emit a
>> special record and b) the latency markers are not accounting for the time
>> user records spend in operators as they are bypassing them.
>> 2. Add a time field to each of your record. Each time a record comes in
>> from the source, write down the time(t1), so that we can get the latency at
>> sink(t2) by  t2 - t1.
>>
>> Hope this helps.
>> Hequn
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-
>> master/monitoring/metrics.html#latency-tracking
>>
>> On Tue, Jun 26, 2018 at 5:23 AM, antonio saldivar 
>> wrote:
>>
>>> Hello
>>>
>>> I am trying to measure the latency of each transaction traveling across
>>> the system as a DataSource I have a Kafka consumer and I would like to
>>> measure the time that takes from the Source to Sink. Does any one has an
>>> example?.
>>>
>>> Thank you
>>> Best Regards
>>>
>>
>>


Re: How to partition within same physical node in Flink

2018-06-26 Thread Fabian Hueske
Hi,

keyBy() does not work hierarchically. Each keyBy() overrides the previous
partitioning.
You can keyBy(cam, seq#) which guarantees that all records with the same
(cam, seq#) are processed by the same parallel instance.
However, Flink does not give any guarantees about how the (cam, seq#)
partitions are distributed across slots (or even physical nodes).
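
A sketch of the composite key in code; this assumes cam is a String and ts is a long, which may not match the actual field types in CameraWithCube:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// All records with the same (cam, ts) combination end up in the same parallel instance.
KeyedStream<CameraWithCube, Tuple2<String, Long>> byCamAndTs = cameraWithCubeDataStream
        .keyBy(new KeySelector<CameraWithCube, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> getKey(CameraWithCube c) {
                return Tuple2.of(c.getCam(), c.getTs());
            }
        });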

Btw. why is it important that all records of the same cam are processed by
the same physical node?

Fabian

2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan :

> I see a .slotSharingGroup for SingleOutputStreamOperator
> 
>  which can put parallel instances of operations in same TM slot.
> I also see a CoLocationGroup but do not see a .coLocationGroup for 
> SingleOutputStreamOperator to
> put a task on the same slot.Seems CoLocationGroup
> 
> is defined at JobVertex level and has nothing to do with for
> SingleOutputStreamOperator.
> TaskManager has many slots. Slots have many threads within it.
> I want to be able to put the cam1 partition(keyBy(cam) in 1 slot and then
> use a keyBy(seq#) to run on many threads within that cam1 slot.
>
> Vijay
>
> On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan 
> wrote:
>
>> Thanks, Fabian.
>> Been reading your excellent book on Flink Streaming.Can't wait for more
>> chapters.
>> Attached a pic.
>>
>> [image: partition-by-cam-ts.jpg]
>>
>> I have records with seq# 1 and cam1 and cam2. I also have records with
>> varying seq#'s.
>> By partitioning on cam field first(keyBy(cam)), I can get cam1 partition
>> on the same task manager instance/slot/vCore(???)
>> Can I then have seq# 1 and seq# 2 for cam1 partition run in different
>> slots/threads on the same Task Manager instance(aka cam1 partition) using
>> keyBy(seq#) & setParallelism() ? Can *forward* Strategy be used to
>> achieve this ?
>>
>> TIA
>>
>>
>> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> Flink distributes task instances to slots and does not expose physical
>>> machines.
>>> Records are partitioned to task instances by hash partitioning. It is
>>> also not possible to guarantee that the records in two different operators
>>> are send to the same slot.
>>> Sharing information by side-passing it (e.g., via a file on a machine or
>>> in a static object) is an anti-pattern and should be avoided.
>>>
>>> Best, Fabian
>>>
>>> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan :
>>>
 Hi,

 Need to partition by cameraWithCube.getCam() 1st using
 parallelCamTasks(passed in as args).

 Then within each partition, need to partition again by
 cameraWithCube.getTs() but need to make sure each of the 2nd partition by
 getTS() runs on the same physical node ?

 How do I achieve that ?

 DataStream cameraWithCubeDataStream = env
 .addSource(new Source())
 .keyBy((cameraWithCube) -> cameraWithCube.getCam() )
 .process(new ProcessFunction() 
 {
 public void processElement(CameraWithCube cameraWithCube, 
 Context context, Collector collector) throws Exception {
 //do nothing
 }
 })
 .slotSharingGroup("camSharingGroup")//TODO: how to add camera# 
 of the partition
 .setParallelism(parallelCamTasks)
 .keyBy((cameraWithCube) -> cameraWithCube.getTs())
 .process(new ProcessFunction() 
 {
 public void processElement(CameraWithCube cameraWithCube, 
 Context context, Collector collector) throws Exception {
 //TODO: process code
 }
 })
 .setParallelism(noOfSlotsInEachPhysicalNode)//TODO: how many 
 parallel tasks within physical node
 .slotSharingGroup("??");//TODO: in same physical node

 TIA

>>>
>>>


Re: Restore state from save point with add new flink sql

2018-06-26 Thread Till Rohrmann
I think so. Maybe Fabian or Timo can correct me if I'm wrong here.

On Mon, Jun 25, 2018 at 9:17 AM James (Jian Wu) [FDS Data Platform] <
james...@coupang.com> wrote:

> Hi Till:
>
>
>
> Thanks for your answer, so if I just add new sql and not modified old sql
> then use `/`--allowNonRestoredState option to restart job can resume old
> sql state from savepoints?
>
>
>
> Regards
>
>
>
> James
>
>
>
> *From: *Till Rohrmann 
> *Date: *Friday, June 15, 2018 at 8:13 PM
> *To: *"James (Jian Wu) [FDS Data Platform]" 
> *Cc: *user , Fabian Hueske ,
> Timo Walther 
> *Subject: *Re: Restore state from save point with add new flink sql
>
>
>
> Hi James,
>
>
>
> as long as you do not change anything for `sql1`, it should work to
> recover from a savepoint if you pass the `-n`/`--allowNonRestoredState`
> option to the CLI when resuming your program from the savepoint. The reason
> is that an operators generated uid depends on the operator and on its
> inputs.
>
>
>
> I've also pulled in Fabian and Timo who will be able to tell you a little
> bit more about the job modification support for stream SQL.
>
>
>
> Cheers,
> Till
>
>
>
> On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <
> james...@coupang.com> wrote:
>
> *Hi:*
>
>
>
> *   My application use flink sql, I want to add new sql to the
> application, *
>
>
>
> *For example first version is*
>
>
>
> DataStream paymentCompleteStream = *getKafkaStream*(env,
> bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
> .flatMap(new
> PaymentComplete2AggregatedOrderItemFlatMap()).assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
> .returns(TypeInformation.*of*(AggregatedOrderItems.class));
>
> tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
> *concatFieldsName*(AggregatedOrderItems.class, true, "eventTs"));
>
> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>
> Table resultTable = tableEnv.sqlQuery(*sql1*);
> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
> .flatMap(new
> E5FlatmapFunction(resultSampleRate)).setParallelism(30)
> .filter(new FilterFunction() {
> @Override
> public boolean filter(DetectionResult value) throws Exception {
>return (value.getViolationCount() >= 5);
> }
> }).addSink(new DetectionResultMySqlSink());
>
>
>
> *Then second version, I add new sql*
>
>
>
> Table resultTable2 = tableEnv.sqlQuery(*sql2*);
> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
> .flatMap(new
> A2FlatmapFunction(resultSampleRate)).setParallelism(30)
> .filter(new FilterFunction() {
> @Override
> public boolean filter(DetectionResult value) throws Exception {
> return (value.getViolationCount() >= 5);
> }
> }).addSink(new DetectionResultMySqlSink());
>
>
>
> *After restart job with savepoints, whether the original flink sql can be
> restore success? Whether the flink will assign a new UID to original sql
> operator? (I will not change the original sql)*
>
>
>
> *Regards*
>
>
>
> *James*
>
>
>
>


Re: CEP: Different consuming strategies within a pattern

2018-06-26 Thread Shailesh Jain
Thanks, Dawid.
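
For anyone finding this thread later, here is a minimal sketch of the strategy described in Dawid's reply below: relaxed contiguity into the pattern, strict contiguity inside it, bounded by a within clause. The Event type and the string conditions are placeholders, not the test case from the gist:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<Event, ?> pattern = Pattern.<Event>begin("a")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "A".equals(e.getName()); }
        })
        // relaxed contiguity: try every later matching B, not only the first one
        .followedByAny("b")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "B".equals(e.getName()); }
        })
        // strict contiguity from here on
        .next("c")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "C".equals(e.getName()); }
        })
        .next("d")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "D".equals(e.getName()); }
        })
        // needed with followedByAny so that partial matches are eventually cleared
        .within(Time.minutes(10));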

On Mon, Jun 25, 2018 at 12:48 PM, Dawid Wysakowicz 
wrote:

> Hi Shailesh,
>
> It does not emit results because "followedBy" accepts only the first
> occurrence of matching event. Therefore in your case it only tries to
> construct pattern with start(id=2). Try removing this event and you will
> see it matches the other one.
> If you want to try to construct match with any subsequent start you can
> use "followedByAny", but then remember to add the within clause, as
> otherwise partial matches won't be cleared.
>
> Cheers,
> Dawid
>
> On 25/06/18 08:11, Shailesh Jain wrote:
> > Hi,
> >
> > I'm trying to detect a sequence like A followed by B, C, D.
> > i.e. there is no strict contiguity between A and B, but strict
> > contiguity between B, C and D.
> >
> > Sample test case:
> > https://gist.github.com/jainshailesh/57832683fb5137bd306e4844abd9ef86
> >
> > testStrictFollowedByRelaxedContiguity passes, but
> > testRelaxedFollowedByStrictContiguity fails.
> >
> > I'm not able to understand this behaviour. Am I missing something here?
> >
> > Thanks,
> > Shailesh
> >
> >
> >
> >
>
>