[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.
[ https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245474#comment-16245474 ]

Erik van Oosten commented on FLINK-5633:

bq. Just curious, why are you creating a new reader for each record?

It's just a bit easier than caching a reader for each writer/reader schema combination.

> ClassCastException: X cannot be cast to X when re-submitting a job.
> ---
>
>                 Key: FLINK-5633
>                 URL: https://issues.apache.org/jira/browse/FLINK-5633
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission, YARN
>    Affects Versions: 1.1.4
>            Reporter: Giuliano Caliari
>            Priority: Minor
>
> I’m running a job on my local cluster and the first time I submit the job
> everything works but whenever I cancel and re-submit the same job it fails
> with:
> {quote}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>     at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>     at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>     at scala.App$class.main(App.scala:76)
>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>     at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at
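For comparison, caching a reader for each writer/reader schema combination could look roughly like the sketch below. This is only an illustration: `ReaderCache`, `build` and `readerFor` are hypothetical names, and the schema and reader types are left generic so the sketch does not depend on Avro. In a real job `S` would presumably be `org.apache.avro.Schema` and `R` a `SpecificDatumReader`.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

/** Hypothetical helper: memoizes one reader per (writer schema, reader schema) pair.
  * S stands in for the schema type, R for the reader type; neither is tied to Avro here.
  */
final class ReaderCache[S, R](build: (S, S) => R) {
  // Instance-level cache, so it lives and dies with whoever owns this ReaderCache.
  private val readers = new ConcurrentHashMap[(S, S), R]()

  /** Returns the cached reader for this schema pair, building it on first use. */
  def readerFor(writer: S, reader: S): R =
    readers.computeIfAbsent((writer, reader), new JFunction[(S, S), R] {
      override def apply(key: (S, S)): R = build(key._1, key._2)
    })

  /** Number of distinct schema pairs seen so far. */
  def size: Int = readers.size()
}
```

The cache is an instance field on purpose: it is meant to be created inside the job (for example once per operator instance) rather than shared JVM-wide.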
[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.
[ https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241701#comment-16241701 ]

Erik van Oosten commented on FLINK-5633:

[~StephanEwen] We need to process 130K msg/s; I guess that can be called often :). Our process is CPU bound and parsing Avro is ±15% of that. Any improvement means we can run with fewer machines.

For every message we create a new {{SpecificDatumReader}}. If I follow the code correctly, that should _not_ give a large overhead. The {{Schema}} instances we pass to it _are_ cached.

Then we call {{SpecificDatumReader.read}} to parse each Avro message. That call eventually ends up in {{SpecificData.newInstance}} to create a new instance of the target class. The constructor of that class is looked up in a cache, and that cache is declared as {{static}}. I do not understand how instantiating a new {{SpecificData}} for every call to {{read}} helps, because it would still use the same cache.

The code I pasted above also uses a constructor cache, but that cache is not {{static}}.

Reversing the class loader order should also work.
[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data
[ https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166443#comment-16166443 ]

Erik van Oosten commented on FLINK-4796:

I am not sure why this is marked as a duplicate. The problem here is inconsistent handling of the runtime context inside the different layers under FlinkKafkaProducer: method {{getRuntimeContext}} gives {{null}} even though {{setRuntimeContext}} was called. How does that relate to the addition of a new interface?

> Add new Sink interface with access to more meta data
>
>                 Key: FLINK-4796
>                 URL: https://issues.apache.org/jira/browse/FLINK-4796
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Robert Metzger
>            Assignee: Aljoscha Krettek
>
> The current {{SinkFunction}} cannot access the timestamps of elements which
> resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other
> limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}}
> and not a {{SinkFunction}}.
> We should add a new interface for sinks that takes a context parameter,
> similar to {{ProcessFunction}}. This will allow sinks to query additional
> meta data about the element that they're receiving.
> This is one ML thread where a user ran into a problem caused by this:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635
> h3. Original Text (that is still valid but not general)
> The Kafka 0.10 connector supports writing event timestamps to Kafka.
> Currently, the regular DataStream APIs don't allow user code to access the
> event timestamp easily. That's why the Kafka connector is using a custom
> operator ({{transform()}}) to access the event time.
> With this JIRA, I would like to provide the event timestamp in the regular
> DataStream APIs.
> Once I'll look into the issue, I'll post some proposals how to add the
> timestamp.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data
[ https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16160810#comment-16160810 ]

Erik van Oosten commented on FLINK-4796:

A workaround is to override {{setRuntimeContext}} (make sure to call {{super.setRuntimeContext}}) and use the passed-in context, possibly storing it in a private field for later access.
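The workaround described above can be sketched as follows. To keep the example compilable on its own, `RuntimeContextLike` and `RichSinkLike` are hypothetical stand-ins for Flink's `RuntimeContext` and `RichSinkFunction`, and `ContextCapturingSink` is an illustrative name; only the override pattern itself is taken from the comment. In a real Flink function the captured field would probably need to be `@transient`, which is an assumption on my part and not something the comment states.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Flink's RuntimeContext.
trait RuntimeContextLike {
  def taskName: String
}

// Hypothetical stand-in for a rich sink base class with context accessors.
abstract class RichSinkLike[T] {
  private var ctx: RuntimeContextLike = null
  def setRuntimeContext(c: RuntimeContextLike): Unit = ctx = c
  def getRuntimeContext: RuntimeContextLike = ctx
  def invoke(value: T): Unit
}

/** Captures the context at the moment it is handed in, instead of
  * relying on getRuntimeContext returning it later.
  */
class ContextCapturingSink extends RichSinkLike[String] {
  private var capturedContext: RuntimeContextLike = null

  // Records what was "written", so the sketch has observable behaviour.
  val seen: ArrayBuffer[String] = ArrayBuffer.empty[String]

  override def setRuntimeContext(c: RuntimeContextLike): Unit = {
    super.setRuntimeContext(c) // keep the base class behaviour intact
    capturedContext = c        // private copy for later access
  }

  override def invoke(value: String): Unit =
    seen += s"${capturedContext.taskName}: $value"
}
```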
[jira] [Commented] (FLINK-1390) java.lang.ClassCastException: X cannot be cast to X
[ https://issues.apache.org/jira/browse/FLINK-1390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16058708#comment-16058708 ]

Erik van Oosten commented on FLINK-1390:

See https://issues.apache.org/jira/browse/FLINK-5633?focusedCommentId=16058706=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16058706 for a proper solution.

> java.lang.ClassCastException: X cannot be cast to X
>
>                 Key: FLINK-1390
>                 URL: https://issues.apache.org/jira/browse/FLINK-1390
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 0.8.0
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> A user is affected by an issue, which is probably caused by different
> classloaders being used for loading user classes.
> Current state of investigation:
> - the error happens in yarn sessions (there is only a YARN environment
> available)
> - the error doesn't happen on the first time the job is being executed. It
> only happens on subsequent executions.
[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.
[ https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16058706#comment-16058706 ]

Erik van Oosten commented on FLINK-5633:

In case you need throughput (like we do), the caching is indispensable. In those cases you can use the following {{SpecificData}} implementation. Simply instantiate it once and then pass that singleton instance to every {{SpecificDatumReader}}.

{code:scala|title=LocalCachingSpecificData.scala}
import java.lang.reflect.Constructor
import java.util.concurrent.ConcurrentHashMap

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificData

import scala.collection.JavaConverters._

/**
  * This can be used instead of [[SpecificData]] in multi-classloader environments like Flink.
  * This variation removes the JVM singleton constructor cache and replaces it with a
  * cache that is local to the current class loader.
  *
  * If two Flink jobs use the same generated Avro code, they will still have separate instances of the classes because
  * they live in separate class loaders.
  * However, a JVM-wide singleton cache keeps reference to the class in the first class loader that was loaded. Any
  * subsequent jobs will fail with a [[ClassCastException]] because they will get incompatible classes.
  */
class LocalCachingSpecificData extends SpecificData {
  private val NO_ARG: Array[Class[_]] = Array.empty
  private val SCHEMA_ARG: Array[Class[_]] = Array(classOf[Schema])
  private val CTOR_CACHE: scala.collection.concurrent.Map[Class[_], Constructor[_]] =
    new ConcurrentHashMap[Class[_], Constructor[_]]().asScala

  /** Create an instance of a class.
    * If the class implements [[org.apache.avro.specific.SpecificData.SchemaConstructable]], call a constructor with a
    * [[org.apache.avro.Schema]] parameter, otherwise use a no-arg constructor.
    */
  private def newInstance(c: Class[_], s: Schema): AnyRef = {
    val useSchema = classOf[SpecificData.SchemaConstructable].isAssignableFrom(c)
    val constructor = CTOR_CACHE.getOrElseUpdate(c, {
      val ctor = c.getDeclaredConstructor((if (useSchema) SCHEMA_ARG else NO_ARG): _*)
      ctor.setAccessible(true)
      ctor
    })
    if (useSchema) constructor.newInstance(s).asInstanceOf[AnyRef]
    else constructor.newInstance().asInstanceOf[AnyRef]
  }

  override def createFixed(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.createFixed(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }

  override def newRecord(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.newRecord(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }
}
{code}
[jira] [Comment Edited] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050925#comment-16050925 ]

Erik van Oosten edited comment on FLINK-6928 at 6/15/17 6:39 PM:
---

In my ideal world method {{getTargetTopic}} would be removed from {{*SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}.

Then there are two constructor variants for {{FlinkKafkaProducer}}: one would take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both would have the simplified {{*SerializationSchema}} as argument. To make things simple internally, the first variant could wrap the topic in an implementation of {{DestinationTopic}} that always returns the same topic.

was (Author: erikvanoosten):
In my ideal world method {{getTargetTopic}} would be removed from {{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}.

Then there are two constructor variants for {{FlinkKafkaProducer}}: one would take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both would have the simplified {{SerializationSchema}} as argument. To make things simple internally, the first variant could wrap the topic in an implementation of {{DestinationTopic}} that always returns the same topic.

> Kafka sink: default topic should not need to exist
> --
>
>                 Key: FLINK-6928
>                 URL: https://issues.apache.org/jira/browse/FLINK-6928
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.0, 1.2.1
>            Reporter: Erik van Oosten
>
> When using a Kafka sink, the defaultTopic needs to exist even when it is
> never used. It would be nice if fetching partition information for the
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the
> default topic.
> In addition, it would be nice if we could signal that the defaultTopic is not
> needed by passing {{null}}. Currently, a value for the defaultTopic is
> required.
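The proposal in the comment could be sketched roughly as below. Everything here is hypothetical: `DestinationTopic` is the interface name suggested in the comment, while `FixedDestinationTopic`, `SimpleSerializationSchema` and `ProducerSketch` are illustrative stand-ins that do not exist in Flink. The sketch only shows the shape of the two constructor variants and does not talk to Kafka.

```scala
/** Proposed interface (hypothetical): decides the target topic per element. */
trait DestinationTopic[T] extends Serializable {
  def getTargetTopic(element: T): String
}

/** Always returns the same topic; backs the plain String constructor variant. */
final class FixedDestinationTopic[T](topic: String) extends DestinationTopic[T] {
  override def getTargetTopic(element: T): String = topic
}

/** Stand-in for the simplified serialization schema, with no topic logic left in it. */
trait SimpleSerializationSchema[T] extends Serializable {
  def serialize(element: T): Array[Byte]
}

/** Stand-in for the producer, showing only the two constructor variants. */
class ProducerSketch[T](
    val destination: DestinationTopic[T],
    val schema: SimpleSerializationSchema[T]) {

  /** Variant taking a fixed topic; internally wrapped in a DestinationTopic. */
  def this(topic: String, schema: SimpleSerializationSchema[T]) =
    this(new FixedDestinationTopic[T](topic), schema)

  /** Returns (topic, payload) instead of sending, to keep the sketch free of Kafka. */
  def route(element: T): (String, Array[Byte]) =
    (destination.getTargetTopic(element), schema.serialize(element))
}
```

With this shape there is no separate default topic at all: a fixed topic is just one implementation of `DestinationTopic`.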
[jira] [Commented] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050925#comment-16050925 ]

Erik van Oosten commented on FLINK-6928:

In my ideal world method {{getTargetTopic}} would be removed from {{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}.

Then there are two constructor variants for {{FlinkKafkaProducer}}: one would take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both would have the simplified {{SerializationSchema}} as argument. To make things simple internally, the first variant could wrap the topic in an implementation of {{DestinationTopic}} that always returns the same topic.
[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Erik van Oosten updated FLINK-6928:
---
    Description:
When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic.

It would be nice if we could signal that the defaultTopic is not needed by passing {{null}}. Currently, a value for the defaultTopic is required.

  was:
When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic.
[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Erik van Oosten updated FLINK-6928:
---
    Description:
When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic.

In addition, it would be nice if we could signal that the defaultTopic is not needed by passing {{null}}. Currently, a value for the defaultTopic is required.

  was:
When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic.

It would be nice if we could signal that the defaultTopic is not needed by passing {{null}}. Currently, a value for the defaultTopic is required.
[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Erik van Oosten updated FLINK-6928:
---
    Summary: Kafka sink: default topic should not need to exist  (was: Kafka source: default topic should not need to exist)
[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Erik van Oosten updated FLINK-6928:
---
    Description:
When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic.

  was:
When using a Kafka source, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic.
[jira] [Updated] (FLINK-6928) Kafka source: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Erik van Oosten updated FLINK-6928:
---
    Summary: Kafka source: default topic should not need to exist  (was: Kafka source: default topic needs to exist)
[jira] [Created] (FLINK-6928) Kafka source: default topic needs to exist
Erik van Oosten created FLINK-6928:
---

             Summary: Kafka source: default topic needs to exist
                 Key: FLINK-6928
                 URL: https://issues.apache.org/jira/browse/FLINK-6928
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.2.1, 1.3.0
            Reporter: Erik van Oosten

When using a Kafka source, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic.
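The delayed lookup requested in the issue might look something like the sketch below. It is not how `FlinkKafkaProducerBase` is implemented: `LazyPartitionLookup`, `fetchPartitions` and `partitionsFor` are hypothetical names, and the Kafka metadata call is replaced by a plain function so the example is self-contained. An `Option` takes the place of a nullable default topic; `Option(maybeNullTopic)` would convert from the `null` convention.

```scala
import scala.collection.mutable

/** Hypothetical sketch: partition metadata is fetched per topic on first use,
  * not eagerly when the sink is opened.
  *
  * @param fetchPartitions stand-in for the Kafka metadata request
  * @param defaultTopic    None signals that no default topic is needed
  */
class LazyPartitionLookup(
    fetchPartitions: String => Array[Int],
    defaultTopic: Option[String]) {

  private val partitionsByTopic = mutable.Map.empty[String, Array[Int]]

  /** Nothing is fetched here, so an unused default topic does not have to exist. */
  def open(): Unit = ()

  /** Resolves the topic (element-specific first, then default) and fetches
    * its partitions only the first time that topic is actually used.
    */
  def partitionsFor(targetTopic: Option[String]): Array[Int] = {
    val topic = targetTopic.orElse(defaultTopic).getOrElse(
      throw new IllegalStateException("No target topic and no default topic configured"))
    partitionsByTopic.getOrElseUpdate(topic, fetchPartitions(topic))
  }
}
```

The mutable map is not synchronized; the sketch assumes it is used from a single thread.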