[jira] [Created] (SPARK-15239) spark document conflict about mesos cluster

2016-05-09 Thread JasonChang (JIRA)
JasonChang created SPARK-15239:
--

 Summary: spark document conflict about mesos cluster 
 Key: SPARK-15239
 URL: https://issues.apache.org/jira/browse/SPARK-15239
 Project: Spark
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.6.1
Reporter: JasonChang


1. http://spark.apache.org/docs/latest/submitting-applications.html

if your application is submitted from a machine far from the worker machines 
(e.g. locally on your laptop), it is common to use cluster mode to minimize 
network latency between the drivers and the executors. Note that cluster mode 
is currently not supported for Mesos clusters. Currently only YARN supports 
cluster mode for Python applications.

2. http://spark.apache.org/docs/latest/running-on-mesos.html

Spark on Mesos also supports cluster mode, where the driver is launched in the 
cluster and the client can find the results of the driver from the Mesos Web UI.

I am confused: does Mesos support cluster mode or not? The two pages contradict each other.
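
For reference, the cluster mode described on the running-on-mesos page goes through the MesosClusterDispatcher; a minimal sketch of that flow (host names, port and jar URL below are placeholders):
{code}
# Start the dispatcher somewhere in the cluster, pointing it at the Mesos master
./sbin/start-mesos-dispatcher.sh --master mesos://mesos-master:5050

# Submit in cluster mode against the dispatcher (it listens on 7077 by default);
# the application jar must be reachable from inside the cluster, e.g. over HTTP
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://dispatcher-host:7077 \
  --deploy-mode cluster \
  http://repo.example.com/jars/spark-examples.jar
{code}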



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15089) kafka-spark consumer with SSL problem

2016-05-03 Thread JasonChang (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15270030#comment-15270030 ]

JasonChang commented on SPARK-15089:


Hi Sean,
Yes, the broker works with SSL. When I run a plain Kafka consumer against it, it works, but the kafka-spark consumer does not:
{code}
import java.util.Arrays;
import java.util.Properties;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// kafkaHosts, group and stopped are fields of the enclosing class
public void consume(String topic, BiConsumer<String, String> callback) {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaHosts);
    props.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
    props.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
    props.put("group.id", group);

    // SSL settings: the same stores and passwords as in the Spark job below
    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/opt/cert/client.truststore.jks");
    props.put("ssl.truststore.password", "password");
    props.put("ssl.keystore.location", "/opt/cert/keystore.jks");
    props.put("ssl.keystore.password", "password");
    props.put("ssl.key.password", "password");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));

        while (!stopped.get()) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("<<< " + record.key() + ", " + record.value());
                callback.accept(record.key(), record.value());
            }
        }
        System.out.println("Finishing subscription to topic " + topic);
    }
}
{code}
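
Side note: the broker's SSL listener can also be double-checked without any custom code using the console consumer with an SSL client config. A rough sketch, assuming Kafka 0.9+ and placeholder host, port and topic:
{code}
# client-ssl.properties -- same stores and passwords as above
#   security.protocol=SSL
#   ssl.truststore.location=/opt/cert/client.truststore.jks
#   ssl.truststore.password=password
#   ssl.keystore.location=/opt/cert/keystore.jks
#   ssl.keystore.password=password
#   ssl.key.password=password

./bin/kafka-console-consumer.sh \
  --bootstrap-server broker-host:9093 \
  --topic my-topic \
  --new-consumer \
  --consumer.config client-ssl.properties
{code}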

> kafka-spark consumer with SSL problem
> -
>
> Key: SPARK-15089
> URL: https://issues.apache.org/jira/browse/SPARK-15089
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
> Affects Versions: 1.6.1
> Reporter: JasonChang
>
> I am not sure whether Spark Streaming supports SSL.
> I tried to add the SSL params to kafkaParams, but it does not work.
> {code}
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1));
>
> Set<String> topicmap = new HashSet<>();
> topicmap.add(kafkaTopic);
>
> Map<String, String> kafkaParams = new HashMap<>();
> kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url);
> kafkaParams.put("security.protocol", "SSL");
> kafkaParams.put("ssl.keystore.type", "JKS");
> kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks");
> kafkaParams.put("ssl.keystore.password ", "password"); // note: trailing space in this key, as posted
> kafkaParams.put("ssl.key.password", "password");
> kafkaParams.put("ssl.truststore.type", "JKS");
> kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks");
> kafkaParams.put("ssl.truststore.password", "password");
> kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic);
>
> JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(jsc,
>     String.class,
>     String.class,
>     StringDecoder.class,
>     StringDecoder.class,
>     kafkaParams,
>     topicmap
> );
>
> JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>, String>() {
>     public String call(Tuple2<String, String> tuple2) {
>         return tuple2._2();
>     }
> });
> {code}
> {code}
> Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
> 	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> 	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> 	at scala.util.Either.fold(Either.scala:97)
> 	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> 	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
> 	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> 	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
> 	at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-15089) kafka-spark consumer with SSL problem

2016-05-03 Thread JasonChang (JIRA)
JasonChang created SPARK-15089:
--

 Summary: kafka-spark consumer with SSL problem
 Key: SPARK-15089
 URL: https://issues.apache.org/jira/browse/SPARK-15089
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.6.1
Reporter: JasonChang


I am not sure whether Spark Streaming supports SSL.
I tried to add the SSL params to kafkaParams, but it does not work.

{code}
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1));

// stores the topic-to-partition mapping
Set<String> topicmap = new HashSet<>();
topicmap.add(kafkaTopic);

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url);
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.keystore.type", "JKS");
kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks");
kafkaParams.put("ssl.keystore.password ", "password"); // note: trailing space in this key, as posted
kafkaParams.put("ssl.key.password", "password");
kafkaParams.put("ssl.truststore.type", "JKS");
kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "password");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic);

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(jsc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicmap
);

JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});
{code}


The exception:
{code}
Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at scala.util.Either.fold(Either.scala:97)
	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
	at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
{code}
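
For what it's worth, a "Received -1 ... socket has likely been closed" EOF is the usual client-side symptom of a plaintext connection being dropped by an SSL-only port, so it may be worth checking what the port the driver talks to actually speaks. A quick probe, with placeholder host and port:
{code}
# Does this port speak TLS at all? (placeholder host/port)
openssl s_client -connect broker-host:9093 </dev/null
{code}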



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org