Re: multiple users per flink deployment

2017-08-02 Thread Eron Wright
One of the key challenges is isolation, e.g. ensuring that one job cannot
access the credentials of another.  The easiest solution today is to use
the YARN deployment mode, with a separate app per job.  Meanwhile,
improvements being made under the FLIP-6 banner for 1.4+ are laying the
groundwork for a multi-user experience.

Hope this helps!

On Aug 2, 2017 8:29 AM, "Georg Heiler"  wrote:

> Thanks for the overview.
> Currently a single flink cluster seems to run all tasks with the same
> user. I would want to be able to run each flink job as a separate user
> instead.
>
> The update for separate read/write users is nice though.
> Tzu-Li (Gordon) Tai  schrieb am Mi. 2. Aug. 2017 um
> 10:59:
>
>> Hi,
>>
>> There’s been quite a few requests on this recently on the mailing lists
>> and also mentioned by some users offline, so I think we may need to start
>> with plans to probably support this.
>> I’m CC’ing Eron to this thread to see if he has any thoughts on this, as
>> he was among the first authors driving the Kerberos support in Flink.
>> I’m not really sure if such a feature support makes sense, given that all
>> jobs of a single Flink deployment have full privileges and therefore no
>> isolation in between.
>>
>> Related question: what external service are you trying to authenticate to
>> with different users?
>> If it is Kafka and perhaps you have different users for the consumer /
>> producer, that will be very soon available in 1.3.2, which includes a
>> version bump to Kafka 0.10 that allows multiple independent users within
>> the same JVM through dynamic JAAS configuration.
>> See this mail thread [1] for more detail on that.
>>
>> Cheers,
>> Gordon
>>
>> [1] http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/Kafka-0-10-jaas-multiple-clients-td12831.html#a13317
>>
>> On 1 August 2017 at 6:16:08 PM, Georg Heiler (georg.kf.hei...@gmail.com)
>> wrote:
>>
>> Hi,
>>
>> flink currently only seems to support a single kerberos ticket for
>> deployment. Are there plans to support different users per each job?
>>
>> regards,
>> Georg
>>
>>


replacement for KeyedStream.fold(..) ?

2017-08-02 Thread Peter Ertl
Hi folks,

since KeyedStream.fold(..) is marked as @deprecated what is the proper 
replacement for that kind of functionality?

Is mapWithState() and flatMapWithState() a *full* replacement?

Cheers
Peter

Re: Eventime window

2017-08-02 Thread Eron Wright
Note that the built-in `BoundedOutOfOrdernessTimestampExtractor` generates
watermarks based only on the timestamp of incoming events.   Without new
events, `BoundedOutOfOrdernessTimestampExtractor` will not advance the
event-time clock. That explains why the window doesn't trigger immediately
after 10s; a new event must arrive to advance the watermark.

As Timo said, a different implementation of `AssignerWithPeriodicWatermarks`
could behave differently.   Have a look at the implementation of
`TimestampsAndPeriodicWatermarksOperator` within Flink to better understand
how `AssignerWithPeriodicWatermarks` is invoked.
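
A minimal sketch (not from the original thread) of an assigner whose watermark
also advances with the wall clock, so event-time windows can still fire when no
new events arrive. It assumes event timestamps are epoch milliseconds comparable
to system time; the MyEvent class and its getTimestamp() accessor are hypothetical.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class WallClockBoundedWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 100L;
    private long maxSeenTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    @Override
    public long extractTimestamp(MyEvent event, long previousElementTimestamp) {
        // Track the highest event timestamp seen so far (hypothetical accessor).
        maxSeenTimestamp = Math.max(maxSeenTimestamp, event.getTimestamp());
        return event.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Invoked periodically by Flink: the watermark advances from the data
        // or from processing time, whichever is further ahead, so idle periods
        // still allow event-time windows to trigger.
        long fromData = maxSeenTimestamp - MAX_OUT_OF_ORDERNESS_MS;
        long fromWallClock = System.currentTimeMillis() - MAX_OUT_OF_ORDERNESS_MS;
        return new Watermark(Math.max(fromData, fromWallClock));
    }
}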

On Wed, Aug 2, 2017 at 8:13 AM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Thanks Timo. Basically my requirement is based on event time the window
> has to be created but the trigger has to happen either when it has received
> the next element >10s or 10s has passed. Exactly the same way as you
> described. Let me try the AssignerWithPeriodicWatermarks approach.
>
> Thanks,
> Govind
>
> On Aug 2, 2017, at 7:46 AM, Timo Walther  wrote:
>
> I forgot about the AssignerWithPeriodicWatermarks [1]. I think it could
> solve your problem easily.
>
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_
> timestamps_watermarks.html#with-periodic-watermarks
>
> Am 02.08.17 um 16:30 schrieb Timo Walther:
>
> The question is what defines your `10 seconds`. In event-time the incoming
> events determine when 10 seconds have passed. Your description sounds like
> you want to have results after 10 seconds wall-clock/processing-time. So
> either you use a processing-time window or you implement a custom trigger
> that triggers both on event-time or on a timer that you have set after 10 s
> processing-time.
>
> Timo
>
>
> Am 02.08.17 um 16:20 schrieb Govindarajan Srinivasaraghavan:
>
> Thanks Timo. The next message will arrive only after a minute or so. Is
> there a way to evict whatever is there in window buffer just after 10
> seconds irrespective of whether a new message arrives or not.
>
> Thanks,
> Govind
>
> On Aug 2, 2017, at 6:56 AM, Timo Walther  wrote:
>
> Hi Govind,
>
> if the window is not triggered, this usually indicates that your timestamp
> and watermark assignment is not correct. According to your description, I
> don't think that you need a custom trigger/evictor. How often do events
> arrive from one device? There must be another event from the same device
> that has a timestamp >10s in order to trigger the window evaluation.
>
> Instead of using the Kafka timestamp, maybe you could also convert your
> timestamps to UTC in the TimestampExtractor.
>
> There are no official limitation. However, each window comes with some
> overhead. So you should choose your memory/state backends and parallelism
> accordingly.
>
> Hope that helps.
>
> Timo
>
>
> Am 02.08.17 um 06:54 schrieb Govindarajan Srinivasaraghavan:
>
> Hi,
>
> I have few questions regarding event time windowing. My scenario is
> devices from various timezones will send messages with timestamp and I need
> to create a window per device for 10 seconds. The messages will mostly
> arrive in order.
>
> Here is my sample code to perform windowing and aggregating the messages
> after the window to further process it.
>
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
> new DeserializationSchema(),
> kafkaConsumerProperties);
>
> DataStream msgStream = streamEnv
> .addSource(consumer)
> .assignTimestampsAndWatermarks(new TimestampExtractor(Time.of(100,
> TimeUnit.MILLISECONDS))); // TimestampExtractor implements
> BoundedOutOfOrdernessTimestampExtractor
>
> KeyedStream keyByStream = msgStream.keyBy(new
> CustomKeySelector());
>
> WindowedStream windowedStream =
> keyByStream.window(TumblingEventTimeWindows.of(
> org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
>
> SingleOutputStreamOperator aggregatedStream =
> windowedStream.apply(new AggregateEntries());
>
> My questions are
>
> - In the above code, data gets passed till the window function but even
> after window time the data is not received in the apply function. Do I have
> to supply a custom evictor or trigger?
>
> - Since the data is being received from multiple timezones and each device
> will have some time difference, would it be ok to assign the timestamp as
> that of received timestamp in the message at source (kafka). Will there be
> any issues with this?
>
> - Are there any limitations on the number of time windows that can be
> created at any given time? In my scenario if there are 1 million devices
> there will be 1 million tumbling windows.
>
> Thanks,
> Govind
>
>
>
>
>


Fwd: Flink -mesos-app master hang

2017-08-02 Thread Biswajit Das
Hi There,

I posted this here in the group a few days back, and since then I have been
exchanging email with Eron; thanks to Eron for all the tips. Now I see this
basic-auth error, and I'm a little confused how the JobManager launched fine
while the TaskManager is failing to authenticate.
Also, the Mesos docs say authentication is disabled by default, so it should
not have gone down that path. Do I have to disable something inside Flink? I
don't see any config or property for it in the code.

This is kind of a blocker for my Mesos deployment right now, so I would really
appreciate any input or suggestions.

~ Biswajit

-- Forwarded message --
From: Eron Wright 
Date: Wed, Aug 2, 2017 at 10:51 AM
--
*From:* Biswajit Das 
*Sent:* Wednesday, August 2, 2017 10:19:45 AM
*To:* Eron Wright
*Subject:* Re: Flink -mesos-app master hang

Hi Eron ,

Good morning. I'm really sorry for the flood of questions; I'll post this one
to the user group as well.
I could narrow down the actual error thrown by Mesos; it seems the JM is
somehow not able to authenticate. I'm a little confused whether it is a
*docker private registry TLS error* or something else. I started the slave
even with --docker_config; previously I was mostly using a docker.tar.gz with
the container for private-registry authentication.

2017-08-02 03:32:54,163 WARN  org.apache.flink.mesos.scheduler.TaskMonitor  - Mesos task taskmanager-3 failed unexpectedly.
2017-08-02 03:32:54,163 INFO  org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - Mesos task taskmanager-3 failed, with a TaskManager in launch or registration. State: TASK_FAILED Reason: REASON_CONTAINER_LAUNCH_FAILED (Failed to launch container: Unexpected WWW-Authenticate header format: 'Basic realm="Registry Realm"')
2017-08-02 03:32:54,163 INFO  org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - Diagnostics for task taskmanager-3 in state TASK_FAILED : reason=REASON_CONTAINER_LAUNCH_FAILED message=Failed to launch container: Unexpected WWW-Authenticate header format: 'Basic realm="Registry Realm"'
2017-08-02 03:32:54,163 INFO  org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - Total number of failed tasks so far: 3
2017-08-02 03:32:54,164 ERROR org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - Stopping Mesos session because the number of failed tasks (3) exceeded the maximum failed tasks (2). This number is controlled by the 'mesos.maximum-failed-tasks' configuration setting. By default its the number of requested tasks.
2017-08-02 03:32:54,164 INFO  org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - Shutting down cluster with status FAILED : Stopping Mesos session because the number of failed tasks (3) exceeded the maximum failed tasks (2). This number is controlled by the 'mesos.maximum-failed-tasks' configuration setting. By default its the number of requested tasks.
2017-08-02 03:32:54,164 INFO  org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - Shutting down and unregistering as a Mesos framework.
2017-08-02 03:32:54,171 INFO  org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - Stopping Mesos resource master
root@ip-172-31-4-44:/etc/me

On Tue, Aug 1, 2017 at 1:53 PM, Eron Wright  wrote:

> I think you're on the right track, in trying to configure the docker image
> provider.  This is on Linux right, and you definitely restarted the agents?
>
>
> An important difference between the JM and the TM is that the JM is a task
> launched by the Marathon framework, whereas the TM is a task launched by
> the JM framework.  The respective configurations and behaviors are
> different.   For example, I see that Marathon is launching the JM with the
> Docker containerizer, whereas the JS is launching the TM with the Mesos
> containerizer (with Docker image provider support). The Mesos
> containerizer is more modern and preferred, and I don't think Flink
> supports anything else.
>
>
> The doc I linked to shows how to launch a docker image-based container
> with mesos-execute.   Using mesos-execute to verify your cluster
> configuration is a good idea, to isolate any issue.  For example, see if
> you can launch a container using the Mesos containerizer and the Docker
> image provider, executing a simple command such as 'sleep'.
>
>
> Eron
> --
> *From:* Biswajit Das 
> *Sent:* Tuesday, August 1, 2017 10:02:51 AM
> *To:* Eron Wright
>
> *Subject:* Re: Flink -mesos-app master hang
>
> Hi Eron ,
>
> Thank you for the email , I really appreciate your reply.
>
> That's what is confusing me. I have been running Mesos with containers on
> both staging and production for almost a year now, mostly with Spark/Presto
> loads, everything containerized, on a fairly big cluster. Here is one of my
> slave configs. One interesting part is that the app master is launched and I
> can access the JobManager web UI from mesos fra

Re: Odd flink behaviour

2017-08-02 Thread Mohit Anchlia
Thanks. I thought the purpose of the method below was to supply that
information?

@Override
public boolean reachedEnd() throws IOException {
    logger.info("Reached " + reached);
    return reached;
}

On Wed, Aug 2, 2017 at 1:43 AM, Fabian Hueske  wrote:

> FileInputFormat cannot know about the reached variable that you added in
> your class. So there is no way it could reset it to false.
> An alternative implementation without overriding open() could be to change
> the reachedEnd method to check if the stream is still at offset 0.
>
> 2017-08-01 20:22 GMT+02:00 Mohit Anchlia :
>
>> Thanks that worked. However, what I don't understand is wouldn't the open
>> call that I am inheriting have this logic already inbuilt? I am inheriting
>> FileInputFormat.
>>
>> On Tue, Aug 1, 2017 at 1:42 AM, Fabian Hueske  wrote:
>>
>>> An InputFormat processes multiple InputSplits. open() is called for each
>>> InputSplit.
>>> If you don't reset reached to false in open() you will only read a
>>> single (i.e., the first) InputSplit and skip all others.
>>>
>>> I'd override open as follows:
>>>
>>> public void open(FileInputSplit fileSplit) throws IOException {
>>>   super.open(fileSplit);
>>>   reached = false;
>>> }
>>>
>>> Cheers, Fabian
>>>
>>>
>>> 2017-08-01 8:08 GMT+02:00 Mohit Anchlia :
>>>
 I didn't override open. I am using open that got inherited from
 FileInputFormat . Am I supposed to specifically override open?

 On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske 
 wrote:

> Do you set reached to false in open()?
>
>
> Am 01.08.2017 2:44 vorm. schrieb "Mohit Anchlia" <
> mohitanch...@gmail.com>:
>
> And here is the inputformat code:
>
> public class PDFFileInputFormat extends FileInputFormat {
>  /**
>   *
>   */
>  private static final long serialVersionUID = -4137283038479003711L;
>  private static final Logger logger = LoggerFactory
>.getLogger(PDFInputFormat.class.getName());
>  private boolean reached = false;
>  @Override
>  public boolean reachedEnd() throws IOException {
>   logger.info("called reached " + reached);
>   // TODO Auto-generated method stub
>   return reached;
>  }
>  @Override
>  public String nextRecord(String reuse) throws IOException {
>   logger.info("This is where you parse PDF");
>   String content = new String(
> Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
>   logger.info("Content " + content);
>   reached = true;
>   return content;
>  }
> }
>
> On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia  > wrote:
>
>> I have a very simple program that just reads all the files in the
>> path. However, flink is not working as expected.
>>
>> Everytime I execute this job I only see flink reading 2 files, even
>> though there are more in that directory. On closer look it appears that 
>> it
>> might be related to:
>>
>> [flink-akka.actor.default-dispatcher-3] INFO
>> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
>> task slot(s).
>>
>> My question is, isn't flink supposed to iterate over the directory
>> after those 2 slots become free again? I am assuming this problem is 
>> caused
>> because there are only 2 slots.
>>
>>
>> Code ---
>>
>>   PDFFileInputFormat format = new PDFFileInputFormat();
>>   format.setFilePath(args[0]);
>>   format.setNestedFileEnumeration(true);
>>   logger.info("Number of splits " + format.getNumSplits());
>>
>>   // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
>>
>>   env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>
>
>
>

>>>
>>
>


Can't find correct JobManager address, job fails with Queryable state

2017-08-02 Thread Biplob Biswas
When I start my Flink job I get the following warning. If I am not wrong, this
is because it can't find the JobManager at the given address (localhost). I
tried changing:

 config.setString(JobManagerOptions.ADDRESS, "localhost");

to the LAN IP, 127.0.0.1, and localhost, but none of them seems to work. I am
not really sure what I am doing wrong here.

2017-08-02 17:20:26,137 INFO  [Remoting] - Remoting started; listening on
addresses :[akka.tcp://flink@169.254.65.27:53923]
2017-08-02 17:20:26,140 INFO  [Remoting] - Remoting started; listening on
addresses :[akka.tcp://flink@169.254.65.27:53920]
2017-08-02 17:20:26,154 INFO  [Remoting] - Remoting started; listening on
addresses :[akka.tcp://flink@169.254.65.27:53921]
2017-08-02 17:20:26,163 INFO  [Remoting] - Remoting started; listening on
addresses :[akka.tcp://flink@169.254.65.27:53922]
2017-08-02 17:20:26,166 INFO  [AbstractCoordinator] - Discovered coordinator
airpluspoc-hdp-dn0.germanycentral.cloudapp.microsoftazure.de:9092 (id:
2147482644 rack: null) for group flink-dqm.
2017-08-02 17:20:27,493 WARN  [ReliableDeliverySupervisor] - Association
with remote system [akka.tcp://flink@localhost:6123] has failed, address is
now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@localhost:6123]] Caused by: [Connection refused: no
further information]
2017-08-02 17:20:27,493 WARN  [ReliableDeliverySupervisor] - Association
with remote system [akka.tcp://flink@localhost:6123] has failed, address is
now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@localhost:6123]] Caused by: [Connection refused: no
further information]
2017-08-02 17:20:27,493 WARN  [ReliableDeliverySupervisor] - Association
with remote system [akka.tcp://flink@localhost:6123] has failed, address is
now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@localhost:6123]] Caused by: [Connection refused: no
further information]
2017-08-02 17:20:27,493 WARN  [ReliableDeliverySupervisor] - Association
with remote system [akka.tcp://flink@localhost:6123] has failed, address is
now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@localhost:6123]] Caused by: [Connection refused: no
further information]

and then it fails while accessing the queryable state with the following
error:

akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka.tcp://flink@localhost:6123/),
Path(/user/jobmanager)]


Also, I wanted to check the jobmanager UI and for this I set up my job as
follows:

Configuration conf = new Configuration();
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = 
LocalStreamEnvironment.createLocalEnvironmentWithWebUI(conf);

and I couldn't even access the Flink UI at localhost:8081 or at the other
addresses either.


What would be the best way to find and access the jobmanager address and UI? 

Thanks and Regards
Biplob




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-find-correct-JobManager-address-job-fails-with-Queryable-state-tp14644.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: multiple users per flink deployment

2017-08-02 Thread Georg Heiler
Thanks for the overview.
Currently a single flink cluster seems to run all tasks with the same user.
I would want to be able to run each flink job as a separate user instead.

The update for separate read/write users is nice though.
Tzu-Li (Gordon) Tai  schrieb am Mi. 2. Aug. 2017 um
10:59:

> Hi,
>
> There’s been quite a few requests on this recently on the mailing lists
> and also mentioned by some users offline, so I think we may need to start
> with plans to probably support this.
> I’m CC’ing Eron to this thread to see if he has any thoughts on this, as
> he was among the first authors driving the Kerberos support in Flink.
> I’m not really sure if such a feature support makes sense, given that all
> jobs of a single Flink deployment have full privileges and therefore no
> isolation in between.
>
> Related question: what external service are you trying to authenticate to
> with different users?
> If it is Kafka and perhaps you have different users for the consumer /
> producer, that will be very soon available in 1.3.2, which includes a
> version bump to Kafka 0.10 that allows multiple independent users within
> the same JVM through dynamic JAAS configuration.
> See this mail thread [1] for more detail on that.
>
> Cheers,
> Gordon
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-0-10-jaas-multiple-clients-td12831.html#a13317
>
> On 1 August 2017 at 6:16:08 PM, Georg Heiler (georg.kf.hei...@gmail.com)
> wrote:
>
> Hi,
>
> flink currently only seems to support a single kerberos ticket for
> deployment. Are there plans to support different users per each job?
>
> regards,
> Georg
>
>


Re: Eventime window

2017-08-02 Thread Govindarajan Srinivasaraghavan
Thanks Timo. Basically my requirement is that the window has to be created
based on event time, but the trigger has to happen either when it has received
the next element >10s or when 10s has passed. Exactly the same way as you
described. Let me try the AssignerWithPeriodicWatermarks approach.

Thanks,
Govind

> On Aug 2, 2017, at 7:46 AM, Timo Walther  wrote:
> 
> I forgot about the AssignerWithPeriodicWatermarks [1]. I think it could solve 
> your problem easily.
> 
> Timo
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#with-periodic-watermarks
> 
>> Am 02.08.17 um 16:30 schrieb Timo Walther:
>> The question is what defines your `10 seconds`. In event-time the incoming 
>> events determine when 10 seconds have passed. Your description sounds like 
>> you want to have results after 10 seconds wall-clock/processing-time. So 
>> either you use a processing-time window or you implement a custom trigger 
>> that triggers both on event-time or on a timer that you have set after 10 s 
>> processing-time.
>> 
>> Timo
>> 
>> 
>>> Am 02.08.17 um 16:20 schrieb Govindarajan Srinivasaraghavan:
>>> Thanks Timo. The next message will arrive only after a minute or so. Is 
>>> there a way to evict whatever is there in window buffer just after 10 
>>> seconds irrespective of whether a new message arrives or not. 
>>> 
>>> Thanks,
>>> Govind
>>> 
>>> On Aug 2, 2017, at 6:56 AM, Timo Walther  wrote:
>>> 
 Hi Govind,
 
 if the window is not triggered, this usually indicates that your timestamp 
 and watermark assignment is not correct. According to your description, I 
 don't think that you need a custom trigger/evictor. How often do events 
 arrive from one device? There must be another event from the same device 
 that has a timestamp >10s in order to trigger the window evaluation.
 
 Instead of using the Kafka timestamp, maybe you could also convert your 
 timestamps to UTC in the TimestampExtractor.
 
 There are no official limitation. However, each window comes with some 
 overhead. So you should choose your memory/state backends and parallelism 
 accordingly.
 
 Hope that helps.
 
 Timo
 
 
> Am 02.08.17 um 06:54 schrieb Govindarajan Srinivasaraghavan:
> Hi,
> 
> I have few questions regarding event time windowing. My scenario is 
> devices from various timezones will send messages with timestamp and I 
> need to create a window per device for 10 seconds. The messages will 
> mostly arrive in order.
> 
> Here is my sample code to perform windowing and aggregating the messages 
> after the window to further process it.
> 
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
> new DeserializationSchema(),
> kafkaConsumerProperties);
> 
> DataStream msgStream = streamEnv
>   .addSource(consumer)
>   .assignTimestampsAndWatermarks(new 
> TimestampExtractor(Time.of(100, TimeUnit.MILLISECONDS))); // 
> TimestampExtractor implements BoundedOutOfOrdernessTimestampExtractor
> 
> KeyedStream keyByStream = msgStream.keyBy(new 
> CustomKeySelector());
> 
> WindowedStream windowedStream =
> 
> keyByStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
> 
> SingleOutputStreamOperator aggregatedStream = 
> windowedStream.apply(new AggregateEntries());
> 
> My questions are
> 
> - In the above code, data gets passed till the window function but even 
> after window time the data is not received in the apply function. Do I 
> have to supply a custom evictor or trigger?
> 
> - Since the data is being received from multiple timezones and each 
> device will have some time difference, would it be ok to assign the 
> timestamp as that of received timestamp in the message at source (kafka). 
> Will there be any issues with this?
> 
> - Are there any limitations on the number of time windows that can be 
> created at any given time? In my scenario if there are 1 million devices 
> there will be 1 million tumbling windows.
> 
> Thanks,
> Govind
 
>> 
> 


Re: KafkaConsumerBase

2017-08-02 Thread aitozi


Hi,Gordon

Yes, just now I read the code in the assignTopicPartitions method again; it
indeed subscribes only to the partitions that the subtask should subscribe to.
I hadn't carefully read the for loop in assignTopicPartitions that generates
the subscribedPartitions for each subtask before:

for (int i = getRuntimeContext().getIndexOfThisSubtask();
     i < kafkaTopicPartitions.size();
     i += getRuntimeContext().getNumberOfParallelSubtasks()) {
    subscribedPartitions.add(kafkaTopicPartitions.get(i));
}

You are right: "the partitions are still filtered out to only be the
partitions for each local subtask, using the `assignTopicPartitions` method"

Thanks
aitozi



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/KafkaConsumerBase-tp14636p14642.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Eventime window

2017-08-02 Thread Timo Walther
I forgot about the AssignerWithPeriodicWatermarks [1]. I think it could 
solve your problem easily.


Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#with-periodic-watermarks


Am 02.08.17 um 16:30 schrieb Timo Walther:
The question is what defines your `10 seconds`. In event-time the 
incoming events determine when 10 seconds have passed. Your 
description sounds like you want to have results after 10 seconds 
wall-clock/processing-time. So either you use a processing-time window 
or you implement a custom trigger that triggers both on event-time or 
on a timer that you have set after 10 s processing-time.


Timo


Am 02.08.17 um 16:20 schrieb Govindarajan Srinivasaraghavan:
Thanks Timo. The next message will arrive only after a minute or so. 
Is there a way to evict whatever is there in window buffer just after 
10 seconds irrespective of whether a new message arrives or not.


Thanks,
Govind

On Aug 2, 2017, at 6:56 AM, Timo Walther > wrote:



Hi Govind,

if the window is not triggered, this usually indicates that your 
timestamp and watermark assignment is not correct. According to your 
description, I don't think that you need a custom trigger/evictor. 
How often do events arrive from one device? There must be another 
event from the same device that has a timestamp >10s in order to 
trigger the window evaluation.


Instead of using the Kafka timestamp, maybe you could also convert 
your timestamps to UTC in the TimestampExtractor.


There are no official limitation. However, each window comes with 
some overhead. So you should choose your memory/state backends and 
parallelism accordingly.


Hope that helps.

Timo


Am 02.08.17 um 06:54 schrieb Govindarajan Srinivasaraghavan:

Hi,

I have few questions regarding event time windowing. My scenario is 
devices from various timezones will send messages with timestamp 
and I need to create a window per device for 10 seconds. The 
messages will mostly arrive in order.


Here is my sample code to perform windowing and aggregating the 
messages after the window to further process it.


streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
new DeserializationSchema(),
kafkaConsumerProperties);

DataStream msgStream = streamEnv
.addSource(consumer)
.assignTimestampsAndWatermarks(new TimestampExtractor(Time.of(100, 
TimeUnit.MILLISECONDS))); // TimestampExtractor implements 
BoundedOutOfOrdernessTimestampExtractor


KeyedStream keyByStream = msgStream.keyBy(new 
CustomKeySelector());

WindowedStream windowedStream =
keyByStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));

SingleOutputStreamOperator aggregatedStream = 
windowedStream.apply(new AggregateEntries());


My questions are

- In the above code, data gets passed till the window function but 
even after window time the data is not received in the apply 
function. Do I have to supply a custom evictor or trigger?


- Since the data is being received from multiple timezones and each 
device will have some time difference, would it be ok to assign the 
timestamp as that of received timestamp in the message at source 
(kafka). Will there be any issues with this?


- Are there any limitations on the number of time windows that can 
be created at any given time? In my scenario if there are 1 million 
devices there will be 1 million tumbling windows.


Thanks,
Govind









Re: Eventime window

2017-08-02 Thread Timo Walther
The question is what defines your `10 seconds`. In event-time the 
incoming events determine when 10 seconds have passed. Your description 
sounds like you want to have results after 10 seconds 
wall-clock/processing-time. So either you use a processing-time window 
or you implement a custom trigger that triggers both on event-time or on 
a timer that you have set after 10 s processing-time.
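
A minimal sketch (not from the original message) of such a combined trigger,
assuming Flink's TimeWindow and a 10 s processing-time timeout: it fires either
at the event-time end of the window or 10 s of processing time after an element
arrives, whichever comes first. It could be plugged in with
windowedStream.trigger(new EventTimeOrProcessingTimeoutTrigger()).

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeOrProcessingTimeoutTrigger extends Trigger<Object, TimeWindow> {

    private static final long TIMEOUT_MS = 10_000L; // assumed 10 s processing-time timeout

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Fire at the event-time end of the window...
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // ...or at the latest 10 s (processing time) after this element arrived.
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + TIMEOUT_MS);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}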


Timo


Am 02.08.17 um 16:20 schrieb Govindarajan Srinivasaraghavan:
Thanks Timo. The next message will arrive only after a minute or so. 
Is there a way to evict whatever is there in window buffer just after 
10 seconds irrespective of whether a new message arrives or not.


Thanks,
Govind

On Aug 2, 2017, at 6:56 AM, Timo Walther > wrote:



Hi Govind,

if the window is not triggered, this usually indicates that your 
timestamp and watermark assignment is not correct. According to your 
description, I don't think that you need a custom trigger/evictor. 
How often do events arrive from one device? There must be another 
event from the same device that has a timestamp >10s in order to 
trigger the window evaluation.


Instead of using the Kafka timestamp, maybe you could also convert 
your timestamps to UTC in the TimestampExtractor.


There are no official limitation. However, each window comes with 
some overhead. So you should choose your memory/state backends and 
parallelism accordingly.


Hope that helps.

Timo


Am 02.08.17 um 06:54 schrieb Govindarajan Srinivasaraghavan:

Hi,

I have few questions regarding event time windowing. My scenario is 
devices from various timezones will send messages with timestamp and 
I need to create a window per device for 10 seconds. The messages 
will mostly arrive in order.


Here is my sample code to perform windowing and aggregating the 
messages after the window to further process it.


streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
new DeserializationSchema(),
kafkaConsumerProperties);

DataStream msgStream = streamEnv
.addSource(consumer)
.assignTimestampsAndWatermarks(new TimestampExtractor(Time.of(100, 
TimeUnit.MILLISECONDS))); // TimestampExtractor implements 
BoundedOutOfOrdernessTimestampExtractor


KeyedStream keyByStream = msgStream.keyBy(new 
CustomKeySelector());

WindowedStream windowedStream =
keyByStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));

SingleOutputStreamOperator aggregatedStream = 
windowedStream.apply(new AggregateEntries());


My questions are

- In the above code, data gets passed till the window function but 
even after window time the data is not received in the apply 
function. Do I have to supply a custom evictor or trigger?


- Since the data is being received from multiple timezones and each 
device will have some time difference, would it be ok to assign the 
timestamp as that of received timestamp in the message at source 
(kafka). Will there be any issues with this?


- Are there any limitations on the number of time windows that can 
be created at any given time? In my scenario if there are 1 million 
devices there will be 1 million tumbling windows.


Thanks,
Govind







Re: KafkaConsumerBase

2017-08-02 Thread Tzu-Li (Gordon) Tai
Hi!

method shown in KafkaConsumerBase.java (version 1.2.0) 

A lot has changed in the FlinkKafkaConsumerBase since version 1.2.0.
And if I remember correctly, the `assignPartitions` method was actually no
longer used anywhere in the code, and it was properly removed afterwards.
The method used for partition assignment in 1.2.0 is called
`assignTopicPartitions`, and it is called in the open() method.

consumerCallBridge.assignPartitions(consumer, 
convertKafkaPartitions(subscribedPartitions)); 

i think here subscribedPartitions is all the partitions , not 
subtaskPartitions.
This code snippet is from `KafkaConsumerThread`, correct?

As stated above, the partitions are still filtered out to only be the 
partitions for each local subtask, using the `assignTopicPartitions` method. So 
here, the `subscribedPartitions` is not the complete list of partitions, only 
the partitions that the subtask should subscribe to.


Cheers,
Gordon

On 2 August 2017 at 9:52:03 PM, aitozi (gjying1...@gmail.com) wrote:


Hi,  

i have a question that , when we use KafkaConsumerBase, we will have to  
fetch data from different partition  
in different parllel thread like the method shown in  
KafkaConsumerBase.java (version 1.2.0)  

protected static List assignPartitions(  
List allPartitions,  
int numConsumers, int consumerIndex) {  
final List thisSubtaskPartitions = new ArrayList<>(  
allPartitions.size() / numConsumers + 1);  

for (int i = 0; i < allPartitions.size(); i++) {  
if (i % numConsumers == consumerIndex) {  
thisSubtaskPartitions.add(allPartitions.get(i));  
}  
}  

return thisSubtaskPartitions;  
}  

but i have not find any place invoke this method , in  
KafkaConsumerThread.java it used  

consumerCallBridge.assignPartitions(consumer,  
convertKafkaPartitions(subscribedPartitions));  

i think here subscribedPartitions is all the partitions , not  
subtaskPartitions. Can any one address my problem  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/KafkaConsumerBase-tp14636.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Eventime window

2017-08-02 Thread Govindarajan Srinivasaraghavan
Thanks Timo. The next message will arrive only after a minute or so. Is there a 
way to evict whatever is there in window buffer just after 10 seconds 
irrespective of whether a new message arrives or not. 

Thanks,
Govind

> On Aug 2, 2017, at 6:56 AM, Timo Walther  wrote:
> 
> Hi Govind,
> 
> if the window is not triggered, this usually indicates that your timestamp 
> and watermark assignment is not correct. According to your description, I 
> don't think that you need a custom trigger/evictor. How often do events 
> arrive from one device? There must be another event from the same device that 
> has a timestamp >10s in order to trigger the window evaluation.
> 
> Instead of using the Kafka timestamp, maybe you could also convert your 
> timestamps to UTC in the TimestampExtractor.
> 
> There are no official limitation. However, each window comes with some 
> overhead. So you should choose your memory/state backends and parallelism 
> accordingly.
> 
> Hope that helps.
> 
> Timo
> 
> 
>> Am 02.08.17 um 06:54 schrieb Govindarajan Srinivasaraghavan:
>> Hi,
>> 
>> I have few questions regarding event time windowing. My scenario is devices 
>> from various timezones will send messages with timestamp and I need to 
>> create a window per device for 10 seconds. The messages will mostly arrive 
>> in order.
>> 
>> Here is my sample code to perform windowing and aggregating the messages 
>> after the window to further process it.
>> 
>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
>> new DeserializationSchema(),
>> kafkaConsumerProperties);
>> 
>> DataStream msgStream = streamEnv
>>  .addSource(consumer)
>>  .assignTimestampsAndWatermarks(new 
>> TimestampExtractor(Time.of(100, TimeUnit.MILLISECONDS))); // 
>> TimestampExtractor implements BoundedOutOfOrdernessTimestampExtractor
>> 
>> KeyedStream keyByStream = msgStream.keyBy(new 
>> CustomKeySelector());
>> 
>> WindowedStream windowedStream =
>> 
>> keyByStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
>> 
>> SingleOutputStreamOperator aggregatedStream = 
>> windowedStream.apply(new AggregateEntries());
>> 
>> My questions are
>> 
>> - In the above code, data gets passed till the window function but even 
>> after window time the data is not received in the apply function. Do I have 
>> to supply a custom evictor or trigger?
>> 
>> - Since the data is being received from multiple timezones and each device 
>> will have some time difference, would it be ok to assign the timestamp as 
>> that of received timestamp in the message at source (kafka). Will there be 
>> any issues with this?
>> 
>> - Are there any limitations on the number of time windows that can be 
>> created at any given time? In my scenario if there are 1 million devices 
>> there will be 1 million tumbling windows.
>> 
>> Thanks,
>> Govind
> 


Re: Eventime window

2017-08-02 Thread Timo Walther

Hi Govind,

if the window is not triggered, this usually indicates that your 
timestamp and watermark assignment is not correct. According to your 
description, I don't think that you need a custom trigger/evictor. How 
often do events arrive from one device? There must be another event from 
the same device that has a timestamp >10s in order to trigger the window 
evaluation.


Instead of using the Kafka timestamp, maybe you could also convert your 
timestamps to UTC in the TimestampExtractor.


There are no official limitations. However, each window comes with some 
overhead. So you should choose your memory/state backends and 
parallelism accordingly.


Hope that helps.

Timo


Am 02.08.17 um 06:54 schrieb Govindarajan Srinivasaraghavan:

Hi,

I have few questions regarding event time windowing. My scenario is 
devices from various timezones will send messages with timestamp and I 
need to create a window per device for 10 seconds. The messages will 
mostly arrive in order.


Here is my sample code to perform windowing and aggregating the 
messages after the window to further process it.


streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
new DeserializationSchema(),
kafkaConsumerProperties);

DataStream msgStream = streamEnv
.addSource(consumer)
.assignTimestampsAndWatermarks(new TimestampExtractor(Time.of(100, 
TimeUnit.MILLISECONDS))); // TimestampExtractor implements 
BoundedOutOfOrdernessTimestampExtractor


KeyedStream keyByStream = msgStream.keyBy(new 
CustomKeySelector());

WindowedStream windowedStream =
keyByStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));

SingleOutputStreamOperator aggregatedStream = 
windowedStream.apply(new AggregateEntries());


My questions are

- In the above code, data gets passed till the window function but 
even after window time the data is not received in the apply function. 
Do I have to supply a custom evictor or trigger?


- Since the data is being received from multiple timezones and each 
device will have some time difference, would it be ok to assign the 
timestamp as that of received timestamp in the message at source 
(kafka). Will there be any issues with this?


- Are there any limitations on the number of time windows that can be 
created at any given time? In my scenario if there are 1 million 
devices there will be 1 million tumbling windows.


Thanks,
Govind





KafkaConsumerBase

2017-08-02 Thread aitozi

Hi,

I have a question: when we use KafkaConsumerBase, we have to fetch data from
different partitions in different parallel threads, like in the method shown
in KafkaConsumerBase.java (version 1.2.0):

protected static List<KafkaTopicPartition> assignPartitions(
        List<KafkaTopicPartition> allPartitions,
        int numConsumers, int consumerIndex) {
    final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
            allPartitions.size() / numConsumers + 1);

    for (int i = 0; i < allPartitions.size(); i++) {
        if (i % numConsumers == consumerIndex) {
            thisSubtaskPartitions.add(allPartitions.get(i));
        }
    }

    return thisSubtaskPartitions;
}

but I have not found any place that invokes this method. In
KafkaConsumerThread.java it uses

consumerCallBridge.assignPartitions(consumer,
convertKafkaPartitions(subscribedPartitions));

I think here subscribedPartitions is all the partitions, not the subtask's
partitions. Can anyone address my problem?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/KafkaConsumerBase-tp14636.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Storing POJO's to RocksDB state backend

2017-08-02 Thread Timo Walther

Hi Biplob,

Flink ships with its own serializers. POJOs and other datatypes are 
analyzed automatically. Kryo is only the fallback option, used if your class 
does not meet the POJO criteria (see [1]). Usually, all 
serialization/deserialization to e.g. RocksDB happens internally and the 
user doesn't have to think about it.
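
As an illustration (the class name is made up), a type like the following meets
the POJO criteria, so Flink analyzes it with its own PojoSerializer instead of
falling back to Kryo:

public class DeviceReading {

    // Public fields (or private fields with getters/setters) are required
    // so that Flink's type analysis can access them.
    public String deviceId;
    public long timestamp;
    public double value;

    // A public no-argument constructor is part of the POJO criteria.
    public DeviceReading() {}

    public DeviceReading(String deviceId, long timestamp, double value) {
        this.deviceId = deviceId;
        this.timestamp = timestamp;
        this.value = value;
    }
}

A state descriptor can then simply reference the class, e.g.
new ValueStateDescriptor<>("lastReading", DeviceReading.class), and Flink
derives the serializer from the type.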


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/types_serialization.html#flinks-typeinformation-class


Am 02.08.17 um 11:03 schrieb Biplob Biswas:

Hi,

I had a simple query as to how POJO's are stored in a state back end like
RocksDB? Is it deserialized internally(with a default serde or we have to
specify something)? and if yes, is Kryo the default serde?

Thanks,
Biplob



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Storing-POJO-s-to-RocksDB-state-backend-tp14628.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: [POLL] Dropping savepoint format compatibility for 1.1.x in the Flink 1.4.0 release

2017-08-02 Thread Kostas Kloudas
+1

> On Aug 2, 2017, at 3:16 PM, Till Rohrmann  wrote:
> 
> +1
> 
> On Wed, Aug 2, 2017 at 9:12 AM, Stefan Richter 
> wrote:
> 
>> +1
>> 
>> Am 28.07.2017 um 16:03 schrieb Stephan Ewen :
>> 
>> Seems like no one raised a concern so far about dropping the savepoint
>> format compatibility for 1.1 in 1.4.
>> 
>> Leaving this thread open for some more days, but from the sentiment, it
>> seems like we should go ahead?
>> 
>> On Wed, Jul 12, 2017 at 4:43 PM, Stephan Ewen  wrote:
>> 
>>> Hi users!
>>> 
>>> Flink currently maintains backwards compatibility for savepoint formats,
>>> which means that savepoints taken with Flink version 1.1.x and 1.2.x can be
>>> resumed in Flink 1.3.x
>>> 
>>> We are discussing how many versions back to support. The proposition is
>>> the following:
>>> 
>>> *   Suggestion: Flink 1.4.0 will be able to resume savepoints taken with
>>> version 1.3.x and 1.2.x, but not savepoints from version 1.1.x and 1.0.x*
>>> 
>>> 
>>> The reason for that is that there is a lot of code mapping between the
>>> completely different legacy format (1.1.x, not re-scalable) and the
>>> key-group-oriented format (1.2.x onwards, re-scalable). It would greatly
>>> help the development of state and checkpointing features to drop that old
>>> code.
>>> 
>>> Please let us know if you have concerns about that.
>>> 
>>> Best,
>>> Stephan
>>> 
>>> 
>> 
>> 



Re: [POLL] Dropping savepoint format compatibility for 1.1.x in the Flink 1.4.0 release

2017-08-02 Thread Till Rohrmann
+1

On Wed, Aug 2, 2017 at 9:12 AM, Stefan Richter 
wrote:

> +1
>
> Am 28.07.2017 um 16:03 schrieb Stephan Ewen :
>
> Seems like no one raised a concern so far about dropping the savepoint
> format compatibility for 1.1 in 1.4.
>
> Leaving this thread open for some more days, but from the sentiment, it
> seems like we should go ahead?
>
> On Wed, Jul 12, 2017 at 4:43 PM, Stephan Ewen  wrote:
>
>> Hi users!
>>
>> Flink currently maintains backwards compatibility for savepoint formats,
>> which means that savepoints taken with Flink version 1.1.x and 1.2.x can be
>> resumed in Flink 1.3.x
>>
>> We are discussing how many versions back to support. The proposition is
>> the following:
>>
>> *   Suggestion: Flink 1.4.0 will be able to resume savepoints taken with
>> version 1.3.x and 1.2.x, but not savepoints from version 1.1.x and 1.0.x*
>>
>>
>> The reason for that is that there is a lot of code mapping between the
>> completely different legacy format (1.1.x, not re-scalable) and the
>> key-group-oriented format (1.2.x onwards, re-scalable). It would greatly
>> help the development of state and checkpointing features to drop that old
>> code.
>>
>> Please let us know if you have concerns about that.
>>
>> Best,
>> Stephan
>>
>>
>
>


Re: Can i use lot of keyd states or should i use 1 big key state.

2017-08-02 Thread shashank agarwal
If I am creating a keyed state ("count by email id") and the keyed stream has
10 unique email ids:

Will it create 1 column family or hash table?

Or will it create 10 column families or hash tables?

Can I have millions of unique email ids in that keyed state?
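
A minimal sketch of the MapState approach Stephan suggested (the Transaction
type and its accessors are made up for illustration): the stream is keyed by
project id, and a single MapState keeps one counter per email id under that
key, so there is one state/column family rather than one per email address.
MapState requires Flink 1.3+.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CountByEmail extends ProcessFunction<Transaction, Long> {

    private transient MapState<String, Long> countByEmail;

    @Override
    public void open(Configuration parameters) {
        // One MapState per key (e.g. project id); email ids are the map's sub-keys.
        countByEmail = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("countByEmail", String.class, Long.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx, Collector<Long> out) throws Exception {
        Long current = countByEmail.get(tx.getEmail());
        long updated = (current == null ? 0L : current) + 1L;
        countByEmail.put(tx.getEmail(), updated);
        out.collect(updated);
    }
}

Applied as stream.keyBy(...).process(new CountByEmail()), the MapState is
scoped to the current key automatically.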



On Tue, Aug 1, 2017 at 2:59 AM, shashank agarwal 
wrote:

> Ok if i am taking it as right for an example :
>
> if  i am creating a keyed state with name "total count by email" for
> key(project id + email)  than it will create a single hash-table or column
> family "total count by email" and all the unique email id's will be rows of
> that single hash-table or column family and than i can store millions of
> unique email id's in that.
>
> Means it will create only single state object for all unique email id's ?
>
>
>
>
> On Tue, Aug 1, 2017 at 1:53 AM, Stephan Ewen  wrote:
>
>> Each keyed state in Flink is a hashtable or a column family in RocksDB.
>> Having too many of those is not memory efficient.
>>
>> Having fewer states is better, if you can adapt your schema that way.
>>
>> I would also look into "MapState", which is an efficient way to have "sub
>> keys" under a keyed state.
>>
>> Stephan
>>
>>
>> On Mon, Jul 31, 2017 at 6:01 PM, shashank agarwal 
>> wrote:
>>
>>> Hello,
>>>
>>> I have to compute results on basis of lot of history data, parameters
>>> like total transactions in last 1 month, last 1 day, last 1 hour etc. by
>>> email id, ip, mobile, name, address, zipcode etc.
>>>
>>> So my question is this right approach to create keyed state by email,
>>> mobile, zipcode etc. or should i create 1 big mapped state (BS) and than
>>> process that BS, may be in process function or by applying some loop and
>>> filter logic in window or process function.
>>>
>>> My main worry is i will end up with millions of states, because there
>>> can be millions unique emails, phone numbers or zipcode if i create keyed
>>> state by email, phone etc.
>>>
>>> am i right ? is this impact on the performance or is this wrong approach
>>> ? Which approach would you suggest in this use case.
>>>
>>>
>>> --
>>> Thanks Regards
>>>
>>> SHASHANK AGARWAL
>>>  ---  Trying to mobilize the things
>>>
>>>
>>>
>>>
>>>
>>
>
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


Re: S3 Write Execption

2017-08-02 Thread Stephan Ewen
It is very important to point out that the Bucketing sink can currently NOT
work properly on S3. It assumes a consistent file system (like listing /
renaming works consistently), and S3 is only eventually consistent. I
assume that this eventual consistency of S3 is the cause of your error.

There is a pull request for a bucketing sink on eventually consistent FS:
https://github.com/apache/flink/pull/3752
Hope we can merge this once we are done with the 1.3.2 release.

(cc-ing Gordon and Aljoscha, FYI)

On Wed, Aug 2, 2017 at 10:56 AM, Fabian Hueske  wrote:

> Hi Aneesha,
>
> the logs would show that Flink is going through a recovery cycle. Recovery
> means to cancel running tasks and start them again.
> If you don't see something like that in the logs, Flink continues to
> processing.
>
> I'm not familiar with the details of S3, so I can't tell if the exception
> indicates data loss.
>
> Best, Fabian
>
> 2017-08-01 20:38 GMT+02:00 Aneesha Kaushal  >:
>
>> Hello,
>>
>> I am using flink 1.2 and writing records to S3 using rolling sink.
>>
>> I am encountering this S3 write error quite frequently :
>>
>> TimerException{com.amazonaws.services.s3.model.AmazonS3Exception: Status 
>> Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS 
>> Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: 
>> JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr}
>>  at 
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>>  at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>  at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>  at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 
>> 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error 
>> Code: null, AWS Error Message: Not Found, S3 Extended Request ID: 
>> JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr
>>  at 
>> com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>>  at 
>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>>  at 
>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
>>  at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1088)
>>  at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:521)
>>  at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:563)
>>  at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
>>  at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
>>  ... 7 more
>>
>>
>> I am unable to find the cause of this error. Also, I have the following
>> questions regarding this error :
>>
>> 1) Do we loose any data or flink will go to last checkpoint and write
>> again?
>> 2) how can we prevent this error?
>>
>> Thanks,
>> Aneesha
>>
>>
>>
>


Re: Constant write stall warning with RocksDB state backend

2017-08-02 Thread Stefan Richter
Hi,

there is some documentation on this topic here:
https://github.com/facebook/rocksdb/wiki/Write-Stalls
Increasing the buffer size seems ok, and the downside is a potentially higher
memory footprint.
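
A minimal sketch of how the buffer count could be raised through a custom
OptionsFactory on the RocksDBStateBackend (the value 6 is just an example,
not a recommendation):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class MoreWriteBuffersOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Allow more immutable memtables before writes are stalled.
        return currentOptions.setMaxWriteBufferNumber(6);
    }
}

It can then be registered via rocksDbStateBackend.setOptions(new MoreWriteBuffersOptionsFactory()).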

Best,
Stefan

> Am 02.08.2017 um 10:24 schrieb Kien Truong :
> 
> Hi, 
> 
> With the setting SPINNING_DISK_OPTIMIZED_HIGH_MEM, I'm having a lot of 
> warning from RocksDB:
> 
> Stalling writes because we have 3 immutable memtables (waiting for flush), 
> max_write_buffer_number is set to 4 rate 16777216
> 
> Increasing max_write_buffer_number causes the message to go away, but I want 
> to ask if there are any downside to it? 
> 
> Best regards, 
> Kien Truong



Storing POJO's to RocksDB state backend

2017-08-02 Thread Biplob Biswas
Hi,

I had a simple query as to how POJOs are stored in a state backend like
RocksDB. Are they serialized/deserialized internally (with a default serde, or
do we have to specify something)? And if yes, is Kryo the default serde?

Thanks,
Biplob



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Storing-POJO-s-to-RocksDB-state-backend-tp14628.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Proper way to establish bucket counts

2017-08-02 Thread Fabian Hueske
Hi Robert,

Flink collects many metrics by default, including the number of records /
events that go into each operator (see [1], System Metrics, IO,
"numRecordsIn").
So, you would only need to access that metric.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
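
If a user-defined counter on top of the built-in metrics is still wanted, a
small sketch (the Event type and the metric name are made up) could look like
the following, placed as a map right before the sink:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class SinkInputCounter extends RichMapFunction<Event, Event> {

    private transient Counter counter;

    @Override
    public void open(Configuration parameters) {
        // Registered once per parallel instance; reported like any other metric.
        counter = getRuntimeContext().getMetricGroup().counter("eventsToSink");
    }

    @Override
    public Event map(Event event) {
        counter.inc();
        return event;
    }
}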



2017-08-01 18:56 GMT+02:00 Robert Rapplean :

> I want a count of events that are put into a bucketing sink, but can't
> find a ready-made way of doing that. Is there an easier way than to
> implement a counter for each bucket via the metrics? If metrics counters is
> the easy way, what do I do to make sure that I don't have a memory leak
> from expired counters?
>
> Thanks,
>
> Robert
>


Re: multiple users per flink deployment

2017-08-02 Thread Tzu-Li (Gordon) Tai
Hi,

There have been quite a few requests for this recently on the mailing lists,
and it has also been mentioned by some users offline, so I think we may need
to start planning how to support this.
I'm CC'ing Eron to this thread to see if he has any thoughts on this, as he
was among the first authors driving the Kerberos support in Flink.
I'm not really sure whether such a feature makes sense, given that all jobs
of a single Flink deployment have full privileges and therefore no isolation
between them.

Related question: what external service are you trying to authenticate to with 
different users?
If it is Kafka and perhaps you have different users for the consumer / 
producer, that will be very soon available in 1.3.2, which includes a version 
bump to Kafka 0.10 that allows multiple independent users within the same JVM 
through dynamic JAAS configuration.
See this mail thread [1] for more detail on that.

Cheers,
Gordon

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-0-10-jaas-multiple-clients-td12831.html#a13317
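
For illustration, with the Kafka 0.10 dependency that 1.3.2 brings in, each
client can carry its own credentials directly in its properties via
sasl.jaas.config, set up in the job code (broker address, principal, and
keytab path below are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker-host:9092");
props.setProperty("group.id", "reader-group");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.kerberos.service.name", "kafka");
// Per-client JAAS configuration instead of one static JAAS file per JVM.
props.setProperty("sasl.jaas.config",
        "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true keyTab=\"/path/to/reader.keytab\" "
                + "principal=\"reader@EXAMPLE.COM\";");

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props);

A producer in the same job can be given a different sasl.jaas.config, which is
what allows multiple independent users within one JVM.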

On 1 August 2017 at 6:16:08 PM, Georg Heiler (georg.kf.hei...@gmail.com) wrote:

Hi,

flink currently only seems to support a single kerberos ticket for deployment. 
Are there plans to support different users per each job?

regards,
Georg

Re: S3 Write Execption

2017-08-02 Thread Fabian Hueske
Hi Aneesha,

the logs would show that Flink is going through a recovery cycle. Recovery
means to cancel running tasks and start them again.
If you don't see something like that in the logs, Flink continues
processing.

I'm not familiar with the details of S3, so I can't tell if the exception
indicates data loss.

Best, Fabian

2017-08-01 20:38 GMT+02:00 Aneesha Kaushal :

> Hello,
>
> I am using flink 1.2 and writing records to S3 using rolling sink.
>
> I am encountering this S3 write error quite frequently :
>
> TimerException{com.amazonaws.services.s3.model.AmazonS3Exception: Status 
> Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS 
> Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: 
> JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 
> 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error 
> Code: null, AWS Error Message: Not Found, S3 Extended Request ID: 
> JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr
>   at 
> com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>   at 
> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1088)
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:521)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:563)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
>   ... 7 more
>
>
> I am unable to find the cause of this error. Also, I have the following
> questions regarding this error:
>
> 1) Do we lose any data, or will Flink go back to the last checkpoint and
> write the data again?
> 2) How can we prevent this error?
>
> Thanks,
> Aneesha
>
>
>


Re: Odd flink behaviour

2017-08-02 Thread Fabian Hueske
FileInputFormat cannot know about the reached variable that you added in
your class. So there is no way it could reset it to false.
An alternative implementation without overriding open() could be to change
the reachedEnd method to check if the stream is still at offset 0.
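
For illustration, such an alternative could look roughly like this inside your
PDFFileInputFormat (a sketch that relies on FileInputFormat's protected stream,
splitStart and splitLength fields, assumes one record per split, and assumes the split
fits in memory):

    @Override
    public boolean reachedEnd() throws IOException {
      // the split is done once nextRecord() has consumed the stream, i.e. the
      // read position has moved past the start of the split
      return stream.getPos() > splitStart;
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
      // read the whole split through the stream opened by FileInputFormat, so
      // that getPos() advances and reachedEnd() reports completion
      // (note: a zero-length split would need extra handling)
      byte[] buffer = new byte[(int) splitLength];
      int offset = 0;
      while (offset < buffer.length) {
        int read = stream.read(buffer, offset, buffer.length - offset);
        if (read < 0) {
          break;
        }
        offset += read;
      }
      return new String(buffer, 0, offset);
    }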

2017-08-01 20:22 GMT+02:00 Mohit Anchlia :

> Thanks, that worked. However, what I don't understand is: wouldn't the open
> call that I am inheriting have this logic built in already? I am inheriting
> from FileInputFormat.
>
> On Tue, Aug 1, 2017 at 1:42 AM, Fabian Hueske  wrote:
>
>> An InputFormat processes multiple InputSplits. open() is called for each
>> InputSplit.
>> If you don't reset reached to false in open() you will only read a single
>> (i.e., the first) InputSplit and skip all others.
>>
>> I'd override open as follows:
>>
>> public void open(FileInputSplit fileSplit) throws IOException {
>>   super.open(fileSplit);
>>   reached = false;
>> }
>>
>> Cheers, Fabian
>>
>>
>> 2017-08-01 8:08 GMT+02:00 Mohit Anchlia :
>>
>>> I didn't override open. I am using the open() that is inherited from
>>> FileInputFormat. Am I supposed to specifically override open?
>>>
>>> On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske 
>>> wrote:
>>>
 Do you set reached to false in open()?


 Am 01.08.2017 2:44 vorm. schrieb "Mohit Anchlia" <
 mohitanch...@gmail.com>:

 And here is the inputformat code:

 public class PDFFileInputFormat extends FileInputFormat<String> {

   private static final long serialVersionUID = -4137283038479003711L;
   private static final Logger logger = LoggerFactory
       .getLogger(PDFInputFormat.class.getName());

   private boolean reached = false;

   @Override
   public boolean reachedEnd() throws IOException {
     logger.info("called reached " + reached);
     return reached;
   }

   @Override
   public String nextRecord(String reuse) throws IOException {
     logger.info("This is where you parse PDF");
     String content = new String(
         Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
     logger.info("Content " + content);
     reached = true;
     return content;
   }
 }

 On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia 
 wrote:

> I have a very simple program that just reads all the files in the
> path. However, flink is not working as expected.
>
> Every time I execute this job I only see flink reading 2 files, even
> though there are more in that directory. On closer look it appears that it
> might be related to:
>
> [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
> task slot(s).
>
> My question is, isn't flink supposed to iterate over the directory
> after those 2 slots become free again? I am assuming this problem is 
> caused
> because there are only 2 slots.
>
>
> Code ---
>
>   PDFFileInputFormat format = new PDFFileInputFormat();
>   format.setFilePath(args[0]);
>   format.setNestedFileEnumeration(true);
>   logger.info("Number of splits " + format.getNumSplits());
>
>   // logger.info(Paths.get(".").toAbsolutePath().normalize().toSt
> ring());
>
>   env.createInput(format, TypeInformation.of(StringValue
> .class)).print();
>



>>>
>>
>


Constant write stall warning with RocksDB state backend

2017-08-02 Thread Kien Truong
Hi,

With the SPINNING_DISK_OPTIMIZED_HIGH_MEM predefined options, I'm getting a lot of
warnings from RocksDB:

Stalling writes because we have 3 immutable memtables (waiting for flush), 
max_write_buffer_number is set to 4 rate 16777216

Increasing max_write_buffer_number makes the message go away, but are there any
downsides to doing that?
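
For reference, this is roughly how such an override can be applied on top of the
predefined profile, via the RocksDB state backend's setOptions hook. A sketch only: the
checkpoint URI and the value 6 are placeholders, not recommendations.

    import java.io.IOException;

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    public class RocksDbTuningSketch {

        static RocksDBStateBackend createBackend() throws IOException {
            RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

            // Keep the predefined profile and only raise the write buffer count on top of it.
            backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
            backend.setOptions(new OptionsFactory() {
                @Override
                public DBOptions createDBOptions(DBOptions currentOptions) {
                    return currentOptions;
                }

                @Override
                public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                    return currentOptions.setMaxWriteBufferNumber(6);
                }
            });
            return backend;
        }
    }

The resulting backend would then be passed to env.setStateBackend(...).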

Best regards,
Kien Truong 

Re: [POLL] Dropping savepoint format compatibility for 1.1.x in the Flink 1.4.0 release

2017-08-02 Thread Stefan Richter
+1

> On 28.07.2017 at 16:03, Stephan Ewen wrote:
> 
> Seems like no one raised a concern so far about dropping the savepoint format 
> compatibility for 1.1 in 1.4.
> 
> Leaving this thread open for some more days, but from the sentiment, it seems 
> like we should go ahead?
> 
> On Wed, Jul 12, 2017 at 4:43 PM, Stephan Ewen wrote:
> Hi users!
> 
> Flink currently maintains backwards compatibility for savepoint formats, 
> which means that savepoints taken with Flink version 1.1.x and 1.2.x can be 
> resumed in Flink 1.3.x
> 
> We are discussing how many versions back to support. The proposition is the 
> following:
> 
>Suggestion: Flink 1.4.0 will be able to resume savepoints taken with 
> version 1.3.x and 1.2.x, but not savepoints from version 1.1.x and 1.0.x
> 
> 
> The reason for that is that there is a lot of code mapping between the 
> completely different legacy format (1.1.x, not re-scalable) and the 
> key-group-oriented format (1.2.x onwards, re-scalable). It would greatly help 
> the development of state and checkpointing features to drop that old code.
> 
> Please let us know if you have concerns about that.
> 
> Best,
> Stephan
> 
> 



Re: About KafkaConsumerBase

2017-08-02 Thread aitozi

Hi,

 Thanks, you explained it clearly!



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-KafkaConsumerBase-tp14601p14621.html