import sql.implicits._

2016-10-14 Thread Jakub Dubovsky
Hey community,

I would like to *educate* myself about why all *sql implicits* (most
notably the conversions to the Dataset API) are imported from an *instance* of
SparkSession rather than through static imports.

With this design, one runs into problems like this.
It requires a SparkSession instance (the only one we have) to be present in
many parts of the code, which makes the code harder to structure.

I assume that there is a *reason* why this design was *chosen*. Can
somebody please point me to a resource or explain why this is?
What is the advantage of this approach?
Or why is it not possible to implement it with static imports?

Thanks a lot!

Jakub


Re: import sql.implicits._

2016-10-14 Thread Koert Kuipers
For example, when you do Seq(1,2,3).toDF("a"), it needs to get the
SparkSession from somewhere. Because the implicits are imported from
spark.implicits._, they have access to a SparkSession for operations like
this.
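
The same idea can be illustrated outside of Spark with a minimal sketch in plain Scala. Session, Frame, toFrame and InstanceImplicitsSketch are hypothetical stand-ins invented for this example, not Spark classes. The only point is that a conversion defined inside an instance can capture that instance, which a static import has no way to do.

```scala
// Hypothetical stand-ins for illustration only; these are NOT Spark classes.
final case class Frame[A](rows: Seq[A], columns: Seq[String], session: Session)

final class Session(val name: String) {
  // Nested object: every Session instance has its own `implicits`,
  // so the conversions inside can refer to the enclosing instance.
  object implicits {
    implicit class SeqToFrame[A](rows: Seq[A]) {
      // Session.this is the instance the implicits were imported from.
      def toFrame(columns: String*): Frame[A] = Frame(rows, columns, Session.this)
    }
  }
}

object InstanceImplicitsSketch {
  def build(session: Session): Frame[Int] = {
    // The import needs a stable identifier, such as a val or a parameter.
    import session.implicits._
    Seq(1, 2, 3).toFrame("a")
  }

  def main(args: Array[String]): Unit = {
    val frame = build(new Session("local"))
    println(s"${frame.rows.size} rows bound to session '${frame.session.name}'")
  }
}
```

A conversion living in a static object would have to find its session through some global lookup instead, which is the trade-off being discussed in this thread.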



Re: import sql.implicits._

2016-10-14 Thread Koert Kuipers
Basically, the implicit conversions that need it are rdd => dataset and seq
=> dataset.



Re: import sql.implicits._

2016-10-14 Thread Koert Kuipers
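About the Stack Overflow question, do this:

import org.apache.spark.sql.DataFrame

def validateAndTransform(df: DataFrame): DataFrame = {
  // Take the implicits from the session the DataFrame already carries,
  // so no SparkSession has to be passed in separately.
  import df.sparkSession.implicits._

  ...
}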





RE: Kafka integration: get existing Kafka messages?

2016-10-14 Thread Haopu Wang
Cody, the link is helpful. But I still have issues in my test.

I set "auto.offset.reset" to "earliest" and then create a KafkaRDD using an
OffsetRange that is out of range.

According to Kafka's documentation, I expected to get the earliest offset of
that partition.

But I get the exception below, and it looks like "auto.offset.reset" is ignored
entirely. Please help, thanks again!

==
16/10/14 15:45:16 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-null mytopic2 0 2 after polling for 512
==

-Original Message-
From: Cody Koeninger [mailto:c...@koeninger.org] 
Sent: October 13, 2016 9:31
To: Haopu Wang
Cc: user@spark.apache.org
Subject: Re: Kafka integration: get existing Kafka messages?

Look at the presentation and blog post linked from

https://github.com/koeninger/kafka-exactly-once

They refer to the kafka 0.8 version of the direct stream but the basic
idea is the same

On Wed, Oct 12, 2016 at 7:35 PM, Haopu Wang  wrote:
> Cody, thanks for the response.
>
> So the Kafka direct stream actually has a consumer on both the driver and the
> executors? Can you please provide more details? Thank you very much!
>
> From: Cody Koeninger [mailto:c...@koeninger.org]
> Sent: October 12, 2016 20:10
> To: Haopu Wang
> Cc: user@spark.apache.org
> Subject: Re: Kafka integration: get existing Kafka messages?
>
>
> It's set to none for the executors, because otherwise they won't do exactly
> what the driver told them to do.
>
> You should be able to set up the driver consumer to determine batches
> however you want, though.
>
> On Wednesday, October 12, 2016, Haopu Wang  wrote:
>
> Hi,
>
> I want to read the existing Kafka messages and then subscribe to new stream
> messages.
>
> But I find that the "auto.offset.reset" property is always set to "none" in
> KafkaUtils. Does that mean I cannot specify the "earliest" property value when
> creating a direct stream?
>
> Thank you!
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SparkR execution hang on when handle a RDD which is converted from DataFrame

2016-10-14 Thread Lantao Jin
40GB

2016-10-14 14:20 GMT+08:00 Felix Cheung :

> How big is the metrics_moveing_detection_cube table?
>
>
>
>
>
> On Thu, Oct 13, 2016 at 8:51 PM -0700, "Lantao Jin" 
> wrote:
>
> sqlContext <- sparkRHive.init(sc)
> sqlString<-
> "SELECT
> key_id,
> rtl_week_beg_dt rawdate,
> gmv_plan_rate_amt value
> FROM
> metrics_moveing_detection_cube
> "
> df <- sql(sqlString)
> rdd<-SparkR:::toRDD(df)
>
> # Hanging case one: take from the RDD
> #take(rdd,3)
>
> # Hanging case two: convert back to a DataFrame
> #df1<-createDataFrame(rdd)
> #head(df1)
>
> # Non-hanging case: operating directly on the DataFrame is fine
> head(df,3)
>
> The code above is for Spark 2.0.0. Changing "df <- sql(sqlString)" to "df <-
> sql(sqlContext, sqlString)" and "createDataFrame(rdd)" to
> "createDataFrame(sqlContext, rdd)" makes it usable in Spark 1.6.2.
> BTW, metrics_moveing_detection_cube is a table from Hive.
>
> All versions give the same result. Thanks.
>
>
> The hang happens in the executor.
>
> The stack is below:
>
>
> Thread ID Thread Name Thread State
> 116 Executor task launch worker-0 RUNNABLE
>
> java.net.SocketInputStream.socketRead0(Native Method)
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> java.net.SocketInputStream.read(SocketInputStream.java:170)
> java.net.SocketInputStream.read(SocketInputStream.java:141)
> java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> java.io.BufferedInputStream.read(BufferedInputStream.java:265)
> java.io.DataInputStream.readInt(DataInputStream.java:387)
> org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:212)
> org.apache.spark.api.r.RRunner$$anon$1.<init>(RRunner.scala:96)
> org.apache.spark.api.r.RRunner.compute(RRunner.scala:87)
> org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> org.apache.spark.scheduler.Task.run(Task.scala:85)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> 119 client DomainSocketWatcher RUNNABLE
>
> org.apache.hadoop.net.unix.DomainSocketWatcher.doPoll0(Native Method)
> org.apache.hadoop.net.unix.DomainSocketWatcher.access$900(DomainSocketWatcher.java:52)
> org.apache.hadoop.net.unix.DomainSocketWatcher$2.run(DomainSocketWatcher.java:511)
> java.lang.Thread.run(Thread.java:745)
>
> 69 dispatcher-event-loop-0 WAITING
>
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:205)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> 70 dispatcher-event-loop-1 WAITING
>
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:205)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> 79 dispatcher-event-loop-10 WAITING
>
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:205)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> 80 dispatcher-event-loop-11 WAITING
>
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:205)
> 

Re: Want to test spark-sql-kafka but get unresolved dependency error

2016-10-14 Thread Julian Keppel
Okay, thank you! Can you say when this feature will be released?

2016-10-13 16:29 GMT+02:00 Cody Koeninger :

> As Sean said, it's unreleased.  If you want to try it out, build spark
>
> http://spark.apache.org/docs/latest/building-spark.html
>
> The easiest way to include the jar is probably to use mvn install to
> put it in your local repository, then link it in your application's
> mvn or sbt build file as described in the docs you linked.
>
>
> On Thu, Oct 13, 2016 at 3:24 AM, JayKay 
> wrote:
> > I want to work with the Kafka integration for structured streaming. I use
> > Spark version 2.0.0 and I start the spark-shell with:
> >
> > spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.0
> >
> > As described here:
> > https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md
> >
> > But I get an unresolved dependency error ("unresolved dependency:
> > org.apache.spark#spark-sql-kafka-0-10_2.11;2.0.0: not found"). So it seems
> > not to be available via maven or spark-packages.
> >
> > How can I access this package? Or am I doing something wrong/missing?
> >
> > Thank you for your help.
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Want-to-test-spark-sql-kafka-but-get-unresolved-dependency-error-tp27891.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>


Re: Want to test spark-sql-kafka but get unresolved dependency error

2016-10-14 Thread Cody Koeninger
I can't be sure, no.




Re: Problems with new experimental Kafka Consumer for 0.10

2016-10-14 Thread Cody Koeninger
For you or anyone else having issues with consumer rebalance, what are
your settings for

heartbeat.interval.ms
session.timeout.ms
group.max.session.timeout.ms

relative to your batch time?
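
A minimal sketch of how these three settings could be sanity-checked against the batch time. The first two checks follow Kafka's documented constraints (the heartbeat interval must be lower than the session timeout, and the session timeout must fit within the broker's group.max.session.timeout.ms). The third check, that the session timeout should exceed the batch time, is an assumption based on the question above and on the advice in the Spark Kafka 0.10 integration guide. ConsumerTimeoutCheck and problems are hypothetical names, and the sample values in main are made up.

```scala
object ConsumerTimeoutCheck {
  // All values are in milliseconds. Returns a description of every check
  // that fails; an empty result means this sketch found nothing to flag.
  def problems(batchMs: Long,
               heartbeatMs: Long,
               sessionMs: Long,
               groupMaxSessionMs: Long): Seq[String] = {
    val checks = Seq(
      (heartbeatMs < sessionMs) ->
        "heartbeat.interval.ms should be lower than session.timeout.ms",
      (sessionMs <= groupMaxSessionMs) ->
        "session.timeout.ms must not exceed group.max.session.timeout.ms",
      // Assumption: a batch that outlasts the session timeout risks a rebalance.
      (sessionMs > batchMs) ->
        "session.timeout.ms should be longer than the batch time"
    )
    checks.collect { case (ok, message) if !ok => message }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical values: a 2 minute batch with a 30 second session timeout.
    problems(batchMs = 120000L, heartbeatMs = 3000L,
             sessionMs = 30000L, groupMaxSessionMs = 300000L).foreach(println)
  }
}
```

Note that group.max.session.timeout.ms is a broker setting, so raising session.timeout.ms beyond it requires a broker configuration change rather than a consumer one.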

On Tue, Oct 11, 2016 at 10:19 AM, static-max  wrote:
> Hi,
>
> I run into the same exception
> (org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced ...), but I only use one
> stream.
> I get the exceptions when trying to manually commit the offset to Kafka:
>
> OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
> CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream.dstream();
> cco.commitAsync(offsets);
>
> I tried setting "max.poll.records" to 1000 but this did not help.
>
> Any idea what could be wrong?
>
> 2016-10-11 15:36 GMT+02:00 Cody Koeninger :
>>
>> The new underlying kafka consumer prefetches data and is generally heavier
>> weight, so it is cached on executors.  Group id is part of the cache key. I
>> assumed kafka users would use different group ids for consumers they wanted
>> to be distinct, since otherwise it would cause problems even with the normal
>> kafka consumer, but that appears to be a poor assumption.
>>
>> I'll figure out a way to make this more obvious.
>>
>>
>> On Oct 11, 2016 8:19 AM, "Matthias Niehoff"
>>  wrote:
>>
>> good point, I changed the group id to be unique for the separate streams
>> and now it works. Thanks!
>>
>> Although changing this is ok for us, i am interested in the why :-) With
>> the old connector this was not a problem nor is it afaik with the pure kafka
>> consumer api
>>
>> 2016-10-11 14:30 GMT+02:00 Cody Koeninger :
>>>
>>> Just out of curiosity, have you tried using separate group ids for the
>>> separate streams?
>>>
>>>
>>> On Oct 11, 2016 4:46 AM, "Matthias Niehoff"
>>>  wrote:

 I stripped down the job to just consume the stream and print it, without
 Avro deserialization. When I only consume one topic, everything is fine. As
 soon as I add a second stream, it gets stuck after about 5 minutes. So it
 basically boils down to:


   val kafkaParams = Map[String, String](
     "bootstrap.servers" -> kafkaBrokers,
     "group.id" -> group,
     "key.deserializer" -> classOf[StringDeserializer].getName,
     "value.deserializer" -> classOf[BytesDeserializer].getName,
     "session.timeout.ms" -> s"${1 * 60 * 1000}",
     "request.timeout.ms" -> s"${2 * 60 * 1000}",
     "auto.offset.reset" -> "latest",
     "enable.auto.commit" -> "false"
   )

   def main(args: Array[String]): Unit = {

     def createStreamingContext(): StreamingContext = {

       val sparkConf = new SparkConf(true)
         .setAppName("Kafka Consumer Test")
       sparkConf.setMaster("local[*]")

       val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))

       // AD REQUESTS
       // ===
       val serializedAdRequestStream = createStream(ssc, topic_adrequest)
       serializedAdRequestStream.map(record => record.value().get()).print(10)

       // VIEWS
       // ==
       val serializedViewStream = createStream(ssc, topic_view)
       serializedViewStream.map(record => record.value().get()).print(10)

       //  // CLICKS
       //  // ==
       //  val serializedClickStream = createStream(ssc, topic_click)
       //  serializedClickStream.map(record => record.value().get()).print(10)

       ssc
     }

     val streamingContext = createStreamingContext
     streamingContext.start()
     streamingContext.awaitTermination()
   }


 And in the logs you see:


 16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
 16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
 16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
 16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
 16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms


 2016-10-11 9:28 GMT+02:00 Matthias Niehoff
 :
>
> This Job will fail after about 5 minutes:
>
>
> object SparkJobMinimal {
>
>   //read Avro schemas
>   var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
>   val avroSchemaAdRequest =
> scala.io.Source.fromInputStream(stream).getLines.mkString
>   stream.close
>   stream =
> getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
>   val 

Re: Kafka integration: get existing Kafka messages?

2016-10-14 Thread Cody Koeninger
If you're creating a Kafka RDD as opposed to a dstream, you're
explicitly specifying a beginning and ending offset, so auto.offset.reset
doesn't really have anything to do with it.

If you look at that log line, it's trying to read the 2nd message out
of the 0th partition of mytopic2, and is not able to get that record.
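
A minimal sketch of how that log line can be picked apart in plain Scala. It assumes the fields after "Failed to get records for" are the group id, topic, partition and offset, in that order, followed by the poll timeout in milliseconds; this matches the reading above but is inferred from the single message in this thread. FailedFetchMessage, Fetch and parse are hypothetical names, and the message is expected on one line, without the mail wrapping.

```scala
object FailedFetchMessage {
  // Hypothetical container for the fields of the assertion message.
  final case class Fetch(groupId: String, topic: String,
                         partition: Int, offset: Long, pollMs: Long)

  // Unanchored so the pattern may appear anywhere in a longer log line.
  private val Pattern =
    """Failed to get records for (\S+) (\S+) (\d+) (\d+) after polling for (\d+)""".r.unanchored

  def parse(line: String): Option[Fetch] = line match {
    case Pattern(group, topic, partition, offset, poll) =>
      Some(Fetch(group, topic, partition.toInt, offset.toLong, poll.toLong))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val line = "java.lang.AssertionError: assertion failed: Failed to get " +
      "records for spark-executor-null mytopic2 0 2 after polling for 512"
    println(parse(line))
  }
}
```

Read this way, the executor was asked for offset 2 of partition 0 of mytopic2, which is the offset that has to exist in the topic for the explicitly specified range to be readable.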




Re: spark with kerberos

2016-10-14 Thread Steve Loughran

On 13 Oct 2016, at 10:50, dbolshak wrote:

> Hello community,
>
> We have a challenge and no idea how to solve it.
>
> The problem:
>
> Say we have the following environment:
> 1. `cluster A`, the cluster does not use kerberos and we use it as a source
> of data. Importantly, we don't manage this cluster.
> 2. `cluster B`, a small cluster where our spark application is running and
> performing some logic. (We manage this cluster and it does not have
> kerberos.)
> 3. `cluster C`, the cluster uses kerberos and we use it to keep the results of
> our spark application. We manage this cluster.
>
> Our requirements and conditions that have not been mentioned yet:
> 1. All clusters are in a single data center, but in different
> subnetworks.
> 2. We cannot turn on kerberos on `cluster A`
> 3. We cannot turn off kerberos on `cluster C`
> 4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
> 5. Spark app is built on top of RDD and does not depend on spark-sql.
>
> Does anybody know how to write data using the RDD api to a remote cluster
> which is running with Kerberos?

If you want to talk to the secure cluster, C, from code running in cluster B,
you'll need to turn Kerberos on there. Maybe, just maybe, you could get away
with Kerberos being turned off if you, the user, launch the application
while logged in to Kerberos yourself and are therefore trusted by cluster C.

One of the problems you are likely to hit with Spark here is that it only
collects the tokens you need to talk to HDFS at the time you launch the
application and, by default, it only knows about the cluster FS. You will need
to tell Spark about the other filesystem at launch time, so that it knows to
authenticate with it as you and then collect the tokens needed for the
application itself to work with Kerberos.

spark.yarn.access.namenodes=hdfs://cluster-c:8080

-Steve

ps: https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/