[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579276#comment-16579276 ]

Fred Teunissen commented on FLINK-8500:
---

I rebased this PR onto the latest master branch yesterday evening.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493109#comment-16493109 ]

Fred Teunissen commented on FLINK-8500:
---

[~aljoscha], [~tzulitai]: if you like, I can make the PR with a new default {{deserialize}} method on the interface {{KeyedDeserializationSchema}}, with either the signature

{{default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException ...}}

or the signature

{{default T deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException ...}}

I have no preference; both have their pros and cons.
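A minimal sketch of the first signature, assuming Java 8 default methods: the new seven-argument overload gets a default body that ignores the timestamp and delegates to the existing five-argument method, so current implementations keep compiling while a fetcher can always call the new overload. {{KeyedDeserializationSchemaSketch}}, {{StringSchema}}, {{TimestampedStringSchema}} and {{FetcherSketch}} are hypothetical names, and {{TimestampType}} is redeclared locally instead of importing Kafka's enum. In a Kafka 0.10+ fetcher the two extra arguments would presumably come from {{ConsumerRecord#timestamp()}} and {{ConsumerRecord#timestampType()}}.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Local stand-in for Kafka's TimestampType (constant names mirror the Kafka enum).
enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

// Hypothetical, simplified copy of the keyed schema interface with the proposed default method.
interface KeyedDeserializationSchemaSketch<T> {

    // Existing abstract method: every current implementation already provides it.
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    // Proposed overload. As a default method it does not break existing implementations:
    // unless overridden, it ignores the timestamp and delegates to the old method.
    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                          long offset, long timestamp, TimestampType timestampType)
            throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// An "old" schema that only implements the original method.
class StringSchema implements KeyedDeserializationSchemaSketch<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// A "new" schema that opts in by overriding the default method.
class TimestampedStringSchema extends StringSchema {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                              long offset, long timestamp, TimestampType timestampType) {
        return timestamp + "|" + timestampType + "|"
                + deserialize(messageKey, message, topic, partition, offset);
    }
}

// What a fetcher could do for every record: always call the seven-argument variant.
class FetcherSketch {
    static <T> T emit(KeyedDeserializationSchemaSketch<T> schema, byte[] value,
                      long timestamp, TimestampType type) {
        try {
            return schema.deserialize(null, value, "some-topic", 0, 0L, timestamp, type);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this shape, {{FetcherSketch.emit(new StringSchema(), ...)}} behaves exactly like today, while {{TimestampedStringSchema}} sees the timestamp and its type.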
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481515#comment-16481515 ]

Fred Teunissen commented on FLINK-8500:
---

I agree that 'separation of concerns' is a good principle, but in this situation I have a problem with it. The interface {{DeserializationSchema}} is responsible for deserialising the message, but in our case the messages are encrypted and every topic has its own decryption key. So we need at least the topic name during deserialisation, which is why we currently use the interface {{KeyedDeserializationSchema}}. I don't think this is Kafka-specific, so it should be solved in the "common deserialization schema" interface.

Another concern I have is that a two-step approach for creating the records from Kafka messages causes more memory allocations and thus more garbage collection cycles.

As [~StephanEwen] said, this issue can be fixed for now by introducing a new default method on the interface {{KeyedDeserializationSchema}}, but in the long run a new `common connector framework` should be designed and implemented. This new `common connector framework` should also address some other issues, like 'temporary idleness of partitions' and 'customisable partition assignments'.

If you agree, I can make a new PR with the new default method on the interface {{KeyedDeserializationSchema}}.
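The per-topic key argument can be illustrated with a small sketch, assuming a simplified local copy of the keyed interface (the real Flink interface also extends {{Serializable}} and {{ResultTypeQueryable}}). Because the topic is a parameter of {{deserialize}}, the schema can select the right key, decrypt and decode in a single call, without first materialising an intermediate record object. {{TopicAwareSchema}} and {{PerTopicDecryptingSchema}} are hypothetical names, and the XOR routine is a toy placeholder for a real cipher, not actual encryption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified stand-in for the keyed schema: the topic is an argument.
interface TopicAwareSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;
}

// Hypothetical schema: looks up the key by topic, decrypts, then decodes, all in one step.
class PerTopicDecryptingSchema implements TopicAwareSchema<String> {
    private final Map<String, byte[]> keysByTopic;

    PerTopicDecryptingSchema(Map<String, byte[]> keysByTopic) {
        this.keysByTopic = new HashMap<>(keysByTopic);
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) throws IOException {
        byte[] key = keysByTopic.get(topic);
        if (key == null) {
            throw new IOException("No decryption key configured for topic " + topic);
        }
        return new String(xor(message, key), StandardCharsets.UTF_8);
    }

    // Toy XOR placeholder (its own inverse); a real job would use a proper cipher,
    // for example through javax.crypto.Cipher.
    static byte[] xor(byte[] data, byte[] key) {
        byte[] out = new byte[data.length];
        for (int i = 0; i < data.length; i++) {
            out[i] = (byte) (data[i] ^ key[i % key.length]);
        }
        return out;
    }
}
```

A plain {{DeserializationSchema}} only receives the message bytes, so the same lookup would not be possible there without the topic name.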
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462345#comment-16462345 ]

Fred Teunissen commented on FLINK-8500:
---

[~aljoscha] I have some time that I can spend on this, so yes, I'm interested. I can also update the current PR with the changes, unless you think it's better to make a fresh PR for this.

There are now three possible approaches:
# Kafka {{ConsumerRecord}} as a parameter in the Flink API
# a new {{ConsumerRecordMetaInfo}} class as a parameter for the {{deserialize}} method
# extend the interface {{KeyedDeserializationSchema}} with a new method, although I don't know how to give an interface default behavior in Java.

I'm leaning towards almost the same approach I used now: creating a new separate interface {{RichDeserializationSchema}} with a {{deserialize}} method that takes a {{ConsumerRecordMetaInfo}} parameter, and also creating a {{RichDeserializationSchemaWrapper}} that adapts the current API, for backwards compatibility.

Do you think this is the right approach?
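A minimal sketch of the approach described above (option 2 plus a wrapper), assuming {{ConsumerRecordMetaInfo}} carries the key, value, topic, partition, offset and timestamp. The field list, the simplified {{KeyedSchema}} stand-in for {{KeyedDeserializationSchema}} and all method bodies are illustrative assumptions, not the code in the PR. The wrapper is a plain adapter: the consumer only ever calls {{RichDeserializationSchema#deserialize}}, and existing keyed schemas are wrapped so they keep working.

```java
import java.io.IOException;

// Hypothetical Flink-owned metadata holder, so the Flink API does not expose Kafka types.
final class ConsumerRecordMetaInfo {
    private final byte[] key;
    private final byte[] message;
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    ConsumerRecordMetaInfo(byte[] key, byte[] message, String topic,
                           int partition, long offset, long timestamp) {
        this.key = key;
        this.message = message;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    public byte[] getKey() { return key; }
    public byte[] getMessage() { return message; }
    public String getTopic() { return topic; }
    public int getPartition() { return partition; }
    public long getOffset() { return offset; }
    public long getTimestamp() { return timestamp; }
}

// Simplified stand-in for the existing keyed API.
interface KeyedSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;
}

// Proposed new interface: one parameter object instead of a growing argument list.
interface RichDeserializationSchema<T> {
    T deserialize(ConsumerRecordMetaInfo record) throws IOException;
}

// Adapter that lets an existing keyed schema be used where the new interface is expected.
final class RichDeserializationSchemaWrapper<T> implements RichDeserializationSchema<T> {
    private final KeyedSchema<T> delegate;

    RichDeserializationSchemaWrapper(KeyedSchema<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(ConsumerRecordMetaInfo r) throws IOException {
        // The timestamp is dropped here because the old API has no parameter for it.
        return delegate.deserialize(r.getKey(), r.getMessage(), r.getTopic(),
                r.getPartition(), r.getOffset());
    }
}
```

The parameter object is what keeps the Kafka API out of the Flink API: adding a field later would change {{ConsumerRecordMetaInfo}}, not every {{deserialize}} signature.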
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461953#comment-16461953 ]

Fred Teunissen commented on FLINK-8500:
---

Both approaches will work, but we have to choose. The first approach exposes the Kafka API as part of the Flink API. The second approach hides the Kafka API but will require somewhat more resources to maintain. The second approach would have my vote.

I don't want to introduce scope creep, but I think there are more input sources that could benefit from a more generic (de)serialization scheme. Should we look into that, or leave it for now? (In issue 5479 the idea of a `common connector framework` is mentioned; should it be picked up there?)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457600#comment-16457600 ]

Fred Teunissen commented on FLINK-8500:
---

I've created a [pull request|https://github.com/apache/flink/pull/5939] for this issue. This is my first pull request, so I hope that I have followed the 'contribution code guidelines' correctly. Please let me know if I should do something differently or if I forgot something.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447144#comment-16447144 ]

Fred Teunissen commented on FLINK-8500:
---

We want to use the Kafka message timestamp in our business logic, so we would like to have this timestamp available during the deserialisation of the message. But we also need the {{TimestampType}} to be able to detect where the timestamp came from (the producer or the Kafka broker).

If we don't want to break the interface, we can make a new interface {{KeyedAndTimestampDeserializationSchema}} extending the interface {{KeyedDeserializationSchema}}, and add a {{KeyedAndTimestampDeserializationSchemaWrapper}} that accepts a {{KeyedDeserializationSchema}}. The constructors of the FlinkKafkaConsumers can be extended to accept the new {{KeyedAndTimestampDeserializationSchema}} as a parameter and to wrap the calls with the {{KeyedDeserializationSchema}} interface.
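In Kafka 0.10 and later, each record carries a timestamp together with a {{TimestampType}}: {{CREATE_TIME}} when the timestamp was set on the producer side, or {{LOG_APPEND_TIME}} when the broker assigned it (controlled by the topic setting {{message.timestamp.type}}). The sketch below illustrates the proposed extension, assuming simplified local stand-ins for {{KeyedDeserializationSchema}} (the real one also extends {{Serializable}} and {{ResultTypeQueryable}}) and for Kafka's {{TimestampType}}. {{EventWithOriginSchema}} is a hypothetical business schema that records where the timestamp came from.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Local stand-in for Kafka's TimestampType (constant names mirror the Kafka enum).
enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

// Simplified local stand-in for Flink's KeyedDeserializationSchema.
interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;
}

// Proposed extension: the same method plus the record timestamp and its type.
interface KeyedAndTimestampDeserializationSchema<T> extends KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                  long timestamp, TimestampType timestampType) throws IOException;
}

// Wraps an existing schema so the consumer can always use the extended interface.
final class KeyedAndTimestampDeserializationSchemaWrapper<T>
        implements KeyedAndTimestampDeserializationSchema<T> {
    private final KeyedDeserializationSchema<T> inner;

    KeyedAndTimestampDeserializationSchemaWrapper(KeyedDeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] k, byte[] m, String t, int p, long o) throws IOException {
        return inner.deserialize(k, m, t, p, o);
    }

    @Override
    public T deserialize(byte[] k, byte[] m, String t, int p, long o,
                         long timestamp, TimestampType timestampType) throws IOException {
        // The wrapped (old) schema has no use for the timestamp, so it is ignored.
        return inner.deserialize(k, m, t, p, o);
    }
}

// Hypothetical business schema: tags each value with its timestamp and origin.
final class EventWithOriginSchema implements KeyedAndTimestampDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] k, byte[] m, String t, int p, long o) {
        // Old call path: no timestamp known; -1 is an assumed "no timestamp" marker.
        return deserialize(k, m, t, p, o, -1L, TimestampType.NO_TIMESTAMP_TYPE);
    }

    @Override
    public String deserialize(byte[] k, byte[] m, String t, int p, long o,
                              long timestamp, TimestampType timestampType) {
        String origin;
        switch (timestampType) {
            case CREATE_TIME: origin = "producer"; break;
            case LOG_APPEND_TIME: origin = "broker"; break;
            default: origin = "unknown"; break;
        }
        return new String(m, StandardCharsets.UTF_8) + "@" + timestamp + " (" + origin + ")";
    }
}
```

Because the new interface extends the old one, an implementation of {{KeyedAndTimestampDeserializationSchema}} can still be passed anywhere a {{KeyedDeserializationSchema}} is accepted today.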
[jira] [Created] (FLINK-9149) The creation of the ExecutionPlan fails when you combine a SideOutput with an SplitStream.
Fred Teunissen created FLINK-9149:
---

Summary: The creation of the ExecutionPlan fails when you combine a SideOutput with an SplitStream.
Key: FLINK-9149
URL: https://issues.apache.org/jira/browse/FLINK-9149
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.4.2
Reporter: Fred Teunissen

The creation of the ExecutionPlan fails when you combine a SideOutput with a SplitStream.

Code:
{code:scala}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)

    val inputLongStream = env.generateSequence(0L, 100L)

    // Multiples of 4 go to the side output; all other values are emitted downstream.
    val filteredLongStream = inputLongStream.process(new LogFilterFunction)

    // Splitting the output of the ProcessFunction directly is what fails.
    val splittedLongStream = filteredLongStream.split(l => Seq((l % 4).toString))
    // workaround
    // val splittedLongStream = filteredLongStream.map(x => x).split(l => Seq((l % 4).toString))

    val oneStream = splittedLongStream.select("1").map(l => l + 1000)
    val twoStream = splittedLongStream.select("2").map(l => l + 2000)
    val threeStream = splittedLongStream.select("3").map(l => l + 3000)

    oneStream.union(twoStream, threeStream).print()

    val loggingStream = filteredLongStream.getSideOutput(loggingOutputTag)
    loggingStream.print()

    println(env.getExecutionPlan)

    env.execute()
  }

  val loggingOutputTag = OutputTag[String]("loggingStream")
}

class LogFilterFunction extends ProcessFunction[Long, Long] {
  override def processElement(value: Long,
                              ctx: ProcessFunction[Long, Long]#Context,
                              out: Collector[Long]): Unit = {
    if (value % 4 == 0) {
      ctx.output(SideOutputTest.loggingOutputTag, s"LogFilterFunction logging for $value")
    } else {
      out.collect(value)
    }
  }
}
{code}

Exception:
{noformat}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:331)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
{noformat}

Workaround: add a redundant *map(x=>x)* before the *split* function.