Re: Providing Kafka configuration as Map of Strings

2018-01-24 Thread Cody Koeninger
Have you tried passing in a Map<String, Object> that happens to have
strings for all the values?  I haven't tested this, but the underlying
Kafka consumer constructor is documented to take either strings or
objects as values, despite the static type.
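
Roughly something like the following (again, untested; the broker
address, group id, topic name and fetch size are just placeholders):

  Map<String, Object> kafkaParams = new HashMap<>();
  // every value is a plain String, even for settings whose "natural"
  // type is a class, boolean or number -- per the consumer config docs,
  // string values should be accepted and parsed
  kafkaParams.put("bootstrap.servers", "localhost:9092");
  kafkaParams.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaParams.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaParams.put("group.id", "some_group_id");
  kafkaParams.put("enable.auto.commit", "false");
  kafkaParams.put("max.partition.fetch.bytes", "1048576");

  JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent(),
      ConsumerStrategies.<String, String>Subscribe(
          Arrays.asList("topicA"), kafkaParams)
    );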

On Wed, Jan 24, 2018 at 2:48 PM, Tecno Brain wrote:
> Basically, I am trying to avoid writing code like:
>
>   switch( key ) {
>     case "key.deserializer"          : result.put(key, Class.forName(value)); break;
>     case "key.serializer"            : result.put(key, Class.forName(value)); break;
>     case "value.deserializer"        : result.put(key, Class.forName(value)); break;
>     case "value.serializer"          : result.put(key, Class.forName(value)); break;
>     case "max.partition.fetch.bytes" : result.put(key, Long.valueOf(value)); break;
>     case "max.poll.interval.ms"      : result.put(key, Long.valueOf(value)); break;
>     case "enable.auto.commit"        : result.put(key, Boolean.valueOf(value)); break;
>     default                          : result.put(key, value); break;
>   }
>
> since I would need to go over all possible Kafka properties that are not
> expected as a String.
>
> On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain wrote:
>>
>> On page
>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>> there is this Java example:
>>
>> Map<String, Object> kafkaParams = new HashMap<>();
>> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
>> kafkaParams.put("key.deserializer", StringDeserializer.class);
>> kafkaParams.put("value.deserializer", StringDeserializer.class);
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>> kafkaParams.put("auto.offset.reset", "latest");
>> kafkaParams.put("enable.auto.commit", false);
>>
>> Collection<String> topics = Arrays.asList("topicA", "topicB");
>>
>> JavaInputDStream<ConsumerRecord<String, String>> stream =
>>   KafkaUtils.createDirectStream(
>>     streamingContext,
>>     LocationStrategies.PreferConsistent(),
>>     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>>   );
>>
>> I would like to configure Kafka from properties loaded from a Properties
>> file or a Map.
>>
>> Is there any API to take a Map<String, String> and produce the
>> Map<String, Object> required to set the Kafka parameters? Such code would
>> convert "true" to a Boolean, or a class name to a Class, depending on the
>> key.
>>
>> Seems to me that I would need to know ALL possible Kafka parameters and
>> what data type they should be converted to in order to produce the
>> Map<String, Object> kafkaParams.
>>
>> The older API used a Map<String, String> passed to
>> KafkaUtils.createDirectStream.
>>
>> Thanks
>>
>>
>>
>>
>>
>>
>




Re: Providing Kafka configuration as Map of Strings

2018-01-24 Thread Tecno Brain
Basically, I am trying to avoid writing code like:

  switch( key ) {
    case "key.deserializer"          : result.put(key, Class.forName(value)); break;
    case "key.serializer"            : result.put(key, Class.forName(value)); break;
    case "value.deserializer"        : result.put(key, Class.forName(value)); break;
    case "value.serializer"          : result.put(key, Class.forName(value)); break;
    case "max.partition.fetch.bytes" : result.put(key, Long.valueOf(value)); break;
    case "max.poll.interval.ms"      : result.put(key, Long.valueOf(value)); break;
    case "enable.auto.commit"        : result.put(key, Boolean.valueOf(value)); break;
    default                          : result.put(key, value); break;
  }

since I would need to go over all possible Kafka properties that are not
expected as a String.
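
What I was hoping for is to load everything from a .properties file and
pass it through untouched, along these lines (rough, untested sketch; the
file name is made up), but I don't know whether string values are accepted
for every key:

  Properties props = new Properties();
  try (InputStream in = new FileInputStream("kafka-consumer.properties")) {
    props.load(in);
  }

  // copy every entry through as-is, with no per-key type conversion
  Map<String, Object> kafkaParams = new HashMap<>();
  for (String name : props.stringPropertyNames()) {
    kafkaParams.put(name, props.getProperty(name));
  }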

On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain wrote:

> On page https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> there is this Java example:
>
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", StringDeserializer.class);
> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
> kafkaParams.put("auto.offset.reset", "latest");
> kafkaParams.put("enable.auto.commit", false);
>
> Collection<String> topics = Arrays.asList("topicA", "topicB");
>
> JavaInputDStream<ConsumerRecord<String, String>> stream =
>   KafkaUtils.createDirectStream(
>     streamingContext,
>     LocationStrategies.PreferConsistent(),
>     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>   );
>
> I would like to configure Kafka from properties loaded from a Properties
> file or a Map.
>
> Is there any API to take a Map<String, String> and produce the
> Map<String, Object> required to set the Kafka parameters? Such code would
> convert "true" to a Boolean, or a class name to a Class, depending on the
> key.
>
> Seems to me that I would need to know ALL possible Kafka parameters and
> what data type they should be converted to in order to produce the
> Map<String, Object> kafkaParams.
>
> The older API used a Map<String, String> passed to
> KafkaUtils.createDirectStream.
>
> Thanks
>
>
>
>
>
>
>


Providing Kafka configuration as Map of Strings

2018-01-24 Thread Tecno Brain
On page
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
there is this Java example:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

I would like to configure Kafka from properties loaded from a Properties
file or a Map.

Is there any API to take a Map<String, String> and produce the
Map<String, Object> required to set the Kafka parameters? Such code would
convert "true" to a Boolean, or a class name to a Class, depending on the
key.

Seems to me that I would need to know ALL possible Kafka parameters and
what data type they should be converted to in order to produce the
Map<String, Object> kafkaParams.

The older API used a Map<String, String> passed to
KafkaUtils.createDirectStream.
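
For the new API, fixing the static type alone is easy enough (untested
sketch; loadConfigAsStrings() is a made-up placeholder for however the
strings get loaded), but I don't know whether plain String values are
accepted for keys such as enable.auto.commit or key.deserializer:

  Map<String, String> loaded = loadConfigAsStrings();
  // compiles thanks to the map copy constructor, but the values stay Strings
  Map<String, Object> kafkaParams = new HashMap<>(loaded);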

Thanks