[jira] [Created] (SPARK-15239) spark document conflict about mesos cluster
JasonChang created SPARK-15239: -- Summary: spark document conflict about mesos cluster Key: SPARK-15239 URL: https://issues.apache.org/jira/browse/SPARK-15239 Project: Spark Issue Type: Bug Components: Documentation Affects Versions: 1.6.1 Reporter: JasonChang 1. http://spark.apache.org/docs/latest/submitting-applications.html if your application is submitted from a machine far from the worker machines (e.g. locally on your laptop), it is common to use cluster mode to minimize network latency between the drivers and the executors. Note that cluster mode is currently not supported for Mesos clusters. Currently only YARN supports cluster mode for Python applications. 2. http://spark.apache.org/docs/latest/running-on-mesos.html Spark on Mesos also supports cluster mode, where the driver is launched in the cluster and the client can find the results of the driver from the Mesos Web UI. I confused does mesos supports cluster mode? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15089) kafka-spark consumer with SSL problem
[ https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15270030#comment-15270030 ] JasonChang commented on SPARK-15089: Hi Sean yes, broker works with SSL I run on kafka comsumer, it work but kafka-spark consumer is not working {code} public void consume(String topic, BiConsumer callback) { Properties props = new Properties(); props.put("bootstrap.servers", kafkaHosts); props.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class); props.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class); props.put("group.id", group); props.put("security.protocol", "SSL"); props.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); props.put("ssl.truststore.password", "password"); props.put("ssl.keystore.location", "/opt/cert/keystore.jks"); props.put("ssl.keystore.password", "password"); props.put("ssl.key.password", "password"); try (KafkaConsumer consumer = new KafkaConsumer(props)) { consumer.subscribe(Arrays.asList(topic)); while (!stopped.get()) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println("<<< " + record.key() + ", " + record.value()); callback.accept(record.key(), record.value()); } } System.out.println("Finishing subscription to topic " + topic); } } {code} > kafka-spark consumer with SSL problem > - > > Key: SPARK-15089 > URL: https://issues.apache.org/jira/browse/SPARK-15089 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.6.1 >Reporter: JasonChang > > I am not sure spark streaming support SSL > I tried to add params to kafkaParams, but it not work > {code} > JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new > Duration(1)); > Set topicmap = new HashSet(); > topicmap.add(kafkaTopic); > Map kafkaParams = new HashMap(); > kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); > kafkaParams.put("security.protocol", "SSL"); > kafkaParams.put("ssl.keystore.type", "JKS"); > kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); > kafkaParams.put("ssl.keystore.password ", "password"); > kafkaParams.put("ssl.key.password", "password"); > kafkaParams.put("ssl.truststore.type", "JKS"); > kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); > kafkaParams.put("ssl.truststore.password", "password"); > kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); > JavaPairInputDStream stream = > KafkaUtils.createDirectStream(jsc, > String.class, > String.class, > StringDecoder.class, > StringDecoder.class, > kafkaParams, > topicmap > ); > JavaDStream lines = stream.map(new Function, > String>() { > public String call(Tuple2 tuple2) { > return tuple2._2(); > } > }); > {code} > {code} > Exception in thread "main" org.apache.spark.SparkException: > java.io.EOFException: Received -1 when reading from channel, socket has > likely been closed. > at > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) > at > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) > at scala.util.Either.fold(Either.scala:97) > at > org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) > at > org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) > at > org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) > at > org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) > at > org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-15089) kafka-spark consumer with SSL problem
[ https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JasonChang updated SPARK-15089: --- Description: I am not sure spark streaming support SSL I tried to add params to kafkaParams, but it not work {code} JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1)); Set topicmap = new HashSet(); topicmap.add(kafkaTopic); Map kafkaParams = new HashMap(); kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); kafkaParams.put("security.protocol", "SSL"); kafkaParams.put("ssl.keystore.type", "JKS"); kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); kafkaParams.put("ssl.keystore.password ", "password"); kafkaParams.put("ssl.key.password", "password"); kafkaParams.put("ssl.truststore.type", "JKS"); kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); kafkaParams.put("ssl.truststore.password", "password"); kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); JavaPairInputDStream stream = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap ); JavaDStream lines = stream.map(new Function, String>() { public String call(Tuple2 tuple2) { return tuple2._2(); } }); {code} {code} Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) {code} was: I am not sure spark streaming support SSL I tried to add params to kafkaParams, but it not work {code} JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1)); Set topicmap = new HashSet(); topicmap.add(kafkaTopic); Map kafkaParams = new HashMap(); kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); kafkaParams.put("security.protocol", "SSL"); kafkaParams.put("ssl.keystore.type", "JKS"); kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); kafkaParams.put("ssl.keystore.password ", "password"); kafkaParams.put("ssl.key.password", "password"); kafkaParams.put("ssl.truststore.type", "JKS"); kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); kafkaParams.put("ssl.truststore.password", "password"); kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); JavaPairInputDStream stream = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap ); JavaDStream lines = stream.map(new Function, String>() { public String call(Tuple2 tuple2) { return tuple2._2(); } }); {code} exception Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) at org.apache.spark.streaming.kafka.KafkaUtils
[jira] [Updated] (SPARK-15089) kafka-spark consumer with SSL problem
[ https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JasonChang updated SPARK-15089: --- Description: I am not sure spark streaming support SSL I tried to add params to kafkaParams, but it not work {code} JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1)); Set topicmap = new HashSet(); topicmap.add(kafkaTopic); Map kafkaParams = new HashMap(); kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); kafkaParams.put("security.protocol", "SSL"); kafkaParams.put("ssl.keystore.type", "JKS"); kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); kafkaParams.put("ssl.keystore.password ", "password"); kafkaParams.put("ssl.key.password", "password"); kafkaParams.put("ssl.truststore.type", "JKS"); kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); kafkaParams.put("ssl.truststore.password", "password"); kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); JavaPairInputDStream stream = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap ); JavaDStream lines = stream.map(new Function, String>() { public String call(Tuple2 tuple2) { return tuple2._2(); } }); {/code} exception Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) was: I am not sure spark streaming support SSL I tried to add params to kafkaParams, but it not work JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1)); Set topicmap = new HashSet(); topicmap.add(kafkaTopic); Map kafkaParams = new HashMap(); kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); kafkaParams.put("security.protocol", "SSL"); kafkaParams.put("ssl.keystore.type", "JKS"); kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); kafkaParams.put("ssl.keystore.password ", "password"); kafkaParams.put("ssl.key.password", "password"); kafkaParams.put("ssl.truststore.type", "JKS"); kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); kafkaParams.put("ssl.truststore.password", "password"); kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); JavaPairInputDStream stream = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap ); JavaDStream lines = stream.map(new Function, String>() { public String call(Tuple2 tuple2) { return tuple2._2(); } }); exception Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaC
[jira] [Updated] (SPARK-15089) kafka-spark consumer with SSL problem
[ https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JasonChang updated SPARK-15089: --- Description: I am not sure spark streaming support SSL I tried to add params to kafkaParams, but it not work {code} JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1)); Set topicmap = new HashSet(); topicmap.add(kafkaTopic); Map kafkaParams = new HashMap(); kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); kafkaParams.put("security.protocol", "SSL"); kafkaParams.put("ssl.keystore.type", "JKS"); kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); kafkaParams.put("ssl.keystore.password ", "password"); kafkaParams.put("ssl.key.password", "password"); kafkaParams.put("ssl.truststore.type", "JKS"); kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); kafkaParams.put("ssl.truststore.password", "password"); kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); JavaPairInputDStream stream = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap ); JavaDStream lines = stream.map(new Function, String>() { public String call(Tuple2 tuple2) { return tuple2._2(); } }); {code} exception Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) was: I am not sure spark streaming support SSL I tried to add params to kafkaParams, but it not work {code} JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1)); Set topicmap = new HashSet(); topicmap.add(kafkaTopic); Map kafkaParams = new HashMap(); kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); kafkaParams.put("security.protocol", "SSL"); kafkaParams.put("ssl.keystore.type", "JKS"); kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); kafkaParams.put("ssl.keystore.password ", "password"); kafkaParams.put("ssl.key.password", "password"); kafkaParams.put("ssl.truststore.type", "JKS"); kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); kafkaParams.put("ssl.truststore.password", "password"); kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); JavaPairInputDStream stream = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap ); JavaDStream lines = stream.map(new Function, String>() { public String call(Tuple2 tuple2) { return tuple2._2(); } }); {/code} exception Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.
[jira] [Updated] (SPARK-15089) kafka-spark consumer with SSL problem
[ https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JasonChang updated SPARK-15089: --- Description: I am not sure spark streaming support SSL I tried to add params to kafkaParams, but it not work JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1)); Set topicmap = new HashSet(); topicmap.add(kafkaTopic); Map kafkaParams = new HashMap(); kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); kafkaParams.put("security.protocol", "SSL"); kafkaParams.put("ssl.keystore.type", "JKS"); kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); kafkaParams.put("ssl.keystore.password ", "password"); kafkaParams.put("ssl.key.password", "password"); kafkaParams.put("ssl.truststore.type", "JKS"); kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); kafkaParams.put("ssl.truststore.password", "password"); kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); JavaPairInputDStream stream = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap ); JavaDStream lines = stream.map(new Function, String>() { public String call(Tuple2 tuple2) { return tuple2._2(); } }); exception Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) was: I am not sure spark streaming support SSL I tried to add params to kafkaParams, but it not work JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1)); //存放话题跟分片的映射关系 Set topicmap = new HashSet(); topicmap.add(kafkaTopic); Map kafkaParams = new HashMap(); kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); kafkaParams.put("security.protocol", "SSL"); kafkaParams.put("ssl.keystore.type", "JKS"); kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); kafkaParams.put("ssl.keystore.password ", "password"); kafkaParams.put("ssl.key.password", "password"); kafkaParams.put("ssl.truststore.type", "JKS"); kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); kafkaParams.put("ssl.truststore.password", "password"); kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); JavaPairInputDStream stream = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap ); JavaDStream lines = stream.map(new Function, String>() { public String call(Tuple2 tuple2) { return tuple2._2(); } }); exception Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.stre
[jira] [Created] (SPARK-15089) kafka-spark consumer with SSL problem
JasonChang created SPARK-15089: -- Summary: kafka-spark consumer with SSL problem Key: SPARK-15089 URL: https://issues.apache.org/jira/browse/SPARK-15089 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.6.1 Reporter: JasonChang I am not sure spark streaming support SSL I tried to add params to kafkaParams, but it not work JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1)); //存放话题跟分片的映射关系 Set topicmap = new HashSet(); topicmap.add(kafkaTopic); Map kafkaParams = new HashMap(); kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); kafkaParams.put("security.protocol", "SSL"); kafkaParams.put("ssl.keystore.type", "JKS"); kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); kafkaParams.put("ssl.keystore.password ", "password"); kafkaParams.put("ssl.key.password", "password"); kafkaParams.put("ssl.truststore.type", "JKS"); kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); kafkaParams.put("ssl.truststore.password", "password"); kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); JavaPairInputDStream stream = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap ); JavaDStream lines = stream.map(new Function, String>() { public String call(Tuple2 tuple2) { return tuple2._2(); } }); exception Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org