Re: Providing Kafka configuration as Map of Strings
Have you tried passing in a Map<String, Object> that happens to have strings for all the values? I haven't tested this, but the underlying Kafka consumer constructor is documented to take either strings or objects as values, despite the static type.

On Wed, Jan 24, 2018 at 2:48 PM, Tecno Brain wrote:
> Basically, I am trying to avoid writing code like:
>
>   switch (key) {
>     case "key.deserializer":         result.put(key, Class.forName(value)); break;
>     case "key.serializer":           result.put(key, Class.forName(value)); break;
>     case "value.deserializer":       result.put(key, Class.forName(value)); break;
>     case "value.serializer":         result.put(key, Class.forName(value)); break;
>     case "max.partition.fetch.bytes": result.put(key, Long.valueOf(value)); break;
>     case "max.poll.interval.ms":     result.put(key, Long.valueOf(value)); break;
>     case "enable.auto.commit":       result.put(key, Boolean.valueOf(value)); break;
>     default:
>       result.put(key, value);
>       break;
>   }
>
> since I would need to go over all possible Kafka properties that are not
> expected as a String.
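The suggestion above could look roughly like the following in Java. This is a sketch that has not been tested against Spark: it only shows loading a Properties source and copying it into a Map<String, Object> whose values all stay Strings, relying on the reply's point that the Kafka consumer is documented to accept string values. The class name KafkaParamsFromProperties and the helper toKafkaParams are hypothetical names introduced for illustration, and the inline property text stands in for a real properties file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class KafkaParamsFromProperties {

    // Hypothetical helper (not part of Spark or Kafka): copies every property
    // into a Map<String, Object>, leaving each value as the String it was read as.
    public static Map<String, Object> toKafkaParams(Properties props) {
        Map<String, Object> params = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            params.put(name, props.getProperty(name));
        }
        return params;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the contents of a properties file on disk.
        String text = String.join("\n",
            "bootstrap.servers=localhost:9092,anotherhost:9092",
            "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
            "group.id=use_a_separate_group_id_for_each_stream",
            "auto.offset.reset=latest",
            "enable.auto.commit=false");

        Properties props = new Properties();
        props.load(new StringReader(text));

        Map<String, Object> kafkaParams = toKafkaParams(props);

        // Show that every value is still a String at this point.
        kafkaParams.forEach((key, value) ->
            System.out.println(key + " = " + value
                + " (" + value.getClass().getSimpleName() + ")"));
    }
}
```

The resulting map would then be passed where the documentation example passes kafkaParams, for instance ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams). Whether Spark itself reads any of these entries expecting a specific type is not verified here.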
Re: Providing Kafka configuration as Map of Strings
Basically, I am trying to avoid writing code like:

    switch (key) {
      case "key.deserializer":         result.put(key, Class.forName(value)); break;
      case "key.serializer":           result.put(key, Class.forName(value)); break;
      case "value.deserializer":       result.put(key, Class.forName(value)); break;
      case "value.serializer":         result.put(key, Class.forName(value)); break;
      case "max.partition.fetch.bytes": result.put(key, Long.valueOf(value)); break;
      case "max.poll.interval.ms":     result.put(key, Long.valueOf(value)); break;
      case "enable.auto.commit":       result.put(key, Boolean.valueOf(value)); break;
      default:
        result.put(key, value);
        break;
    }

since I would need to go over all possible Kafka properties that are not expected as a String.

On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain wrote:
> On page
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> there is this Java example:
>
>   Map<String, Object> kafkaParams = new HashMap<>();
>   kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
>   kafkaParams.put("key.deserializer", StringDeserializer.class);
>   kafkaParams.put("value.deserializer", StringDeserializer.class);
>   kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>   kafkaParams.put("auto.offset.reset", "latest");
>   kafkaParams.put("enable.auto.commit", false);
>
>   Collection<String> topics = Arrays.asList("topicA", "topicB");
>
>   JavaInputDStream<ConsumerRecord<String, String>> stream =
>     KafkaUtils.createDirectStream(
>       streamingContext,
>       LocationStrategies.PreferConsistent(),
>       ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>     );
>
> I would like to configure Kafka from properties loaded from a Properties
> file or a Map<String, String>.
>
> Is there any API to take a Map<String, String> and produce the
> Map<String, Object> required to set the Kafka parameters? Such code would
> convert "true" to a boolean, or a class name to the Class, depending on the
> key.
>
> Seems to me that I would need to know ALL possible Kafka parameters and
> what data type they should be converted to in order to produce the
> Map<String, Object> kafkaParams.
>
> The older API used a Map<String, String> passed to
> KafkaUtils.createDirectStream.
>
> Thanks
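If typed values do turn out to be required, the switch can at least be reduced to a lookup table, so that only the keys needing conversion are listed and everything else passes through as a String. The following is a minimal sketch under that assumption; TypedKafkaParams, CONVERTERS, loadClass and convert are hypothetical names, not part of Spark or Kafka, and the table still has to be maintained by hand. It uses Integer rather than Long for max.partition.fetch.bytes and max.poll.interval.ms on the assumption that Kafka declares these as int-typed settings; adjust if your Kafka version differs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class TypedKafkaParams {

    // Hypothetical table: config key -> how to turn its String value into an Object.
    // Keys that are not listed here are passed through unchanged as Strings.
    static final Map<String, Function<String, Object>> CONVERTERS = new HashMap<>();

    static {
        CONVERTERS.put("key.deserializer", TypedKafkaParams::loadClass);
        CONVERTERS.put("key.serializer", TypedKafkaParams::loadClass);
        CONVERTERS.put("value.deserializer", TypedKafkaParams::loadClass);
        CONVERTERS.put("value.serializer", TypedKafkaParams::loadClass);
        // Assumption: these two settings are int-typed in Kafka.
        CONVERTERS.put("max.partition.fetch.bytes", Integer::valueOf);
        CONVERTERS.put("max.poll.interval.ms", Integer::valueOf);
        CONVERTERS.put("enable.auto.commit", Boolean::valueOf);
    }

    // Wraps the checked ClassNotFoundException so the method fits a Function.
    static Object loadClass(String name) {
        try {
            return Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown class: " + name, e);
        }
    }

    // Converts each entry with its registered converter, or keeps the String as-is.
    public static Map<String, Object> convert(Map<String, String> raw) {
        Map<String, Object> typed = new HashMap<>();
        raw.forEach((key, value) ->
            typed.put(key, CONVERTERS.getOrDefault(key, v -> v).apply(value)));
        return typed;
    }
}
```

A call such as TypedKafkaParams.convert(stringMap) would then produce the Map<String, Object> to hand to the consumer strategy. This does not remove the need to know which keys are non-String; it only keeps that knowledge in one place.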
Providing Kafka configuration as Map of Strings
On page https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html there is this Java example:

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("topicA", "topicB");

    JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );

I would like to configure Kafka from properties loaded from a Properties file or a Map<String, String>.

Is there any API to take a Map<String, String> and produce the Map<String, Object> required to set the Kafka parameters? Such code would convert "true" to a boolean, or a class name to the Class, depending on the key.

Seems to me that I would need to know ALL possible Kafka parameters and what data type they should be converted to in order to produce the Map<String, Object> kafkaParams.

The older API used a Map<String, String> passed to KafkaUtils.createDirectStream.

Thanks