Re: 1.4.3 release/roadmap

2018-04-19 Thread Bowen Li
To find the bug fixes that are going into 1.4.x, say 1.4.3, you can filter
JIRA tickets with 'Fix Version/s' set to '1.4.3'.

On Thu, Apr 19, 2018 at 1:36 AM, Daniel Harper 
wrote:

> Hi there,
>
> There are some bug fixes that are in the 1.4 branch that we would like to
> be made available for us to use.
>
> Is there a roadmap from the project when the next stable 1.4.x release
> will be cut? Any blockers?
>


gonna need more logs when task manager is shutting down

2018-04-19 Thread makeyang
One of my task managers dropped out of the cluster, and when I checked its log
I found the following:
2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44).
2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task - Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager is shutting down.
    at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:220)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:121)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Well, "Attempting to fail task externally Process" due to what? And the task
manager is shutting down due to what?

This important information, which would actually be very useful, is not found in the log.





CEP Debugging

2018-04-19 Thread Nortman, Bill
So I have events coming in in this format
TransactionID, MicroserviceName, Direction,EpochTimeStamp.
For each call to a microservice, an event is generated with a timestamp and a 
direction of "in". Then, when the call completes, an event is generated with a 
timestamp and a direction of "out".
I need to calculate latency both per microservice per transaction and for 
the entire transaction.
Example Data (TransactionID, MicroserviceName, Direction,EpochTimeStamp)
1,A,in,1700
1,B,in,1702
1,B,out,1704
1,C,in,1704
1,D,in,1705
1,D,out,1706
1,C,out,1709
1,A,out,1710
I'm looking for a pattern to handle this.
Something like this:
Pattern<MetricsEvent, ?> pattern =
    Pattern.<MetricsEvent>begin("myBegin").where(
        new SimpleCondition<MetricsEvent>() {
            public boolean filter(MetricsEvent metricsEvent) throws Exception {
                return metricsEvent.getDirection().equals("in");
            }
        }
    ).followedBy("myNext").where(
        new SimpleCondition<MetricsEvent>() {
            public boolean filter(MetricsEvent metricsEvent) throws Exception {
                metricsEvent.getApplication().equals(//PREVIOUS EVENT.getApplication()))
                return false;
            }
        }
    )
I'm not sure how to get the previous event to compare to.
And then how do I calculate the latency between the two events?
So I tried this:

Pattern<MetricsEvent, ?> pattern =
    Pattern.<MetricsEvent>begin("myBegin").where(
        new SimpleCondition<MetricsEvent>() {
            public boolean filter(final MetricsEvent metricsEvent) {
                System.out.println("Begin Filter");
                return !metricsEvent.getEventType().equals("in");
            }
        }
    ).followedBy("followed").where(
        new SimpleCondition<MetricsEvent>() {
            public boolean filter(final MetricsEvent metricsEvent) {
                System.out.println("FollowedByFilter");
                return !metricsEvent.getEventType().equals("out");
            }
        }
    );

// Define a stream processor using the pattern
PatternStream<MetricsEvent> patternStream = CEP.pattern(
    metricEventStream,
    pattern);

// Process the stream
System.out.println("* Pattern Processing");
SingleOutputStreamOperator<MetricLatency> latencyEvents =
    patternStream.flatSelect(new ApplicationLatencyFlatSelectFunction());
With this as the latency calculation:

public class ApplicationLatencyFlatSelectFunction implements
        org.apache.flink.cep.PatternFlatSelectFunction<MetricsEvent, MetricLatency> {

    @SuppressWarnings("CheckStyle") // Base class method doesn't have JavaDoc
    public void flatSelect(final Map<String, List<MetricsEvent>> map,
                           final Collector<MetricLatency> collector) {
        System.out.println("  Flat Select From Pattern");
        MetricsEvent begin = map.get("myBegin").get(0);
        List<MetricsEvent> ends = map.get("followed");
        for (MetricsEvent me : ends) {
            if (me.getApplication().equals(begin.getApplication())) {
                Long latency = me.getEpochTime() - begin.getEpochTime();
                collector.collect(new MetricLatency(begin.getUid(), begin.getApplication(), latency));
            }
        }
    }
}
However, I don't get any output; the println calls in the pattern conditions and 
the flatSelect function never print.
How do you debug something like this?
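
From the docs it looks like an IterativeCondition may be the way to reference the
earlier event, since its Context exposes the events already accepted for a named
pattern. A rough, untested sketch building on the classes above (MetricsEvent and
its getters are my own classes; the rest is standard Flink CEP API as I understand it):

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// Sketch: match an "in" event, then an "out" event of the same application.
Pattern<MetricsEvent, ?> inOutPattern = Pattern.<MetricsEvent>begin("myBegin")
    .where(new SimpleCondition<MetricsEvent>() {
        @Override
        public boolean filter(MetricsEvent event) {
            return event.getDirection().equals("in");
        }
    })
    .followedBy("myNext")
    .where(new IterativeCondition<MetricsEvent>() {
        @Override
        public boolean filter(MetricsEvent event, Context<MetricsEvent> ctx) throws Exception {
            if (!event.getDirection().equals("out")) {
                return false;
            }
            // getEventsForPattern() returns the events already matched for "myBegin".
            for (MetricsEvent begin : ctx.getEventsForPattern("myBegin")) {
                if (event.getApplication().equals(begin.getApplication())) {
                    return true;
                }
            }
            return false;
        }
    });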


Re: debug for Flink

2018-04-19 Thread Qian Ye
Thanks for your kind reply. 
But I still have some questions. What does the logging level mean in your 
system? Why do you need to re-deploy the cluster to change the logging level?  
As far as I know, debugging information can be divided into levels such as 
info, warn, and error. Is all of this information logged while the system is 
running, and can it then be analyzed by categorizing it by level?

Best,
Stephen

> On Apr 19, 2018, at 7:19 AM, Kien Truong  wrote:
> 
> Hi,
> 
> Our most useful tool when debugging Flink is actually the simple log files, 
> because debuggers just slow things down too much for us.
> 
> However, having to re-deploy the entire cluster to change the logging level 
> is a pain (we use YARN),
> 
> so we would really like an easier method to change the logging level at 
> runtime.
> 
> 
> Regards,
> 
> Kien
> 
> 
> On 4/19/2018 5:53 AM, Qian Ye wrote:
>> Hi
>> 
>> I’m wondering if new debugging methods/tools  are urgent for Flink 
>> development. I know there already exists some debug methods for Flink, e.g., 
>> remote debugging of flink 
>> clusters(https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters).
>>  But are they convenient enough?
>> 
>> Best regards.



Re: debug for Flink

2018-04-19 Thread Qian Ye
That sounds nice. But the lazy evaluation feature of Flink seems to make the 
debugging process quite different from that of a regular Java/Scala application. Do you know 
how to debug a Flink application, e.g., tracing some local variables, in an IDE?

Best,
Stephen

> On Apr 19, 2018, at 6:29 AM, Fabian Hueske  wrote:
> 
> Hi,
> 
> You can run Flink applications locally in your IDE and debug a Flink program 
> just like a regular Java/Scala application.
> 
> Best, Fabian
> 
> 2018-04-19 0:53 GMT+02:00 Qian Ye  >:
> Hi
> 
> I’m wondering if new debugging methods/tools  are urgent for Flink 
> development. I know there already exists some debug methods for Flink, e.g., 
> remote debugging of flink 
> clusters(https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
>  
> ).
>  But are they convenient enough?
> 
> Best regards.
> 



Re: Testing on Flink 1.5

2018-04-19 Thread Gary Yao
Hi Amit,

Thank you for the follow up. What you describe sounds like a bug, but I am not
able to reproduce it. Can you open an issue in Jira with an outline of your
code and how you submit the job?

> Could you also recommend us the best practice in FLIP6, should we use
YARN session or submit jobs in non-detached mode?

This should be independent of FLIP-6. We are aiming at backwards
compatibility, but apparently there are still things that require polishing.

Best,
Gary

On Thu, Apr 19, 2018 at 12:49 PM, Amit Jain  wrote:
>
> Hi Gary,
>
> We found the underlying issue with the following problem.
> A few of our jobs are stuck with logs [1]; these jobs are only able to
allocate a JM and couldn't get any TM, even though there are ample resources on
our cluster.
>
> We are running an ETL merge job here. In this job, we first find new deltas,
and if no delta is detected we exit without actually executing the job. I think
this is the reason we see no TM allocation happening.
>
> I believe in the above case (non-detached mode) we should mark the submitted
application as completed rather than running. Please share your thoughts on
this.
> Should I log this improvement in JIRA?
>
> Could you also recommend us the best practice in FLIP6, should we use
YARN session or submit jobs in non-detached mode?
>
> --
> Thanks,
> Amit
>


Re: Flink Kafka connector not exist

2018-04-19 Thread Tzu-Li (Gordon) Tai
Hi Sebastien,

You need to add the dependency under a “dependencies” section, like so:

<dependencies>
    …
</dependencies>
Then it should be working.

I would also recommend using the Flink quickstart Maven templates [1], as they 
already have a well defined Maven project skeleton for Flink jobs.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html

On 19 April 2018 at 10:17:11 PM, Lehuede sebastien (lehued...@gmail.com) wrote:

Hi Guys,

I have created a project with Maven to try to send data from Kafka to Flink. 
But when I try to build the project I have the following error:

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on 
project processing-app: Compilation failure: Compilation failure:
[ERROR] 
/env/mvn/test-0.0.1/processing-app/src/main/java/com/test/alpha/ReadFromKafka.java:[24,51]
 package org.apache.flink.streaming.connectors.kafka does not exist
[ERROR] 
/env/mvn/test-0.0.1/processing-app/src/main/java/com/test/alpha/ReadFromKafka.java:[52,63]
 cannot find symbol
[ERROR]   symbol:   class FlinkKafkaConsumer011

Here is my "pom.xml" configuration for Flink Kafka connector : 

                1.4.2
                1.8
                2.11
                ${java.version}
                ${java.version}

                
                        org.apache.flink
                        
flink-connector-kafka-0.11_${scala.binary.version}
                        ${flink.version}
                

And here is the import line in my Java file:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

Could anyone help me with this issue?

Regards,
Sebastien

Re: Subtasks - Uneven allocation

2018-04-19 Thread Kien Truong

Hi Pedro,

You can try to call either .rebalance() or .shuffle() before the Async 
operator. Shuffle might give a better result if you have fewer tasks than 
parallelism.

Best regards,
Kien
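
For example, assuming the async stage is applied with AsyncDataStream (the stream,
function and type names below are placeholders, not your code), the redistribution
would go right before it:

// Redistribute records round-robin so the async subtasks on both machines get work.
DataStream<Result> results = AsyncDataStream.unorderedWait(
        input.rebalance(),          // or input.shuffle()
        new MyAsyncFunction(),      // placeholder for your AsyncFunction implementation
        30, TimeUnit.SECONDS,       // timeout
        100);                       // capacity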


On 4/18/2018 11:10 PM, PedroMrChaves wrote:

Hello,

I have a job that has one async operational node (i.e. implements
AsyncFunction). This Operational node will spawn multiple threads that
perform heavy tasks (cpu bound).

I have a Flink Standalone cluster deployed on two machines of 32 cores and
128 gb of RAM, each machine has one task manager and one Job Manager. When I
deploy the job, all of the subtasks from the async operational node end up
on the same machine, which causes it to have a much higher CPU load than the
other.

I've researched ways to overcome this issue, but I haven't found a solution
to my problem.
Ideally, the subtasks would be evenly split across both machines.

Can this problem be solved somehow?

Regards,
Pedro Chaves.



-
Best Regards,
Pedro Chaves


Re: debug for Flink

2018-04-19 Thread Kien Truong

Hi,

Our most useful tool when debugging Flink is actually the simple log 
files, because debuggers just slow things down too much for us.


However, having to re-deploy the entire cluster to change the logging 
level is a pain (we use YARN),


so we would really like an easier method to change the logging level at 
runtime.



Regards,

Kien


On 4/19/2018 5:53 AM, Qian Ye wrote:

Hi

I’m wondering if new debugging methods/tools  are urgent for Flink 
development. I know there already exists some debug methods for Flink, 
e.g., remote debugging of flink 
clusters(https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters). 
But are they convenient enough?


Best regards.


Flink Kafka connector not exist

2018-04-19 Thread Lehuede sebastien
Hi Guys,

I have created a project with Maven to try to send data from Kafka to
Flink. But when I try to build the project I have the following error:

[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile
(default-compile) on project processing-app: Compilation failure:
Compilation failure:
[ERROR]
/env/mvn/test-0.0.1/processing-app/src/main/java/com/test/alpha/ReadFromKafka.java:[24,51]
package org.apache.flink.streaming.connectors.kafka does not exist
[ERROR]
/env/mvn/test-0.0.1/processing-app/src/main/java/com/test/alpha/ReadFromKafka.java:[52,63]
cannot find symbol
[ERROR]   symbol:   class FlinkKafkaConsumer011

Here is my "pom.xml" configuration for Flink Kafka connector :

*1.4.2
1.8
2.11
${java.version}
  ${java.version}*

**
*org.apache.flink*
*
flink-connector-kafka-0.11_${scala.binary.version}*
*${flink.version}*
**

And here is the import line in my Java file:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

Could anyone help me with this issue?

Regards,
Sebastien


Re: Consumer offsets not visible in Kafka

2018-04-19 Thread Kien Truong

Hi,

That tool only shows active consumer groups that make use of the 
automatic partition assignment API.

Flink uses the manual partition assignment API, so it will not show up 
there.

The best way to monitor Kafka offsets with Flink is to use Flink's own 
metrics system.

Otherwise, you can scan the __consumer_offsets topic and search for 
the committed offsets; however, this result can be outdated, since Flink 
only commits offsets on checkpoints:


bin/kafka-console-consumer.sh --consumer.config config/consumer.properties
--from-beginning --topic __consumer_offsets --zookeeper zookeeper:2181
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"


Best regards,

Kien


On 4/19/2018 4:37 PM, bernd.winterst...@dev.helaba.de wrote:

Hi
We are using Kafka 0.11 consumers with Flink 1.4 and Confluent Kafka 
4.0.0. Checkpointing is enabled and enableCommitOnCheckpoints is set 
to true.
However there are no offsets from Flink jobs visible in Kafka when 
checking with the kafka-consumer-groups tool.

Any ideas
Regards
Bernd


Re: Tracking deserialization errors

2018-04-19 Thread Tzu-Li (Gordon) Tai
@Alexander
Sorry about that, that would be my mistake. I’ll close FLINK-9204 as a 
duplicate and leave my thoughts on FLINK-9155. Thanks for pointing out!


On 19 April 2018 at 2:00:51 AM, Elias Levy (fearsome.lucid...@gmail.com) wrote:

Either proposal would work. In the latter case, at a minimum we'd need a way to 
identify the source within the metric.  The basic error metric would then allow 
us to go into the logs to determine the cause of the error, as we already 
record the message causing trouble in the log. 


On Mon, Apr 16, 2018 at 4:42 AM, Fabian Hueske  wrote:
Thanks for starting the discussion Elias.

I see two ways to address this issue.

1) Add an interface that a deserialization schema can implement to register 
metrics. Each source would need to check for the interface and call it to setup 
metrics.
2) Check for null returns in the source functions and increment a respective 
counter.

In both cases, we need to touch the source connectors.
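
To sketch what option 2 could look like (just an illustration of the idea, not 
actual connector code; runtimeContext, deserializationSchema, rawBytes, and 
sourceContext are placeholders):

// Count records that the DeserializationSchema turned into null instead of an object.
// Counter is org.apache.flink.metrics.Counter.
Counter deserializeErrors = runtimeContext.getMetricGroup().counter("deserializeErrors");

T record = deserializationSchema.deserialize(rawBytes);
if (record == null) {
    deserializeErrors.inc();       // corrupted record: count it and skip it
} else {
    sourceContext.collect(record);
}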

I see that information such as topic name, partition, and offset is important 
debugging information. However, I don't think that metrics would be a good way 
to capture it.
In that case, log files might be a better approach.

I'm not sure to what extent the source functions (Kafka, Kinesis) support such 
error tracking.
Adding Gordon to the thread who knows the internals of the connectors.



Re: Flink / Kafka unit testing with scalatest-embedded-kafka

2018-04-19 Thread Ted Yu
Pardon - I missed the implicit config (which is used by withRunningKafka).

Without your manual message production, was there any indication in broker
log that it received message(s) ?

Thanks

On Thu, Apr 19, 2018 at 6:31 AM, Chauvet, Thomas 
wrote:

> Hi,
>
>
>
> withRunningKafka launch a kafka broker. This is one of the advantage of
> this library.
>
>
>
> I test to consume / produce messages with kafka command line, and it seems
> alright.
>
>
>
> Thanks
>
>
>
> *De :* Ted Yu [mailto:yuzhih...@gmail.com]
> *Envoyé :* jeudi 19 avril 2018 15:28
> *À :* Chauvet, Thomas 
> *Objet :* Re: Flink / Kafka unit testing with scalatest-embedded-kafka
>
>
>
> Looking at your code, Kafka broker was not started.
>
>
>
> Was there running broker on localhost ?
>
>
>
> Cheers
>
>
>
> On Thu, Apr 19, 2018 at 6:23 AM, Chauvet, Thomas 
> wrote:
>
> Hi,
>
>
>
> I would like to « unit test » a Flink job with Kafka as source (and sink).
> I am trying to use the library scalatest-embedded-kafka to simulate a Kafka
> for my test.
>
>
>
> For example, I would like to get data (string stream) from Kafka, convert
> it into uppercase and put it into another topic.
>
>
>
> Now, I am just trying to use Flink’s Kafka consumer to read from a topic
> (with embedded kafka).
>
>
>
> Here is the code for example :
>
>
>
> ```scala
>
>
>
> import java.util.Properties
>
>
>
> import org.apache.flink.streaming.api.scala._
>
> import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema
>
> import org.apache.flink.core.fs.FileSystem.WriteMode
>
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
>
> import org.scalatest.{Matchers, WordSpec}
>
> import scala.util.Random
>
>
>
> object SimpleFlinkKafkaTest {
>
>   SimpleFlinkKafkaTest
>
>   val kafkaPort = 9092
>
>   val zooKeeperPort = 2181
>
>
>
>   val groupId = Random.nextInt(100).toString
>
>   val props = new Properties()
>
>   props.put("bootstrap.servers", "localhost:9092")
>
>   props.put("zookeeper.connect", "localhost:2181")
>
>   props.put("auto.offset.reset", "earliest")
>
>   props.put("group.id", groupId)
>
>   props.put("key.deserializer", "org.apache.kafka.common.serialization.
> StringDeserializer")
>
>   props.put("value.deserializer", "org.apache.kafka.common.serialization.
> StringDeserializer")
>
>
>
>   val propsMap = Map(
>
>"bootstrap.servers" -> "localhost:9092",
>
> "zookeeper.connect" -> "localhost:2181",
>
> "auto.offset.reset" -> "earliest",
>
> "group.id" -> groupId,
>
> "key.deserializer" -> "org.apache.kafka.common.serialization.
> StringDeserializer",
>
> "value.deserializer" -> "org.apache.kafka.common.serialization.
> StringDeserializer"
>
>   )
>
>
>
>   val inputString = "mystring"
>
>   val expectedString = "MYSTRING"
>
>
>
> }
>
>
>
> class SimpleFlinkKafkaTest extends WordSpec with Matchers with
> EmbeddedKafka {
>
>
>
>   "runs with embedded kafka" should {
>
>
>
> "work" in {
>
>
>
>   implicit val config = EmbeddedKafkaConfig(
>
> kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
>
> zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort,
>
> customConsumerProperties = SimpleFlinkKafkaTest.propsMap
>
>   )
>
>
>
>   withRunningKafka {
>
>
>
> publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.
> inputString)
>
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setParallelism(1)
>
>
>
> val kafkaConsumer = new FlinkKafkaConsumer011(
>
>   "input-topic",
>
>   new SimpleStringSchema,
>
>   SimpleFlinkKafkaTest.props
>
> )
>
>
>
> val inputStream = env.addSource(kafkaConsumer)
>
>
>
> val outputStream = inputStream.map { msg =>
>
>   msg.toUpperCase
>
> }
>
>
>
> outputStream.writeAsText("test.csv", WriteMode.OVERWRITE)
>
>
>
> env.execute()
>
>
>
> consumeFirstStringMessageFrom("output-topic") shouldEqual
> SimpleFlinkKafkaTest.expectedString
>
>
>
>   }
>
> }
>
>   }
>
> }
>
> ```
>
>
>
> The Flink process is running but nothing happens. I tried to write into a
> text file to see any output but there is nothing in the file.
>
>
>
> Any idea ? Does anybody use this library to test a Flink Job using Kafka ?
>
>
>
> Thanks in advance,
>
>
>
> Thomas
>
>
>


Re: Applying a void function to DataStream

2018-04-19 Thread Timo Walther
In your case a FlatMapFunction is better suited because it allows 0, 1 
or more outputs.

It would look like this:

text.flatMap((FlatMapFunction<String, String>) (value, out) -> parse(value));

Or with an anonymous class:

text.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        parse(value);
    }
});


Regards,
Timo

Am 19.04.18 um 15:26 schrieb Soheil Pourbafrani:
parse function is a static function from another class I've imported 
into the project.


On Thu, Apr 19, 2018 at 5:55 PM, Soheil Pourbafrani 
> wrote:


Thanks,
my map code is like this:

stream.map(x -> parse(x));

I can't get what you mean!
Something like the line below?

DataStream<Void> t = stream.map(x -> parse(x));

?

On Thu, Apr 19, 2018 at 5:49 PM, Timo Walther > wrote:

Hi Soheil,

Flink supports the type "java.lang.Void" which you can use in
this case.

Regards,
Timo


Am 19.04.18 um 15:16 schrieb Soheil Pourbafrani:

Hi, I have a void function that takes a String, parse it
and write it into Cassandra (Using pure java, not Flink
Cassandra connector). Using Apache Flink Kafka connector,
I've got some data into DataStream. Now I want to
apply Parse function to each message in
DataStream, but as the Parse function returns
nothing (is void), I got the error
no instance of type variable R exists so that void
conforms to R

Is there any way to do such process using apache Flink?









RE: Flink / Kafka unit testing with scalatest-embedded-kafka

2018-04-19 Thread Chauvet, Thomas
Hi,

withRunningKafka launches a Kafka broker. This is one of the advantages of this 
library.

I tested consuming and producing messages with the Kafka command line, and it 
seems alright.

Thanks

De : Ted Yu [mailto:yuzhih...@gmail.com]
Envoyé : jeudi 19 avril 2018 15:28
À : Chauvet, Thomas 
Objet : Re: Flink / Kafka unit testing with scalatest-embedded-kafka

Looking at your code, Kafka broker was not started.

Was there running broker on localhost ?

Cheers

On Thu, Apr 19, 2018 at 6:23 AM, Chauvet, Thomas 
> wrote:
Hi,

I would like to « unit test » a Flink job with Kafka as source (and sink). I am 
trying to use the library scalatest-embedded-kafka to simulate a Kafka for my 
test.

For example, I would like to get data (string stream) from Kafka, convert it 
into uppercase and put it into another topic.

Now, I am just trying to use Flink’s Kafka consumer to read from a topic (with 
embedded kafka).

Here is the code for example :

```scala

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.scalatest.{Matchers, WordSpec}
import scala.util.Random

object SimpleFlinkKafkaTest {
  SimpleFlinkKafkaTest
  val kafkaPort = 9092
  val zooKeeperPort = 2181

  val groupId = Random.nextInt(100).toString
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "earliest")
  props.put("group.id", groupId)
  props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")

  val propsMap = Map(
   "bootstrap.servers" -> "localhost:9092",
"zookeeper.connect" -> "localhost:2181",
"auto.offset.reset" -> "earliest",
"group.id" -> groupId,
"key.deserializer" -> 
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> 
"org.apache.kafka.common.serialization.StringDeserializer"
  )

  val inputString = "mystring"
  val expectedString = "MYSTRING"

}

class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {

  "runs with embedded kafka" should {

"work" in {

  implicit val config = EmbeddedKafkaConfig(
kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort,
customConsumerProperties = SimpleFlinkKafkaTest.propsMap
  )

  withRunningKafka {

publishStringMessageToKafka("input-topic", 
SimpleFlinkKafkaTest.inputString)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val kafkaConsumer = new FlinkKafkaConsumer011(
  "input-topic",
  new SimpleStringSchema,
  SimpleFlinkKafkaTest.props
)

val inputStream = env.addSource(kafkaConsumer)

val outputStream = inputStream.map { msg =>
  msg.toUpperCase
}

outputStream.writeAsText("test.csv", WriteMode.OVERWRITE)

env.execute()

consumeFirstStringMessageFrom("output-topic") shouldEqual 
SimpleFlinkKafkaTest.expectedString

  }
}
  }
}
```

The Flink process is running but nothing happens. I tried to write into a text 
file to see any output but there is nothing in the file.

Any idea? Does anybody use this library to test a Flink job using Kafka?

Thanks in advance,

Thomas



Re: debug for Flink

2018-04-19 Thread Fabian Hueske
Hi,

You can run Flink applications locally in your IDE and debug a Flink
program just like a regular Java/Scala application.
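
For example, a small job like the following (a minimal sketch; the class and names
are only for illustration) can be started via main() from the IDE.
getExecutionEnvironment() then creates an embedded local environment, so breakpoints
inside the user functions work as usual:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalDebugJob {
    public static void main(String[] args) throws Exception {
        // Started from the IDE, this returns a local embedded environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    // Set a breakpoint here; it is hit for every record.
                    return value.toUpperCase();
                }
            })
            .print();

        env.execute("local debug job");
    }
}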

Best, Fabian

2018-04-19 0:53 GMT+02:00 Qian Ye :

> Hi
>
> I’m wondering if new debugging methods/tools  are urgent for Flink
> development. I know there already exists some debug methods for Flink,
> e.g., remote debugging of flink clusters(https://cwiki.apache.
> org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters). But are
> they convenient enough?
>
> Best regards.
>


Flink / Kafka unit testing with scalatest-embedded-kafka

2018-04-19 Thread Chauvet, Thomas
Hi,

I would like to "unit test" a Flink job with Kafka as source (and sink). I am 
trying to use the library scalatest-embedded-kafka to simulate a Kafka for my 
test.

For example, I would like to get data (string stream) from Kafka, convert it 
into uppercase and put it into another topic.

Now, I am just trying to use Flink's Kafka consumer to read from a topic (with 
embedded kafka).

Here is the code for example :

```scala

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.scalatest.{Matchers, WordSpec}
import scala.util.Random

object SimpleFlinkKafkaTest {
  SimpleFlinkKafkaTest
  val kafkaPort = 9092
  val zooKeeperPort = 2181

  val groupId = Random.nextInt(100).toString
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "earliest")
  props.put("group.id", groupId)
  props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")

  val propsMap = Map(
   "bootstrap.servers" -> "localhost:9092",
"zookeeper.connect" -> "localhost:2181",
"auto.offset.reset" -> "earliest",
"group.id" -> groupId,
"key.deserializer" -> 
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> 
"org.apache.kafka.common.serialization.StringDeserializer"
  )

  val inputString = "mystring"
  val expectedString = "MYSTRING"

}

class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {

  "runs with embedded kafka" should {

"work" in {

  implicit val config = EmbeddedKafkaConfig(
kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort,
customConsumerProperties = SimpleFlinkKafkaTest.propsMap
  )

  withRunningKafka {

publishStringMessageToKafka("input-topic", 
SimpleFlinkKafkaTest.inputString)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val kafkaConsumer = new FlinkKafkaConsumer011(
  "input-topic",
  new SimpleStringSchema,
  SimpleFlinkKafkaTest.props
)

val inputStream = env.addSource(kafkaConsumer)

val outputStream = inputStream.map { msg =>
  msg.toUpperCase
}

outputStream.writeAsText("test.csv", WriteMode.OVERWRITE)

env.execute()

consumeFirstStringMessageFrom("output-topic") shouldEqual 
SimpleFlinkKafkaTest.expectedString

  }
}
  }
}
```

The Flink process is running but nothing happens. I tried to write into a text 
file to see any output but there is nothing in the file.

Any idea? Does anybody use this library to test a Flink job using Kafka?

Thanks in advance,

Thomas


Re: Applying a void function to DataStream

2018-04-19 Thread Timo Walther

Hi Soheil,

Flink supports the type "java.lang.Void" which you can use in this case.
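
A quick sketch of what that could look like (untested, and assuming the parse(String)
method and the stream from your mail):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Void> result = stream.map(new MapFunction<String, Void>() {
    @Override
    public Void map(String value) throws Exception {
        parse(value);   // writes to Cassandra as a side effect
        return null;    // java.lang.Void has no instances, so null is the only value
    }
});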

Regards,
Timo


Am 19.04.18 um 15:16 schrieb Soheil Pourbafrani:
Hi, I have a void function that takes a String, parse it and write it 
into Cassandra (Using pure java, not Flink Cassandra connector). Using 
Apache Flink Kafka connector, I've got some data into 
DataStream. Now I want to apply Parse function to each message 
in DataStream, but as the Parse function returns nothing (is 
void), I got the error

no instance of type variable R exists so that void conforms to R

Is there any way to do such process using apache Flink?





Applying a void function to DataStream

2018-04-19 Thread Soheil Pourbafrani
Hi, I have a void function that takes a String, parses it and writes it into
Cassandra (using pure Java, not the Flink Cassandra connector). Using the Apache
Flink Kafka connector, I've got some data into a DataStream<String>. Now I
want to apply the Parse function to each message in the DataStream<String>, but as
the Parse function returns nothing (is void), I got the error
no instance of type variable R exists so that void conforms to R

Is there any way to do such process using apache Flink?


Regarding keyed stream

2018-04-19 Thread Vishal Santoshi
It is evident that if I have n windows and subsequent aggregates on a keyed
stream, there is an n-way hash, effectively n copies of the stream. Is
there a reason why that is the direction Flink has gone, rather than one
hash operator with localized, window-specific subsequent operations?


Re: Testing on Flink 1.5

2018-04-19 Thread Amit Jain
Hi Gary,

We found the underlying issue with the following problem.
A few of our jobs are stuck with logs [1]; these jobs are only able to
allocate a JM and couldn't get any TM, even though there are ample resources on
our cluster.

We are running an ETL merge job here. In this job, we first find new deltas,
and if no delta is detected we exit without actually executing the job. I think
this is the reason we see no TM allocation happening.

I believe in the above case (non-detached mode) we should mark the submitted
application as completed rather than running. Please share your thoughts on
this.
Should I log this improvement in JIRA?

Could you also recommend us the best practice in FLIP6, should we use YARN
session or submit jobs in non-detached mode?

--
Thanks,
Amit


Consumer offsets not visible in Kafka

2018-04-19 Thread Bernd.Winterstein
Hi
We are using Kafka 0.11 consumers with Flink 1.4 and Confluent Kafka 4.0.0. 
Checkpointing is enabled and enableCommitOnCheckpoints is set to true.
However, there are no offsets from Flink jobs visible in Kafka when checking 
with the kafka-consumer-groups tool.
Any ideas

Regards

Bernd



  




1.4.3 release/roadmap

2018-04-19 Thread Daniel Harper
Hi there,

There are some bug fixes that are in the 1.4 branch that we would like to be 
made available for us to use.

Is there a roadmap from the project when the next stable 1.4.x release will be 
cut? Any blockers?


Re: Efficiency with different approaches of aggregation in Flink

2018-04-19 Thread Fabian Hueske
Hi Teena,

I'd go with approach 2. The performance difference shouldn't be significant
compared to 1., but it is much easier to implement, IMO.

Avoid approach 3. It will be much slower because you need at least one call
to an external data store, and it is more difficult to implement.
Flink's checkpointing mechanism (as used by 1. and 2. ) gives you better
consistency and protection against failures than what you can achieve with
3.
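
A minimal sketch of approach 2 (assuming a keyed stream of (key, value) tuples and
an arbitrary threshold; all names are illustrative):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keep a running sum per key and emit the record once the sum crosses a threshold.
public class ThresholdAggregator
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long THRESHOLD = 1000L;   // illustrative value
    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> record, Collector<Tuple2<String, Long>> out)
            throws Exception {
        long current = sum.value() == null ? 0L : sum.value();
        current += record.f1;
        if (current >= THRESHOLD) {
            out.collect(Tuple2.of(record.f0, current));   // emit and reset
            sum.clear();
        } else {
            sum.update(current);
        }
    }
}

It would then be applied as input.keyBy(0).flatMap(new ThresholdAggregator()).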

Cheers, Fabian

2018-04-19 8:42 GMT+02:00 Puneet Kinra :

> Hi Teena
>
> If you are proceeding with point 3, no doubt it will add some overhead, but
> the major significance is that you are persisting the state per
> key, so there will not be data loss in case of a job failure.
>
>
>
> On Thu, Apr 19, 2018 at 11:45 AM, Teena Kappen // BPRISE <
> teena.kap...@bprise.com> wrote:
>
>> Hi,
>>
>>
>>
>> If I have to aggregate a value in a stream of records, which one of the
>> below approaches will be the most/least efficient?
>>
>>
>>
>>1. Using a Global Window to aggregate the value and emit the record
>>when it reaches a particular threshold value.
>>2. Using a FlatMap with a State Variable which gets updated with each
>>incoming record and emit the record when it reaches the threshold value.
>>3. Using a FlatMap to store the aggregated value in an in-memory DB
>>like Redis and query the value and update it with each incoming record, 
>> and
>>emit the record when it reaches the threshold value.
>>
>>
>>
>> Please rate the three approaches according to their efficiency.
>>
>>
>>
>> Regards,
>>
>> Teena
>>
>
>
>
> --
> Cheers,
>
> Puneet Kinra
>
> Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com
>
> e-mail: puneet.ki...@customercentria.com
>
>
>


Re: Efficiency with different approaches of aggregation in Flink

2018-04-19 Thread Puneet Kinra
Hi Teena

If you are proceeding with point 3, no doubt it will add some overhead, but the
major significance is that you are persisting the state per
key, so there will not be data loss in case of a job failure.



On Thu, Apr 19, 2018 at 11:45 AM, Teena Kappen // BPRISE <
teena.kap...@bprise.com> wrote:

> Hi,
>
>
>
> If I have to aggregate a value in a stream of records, which one of the
> below approaches will be the most/least efficient?
>
>
>
>1. Using a Global Window to aggregate the value and emit the record
>when it reaches a particular threshold value.
>2. Using a FlatMap with a State Variable which gets updated with each
>incoming record and emit the record when it reaches the threshold value.
>3. Using a FlatMap to store the aggregated value in an in-memory DB
>like Redis and query the value and update it with each incoming record, and
>emit the record when it reaches the threshold value.
>
>
>
> Please rate the three approaches according to their efficiency.
>
>
>
> Regards,
>
> Teena
>



-- 
Cheers,

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


Efficiency with different approaches of aggregation in Flink

2018-04-19 Thread Teena Kappen // BPRISE
Hi,

If I have to aggregate a value in a stream of records, which one of the below 
approaches will be the most/least efficient?


  1.  Using a Global Window to aggregate the value and emit the record when it 
reaches a particular threshold value.
  2.  Using a FlatMap with a State Variable which gets updated with each 
incoming record and emit the record when it reaches the threshold value.
  3.  Using a FlatMap to store the aggregated value in an in-memory DB like 
Redis and query the value and update it with each incoming record, and emit the 
record when it reaches the threshold value.

Please rate the three approaches according to their efficiency.

Regards,
Teena