Why doesn't Flink's ListState support update?

2017-08-16 Thread yunfan123
If I want to update the list, I have to do it in two steps:

listState.clear();
for (Element e : myList) {
    listState.add(e);
}

Why can't I update the state with:

listState.update(myList);
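
For illustration, the two-step replacement described above can be wrapped in a small helper. This is only a sketch: SimpleListState is a hypothetical stand-in that mirrors just the clear() and add() calls from the message, not Flink's actual ListState interface, and InMemoryListState exists only to exercise the helper. Later Flink releases added an update(List) method to ListState, so on newer versions such a helper should not be needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two ListState methods used in the message.
// It is NOT Flink's interface; it only mirrors clear() and add().
interface SimpleListState<T> {
    void clear();
    void add(T value);
}

final class ListStateUpdates {
    private ListStateUpdates() {}

    // Replaces the whole state content with the given values.
    static <T> void replaceAll(SimpleListState<T> state, List<T> values) {
        state.clear();
        for (T value : values) {
            state.add(value);
        }
    }
}

// In-memory implementation, used only to try the helper out.
final class InMemoryListState<T> implements SimpleListState<T> {
    final List<T> items = new ArrayList<>();

    @Override
    public void clear() {
        items.clear();
    }

    @Override
    public void add(T value) {
        items.add(value);
    }
}
```

With this in place, ListStateUpdates.replaceAll(state, myList) expresses the intent of the requested update call in one line, at the cost of still performing a clear followed by individual adds.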



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-ListState-of-flink-don-t-support-update-tp14957.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re:Re:Re:Re:Re:How to verify the data to Elasticsearch whether correct or not ?

2017-08-16 Thread mingleizhang
Ahhh. Sorry Ted. I didn't see that the code was broken. Yep, I will put the 
code here directly as text. 


The dependency is 

<dependency>
  <groupId>com.googlecode.protobuf-java-format</groupId>
  <artifactId>protobuf-java-format</artifactId>
  <version>1.2</version>
</dependency>

The added code is as follows. This time, although I still sink an object to 
Elasticsearch, I also convert it to JSON with JsonFormat.printToString(element). 
That solves my issue: I can read my data back from Elasticsearch as a JSON 
string and use it to display the data in a front end. 
// sink the filtered data to ElasticSearch
clickStreamFiltered.addSink(new ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress,
  new ElasticsearchSinkFunction[ActivityInfo] {
    def createIndexRequest(element: ActivityInfo): IndexRequest = {
      val json = new java.util.HashMap[String, AnyRef]
      json.put("activityInfo", element)
      json.put("mid", element.getMid)
      json.put("activity", element.getActivity)
      // protobuf-java-format renders the PB object as a JSON string
      json.put("json", JsonFormat.printToString(element))

      Requests.indexRequest().index("filter_event_tracking").`type`("my-type-2").source(json)
    }

    override def process(activityInfo: ActivityInfo, runtimeContext: RuntimeContext,
                         requestIndexer: RequestIndexer): Unit = {
      requestIndexer.add(createIndexRequest(activityInfo))
    }
  }))
Peace.
zhangminglei / mingleizhang



At 2017-08-17 09:21:47, "Ted Yu"  wrote:
Did you use an image for the code?
Can you send the plain code again?
Cheers


 Original message 
From: mingleizhang <18717838...@163.com>
Date: 8/16/17 6:16 PM (GMT-08:00)
To: mingleizhang <18717838...@163.com>
Cc: "Tzu-Li (Gordon) Tai" , user@flink.apache.org
Subject: Re:Re:Re:Re:How to verify the data to Elasticsearch whether correct or not ?


I solved the issue by adding a dependency that converts the protobuf objects 
into JSON, and by adding a line of code like the one below, where element is a PB object.




Thanks.
zhangminglei




At 2017-08-16 22:52:30, "mingleizhang" <18717838...@163.com> wrote:

I looked at the data that was sunk into Elasticsearch. The good news is that it 
is really there. However, the data I sank is an object, but Elasticsearch 
represents it as a string. I put the related code below.


The element type is ActivityInfo. I then wrote a Java API to read the data 
back, and the value is a string instead. I want it represented as an 
ActivityInfo object, but that is not what I get.


Can anybody give me some advice on this? Thank you very much!










Thanks
zhangminglei / mingleizhang




At 2017-08-16 20:52:34, "mingleizhang" <18717838...@163.com> wrote:



Hi, Gordon.


  I am not sure about this. As far as I know, Elasticsearch usually stores 
JSON data, since that makes it convenient to build its index. As my code below 
shows, I stored protobuf objects (ActivityInfo, which is built from the 
activityinfo.proto file) in Elasticsearch, and they are stored there as binary 
data. That feels very strange to me. The Flink documentation only gives an 
example where the data is a JSON string.


Peace,
Zhangminglei





At 2017-08-16 13:27:10, "Tzu-Li (Gordon) Tai"  wrote:

Hi,


I couldn’t spot anything off in the code snippet you provided. So you should be 
ok with this :)


Cheers,
Gordon





On 15 August 2017 at 9:18:59 PM, mingleizhang (18717838...@163.com) wrote:

BTW, ActivityInfo is a PB object built from xxx.proto, and its values have 
already been set.





At 2017-08-15 21:17:00, "mingleizhang" <18717838...@163.com> wrote:

Hi, flink experts!


I sank my data (PB objects) to Elasticsearch, but I don't know whether the data 
in the sink is correct. The code is as follows; could you help me check it, 
please? I'm not familiar with ES. I want to install Kibana to view my data, but 
I don't know whether the code below is correct. I ran the Flink program and it 
does not give me an error. I just want to confirm.


// sink the filtered data to ElasticSearch
clickStreamFiltered.addSink(new ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress,
  new ElasticsearchSinkFunction[ActivityInfo] {
    def createIndexRequest(element: ActivityInfo): IndexRequest = {
      val json = new java.util.HashMap[String, ActivityInfo]
      json.put("data", element)

      Requests.indexRequest().index("filter-index-s").`type`("my-type").source(json)
    }

    override def process(activityInfo: ActivityInfo, runtimeContext: RuntimeContext,
                         requestIndexer: RequestIndexer): Unit = {
      requestIndexer.add(createIndexRequest(activityInfo))
    }
  }))


Thanks
mingleizhang





 





Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

2017-08-16 Thread Chao Wang

Thank you! Nico. That helps me a lot!

2a) That really clarifies my understanding of Flink. Yes, I think I 
have used static references, since I invoked a native function 
(implemented through JNI) which I believe has only one instance per 
process. And I guess the Java synchronization mechanisms were in vain 
because separate function objects at runtime result in separate lock 
objects. Now I use a C++ mutex within the native function, and that 
resolves my case.
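
The locking issue described in 2a) can be sketched in plain Java. This is an illustration under assumptions, not the code from this thread: GuardedFunction is a hypothetical stand-in for a user function, and a static counter plays the role of the process-wide native resource. A synchronized instance method locks on `this`, so two function objects would use two different monitors; a static lock is shared by all instances loaded by the same classloader.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a user function; one instance per parallel task.
final class GuardedFunction {
    // One lock per class (per classloader), shared by every instance.
    private static final Object SHARED_LOCK = new Object();
    // Stand-in for the process-wide native resource from the message.
    private static int sharedCounter = 0;

    void process() {
        // Locking on SHARED_LOCK instead of `this` serializes access
        // across all function objects, not just within one of them.
        synchronized (SHARED_LOCK) {
            sharedCounter++;
        }
    }

    static int counter() {
        synchronized (SHARED_LOCK) {
            return sharedCounter;
        }
    }

    // Runs several function objects on their own threads and returns the counter.
    static int runConcurrently(int instances, int callsPerInstance) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            GuardedFunction fn = new GuardedFunction();
            Thread t = new Thread(() -> {
                for (int c = 0; c < callsPerInstance; c++) {
                    fn.process();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter();
    }
}
```

A static Java lock only helps when all function objects share one classloader; guarding the resource inside the native code, as done above with a C++ mutex, does not depend on that.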


BTW, could you elaborate a bit more on what you mean by 
"per-record base"? What do you mean by a record?


3) I do not intend to store the CoProcessFunction.Context. I was just 
wondering: since the documentation says it is only valid during the 
invocation, I guess I cannot use it to maintain custom state for my 
program logic.



Thank you,
Chao


On 08/16/2017 03:31 AM, Nico Kruber wrote:

Hi Chao,

1) regarding the difference of CoFlatMapFunction/CoProcessFunction, let me
quote the javadoc of the CoProcessFunction:

"Contrary to the {@link CoFlatMapFunction}, this function can also query the
time (both event and processing) and set timers, through the provided {@link
Context}. When reacting to the firing of set timers the function can emit yet
more elements."

So, imho, both deliver a different level of abstraction and control (high- vs.
low-level). Also note the different methods available for you to implement.

2a) In general, Flink calls functions on a per-record base in a serialized
fashion per task. For each task at a TaskManager, in case of it having
multiple slots, separate function objects are used where you should only get
in trouble if you share static references. Otherwise you do not need to worry
about thread-safety.

2b) From what I see in the code (StreamTwoInputProcessor), the same should
apply to CoFlatMapFunction and CoProcessFunction so that calls to flatMap1/2
and processElement1/2 are not called in parallel!

3) why would you want to store the CoProcessFunction.Context?


Nico

On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:

Hi,

I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
and to what extent? What's the difference between the two Functions? and
in general, how does Flink prevent race conditions? Here's my case:

I tried to condition on two input streams and produce the third stream
if the condition is met. I implemented CoFlatMapFunction and tried to
monitor a state using a field in the implemented class (I want to
isolate my application from the checkpointing feature, and therefore I
do not use the states as documented here
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
The field served as a flag indicating whether there is some pending
data from either input stream and, if so, the function processes it along with
the arriving data from the other input stream (the processing invokes a native
function).

But then I got double free error and segmentation fault, which I believe
was due to unintentional concurrent access to the native function. Then
I tried to wrap the access into a synchronized method, as well as
explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
and the error remained.

I considered using CoProcessFunction in my case, but it seems to me that it
does not handle custom internal state, since the javadoc says "The
context [CoProcessFunction.Context] is only valid during the invocation
of this method, do not store it."



Thanks,
Chao




Exception for Scala anonymous class when restoring from state

2017-08-16 Thread Kien Truong
Hi,

After some refactoring (moving some operators into separate functions/files), 
I'm encountering a lot of exceptions like the one below. The logic of the 
application did not change, and all the refactored operators are stateless, 
e.g. simple map/flatMap/filter.

Does anyone know how to fix/avoid/work around this?

I'm using FsStateBackend on Flink 1.3.2

Regards,
Kien 

java.lang.ClassNotFoundException: x.x.X$$anon$113$$anon$55
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:321)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

Re: hadoop

2017-08-16 Thread Ted Yu
Can you check the following config in yarn-site.xml ?

yarn.resourcemanager.proxy-user-privileges.enabled (true)

Cheers

On Wed, Aug 16, 2017 at 4:48 PM, Raja.Aravapalli  wrote:

>
>
> Hi,
>
>
>
> I started a Flink yarn-session on a running Hadoop cluster and am
> running a streaming application on it.
>
>
>
> But after a few days of running without any issues, the Flink
> application, which writes data to HDFS, fails with the exception below.
>
>
>
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token
> (HDFS_DELEGATION_TOKEN token xx for xx) can't be found in cache
>
>
>
>
>
> Can someone please help me fix this? Thanks a lot.
>
>
>
>
>
>
>
> Regards,
>
> Raja.
>


Re: hadoop

2017-08-16 Thread Will Du
Has the Kerberos token expired without being renewed?

> On Aug 16, 2017, at 7:48 PM, Raja.Aravapalli  wrote:
> 
>  
> Hi,
>  
> I started a Flink yarn-session on a running Hadoop cluster and am running a 
> streaming application on it.
>  
> But after a few days of running without any issues, the Flink application, 
> which writes data to HDFS, fails with the exception below.
>  
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>  token (HDFS_DELEGATION_TOKEN token xx for xx) can't be found in cache
>  
>  
> Can someone please help me fix this? Thanks a lot.
>  
>  
>  
> Regards,
> Raja.



Re:Re:Re:Re:How to verify the data to Elasticsearch whether correct or not ?

2017-08-16 Thread Ted Yu
Did you use an image for the code?
Can you send the plain code again?
Cheers

 Original message 
From: mingleizhang <18717838...@163.com>
Date: 8/16/17 6:16 PM (GMT-08:00)
To: mingleizhang <18717838...@163.com>
Cc: "Tzu-Li (Gordon) Tai" , user@flink.apache.org
Subject: Re:Re:Re:Re:How to verify the data to Elasticsearch whether correct or not ?

I solved the issue by adding a dependency that converts the protobuf objects 
into JSON, and by adding a line of code like the one below, where element is a PB object.

Thanks.
zhangminglei



Re:Re:Re:Re:How to verify the data to Elasticsearch whether correct or not ?

2017-08-16 Thread mingleizhang
I solved the issue by adding a dependency that converts the protobuf objects 
into JSON, and by adding a line of code like the one below, where element is a PB object.




Thanks.
zhangminglei





Re: [EXTERNAL] difference between checkpoints & savepoints

2017-08-16 Thread Raja . Aravapalli

Thanks very much for the detailed explanation Stefan.


Regards,
Raja.

From: Stefan Richter 
Date: Monday, August 14, 2017 at 7:47 AM
To: Raja Aravapalli 
Cc: "user@flink.apache.org" 
Subject: Re: [EXTERNAL] difference between checkpoints & savepoints

Just noticed that I forgot to also include a reference to the documentation 
about externalized checkpoints: 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html

On 14.08.2017 at 14:17, Stefan Richter wrote:


Hi,



Also, in the same line, can someone detail the difference between State Backend 
& External checkpoint?


Those are two very different things. If we talk about state backends in Flink, 
we mean the entity that is responsible for storing and managing the state 
inside an operator. This could for example be something like the FsStateBackend 
that is based on hash maps and keeps state on the heap, or the 
RocksDBStateBackend which is using RocksDB as a store internally and operates 
on native memory and disk.

An externalized checkpoint, like a normal checkpoint, is the collection of all 
state in a job persisted to stable storage for recovery. More concretely, this 
typically means writing out the contents of the state backends to a safe place 
so that we can restore them from there.


Also, through which programmatic API methods can we configure those?

This explains how to set the backend programmatically:

https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html

To activate externalized checkpoints, you activate normal checkpoints, plus the 
following line:


env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

where env is your StreamExecutionEnvironment.

If you need an example, please take a look at 
org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase. This class 
configures everything you asked about programmatically.

Best,
Stefan




hadoop

2017-08-16 Thread Raja . Aravapalli

Hi,

I started a Flink yarn-session on a running Hadoop cluster and am running a 
streaming application on it.

But after a few days of running without any issues, the Flink application, 
which writes data to HDFS, fails with the exception below.

Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token xx for xx) can't be found in cache


Can someone please help me fix this? Thanks a lot.



Regards,
Raja.


Re: Access to datastream from BucketSink- RESOLVED

2017-08-16 Thread ant burton
I have resolved my issue, thank you for your help.

The following code gives me access to an element so that I can determine a 
bucket directory name.

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

public class S3Bucketer implements Bucketer<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // Now that we have access to element, we can
        // generate an S3 filename path from it.
        // Placeholder: replace with a non-empty value derived from element,
        // since Hadoop's Path rejects an empty string.
        String s3_filename_path = "";

        return new Path(s3_filename_path);
    }
}
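
As a rough illustration of what could go inside getBucketPath, here is a stdlib-only sketch that derives a bucket directory from an element. The record layout (comma-separated, with the first field as the bucket key), the class name BucketPaths and the fallback name "unknown" are all assumptions made for the example; in a real Bucketer the returned string would be wrapped in a Hadoop Path.

```java
// Stdlib-only sketch; all names here are hypothetical.
final class BucketPaths {
    private BucketPaths() {}

    // Assumption for illustration: the element is a comma-separated record
    // and its first field names the bucket.
    static String bucketPathFor(String basePath, String element) {
        String key = element.split(",", 2)[0].trim();
        if (key.isEmpty()) {
            key = "unknown"; // fallback bucket for records without a key
        }
        // Keep only characters that are safe in a path segment.
        String safeKey = key.replaceAll("[^A-Za-z0-9._-]", "_");
        // Avoid a double slash when the base path already ends with one.
        String base = basePath.endsWith("/")
                ? basePath.substring(0, basePath.length() - 1)
                : basePath;
        return base + "/" + safeKey;
    }
}
```

Sanitizing the key keeps unexpected characters in the data from producing odd or nested directory names.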


> On 16 Aug 2017, at 16:06, Kostas Kloudas  wrote:
> 
> Hi Ant,
> 
> I think you are implementing the wrong Bucketer. 
> This seems to be the one for the RollingSink which is deprecated. 
> Is this correct?
> 
> You should implement the BucketingSink one, which is in the package:
> 
> org.apache.flink.streaming.connectors.fs.bucketing
> 
> That one requires the implementation of 1 method with signature:
> 
> Path getBucketPath(Clock clock, Path basePath, T element);
> 
> which, from what I understand of your requirements, gives you access 
> to the element that you need.
> 
> Cheers,
> Kostas
> 
>> On Aug 16, 2017, at 3:31 PM, ant burton  wrote:
>> 
>> 
>> Thanks Kostas,
>> 
>> I’m narrowing in on a solution:
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html
>>  says "You can also specify a custom bucketer by using setBucketer() on a 
>> BucketingSink. If desired, the bucketer can use a property of the element or 
>> tuple to determine the bucket directory.”
>> 
>> BucketingSink<String> sink = new BucketingSink<String>("/base/path");
>> sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
>> 
>> Therefore I’ve created a skeleton class:
>> 
>> public class S3Bucketer implements Bucketer {
>>  private static final long serialVersionUID = 1L;
>> 
>>  private final String formatString;
>> 
>>  public S3Bucketer() {
>>  }
>> 
>>  private void readObject(ObjectInputStream in) {
>>  in.defaultReadObject();
>>  }
>> 
>>  public boolean shouldStartNewBucket(Path basePath, Path 
>> currentBucketPath) {
>>  return true;
>>  }
>> 
>>  public Path getNextBucketPath(Path basePath) {
>>  return new Path(basePath + 
>> “/some-path-that-I-need-create-from-the-stream");
>>  }
>> }
>> 
>> my question now is how do I access the data stream from within the 
>> S3Bucketer so that I can generate a filename based on the data with the data 
>> stream.
>> 
>> Thanks,
>> 
>>> On 16 Aug 2017, at 12:55, Kostas Kloudas  
>>> wrote:
>>> 
>>> In the second link for the BucketingSink, you can set your 
>>> own Bucketer using the setBucketer method. You do not have to 
>>> implement your own sink from scratch.
>>> 
>>> Kostas
>>> 
>>>> On Aug 16, 2017, at 1:39 PM, ant burton  wrote:
>>>> 
>>>> or rather 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>>> 
>>>>> On 16 Aug 2017, at 12:24, Kostas Kloudas  wrote:
>>>>> 
>>>>> Hi Ant,
>>>>> 
>>>>> I think you can do it by implementing your own Bucketer.
>>>>> 
>>>>> Cheers,
>>>>> Kostas
>>>>> 
>>>>>> On Aug 16, 2017, at 1:09 PM, ant burton  wrote:
>>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> Given 
>>>>>> 
>>>>>>  // Set StreamExecutionEnvironment
>>>>>>  final StreamExecutionEnvironment env = 
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> 
>>>>>>  // Set checkpoints in ms
>>>>>>  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>> 
>>>>>>  // Add source (input stream)
>>>>>>  DataStream dataStream = StreamUtil.getDataStream(env, 
>>>>>> params);
>>>>>> 
>>>>>> How can I construct the s3_filename from the content of the an event, it 
>>>>>> seems that whenever I attempt this I either have access to an event or 
>>>>>> access to .addSink but not both.
>>>>>> 
>>>>>>  dataStream.addSink(new BucketingSink("s3a://flink/" + 
>>>>>> s3_filename));
>>>>>> 
>>>>>> 
>>>>>> Thanks,



Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton
Thank you for your help, it’s greatly appreciated.

My aim is to be able to “use a property of the element to determine the bucket 
directory”.

With your suggestions, this is what I have so far. It’s obviously wrong, but I 
hope I’m getting closer.

Is it correct to still implement Bucketer and just change where it is imported 
from? Or do I need to import BucketingSink?

import org.apache.hadoop.fs.Path;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer; // I think this is wrong
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.connectors.fs.Clock;

public class S3Bucketer implements Bucketer<String> {
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // Now that we have access to element, we can
        // generate an S3 filename path from it
        String s3_filename_path = "";

        return new Path(s3_filename_path);
    }
}

Apologies, my Java is limited at present.

Thanks,




Re: Reload DistributedCache file?

2017-08-16 Thread Ted Yu
For HDFS, there is the inotify mechanism.

https://issues.apache.org/jira/browse/HDFS-6634

https://www.slideshare.net/Hadoop_Summit/keep-me-in-the-loop-inotify-in-hdfs

FYI

On Wed, Aug 16, 2017 at 9:41 AM, Conrad Crampton <
conrad.cramp...@secdata.com> wrote:

> Hi,
>
> I have a simple text file stored in HDFS which I use in a
> RichFilterFunction by way of a DistributedCache file. The file is externally
> edited periodically to have other lines added to it. My FilterFunction also
> implements Runnable, whose run method is scheduled via the scheduleAtFixedRate
> method of ScheduledExecutorService; it reloads the file and stores the results
> in a List in the Filter class.
>
>
>
> I have realized the error of my ways: the file that is reloaded is the
> cached file that was copied to a temporary file location on the node where
> this instance of the Filter class is loaded, and not the file from HDFS directly
> (as it was copied when the Flink job started).
>
>
>
> Can anyone suggest a solution to this? It is I think a similar problem
> that Add Side Inputs in Flink [1] proposal is trying to address but not
> finalized yet.
>
> Can anyone see a problem if I have a thread that reloads the HDFS file
> being in the main body of my Flink program and registers the cache file
> within that reload process e.g.
>
>
>
> env.registerCachedFile(properties.getProperty("whitelist.location"),
> WHITELIST);
>
>
>
> i.e. does this actually copy the file again from HDFS to temporary files
> on each node? I think I’d have to have the same schedule I have currently
> that reload within my Filter function too though as all the previous
> process would do is to push the HDFS file to temp location and not actually
> refresh my List.
>
>
>
> Any suggestions would be welcome.
>
>
>
> Thanks
>
> Conrad
>
>
>
> [1] https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#heading=h.pqg5z6g0mjm7
>
>
> SecureData, combating cyber threats
>
> --
>
> The information contained in this message or any of its attachments may be
> privileged and confidential and intended for the exclusive use of the
> intended recipient. If you are not the intended recipient any disclosure,
> reproduction, distribution or other dissemination or use of this
> communications is strictly prohibited. The views expressed in this email
> are those of the individual and not necessarily of SecureData Europe Ltd.
> Any prices quoted are only valid if followed up by a formal written quote.
>
> SecureData Europe Limited. Registered in England & Wales 04365896.
> Registered Address: SecureData House, Hermitage Court, Hermitage Lane,
> Maidstone, Kent, ME16 9NT
>


Reload DistributedCache file?

2017-08-16 Thread Conrad Crampton
Hi,
I have a simple text file that is stored in HDFS which I use in a 
RichFilterFunction by way of DistributedCache file. The file is externally 
edited periodically to have other lines added to it. My FilterFunction also 
implements Runnable whose run method is run via the scheduleAtFixedRate method of
ScheduledExecutorService, which reloads the file and stores the results in a
List in the Filter class.

I have realized the error of my ways: the file that is reloaded is the
cached file that was copied to a temporary file location on the node where this
instance of the Filter class is loaded, and not the file from HDFS directly (as
this was copied when the Flink job started).

Can anyone suggest a solution to this? It is, I think, a similar problem to the
one that the Add Side Inputs in Flink [1] proposal is trying to address, but
that is not finalized yet.
Can anyone see a problem if I have a thread in the main body of my Flink program
that reloads the HDFS file and registers the cache file within that
reload process, e.g.

env.registerCachedFile(properties.getProperty("whitelist.location"), WHITELIST);

i.e. does this actually copy the file again from HDFS to temporary files on
each node? I think I’d still need the schedule I currently have inside my
Filter function that reloads the file, as all the previous process would
do is push the HDFS file to the temp location and not actually refresh my List.

Any suggestions would be welcome.

Thanks
Conrad

[1] 
https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#heading=h.pqg5z6g0mjm7
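
One possible direction for the reload problem described above, as a minimal sketch rather than a tested Flink solution: keep the scheduled reload, but have it pull lines from a loader that reads the live source instead of the DistributedCache copy. The class name ReloadingWhitelist and the Supplier-based loader are assumptions made for illustration; in a real job the loader would presumably read the HDFS file directly and the object would be created in the open() method of the RichFilterFunction.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: holds an immutable snapshot of the whitelist and swaps it on reload.
final class ReloadingWhitelist implements AutoCloseable {
    // Assumption: the loader reads the *live* file (e.g. straight from HDFS),
    // not the temporary copy made by the DistributedCache at job start.
    private final Supplier<List<String>> loader;
    private volatile Set<String> entries = Collections.emptySet();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "whitelist-reloader");
                t.setDaemon(true); // do not keep the JVM alive for the reloader
                return t;
            });

    ReloadingWhitelist(Supplier<List<String>> loader) {
        this.loader = loader;
    }

    // Re-reads all lines and replaces the snapshot in one volatile write.
    void reloadNow() {
        Set<String> fresh = new HashSet<>();
        for (String line : loader.get()) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                fresh.add(trimmed);
            }
        }
        entries = Collections.unmodifiableSet(fresh);
    }

    // Schedules periodic reloads; a failed reload keeps the previous snapshot.
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                reloadNow();
            } catch (RuntimeException e) {
                // keep serving the last good snapshot
            }
        }, 0, period, unit);
    }

    boolean contains(String value) {
        return entries.contains(value);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The filter itself would then only call contains(...) per record, and close() would be called from the function's close() method so the reloader thread does not outlive the task.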




Re: stream partitioning to avoid network overhead

2017-08-16 Thread Karthik Deivasigamani
Thanks Urs for your inputs.

Yes we use AsyncIO operator for our webservice calls.

We were considering increasing the kafka partitions and increasing the
parallelism on the source to match the webservice operator. Wasn't quite
sure if this was the only way to achieve operator chaining. Thanks for
clarifying this. We will surely try this.

Initially when we ran the job, the webservice call parallelism was higher
than the parallelism of the downstream operator (parser). What we observed was
that after about 48hrs the memory usage of the Flink taskmanager was ~98% and
system load was too high. When we chained the webservice and parser operators
together by setting the same parallelism this problem went away completely.

What we were wondering is whether setting the same parallelism on all the
operators is the standard and desired way to achieve operator chaining, or
whether there is an alternate approach to achieve the same.
~
Karthik
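
The slot arithmetic from Urs's reply quoted below (parallelism 1 plus parallelism 2 uses two slots, not three) can be written down as a small sketch. It assumes all operators are in the same slot sharing group, so that one slot can hold one instance of each operator; the class and method names are made up for illustration.

```java
// Hypothetical helper: estimates task slots for a job whose operators all share slots.
final class SlotEstimate {
    // With slot sharing, each slot can host one parallel instance of every operator,
    // so the operator with the highest parallelism determines the slot count.
    static int slotsNeeded(int... operatorParallelism) {
        if (operatorParallelism.length == 0) {
            throw new IllegalArgumentException("at least one operator is required");
        }
        int max = 0;
        for (int p : operatorParallelism) {
            if (p < 1) {
                throw new IllegalArgumentException("parallelism must be >= 1");
            }
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println(slotsNeeded(1, 2));       // 2
        System.out.println(slotsNeeded(1, 3, 2, 1)); // 3
    }
}
```

Under that assumption, raising the source parallelism from 1 to 3 to match the webservice operator does not cost extra slots.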



On Fri, Aug 11, 2017 at 7:25 PM, Urs Schoenenberger <
urs.schoenenber...@tngtech.com> wrote:

> Hi Karthik,
>
> maybe I'm misunderstanding, but there are a few things in your
> description that seem strange to me:
>
> - Your "slow" operator seems to be slow not because it's compute-heavy,
> but because it's waiting for a response. Is AsyncIO (
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
> ) an option for you? It's a more natural approach to this issue compared
> to raising the parallelism.
>
> - A flink task slot can hold an instance of each operator. If you have
> two (non-chained) operators, one with parallelism 1, one with
> parallelism 2, you will use two slots, not three. You are not wasting
> slots by having source parallelism 3 instead of 1.
>
> - Therefore, it would in my opinion make sense to run all your operators
> at the same parallelism. Operator chaining is desirable! Just run at the
> max parallelism (the one you'd give to the webservice operator) and make
> sure your Kafka topic has enough partitions to serve this parallelism.
> If you're going the Async I/O way, you probably don't even need high
> parallelism, but even with the sync implementation I can't think of a
> big problem with running the chained operator at max parallelism.
>
> - Minor point: I guess you have measured this, but I'm a little confused
> why the intra-Flink network traffic would cause significant problems: It
> seems to be the same amount of data that you're querying from the
> external webservice, and that connection should be a bottleneck before
> the intra-cluster network becomes one?
>
>
> Best regards,
> Urs
>
> On 11.08.2017 04:54, Karthik Deivasigamani wrote:
> > Hi,
> >
> > I have a use case where we read messages from a Kafka topic and invoke a
> > webservice. The web-service call can take a couple of seconds and then
> > gives us back on avg 800KB of data. This data is sent to another operator
> > which does the parsing and then it gets sent to sink which saves the
> > processed data in a NoSQL db. The app looks like this :
> > [image: Inline image 1]
> > Since my payload from the web service is large a lot of data is
> transferred
> > over the network and this is becoming a bottle neck.
> >
> > Lets say *I have 6 slots per node and I would like to have 1 slot for
> > source, 3 slots for web service calls, 2 for parser and 1 for my sink*.
> > This way all the processing can happen locally and there is no network
> > overhead. I have tried *stream.forward() *but it requires that the down
> > stream operator has the same number of parallelism as the one emitting
> > data. Next I tried *stream.rescale()* and that does not schedule the task
> > as I would expect it given the parallelism's on the operators are all
> > multiple of each other (my flink cluster has enough empty slots and
> > capacity).
> >
> >
> > [image: Inline image 2]
> >
> >
> > Is there a way to schedule my tasks in a fashion where there is no data
> > transfer over the network? I was able to do this in Apache Storm by using
> > localOrShuffle grouping. Not sure how to achieve the same in Flink. Any
> > pointers would be really helpful.
> >
> > For now I have solved this problem by having the same parallelism on the
> > web-service operator, parser, sink which causes Flink to chain these
> > operators together and execute them in the same thread. But ideally I would
> > like to have more instances of the slow operator and fewer instances of my
> > fast operator.
> >
> > ~
> > Karthik
> >
> >
> >

JobManager HA behind load balancer

2017-08-16 Thread Shannon Carey
Is anyone running multiple JobManagers (in High Availability mode) behind a 
load balancer such as an AWS ELB or a software proxy such as HAProxy or Nginx?

Right now, it appears that server-side redirects that come from the JobManager 
Web UI use the internal IP address of the JobManager (from Akka). Therefore, if 
you're accessing your JobManager via a DNS name or a load balancing proxy, the 
redirect doesn't work properly. Has anyone created a workaround for this?

If there's no workaround, should we perhaps add a config setting to the 
JobManager to tell it what DNS name or root URL to use when sending redirect 
responses?

Also, it looks like at least some types of requests are not supported by 
non-master JobManagers, and therefore they respond with a redirect to the 
internal address of the master. Is it necessary to integrate the proxy with 
Zookeeper so that requests will only be proxied to the master JobManager? If 
the non-master nodes only send redirects, then including them as upstream 
servers in the proxy would be problematic.

Thanks for the info,
Shannon


Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
Hi Ant,

I think you are implementing the wrong Bucketer. 
This seems to be the one for the RollingSink which is deprecated. 
Is this correct?

You should implement the BucketingSink one, which is in the package:

org.apache.flink.streaming.connectors.fs.bucketing

That one requires the implementation of 1 method with signature:

Path getBucketPath(Clock clock, Path basePath, T element);

which, from what I understand of your requirements, gives you access
to the element that you need.

Cheers,
Kostas
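
As a rough sketch of what the body of such a getBucketPath implementation could compute, assuming a hypothetical Event type with a customerId field (the thread never shows the real element type): the helper below derives the bucket directory from the element and works on plain strings so that it runs without Flink or Hadoop on the classpath. The wiring into the real Bucketer is shown only as a comment and is an assumption.

```java
// Hypothetical element type standing in for whatever the stream actually carries.
final class Event {
    final String customerId;
    final String payload;

    Event(String customerId, String payload) {
        this.customerId = customerId;
        this.payload = payload;
    }
}

// Hypothetical helper holding the path logic, independent of Flink classes.
final class ElementBucketPaths {
    // Builds "<basePath>/<sanitized customerId>" from a property of the element.
    static String bucketPathFor(String basePath, Event element) {
        // Replace characters that are awkward in file system or S3 keys.
        String safeId = element.customerId.replaceAll("[^A-Za-z0-9_-]", "_");
        String base = basePath.endsWith("/")
                ? basePath.substring(0, basePath.length() - 1)
                : basePath;
        return base + "/" + safeId;
    }

    // Assumed wiring inside a BucketingSink bucketer (not compiled here):
    //
    //   public Path getBucketPath(Clock clock, Path basePath, Event element) {
    //       return new Path(ElementBucketPaths.bucketPathFor(basePath.toString(), element));
    //   }
}
```

The sink would then be created once with the fixed base path, e.g. "s3a://flink/", and the per-element part of the path would come from the bucketer rather than from string concatenation at addSink time.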

> On Aug 16, 2017, at 3:31 PM, ant burton  wrote:
> 
> 
> Thanks Kostas,
> 
> I’m narrowing in on a solution:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html
>  says "You can also specify a custom bucketer by using setBucketer() on a 
> BucketingSink. If desired, the bucketer can use a property of the element or 
> tuple to determine the bucket directory.”
> 
> BucketingSink<String> sink = new BucketingSink<String>("/base/path");
> sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
> Therefore I’ve created a skeleton class:
> 
> public class S3Bucketer implements Bucketer {
>   private static final long serialVersionUID = 1L;
> 
>   private final String formatString;
> 
>   public S3Bucketer() {
>   }
> 
>   private void readObject(ObjectInputStream in) {
>   in.defaultReadObject();
>   }
> 
>   public boolean shouldStartNewBucket(Path basePath, Path 
> currentBucketPath) {
>   return true;
>   }
> 
>   public Path getNextBucketPath(Path basePath) {
>   return new Path(basePath + 
> “/some-path-that-I-need-create-from-the-stream");
>   }
> }
> 
> my question now is how do I access the data stream from within the S3Bucketer 
> so that I can generate a filename based on the data with the data stream.
> 
> Thanks,
> 
>> On 16 Aug 2017, at 12:55, Kostas Kloudas  wrote:
>> 
>> In the second link for the BucketingSink, you can set your 
>> own Bucketer using the setBucketer method. You do not have to 
>> implement your own sink from scratch.
>> 
>> Kostas
>> 
>>> On Aug 16, 2017, at 1:39 PM, ant burton  wrote:
>>> 
>>> or rather 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>> 
>>> 
 On 16 Aug 2017, at 12:24, Kostas Kloudas  
 wrote:
 
 Hi Ant,
 
 I think you can do it by implementing your own Bucketer.
 
 Cheers,
 Kostas
 
 .
> On Aug 16, 2017, at 1:09 PM, ant burton  wrote:
> 
> Hello,
> 
> Given 
> 
>   // Set StreamExecutionEnvironment
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
>   // Set checkpoints in ms
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
>   // Add source (input stream)
>   DataStream dataStream = StreamUtil.getDataStream(env, 
> params);
> 
> How can I construct the s3_filename from the content of the an event, it 
> seems that whenever I attempt this I either have access to an event or 
> access to .addSink but not both.
> 
>   dataStream.addSink(new BucketingSink("s3a://flink/" + 
> s3_filename));
> 
> 
> Thanks,
> 
> 
> 
> 
 
>>> 
>> 
> 



Re:Re:Re:How to verify the data to Elasticsearch whether correct or not ?

2017-08-16 Thread mingleizhang
I looked at the data that was written to Elasticsearch. The good news is that
it is really there. However, the data I sank is an object, but
Elasticsearch represents it as a string. I put the related code below.


The element type is ActivityInfo. I then wrote a Java API client to read the
data, and the value comes back as a string. I want it represented as an
ActivityInfo object, but that is not what I get.


Can anybody give me some advice on this? Thank you very much!










Thanks
zhangminglei / mingleizhang




At 2017-08-16 20:52:34, "mingleizhang" <18717838...@163.com> wrote:



Hi, Gordon.


  I am not sure about this, as far as I know. ElasticSearch often store 
JSON data inside it as it is convenient to create it's index. As refers to my 
code below, I stored the protobuf objects (ActivityInfo which build from 
activityinfo.proto file) in ElasticSearch. And it is a binary data stored in 
it. It is very strange I feel. Flink document just give an example for it's 
data which type belongs to a string as JSON.


Peace,
Zhangminglei





At 2017-08-16 13:27:10, "Tzu-Li (Gordon) Tai"  wrote:

Hi,


I couldn’t spot anything off in the code snippet you provided. So you should be 
ok with this :)


Cheers,
Gordon





On 15 August 2017 at 9:18:59 PM, mingleizhang (18717838...@163.com) wrote:

BTW, ActivityInfo is an PB object build from xxx.proto. And already has it's 
value setted to itself.





At 2017-08-15 21:17:00, "mingleizhang" <18717838...@163.com> wrote:

Hi, flink experts!


I sinked my data ( PB objects ) to elasticsearch. I dont know whether the 
sinked data is correct or incorrect. The codes like following, Could you help 
me check it please ? Im not familar with ES. Now, I want to install a kibana to 
view my data. But I dont know the below codes is correct or incorrect. I ran 
the flink program. it does not give me an error. I just want to confirm.


// sink the filtered data to ElasticSearch
clickStreamFiltered.addSink(new 
ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress, new 
ElasticsearchSinkFunction[ActivityInfo] {
def createIndexRequest(element: ActivityInfo): IndexRequest = {
val json = new java.util.HashMap[String, ActivityInfo]
json.put("data", element)

Requests.indexRequest().index("filter-index-s").`type`("my-type").source(json)
  }
override def process(activityInfo: ActivityInfo, runtimeContext: 
RuntimeContext, requestIndexer: RequestIndexer): Unit = {
requestIndexer.add(createIndexRequest(activityInfo))
  }
}))


Thanks
mingleizhang





 









 

Re: Change state backend.

2017-08-16 Thread Ted Yu
I guess shashank meant switching state backend w.r.t. savepoints.

On Wed, Aug 16, 2017 at 4:00 AM, Biplob Biswas 
wrote:

> Could you clarify a bit more? Do you want an existing state on a running
> job
> to be migrated from FsStateBackend to RocksDbStateBackend?
>
> Or
>
> Do you have the option of restarting your job after changing existing code?
>
>
>
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Change-state-
> backend-tp14928p14930.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton

Thanks Kostas,

I’m narrowing in on a solution:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html
 

 says "You can also specify a custom bucketer by using setBucketer() on a 
BucketingSink. If desired, the bucketer can use a property of the element or 
tuple to determine the bucket directory.”

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
Therefore I’ve created a skeleton class:

public class S3Bucketer implements Bucketer {
    private static final long serialVersionUID = 1L;

    public S3Bucketer() {
    }

    // Custom deserialization hook; defaultReadObject declares these checked exceptions.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
    }

    public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
        return true;
    }

    public Path getNextBucketPath(Path basePath) {
        return new Path(basePath + "/some-path-that-I-need-create-from-the-stream");
    }
}

My question now is: how do I access the data stream from within the S3Bucketer
so that I can generate a filename based on the data in the data stream?

Thanks,

> On 16 Aug 2017, at 12:55, Kostas Kloudas  wrote:
> 
> In the second link for the BucketingSink, you can set your 
> own Bucketer using the setBucketer method. You do not have to 
> implement your own sink from scratch.
> 
> Kostas
> 
>> On Aug 16, 2017, at 1:39 PM, ant burton > > wrote:
>> 
>> or rather 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>  
>> 
>> 
>> 
>>> On 16 Aug 2017, at 12:24, Kostas Kloudas >> > wrote:
>>> 
>>> Hi Ant,
>>> 
>>> I think you can do it by implementing your own Bucketer.
>>> 
>>> Cheers,
>>> Kostas
>>> 
>>> .
 On Aug 16, 2017, at 1:09 PM, ant burton > wrote:
 
 Hello,
 
 Given 
 
   // Set StreamExecutionEnvironment
   final StreamExecutionEnvironment env = 
 StreamExecutionEnvironment.getExecutionEnvironment();
 
   // Set checkpoints in ms
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
   // Add source (input stream)
   DataStream dataStream = StreamUtil.getDataStream(env, 
 params);
 
 How can I construct the s3_filename from the content of the an event, it 
 seems that whenever I attempt this I either have access to an event or 
 access to .addSink but not both.
 
dataStream.addSink(new BucketingSink("s3a://flink/ 
 " + s3_filename));
 
 
 Thanks,
 
 
 
 
>>> 
>> 
> 



Re:Re:How to verify the data to Elasticsearch whether correct or not ?

2017-08-16 Thread mingleizhang


Hi, Gordon.


  I am not sure about this. As far as I know, Elasticsearch usually stores
JSON data, as that makes it convenient to build its index. As my
code below shows, I stored protobuf objects (ActivityInfo, which is built from the
activityinfo.proto file) in Elasticsearch, and they are stored as binary data.
That feels very strange to me. The Flink documentation only gives an example
where the data is a JSON string.


Peace,
Zhangminglei





At 2017-08-16 13:27:10, "Tzu-Li (Gordon) Tai"  wrote:

Hi,


I couldn’t spot anything off in the code snippet you provided. So you should be 
ok with this :)


Cheers,
Gordon





On 15 August 2017 at 9:18:59 PM, mingleizhang (18717838...@163.com) wrote:

BTW, ActivityInfo is a PB object built from xxx.proto, and it already has its
values set.





At 2017-08-15 21:17:00, "mingleizhang" <18717838...@163.com> wrote:

Hi, flink experts!


I sank my data (PB objects) to Elasticsearch. I don't know whether the
data that was written is correct or not. The code is as follows. Could you help
me check it please? I'm not familiar with ES. Now I want to install Kibana to
view my data, but I don't know whether the code below is correct. I ran
the Flink program and it does not give me an error. I just want to confirm.


// sink the filtered data to ElasticSearch
clickStreamFiltered.addSink(new 
ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress, new 
ElasticsearchSinkFunction[ActivityInfo] {
def createIndexRequest(element: ActivityInfo): IndexRequest = {
val json = new java.util.HashMap[String, ActivityInfo]
json.put("data", element)

Requests.indexRequest().index("filter-index-s").`type`("my-type").source(json)
  }
override def process(activityInfo: ActivityInfo, runtimeContext: 
RuntimeContext, requestIndexer: RequestIndexer): Unit = {
requestIndexer.add(createIndexRequest(activityInfo))
  }
}))


Thanks
mingleizhang





 





Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
In the second link for the BucketingSink, you can set your 
own Bucketer using the setBucketer method. You do not have to 
implement your own sink from scratch.

Kostas

> On Aug 16, 2017, at 1:39 PM, ant burton  wrote:
> 
> or rather 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>  
> 
> 
> 
>> On 16 Aug 2017, at 12:24, Kostas Kloudas > > wrote:
>> 
>> Hi Ant,
>> 
>> I think you can do it by implementing your own Bucketer.
>> 
>> Cheers,
>> Kostas
>> 
>> .
>>> On Aug 16, 2017, at 1:09 PM, ant burton >> > wrote:
>>> 
>>> Hello,
>>> 
>>> Given 
>>> 
>>>   // Set StreamExecutionEnvironment
>>>   final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> 
>>>   // Set checkpoints in ms
>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> 
>>>   // Add source (input stream)
>>>   DataStream dataStream = StreamUtil.getDataStream(env, params);
>>> 
>>> How can I construct the s3_filename from the content of the an event, it 
>>> seems that whenever I attempt this I either have access to an event or 
>>> access to .addSink but not both.
>>> 
>>> dataStream.addSink(new BucketingSink("s3a://flink/ 
>>> " + s3_filename));
>>> 
>>> 
>>> Thanks,
>>> 
>>> 
>>> 
>>> 
>> 
> 



Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton
or rather 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
 



> On 16 Aug 2017, at 12:24, Kostas Kloudas  wrote:
> 
> Hi Ant,
> 
> I think you can do it by implementing your own Bucketer.
> 
> Cheers,
> Kostas
> 
> .
>> On Aug 16, 2017, at 1:09 PM, ant burton  wrote:
>> 
>> Hello,
>> 
>> Given 
>> 
>>   // Set StreamExecutionEnvironment
>>   final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>>   // Set checkpoints in ms
>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>>   // Add source (input stream)
>>   DataStream dataStream = StreamUtil.getDataStream(env, params);
>> 
>> How can I construct the s3_filename from the content of the an event, it 
>> seems that whenever I attempt this I either have access to an event or 
>> access to .addSink but not both.
>> 
>>  dataStream.addSink(new BucketingSink("s3a://flink/" + 
>> s3_filename));
>> 
>> 
>> Thanks,
>> 
>> 
>> 
>> 
> 



Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton

Am I on the right path with the following:

class S3SinkFunc implements SinkFunction<String> {
    public void invoke(String element) {
        System.out.println(element);
        // don't have access to dataStream to call .addSink() :-(
    }
}

Thanks,

> On 16 Aug 2017, at 12:24, Kostas Kloudas  wrote:
> 
> Hi Ant,
> 
> I think you can do it by implementing your own Bucketer.
> 
> Cheers,
> Kostas
> 
> .
>> On Aug 16, 2017, at 1:09 PM, ant burton  wrote:
>> 
>> Hello,
>> 
>> Given 
>> 
>>   // Set StreamExecutionEnvironment
>>   final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>>   // Set checkpoints in ms
>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>>   // Add source (input stream)
>>   DataStream dataStream = StreamUtil.getDataStream(env, params);
>> 
>> How can I construct the s3_filename from the content of the an event, it 
>> seems that whenever I attempt this I either have access to an event or 
>> access to .addSink but not both.
>> 
>>  dataStream.addSink(new BucketingSink("s3a://flink/" + 
>> s3_filename));
>> 
>> 
>> Thanks,
>> 
>> 
>> 
>> 
> 



Re: Aggregation by key hierarchy

2017-08-16 Thread Basanth Gowda
Thanks Nico.

As there are 2 ways to achieve this, which is better?

1st option -> dataStream.flatMap( ... ) -> this takes one input and produces
N outputs, depending on my key combinations. The same windowing logic is
applied to each of the outputs.

or the one you suggested

2nd option -> use keyBy to create N streams

With the first option I would use an external config, which allows me to
change the number of combinations dynamically at runtime. Would that be
possible with the 2nd option as well? Can I modify or add a data stream at
runtime without restarting?
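
To make the 1st option concrete, here is a minimal sketch of the expansion step only, with assumed names: given one record and a list of key combinations (which could come from an external config), it produces one composite key per combination. In a Flink job this logic would presumably sit inside the flatMap, emitting (compositeKey, value) pairs that are then keyed by the composite key before the window; that wiring is not shown and is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns one record into one composite key per dimension combination.
final class KeyCombinationExpander {
    // record: field name -> field value, e.g. {country=US, state=CA, city=SF}
    // combinations: ordered field lists, e.g. [[country], [country, state]]
    static List<String> expand(Map<String, String> record, List<List<String>> combinations) {
        List<String> keys = new ArrayList<>();
        for (List<String> combination : combinations) {
            StringBuilder key = new StringBuilder();
            for (String field : combination) {
                if (key.length() > 0) {
                    key.append('|');
                }
                // Including the field name keeps keys from different combinations distinct.
                key.append(field).append('=').append(record.get(field));
            }
            keys.add(key.toString());
        }
        return keys;
    }
}
```

Because the combinations are an ordinary argument here, changing them only changes which keys are emitted; whether that is safe to do while a job is running depends on how the configuration reaches the operator and on existing window state.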

On Wed, Aug 16, 2017 at 4:37 AM, Nico Kruber  wrote:

> [back to the ml...]
>
> also including your other mail's additional content...
> > I have been able to do this by the following and repeating this for every
> > key + window combination. So in the above case there would be 8 blocks
> like
> > below. (4 combinations and 2 window period for each combination)
> >
> > modelDataStream.keyBy("campaiginId","addId")
> > .timeWindow(Time.minutes(1))
> > .trigger(CountTrigger.of(2))
> > .reduce(..)
>
> As mentioned in my last email, I only see one way for reducing duplication
> (for the key combinations) but this involves more handling from your side
> and
> I'd probably not recommend this. Regarding the different windows, I do not
> see
> something you may do otherwise here.
>
> Maybe Aljoscha (cc'd) has an idea of how to do this better
>
>
> Nico
>
> On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:
> > Hi Nico,
> > Thank you . This is pretty much what I am doing , was wondering if there
> is
> > a better way.
> >
> > If there are 10 dimensions on which I want to aggregate with 2 windows -
> > this would become about 20 different combinations
> >
> > Thank you
> > Basanth
> >
> > On Mon, Aug 14, 2017 at 12:50 PM Nico Kruber 
> wrote:
> > > Hi Basanth,
> > > Let's assume you have records of the form
> > > Record = {timestamp, country, state, city, value}
> > > Then you'd like to create aggregates, e.g. the average, for the
> following
> > > combinations?
> > > 1) avg per country
> > > 2) avg per state and country
> > > 3) avg per city and state and country
> > >
> > > * You could create three streams and aggregate each individually:
> > > DataStream ds = //...
> > > DataStream ds1 = ds.keyBy("country");
> > > DataStream ds2 = ds.keyBy("country","state");
> > > DataStream ds3 = ds.keyBy("country","state","city");
> > > // + your aggregation per stream ds1, ds2, ds3
> > >
> > > You probably want to do different things for each of the resulting
> > > aggregations anyway, so having separate streams is probably right for
> you.
> > >
> > > * Alternatively, you could go with ds1 only and create the aggregates
> of
> > > the
> > > per-state (2) and per-city (3) ones in a stateful aggregation function
> > > yourself, e.g. in a MapState [1]. At the end of your aggregation
> window,
> > > you
> > > could then emit those with different keys to be able to distinguish
> > > between
> > > them.
> > >
> > >
> > > Nico
> > >
> > > [1]
> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
> > > On Sunday, 13 August 2017 23:13:00 CEST Basanth Gowda wrote:
> > > > For example - this is a sample model from one of the Apache Apex
> > > > presentation.
> > > >
> > > > I would want to aggregate for different combinations, and different
> time
> > > > buckets. What is the best way to do this in Flink ?
> > > >
> > > > {"keys":[{"name":"campaignId","type":"integer"},
> > > >
> > > >  {"name":"adId","type":"integer"},
> > > >  {"name":"creativeId","type":"integer"},
> > > >  {"name":"publisherId","type":"integer"},
> > > >  {"name":"adOrderId","type":"integer"}],
> > > >  "timeBuckets":["1h","1d"],
> > > >
> > > >  "values":
> > > > [{"name":"impressions","type":"integer","aggregators":["SUM"]}
> > > > ,
> > > >
> > > >  {"name":"clicks","type":"integer","aggregators":["SUM"]},
> > > >  {"name":"revenue","type":"integer"}],
> > > >  "dimensions":
> > > >  [{"combination":["campaignId","adId"]},
> > > >  {"combination":["creativeId","campaignId"]},
> > > >  {"combination":["campaignId"]},
> > > >  {"combination":["publisherId","adOrderId","campaignId"],
> > > >
> > > > "additionalValues":["revenue:SUM"]}]
> > > > }
> > > >
> > > >
> > > > thank you,
> > > > B
> > > >
> > > > On Sun, Aug 13, 2017 at 2:09 PM, Basanth Gowda <
> basanth.go...@gmail.com>
> > > >
> > > > wrote:
> > > > > Hi,
> > > > > I want to aggregate hits by Country, State, City. I would these as
> > >
> > > tags in
> > >
> > > > > my sample data.
> > > > >
> > > > > How would I do aggregation at different levels ? Input data would
> be
> > > > > single record
> > > > >
> > > > > Should I do flatMap transformation first and create 3 records from
> 1
> > >
> > > input
> > >

Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
Hi Ant,

I think you can do it by implementing your own Bucketer.

Cheers,
Kostas

.
> On Aug 16, 2017, at 1:09 PM, ant burton  wrote:
> 
> Hello,
> 
> Given 
> 
>// Set StreamExecutionEnvironment
>final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
>// Set checkpoints in ms
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
>// Add source (input stream)
>DataStream dataStream = StreamUtil.getDataStream(env, params);
> 
> How can I construct the s3_filename from the content of the an event, it 
> seems that whenever I attempt this I either have access to an event or access 
> to .addSink but not both.
> 
>   dataStream.addSink(new BucketingSink("s3a://flink/" + 
> s3_filename));
> 
> 
> Thanks,
> 
> 
> 
> 



Access to datastream from BucketSink

2017-08-16 Thread ant burton
Hello,

Given 

// Set StreamExecutionEnvironment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// Use event time as the stream time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Add source (input stream)
DataStream dataStream = StreamUtil.getDataStream(env, params);

How can I construct the s3_filename from the content of an event? It seems
that whenever I attempt this I either have access to an event or access to
.addSink but not both.

dataStream.addSink(new BucketingSink("s3a://flink/" + 
s3_filename));


Thanks,






Re: Change state backend.

2017-08-16 Thread Biplob Biswas
Could you clarify a bit more? Do you want an existing state on a running job
to be migrated from FsStateBackend to RocksDBStateBackend?

Or 

Do you have the option of restarting your job after changing existing code? 







--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Change-state-backend-tp14928p14930.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Question about Global Windows.

2017-08-16 Thread Nico Kruber
Hi Steve,
are you sure a GlobalWindows assigner fits your needs? This may be the case if 
all your events always come in order and you do not ever have overlapping 
sessions since a GlobalWindows assigner simply puts all events (per key) into 
a single window (per key). If you have overlapping sessions, you may need your 
own window assigner that handles multiple windows (see the 
EventTimeSessionWindows assigner for our take on event-time session windows).

Regarding the timer: if you set it via `#registerEventTimeTimer()`, it only 
fires if a watermark passes the given timestamp, so you need to make sure your 
sources create them (see [1] and its sub-topics). Depending on your further 
constraints in your application, it may be ok to use 
`registerProcessingTimeTimer()` instead.
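
To make the timer behaviour concrete, here is a toy model in plain Java (not
Flink code; the class EventTimeTimers and its methods are made up for
illustration). It mimics the rule described above: a registered event-time
timer stays pending until a watermark at or beyond its timestamp arrives, so
without watermarks a cleanup timer never fires and the state it was meant to
purge stays around.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of event-time timers; illustrative only, not a Flink class.
final class EventTimeTimers {
    private final PriorityQueue<Long> pending = new PriorityQueue<>();

    // Analogous to registering an event-time timer for the given timestamp.
    void register(long timestamp) {
        pending.add(timestamp);
    }

    // Advances the watermark and returns the timers that fire, in timestamp order.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            fired.add(pending.poll());
        }
        return fired;
    }

    // Number of timers still waiting for a watermark.
    int pendingCount() {
        return pending.size();
    }
}
```

If the sources never emit watermarks, advanceWatermark is never called in
this model and pendingCount only grows, which is one possible shape of the
memory problem described in the quoted mail.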

Does this help already? If not, we'd need a (minimal) example of how you're 
using these things in order to debug your memory issues further.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html

On Monday, 14 August 2017 06:32:01 CEST Steve Jerman wrote:
> Hi Folks,
> 
> I have a question regarding Global Windows.
> 
> I have a stream with a large number of records. The records have a key which
> has a very high cardinality. They also have a state (start, status,
> finish).
> 
> I need to do some processing where I look at the records separated into
> windows using the 'state' property.
> 
> From the documentation, I believe I should be using a Global Window with a
> custom trigger to identify the windows...
> 
> I have this implemented: the Trigger returns CONTINUE for 'start' and
> FIRE_AND_PURGE for 'finish'.
> 
> I also need to avoid running out of memory, since sometimes I don't get
> 'finish' records, so I added a timer to the Trigger which PURGEs if it
> fires.
> 
> Is this the correct approach?
> 
> I say this since I do in fact see a memory leak... is there anything else I
> need to be aware of?
> 
> Thanks
> 
> Steve





Change state backend.

2017-08-16 Thread shashank agarwal
Hi,

Can I change the state backend from FsStateBackend to RocksDBStateBackend
directly, or do I have to do some migration?


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


Re: Aggregation by key hierarchy

2017-08-16 Thread Nico Kruber
[back to the ml...]

also including your other mail's additional content...
> I have been able to do this by the following and repeating this for every
> key + window combination. So in the above case there would be 8 blocks like
> below. (4 combinations and 2 window period for each combination)
> 
> modelDataStream.keyBy("campaignId","adId")
> .timeWindow(Time.minutes(1))
> .trigger(CountTrigger.of(2))
> .reduce(..)

As mentioned in my last email, I only see one way of reducing the duplication 
(for the key combinations), but it involves more handling on your side and 
I'd probably not recommend it. Regarding the different windows, I do not see 
anything you could do differently here.
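
For reference, the single-stream alternative from the earlier mail quoted
below (keeping the finer-grained aggregates in one stateful function) boils
down to updating one map entry per hierarchy level for every record. A
minimal plain-Java sketch, assuming a hypothetical record with country,
state, city and a numeric value; the class names are made up, and inside
Flink the HashMap would presumably be replaced by keyed MapState:

```java
import java.util.HashMap;
import java.util.Map;

// Running sum and count for one key, enough to derive an average.
final class SumCount {
    double sum;
    long count;

    double average() {
        return count == 0 ? 0.0 : sum / count;
    }
}

// Hypothetical helper: one record updates every level of the key hierarchy.
final class HierarchyAggregator {
    private final Map<String, SumCount> aggregates = new HashMap<>();

    void add(String country, String state, String city, double value) {
        update(country, value);
        update(country + "/" + state, value);
        update(country + "/" + state + "/" + city, value);
    }

    private void update(String key, double value) {
        SumCount sc = aggregates.computeIfAbsent(key, k -> new SumCount());
        sc.sum += value;
        sc.count++;
    }

    // Returns the average for a level key such as "US" or "US/CA/SF".
    double average(String key) {
        SumCount sc = aggregates.get(key);
        return sc == null ? 0.0 : sc.average();
    }
}
```

This keeps the number of code blocks constant, at the price of managing the
level keys by hand, which is the extra handling mentioned above.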

Maybe Aljoscha (cc'd) has an idea of how to do this better.


Nico

On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:
> Hi Nico,
> Thank you. This is pretty much what I am doing; I was wondering if there is
> a better way.
> 
> If there are 10 dimensions on which I want to aggregate with 2 windows,
> this would become about 20 different combinations.
> 
> Thank you
> Basanth
> 
> On Mon, Aug 14, 2017 at 12:50 PM Nico Kruber  wrote:
> > Hi Basanth,
> > Let's assume you have records of the form
> > Record = {timestamp, country, state, city, value}
> > Then you'd like to create aggregates, e.g. the average, for the following
> > combinations?
> > 1) avg per country
> > 2) avg per state and country
> > 3) avg per city and state and country
> > 
> > * You could create three streams and aggregate each individually:
> > DataStream ds = //...
> > DataStream ds1 = ds.keyBy("country");
> > DataStream ds2 = ds.keyBy("country","state");
> > DataStream ds3 = ds.keyBy("country","state","city");
> > // + your aggregation per stream ds1, ds2, ds3
> > 
> > You probably want to do different things for each of the resulting
> > aggregations anyway, so having separate streams is probably right for you.
> > 
> > * Alternatively, you could go with ds1 only and create the aggregates of
> > the per-state (2) and per-city (3) ones in a stateful aggregation function
> > yourself, e.g. in a MapState [1]. At the end of your aggregation window,
> > you could then emit those with different keys to be able to distinguish
> > between them.
> > 
> > 
> > Nico
> > 
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
> > 
> > On Sunday, 13 August 2017 23:13:00 CEST Basanth Gowda wrote:
> > > For example - this is a sample model from one of the Apache Apex
> > > presentation.
> > > 
> > > I would want to aggregate for different combinations, and different time
> > > buckets. What is the best way to do this in Flink ?
> > > 
> > > {"keys":[{"name":"campaignId","type":"integer"},
> > > 
> > >  {"name":"adId","type":"integer"},
> > >  {"name":"creativeId","type":"integer"},
> > >  {"name":"publisherId","type":"integer"},
> > >  {"name":"adOrderId","type":"integer"}],
> > >  "timeBuckets":["1h","1d"],
> > > 
> > >  "values":
> > > [{"name":"impressions","type":"integer","aggregators":["SUM"]}
> > > ,
> > > 
> > >  {"name":"clicks","type":"integer","aggregators":["SUM"]},
> > >  {"name":"revenue","type":"integer"}],
> > >  "dimensions":
> > >  [{"combination":["campaignId","adId"]},
> > >  {"combination":["creativeId","campaignId"]},
> > >  {"combination":["campaignId"]},
> > >  {"combination":["publisherId","adOrderId","campaignId"],
> > > 
> > > "additionalValues":["revenue:SUM"]}]
> > > }
> > > 
> > > 
> > > thank you,
> > > B
> > > 
> > > > On Sun, Aug 13, 2017 at 2:09 PM, Basanth Gowda wrote:
> > > > > Hi,
> > > > > I want to aggregate hits by Country, State, City. I would have these
> > > > > as tags in my sample data.
> > > > > 
> > > > > How would I do aggregation at different levels? Input data would be a
> > > > > single record.
> > > > > 
> > > > > Should I do a flatMap transformation first and create 3 records from 1
> > > > > input record, or is there a better way to do it?
> > > > 
> > > > thank you,
> > > > basanth





Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

2017-08-16 Thread Nico Kruber
Hi Chao,

1) Regarding the difference between CoFlatMapFunction and CoProcessFunction, 
let me quote the javadoc of the CoProcessFunction:

"Contrary to the {@link CoFlatMapFunction}, this function can also query the 
time (both event and processing) and set timers, through the provided {@link 
Context}. When reacting to the firing of set timers the function can emit yet 
more elements."

So, imho, the two offer different levels of abstraction and control (high- vs. 
low-level). Also note the different methods available for you to implement.

2a) In general, Flink calls functions on a per-record basis in a serialized 
fashion per task. If a TaskManager has multiple slots, each task gets its own 
function object, so you should only get into trouble if you share static 
references between them. Otherwise you do not need to worry about 
thread-safety.

2b) From what I see in the code (StreamTwoInputProcessor), the same should 
apply to CoFlatMapFunction and CoProcessFunction, so that flatMap1/2 and 
processElement1/2 are not called in parallel!

3) Why would you want to store the CoProcessFunction.Context?
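
A small plain-Java sketch of the point in 2a); the class and method names are
hypothetical and the native call is replaced by a stub. Per-instance fields
such as the pending flag need no locking if calls on one instance are
serialized, whereas a resource shared by all instances in the JVM has to be
guarded by a lock that is itself shared. A synchronized instance method locks
only its own object, so, assuming several function instances end up in the
same JVM, it would not keep them from entering non-reentrant native code at
the same time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-input function; flatMap1/flatMap2 mirror the CoFlatMapFunction method names.
final class PendingPairFunction {
    // Shared by every instance in the JVM, so it can guard a JVM-wide resource.
    private static final Object NATIVE_LOCK = new Object();

    // Per-instance state: safe without locks as long as calls on this instance are serialized.
    // A newer element replaces an older pending one from the same input.
    private Integer pendingLeft;
    private Integer pendingRight;

    List<Integer> flatMap1(int left) {
        List<Integer> out = new ArrayList<>();
        if (pendingRight != null) {
            out.add(callNative(left, pendingRight));
            pendingRight = null;
        } else {
            pendingLeft = left;
        }
        return out;
    }

    List<Integer> flatMap2(int right) {
        List<Integer> out = new ArrayList<>();
        if (pendingLeft != null) {
            out.add(callNative(pendingLeft, right));
            pendingLeft = null;
        } else {
            pendingRight = right;
        }
        return out;
    }

    // Stub standing in for the native function; the static lock serializes all instances.
    private static int callNative(int a, int b) {
        synchronized (NATIVE_LOCK) {
            return a + b;
        }
    }
}
```

Whether this matches the crash in the quoted mail is a guess; it only shows
which kind of lock would cover instances running side by side.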


Nico

On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
> Hi,
> 
> I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
> and to what extent? What's the difference between the two Functions? and
> in general, how does Flink prevent race conditions? Here's my case:
> 
> I tried to condition on two input streams and produce the third stream
> if the condition is met. I implemented CoFlatMapFunction and tried to
> monitor a state using a field in the implemented class (I want to
> isolate my application from the checkpointing feature, and therefore I
> do not use the states as documented here
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> The field served as a flag indicating whether there are some pending
> data from either input stream, and if yes, processing it along with the
> arriving data from the other input stream (the processing invokes a native
> function).
> 
> But then I got double free error and segmentation fault, which I believe
> was due to unintentional concurrent access to the native function. Then
> I tried to wrap the access into a synchronized method, as well as
> explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
> and the error remained.
> 
> I considered using CoProcessFunction in my case, but seems to me that it
> does not handle customary internal states, stating in the javadoc "The
> context [CoProcessFunction.Context] is only valid during the invocation
> of this method, do not store it."
> 
> 
> 
> Thanks,
> Chao





Re: Time zones problem

2017-08-16 Thread Biplob Biswas
Hi Alex, 

Your problem sounds interesting and I have always found dealing with
timestamps cumbersome.

Nevertheless, what I understand is that the start and end timestamps for your
American and European customers are based on their local clocks.

For example, a window running from 12 AM to 12 AM in America covers a
different time span than the same window in Europe (and it also differs
across time zones within a continent).

What I can think of right now is to define your start and end timestamps
against a reference time zone, e.g. take UTC as your reference and define one
day globally. This spares you from creating multiple windows for all time
zones, but it may not satisfy your customers' needs, as the day would no
longer start at their local midnight (if that's what you need).

Otherwise, as far as I know, in Flink you would have to define the time
windows for all the time zones you need, but the experts here may point out a
better solution.
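
The per-time-zone variant amounts to computing each customer's local
midnight-to-midnight interval in epoch milliseconds. A minimal plain-Java
sketch using java.time; the helper name localDayBounds is made up here. If
the zone offsets are fixed, the offset parameter of
TumblingEventTimeWindows.of(size, offset) may achieve the same inside Flink,
though that is worth checking against the version in use and would not follow
daylight-saving changes.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

final class LocalDayWindows {
    // Returns {startInclusive, endExclusive} in epoch millis of the local calendar day
    // that contains the given timestamp in the given zone.
    static long[] localDayBounds(long timestampMillis, ZoneId zone) {
        LocalDate day = Instant.ofEpochMilli(timestampMillis).atZone(zone).toLocalDate();
        long start = day.atStartOfDay(zone).toInstant().toEpochMilli();
        long end = day.plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
        return new long[] {start, end};
    }
}
```

The same event timestamp therefore falls into different day windows depending
on the zone, which is why one global UTC day cannot line up with every
customer's local midnight.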

Regards,
Biplob





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-zones-problem-tp14907p14925.html