Re: About KafkaConsumerBase

2017-08-01 Thread Tzu-Li (Gordon) Tai
Hi,

it maintains its own individual instance of
FlinkKafkaConsumerBase, and it contains an individual pendingOffsetsToCommit,
am I right?
That is correct! The FlinkKafkaConsumerBase code is executed once for each parallel
subtask instance, and each instance therefore has its own pendingOffsetsToCommit,
which is not manipulated or accessed concurrently.

The only places where that map is accessed are the snapshotState and
notifyCheckpointComplete methods, which I think are guaranteed not to be
called concurrently.
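
To illustrate the per-subtask point, here is a hedged sketch (this is not the actual
consumer code, just an analogous parallel source) of why such a map needs no extra
synchronization: every parallel subtask runs its own instance of the class, and the
map is only touched from the state/checkpoint callbacks:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Hedged sketch, NOT the real Kafka consumer: each parallel subtask gets its own
// instance of this class, so 'pending' is never shared between subtasks.
public class PendingOffsetsSketchSource extends RichParallelSourceFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // checkpoint id -> offset captured when that checkpoint was taken (one map per subtask)
    private final Map<Long, Long> pending = new HashMap<>();

    private volatile boolean running = true;
    private long currentOffset;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("record-" + currentOffset);
                currentOffset++;
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // per the discussion above, this is not invoked concurrently with notifyCheckpointComplete
        pending.put(context.getCheckpointId(), currentOffset);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // restoring previous state is omitted in this sketch
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        Long offsetToCommit = pending.remove(checkpointId);
        // here the offset would be committed to the external system (e.g. Kafka)
    }
}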

Cheers,
Gordon


On 2 August 2017 at 1:02:57 PM, aitozi (gjying1...@gmail.com) wrote:


Hi, Piotr Nowojski

I think you are right, but it is executed in parallel, and each parallel
instance maintains its own individual instance of
FlinkKafkaConsumerBase, which contains an individual pendingOffsetsToCommit,
am I right?

thanks, aitozi  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-KafkaConsumerBase-tp14601p14619.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: About KafkaConsumerBase

2017-08-01 Thread aitozi

Hi, Piotr Nowojski

  I think you are right, but it is executed in parallel, and each parallel
instance maintains its own individual instance of
FlinkKafkaConsumerBase, which contains an individual pendingOffsetsToCommit,
am I right?

thanks, aitozi



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-KafkaConsumerBase-tp14601p14619.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Eventime window

2017-08-01 Thread Govindarajan Srinivasaraghavan
Hi,

I have few questions regarding event time windowing. My scenario is devices
from various timezones will send messages with timestamp and I need to
create a window per device for 10 seconds. The messages will mostly arrive
in order.

Here is my sample code to perform windowing and aggregating the messages
after the window to further process it.

streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
        new DeserializationSchema(), kafkaConsumerProperties);

// TimestampExtractor implements BoundedOutOfOrdernessTimestampExtractor
DataStream msgStream = streamEnv
        .addSource(consumer)
        .assignTimestampsAndWatermarks(
                new TimestampExtractor(Time.of(100, TimeUnit.MILLISECONDS)));

KeyedStream keyByStream = msgStream.keyBy(new CustomKeySelector());

WindowedStream windowedStream = keyByStream.window(
        TumblingEventTimeWindows.of(
                org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));

SingleOutputStreamOperator aggregatedStream =
        windowedStream.apply(new AggregateEntries());
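
For reference, a minimal sketch of what the TimestampExtractor mentioned above
typically looks like (the Message type and its getTimestamp() accessor are
placeholders for whatever the Kafka records deserialize into):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch only: assigns the record's own timestamp and emits watermarks that trail
// the highest timestamp seen so far by the configured out-of-orderness bound.
public class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Message> {

    public TimestampExtractor(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Message element) {
        // must be a millisecond epoch timestamp taken from the event itself
        return element.getTimestamp();
    }
}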

My questions are

- In the above code, data gets passed to the window function, but even
after the window time has passed the data is not received in the apply function.
Do I have to supply a custom evictor or trigger?

- Since the data is being received from multiple timezones and each device
will have some time difference, would it be OK to assign the timestamp from
the received timestamp in the message at the source (Kafka)? Will there be
any issues with this?

- Are there any limitations on the number of time windows that can be
created at any given time? In my scenario if there are 1 million devices
there will be 1 million tumbling windows.

Thanks,
Govind


Re: Using FileInputFormat - org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit

2017-08-01 Thread Mohit Anchlia
This was a user-induced problem - me. I wasn't calling streamEnv.execute() :(

On Tue, Aug 1, 2017 at 1:29 PM, Mohit Anchlia 
wrote:

> This doesn't work even with TextInputFormat. Not sure what's wrong.
>
> On Tue, Aug 1, 2017 at 9:53 AM, Mohit Anchlia 
> wrote:
>
>> I don't see the print output.
>>
>> On Tue, Aug 1, 2017 at 2:08 AM, Fabian Hueske  wrote:
>>
>>> Hi Mohit,
>>>
>>> these are just INFO log statements that do not necessarily indicate a
>>> problem.
>>> Is the program working otherwise or do you observe other problems?
>>>
>>> Best, Fabian
>>>
>>> 2017-08-01 0:32 GMT+02:00 Mohit Anchlia :
>>>
 I even tried existing format but still same error:

 FileInputFormat fileInputFormat = *new* TextInputFormat(*new*
 Path(args[0]));

 fileInputFormat.setNestedFileEnumeration(*true*);

 streamEnv.readFile(fileInputFormat, args[0],

 FileProcessingMode.*PROCESS_CONTINUOUSLY*, 1L).print();


 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
 org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
 does not contain a setter for field modificationTime
 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - c

 On Mon, Jul 31, 2017 at 1:07 PM, Mohit Anchlia 
 wrote:

>  In trying to use this code I get the following error. Is it asking me
> to implement additional interface?
>
> streamEnv.readFile(format, args[0], FileProcessingMode.
> *PROCESS_CONTINUOUSLY*, 2000).print();
>
>
> [main] INFO com.s.flink.example.PDFInputFormat - Start streaming
> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
> does not contain a setter for field modificationTime
> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
> is not a valid POJO type because not all fields are valid POJO fields.
>


>>>
>>
>


Re: Using FileInputFormat - org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit

2017-08-01 Thread Mohit Anchlia
This doesn't work even with TextInputFormat. Not sure what's wrong.

On Tue, Aug 1, 2017 at 9:53 AM, Mohit Anchlia 
wrote:

> I don't see the print output.
>
> On Tue, Aug 1, 2017 at 2:08 AM, Fabian Hueske  wrote:
>
>> Hi Mohit,
>>
>> these are just INFO log statements that do not necessarily indicate a
>> problem.
>> Is the program working otherwise or do you observe other problems?
>>
>> Best, Fabian
>>
>> 2017-08-01 0:32 GMT+02:00 Mohit Anchlia :
>>
>>> I even tried existing format but still same error:
>>>
>>> FileInputFormat fileInputFormat = *new* TextInputFormat(*new*
>>> Path(args[0]));
>>>
>>> fileInputFormat.setNestedFileEnumeration(*true*);
>>>
>>> streamEnv.readFile(fileInputFormat, args[0],
>>>
>>> FileProcessingMode.*PROCESS_CONTINUOUSLY*, 1L).print();
>>>
>>>
>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
>>> org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
>>> does not contain a setter for field modificationTime
>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - c
>>>
>>> On Mon, Jul 31, 2017 at 1:07 PM, Mohit Anchlia 
>>> wrote:
>>>
  In trying to use this code I get the following error. Is it asking me
 to implement additional interface?

 streamEnv.readFile(format, args[0], FileProcessingMode.
 *PROCESS_CONTINUOUSLY*, 2000).print();


 [main] INFO com.s.flink.example.PDFInputFormat - Start streaming
 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
 org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
 does not contain a setter for field modificationTime
 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
 org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
 is not a valid POJO type because not all fields are valid POJO fields.

>>>
>>>
>>
>


S3 Write Exception

2017-08-01 Thread Aneesha Kaushal
Hello, 

I am using flink 1.2 and writing records to S3 using rolling sink.  

I am encountering this S3 write error quite frequently :

TimerException{com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 
404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: 
null, AWS Error Message: Not Found, S3 Extended Request ID: 
JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr}
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, 
AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, 
AWS Error Message: Not Found, S3 Extended Request ID: 
JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr
at 
com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at 
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at 
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
at 
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1088)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:521)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:563)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
... 7 more

I am unable to find the cause of this error. I also have the following
questions regarding it:

1) Do we lose any data, or will Flink go back to the last checkpoint and write it again?
2) How can we prevent this error?

Thanks,
Aneesha




Re: Odd flink behaviour

2017-08-01 Thread Mohit Anchlia
Thanks, that worked. However, what I don't understand is: wouldn't the open()
call that I am inheriting have this logic built in already? I am inheriting
from FileInputFormat.

On Tue, Aug 1, 2017 at 1:42 AM, Fabian Hueske  wrote:

> An InputFormat processes multiple InputSplits. open() is called for each
> InputSplit.
> If you don't reset reached to false in open() you will only read a single
> (i.e., the first) InputSplit and skip all others.
>
> I'd override open as follows:
>
> public void open(FileInputSplit fileSplit) throws IOException {
>   super.open(fileSplit);
>   reached = false;
> }
>
> Cheers, Fabian
>
>
> 2017-08-01 8:08 GMT+02:00 Mohit Anchlia :
>
>> I didn't override open. I am using open that got inherited from
>> FileInputFormat . Am I supposed to specifically override open?
>>
>> On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske  wrote:
>>
>>> Do you set reached to false in open()?
>>>
>>>
>>> Am 01.08.2017 2:44 vorm. schrieb "Mohit Anchlia" >> >:
>>>
>>> And here is the inputformat code:
>>>
>>> public class PDFFileInputFormat extends FileInputFormat<String> {
>>>  /**
>>>   *
>>>   */
>>>  private static final long serialVersionUID = -4137283038479003711L;
>>>  private static final Logger logger = LoggerFactory
>>>.getLogger(PDFInputFormat.class.getName());
>>>  private boolean reached = false;
>>>  @Override
>>>  public boolean reachedEnd() throws IOException {
>>>   logger.info("called reached " + reached);
>>>   // TODO Auto-generated method stub
>>>   return reached;
>>>  }
>>>  @Override
>>>  public String nextRecord(String reuse) throws IOException {
>>>   logger.info("This is where you parse PDF");
>>>   String content = new String(
>>> Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
>>>   logger.info("Content " + content);
>>>   reached = true;
>>>   return content;
>>>  }
>>> }
>>>
>>> On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia 
>>> wrote:
>>>
 I have a very simple program that just reads all the files in the path.
 However, flink is not working as expected.

 Everytime I execute this job I only see flink reading 2 files, even
 though there are more in that directory. On closer look it appears that it
 might be related to:

 [flink-akka.actor.default-dispatcher-3] INFO
 org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
 task slot(s).

 My question is, isn't flink supposed to iterate over the directory
 after those 2 slots become free again? I am assuming this problem is caused
 because there are only 2 slots.


 Code ---

   PDFFileInputFormat format = new PDFFileInputFormat();
   format.setFilePath(args[0]);
   format.setNestedFileEnumeration(true);
   logger.info("Number of splits " + format.getNumSplits());

   // logger.info(Paths.get(".").toAbsolutePath().normalize().toSt
 ring());

   env.createInput(format, TypeInformation.of(StringValue
 .class)).print();

>>>
>>>
>>>
>>
>


Proper way to establish bucket counts

2017-08-01 Thread Robert Rapplean
I want a count of events that are put into a bucketing sink, but can't find
a ready-made way of doing that. Is there an easier way than implementing a
counter for each bucket via the metrics? If metrics counters are the easy
way, what do I do to make sure that I don't have a memory leak from expired
counters?

Thanks,

Robert


Re: Using FileInputFormat - org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit

2017-08-01 Thread Mohit Anchlia
I don't see the print output.

On Tue, Aug 1, 2017 at 2:08 AM, Fabian Hueske  wrote:

> Hi Mohit,
>
> these are just INFO log statements that do not necessarily indicate a
> problem.
> Is the program working otherwise or do you observe other problems?
>
> Best, Fabian
>
> 2017-08-01 0:32 GMT+02:00 Mohit Anchlia :
>
>> I even tried existing format but still same error:
>>
>> FileInputFormat fileInputFormat = *new* TextInputFormat(*new*
>> Path(args[0]));
>>
>> fileInputFormat.setNestedFileEnumeration(*true*);
>>
>> streamEnv.readFile(fileInputFormat, args[0],
>>
>> FileProcessingMode.*PROCESS_CONTINUOUSLY*, 1L).print();
>>
>>
>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
>> org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
>> does not contain a setter for field modificationTime
>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - c
>>
>> On Mon, Jul 31, 2017 at 1:07 PM, Mohit Anchlia 
>> wrote:
>>
>>>  In trying to use this code I get the following error. Is it asking me
>>> to implement additional interface?
>>>
>>> streamEnv.readFile(format, args[0], FileProcessingMode.
>>> *PROCESS_CONTINUOUSLY*, 2000).print();
>>>
>>>
>>> [main] INFO com.s.flink.example.PDFInputFormat - Start streaming
>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
>>> org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
>>> does not contain a setter for field modificationTime
>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
>>> org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
>>> is not a valid POJO type because not all fields are valid POJO fields.
>>>
>>
>>
>


Re: Azkaban Job Type Plugin for Flink

2017-08-01 Thread Aljoscha Krettek
Hi Yann,

I'm not aware of anyone that has started work on this. I'm quite interested in 
how this turns out for you.

Best,
Aljoscha

> On 20. Jul 2017, at 14:49, Yann Pauly  wrote:
> 
> Hi all,
> 
> We want to integrate our Flink instances with our Azkaban scheduler.
> For that we will have to create a custom job type plugin for Azkaban.
> Has anyone already started creating something like that ? If not... I guess 
> we will try to !
> 
> 
> Best,
> 
> Yann
> 



Re: Is that possible for flink to dynamically read and change configuration?

2017-08-01 Thread Aljoscha Krettek
Just some clarification: Flink state is never shared between different parallel 
operator instances. If you want to make those changes available to all parallel 
instances of the operation you have to broadcast the control stream, i.e. 
control.broadcast().
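
Using the code quoted below, a hedged sketch of that change (stream, control and
testFunc are the names from the quoted snippet; only the broadcast() call is new):

DataStream result = stream
        .connect(control.broadcast())   // every parallel subtask now receives all control elements
        .flatMap(new testFunc());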

Best,
Aljoscha

> On 24. Jul 2017, at 17:20, Chesnay Schepler  wrote:
> 
> So I don't know why it doesn't work (it should, afaik), but as a workaround 
> you could maintain
> an ArrayList or similar in your function, and only add/read elements from the 
> ListState in snapshot/initialize state.
> 
> On 24.07.2017 17:10, ZalaCheung wrote:
>> Hi all,
>> 
>> Does anyone have an idea about the non-keyed managed state problem below?
>> I think all the functions in the testFunc class should share the ListState
>> “metrics”. But after I add an element to the ListState in the flatMap2 function, I
>> cannot retrieve the element that was added.
>> 
>> 
>> Desheng Zhang
>> 
>> 
>>> On Jul 24, 2017, at 22:06, ZalaCheung >> > wrote:
>>> 
>>> Hi Chesnay,
>>> 
>>> Thank you very much. Now I tried to ignore the default value of ListState
>>> and try to use the CoFlatMap function with managed state. But what
>>> surprised me is that it seems the state is not shared by the two streams.
>>> 
>>> My test code is shown below.
>>> 
>>> DataStream result = stream
>>> .connect(control)
>>> .flatMap(new testFunc());
>>> 
>>> public static class testFunc implements 
>>> CoFlatMapFunction,CheckpointedFunction{
>>> 
>>> private ListState<String> metrics;
>>> 
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext 
>>> functionSnapshotContext) throws Exception {
>>> 
>>> }
>>> 
>>> @Override
>>> public void initializeState(FunctionInitializationContext 
>>> functionInitializationContext) throws Exception {
>>> ListStateDescriptor<String> metricsStateDescriptor =
>>> new ListStateDescriptor<>(
>>> "metrics",
>>> TypeInformation.of(new TypeHint<String>() {}));
>>> metrics = 
>>> functionInitializationContext.getOperatorStateStore().getListState(metricsStateDescriptor);
>>> 
>>> }
>>> 
>>> @Override
>>> public void flatMap1(String s, Collector collector) throws 
>>> Exception {
>>> String myMetrics = null;
>>> for(String element:metrics.get()){
>>> logger.info("element in metric: " + s);
>>> myMetrics = element;
>>> }
>>> if(myMetrics == null){
>>> logger.info("Not initialized");
>>> }else {
>>> logger.info("initialized: " + myMetrics);
>>> }
>>> 
>>> }
>>> 
>>> @Override
>>> public void flatMap2(String s, Collector collector) throws 
>>> Exception {
>>> metrics.clear();
>>> metrics.add(s);
>>> 
>>> for(String element:metrics.get()){
>>> logger.info("element in metric: " + element);
>>> 
>>> }
>>> 
>>> }
>>> }
>>> 
>>> I connected the two streams (stream and control) and used a CoFlatMapFunction on
>>> them. For the control stream, I send a string and it prints the right log:
>>> - element in metric: heyjude
>>> Then I send another string to the first stream.
>>> But the log prints:
>>> - Not initialized
>>> 
>>> I am confused. I successfully receive the message on the control stream and add the
>>> string to the ListState. But when I try to retrieve the ListState in flatMap1, I
>>> get nothing.
>>> 
>>> Thanks.
>>> Desheng Zhang
>>> 
>>> 
>>> 
 On Jul 24, 2017, at 21:01, Chesnay Schepler >>> > wrote:
 
 Hello,
 
 That's an error in the documentation, only the ValueStateDescriptor has a 
 defaultValue constructor argument.
 
 Regards,
 Chesnay
 
 On 24.07.2017 14:56, ZalaCheung wrote:
> Hi Martin,
> 
> Thanks for your advice. That’s really helpful. I am using the push 
> scenario. I am now having some trouble because of the state I want to 
> maintain. For me, the simplest way is to maintain to ValueState in a 
> CoFlatMapFunction(Actually RichCoFlatMapFunction). But the rich function 
> can only be used on Keyed Stream. And for a connected stream, at least 
> for my scenario, I should not use KeyBy() method(Actually it seems not 
> allowed to use KeyBy() function on connected stream ).
> 
> Thus instead of using Rich function for Keyed Managed State, I tried to 
> use CheckpointedFunction for my non-keyed state. However, in 
> CheckpointedFunction, I can only use ListState, which only has add() and 
> Iterator method. I am not sure whether I can just replace the element in 
> the ListState. What exactly makes me stuck is that I cannot initialize my 
> ListState with a ListStateDescriptor. It says there is no constructor with an 
> initialization value. I actually saw that in the 
> official documentation.
> 
> https://ci.apache.org/projects/fl

Re: Flink CLI cannot submit job to Flink on Mesos

2017-08-01 Thread Stephan Ewen
Cool, good to hear!

It is one of those "it's a feature, not a bug" situations ;-)

Flink's HA mode supports multiple masters, so the CLI needs to have a way
to find which master is "leader" (active, versus the passive masters on
standby). That discovery goes through ZooKeeper as well (which is the
ground truth for who is the leader).

Stephan


On Tue, Aug 1, 2017 at 11:36 AM, Francisco Gonzalez Barea <
francisco.gonza...@piksel.com> wrote:

> Hey! It´s working now!!
>
> I will do a summary for those who might have the same problem in the
> future:
>
> - *Flink 1.3.0 dockerized on Mesos:*
> - Add the HA configuration values in your flink app:
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/setup/jobmanager_high_availability.html#config-
> file-flink-confyaml
> - Add the Mesos HA configuration values in your flink app:
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/setup/mesos.html#high-availability
>
> -* Flink CLI 1.3.0 on my local machine* (make sure you *use the same
> version*!!)
> - Add same HA configuration values in your flink CLI configuration:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/
> jobmanager_high_availability.html#config-file-flink-confyaml
>
>
> With those steps, my ./fink run command it´s working like a charm.
>
> Thank you very much guys!
>
> Regards,
> Francisco
>
>
>
> 
>
> 
> 
>
> On 1 Aug 2017, at 10:24, Francisco Gonzalez Barea <
> francisco.gonza...@piksel.com> wrote:
>
> Hi Stephan,
>
> So, do you mean to remove the “-m” param from the flink CLI call? And on
> the other hand, that I should add the Zookeeper configuration in both
> sides, the remote flink and locally in the flink CLI config, right?
>
> Regards
>
>
> On 31 Jul 2017, at 22:21, Stephan Ewen  wrote:
>
> Hi Francisco!
>
> Can you drop the explicit address of the jobmanager? The client should
> pick up that address automatically from ZooKeeper as well (together with
> the HA leader session ID).
>
> Please check if you have the ZooKeeper HA config entries in the config
> used by the CLI.
>
> Stephan
>
>
> On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea <
> francisco.gonza...@piksel.com> wrote:
>
>> Hi again,
>>
>> On the other hand, we are running the following flink CLI command:
>>
>> ./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}
>>  ${our-program-jar} ${our-program-params}
>>
>> Maybe is the command what we are using wrongly?
>>
>> Thank you
>>
>> On 28 Jul 2017, at 11:07, Till Rohrmann  wrote:
>>
>> Hi Francisco,
>>
>> have you set the right high-availability configuration options in your
>> client configuration as described here [1]? If not, then Flink is not able
>> to find the correct JobManager because it retrieves the address as well as
>> a fencing token (called leader session id) from the HA store (ZooKeeper).
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.3/setup/mesos.html#high-availability
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea <
>> francisco.gonza...@piksel.com> wrote:
>>
>>> Hello,
>>>
>>> We´re having lot of issues while trying to submit a job remotely using
>>> the Flink CLI command line tool. We have tried different configurations but
>>> in all of them we get errors from AKKA while trying to connect. I will try
>>> to summarise the configurations we´ve tried.
>>>
>>> - Flink 1.3.0 deployed within a docker container on a Mesos cluster
>>> (using Marathon)
>>> - This flink has the property jobmanager.rpc.address as a hostname (i.e.
>>> kind of ip-X.eu .west-1.comp
>>> ute.internal)
>>> - Use the same version for Flink Client remotely (e.g. in my laptop).
>>>
>>> When I try to submit the job using the command flink run -m
>>> myHostName:myPort (the same in jobmanager.rpc.address and
>>> jobmanager.rpc.port) after some time waiting I get the trace at the end of
>>> this email. In the flink side we get this error from AKKA:
>>>
>>> Association with remote system [akka.tcp://flink@10.203.23.24:24469]
>>> has failed, address is now gated for [5000] ms. Reason: [Association failed
>>> with [akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection
>>> refused: /10.203.23.24:24469]
>>>
>>> After reading a bit, it seems there´re some problems related to akka
>>> resolving hostnames to ips, so we decided to startup the same flink but
>>> changing jobmanager.rpc.address to have the direct ip (i.e. kind of
>>> XX.XXX.XX.XX). In this case I´m getting same trace (at the end of the
>>> email) from the client side and this one from the Flink server

Re: Split Streams not working

2017-08-01 Thread Aljoscha Krettek
Hi,

In your original program, the problem is that there are both an ERROR and a 
SUCCESS event in your List. Thus you add both "success" and "error" to the list 
of split outputs. To discern between those different types you first have to 
flatten that DataStream of lists into a DataStream of single events using a flatMap().

Or, as Kien suggested, you use side outputs, which are the better alternative.
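
A hedged sketch of the flatten-then-split approach (Event, its isError() flag, and
parsedLists are placeholders for the poster's own types):

// 1) Flatten DataStream<List<Event>> into DataStream<Event>.
DataStream<Event> events = parsedLists.flatMap(new FlatMapFunction<List<Event>, Event>() {
    @Override
    public void flatMap(List<Event> value, Collector<Event> out) {
        for (Event e : value) {
            out.collect(e);          // emit each parsed event individually
        }
    }
});

// 2) Split the flattened stream into "error" and "success" outputs.
SplitStream<Event> split = events.split(new OutputSelector<Event>() {
    @Override
    public Iterable<String> select(Event e) {
        return Collections.singletonList(e.isError() ? "error" : "success");
    }
});

DataStream<Event> errors = split.select("error");
DataStream<Event> successes = split.select("success");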

Best,
Aljoscha
 
> On 25. Jul 2017, at 03:02, Kien Truong  wrote:
> 
> Hi,
> 
> I meant adding a select function between the two consecutive select.
> 
> Or if you use Flink 1.3, you can use the new side output functionality.
> 
> Regards,
> 
> Kien
> 
> 
> On 7/25/2017 7:54 AM, Kien Truong wrote:
>> Hi,
>> 
>> I think you're hitting this bug
>> 
>> https://issues.apache.org/jira/browse/FLINK-5031
>> 
>> Try the workaround mentioned in a bug: add a map function between map and 
>> select
>> 
>> Regards,
>> Kien
>> 
>> On 7/25/2017 3:14 AM, smandrell wrote:
>>> Basically, we are not splitting the streams correctly because when we try to
>>> select the stream we want from our splitStream (using the select()
>>> operation), it never returns a DataStream with just ERROR_EVENT's or a
>>> DataStream with just SUCCESS_EVENT's. Instead it returns a DataStream with
>>> both ERROR_EVENT's and SUCCESS_EVENT's.
>>> 
>>> 
>>> 
>>> I am receiving data by doing the following:
>>> 
>>> return env.fromElements(SUCCESS_EVENT_JSON, SUCCESS_AND_ERROR_EVENT_JSON);
>>> 
>>> SUCCESS_EVENT_JSON will generate one success event once it is sent through
>>> our parser. This is not the concern.
>>> 
>>> The concern is the SUCCESS_AND_ERROR_EVENT_JSON. SUCCESS_AND_ERROR_EVENT
>>> will generate 3 events once it is sent through our parser: 1 success event
>>> and 2 error events.
>>> 
>>> 
>>> To discern between success events and error events in a given stream, we use
>>> the following splitting logic:
>>> 
>>> 
>>>  
>>> 
>>> This splitting logic works fine when dealing with the stream generated from
>>> our parser on the SUCCESS_EVENT_JSON because there is only one event at play
>>> here: the success event.
>>> 
>>> However, the splitting logic does not correctly split the stream generated
>>> from sending SUCCESS_AND_ERROR_EVENT_JSON through our parser.
>>> 
>>> For some context: when sending SUCCESS_AND_ERROR_EVENT_JSON through our
>>> parser, the parser returns a DataStream> in the
>>> following form [ERROR_EVENT, ERROR_EVENT, SUCCESS_EVENT].
>>> 
>>> As you can see from the above code, we try to separate the ERROR_EVENT's
>>> from the SUCCESS_EVENT by doing output.add("success") or output.add("error")
>>> but when when we attempt to select the events in our
>>> SplitStream[ERROR_EVENT, ERROR_EVENT, SUCCESS_EVENT] with
>>> splitStream.select("success") and splitStream.select("error"), the different
>>> events are not separated and both select() operations
>>> (splitStream.select("success") & splitStream.select("error")) return two
>>> DataStream[ERROR_EVENT, ERROR_EVENT, SUCCESS_EVENT]'s and not one
>>> DataStream[ERROR_EVENT, ERROR_EVENT] and one DataStream[SUCCESS_EVENT].
>>> 
>>> My suspicion for this bug is that we are attempting to split a
>>> DataStream> instead of a DataStream,
>>> but I cannot find a workaround for DataStream>.
>>> 
>>> Thanks!!
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> View this message in context: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Split-Streams-not-working-tp14418.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>> at Nabble.com.
>> 
> 



Re: Class not found when deserializing

2017-08-01 Thread Aljoscha Krettek
Hi,

I think the problem here is that the SerializationUtils don't use the correct 
ClassLoader when deserialising. When running a Flink job on a cluster the user 
code (from the user-code jar) is only available in the user-code ClassLoader. 
If you can pass a ClassLoader to your deserialize method you can try passing 
the result of Thread.currentThread().getContextClassLoader().
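
A hedged sketch of that approach (TheGlobalModel comes from the quoted code; the
helper class below is hypothetical and replaces the SerializationUtils call):

// ObjectInputStream that resolves classes against an explicit ClassLoader.
public class ClassLoaderObjectInputStream extends java.io.ObjectInputStream {

    private final ClassLoader classLoader;

    public ClassLoaderObjectInputStream(java.io.InputStream in, ClassLoader classLoader)
            throws java.io.IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(java.io.ObjectStreamClass desc)
            throws java.io.IOException, ClassNotFoundException {
        return Class.forName(desc.getName(), false, classLoader);
    }
}

// Inside deserialize(byte[] message), instead of SerializationUtils:
ClassLoader cl = Thread.currentThread().getContextClassLoader();
try (java.io.ObjectInputStream in =
         new ClassLoaderObjectInputStream(new java.io.ByteArrayInputStream(message), cl)) {
    return (outlierDetection.network.TheGlobalModel) in.readObject();
} catch (ClassNotFoundException e) {
    throw new java.io.IOException(e);
}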

Best,
Aljoscha

> On 27. Jul 2017, at 09:52, Fabian Hueske  wrote:
> 
> Hi Paolo,
> 
> do you get the ClassNotFoundException for TheGlobalModel or for another class?
> Did you maybe forget to include SerializationUtils in the classpath?
> 
> Best, Fabian
> 
> 2017-07-26 16:14 GMT+02:00 Paolo Cristofanelli 
> :
> Hi, 
> 
> I am trying to write and read in a Kafka topic a user-defined class (that 
> implements serializable, and all the fields are serializable). Everything 
> works fine when I am executing the program in the IDE or with the mvn exec 
> command.
> When I try to execute the program in standalone mode I get the 
> ClassNotFoundException.
> 
> More specifically I get the exception only during the deserialization parts :
> 
>   @Override
>   public TheGlobalModel deserialize(byte[] message) throws 
> IOException {
>   
>   outlierDetection.network.TheGlobalModel model;
> 
>   model = (outlierDetection.network.TheGlobalModel) 
> SerializationUtils.deserialize(message);
>   
>   return model;
>   
>   }
> 
> It seems that the problem lies in the deserialize method. If I remove it and 
> simply return "new TheGlobalModel()" the exception is not thrown. I don´t 
> understand why in this case the program seems to be aware of the existence of 
> the class, I guess the problem is in the deserialize function.
> 
> I only know this method for sending a serializable class through Kafka, I 
> would be glad to hear other ways. 
> 
> Thanks in advance for your time.
> Cheers 
> Paolo
> 



Re: Flink QueryableState with Sliding Window on RocksDB

2017-08-01 Thread Biplob Biswas
Hi Fabian,

I am not really sure using CoProcessFunction would be useful for my use
case. My use case, in short, can be explained as follows:

1) create 2 different local state store, where both have 1-N relationship.
For eg.  1 -> [A,B,C] and A -> [1,2,3]
2) Based on the key A, get list of elements [1,2,3] and then iterate over
this list and based on the keys 1,2,3 query the second store to get the list
of elements. 
3) Do this till a depth of 1
4) Now, based on key A, again perform merge operations and emit the merged
output.

So, I can't imagine having 2 keyed states together when I need to query them
randomly and not just on the key of one store. That's why we need 2
queryable states which can be queried in the next operator together.

That's why I am not very optimistic about the CoProcessFunction for my case.
Maybe I am wrong and I have missed something, so any insights would be
useful! 

Regards
Biplob




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-QueryableState-with-Sliding-Window-on-RocksDB-tp14514p14606.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: SQL API with Table API

2017-08-01 Thread nragon
"No, those are two different queries. "

This is enough. The second part does not apply since I'm calculating
EventTime from the table source.

Thanks



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-API-with-Table-API-tp14599p14605.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: SQL API with Table API

2017-08-01 Thread Fabian Hueske
No, those are two different queries.

The second query would also not work because the revenue table does not
have the EventTime attribute.
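
For reference, a hedged sketch of how the same windowed aggregation could be written
directly in the Table API (assuming Orders is registered with EventTime as its
event-time attribute):

Table revenue = tableEnv
    .scan("Orders")
    .where("cCountry === 'FRANCE'")
    .window(Tumble.over("30.minutes").on("EventTime").as("w"))
    .groupBy("w, cID, cName")
    .select("w.start as tStart, w.end as tEnd, cID, cName, revenue.sum as revSum");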

Best, Fabian

2017-08-01 13:03 GMT+02:00 nragon :

> Hi,
>
> Can i expect the output from this:
>
> Table revenue = tableEnv.sql(
> "SELECT TUMBLE_START(EventTime, INTERVAL '30' MINUTE) as tStart, " +
> "TUMBLE_END(EventTime, INTERVAL '30' MINUTE) as
> tEnd, " +
> "cID, " +
> "cName, " +
> "SUM(revenue) AS revSum " +
> "FROM Orders " +
> "WHERE cCountry = 'FRANCE' " +
> "GROUP BY TUMBLE(EventTime, INTERVAL '30' MINUTE), cID, cName"
>   );
>
> being the same as this:
>
> Table revenue = tableEnv.sql(
> "SELECT cID, cName, SUM(revenue) AS revSum " +
> "FROM Orders " +
> "WHERE cCountry = 'FRANCE' " +
> "GROUP BY cID, cName"
>   );
>
> revenue
> .select("*, e.start, e.end")
> .window(Tumble.over("30.minute").on("EventTime").as("e"))
> .groupBy("e")
>
>
> Or the second involves one query as nested of tumble query?
>
> Thanks
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/SQL-API-with-
> Table-API-tp14599.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: About KafkaConsumerBase

2017-08-01 Thread Piotr Nowojski
Hi,

pendingOffsetsToCommit is a private field which is not accessed from outside of 
the FlinkKafkaConsumerBase class. It is only used in state manipulation 
methods, which are not executed in parallel.

Thanks, Piotrek


> On Aug 1, 2017, at 1:20 PM, aitozi  wrote:
> 
> Hello:
> 
> I am new to Flink; I just read the source code. I have a doubt: why, in
> FlinkKafkaConsumerBase.java (version 1.2), can a method like
> notifyCheckpointComplete change the pendingOffsetsToCommit in parallel
> without needing to be surrounded with "synchronized"?
> 
> thanks 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-KafkaConsumerBase-tp14601.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Invalid path exception

2017-08-01 Thread Chesnay Schepler

Let's move the discussions to FLINK-7330.

On 01.08.2017 13:15, Chesnay Schepler wrote:
One problem I know of is that Windows paths with a scheme are not 
detected as Windows paths, as documented in FLINK-6889.

They generally still work though (maybe by chance).

I just verified that calling FileInputFormat#setFilePath() works for 
both "file:///" and "file:/" on Windows.
(I'm assuming that we're talking about the FileInputFormat; if I'm 
wrong please correct me.)


@Mohit Could you provide the full stacktrace or a small self-contained 
example to reproduce the issue?


On 31.07.2017 22:19, Stephan Ewen wrote:

Hmm, looks like a bug then... Could you open a JIRA issue for that?

@Chesnay are you aware of Path issues on Windows?

On Mon, Jul 31, 2017 at 8:01 PM, Mohit Anchlia 
mailto:mohitanch...@gmail.com>> wrote:


I tried that as well but same result

format.setFilePath("file:/c:/proj/test/a.txt.txt");


Caused by: java.nio.file.InvalidPathException: Illegal char <:>
at index 2: /c:/proj/test/a.txt.txt





On Mon, Jul 31, 2017 at 6:04 AM, Stephan Ewen mailto:se...@apache.org>> wrote:

I think that on Windows, you need to use "file:/c:/proj/..."
with just one slash after the scheme.



On Mon, Jul 31, 2017 at 1:24 AM, Mohit Anchlia
mailto:mohitanch...@gmail.com>> wrote:

This is what I tired and it doesn't work. Is this a bug?

format.setFilePath("file:///c:/proj/test/a.txt.txt");


On Sun, Jul 30, 2017 at 2:10 PM, Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Did the path by chance start with file://C:/... ?

If so, please try file:///C: ...


On 30.07.2017 22:28, Mohit Anchlia wrote:

I am using flink 1.3.1 and getting this exception.
Is there a workaround?

Caused by: java.nio.file.InvalidPathException:
Illegal char <:> at index 2:
/C:/Users/m/default/flink-example/pom.xml

at sun.nio.fs.WindowsPathParser.normalize(Unknown
Source)

at sun.nio.fs.WindowsPathParser.parse(Unknown Source)

at sun.nio.fs.WindowsPathParser.parse(Unknown Source)













About KafkaConsumerBase

2017-08-01 Thread aitozi
Hello:

I am new to Flink; I just read the source code. I have a doubt: why, in
FlinkKafkaConsumerBase.java (version 1.2), can a method like
notifyCheckpointComplete change the pendingOffsetsToCommit in parallel
without needing to be surrounded with "synchronized"?

thanks 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-KafkaConsumerBase-tp14601.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.



Re: Invalid path exception

2017-08-01 Thread Chesnay Schepler
One problem I know of is that Windows paths with a scheme are not 
detected as Windows paths, as documented in FLINK-6889.

They generally still work though (maybe by chance).

I just verified that calling FileInputFormat#setFilePath() works for 
both "file:///" and "file:/" on Windows.
(I'm assuming that we're talking about the FileInputFormat; if I'm wrong 
please correct me.)


@Mohit Could you provide the full stacktrace or a small self-contained 
example to reproduce the issue?


On 31.07.2017 22:19, Stephan Ewen wrote:

Hmm, looks like a bug then... Could you open a JIRA issue for that?

@Chesnay are you aware of Path issues on Windows?

On Mon, Jul 31, 2017 at 8:01 PM, Mohit Anchlia > wrote:


I tried that as well but same result

format.setFilePath("file:/c:/proj/test/a.txt.txt");


Caused by: java.nio.file.InvalidPathException: Illegal char <:>
at index 2: /c:/proj/test/a.txt.txt





On Mon, Jul 31, 2017 at 6:04 AM, Stephan Ewen mailto:se...@apache.org>> wrote:

I think that on Windows, you need to use "file:/c:/proj/..."
with just one slash after the scheme.



On Mon, Jul 31, 2017 at 1:24 AM, Mohit Anchlia
mailto:mohitanch...@gmail.com>> wrote:

This is what I tired and it doesn't work. Is this a bug?

format.setFilePath("file:///c:/proj/test/a.txt.txt");


On Sun, Jul 30, 2017 at 2:10 PM, Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Did the path by chance start with file://C:/... ?

If so, please try file:///C: ...


On 30.07.2017 22:28, Mohit Anchlia wrote:

I am using flink 1.3.1 and getting this exception. Is
there a workaround?

Caused by: java.nio.file.InvalidPathException:
Illegal char <:> at index 2:
/C:/Users/m/default/flink-example/pom.xml

at sun.nio.fs.WindowsPathParser.normalize(Unknown Source)

at sun.nio.fs.WindowsPathParser.parse(Unknown Source)

at sun.nio.fs.WindowsPathParser.parse(Unknown Source)











SQL API with Table API

2017-08-01 Thread nragon
Hi,

Can i expect the output from this:

Table revenue = tableEnv.sql(
"SELECT TUMBLE_START(EventTime, INTERVAL '30' MINUTE) as tStart, " +
"TUMBLE_END(EventTime, INTERVAL '30' MINUTE) as tEnd, " 
+
"cID, " +
"cName, " +
"SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY TUMBLE(EventTime, INTERVAL '30' MINUTE), cID, cName"
  );

being the same as this:

Table revenue = tableEnv.sql(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
  );
  
revenue
.select("*, e.start, e.end")
.window(Tumble.over("30.minute").on("EventTime").as("e"))
.groupBy("e")


Or the second involves one query as nested of tumble query?

Thanks




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-API-with-Table-API-tp14599.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


multiple users per flink deployment

2017-08-01 Thread Georg Heiler
Hi,

flink currently only seems to support a single kerberos ticket for
deployment. Are there plans to support different users per each job?

regards,
Georg


Re: Flink CLI cannot submit job to Flink on Mesos

2017-08-01 Thread Francisco Gonzalez Barea
Hey! It´s working now!!

I will do a summary for those who might have the same problem in the future:

- Flink 1.3.0 dockerized on Mesos:
- Add the HA configuration values in your flink app: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#config-file-flink-confyaml
- Add the Mesos HA configuration values in your flink app: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability

- Flink CLI 1.3.0 on my local machine (make sure you use the same version!!)
- Add same HA configuration values in your flink CLI configuration: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#config-file-flink-confyaml


With those steps, my ./flink run command is working like a charm.
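
For completeness, a hedged sketch of the kind of flink-conf.yaml entries the two
linked pages describe (hostnames and paths below are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/ha/

# Mesos-specific entry (server side), as described in the second link:
high-availability.zookeeper.path.mesos-workers: /mesos-workers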

Thank you very much guys!

Regards,
Francisco





On 1 Aug 2017, at 10:24, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:

Hi Stephan,

So, do you mean to remove the “-m” param from the flink CLI call? And on the 
other hand, that I should add the Zookeeper configuration in both sides, the 
remote flink and locally in the flink CLI config, right?

Regards


On 31 Jul 2017, at 22:21, Stephan Ewen 
mailto:se...@apache.org>> wrote:

Hi Francisco!

Can you drop the explicit address of the jobmanager? The client should pick up 
that address automatically from ZooKeeper as well (together with the HA leader 
session ID).

Please check if you have the ZooKeeper HA config entries in the config used by 
the CLI.

Stephan


On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:
Hi again,

On the other hand, we are running the following flink CLI command:

./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}  
${our-program-jar} ${our-program-params}

Maybe is the command what we are using wrongly?

Thank you

On 28 Jul 2017, at 11:07, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:

Hi Francisco,

have you set the right high-availability configuration options in your client 
configuration as described here [1]? If not, then Flink is not able to find the 
correct JobManager because it retrieves the address as well as a fencing token 
(called leader session id) from the HA store (ZooKeeper).

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability

Cheers,
Till

On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:
Hello,

We´re having lot of issues while trying to submit a job remotely using the 
Flink CLI command line tool. We have tried different configurations but in all 
of them we get errors from AKKA while trying to connect. I will try to 
summarise the configurations we´ve tried.

- Flink 1.3.0 deployed within a docker container on a Mesos cluster (using 
Marathon)
- This flink has the property jobmanager.rpc.address as a hostname (i.e. kind 
of ip-X.eu.west-1.compute.internal)
- Use the same version for Flink Client remotely (e.g. in my laptop).

When I try to submit the job using the command flink run -m myHostName:myPort 
(the same in jobmanager.rpc.address and jobmanager.rpc.port) after some time 
waiting I get the trace at the end of this email. In the flink side we get this 
error from AKKA:

Association with remote system [akka.tcp://flink@10.203.23.24:24469] has 
failed, address is now gated for [5000] ms. Reason: [Association failed with 
[akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: 
/10.203.23.24:24469]

After reading a bit, it seems there´re some problems related to akka resolving 
hostnames to ips, so we decided to startup the same flink but changing 
jobmanager.rpc.address to have the direct ip (i.e. kind of XX.XXX.XX.XX). In 
this case I´m getting same trace (at the end of the email) from the client side 
and this one from the Flink server:

Discard message 
LeaderSessionMessage(----,SubmitJob(JobGraph(jobId:
 b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader 
session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the received 
leader session ID ----.

We have tried some other stuff but without success… any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: JobManager did not respond within 6 milliseconds
at 
org.apache.flink.client.program.ClusterClient.runDetached(C

Re: Access Sliding window

2017-08-01 Thread Fabian Hueske
The average would be computed over the aggregated 15-minute count values.
The sliding window would emit every 15 minutes the average of all records
that arrived within the last 6 hours.
Since the preceding 15-minute tumbling window emits 1 record every 15 mins,
this would be the avg over 24 records.

So, it would be running with a "granularity" of 15 minutes.
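
A hedged sketch of that two-step pipeline, assuming the input has already been
reduced to (key, value) tuples and CountPerWindow / AverageOfCounts are placeholder
WindowFunctions:

// 1) Tumbling 15-minute counts per key (emits one Tuple2<key, count> per window).
DataStream<Tuple2<String, Long>> counts = events
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.minutes(15)))
        .apply(new CountPerWindow());

// 2) Sliding 6-hour window that slides every 15 minutes and averages the counts,
//    i.e. each firing averages the 24 preceding 15-minute count records per key.
DataStream<Tuple2<String, Double>> averages = counts
        .keyBy(0)
        .window(SlidingEventTimeWindows.of(Time.hours(6), Time.minutes(15)))
        .apply(new AverageOfCounts());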

Best, Fabian

2017-08-01 4:48 GMT+02:00 Raj Kumar :

> Thanks Fabian. That helps.
>
> I have one more question. In the second step since I am using window
> function apply, The average calculated will be a running average or it will
> be computed at the end of 6hrs window ??
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Access-Sliding-
> window-tp14519p14584.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Using FileInputFormat - org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit

2017-08-01 Thread Fabian Hueske
Hi Mohit,

these are just INFO log statements that do not necessarily indicate a
problem.
Is the program working otherwise or do you observe other problems?

Best, Fabian

2017-08-01 0:32 GMT+02:00 Mohit Anchlia :

> I even tried existing format but still same error:
>
> FileInputFormat fileInputFormat = *new* TextInputFormat(*new*
> Path(args[0]));
>
> fileInputFormat.setNestedFileEnumeration(*true*);
>
> streamEnv.readFile(fileInputFormat, args[0],
>
> FileProcessingMode.*PROCESS_CONTINUOUSLY*, 1L).print();
>
>
> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
> does not contain a setter for field modificationTime
> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - c
>
> On Mon, Jul 31, 2017 at 1:07 PM, Mohit Anchlia 
> wrote:
>
>>  In trying to use this code I get the following error. Is it asking me to
>> implement additional interface?
>>
>> streamEnv.readFile(format, args[0], FileProcessingMode.
>> *PROCESS_CONTINUOUSLY*, 2000).print();
>>
>>
>> [main] INFO com.s.flink.example.PDFInputFormat - Start streaming
>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
>> org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
>> does not contain a setter for field modificationTime
>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
>> org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
>> is not a valid POJO type because not all fields are valid POJO fields.
>>
>
>


Re: Does Flink support long term storage and complex queries

2017-08-01 Thread Fabian Hueske
Hi Basanth,

Flink is not a storage system (neither for stream nor for batch data).
Flink applications can be stateful and maintain very large state (in the
order of several TBs) but state is always associated with an application.
State can be queryable, so outside applications can run key-lookup (point)
queries against state but bulk reading of active state is not supported.
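
As a hedged illustration (names and the tuple layout are placeholders), keyed state
can be exposed for such point queries like this; an external client can then look up
single keys but cannot scan the whole state:

// keeps the latest element per key as queryable ValueState under the given name
stream
    .keyBy(0)                             // assuming a DataStream of Tuple2<key, value>
    .asQueryableState("latest-per-key");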

So it depends on your use case if Flink is a good fit.
Flink excels at processing streams and can emit the results to a variety of
different storage systems.

Best, Fabian

2017-08-01 2:17 GMT+02:00 Basanth Gowda :

> We are looking at Druid for long term storage and roll up metrics and
> adhoc queries on time series data.
>
> If Flink can support dynamic queries and rollups like Druid does, we would
> like to just use Flink and leave Druid aside. Does Flink support such kind
> of queries and what are the guidelines for data retention ?
>
> thank you
>


Re: Odd flink behaviour

2017-08-01 Thread Fabian Hueske
An InputFormat processes multiple InputSplits. open() is called for each
InputSplit.
If you don't reset reached to false in open() you will only read a single
(i.e., the first) InputSplit and skip all others.

I'd override open as follows:

public void open(FileInputSplit fileSplit) throws IOException {
  super.open(fileSplit);
  reached = false;
}

Cheers, Fabian

2017-08-01 8:08 GMT+02:00 Mohit Anchlia :

> I didn't override open. I am using open that got inherited from
> FileInputFormat . Am I supposed to specifically override open?
>
> On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske  wrote:
>
>> Do you set reached to false in open()?
>>
>>
>> Am 01.08.2017 2:44 vorm. schrieb "Mohit Anchlia" > >:
>>
>> And here is the inputformat code:
>>
>> public class PDFFileInputFormat extends FileInputFormat<String> {
>>  /**
>>   *
>>   */
>>  private static final long serialVersionUID = -4137283038479003711L;
>>  private static final Logger logger = LoggerFactory
>>.getLogger(PDFInputFormat.class.getName());
>>  private boolean reached = false;
>>  @Override
>>  public boolean reachedEnd() throws IOException {
>>   logger.info("called reached " + reached);
>>   // TODO Auto-generated method stub
>>   return reached;
>>  }
>>  @Override
>>  public String nextRecord(String reuse) throws IOException {
>>   String content = new String(
>> Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
>> .getPath(;
>>   logger.info("Content " + content);
>>   reached = true;
>>   return content;
>>  }
>> }
>>
>> On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia 
>> wrote:
>>
>>> I have a very simple program that just reads all the files in the path.
>>> However, flink is not working as expected.
>>>
>>> Everytime I execute this job I only see flink reading 2 files, even
>>> though there are more in that directory. On closer look it appears that it
>>> might be related to:
>>>
>>> [flink-akka.actor.default-dispatcher-3] INFO
>>> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
>>> task slot(s).
>>>
>>> My question is, isn't flink supposed to iterate over the directory after
>>> those 2 slots become free again? I am assuming this problem is caused
>>> because there are only 2 slots.
>>>
>>>
>>> Code ---
>>>
>>>   PDFFileInputFormat format = new PDFFileInputFormat();
>>>   format.setFilePath(args[0]);
>>>   format.setNestedFileEnumeration(true);
>>>   logger.info("Number of splits " + format.getNumSplits());
>>>
>>>   // logger.info(Paths.get(".").toAbsolutePath().normalize().toSt
>>> ring());
>>>
>>>   env.createInput(format, TypeInformation.of(StringValue
>>> .class)).print();
>>>
>>
>>
>>
>


Re: KeyBy State

2017-08-01 Thread Fabian Hueske
Hi,

there is no built-in support for key changes.
You might be able to feedback a changed key with an iteration edge, but not
sure how well that works.

Best, Fabian

2017-08-01 7:32 GMT+02:00 Govindarajan Srinivasaraghavan <
govindragh...@gmail.com>:

> Hi,
>
> I have a keyby state but the key can change quite frequently for the same
> user and I need the previous keyBy state value for the user if there is a
> key change.
>
> Right now I'm using redis cache for the global state. Is there a way to
> achieve this within flink?
>


Re: Flatbuffers and Flink

2017-08-01 Thread Fabian Hueske
Hi,

in principle you can use any data type with Flink including byte[].
However, all of your functions need the logic to interpret the bytes and
you have to implement custom key extractors (if you need to keyBy or
partition your stream).
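
A hedged sketch of such a key extractor (raw is assumed to be a DataStream<byte[]>,
and the byte layout here is made up purely for illustration):

KeyedStream<byte[], String> keyed = raw.keyBy(new KeySelector<byte[], String>() {
    @Override
    public String getKey(byte[] value) throws Exception {
        // hypothetical layout: the first 8 bytes hold a fixed-length device id
        return new String(value, 0, 8, java.nio.charset.StandardCharsets.UTF_8);
    }
});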

Best, Fabian

2017-08-01 2:09 GMT+02:00 Basanth Gowda :

> Hi,
> This is 1 of 3 questions I had for Flink. Didn't want to club all of them
> together, as this might be useful for some one else in the future.
>
> Do we have Flatbuffers support in Flink ? If there is no support, is there
> a way to implement it ?
>
> Trying to see if we could use the byte[] that has come from upstream,
> without converting it into POJO / other format.
>
>
> thank you
>


Re: Flink CLI cannot submit job to Flink on Mesos

2017-08-01 Thread Francisco Gonzalez Barea
Hi Stephan,

So, do you mean to remove the “-m” param from the flink CLI call? And on the 
other hand, that I should add the Zookeeper configuration in both sides, the 
remote flink and locally in the flink CLI config, right?

Regards


On 31 Jul 2017, at 22:21, Stephan Ewen 
mailto:se...@apache.org>> wrote:

Hi Francisco!

Can you drop the explicit address of the jobmanager? The client should pick up 
that address automatically from ZooKeeper as well (together with the HA leader 
session ID).

Please check if you have the ZooKeeper HA config entries in the config used by 
the CLI.

Stephan


On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:
Hi again,

On the other hand, we are running the following flink CLI command:

./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}  
${our-program-jar} ${our-program-params}

Maybe is the command what we are using wrongly?

Thank you

On 28 Jul 2017, at 11:07, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:

Hi Francisco,

have you set the right high-availability configuration options in your client 
configuration as described here [1]? If not, then Flink is not able to find the 
correct JobManager because it retrieves the address as well as a fencing token 
(called leader session id) from the HA store (ZooKeeper).

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability

Cheers,
Till

On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:
Hello,

We´re having lot of issues while trying to submit a job remotely using the 
Flink CLI command line tool. We have tried different configurations but in all 
of them we get errors from AKKA while trying to connect. I will try to 
summarise the configurations we´ve tried.

- Flink 1.3.0 deployed within a docker container on a Mesos cluster (using 
Marathon)
- This flink has the property jobmanager.rpc.address as a hostname (i.e. kind 
of ip-X.eu.west-1.compute.internal)
- Use the same version for Flink Client remotely (e.g. in my laptop).

When I try to submit the job using the command flink run -m myHostName:myPort 
(the same in jobmanager.rpc.address and jobmanager.rpc.port) after some time 
waiting I get the trace at the end of this email. In the flink side we get this 
error from AKKA:

Association with remote system [akka.tcp://flink@10.203.23.24:24469] has 
failed, address is now gated for [5000] ms. Reason: [Association failed with 
[akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: 
/10.203.23.24:24469]

After reading a bit, it seems there´re some problems related to akka resolving 
hostnames to ips, so we decided to startup the same flink but changing 
jobmanager.rpc.address to have the direct ip (i.e. kind of XX.XXX.XX.XX). In 
this case I´m getting same trace (at the end of the email) from the client side 
and this one from the Flink server:

Discard message 
LeaderSessionMessage(----,SubmitJob(JobGraph(jobId:
 b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader 
session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the received 
leader session ID ----.

We have tried some other stuff but without success… any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: JobManager did not respond within 6 milliseconds
at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:454)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:99)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:345)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did 
not respond within 60

Re: multiple streams with multiple actions - proper way?

2017-08-01 Thread Fabian Hueske
Hi Peter,

this kind of use case is supported, but it is best practice to split
independent pipelines into individual jobs.
One reason for that is to isolate failures and restarts.
For example, I would split the program you posted into two programs, one
for the "foo" topic and one of the "bar" topic. Depending on the complexity
of the operations, you might also want to split it further.

Best, Fabian




2017-07-29 18:51 GMT+02:00 Peter Ertl :

> Hello Flink People :-)
>
>
> I am trying to get my head around flink - is it a supported use case to
> register multiple streams with possibly more than one transformation /
> action per stream?
>
>
> def main(args: Array[String]): Unit = {
>
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>   val prop = new Properties()
>   prop.setProperty("bootstrap.servers", "vmi:9092")
>
>   // first stream
>   val ins = env.addSource(new FlinkKafkaConsumer010("foo", new 
> SimpleStringSchema(), prop))
> .map(s => "transformation-1: " + s)
>
>   ins.map(s => "transformation-2:" + s).print() // one action
>   ins.map(s => "transformation-3:" + s).print() // one more action
>   ins.map(s => "transformation-4:" + s).print() // another action on the same 
> stream
>
>   // second, different stream
>   val ins2 = env.addSource(new FlinkKafkaConsumer010("bar", new 
> SimpleStringSchema(), prop))
> .map(s => "transformation-5: " + s)
>
>   ins2.map(s => "transformation-7:" + s).print() // action
>   ins2.map(s => "transformation-6:" + s).print() // different action
>
>   env.execute("run all streams with multiple actions attached")
> }
>
>
>
> Is this program abusing flnk or is this just how you are supposed to do
> things?
>
> also, how many threads will this program consume when running with
> parallelism = 4?
>
>
> Best regards
> Peter
>
>