[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.

2017-11-09 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245474#comment-16245474
 ] 

Erik van Oosten commented on FLINK-5633:


bq. Just curious, why are you creating a new reader for each record?

It's just a bit easier than caching a reader for each writer/reader schema 
combination.
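
For illustration, a minimal sketch of such a cache, keyed on the (writer, reader) 
schema pair (the object and method names are ours, and it assumes each cached 
reader is only used from a single task/thread):

{code:scala}
import java.util.concurrent.ConcurrentHashMap

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificDatumReader
import scala.collection.JavaConverters._

// Hypothetical sketch of the alternative: one reader per (writer, reader) schema pair.
object ReaderCache {
  private val readers: scala.collection.concurrent.Map[(Schema, Schema), SpecificDatumReader[_]] =
    new ConcurrentHashMap[(Schema, Schema), SpecificDatumReader[_]]().asScala

  def readerFor[T](writerSchema: Schema, readerSchema: Schema): SpecificDatumReader[T] =
    readers
      .getOrElseUpdate((writerSchema, readerSchema),
        new SpecificDatumReader[T](writerSchema, readerSchema))
      .asInstanceOf[SpecificDatumReader[T]]
}
{code}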

> ClassCastException: X cannot be cast to X when re-submitting a job.
> ---
>
> Key: FLINK-5633
> URL: https://issues.apache.org/jira/browse/FLINK-5633
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, YARN
>Affects Versions: 1.1.4
>Reporter: Giuliano Caliari
>Priority: Minor
>
> I’m running a job on my local cluster and the first time I submit the job 
> everything works but whenever I cancel and re-submit the same job it fails 
> with:
> {quote}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>   at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>   at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>   at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> 

[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.

2017-11-07 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241701#comment-16241701
 ] 

Erik van Oosten commented on FLINK-5633:


[~StephanEwen] We need to process 130K msg/s, so I guess that can be called often 
:). Our process is CPU bound and parsing Avro is about 15% of that. Any 
improvement means we can run with fewer machines.

For every message we create a new SpecificDatumReader. If I follow the code 
correctly, that should _not_ give a large overhead. The Schema instances we pass 
to it _are_ cached.

Then we call {{SpecificDatumReader.read}} to parse each Avro message. In that 
call you eventually end up in {{SpecificData.newInstance}} to create a new 
instance of the target class. The constructor of that class is looked up in a 
cache. That cache is declared as {{static}}. I do not understand how 
instantiating a new {{SpecificData}} for every call to {{read}} helps because 
it would still use the same cache. The code I pasted above also uses a 
constructor cache but the cache is not {{static}}. Reversing the class loader 
order should also work.
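
For context, the per-message path described above looks roughly like this (a 
sketch; the method name is ours and the schemas come from the cache mentioned 
above):

{code:scala}
import org.apache.avro.Schema
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

// Sketch of the per-message path: a fresh reader per message, cached schemas.
// reader.read eventually reaches SpecificData.newInstance for the target class.
def parse[T](payload: Array[Byte], writerSchema: Schema, readerSchema: Schema): T = {
  val reader  = new SpecificDatumReader[T](writerSchema, readerSchema)
  val decoder = DecoderFactory.get().binaryDecoder(payload, null)
  reader.read(null.asInstanceOf[T], decoder)
}
{code}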


[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data

2017-09-14 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166443#comment-16166443
 ] 

Erik van Oosten commented on FLINK-4796:


I am not sure why this is marked as a duplicate. The problem here is 
inconsistent handling of the runtime context inside the different layers under 
FlinkKafkaProducer: method {{getRuntimeContext}} gives {{null}} even though 
{{setRuntimeContext}} was called.

How does that relate to the addition of a new interface?

> Add new Sink interface with access to more meta data
> 
>
> Key: FLINK-4796
> URL: https://issues.apache.org/jira/browse/FLINK-4796
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>
> The current {{SinkFunction}} cannot access the timestamps of elements which 
> resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other 
> limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} 
> and not a {{SinkFunction}}.
> We should add a new interface for sinks that takes a context parameter, 
> similar to {{ProcessFunction}}. This will allow sinks to query additional 
> meta data about the element that they're receiving. 
> This is one ML thread where a user ran into a problem caused by this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635
> h3. Original Text (that is still valid but not general)
> The Kafka 0.10 connector supports writing event timestamps to Kafka.
> Currently, the regular DataStream APIs don't allow user code to access the 
> event timestamp easily. That's why the Kafka connector is using a custom 
> operator ({{transform()}}) to access the event time.
> With this JIRA, I would like to provide the event timestamp in the regular 
> DataStream APIs.
> Once I'll look into the issue, I'll post some proposals how to add the 
> timestamp. 





[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data

2017-09-11 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160810#comment-16160810
 ] 

Erik van Oosten commented on FLINK-4796:


A workaround is to override {{setRuntimeContext}} (making sure to call 
{{super.setRuntimeContext}}) and use the passed-in context, possibly storing it 
in a private field for later access.
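
A sketch of that workaround, assuming the 0.9 producer (which extends 
{{RichSinkFunction}} via {{FlinkKafkaProducerBase}}); the subclass and field 
names are ours:

{code:scala}
import java.util.Properties

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

class ContextCapturingProducer[T](
    topic: String,
    schema: KeyedSerializationSchema[T],
    props: Properties)
  extends FlinkKafkaProducer09[T](topic, schema, props) {

  // Our own reference; getRuntimeContext on the base class may still return null.
  @transient private var capturedContext: RuntimeContext = _

  override def setRuntimeContext(ctx: RuntimeContext): Unit = {
    super.setRuntimeContext(ctx) // keep the base class wired up
    capturedContext = ctx        // store the passed-in context for later access
  }
}
{code}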



[jira] [Commented] (FLINK-1390) java.lang.ClassCastException: X cannot be cast to X

2017-06-21 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16058708#comment-16058708
 ] 

Erik van Oosten commented on FLINK-1390:


See 
https://issues.apache.org/jira/browse/FLINK-5633?focusedCommentId=16058706&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16058706
 for a proper solution.

>  java.lang.ClassCastException: X cannot be cast to X
> 
>
> Key: FLINK-1390
> URL: https://issues.apache.org/jira/browse/FLINK-1390
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 0.8.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> A user is affected by an issue, which is probably caused by different 
> classloaders being used for loading user classes.
> Current state of investigation:
> - the error happens in yarn sessions (there is only a YARN environment 
> available)
> - the error doesn't happen on the first time the job is being executed. It 
> only happens on subsequent executions.





[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.

2017-06-21 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16058706#comment-16058706
 ] 

Erik van Oosten commented on FLINK-5633:


If you need throughput (like we do), caching is indispensable. In that case you 
can use the following {{SpecificData}} implementation: simply instantiate it once 
and pass that singleton instance to every {{SpecificDatumReader}}.

{code:scala|title=LocalCachingSpecificData.scala}
import java.lang.reflect.Constructor
import java.util.concurrent.ConcurrentHashMap

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificData
import scala.collection.JavaConverters._

/**
  * This can be used instead of [[SpecificData]] in multi-classloader environments like Flink.
  * This variation removes the JVM singleton constructor cache and replaces it with a
  * cache that is local to the current class loader.
  *
  * If two Flink jobs use the same generated Avro code, they will still have separate
  * instances of the classes because they live in separate class loaders.
  * However, a JVM-wide singleton cache keeps a reference to the class from the first
  * class loader that was loaded. Any subsequent jobs will fail with a
  * [[ClassCastException]] because they will get incompatible classes.
  */
class LocalCachingSpecificData extends SpecificData {
  private val NO_ARG: Array[Class[_]] = Array.empty
  private val SCHEMA_ARG: Array[Class[_]] = Array(classOf[Schema])
  private val CTOR_CACHE: scala.collection.concurrent.Map[Class[_], Constructor[_]] =
    new ConcurrentHashMap[Class[_], Constructor[_]]().asScala

  /** Create an instance of a class.
    * If the class implements [[org.apache.avro.specific.SpecificData.SchemaConstructable]],
    * call a constructor with a [[org.apache.avro.Schema]] parameter, otherwise use a
    * no-arg constructor.
    */
  private def newInstance(c: Class[_], s: Schema): AnyRef = {
    val useSchema = classOf[SpecificData.SchemaConstructable].isAssignableFrom(c)
    val constructor = CTOR_CACHE.getOrElseUpdate(c, {
      val ctor = c.getDeclaredConstructor((if (useSchema) SCHEMA_ARG else NO_ARG): _*)
      ctor.setAccessible(true)
      ctor
    })
    if (useSchema) constructor.newInstance(s).asInstanceOf[AnyRef]
    else constructor.newInstance().asInstanceOf[AnyRef]
  }

  override def createFixed(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.createFixed(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }

  override def newRecord(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.newRecord(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }
}
{code}
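
For example, usage could look like the sketch below (the object and method names 
are ours; the three-argument {{SpecificDatumReader}} constructor taking a 
{{SpecificData}} is standard Avro):

{code:scala}
import org.apache.avro.Schema
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

object AvroParsing {
  // Instantiate once; every reader created in this class loader shares it.
  private val specificData = new LocalCachingSpecificData

  def parse[T](payload: Array[Byte], writerSchema: Schema, readerSchema: Schema): T = {
    val reader  = new SpecificDatumReader[T](writerSchema, readerSchema, specificData)
    val decoder = DecoderFactory.get().binaryDecoder(payload, null)
    reader.read(null.asInstanceOf[T], decoder)
  }
}
{code}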


[jira] [Comment Edited] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050925#comment-16050925
 ] 

Erik van Oosten edited comment on FLINK-6928 at 6/15/17 6:39 PM:
-

In my ideal world method {{getTargetTopic}} would be removed from 
{{*SerializationSchema}} and moved to a new interface, e.g. 
{{DestinationTopic}}.
Then there are two constructor variants for {{FlinkKafkaProducer}}: one would 
take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both 
would have the simplified {{*SerializationSchema}} as argument. To make things 
simple internally, the first variant could wrap the topic in an implementation 
of {{DestinationTopic}} that always returns the same topic.


was (Author: erikvanoosten):
In my ideal world method {{getTargetTopic}} would be removed from 
{{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}.
Then there are two constructor variants for {{FlinkKafkaProducer}}: one would 
take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both 
would have the simplified {{SerializationSchema}} as argument. To make things 
simple internally, the first variant could wrap the topic in an implementation 
of {{DestinationTopic}} that always returns the same topic.
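
A rough sketch of that shape (nothing below exists in Flink; the names come from 
the proposal or are ours):

{code:scala}
// Proposed interface: topic selection moves out of the serialization schema.
trait DestinationTopic[T] extends Serializable {
  def getTargetTopic(element: T): String
}

// Wrapper the String-based constructor variant could use internally.
class FixedTopic[T](topic: String) extends DestinationTopic[T] {
  override def getTargetTopic(element: T): String = topic
}
{code}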

> Kafka sink: default topic should not need to exist
> --
>
> Key: FLINK-6928
> URL: https://issues.apache.org/jira/browse/FLINK-6928
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Erik van Oosten
>
> When using a Kafka sink, the defaultTopic needs to exist even when it is 
> never used. It would be nice if fetching partition information for the 
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
> default topic.
> In addition, it would be nice if we could signal that the defaultTopic is not 
> needed by passing {{null}}. Currently, a value for the defaultTopic is 
> required.
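
For illustration, the delayed lookup the issue asks for could look roughly like 
this (a hypothetical sketch; only {{KafkaProducer.partitionsFor}} is the real 
Kafka API):

{code:scala}
import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.JavaConverters._
import scala.collection.mutable

// Hypothetical sketch: fetch partition info per topic on first use instead of in open().
class LazyPartitionLookup(producer: KafkaProducer[Array[Byte], Array[Byte]]) {
  private val partitionsByTopic = mutable.Map.empty[String, Array[Int]]

  def partitionsFor(topic: String): Array[Int] =
    partitionsByTopic.getOrElseUpdate(
      topic,
      producer.partitionsFor(topic).asScala.map(_.partition).toArray)
}
{code}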





[jira] [Commented] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050925#comment-16050925
 ] 

Erik van Oosten commented on FLINK-6928:


In my ideal world method {{getTargetTopic}} would be removed from 
{{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}.
Then there are two constructor variants for {{FlinkKafkaProducer}}: one would 
take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both 
would have the simplified {{SerializationSchema}} as argument. To make things 
simple internally, the first variant could wrap the topic in an implementation 
of {{DestinationTopic}} that always returns the same topic.



[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Description: 
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.

It would be nice if we could signal that the defaultTopic is not needed by 
passing {{null}}. Currently, a value for the defaultTopic is required.

  was:
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.




[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Description: 
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.

In addition, it would be nice if we could signal that the defaultTopic is not 
needed by passing {{null}}. Currently, a value for the defaultTopic is required.

  was:
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.

It would be nice if we could signal that the defaultTopic is not needed by 
passing {{null}}. Currently, a value for the defaultTopic is required.




[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Summary: Kafka sink: default topic should not need to exist  (was: Kafka 
source: default topic should not need to exist)



[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Description: 
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.

  was:
When using a Kafka source, the defaultTopic needs to exist even when it is 
never used. It would be nice if fetching partition information for the default 
topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.




[jira] [Updated] (FLINK-6928) Kafka source: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Summary: Kafka source: default topic should not need to exist  (was: Kafka 
source: default topic needs to exist)



[jira] [Created] (FLINK-6928) Kafka source: default topic needs to exist

2017-06-15 Thread Erik van Oosten (JIRA)
Erik van Oosten created FLINK-6928:
--

 Summary: Kafka source: default topic needs to exist
 Key: FLINK-6928
 URL: https://issues.apache.org/jira/browse/FLINK-6928
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.2.1, 1.3.0
Reporter: Erik van Oosten


When using a Kafka source, the defaultTopic needs to exist even when it is 
never used. It would be nice if fetching partition information for the default 
topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)