Re: KafkaProducer can not be instantiated

2016-10-06 Thread Timo Walther
Thanks for the information, Tzu-Li. I will mock the FlinkKafkaProducer 
class until this issue is fixed.


Timo


--
Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr



Re: KafkaProducer can not be instantiated

2016-10-05 Thread Tzu-Li (Gordon) Tai
Sorry, a correction to my last statement:
On the consumer side, I think the instantiation was already removed from the 
constructor in a recent commit.





Re: KafkaProducer can not be instantiated

2016-10-05 Thread Tzu-Li (Gordon) Tai
Yes, this matters on the consumer side. Moving the instantiation out of the 
constructor requires a guarantee that the partition lists fetched individually 
by the subtasks are deterministic and identical.

On the producer side I don’t really think it matters, unless user 
implementations of the provided KafkaPartitioner depend on the ordering of the 
partition id array passed to KafkaPartitioner.open(). From the interface 
Javadoc, I’m not really sure whether there was ever a contract / guarantee on 
that ordering for the user in the first place.

Otherwise, if we want to be really safe and not break any user code on the 
producer side, we should keep the ordering guarantee there too.
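
One way to keep an ordering guarantee when each subtask fetches the list itself might be to sort the partition ids before handing them to the partitioner. The following is only a minimal sketch under that assumption: `SortedPartitionsSketch` and `normalize` are hypothetical names, not part of Flink or Kafka, and sorting only makes the order identical if every subtask fetched the same set of partitions in the first place.

```java
import java.util.Arrays;

public class SortedPartitionsSketch {

    /**
     * Hypothetical helper: returns a sorted copy of the fetched partition ids,
     * so the order no longer depends on how the metadata response was ordered.
     */
    static int[] normalize(int[] fetchedPartitionIds) {
        int[] copy = Arrays.copyOf(fetchedPartitionIds, fetchedPartitionIds.length);
        Arrays.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        // Two subtasks receive the same partitions in a different order.
        int[] subtaskA = normalize(new int[] {2, 0, 1});
        int[] subtaskB = normalize(new int[] {1, 2, 0});

        // After normalization both subtasks would pass the same array on.
        System.out.println(Arrays.equals(subtaskA, subtaskB)); // prints "true"
    }
}
```

This does not cover the case where the partition set itself differs between subtasks (for example, if partitions are added while the job is starting), which is the harder part of the question raised above.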





Re: KafkaProducer can not be instantiated

2016-10-05 Thread Chesnay Schepler
If you were to move the partition list fetching out of the constructor 
into open(), is there any guarantee that the partition list is identical 
for each fetching subtask?






Re: KafkaProducer can not be instantiated

2016-10-05 Thread Tzu-Li (Gordon) Tai
Hi Timo,

I haven’t had the chance to look at the producer side too much yet, but after a 
look at the code, I think it’s reasonable to remove the instantiation from the 
producer constructor.
The instantiation in the constructor is only used for partition list fetching 
and eager properties validation before the job starts running. Given an 
alternative way to do the eager properties validation in the constructor 
without relying on KafkaProducer, it should be safe to remove it from the 
constructor.

The consumer side actually has the same problem right now too. I was hoping to 
bundle the fix with a bigger task, but I would probably consider moving it up 
the TODO list so it can be resolved sooner as a standalone fix.

Cheers,
Gordon
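
To illustrate what eager properties validation without a KafkaProducer could look like, here is a rough sketch. `EagerValidationSketch` and `validate` are hypothetical names, and checking only `bootstrap.servers` is an assumption made for the example; the real set of checks would have to match whatever the producer currently validates.

```java
import java.util.Properties;

public class EagerValidationSketch {

    /**
     * Hypothetical helper: checks the configuration locally and never
     * contacts a broker, so it is safe to call from a constructor.
     */
    static void validate(Properties props) {
        if (props == null) {
            throw new IllegalArgumentException("producer properties must not be null");
        }
        String servers = props.getProperty("bootstrap.servers");
        if (servers == null || servers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrap.servers must be set");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        validate(props); // passes without any running Kafka instance

        try {
            validate(new Properties()); // missing bootstrap.servers
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that configuration mistakes can still fail fast on the client while the actual connection is deferred.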




KafkaProducer can not be instantiated

2016-10-05 Thread Timo Walther

Hey everyone,

I'm currently rewriting the KafkaTableSinkTest and noticed something 
that looks unintentional: is it intended that 
FlinkKafkaProducer08 cannot be instantiated without a running Kafka 
instance?


The constructor of FlinkKafkaProducerBase calls getKafkaProducer(), which 
should actually be called for the first time in the open() method. What 
happens if the client has no access to the Kafka properties (e.g. when using 
a remote execution environment)? Is it then impossible to create a 
KafkaProducer?


Thanks.

Timo
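
The pattern asked for above, where the constructor only stores configuration and the client is created in open(), could be sketched roughly as follows. All class names here (`LazyProducerSketch`, `LazySink`, `FakeKafkaClient`) are hypothetical stand-ins and not the Flink or Kafka classes; the sketch only shows the shape of the change.

```java
import java.util.Properties;

public class LazyProducerSketch {

    /** Hypothetical stand-in for a client that would need a reachable broker. */
    static class FakeKafkaClient {
        FakeKafkaClient(Properties props) {
            // A real client would connect or fetch metadata here.
        }
    }

    /** Hypothetical sink: constructing it never touches the cluster. */
    static class LazySink {
        private final Properties props;
        private transient FakeKafkaClient client; // created in open(), not in the constructor

        LazySink(Properties props) {
            this.props = props; // only store the configuration
        }

        /** Called on the worker at runtime, where the cluster is reachable. */
        void open() {
            client = new FakeKafkaClient(props);
        }

        boolean isOpen() {
            return client != null;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        LazySink sink = new LazySink(props);
        System.out.println(sink.isOpen()); // prints "false"

        sink.open();
        System.out.println(sink.isOpen()); // prints "true"
    }
}
```

With this shape, a test or a remote client can build the sink object without a running Kafka instance, and only open() depends on the cluster.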