Flink typesafe configuration

2017-11-29 Thread Georg Heiler
Starting out with Flink from a Scala background, I would like to use Typesafe
configuration via a library like https://github.com/pureconfig/pureconfig.
However,
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html
recommends setting up:

env.getConfig().setGlobalJobParameters(parameters);

which is not fully compatible with a case class - what is the recommended
approach here?
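
For illustration, one possible way to bridge the two is sketched below: load a case class with pureconfig and wrap it in a thin ExecutionConfig.GlobalJobParameters adapter. This is not from the thread; the pureconfig API details vary by version, and the config fields are hypothetical.

```
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical application config, loaded from application.conf via pureconfig.
case class JobConfig(kafkaBootstrap: String, windowSizeSeconds: Int)

// Thin adapter so the typed config can also be registered as global job parameters.
class JobParameters(val conf: JobConfig) extends ExecutionConfig.GlobalJobParameters {
  override def toMap: java.util.Map[String, String] = {
    val m = new java.util.HashMap[String, String]()
    m.put("kafka.bootstrap", conf.kafkaBootstrap)
    m.put("window.size.seconds", conf.windowSizeSeconds.toString)
    m
  }
}

object ConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = pureconfig.loadConfigOrThrow[JobConfig] // assumes pureconfig's automatic derivation
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setGlobalJobParameters(new JobParameters(conf))
    // ... build the job; the case class itself can also simply be passed to operators ...
  }
}
```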

Best,
Georg


Maintain heavy hitters in Flink application

2017-11-29 Thread m@xi
Hello everyone!

I want to implement a streaming algorithm like Misra-Gries or Space Saving in
Flink. The goal is to maintain the heavy hitters for my (possibly unbounded)
input streams for the whole time my app runs. More precisely, I want to
have a non-stop running task that runs the Space Saving algorithm and
updates a data structure that should be accessible by other tasks like map and
flatmap of my Flink application at ad-hoc times. However, I am not sure
how I can achieve the aforementioned goal.

First, is it possible to have a structure in my main function that is updated
by a task at all times and is also accessible by other transformations at
ad-hoc times?
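
Not from the thread, but to make the algorithm concrete, here is a minimal sketch of the Space Saving update step in plain Scala. In a Flink job this logic would typically live inside a process/flatMap function with the counters kept in managed state, and sharing the result with other operators usually means emitting it as a stream rather than reading a shared in-memory structure.

```
import scala.collection.mutable

// Minimal Space Saving sketch: keep at most k counters; heavy hitters are among the tracked keys.
class SpaceSaving[T](k: Int) {
  private val counts = mutable.Map.empty[T, Long]

  def update(item: T): Unit = {
    if (counts.contains(item)) {
      counts(item) += 1
    } else if (counts.size < k) {
      counts(item) = 1
    } else {
      // evict the key with the smallest count and let the new key inherit count + 1
      val (minKey, minCnt) = counts.minBy(_._2)
      counts.remove(minKey)
      counts(item) = minCnt + 1
    }
  }

  def heavyHitters: Seq[(T, Long)] = counts.toSeq.sortBy(-_._2)
}
```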

Any ideas on how I can implement the above are more than welcome.

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Issue with Checkpoint restore (Beam pipeline)

2017-11-29 Thread Jins George

Hi,

I am running a Beam Pipeline on Flink 1.2 and facing an issue in 
restoring a job from checkpoint. If I modify my beam pipeline to add a 
new operator and  try to restore from the externalized checkpoint, I get 
the error


java.lang.IllegalStateException: Invalid Invalid number of operator states. Found :56. Expected: 58
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkRestorePreconditions(StreamTask.java:680)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)

According to the savepoint guide [1], a newly added operator should be initialized
without any state. Any idea why this error is reported?


Also note that I am not setting an ID on my operators (because the Flink runner
in Beam sets the operator name the user provided during pipeline creation).
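
For reference, in a plain Flink DataStream job one would pin operator state to stable IDs via uid(), roughly as in the sketch below. This only shows the native API; whether and how the Beam Flink runner exposes such IDs is a separate question.

```
import org.apache.flink.streaming.api.scala._

object UidExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements("a", "b", "c")
      .map(_.toUpperCase).uid("uppercase-map") // stable ID used to match operator state on restore
      .print()
    env.execute("uid example")
  }
}
```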


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html



Thanks,
Jins George


Re: getting started with Flink / scala

2017-11-29 Thread Georg Heiler
You would suggest: https://github.com/ottogroup/flink-spector for unit
tests?

Georg Heiler  wrote on Wed, 29 Nov 2017 at
22:33:

> Thanks, this sounds like a good idea - can you recommend such a project?
>
Jörn Franke  wrote on Wed, 29 Nov 2017 at
22:30:
>
>> If you want to really learn then I recommend starting with a Flink
>> project that contains unit tests and integration tests (maybe augmented
>> with https://wiki.apache.org/hadoop/HowToDevelopUnitTests to simulate an
>> HDFS cluster during unit tests). It should also include coverage reporting.
>> These aspects are equally crucial for developers who want to build
>> high-quality big data applications, and virtually all companies will require
>> that you know these things.
>>
>> I am not sure if a hello world project in Flink exists containing all
>> these but it would be a good learning task to create such a thing.
>>
>> On 29. Nov 2017, at 22:03, Georg Heiler 
>> wrote:
>>
>> Getting started with Flink / scala, I wonder whether the scala base
>> library should be excluded as a best practice:
>> https://github.com/tillrohrmann/flink-project/blob/master/build.sbt#L32
>> // exclude Scala library from assembly
>> assemblyOption in assembly := (assemblyOption in
>> assembly).value.copy(includeScala = false)
>>
>> Also I would like to know if
>> https://github.com/tillrohrmann/flink-project is the most up to date
>> getting started with flink-scala sample project you would recommend.
>>
>> Best,
>> Georg
>>
>>


Re: getting started with Flink / scala

2017-11-29 Thread Georg Heiler
Thanks, this sounds like a good idea - can you recommend such a project?

Jörn Franke  wrote on Wed, 29 Nov 2017 at
22:30:

> If you want to really learn then I recommend starting with a Flink
> project that contains unit tests and integration tests (maybe augmented
> with https://wiki.apache.org/hadoop/HowToDevelopUnitTests to simulate an
> HDFS cluster during unit tests). It should also include coverage reporting.
> These aspects are equally crucial for developers who want to build
> high-quality big data applications, and virtually all companies will require
> that you know these things.
>
> I am not sure if a hello world project in Flink exists containing all
> these but it would be a good learning task to create such a thing.
>
> On 29. Nov 2017, at 22:03, Georg Heiler  wrote:
>
> Getting started with Flink / scala, I wonder whether the scala base
> library should be excluded as a best practice:
> https://github.com/tillrohrmann/flink-project/blob/master/build.sbt#L32
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in
> assembly).value.copy(includeScala = false)
>
> Also I would like to know if https://github.com/tillrohrmann/flink-project is
> the most up to date getting started with flink-scala sample project you
> would recommend.
>
> Best,
> Georg
>
>


Re: getting started with Flink / scala

2017-11-29 Thread Jörn Franke
If you want to really learn then I recommend starting with a Flink project
that contains unit tests and integration tests (maybe augmented with
https://wiki.apache.org/hadoop/HowToDevelopUnitTests to simulate an HDFS cluster
during unit tests). It should also include coverage reporting. These aspects
are equally crucial for developers who want to build high-quality big data
applications, and virtually all companies will require that you know these
things.

I am not sure if a hello world project in Flink exists containing all these but 
it would be a good learning task to create such a thing.

> On 29. Nov 2017, at 22:03, Georg Heiler  wrote:
> 
> Getting started with Flink / scala, I wonder whether the scala base library 
> should be excluded as a best practice:
> https://github.com/tillrohrmann/flink-project/blob/master/build.sbt#L32 
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in 
> assembly).value.copy(includeScala = false)
> 
> Also I would like to know if https://github.com/tillrohrmann/flink-project is 
> the most up to date getting started with flink-scala sample project you would 
> recommend.
> 
> Best,
> Georg


getting started with Flink / scala

2017-11-29 Thread Georg Heiler
Getting started with Flink / scala, I wonder whether the scala base library
should be excluded as a best practice:
https://github.com/tillrohrmann/flink-project/blob/master/build.sbt#L32
// exclude Scala library from assembly
assemblyOption in assembly := (assemblyOption in
assembly).value.copy(includeScala = false)

Also, I would like to know whether https://github.com/tillrohrmann/flink-project is
the most up-to-date getting-started sample project for Flink/Scala that you
would recommend.
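
For context, the relevant parts of such a build.sbt typically look roughly like the sketch below (based on the linked template; the Flink version is a placeholder): Flink dependencies are marked as provided and the Scala library is left out of the fat jar because the cluster already ships it.

```
val flinkVersion = "1.3.2"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)

// exclude the Scala library from the assembly, as in the template
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
```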

Best,
Georg


Re: Dataset using several count operator in the same environment

2017-11-29 Thread Timo Walther

Hi Ebru,

the count() operator is a very simple utility function that calls 
execute() internally. If you want to have a more complex pipeline you 
can take a look at how our WordCount [0] example works. The general 
concept is to emit a 1 for every record and sum the ones in parallel. If 
you need an overall count, you need to set the parallelism of the last 
operator to 1 (operator(xxx).setParallelism(1)), but this means that 
your job is not executed in parallel anymore.
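
To make that concrete, here is a minimal sketch (not from the thread; the output paths are hypothetical) that computes several counts as regular operators in one plan, so a single execute() materializes all of them:

```
import org.apache.flink.api.scala._

object MultiCountExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data: DataSet[String] = env.fromElements("a", "b", "a", "c")

    // Express the counts as regular operators instead of calling count(),
    // which would trigger a separate execution each time.
    val total  = data.map(_ => 1L).reduce(_ + _)          // overall count
    val perKey = data.map(w => (w, 1)).groupBy(0).sum(1)  // count per element

    total.writeAsText("/tmp/total-count")    // hypothetical output paths
    perKey.writeAsText("/tmp/per-key-count")

    env.execute("multiple counts in one plan")
  }
}
```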


It might also make sense to take a look at Flink's Table & SQL API [1] 
which makes such operations easier.


Hope that helps.

Regards,
Timo



[0] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/index.html



On 11/29/17 at 3:26 PM, ebru wrote:

Hi all,

We are trying to use more than one count operator on a DataSet, but it executes
the first count and skips the other operations, even though we also call env.execute().
How can we solve this problem?

-Ebru





Re: Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

2017-11-29 Thread Chesnay Schepler

This issue sounds strikingly similar to FLINK-6965.

TL;DR: You must place classes loaded during serialization by the kafka 
connector under /lib.


On 29.11.2017 16:15, Timo Walther wrote:

Hi Bart,

usually, this error means that your Maven project configuration is not 
correct. Is your custom class included in the jar file that you submit 
to the cluster?


It might make sense to share your pom.xml with us.

Regards,
Timo



On 11/29/17 at 2:44 PM, Bart Kastermans wrote:

I have a custom serializer for writing/reading from kafka.  I am setting
this up in main with code as follows:

val kafkaConsumerProps = new Properties()
kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap)
kafkaConsumerProps.setProperty("group.id", s"normalize-call-events-${scala.util.Random.nextInt}")
kafkaConsumerProps.setProperty("client.id", s"normalize-call-events-${scala.util.Random.nextInt}")
val source = new FlinkKafkaConsumer010[RaeEvent](sourceTopic,
  new KafkaRaeEventSerializer(schemaBaseDirectory),
  kafkaConsumerProps)

This generates java.lang.NoClassDefFoundError on classes that are
in my job jar.  Printing the classpath doesn't show the libraries
explicitly (but these are also not shown explicitly in place where they
are found; I guess the current jar is not shown on the classpath).  I
don't know how to list the current classloaders.

Also, the error goes away when I add the dependency to /flink/lib and
restart flink.  Hence my conjecture that in the kafka
serializer/deserializer context the dependencies from my job jar are
not available.

Flink version 1.2.0

Any help greatly appreciated; also I'll be happy to provide additional
info.

Also greatly appreciated where I should have looked in the flink code to
decide the answer myself.

- bart








Re: Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

2017-11-29 Thread Timo Walther

Hi Bart,

usually, this error means that your Maven project configuration is not 
correct. Is your custom class included in the jar file that you submit 
to the cluster?


It might make sense to share your pom.xml with us.

Regards,
Timo



On 11/29/17 at 2:44 PM, Bart Kastermans wrote:

I have a custom serializer for writing/reading from kafka.  I am setting
this up in main with code as follows:

val kafkaConsumerProps = new Properties()
kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap)
kafkaConsumerProps.setProperty("group.id", s"normalize-call-events-${scala.util.Random.nextInt}")
kafkaConsumerProps.setProperty("client.id", s"normalize-call-events-${scala.util.Random.nextInt}")
val source = new FlinkKafkaConsumer010[RaeEvent](sourceTopic,
  new KafkaRaeEventSerializer(schemaBaseDirectory),
  kafkaConsumerProps)

This generates java.lang.NoClassDefFoundError on classes that are
in my job jar.  Printing the classpath doesn't show the libraries
explicitly (but these are also not shown explicitly in place where they
are found; I guess the current jar is not shown on the classpath).  I
don't know how to list the current classloaders.

Also, the error goes away when I add the dependency to /flink/lib and
restart flink.  Hence my conjecture that in the kafka
serializer/deserializer context the dependencies from my job jar are
not available.

Flink version 1.2.0

Any help greatly appreciated; also I'll be happy to provide additional
info.

Also greatly appreciated where I should have looked in the flink code to
decide the answer myself.

- bart





Re: Taskmanagers are quarantined

2017-11-29 Thread Stephan Ewen
We also saw issues in the failure detection/quarantining with some Hadoop
versions because of a subtle runtime Netty version conflict. Flink 1.4
shades Flink's / Akka's Netty; in Flink 1.3 you may need to exclude the
Netty dependency pulled in through Hadoop explicitly.
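
As a rough illustration of such an exclusion, a sketch in sbt (the exact artifact names to exclude depend on your Hadoop version and build tool):

```
// exclude Hadoop's transitive Netty so it cannot clash with the one used by Flink/Akka
libraryDependencies += ("org.apache.hadoop" % "hadoop-client" % "2.6.0")
  .exclude("io.netty", "netty")
  .exclude("io.netty", "netty-all")
```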

Also, Hadoop version mismatches sometimes cause subtle problems. You can
try to drop Flink's own Hadoop dependency and just drop in the CDH
dependencies/jars.

Stephan


On Nov 29, 2017 14:34, "Till Rohrmann"  wrote:

Hi,

you could also try increasing the heartbeat timeout via
`akka.watch.heartbeat.pause`. Maybe this helps to overcome the GC pauses.

Cheers,
Till

On Wed, Nov 29, 2017 at 12:41 PM, T Obi  wrote:

> Datanode warnings did not appear in all timeout cases. They seem
> to be raised just by timeouts while snapshotting.
>
> We output GC logs on the taskmanagers and found that something kicks off
> System.gc() every hour.
> So a full GC runs every hour, and it takes about a minute or more
> in our case...
> When a taskmanager is timed out, a full GC always seems to be
> running on it. The full GC is triggered not only by System.gc() but also by
> "Full GC (Ergonomics)" and "Full GC (Metadata GC Threshold)", though.
>
> Some of our jobs have a large state. I think this is why the full
> GC takes a long time.
> I will try to run a few taskmanagers with the memory divided among them on each
> machine.
> I will also tune JVM memory parameters to reduce the frequency of
> "Full GC (Metadata GC Threshold)".
>
> Best,
> Tetsuya
>
>
> 2017-11-28 16:30 GMT+09:00 T Obi :
> > Hello Chesnay,
> >
> > Thank you for answering my rough question.
> >
> > Not all of taskmanagers are quarantined at a time, but each
> > taskmanager has been quarantined at least once.
> >
> > We are using CDH 5.8 based on hadoop 2.6.
> > We didn't pay attention to the datanodes. We will check them.
> > However, we are also using the HDFS for MapReduce and it seems to work
> fine.
> >
> > I searched the archives of this mailing list for the keyword "Detected
> > unreachable" and found mails about GC trouble.
> > Though we are using G1GC, we will try to output GC logs.
> >
> >
> > Best,
> > Tetsuya
> >
> > 2017-11-28 1:15 GMT+09:00 Chesnay Schepler :
> >> Are only some taskmanagers quarantined, or all of them?
> >>
> >> Do the quarantined taskmanagers have anything in common?
> >> (are the failing ones always on certain machines; do the stacktraces
> >> reference the same hdfs datanodes)
> >>
> >> Which hadoop version are you using?
> >>
> >> From the stack-trace it appears that multiple hdfs nodes are being
> >> corrupted.
> >> The taskmanagers time out since the connection to zookeeper breaks down,
> >> at which point a taskmanager no longer knows who the leading jobmanager is and
> >> subsequently shuts down.
> >>
> >>
> >> On 27.11.2017 08:02, T Obi wrote:
> >>>
> >>> Hello all,
> >>>
> >>> We run jobs on a standalone cluster with Flink 1.3.2 and we're facing
> >>> a problem. Suddenly a connection between a taskmanager and the
> >>> jobmanager is timed out and the taskmanager is "quarantined" by
> >>> jobmanager.
> >>> Once a taskmanager is quarantined, of course jobs are restarted, but
> >>> the timeout and quarantine happen to some taskmanagers successively.
> >>>
> >>> When a taskmanager's connection to the jobmanager was timed out, its
> >>> connections to zookeeper and the snapshot HDFS were also timed out. So the
> >>> problem doesn't seem to be one of Flink itself.
> >>> But even though a taskmanager which runs on the same machine as the jobmanager
> >>> is timed out, the jobmanager is alright at the time. So I think it is not
> >>> an OS problem either.
> >>>
> >>> Could you give us some advice on how to investigate? Thank you.
> >>>
> >>>
> >>>
> >>> Taskmanager command line:
> >>>
> >>> java -XX:+UseG1GC -Xms219136M -Xmx219136M
> >>> -XX:MaxDirectMemorySize=8388607T
> >>> -Dlog.file=/var/log/flink/flink-log-manager-taskmanager-0-flink-jp-2.log
> >>> -Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
> >>> -Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
> >>> -classpath
> >>> /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
> >>> org.apache.flink.runtime.taskmanager.TaskManager --configDir
> >>> /opt/flink/flink-1.3.2/conf
> >>>
> >>>
> >>> Taskmanager (on flink-jp-2) log:
> >>>
> >>> 2017-11-22 14:09:31,595 INFO
> >>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap
> >>> backend snapshot (File Stream Factory @
> >>>
> >>> hdfs://nameservice1/user/log-manager/flink/checkpoints-data/9469db324b834e9dcf5b46428b3ae011,
> >>> synchronous part) in thread
> >>> Thread[TriggerWindow(TumblingProcessingTimeWindows(6),
> >>>
> >>> 

Dataset using several count operator in the same environment

2017-11-29 Thread ebru
Hi all,

We are trying to use more than one count operator on a DataSet, but it executes
the first count and skips the other operations, even though we also call env.execute().
How can we solve this problem?

-Ebru

Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

2017-11-29 Thread Bart Kastermans
I have a custom serializer for writing/reading from kafka.  I am setting
this up in main with code as follows:

val kafkaConsumerProps = new Properties()
kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap)

kafkaConsumerProps.setProperty("group.id",s"normalize-call-events-${scala.util.Random.nextInt}")

kafkaConsumerProps.setProperty("client.id",s"normalize-call-events-${scala.util.Random.nextInt}")
val source = new FlinkKafkaConsumer010[RaeEvent](sourceTopic, new
KafkaRaeEventSerializer(schemaBaseDirectory),
  kafkaConsumerProps)

This generates java.lang.NoClassDefFoundError on classes that are
in my job jar.  Printing the classpath doesn't show the libraries
explicitly (but these are also not shown explicitly in place where they
are found; I guess the current jar is now shown on the classpath).  I
don't know how to list the current classloaders.

Also, the error goes away when I add the dependency to /flink/lib and
restart flink.  Hence my conjecture that in the kafka
serializer/deserializer context the depenencies from my job jar are
not available.

Flink version 1.2.0

Any help greatly appreciated; also I'll be happy to provide additional
info.

Also greatly appreciated where I should have looked in the flink code to
decide the answer myself.

- bart


Re: Taskmanagers are quarantined

2017-11-29 Thread Till Rohrmann
Hi,

you could also try increasing the heartbeat timeout via
`akka.watch.heartbeat.pause`. Maybe this helps to overcome the GC pauses.

Cheers,
Till

On Wed, Nov 29, 2017 at 12:41 PM, T Obi  wrote:

> Datanode warnings did not appear in all timeout cases. They seem
> to be raised just by timeouts while snapshotting.
>
> We output GC logs on the taskmanagers and found that something kicks off
> System.gc() every hour.
> So a full GC runs every hour, and it takes about a minute or more
> in our case...
> When a taskmanager is timed out, a full GC always seems to be
> running on it. The full GC is triggered not only by System.gc() but also by
> "Full GC (Ergonomics)" and "Full GC (Metadata GC Threshold)", though.
>
> Some of our jobs have a large state. I think this is why the full
> GC takes a long time.
> I will try to run a few taskmanagers with the memory divided among them on each
> machine.
> I will also tune JVM memory parameters to reduce the frequency of
> "Full GC (Metadata GC Threshold)".
>
> Best,
> Tetsuya
>
>
> 2017-11-28 16:30 GMT+09:00 T Obi :
> > Hello Chesnay,
> >
> > Thank you for answering my rough question.
> >
> > Not all of taskmanagers are quarantined at a time, but each
> > taskmanager has been quarantined at least once.
> >
> > We are using CDH 5.8 based on hadoop 2.6.
> > We didn't pay attention to the datanodes. We will check them.
> > However, we are also using the HDFS for MapReduce and it seems to work
> fine.
> >
> > I searched the archives of this mailing list for the keyword "Detected
> > unreachable" and found mails about GC trouble.
> > Though we are using G1GC, we will try to output GC logs.
> >
> >
> > Best,
> > Tetsuya
> >
> > 2017-11-28 1:15 GMT+09:00 Chesnay Schepler :
> >> Are only some taskmanagers quarantined, or all of them?
> >>
> >> Do the quarantined taskmanagers have anything in common?
> >> (are the failing ones always on certain machines; do the stacktraces
> >> reference the same hdfs datanodes)
> >>
> >> Which hadoop version are you using?
> >>
> >> From the stack-trace it appears that multiple hdfs nodes are being
> >> corrupted.
> >> The taskmanagers time out since the connection to zookeeper breaks down,
> >> at which point a taskmanager no longer knows who the leading jobmanager is and
> >> subsequently shuts down.
> >>
> >>
> >> On 27.11.2017 08:02, T Obi wrote:
> >>>
> >>> Hello all,
> >>>
> >>> We run jobs on a standalone cluster with Flink 1.3.2 and we're facing
> >>> a problem. Suddenly a connection between a taskmanager and the
> >>> jobmanager is timed out and the taskmanager is "quarantined" by
> >>> jobmanager.
> >>> Once a taskmanager is quarantined, of course jobs are restarted, but
> >>> the timeout and quarantine happen to some taskmanagers successively.
> >>>
> >>> When a taskmanager's connection to the jobmanager was timed out, its
> >>> connections to zookeeper and the snapshot HDFS were also timed out. So the
> >>> problem doesn't seem to be one of Flink itself.
> >>> But even though a taskmanager which runs on the same machine as the jobmanager
> >>> is timed out, the jobmanager is alright at the time. So I think it is not
> >>> an OS problem either.
> >>>
> >>> Could you give us some advice on how to investigate? Thank you.
> >>>
> >>>
> >>>
> >>> Taskmanager command line:
> >>>
> >>> java -XX:+UseG1GC -Xms219136M -Xmx219136M
> >>> -XX:MaxDirectMemorySize=8388607T
> >>> -Dlog.file=/var/log/flink/flink-log-manager-taskmanager-0-flink-jp-2.log
> >>> -Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
> >>> -Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
> >>> -classpath
> >>> /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
> >>> org.apache.flink.runtime.taskmanager.TaskManager --configDir
> >>> /opt/flink/flink-1.3.2/conf
> >>>
> >>>
> >>> Taskmanager (on flink-jp-2) log:
> >>>
> >>> 2017-11-22 14:09:31,595 INFO
> >>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap
> >>> backend snapshot (File Stream Factory @
> >>>
> >>> hdfs://nameservice1/user/log-manager/flink/checkpoints-data/9469db324b834e9dcf5b46428b3ae011,
> >>> synchronous part) in thread
> >>> Thread[TriggerWindow(TumblingProcessingTimeWindows(6),
> >>>
> >>> ReducingStateDescriptor{serializer=jp.geniee.reporter.executable.BuyerReporterV2Auction$$anon$12$$anon$7@d2619591,
> >>>
> >>> reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@72bca894},
> >>> ProcessingTimeTrigger(),
> >>> WindowedStream.reduce(WindowedStream.java:300)) -> Map -> Map
> >>> (9/30),5,Flink Task Threads] took 142 ms.
> >>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
> >>>   - DFSOutputStream ResponseProcessor exception

FlinkKafkaProducerXX

2017-11-29 Thread Mikhail Pryakhin
Hi all,
I've just come across a FlinkKafkaProducer misconfiguration issue: when a
FlinkKafkaProducer is created without specifying a Kafka partitioner, a
FlinkFixedPartitioner instance is used and all messages end up in a single
Kafka partition (in case I have a single task manager instance). Just
curious: was this done on purpose?
Don't you consider a round-robin strategy more suitable for this case? For now,
if I need a round-robin strategy I have to explicitly pass a null value for the
partitioner.
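
For illustration, a rough sketch of passing a null partitioner so the producer falls back to Kafka's own partitioning (round-robin for records without a key). This is only a sketch: the constructor overloads and import paths differ across Flink/connector versions, so treat the exact signature as an assumption.

```
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object ProducerPartitioningSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements("a", "b", "c")

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")

    // A typed null avoids overload ambiguity and disables the FlinkFixedPartitioner,
    // leaving partition assignment to the Kafka producer itself.
    val noPartitioner: FlinkKafkaPartitioner[String] = null
    stream.addSink(new FlinkKafkaProducer010[String](
      "my-topic", new SimpleStringSchema(), props, noPartitioner))

    env.execute("producer partitioning sketch")
  }
}
```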

Thanks in advance.

Kind Regards,
Mike Pryakhin













Re: Taskmanagers are quarantined

2017-11-29 Thread T Obi
Datanode warnings did not appear in all timeout cases. They seem
to be raised just by timeouts while snapshotting.

We output GC logs on the taskmanagers and found that something kicks off
System.gc() every hour.
So a full GC runs every hour, and it takes about a minute or more
in our case...
When a taskmanager is timed out, a full GC always seems to be
running on it. The full GC is triggered not only by System.gc() but also by
"Full GC (Ergonomics)" and "Full GC (Metadata GC Threshold)", though.

Some of our jobs have a large state. I think this is why the full
GC takes a long time.
I will try to run a few taskmanagers with the memory divided among them on each machine.
I will also tune JVM memory parameters to reduce the frequency of
"Full GC (Metadata GC Threshold)".

Best,
Tetsuya


2017-11-28 16:30 GMT+09:00 T Obi :
> Hello Chesnay,
>
> Thank you for answering my rough question.
>
> Not all of taskmanagers are quarantined at a time, but each
> taskmanager has been quarantined at least once.
>
> We are using CDH 5.8 based on hadoop 2.6.
> We didn't pay attention to the datanodes. We will check them.
> However, we are also using the HDFS for MapReduce and it seems to work fine.
>
> I searched the archives of this mailing list for the keyword "Detected
> unreachable" and found mails about GC trouble.
> Though we are using G1GC, we will try to output GC logs.
>
>
> Best,
> Tetsuya
>
> 2017-11-28 1:15 GMT+09:00 Chesnay Schepler :
>> Are only some taskmanagers quarantined, or all of them?
>>
>> Do the quarantined taskmanagers have anything in common?
>> (are the failing ones always on certain machines; do the stacktraces
>> reference the same hdfs datanodes)
>>
>> Which hadoop version are you using?
>>
>> From the stack-trace it appears that multiple hdfs nodes are being
>> corrupted.
>> The taskmanagers time out since the connection to zookeeper breaks down,
>> at which point a taskmanager no longer knows who the leading jobmanager is and
>> subsequently shuts down.
>>
>>
>> On 27.11.2017 08:02, T Obi wrote:
>>>
>>> Hello all,
>>>
>>> We run jobs on a standalone cluster with Flink 1.3.2 and we're facing
>>> a problem. Suddenly a connection between a taskmanager and the
>>> jobmanager is timed out and the taskmanager is "quarantined" by
>>> jobmanager.
>>> Once a taskmanager is quarantined, of course jobs are restarted, but
>>> the timeout and quarantine happen to some taskmanagers successively.
>>>
>>> When a taskmanager's connection to the jobmanager was timed out, its
>>> connections to zookeeper and the snapshot HDFS were also timed out. So the
>>> problem doesn't seem to be one of Flink itself.
>>> But even though a taskmanager which runs on the same machine as the jobmanager
>>> is timed out, the jobmanager is alright at the time. So I think it is not
>>> an OS problem either.
>>>
>>> Could you give us some advice on how to investigate? Thank you.
>>>
>>>
>>>
>>> Taskmanager command line:
>>>
>>> java -XX:+UseG1GC -Xms219136M -Xmx219136M
>>> -XX:MaxDirectMemorySize=8388607T
>>> -Dlog.file=/var/log/flink/flink-log-manager-taskmanager-0-flink-jp-2.log
>>> -Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
>>> -Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
>>> -classpath
>>> /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
>>> org.apache.flink.runtime.taskmanager.TaskManager --configDir
>>> /opt/flink/flink-1.3.2/conf
>>>
>>>
>>> Taskmanager (on flink-jp-2) log:
>>>
>>> 2017-11-22 14:09:31,595 INFO
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap
>>> backend snapshot (File Stream Factory @
>>>
>>> hdfs://nameservice1/user/log-manager/flink/checkpoints-data/9469db324b834e9dcf5b46428b3ae011,
>>> synchronous part) in thread
>>> Thread[TriggerWindow(TumblingProcessingTimeWindows(6),
>>>
>>> ReducingStateDescriptor{serializer=jp.geniee.reporter.executable.BuyerReporterV2Auction$$anon$12$$anon$7@d2619591,
>>>
>>> reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@72bca894},
>>> ProcessingTimeTrigger(),
>>> WindowedStream.reduce(WindowedStream.java:300)) -> Map -> Map
>>> (9/30),5,Flink Task Threads] took 142 ms.
>>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
>>>   - DFSOutputStream ResponseProcessor exception
>>> for block BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999
>>> java.io.EOFException: Premature EOF: no length prefix available
>>>  at
>>> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
>>>  at
>>> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
>>>  at
>>> 

Re: Question about Timestamp in Flink SQL

2017-11-29 Thread Timo Walther

Hi Wangsan,

I opened an issue to document the behavior properly in the future 
(https://issues.apache.org/jira/browse/FLINK-8169). Basically, both your 
event-time and processing-time timestamps should be GMT. We plan to 
support offsets for windows in the future 
(https://issues.apache.org/jira/browse/FLINK-8168). Internally, the long 
values remain constant in GMT. Only the toString() output is timezone 
dependent. For now, I would suggest either implementing some 
user-defined scalar functions for your desired behavior or just 
subtracting the offset (ts - INTERVAL '8' HOURS should work).
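
To make that concrete, a minimal sketch of the scalar-function route (not from the thread; it assumes a registered table "foo" with a timestamp attribute named ts, and whether you add or subtract the offset depends on which direction your values are off):

```
import java.sql.Timestamp
import org.apache.flink.table.functions.ScalarFunction

// User-defined scalar function shifting a UTC timestamp into the JVM's default timezone (sketch).
class ToLocalTime extends ScalarFunction {
  def eval(ts: Timestamp): Timestamp =
    new Timestamp(ts.getTime + java.util.TimeZone.getDefault.getOffset(ts.getTime))
}

// Registration and use (assuming the Table API's registerFunction and sql methods):
// sTableEnv.registerFunction("toLocalTime", new ToLocalTime)
// val shifted = sTableEnv.sql("SELECT col1, toLocalTime(ts) AS ts_local FROM foo")
```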


The timezone support must definitely be improved in future versions of 
Flink SQL.


Regards,
Timo



On 11/29/17 at 10:50 AM, wangsan wrote:

Hi Timo,

What I am doing is extracting a timestamp field (it may be in string format 
such as “2017-11-28 11:00:00”, or a long value based on my current timezone) 
as the Event time attribute. So in timestampAndWatermarkAssigner, for the 
string format I should parse the date time string using GMT, and for the 
long value I should add the offset, the opposite of what 
internalToTimestamp did. But the Processing time attribute cannot be 
kept consistent. Am I understanding that correctly?


Best,
wangsan



On 29 Nov 2017, at 4:43 PM, Timo Walther > wrote:


Hi Wangsan,

currently the timestamps in Flink SQL do not depend on a timezone. 
All calculations happen on the UTC timestamp. This also guarantees 
that an input with Timestamp.valueOf("XXX") remains consistent when 
parsing and outputing it with toString().


Regards,
Timo


On 11/29/17 at 3:43 AM, wangsan wrote:

Hi Xincan,

Thanks for your reply.

The system default timezone is just as what I expected 
(sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=2880,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]). 

I looked into the generated code, and I found the following code 
snippet:


```
result$20 = 
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);

```

And what `internalToTimestamp` function did is:

```
public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```

So, if I give it an event time with unix timestamp 0, then I get the 
Timestamp(-2880). I am confused about why `internalToTimestamp` needs 
to subtract the offset.


Best,
wangsan


On 28 Nov 2017, at 11:32 PM, Xingcan Cui > wrote:


Hi wangsan,

in Flink, the ProcessingTime is just implemented by invoking 
System.currentTimeMillis() and the long value will be automatically 
wrapped to a Timestamp with the following statement:


`new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`

You can check your TimeZone.getDefault() to see if it returns the 
right TimeZone. Generally, the returned value should rely on the 
default TimeZone of your operating system.


Hope that helps.

Best,
Xingcan

On Tue, Nov 28, 2017 at 9:31 PM, wangsan > wrote:


Hi all,

While using Timestamp in Flink SQL, how can I set timezone
info? My current timezone is GMT+8, and I found that the
selected processing time is always 8 hours later than the current
time. So is the extracted event time.

Here’s my simplified code:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val sTableEnv = TableEnvironment.getTableEnvironment(senv)
println(s"current time: ${new SimpleDateFormat(".MM.dd HH:mm:ss.SSS", 
Locale.CHINA).format(new Date())}")

val stream:DataStream[(String, String, String)]= 
senv.socketTextStream("localhost", ).map(line => (line, line, line))
val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 
't.proctime)
sTableEnv.registerTable("foo", table)
val result = sTableEnv.sql("select * from foo")
result.printSchema()
result.toAppendStream[Row].print()

senv.execute("foo")

And here’s the result:



Best,
wangsan












Re: Are there plans to support Hadoop 2.9.0 in the near future?

2017-11-29 Thread Kostas Kloudas
Hi Oriol,

This estimation is not accurate and the whole plan is a bit outdated.
It was based on a time-based release model that the community tried
but without the expected results,
so we changed it.

You can follow the release voting for 1.4 in the dev mailing list. And the 
archived discussion is here:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-4-0-release-candidate-2-td20291.html
 


Cheers,
Kostas

> On Nov 29, 2017, at 11:27 AM, ORIOL LOPEZ SANCHEZ 
>  wrote:
> 
> Thanks, it helped a lot!
> 
> But I've seen on 
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Release+and+Feature+Plan#FlinkReleaseandFeaturePlan-Flink1.4
>  
> 
>  that they estimated releasing 1.4 in September. Do you know if it will be 
> released this year or whether we may have to wait longer?
> 
> Thanks a lot.
> 
> From: Kostas Kloudas  >
> Sent: Wednesday, 29 November 2017 11:15:16
> To: ORIOL LOPEZ SANCHEZ
> Cc: user@flink.apache.org 
> Subject: Re: Are there plans to support Hadoop 2.9.0 in the near future?
>  
> Hi Oriol,
> 
> As you may have seen from the mailing list we are currently in the process of 
> releasing Flink 1.4. This is going 
> to be a hadoop-free distribution which means that it should work with any 
> hadoop version, including Hadoop 2.9.0.
> 
> Given this, I would recommend to try out the release candidate (which will 
> hopefully be the next official Flink release) 
> and it should work just fine!
> 
> Hope this helps,
> Kostas
> 
>> On Nov 29, 2017, at 10:37 AM, ORIOL LOPEZ SANCHEZ 
>> > > wrote:
>> 
>> Hi, I'm currently working on designing a data-processing cluster, and one of 
>> the distributed processing tools we want to use is Flink.
>> 
>> As we're creating our cluster from barebones, without relying on any Hadoop 
>> distributions such as Hortonworks or Cloudera, we want to use Flink with 
>> Hadoop 2.9.0, but it's not officially supported.
>> 
>> May I know if there are plans to support Hadoop 2.9.0 in the near future?
>> 
>> Thank you very much,
>> 
>> Oriol López Sánchez.
>> 
>> 

Re: Are there plans to support Hadoop 2.9.0 in the near future?

2017-11-29 Thread ORIOL LOPEZ SANCHEZ
Thanks, it helped a lot!


But I've seen on 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Release+and+Feature+Plan#FlinkReleaseandFeaturePlan-Flink1.4
 that they estimated releasing 1.4 in September. Do you know if it will be 
released this year or whether we may have to wait longer?


Thanks a lot.


From: Kostas Kloudas 
Sent: Wednesday, 29 November 2017 11:15:16
To: ORIOL LOPEZ SANCHEZ
Cc: user@flink.apache.org
Subject: Re: Are there plans to support Hadoop 2.9.0 in the near future?

Hi Oriol,

As you may have seen from the mailing list we are currently in the process of 
releasing Flink 1.4. This is going
to be a hadoop-free distribution which means that it should work with any 
hadoop version, including Hadoop 2.9.0.

Given this, I would recommend to try out the release candidate (which will 
hopefully be the next official Flink release)
and it should work just fine!

Hope this helps,
Kostas

On Nov 29, 2017, at 10:37 AM, ORIOL LOPEZ SANCHEZ 
> 
wrote:

Hi, I'm currently working on designing a data-processing cluster, and one of 
the distributed processing tools we want to use is Flink.

As we're creating our cluster from barebones, without relying on any Hadoop 
distributions such as Hortonworks or Cloudera, we want to use Flink with Hadoop 
2.9.0, but it's not officially supported.

May I know if there are plans to support Hadoop 2.9.0 in the near future?

Thank you very much,

Oriol López Sánchez.





Re: user driven stream processing

2017-11-29 Thread Fabian Hueske
Another example is King's RBEA platform [1] which was built on Flink.
In a nutshell, RBEA runs a single large Flink job, to which users can add
queries that should be computed.
Of course, the query language is restricted because the queries must match
the structure of the running job.
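
For illustration, a common way to build such a job is to connect the event stream with a broadcast control stream of user queries, so new queries can be added to the running job. A minimal, non-fault-tolerant sketch (the Event/Query types and matching logic are hypothetical; in practice the query state should live in Flink's managed state):

```
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Event(app: String, failureReason: String)
case class Query(id: String, failureReason: String)

object QueryDrivenJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val events: DataStream[Event]  = env.fromElements(Event("app-1", "ABC"))  // e.g. a Kafka source in practice
    val queries: DataStream[Query] = env.fromElements(Query("q1", "ABC"))     // user-submitted queries

    // Broadcast the control stream so every parallel instance sees all queries,
    // then match incoming events against the currently active queries.
    val matches = events
      .connect(queries.broadcast)
      .flatMap(new CoFlatMapFunction[Event, Query, (String, Event)] {
        private var active = Map.empty[String, Query]

        override def flatMap1(e: Event, out: Collector[(String, Event)]): Unit =
          active.values.filter(_.failureReason == e.failureReason)
            .foreach(q => out.collect((q.id, e)))

        override def flatMap2(q: Query, out: Collector[(String, Event)]): Unit =
          active += (q.id -> q)
      })

    matches.print()
    env.execute("query-driven processing sketch")
  }
}
```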

Hope this helps,
Fabian

[1]
http://2016.flink-forward.org/kb_sessions/rbea-scalable-real-time-analytics-at-king/

2017-11-29 3:32 GMT+01:00 Tony Wei :

> Hi KZ,
>
> https://data-artisans.com/blog/real-time-fraud-detection-ing-bank-apache-
> flink
> This article seems to be a good example to trigger a new calculation on a
> running job. Maybe you can get some help from it.
>
> Best Regards,
> Tony Wei
>
> 2017-11-29 4:53 GMT+08:00 zanqing zhang :
>
>> Hi All,
>>
>> Has anyone done any stream processing driven by a user request? What's
>> the recommended way of doing this? Or is this completely wrong direction to
>> go for applications running on top of Flink?
>>
>> Basically we need to tweak the stream processing based on parameters
>> provided by a user, e.g. show me the total # of application failures due to
>> "ABC", which is provided by the user. We are thinking of starting a flink
>> job with "ABC" as a parameter but this would result in a huge number of
>> flink jobs, is there a better way for this? Can we trigger the calculation
>> on a running job?
>>
>> Thanks in advance.
>>
>> KZ
>>
>>
>


Re: How to perform efficient DataSet reuse between iterations

2017-11-29 Thread Fabian Hueske
The monitoring REST interface provides detailed stats about a job, its
tasks, and processing vertices, including their start and end times [1].

However, it is not trivial to make sense of the execution times because
Flink uses pipelined shuffles by default.
That means that the execution of multiple operators can overlap. For
example the records that are produced by a GroupReduce can be processed by
a Map, shuffled, and sorted (for another GroupReduce) in a pipelined
fashion.
Hence, all these operations run at the same time. You can disable this
behavior to some extent by setting the execution mode to batched shuffles
[2].
However, this will likely have a negative impact on the overall execution
time.
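
As a small illustration of that switch (a sketch only; the rest of the program is a placeholder):

```
import org.apache.flink.api.common.ExecutionMode
import org.apache.flink.api.scala._

object BatchShuffleExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Use batched (blocking) data exchanges so operator executions overlap less,
    // which makes per-operator times in the REST API easier to interpret.
    env.getConfig.setExecutionMode(ExecutionMode.BATCH)

    env.fromElements(1, 2, 3).map(_ * 2).print()
  }
}
```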

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#details-of-a-running-or-completed-job
[2]
https://stackoverflow.com/questions/33691612/apache-flink-stepwise-execution/33691957#33691957



2017-11-29 0:44 GMT+01:00 Miguel Coimbra :

> Hello,
>
> You're right, I was overlooking that.
> With your suggestion, I now just define a different sink in each iteration
> of the loop.
> Then they all output to disk when executing a single bigger plan.
>
> I have one more question: I know I can retrieve the total time this single
> job takes to execute, but what if I want to know the time taken for
> specific operators in the dag?
> Is there some functionality in the Flink Batch API akin to counting
> elements but for measuring time instead?
> For example, if I am not mistaken, an operator can be executed in parallel
> or serially (with a parallelism of one).
>
> Is there a straightforward way to get the time taken by the operator's
> tasks?
> In a way that I could:
>
> a) just get the time of a single task (if running serially) to get the
> total operator execution time;
> b) know the time taken by each parallel component of the operator's
> execution so I could know where and what was the "lagging element" in the
> operator's execution.
>
> Is this possible? I was hoping I could retrieve this information in the
> Java program itself and avoid processing logs.
>
> Thanks again.
>
> Best regards,
>
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com 
> Skype: miguel.e.coimbra
>
> On 28 November 2017 at 08:56, Fabian Hueske  wrote:
>
>> Hi,
>>
>> by calling result.count(), you compute the complete plan from the
>> beginning and not just the operations you added since the last execution.
>> Looking at the output you posted, each step takes about 15 seconds (with
>> about 5 secs of initialization).
>> So the 20 seconds of the first step include initialization + 1st step.
>> The 35 seconds on the second step include initialization, 1st step + 2nd
>> step.
>> If you don't call count on the intermediate steps, you can compute the
>> 4th step in 65 seconds.
>>
>> Implementing a caching operator would be a pretty huge effort because you
>> need to touch code at many places such as the API, optimizer, runtime,
>> scheduling, etc.
>> The documentation you found should still be applicable. There hasn't been
>> major additions to the DataSet API and runtime in the last releases.
>>
>> Best, Fabian
>>
>>
>>
>> 2017-11-28 9:14 GMT+01:00 Miguel Coimbra :
>>
>>> Hello Fabian,
>>>
>>> Thank you for the reply.
>>> I was hoping the situation had in fact changed.
>>>
>>> As far as I know, I am not calling execute() directly even once - it is
>>> being called implicitly by simple DataSink elements added to the plan
>>> through count():
>>>
>>> System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
>>>     executionCounter,
>>>     result.count(), // this would trigger execution...
>>>     env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
>>>     env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>>>
>>>
>>> I have taken a look at Flink's code base (e.g. how the dataflow dag is
>>> processed with classes such as  
>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor,
>>> org.apache.flink.api.java.operators.OperatorTranslation) but I'm not
>>> sure on the most direct way to achieve this.
>>> Perhaps I missed some online documentation that would help to get a grip
>>> on how to contribute to the different parts of Flink?
>>>
>>> I did find some information which hints at implementing this sort of
>>> thing (such as adding custom operators) but it was associated to an old
>>> version of Flink:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>>> internals/add_operator.html
>>> However, as far as I know there is no equivalent page in the current
>>> online stable or snapshot documentation.
>>>
>>> What would be the best way to go about this?
>>>
>>> It really seems 

Re: Are there plans to support Hadoop 2.9.0 in the near future?

2017-11-29 Thread Kostas Kloudas
Hi Oriol,

As you may have seen from the mailing list we are currently in the process of 
releasing Flink 1.4. This is going 
to be a hadoop-free distribution which means that it should work with any 
hadoop version, including Hadoop 2.9.0.

Given this, I would recommend to try out the release candidate (which will 
hopefully be the next official Flink release) 
and it should work just fine!

Hope this helps,
Kostas

> On Nov 29, 2017, at 10:37 AM, ORIOL LOPEZ SANCHEZ 
>  wrote:
> 
> Hi, I'm currently working on designing a data-processing cluster, and one of 
> the distributed processing tools we want to use is Flink.
> 
> As we're creating our cluster from barebones, without relying on any Hadoop 
> distributions such as Hortonworks or Cloudera, we want to use Flink with 
> Hadoop 2.9.0, but it's not officially supported.
> 
> May I know if there are plans to support Hadoop 2.9.0 in the near future?
> 
> Thank you very much,
> 
> Oriol López Sánchez.
> 
> 
> Este mensaje y sus adjuntos se dirigen exclusivamente a su destinatario, 
> puede contener información privilegiada o confidencial y es para uso 
> exclusivo de la persona o entidad de destino. Si no es usted. el destinatario 
> indicado, queda notificado de que la lectura, utilización, divulgación y/o 
> copia sin autorización puede estar prohibida en virtud de la legislación 
> vigente. Si ha recibido este mensaje por error, le rogamos que nos lo 
> comunique inmediatamente por esta misma vía y proceda a su destrucción.
> 
> The information contained in this transmission is privileged and confidential 
> information intended only for the use of the individual or entity named 
> above. If the reader of this message is not the intended recipient, you are 
> hereby notified that any dissemination, distribution or copying of this 
> communication is strictly prohibited. If you have received this transmission 
> in error, do not read it. Please immediately reply to the sender that you 
> have received this communication in error and then delete it.
> 
> Esta mensagem e seus anexos se dirigem exclusivamente ao seu destinatário, 
> pode conter informação privilegiada ou confidencial e é para uso exclusivo da 
> pessoa ou entidade de destino. Se não é vossa senhoria o destinatário 
> indicado, fica notificado de que a leitura, utilização, divulgação e/ou cópia 
> sem autorização pode estar proibida em virtude da legislação vigente. Se 
> recebeu esta mensagem por erro, rogamos-lhe que nos o comunique imediatamente 
> por esta mesma via e proceda a sua destruição



Re: Question about Timestamp in Flink SQL

2017-11-29 Thread wangsan
Hi Timo,

What I am doing is extracting a timestamp field (it may be in string format such as 
“2017-11-28 11:00:00”, or a long value based on my current timezone) as the Event 
time attribute. So in timestampAndWatermarkAssigner, for the string format I 
should parse the date time string using GMT, and for the long value I should add 
the offset, the opposite of what internalToTimestamp did. But the Processing time 
attribute cannot be kept consistent. Am I understanding that correctly?

Best,
wangsan



> On 29 Nov 2017, at 4:43 PM, Timo Walther  wrote:
> 
> Hi Wangsan,
> 
> currently the timestamps in Flink SQL do not depend on a timezone. All 
> calculations happen on the UTC timestamp. This also guarantees that an input 
> with Timestamp.valueOf("XXX") remains consistent when parsing and outputing 
> it with toString().
> 
> Regards,
> Timo
> 
> 
> On 11/29/17 at 3:43 AM, wangsan wrote:
>> Hi Xincan,
>> 
>> Thanks for your reply. 
>> 
>> The system default timezone is just as what I expected 
>> (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=2880,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
>>  
>> I looked into the generated code, and I found the following code snippet:
>> 
>> ```
>> result$20 = 
>> org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
>> ```
>> 
>> And what `internalToTimestamp` function did is:
>> 
>> ```
>> public static Timestamp internalToTimestamp(long v) {
>> return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
>> }
>> ```
>> 
>> So, if I give it an event time with unix timestamp 0, then I get the 
>> Timestamp(-2880). I am confused about why `internalToTimestamp` needs to 
>> subtract the offset.
>> 
>> Best,
>> wangsan
>> 
>> 
>>> On 28 Nov 2017, at 11:32 PM, Xingcan Cui >> > wrote:
>>> 
>>> Hi wangsan,
>>> 
>>> in Flink, the ProcessingTime is just implemented by invoking 
>>> System.currentTimeMillis() and the long value will be automatically wrapped 
>>> to a Timestamp with the following statement:
>>> 
>>> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
>>> 
>>> You can check your TimeZone.getDefault() to see if it returns the right 
>>> TimeZone. Generally, the returned value should rely on the default TimeZone 
>>> of your operating system.
>>> 
>>> Hope that helps.
>>> 
>>> Best,
>>> Xingcan
>>> 
>>> On Tue, Nov 28, 2017 at 9:31 PM, wangsan >> > wrote:
>>> Hi all,
>>> 
>>> While using Timestamp in Flink SQL, how can I set timezone info? My 
>>> current timezone is GMT+8, and I found that the selected processing time is 
>>> always 8 hours later than the current time. So is the extracted event time.
>>> 
>>> Here’s my simplified code:
>>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>>> senv.setParallelism(1)
>>> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>> 
>>> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
>>> println(s"current time: ${new SimpleDateFormat(".MM.dd HH:mm:ss.SSS", 
>>> Locale.CHINA).format(new Date())}")
>>> 
>>> val stream: DataStream[(String, String, String)] = 
>>> senv.socketTextStream("localhost", ).map(line => (line, line, line))
>>> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 
>>> 't.proctime)
>>> sTableEnv.registerTable("foo", table)
>>> val result = sTableEnv.sql("select * from foo")
>>> result.printSchema()
>>> result.toAppendStream[Row].print()
>>> 
>>> senv.execute("foo")
>>> And here’s the result:
>>> 
>>> 
>>> 
>>> Best,
>>> wangsan
>>> 
>> 
> 



Re: Question about Timestamp in Flink SQL

2017-11-29 Thread Timo Walther

Hi Wangsan,

currently the timestamps in Flink SQL do not depend on a timezone. All 
calculations happen on the UTC timestamp. This also guarantees that an 
input with Timestamp.valueOf("XXX") remains consistent when parsing and 
outputing it with toString().


Regards,
Timo


On 11/29/17 at 3:43 AM, wangsan wrote:

Hi Xincan,

Thanks for your reply.

The system default timezone is just as what I expected 
(sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=2880,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]). 


I looked into the generated code, and I found the following code snippet:

```
result$20 = 
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);

```

And what `internalToTimestamp` function did is:

```
public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```

So, if I give it an event time with unix timestamp 0, then I get the 
Timestamp(-2880). I am confused about why `internalToTimestamp` needs to 
subtract the offset.


Best,
wangsan


On 28 Nov 2017, at 11:32 PM, Xingcan Cui > wrote:


Hi wangsan,

in Flink, the ProcessingTime is just implemented by invoking 
System.currentTimeMillis() and the long value will be automatically 
wrapped to a Timestamp with the following statement:


`new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`

You can check your TimeZone.getDefault() to see if it returns the 
right TimeZone. Generally, the returned value should rely on the 
default TimeZone of your operating system.


Hope that helps.

Best,
Xingcan

On Tue, Nov 28, 2017 at 9:31 PM, wangsan > wrote:


Hi all,

While using Timestamp in Flink SQL, how can I set timezone info?
My current timezone is GMT+8, and I found that the selected
processing time is always 8 hours later than the current time. So is
the extracted event time.

Here’s my simplified code:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val sTableEnv = TableEnvironment.getTableEnvironment(senv)
println(s"current time: ${new SimpleDateFormat(".MM.dd HH:mm:ss.SSS", 
Locale.CHINA).format(new Date())}")

val stream:DataStream[(String, String, String)]= 
senv.socketTextStream("localhost", ).map(line => (line, line, line))
val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 
't.proctime)
sTableEnv.registerTable("foo", table)
val result = sTableEnv.sql("select * from foo")
result.printSchema()
result.toAppendStream[Row].print()

senv.execute("foo")

And here’s the result:



Best,
wangsan








Re: S3 Access in eu-central-1

2017-11-29 Thread Ufuk Celebi
Hey Dominik,

yes, we should definitely add this to the docs.

@Nico: You recently updated the Flink S3 setup docs. Would you mind
adding these hints for eu-central-1 from Steve? I think that would be
super helpful!

Best,

Ufuk

On Tue, Nov 28, 2017 at 10:00 PM, Dominik Bruhn  wrote:
> Hey Stephan, Hey Steve,
> that was the right hint, adding that option to the Java-Options fixed the
> problem. Maybe we should add this somehow to our Flink Wiki?
>
> Thanks!
>
> Dominik
>
> On 28/11/17 11:55, Stephan Ewen wrote:
>>
>> Got a pointer from Steve that this is answered on Stack Overflow here:
>> https://stackoverflow.com/questions/36154484/aws-java-sdk-manually-set-signature-version
>> 
>>
>> Flink 1.4 contains a specially bundled "fs-s3-hadoop" with a smaller
>> footprint, compatible across Hadoop versions, and based on a later s3a and
>> AWS sdk. In that connector, it should work out of the box because it uses a
>> later AWS SDK. You can also use it with earlier Hadoop versions because
>> dependencies are relocated, so it should not clash/conflict.
>>
>>
>>
>>
>> On Mon, Nov 27, 2017 at 8:58 PM, Stephan Ewen > > wrote:
>>
>> Hi!
>>
>> The endpoint config entry looks correct.
>> I was looking at this issue to see if there are pointers to anything
>> else, but it looks like the explicit endpoint entry is the most
>> important thing: https://issues.apache.org/jira/browse/HADOOP-13324
>> 
>>
>> I cc-ed Steve Loughran, who is Hadoop's S3 expert (sorry Steve for
>> pulling you in again - listening and learning still about the subtle
>> bits and pieces of S3).
>> @Steve are S3 V4 endpoints supported in Hadoop 2.7.x already, or
>> only in Hadoop 2.8?
>>
>> Best,
>> Stephan
>>
>>
>> On Mon, Nov 27, 2017 at 9:47 AM, Dominik Bruhn > > wrote:
>>
>> Hey,
>> can anyone give a hint? Does anyone have flink running with an
>> S3 Bucket in Frankfurt/eu-central-1 and can share his config and
>> setup?
>>
>> Thanks,
>> Dominik
>>
>> On 22. Nov 2017, at 17:52, domi...@dbruhn.de
>>  wrote:
>>
>>> Hey everyone,
>>> I've been trying for hours to get Flink 1.3.2 (downloaded for
>>> hadoop 2.7) to snapshot/checkpoint to an S3 bucket which is
>>> hosted in the eu-central-1 region. Everything works fine for
>>> other regions. I'm running my job on a JobTracker in local
>>> mode. I googled the internet and found several hints, most of
>>> them telling that setting the `fs.s3a.endpoint` should solve
>>> it. It doesn't. I'm also sure that the core-site.xml (see
>>> below) is picked up, if I put garbage into the endpoint then I
>>> receive a hostname not found error.
>>>
>>> The exception I'm getting is:
>>> com.amazonaws.services.s3.model.AmazonS3Exception: Status
>>> Code: 400, AWS Service: Amazon S3, AWS Request ID:
>>> 432415098B0994BC, AWS Error Code: null, AWS Error Message: Bad
>>> Request, S3 Extended Request ID:
>>>
>>> 1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEHXjDb3F9AniXgsBQ=
>>>
>>> I read the AWS FAQ but I don't think that
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request
>>>
>>> 
>>> applies to me as I'm not running the NativeFileSystem.
>>>
>>> I suspect this is related to the v4 signing protocol which is
>>> required for S3 in Frankfurt. Could it be that the aws-sdk
>>> version is just too old? I tried to play around with it but
>>> the hadoop adapter is incompatible with newer versions.
>>>
>>> I have the following core-site.xml:
>>>
>>> 
>>> 
>>>
>>> <configuration>
>>>   <property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
>>>   <property><name>fs.s3a.buffer.dir</name><value>/tmp</value></property>
>>>   <property><name>fs.s3a.access.key</name><value>something</value></property>
>>>   <property><name>fs.s3a.secret.key</name><value>wont-tell</value></property>
>>>   <property><name>fs.s3a.endpoint</name><value>s3.eu-central-1.amazonaws.com</value></property>
>>> </configuration>
>>>
>>> Here is my lib folder with the versions of the aws-sdk and the
>>> hadoop-aws integration:
>>> -rw---1 root root   11.4M Mar 20  2014
>>> aws-java-sdk-1.7.4.jar
>>> -rw-r--r--1 1005 1006   70.0M Aug  3 12:10
>>> flink-dist_2.11-1.3.2.jar
>>> -rw-rw-r--1 1005 1006   98.3K Aug  3 12:07
>>> flink-python_2.11-1.3.2.jar
>>> -rw-r--r--1 1005 1006   34.9M Aug  3 11:58
>>> flink-shaded-hadoop2-uber-1.3.2.jar
>>> -rw---