[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-13 Thread Fred Teunissen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579276#comment-16579276
 ] 

Fred Teunissen commented on FLINK-8500:
---

I rebased this PR onto the latest master branch yesterday evening.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-28 Thread Fred Teunissen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493109#comment-16493109
 ] 

Fred Teunissen commented on FLINK-8500:
---

[~aljoscha], [~tzulitai], if you like, I can make the PR with a new default 
{{deserialize}} method on the {{KeyedDeserializationSchema}} interface, with 
either the signature
 {{default T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException ...}}
 or the signature
 {{default T deserialize(ConsumerRecord record) throws 
IOException ...}}

I have no preference; both have their pros and cons.
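
To make the first option concrete, here is a minimal, self-contained sketch of how a default method could keep existing implementations working. The interface and the {{TimestampType}} enum below are simplified stand-ins, not the real Flink or Kafka classes, and the {{describe}} helper and the two schema classes are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class DefaultMethodSketch {

    /** Stand-in for Kafka's TimestampType enum (assumption: only the names matter here). */
    enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

    /** Simplified stand-in for KeyedDeserializationSchema, not the real Flink interface. */
    interface KeyedDeserializationSchema<T> {

        // Existing method: every current implementation already provides it.
        T deserialize(byte[] messageKey, byte[] message, String topic,
                      int partition, long offset) throws IOException;

        // Proposed default method: ignores the timestamp and delegates to the
        // existing method, so implementations written before the change still compile.
        default T deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset,
                              long timestamp, TimestampType timestampType) throws IOException {
            return deserialize(messageKey, message, topic, partition, offset);
        }
    }

    /** Hypothetical schema written before the change; it only knows the old method. */
    static class LegacySchema implements KeyedDeserializationSchema<String> {
        @Override
        public String deserialize(byte[] messageKey, byte[] message, String topic,
                                  int partition, long offset) {
            return new String(message, StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical schema that overrides the new method to use the timestamp. */
    static class TimestampedSchema extends LegacySchema {
        @Override
        public String deserialize(byte[] messageKey, byte[] message, String topic,
                                  int partition, long offset,
                                  long timestamp, TimestampType timestampType) {
            return new String(message, StandardCharsets.UTF_8) + "@" + timestamp + "/" + timestampType;
        }
    }

    /** Calls the new seven-argument method the way a fetcher might. */
    static String describe(KeyedDeserializationSchema<String> schema, String payload, long timestamp) {
        try {
            return schema.deserialize(null, payload.getBytes(StandardCharsets.UTF_8),
                    "demo-topic", 0, 42L, timestamp, TimestampType.CREATE_TIME);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new LegacySchema(), "hi", 1000L));      // prints "hi"
        System.out.println(describe(new TimestampedSchema(), "hi", 1000L)); // prints "hi@1000/CREATE_TIME"
    }
}
```

The caller always invokes the new method; only schemas that care about the timestamp need to override it.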

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-19 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481515#comment-16481515
 ] 

Fred Teunissen commented on FLINK-8500:
---

I agree that 'separation of concerns' is a good principle, but in this 
situation I have a problem with it. The DeserializationSchema interface is 
responsible for deserialising the message, but in our case the messages 
are encrypted and every topic has its own decryption key. We therefore need at 
least the topic name during deserialisation, which is why we currently use the 
KeyedDeserializationSchema interface. I don't think this is Kafka-specific, so 
it should be solved in the "common deserialization schema" interface.
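
To illustrate why the topic name is needed, here is a small, self-contained sketch of a deserializer that selects the decryption key by topic. Everything in it is hypothetical: the class names, the topic names, and especially the XOR routine, which is only a placeholder for a real cipher.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class PerTopicDecryptionSketch {

    /** Toy XOR routine, a placeholder for a real cipher; applying it twice restores the input. */
    static byte[] xor(byte[] data, byte[] key) {
        byte[] out = new byte[data.length];
        for (int i = 0; i < data.length; i++) {
            out[i] = (byte) (data[i] ^ key[i % key.length]);
        }
        return out;
    }

    /** Hypothetical deserializer that needs the topic name to pick the right key. */
    static class TopicAwareDeserializer {
        private final Map<String, byte[]> keysByTopic;

        TopicAwareDeserializer(Map<String, byte[]> keysByTopic) {
            this.keysByTopic = keysByTopic;
        }

        String deserialize(byte[] message, String topic) {
            byte[] key = keysByTopic.get(topic);
            if (key == null) {
                throw new IllegalArgumentException("No decryption key for topic " + topic);
            }
            // Decrypt first, then decode the plaintext bytes.
            return new String(xor(message, key), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> keys = new HashMap<>();
        keys.put("orders", "k1".getBytes(StandardCharsets.UTF_8));
        keys.put("payments", "k2".getBytes(StandardCharsets.UTF_8));
        TopicAwareDeserializer deserializer = new TopicAwareDeserializer(keys);

        byte[] encrypted = xor("hello".getBytes(StandardCharsets.UTF_8), keys.get("orders"));
        System.out.println(deserializer.deserialize(encrypted, "orders")); // prints "hello"
    }
}
```

A schema that receives only the message bytes has no way to perform the key lookup, which is the point made above.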

Another concern I have is that a two-step approach to creating the records 
from Kafka messages causes more memory allocations and thus more garbage 
collection cycles.

As [~StephanEwen] said, this issue can be fixed for now by introducing a new 
default method on the KeyedDeserializationSchema interface, but in the long 
run a new `common connector framework` should be designed and implemented. This 
new `common connector framework` should also address some other issues, such as 
'temporary idleness of partitions' and 'customisable partition assignments'.

If you agree, I can make a new PR with the new default method on the 
KeyedDeserializationSchema interface.



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-03 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462345#comment-16462345
 ] 

Fred Teunissen commented on FLINK-8500:
---

[~aljoscha] I have some time that I can spend on this, so yes I'm interested. I 
can also update the current PR with the changes unless you think it's better to 
make a new fresh PR for this.

There are now three possible approaches:
 # use the Kafka {{ConsumerRecord}} as a parameter in the Flink API
 # use a new {{ConsumerRecordMetaInfo}} class as the parameter of the 
{{deserialize}} method
 # extend the {{KeyedDeserializationSchema}} interface with a new method, 
although I don't know how to give an interface default behavior in Java.

I'm leaning towards almost the same approach I use now: creating a new separate 
interface {{RichDeserializationSchema}} with a {{deserialize}} method that takes 
a {{ConsumerRecordMetaInfo}} parameter, plus a 
{{RichDeserializationSchemaWrapper}} that implements it on top of the current 
API for backwards compatibility.

Do you think this is the right approach?
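
As a rough illustration of that approach, here is a self-contained sketch. The three type names come from the proposal above, but the fields of {{ConsumerRecordMetaInfo}} and all method bodies are assumptions, the interfaces are simplified stand-ins for the Flink ones, and {{throws IOException}} is omitted for brevity.

```java
import java.nio.charset.StandardCharsets;

public class RichSchemaSketch {

    /** Simplified stand-in for the existing KeyedDeserializationSchema. */
    interface KeyedDeserializationSchema<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
    }

    /** Record metadata holder; the choice of fields is an assumption. */
    static final class ConsumerRecordMetaInfo {
        final byte[] key;
        final byte[] message;
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;

        ConsumerRecordMetaInfo(byte[] key, byte[] message, String topic,
                               int partition, long offset, long timestamp) {
            this.key = key;
            this.message = message;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** New interface: one parameter that can grow without breaking implementations. */
    interface RichDeserializationSchema<T> {
        T deserialize(ConsumerRecordMetaInfo info);
    }

    /** Adapts an existing schema to the new interface for backwards compatibility. */
    static final class RichDeserializationSchemaWrapper<T> implements RichDeserializationSchema<T> {
        private final KeyedDeserializationSchema<T> inner;

        RichDeserializationSchemaWrapper(KeyedDeserializationSchema<T> inner) {
            this.inner = inner;
        }

        @Override
        public T deserialize(ConsumerRecordMetaInfo info) {
            // The old API has no timestamp parameter, so it is simply not passed on.
            return inner.deserialize(info.key, info.message, info.topic, info.partition, info.offset);
        }
    }

    public static void main(String[] args) {
        KeyedDeserializationSchema<String> legacy =
                (key, message, topic, partition, offset) ->
                        topic + ":" + new String(message, StandardCharsets.UTF_8);
        RichDeserializationSchema<String> rich = new RichDeserializationSchemaWrapper<>(legacy);

        ConsumerRecordMetaInfo info = new ConsumerRecordMetaInfo(
                null, "v".getBytes(StandardCharsets.UTF_8), "t1", 0, 5L, 99L);
        System.out.println(rich.deserialize(info)); // prints "t1:v"
    }
}
```

With this shape the consumer only ever calls {{RichDeserializationSchema}}, and existing user schemas are wrapped once at construction time.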



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461953#comment-16461953
 ] 

Fred Teunissen commented on FLINK-8500:
---

Both approaches will work, but we have to choose. The first approach exposes 
the Kafka API as part of the Flink API. The second approach hides the Kafka API 
but will take a bit more effort to maintain. The second approach would 
have my vote.

I don’t want to introduce scope creep, but I think there are more input sources 
that could benefit from a more generic (de)serialization scheme. Should we look 
into that now, or leave it for later? Issue 5479 mentions the idea of a `common 
connector framework`; should it be picked up there?

 



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-04-28 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457600#comment-16457600
 ] 

Fred Teunissen commented on FLINK-8500:
---

I've created a [pull request|https://github.com/apache/flink/pull/5939] for 
this issue. This is my first pull request, so I hope that I have followed all of 
the 'contribution code guidelines' correctly. Please let me know if I 
should do something differently or if I forgot something.



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-04-22 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447144#comment-16447144
 ] 

Fred Teunissen commented on FLINK-8500:
---

We want to use the Kafka message timestamp in our business logic, so we would 
like to have this timestamp available during the deserialisation of the 
message. We also need the TimestampType to be able to tell where the 
timestamp came from (the producer or the Kafka broker).

If we don't want to break the interface, we can add a new interface 
{{KeyedAndTimestampDeserializationSchema}} that extends 
{{KeyedDeserializationSchema}}, and add a 
{{KeyedAndTimestampDeserializationSchemaWrapper}} that accepts a 
{{KeyedDeserializationSchema}}. The FlinkKafkaConsumer constructors can be 
extended to accept the new {{KeyedAndTimestampDeserializationSchema}} as a 
parameter and wrap the calls with the {{KeyedDeserializationSchema}} interface.
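
A minimal sketch of this idea follows. The two new type names are the ones proposed above. The base interface and the {{TimestampType}} enum are simplified stand-ins rather than the real Flink and Kafka classes, {{throws IOException}} is omitted for brevity, and the {{timestampOrigin}} helper is a hypothetical addition that shows how the type identifies who set the timestamp.

```java
public class TimestampSchemaSketch {

    /** Stand-in for Kafka's TimestampType enum. */
    enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

    /** Simplified stand-in for the existing interface, which stays unchanged. */
    interface KeyedDeserializationSchema<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
    }

    /** New interface that adds a timestamp-aware method without touching the old one. */
    interface KeyedAndTimestampDeserializationSchema<T> extends KeyedDeserializationSchema<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                      long timestamp, TimestampType timestampType);
    }

    /** Wraps an existing schema so the consumer can always call the new method. */
    static final class KeyedAndTimestampDeserializationSchemaWrapper<T>
            implements KeyedAndTimestampDeserializationSchema<T> {

        private final KeyedDeserializationSchema<T> inner;

        KeyedAndTimestampDeserializationSchemaWrapper(KeyedDeserializationSchema<T> inner) {
            this.inner = inner;
        }

        @Override
        public T deserialize(byte[] messageKey, byte[] message, String topic,
                             int partition, long offset) {
            return inner.deserialize(messageKey, message, topic, partition, offset);
        }

        @Override
        public T deserialize(byte[] messageKey, byte[] message, String topic,
                             int partition, long offset,
                             long timestamp, TimestampType timestampType) {
            // The wrapped schema predates timestamps, so the extra arguments are dropped.
            return inner.deserialize(messageKey, message, topic, partition, offset);
        }
    }

    /** Hypothetical helper: maps the timestamp type to whoever set the timestamp. */
    static String timestampOrigin(TimestampType type) {
        switch (type) {
            case CREATE_TIME:
                return "producer";
            case LOG_APPEND_TIME:
                return "broker";
            default:
                return "unknown";
        }
    }

    public static void main(String[] args) {
        KeyedDeserializationSchema<String> legacy =
                (key, message, topic, partition, offset) -> topic + "-" + offset;
        KeyedAndTimestampDeserializationSchema<String> wrapped =
                new KeyedAndTimestampDeserializationSchemaWrapper<>(legacy);

        System.out.println(wrapped.deserialize(null, new byte[0], "t", 0, 7L,
                123L, TimestampType.LOG_APPEND_TIME));                      // prints "t-7"
        System.out.println(timestampOrigin(TimestampType.LOG_APPEND_TIME)); // prints "broker"
    }
}
```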



[jira] [Created] (FLINK-9149) The creation of the ExecutionPlan fails when you combine a SideOutput with an SplitStream.

2018-04-09 Thread Fred Teunissen (JIRA)
Fred Teunissen created FLINK-9149:
-

 Summary: The creation of the ExecutionPlan fails when you combine 
a SideOutput with an SplitStream.
 Key: FLINK-9149
 URL: https://issues.apache.org/jira/browse/FLINK-9149
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.4.2
Reporter: Fred Teunissen


The creation of the ExecutionPlan fails when you combine a SideOutput with a 
SplitStream.

Code:
{code:scala}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)

    val inputLongStream = env.generateSequence(0L, 100L)

    val filteredLongStream = inputLongStream.process(new LogFilterFunction)
    val splittedLongStream = filteredLongStream.split(l => Seq((l % 4).toString))
    // workaround
    // val splittedLongStream = filteredLongStream.map(x => x).split(l => Seq((l % 4).toString))

    val oneStream = splittedLongStream.select("1").map(l => l + 1000)
    val twoStream = splittedLongStream.select("2").map(l => l + 2000)
    val threeStream = splittedLongStream.select("3").map(l => l + 3000)

    oneStream.union(twoStream, threeStream).print()

    val loggingStream = filteredLongStream.getSideOutput(loggingOutputTag)
    loggingStream.print()

    println(env.getExecutionPlan)

    env.execute()
  }

  val loggingOutputTag = OutputTag[String]("loggingStream")
}

class LogFilterFunction extends ProcessFunction[Long, Long] {
  override def processElement(
      value: Long,
      ctx: ProcessFunction[Long, Long]#Context,
      out: Collector[Long]): Unit = {
    if (value % 4 == 0) {
      ctx.output(SideOutputTest.loggingOutputTag, s"LogFilterFunction logging for $value")
    } else {
      out.collect(value)
    }
  }
}
{code}
Exception:
{noformat}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:331)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
{noformat}
Workaround:

Add a redundant *map(x => x)* before the *split* call.


