Re: HiveContext is Serialized?

2016-10-25 Thread Ajay Chander
Sean, thank you for making it clear. It was helpful.

Regards,
Ajay

On Wednesday, October 26, 2016, Sean Owen  wrote:

> This usage is fine, because you are only using the HiveContext locally on
> the driver. It's applied in a function that's used on a Scala collection.
>
> You can't use the HiveContext or SparkContext in a distributed operation.
> It has nothing to do with for loops.
>
> The fact that they're serializable is misleading. It's there, I believe,
> because these objects may be inadvertently referenced in the closure of a
> function that executes remotely, yet doesn't use the context. The closure
> cleaner can't always remove this reference. The task would fail to
> serialize even though it doesn't use the context. You will find these
> objects serialize but then don't work if used remotely.
>
> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
> IIRC.
>
> On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander wrote:
>
>> Hi Everyone,
>>
>> I was thinking if I can use hiveContext inside foreach like below,
>>
>> object Test {
>>   def main(args: Array[String]): Unit = {
>>
>> val conf = new SparkConf()
>> val sc = new SparkContext(conf)
>> val hiveContext = new HiveContext(sc)
>>
>> val dataElementsFile = args(0)
>> val deDF = 
>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>
>> def calculate(de: Row) {
>>   val dataElement = de.getAs[String]("DataElement").trim
>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>> TEST_DB.TEST_TABLE1 ")
>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>> }
>>
>> deDF.collect().foreach(calculate)
>>   }
>> }
>>
>>
>> I looked at 
>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>  and I see it is extending SqlContext which extends Logging with 
>> Serializable.
>>
>> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>>
>> Regards,
>>
>> Ajay
>>
>>


Re: HiveContext is Serialized?

2016-10-25 Thread Sunita Arvind
Thanks for the response Sean. I have seen the NPE on similar issues very
consistently and assumed that could be the reason :) Thanks for clarifying.
regards
Sunita

On Tue, Oct 25, 2016 at 10:11 PM, Sean Owen  wrote:

> This usage is fine, because you are only using the HiveContext locally on
> the driver. It's applied in a function that's used on a Scala collection.
>
> You can't use the HiveContext or SparkContext in a distributed operation.
> It has nothing to do with for loops.
>
> The fact that they're serializable is misleading. It's there, I believe,
> because these objects may be inadvertently referenced in the closure of a
> function that executes remotely, yet doesn't use the context. The closure
> cleaner can't always remove this reference. The task would fail to
> serialize even though it doesn't use the context. You will find these
> objects serialize but then don't work if used remotely.
>
> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
> IIRC.
>
>
> On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander  wrote:
>
>> Hi Everyone,
>>
>> I was thinking if I can use hiveContext inside foreach like below,
>>
>> object Test {
>>   def main(args: Array[String]): Unit = {
>>
>> val conf = new SparkConf()
>> val sc = new SparkContext(conf)
>> val hiveContext = new HiveContext(sc)
>>
>> val dataElementsFile = args(0)
>> val deDF = 
>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>
>> def calculate(de: Row) {
>>   val dataElement = de.getAs[String]("DataElement").trim
>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>> TEST_DB.TEST_TABLE1 ")
>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>> }
>>
>> deDF.collect().foreach(calculate)
>>   }
>> }
>>
>>
>> I looked at 
>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>  and I see it is extending SqlContext which extends Logging with 
>> Serializable.
>>
>> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>>
>> Regards,
>>
>> Ajay
>>
>>


Re: HiveContext is Serialized?

2016-10-25 Thread Sean Owen
This usage is fine, because you are only using the HiveContext locally on
the driver. It's applied in a function that's used on a Scala collection.

You can't use the HiveContext or SparkContext in a distributed operation.
It has nothing to do with for loops.

The fact that they're serializable is misleading. It's there, I believe,
because these objects may be inadvertently referenced in the closure of a
function that executes remotely, yet doesn't use the context. The closure
cleaner can't always remove this reference. The task would fail to
serialize even though it doesn't use the context. You will find these
objects serialize but then don't work if used remotely.

The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
IIRC.
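
To make the distinction concrete, here is a minimal illustrative sketch (not taken from the thread; it assumes an existing SparkContext sc and HiveContext hiveContext and a hypothetical one-column input file): driver-side iteration over a collected array works, while the same call inside an RDD operation fails at runtime even though the context nominally serializes.

// Sketch only; assumes an existing SparkContext `sc` and HiveContext `hiveContext`,
// and a hypothetical one-column text file of data element names.
val deDF = hiveContext.read.text("data_elements.txt").toDF("DataElement")

// Fine: collect() brings the rows to the driver as a local Array[Row],
// so foreach runs on the driver and hiveContext.sql is an ordinary local call.
deDF.collect().foreach { row =>
  hiveContext.sql(s"SELECT '${row.getString(0)}' AS data_elm").show()
}

// Not fine: RDD.foreach is a distributed operation; the closure (and the captured
// hiveContext) is shipped to executors, where the context does not work.
deDF.rdd.foreach { row =>
  hiveContext.sql(s"SELECT '${row.getString(0)}' AS data_elm")   // fails at runtime
}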

On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander  wrote:

> Hi Everyone,
>
> I was thinking if I can use hiveContext inside foreach like below,
>
> object Test {
>   def main(args: Array[String]): Unit = {
>
> val conf = new SparkConf()
> val sc = new SparkContext(conf)
> val hiveContext = new HiveContext(sc)
>
> val dataElementsFile = args(0)
> val deDF = 
> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>
> def calculate(de: Row) {
>   val dataElement = de.getAs[String]("DataElement").trim
>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
> TEST_DB.TEST_TABLE1 ")
>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
> }
>
> deDF.collect().foreach(calculate)
>   }
> }
>
>
> I looked at 
> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>  and I see it is extending SqlContext which extends Logging with Serializable.
>
> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>
> Regards,
>
> Ajay
>
>


Re: HiveContext is Serialized?

2016-10-25 Thread Ajay Chander
Sunita, Thanks for your time. In my scenario, based on each attribute from
deDF(1 column with just 66 rows), I have to query a Hive table and insert
into another table.

Thanks,
Ajay

On Wed, Oct 26, 2016 at 12:21 AM, Sunita Arvind 
wrote:

> Ajay,
>
> Afaik Generally these contexts cannot be accessed within loops. The sql
> query itself would run on distributed datasets so it's a parallel
> execution. Putting them in foreach would make it nested in nested. So
> serialization would become hard. Not sure I could explain it right.
>
> If you can create the dataframe in main, you can register it as a table
> and run the queries in main method itself. You don't need to coalesce or
> run the method within foreach.
>
> Regards
> Sunita
>
> On Tuesday, October 25, 2016, Ajay Chander  wrote:
>
>>
>> Jeff, Thanks for your response. I see below error in the logs. You think
>> it has to do anything with hiveContext ? Do I have to serialize it before
>> using inside foreach ?
>>
>> 16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener
>> threw an exception
>> java.lang.NullPointerException
>> at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLL
>> istener.scala:167)
>> at org.apache.spark.scheduler.SparkListenerBus$class.onPostEven
>> t(SparkListenerBus.scala:42)
>> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveL
>> istenerBus.scala:31)
>> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveL
>> istenerBus.scala:31)
>> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBu
>> s.scala:55)
>> at org.apache.spark.util.AsynchronousListenerBus.postToAll(Asyn
>> chronousListenerBus.scala:37)
>> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonf
>> un$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Asynchronous
>> ListenerBus.scala:80)
>> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonf
>> un$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonf
>> un$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonf
>> un$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
>> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.sca
>> la:1181)
>> at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(As
>> ynchronousListenerBus.scala:63)
>>
>> Thanks,
>> Ajay
>>
>> On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang  wrote:
>>
>>>
>>> In your sample code, you can use hiveContext in the foreach as it is
>>> scala List foreach operation which runs in driver side. But you cannot use
>>> hiveContext in RDD.foreach
>>>
>>>
>>>
>>> Ajay Chander wrote on Wednesday, October 26, 2016 at 11:28 AM:
>>>
 Hi Everyone,

 I was thinking if I can use hiveContext inside foreach like below,

 object Test {
   def main(args: Array[String]): Unit = {

 val conf = new SparkConf()
 val sc = new SparkContext(conf)
 val hiveContext = new HiveContext(sc)

 val dataElementsFile = args(0)
 val deDF = 
 hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()

 def calculate(de: Row) {
   val dataElement = de.getAs[String]("DataElement").trim
   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
 dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
 TEST_DB.TEST_TABLE1 ")
   df1.write.insertInto("TEST_DB.TEST_TABLE1")
 }

 deDF.collect().foreach(calculate)
   }
 }


 I looked at 
 https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
  and I see it is extending SqlContext which extends Logging with 
 Serializable.

 Can anyone tell me if this is the right way to use it ? Thanks for your 
 time.

 Regards,

 Ajay


>>


Re: HiveContext is Serialized?

2016-10-25 Thread Sunita Arvind
Ajay,

Afaik Generally these contexts cannot be accessed within loops. The sql
query itself would run on distributed datasets so it's a parallel
execution. Putting them in foreach would make it nested in nested. So
serialization would become hard. Not sure I could explain it right.

If you can create the dataframe in main, you can register it as a table and
run the queries in main method itself. You don't need to coalesce or run
the method within foreach.

Regards
Sunita

On Tuesday, October 25, 2016, Ajay Chander  wrote:

>
> Jeff, Thanks for your response. I see below error in the logs. You think
> it has to do anything with hiveContext ? Do I have to serialize it before
> using inside foreach ?
>
> 16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener
> threw an exception
> java.lang.NullPointerException
> at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(
> SQLListener.scala:167)
> at org.apache.spark.scheduler.SparkListenerBus$class.onPostEven
> t(SparkListenerBus.scala:42)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveL
> istenerBus.scala:31)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveL
> istenerBus.scala:31)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBu
> s.scala:55)
> at org.apache.spark.util.AsynchronousListenerBus.postToAll(Asyn
> chronousListenerBus.scala:37)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$
> anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Asynchro
> nousListenerBus.scala:80)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$
> anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousLis
> tenerBus.scala:65)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$
> anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousLis
> tenerBus.scala:65)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$
> anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.sca
> la:1181)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(
> AsynchronousListenerBus.scala:63)
>
> Thanks,
> Ajay
>
>> On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang wrote:
>
>>
>> In your sample code, you can use hiveContext in the foreach as it is
>> scala List foreach operation which runs in driver side. But you cannot use
>> hiveContext in RDD.foreach
>>
>>
>>
>> Ajay Chander wrote on Wednesday, October 26, 2016 at 11:28 AM:
>>
>>> Hi Everyone,
>>>
>>> I was thinking if I can use hiveContext inside foreach like below,
>>>
>>> object Test {
>>>   def main(args: Array[String]): Unit = {
>>>
>>> val conf = new SparkConf()
>>> val sc = new SparkContext(conf)
>>> val hiveContext = new HiveContext(sc)
>>>
>>> val dataElementsFile = args(0)
>>> val deDF = 
>>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>>
>>> def calculate(de: Row) {
>>>   val dataElement = de.getAs[String]("DataElement").trim
>>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>>> TEST_DB.TEST_TABLE1 ")
>>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>> }
>>>
>>> deDF.collect().foreach(calculate)
>>>   }
>>> }
>>>
>>>
>>> I looked at 
>>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>>  and I see it is extending SqlContext which extends Logging with 
>>> Serializable.
>>>
>>> Can anyone tell me if this is the right way to use it ? Thanks for your 
>>> time.
>>>
>>> Regards,
>>>
>>> Ajay
>>>
>>>
>


Re: HiveContext is Serialized?

2016-10-25 Thread Ajay Chander
Jeff, Thanks for your response. I see below error in the logs. You think it
has to do anything with hiveContext ? Do I have to serialize it before
using inside foreach ?

16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener
threw an exception
java.lang.NullPointerException
at org.apache.spark.sql.execution.ui.SQLListener.
onTaskEnd(SQLListener.scala:167)
at org.apache.spark.scheduler.SparkListenerBus$class.
onPostEvent(SparkListenerBus.scala:42)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(
LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(
LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(
ListenerBus.scala:55)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(
AsynchronousListenerBus.scala:37)
at org.apache.spark.util.AsynchronousListenerBus$$anon$
1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(
AsynchronousListenerBus.scala:80)
at org.apache.spark.util.AsynchronousListenerBus$$anon$
1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(
AsynchronousListenerBus.scala:65)
at org.apache.spark.util.AsynchronousListenerBus$$anon$
1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(
AsynchronousListenerBus.scala:65)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.util.AsynchronousListenerBus$$anon$
1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.
scala:1181)
at org.apache.spark.util.AsynchronousListenerBus$$anon$
1.run(AsynchronousListenerBus.scala:63)

Thanks,
Ajay

On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang  wrote:

>
> In your sample code, you can use hiveContext in the foreach as it is scala
> List foreach operation which runs in driver side. But you cannot use
> hiveContext in RDD.foreach
>
>
>
> Ajay Chander wrote on Wednesday, October 26, 2016 at 11:28 AM:
>
>> Hi Everyone,
>>
>> I was thinking if I can use hiveContext inside foreach like below,
>>
>> object Test {
>>   def main(args: Array[String]): Unit = {
>>
>> val conf = new SparkConf()
>> val sc = new SparkContext(conf)
>> val hiveContext = new HiveContext(sc)
>>
>> val dataElementsFile = args(0)
>> val deDF = 
>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>
>> def calculate(de: Row) {
>>   val dataElement = de.getAs[String]("DataElement").trim
>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>> TEST_DB.TEST_TABLE1 ")
>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>> }
>>
>> deDF.collect().foreach(calculate)
>>   }
>> }
>>
>>
>> I looked at 
>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>  and I see it is extending SqlContext which extends Logging with 
>> Serializable.
>>
>> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>>
>> Regards,
>>
>> Ajay
>>
>>


Re: HiveContext is Serialized?

2016-10-25 Thread Jeff Zhang
In your sample code, you can use hiveContext in the foreach as it is scala
List foreach operation which runs in driver side. But you cannot use
hiveContext in RDD.foreach



Ajay Chander wrote on Wednesday, October 26, 2016 at 11:28 AM:

> Hi Everyone,
>
> I was thinking if I can use hiveContext inside foreach like below,
>
> object Test {
>   def main(args: Array[String]): Unit = {
>
> val conf = new SparkConf()
> val sc = new SparkContext(conf)
> val hiveContext = new HiveContext(sc)
>
> val dataElementsFile = args(0)
> val deDF = 
> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>
> def calculate(de: Row) {
>   val dataElement = de.getAs[String]("DataElement").trim
>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
> TEST_DB.TEST_TABLE1 ")
>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
> }
>
> deDF.collect().foreach(calculate)
>   }
> }
>
>
> I looked at 
> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>  and I see it is extending SqlContext which extends Logging with Serializable.
>
> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>
> Regards,
>
> Ajay
>
>


HiveContext is Serialized?

2016-10-25 Thread Ajay Chander
Hi Everyone,

I was thinking if I can use hiveContext inside foreach like below,

object Test {
  def main(args: Array[String]): Unit = {

val conf = new SparkConf()
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

val dataElementsFile = args(0)
val deDF = 
hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()

def calculate(de: Row) {
  val dataElement = de.getAs[String]("DataElement").trim
  val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" +
dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM
TEST_DB.TEST_TABLE1 ")
  df1.write.insertInto("TEST_DB.TEST_TABLE1")
}

deDF.collect().foreach(calculate)
  }
}


I looked at 
https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
and I see it is extending SqlContext which extends Logging with
Serializable.

Can anyone tell me if this is the right way to use it ? Thanks for your time.

Regards,

Ajay


Re: [Spark 2.0.1] Error in generated code, possible regression?

2016-10-25 Thread Efe Selcuk
I'd like to do that, though are there any guidelines for tracking down the
context of the generated code?
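
For what it's worth, one hedged way to inspect the generated code for a smaller repro (assuming a SparkSession named spark and a stand-in Dataset that exercises the same path) is the SQL execution debug package:

// Sketch only: prints the whole-stage-codegen Java source for the query plan,
// which can help narrow a small reproduction down before filing a JIRA.
import org.apache.spark.sql.execution.debug._

val ds = spark.range(10)   // hypothetical stand-in for the failing Dataset/query
ds.debugCodegen()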

On Mon, Oct 24, 2016 at 11:44 PM Kazuaki Ishizaki 
wrote:

Can you have a smaller program that can reproduce the same error? If you
also create a JIRA entry, it would be great.

Kazuaki Ishizaki



From: Efe Selcuk 
To: "user @spark" 
Date: 2016/10/25 10:23
Subject: [Spark 2.0.1] Error in generated code, possible regression?
--



I have an application that works in 2.0.0 but has been dying at runtime on
the 2.0.1 distribution.

at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 30 more
Caused by: org.codehaus.commons.compiler.CompileException: File
'generated.java', Line 74, Column 145: Unknown variable or type "value4"

It also includes a massive 1800-line generated code output (which repeats
over and over, even on 1 thread, which makes this a pain), but fortunately
the error occurs early so I can give at least some context.

/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificMutableProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificMutableProjection extends
org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private MutableRow mutableRow;
/* 009 */   private Object[] values;
... // many lines of class variables, mostly errMsg strings and Object[]
/* 071 */   private void apply2_7(InternalRow i) {
/* 072 */
/* 073 */ boolean isNull215 = false;
/* 074 */ final com.mypackage.MyThing value215 = isNull215 ? null :
(com.mypackage.MyThing) value4._2();
/* 075 */ isNull215 = value215 == null;
/* 076 */
...

As you can see, on line 74 there's a reference to value4 but nothing called
value4 has been defined. I have no idea of where to even begin looking for
what caused this, or even whether it's my fault or a bug in the code
generation. Any help is appreciated.

Efe


Re: Operator push down through JDBC driver

2016-10-25 Thread AnilKumar B
I thought we could use the sqlContext.sql("some join query") API with JDBC;
that's why I asked the above question.

But we can only use
sqlContext.read().format("jdbc").options(options).load(), and there we can
pass the actual join query to the ORACLE source.

So this question is not valid. Please ignore it.
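
For reference, a minimal sketch of the approach described above (connection details, credentials and table names are made up): the join is written as an Oracle subquery and passed via the dbtable option, so Oracle executes the join and Spark only fetches the result.

// Sketch only; assumes an existing SQLContext `sqlContext` and the Oracle JDBC
// driver on the classpath. Table and column names are hypothetical.
val pushedDownJoin =
  """(SELECT o.order_id, o.amount, c.customer_name
    |   FROM orders o
    |   JOIN customers c ON o.cust_id = c.cust_id) t""".stripMargin

val df = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL")   // hypothetical connection string
  .option("dbtable", pushedDownJoin)                       // Oracle runs the join; Spark reads the result
  .option("user", "scott")
  .option("password", "tiger")
  .option("driver", "oracle.jdbc.OracleDriver")
  .load()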

Thanks & Regards,
B Anil Kumar.

On Tue, Oct 25, 2016 at 2:35 PM, AnilKumar B  wrote:

> Hi,
>
> I am using Spark SQL to transform data. My Source is ORACLE, In general, I
> am extracting multiple tables and joining them and then doing some other
> transformations in Spark.
>
> Is there any possibility for pushing down join operator to ORACLE using
> SPARK SQL, instead of fetching and joining in Spark? I am unable find any
> options for these optimizations rules at https://spark.apache.org/docs/
> 1.6.0/sql-programming-guide.html#jdbc-to-other-databases.
>
> I am currently using spark-1.6 version.
>
> Thanks & Regards,
> B Anil Kumar.
>


Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
The error in the file I just shared is here:

val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition._2(0);
--> this was previously just "partition", and hence there was an error fetching the offset.

Still testing. Somehow, Cody, your code never led to "file already exists" sorts of
errors (I am saving the output of the DStream as a parquet file, after converting it
to a dataframe; the batch interval will be 2 hrs).

The code in the main is here:

val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
  conf.getString("groupId"), conf.getString("topics"))
val storedOffsets = offsetsStore.readOffsets()
LogHandler.log.info("Fetched the offset from zookeeper")

val kafkaArr = storedOffsets match {
  case None =>
    // start from the initial offsets
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaProps, Set(topics))

  case Some(fromOffsets) =>
    // start from previously saved offsets
    val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Array[Byte]) =
      (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder,
      Tuple2[String, Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)

    // KafkaUtils.createRDD[String, Row, StringDecoder, ProtobufMessage,
    //   (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
}

kafkaArr.foreachRDD { (rdd, time) =>
  val schema = SchemaConverters.toSqlType(BeaconAvroData.getClassSchema)
    .dataType.asInstanceOf[StructType]
  val ardd: RDD[Row] = rdd.mapPartitions { itr =>
    itr.map(r => Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
  }
  val df = sql.createDataFrame(ardd, schema)
  LogHandler.log.info("Created dataframe")
  val offsetSaved = offsetsStore.saveOffsets(topics, rdd).replace(":", "-").replace(",", "_")
  LogHandler.log.info("Saved offset to Zookeeper")
  df.saveAsParquetFile(conf.getString("ParquetOutputPath") + offsetSaved)
  LogHandler.log.info("Created the parquet file")
}

Thanks

Sunita





On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind 
wrote:

> Attached is the edited code. Am I heading in right direction? Also, I am
> missing something due to which, it seems to work well as long as the
> application is running and the files are created right. But as soon as I
> restart the application, it goes back to fromOffset as 0. Any thoughts?
>
> regards
> Sunita
>
> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind 
> wrote:
>
>> Thanks for confirming Cody.
>> To get to use the library, I had to do:
>>
>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
>> "/consumers/topics/"+ topics + "/0")
>>
>> It worked well. However, I had to specify the partitionId in the zkPath.
>> If I want the library to pick all the partitions for a topic, without me
>> specifying the path, is it possible out of the box or I need to tweak?
>>
>> regards
>> Sunita
>>
>>
>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger 
>> wrote:
>>
>>> You are correct that you shouldn't have to worry about broker id.
>>>
>>> I'm honestly not sure specifically what else you are asking at this
>>> point.
>>>
>>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind 
>>> wrote:
>>> > Just re-read the kafka architecture. Something that slipped my mind
>>> is, it
>>> > is leader based. So topic/partitionId pair will be same on all the
>>> brokers.
>>> > So we do not need to consider brokerid while storing offsets. Still
>>> > exploring rest of the items.
>>> > regards
>>> > Sunita
>>> >
>>> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind >> >
>>> > wrote:
>>> >>
>>> >> Hello Experts,
>>> >>
>>> >> I am trying to use the saving to ZK design. Just saw Sudhir's comments
>>> >> that it is old approach. Any reasons for that? Any issues observed
>>> with
>>> >> saving to ZK. The way we are planning to use it is:
>>> >> 1. Following
>>> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>>> g-zero-data-loss.html
>>> >> 2. Saving to the same file with offsetRange as a part of the file. We
>>> hope
>>> >> that there are no partial writes/ overwriting is possible and
>>> offsetRanges
>>> >>
>>> >> However I have below doubts which I couldnt figure out from the code
>>> here
>>> >> -
>>> >> https://github.com/ippontech/spark-kafka-source/blob/master/
>>> src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>>> >> 1. The brokerId is not part of the OffsetRange. How will just the
>>> >> partitionId:FromOffset stay unique in a cluster with multiple brokers
>>> and
>>> >> multiple partitions/topic.
>>> >> 2. Do we have to specify zkPath to include the partitionid. I tried
>>> using
>>> >> the ZookeeperOffsetStore as is and it required me to specify the
>>> >> partitionId:
>>> >>
>>> >> val offsetsStore = new 

Operator push down through JDBC driver

2016-10-25 Thread AnilKumar B
Hi,

I am using Spark SQL to transform data. My Source is ORACLE, In general, I
am extracting multiple tables and joining them and then doing some other
transformations in Spark.

Is there any possibility of pushing the join operator down to ORACLE using
Spark SQL, instead of fetching and joining in Spark? I am unable to find any
options for these optimization rules at
https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#jdbc-to-other-databases
.

I am currently using spark-1.6 version.

Thanks & Regards,
B Anil Kumar.


Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Attached is the edited code. Am I heading in right direction? Also, I am
missing something due to which, it seems to work well as long as the
application is running and the files are created right. But as soon as I
restart the application, it goes back to fromOffset as 0. Any thoughts?

regards
Sunita

On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind 
wrote:

> Thanks for confirming Cody.
> To get to use the library, I had to do:
>
> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
> "/consumers/topics/"+ topics + "/0")
>
> It worked well. However, I had to specify the partitionId in the zkPath.
> If I want the library to pick all the partitions for a topic, without me
> specifying the path, is it possible out of the box or I need to tweak?
>
> regards
> Sunita
>
>
> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger 
> wrote:
>
>> You are correct that you shouldn't have to worry about broker id.
>>
>> I'm honestly not sure specifically what else you are asking at this point.
>>
>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind 
>> wrote:
>> > Just re-read the kafka architecture. Something that slipped my mind is,
>> it
>> > is leader based. So topic/partitionId pair will be same on all the
>> brokers.
>> > So we do not need to consider brokerid while storing offsets. Still
>> > exploring rest of the items.
>> > regards
>> > Sunita
>> >
>> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind 
>> > wrote:
>> >>
>> >> Hello Experts,
>> >>
>> >> I am trying to use the saving to ZK design. Just saw Sudhir's comments
>> >> that it is old approach. Any reasons for that? Any issues observed with
>> >> saving to ZK. The way we are planning to use it is:
>> >> 1. Following
>> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>> g-zero-data-loss.html
>> >> 2. Saving to the same file with offsetRange as a part of the file. We
>> hope
>> >> that there are no partial writes/ overwriting is possible and
>> offsetRanges
>> >>
>> >> However I have below doubts which I couldnt figure out from the code
>> here
>> >> -
>> >> https://github.com/ippontech/spark-kafka-source/blob/master/
>> src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>> >> 1. The brokerId is not part of the OffsetRange. How will just the
>> >> partitionId:FromOffset stay unique in a cluster with multiple brokers
>> and
>> >> multiple partitions/topic.
>> >> 2. Do we have to specify zkPath to include the partitionid. I tried
>> using
>> >> the ZookeeperOffsetStore as is and it required me to specify the
>> >> partitionId:
>> >>
>> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.get
>> String("zkHosts"),
>> >> "/consumers/topics/"+ topics + "/0")
>> >>
>> >> For our usecases it is too limiting to include partitionId in the path.
>> >> To get it to work by automatically detecting the existing partitions
>> for a
>> >> given topic, I changed it as below (inspired from
>> >> http://www.programcreek.com/java-api-examples/index.php?api=
>> kafka.utils.ZKGroupTopicDirs):
>> >>
>> >> /**
>> >>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>> >>   * groupID consumer group to get offsets for
>> >>   * topic topic to get offsets for
>> >>   * return - mapping of (topic and) partition to offset
>> >>   */
>> >> private def getOffsets(groupID :String, topic: String):Option[String]
>> = {
>> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>> >>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
>> >>   val topicSeq = List(topic).toSeq
>> >>  // try {
>> >> val partitions = ZkUtils.getPartitionsForTopics(zkClient,
>> topicSeq)
>> >> var partition:Object=null
>> >> for (partition <- partitions) {
>> >>   val partitionOffsetPath:String = topicDirs.consumerOffsetDir +
>> "/" +
>> >> partition;
>> >>   val maybeOffset:Option[String] = ZkUtils.readDataMaybeNull(zkCl
>> ient,
>> >> partitionOffsetPath)._1;
>> >>   val offset:Long = if(maybeOffset.isDefined)
>> maybeOffset.get.toLong
>> >> else 0L;
>> >>   val topicAndPartition:TopicAndPartition  = new
>> >> TopicAndPartition(topic, Integer.parseInt(partition.toString));
>> >>   offsets.put(topicAndPartition, offset)
>> >> }
>> >>   //}
>> >> Option(offsets.mkString(","))
>> >> }
>> >>
>> >> // Read the previously saved offsets from Zookeeper
>> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>> >>
>> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
>> >>
>> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
>> >>   val start = System.currentTimeMillis()
>> >>   offsetsRangesStrOpt match {
>> >> case Some(offsetsRangesStr) =>
>> >>   LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>> >>
>> >>   val offsets = offsetsRangesStr.split(",")
>> >> .map(s => s.split(":"))
>> >> .map { case Array(partitionStr, offsetStr) =>
>> 

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Thanks for confirming Cody.
To get to use the library, I had to do:

val offsetsStore = new
ZooKeeperOffsetsStore(conf.getString("zkHosts"), "/consumers/topics/"+
topics + "/0")

It worked well. However, I had to specify the partitionId in the zkPath.
If I want the library to pick all the partitions for a topic, without me
specifying the path, is it possible out of the box or I need to tweak?

regards
Sunita


On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger  wrote:

> You are correct that you shouldn't have to worry about broker id.
>
> I'm honestly not sure specifically what else you are asking at this point.
>
> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind 
> wrote:
> > Just re-read the kafka architecture. Something that slipped my mind is,
> it
> > is leader based. So topic/partitionId pair will be same on all the
> brokers.
> > So we do not need to consider brokerid while storing offsets. Still
> > exploring rest of the items.
> > regards
> > Sunita
> >
> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind 
> > wrote:
> >>
> >> Hello Experts,
> >>
> >> I am trying to use the saving to ZK design. Just saw Sudhir's comments
> >> that it is old approach. Any reasons for that? Any issues observed with
> >> saving to ZK. The way we are planning to use it is:
> >> 1. Following
> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-
> achieving-zero-data-loss.html
> >> 2. Saving to the same file with offsetRange as a part of the file. We
> hope
> >> that there are no partial writes/ overwriting is possible and
> offsetRanges
> >>
> >> However I have below doubts which I couldnt figure out from the code
> here
> >> -
> >> https://github.com/ippontech/spark-kafka-source/blob/
> master/src/main/scala/com/ippontech/kafka/stores/
> ZooKeeperOffsetsStore.scala
> >> 1. The brokerId is not part of the OffsetRange. How will just the
> >> partitionId:FromOffset stay unique in a cluster with multiple brokers
> and
> >> multiple partitions/topic.
> >> 2. Do we have to specify zkPath to include the partitionid. I tried
> using
> >> the ZookeeperOffsetStore as is and it required me to specify the
> >> partitionId:
> >>
> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
> >> "/consumers/topics/"+ topics + "/0")
> >>
> >> For our usecases it is too limiting to include partitionId in the path.
> >> To get it to work by automatically detecting the existing partitions
> for a
> >> given topic, I changed it as below (inspired from
> >> http://www.programcreek.com/java-api-examples/index.php?
> api=kafka.utils.ZKGroupTopicDirs):
> >>
> >> /**
> >>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
> >>   * groupID consumer group to get offsets for
> >>   * topic topic to get offsets for
> >>   * return - mapping of (topic and) partition to offset
> >>   */
> >> private def getOffsets(groupID :String, topic: String):Option[String] =
> {
> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
> >>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
> >>   val topicSeq = List(topic).toSeq
> >>  // try {
> >> val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
> >> var partition:Object=null
> >> for (partition <- partitions) {
> >>   val partitionOffsetPath:String = topicDirs.consumerOffsetDir +
> "/" +
> >> partition;
> >>   val maybeOffset:Option[String] = ZkUtils.readDataMaybeNull(
> zkClient,
> >> partitionOffsetPath)._1;
> >>   val offset:Long = if(maybeOffset.isDefined) maybeOffset.get.toLong
> >> else 0L;
> >>   val topicAndPartition:TopicAndPartition  = new
> >> TopicAndPartition(topic, Integer.parseInt(partition.toString));
> >>   offsets.put(topicAndPartition, offset)
> >> }
> >>   //}
> >> Option(offsets.mkString(","))
> >> }
> >>
> >> // Read the previously saved offsets from Zookeeper
> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
> >>
> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
> >>
> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
> >>   val start = System.currentTimeMillis()
> >>   offsetsRangesStrOpt match {
> >> case Some(offsetsRangesStr) =>
> >>   LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
> >>
> >>   val offsets = offsetsRangesStr.split(",")
> >> .map(s => s.split(":"))
> >> .map { case Array(partitionStr, offsetStr) =>
> >> (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
> >> .toMap
> >>
> >>   LogHandler.log.info("Done reading offsets from ZooKeeper. Took "
> +
> >> (System.currentTimeMillis() - start))
> >>
> >>   Some(offsets)
> >> case None =>
> >>   LogHandler.log.info("No offsets found in ZooKeeper. Took " +
> >> (System.currentTimeMillis() - start))
> >>   None
> >>   }
> >>
> >> }
> >>
> >> However, I am concerned if the saveOffsets will work well with this
> >> approach. 

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Cody Koeninger
You are correct that you shouldn't have to worry about broker id.

I'm honestly not sure specifically what else you are asking at this point.

On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind  wrote:
> Just re-read the kafka architecture. Something that slipped my mind is, it
> is leader based. So topic/partitionId pair will be same on all the brokers.
> So we do not need to consider brokerid while storing offsets. Still
> exploring rest of the items.
> regards
> Sunita
>
> On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind 
> wrote:
>>
>> Hello Experts,
>>
>> I am trying to use the saving to ZK design. Just saw Sudhir's comments
>> that it is old approach. Any reasons for that? Any issues observed with
>> saving to ZK. The way we are planning to use it is:
>> 1. Following
>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>> 2. Saving to the same file with offsetRange as a part of the file. We hope
>> that there are no partial writes/ overwriting is possible and offsetRanges
>>
>> However I have below doubts which I couldnt figure out from the code here
>> -
>> https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>> 1. The brokerId is not part of the OffsetRange. How will just the
>> partitionId:FromOffset stay unique in a cluster with multiple brokers and
>> multiple partitions/topic.
>> 2. Do we have to specify zkPath to include the partitionid. I tried using
>> the ZookeeperOffsetStore as is and it required me to specify the
>> partitionId:
>>
>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>> "/consumers/topics/"+ topics + "/0")
>>
>> For our usecases it is too limiting to include partitionId in the path.
>> To get it to work by automatically detecting the existing partitions for a
>> given topic, I changed it as below (inspired from
>> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
>>
>> /**
>>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>>   * groupID consumer group to get offsets for
>>   * topic topic to get offsets for
>>   * return - mapping of (topic and) partition to offset
>>   */
>> private def getOffsets(groupID :String, topic: String):Option[String] = {
>>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
>>   val topicSeq = List(topic).toSeq
>>  // try {
>> val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
>> var partition:Object=null
>> for (partition <- partitions) {
>>   val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" +
>> partition;
>>   val maybeOffset:Option[String] = ZkUtils.readDataMaybeNull(zkClient,
>> partitionOffsetPath)._1;
>>   val offset:Long = if(maybeOffset.isDefined) maybeOffset.get.toLong
>> else 0L;
>>   val topicAndPartition:TopicAndPartition  = new
>> TopicAndPartition(topic, Integer.parseInt(partition.toString));
>>   offsets.put(topicAndPartition, offset)
>> }
>>   //}
>> Option(offsets.mkString(","))
>> }
>>
>> // Read the previously saved offsets from Zookeeper
>> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>>
>>   LogHandler.log.info("Reading offsets from ZooKeeper")
>>
>>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
>>   val start = System.currentTimeMillis()
>>   offsetsRangesStrOpt match {
>> case Some(offsetsRangesStr) =>
>>   LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>>
>>   val offsets = offsetsRangesStr.split(",")
>> .map(s => s.split(":"))
>> .map { case Array(partitionStr, offsetStr) =>
>> (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
>> .toMap
>>
>>   LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
>> (System.currentTimeMillis() - start))
>>
>>   Some(offsets)
>> case None =>
>>   LogHandler.log.info("No offsets found in ZooKeeper. Took " +
>> (System.currentTimeMillis() - start))
>>   None
>>   }
>>
>> }
>>
>> However, I am concerned if the saveOffsets will work well with this
>> approach. Thats when I realized we are not considering brokerIds which
>> storing offsets and probably the OffsetRanges does not have it either. It
>> can only provide Topic, partition, from and until offsets.
>>
>> I am probably missing something very basic. Probably the library works
>> well by itself. Can someone/ Cody explain?
>>
>> Cody, Thanks a lot for sharing your work.
>>
>> regards
>> Sunita
>>
>>
>> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger 
>> wrote:
>>>
>>> See
>>> https://github.com/koeninger/kafka-exactly-once
>>>
>>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed"
>>>  wrote:

 Hi Experts,

 I am looking for some information on how to acheive zero data 

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Just re-read the kafka architecture. Something that slipped my mind is, it
is leader based. So topic/partitionId pair will be same on all the brokers.
So we do not need to consider brokerid while storing offsets. Still
exploring rest of the items.
regards
Sunita

On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind 
wrote:

> Hello Experts,
>
> I am trying to use the saving to ZK design. Just saw Sudhir's comments
> that it is old approach. Any reasons for that? Any issues observed with
> saving to ZK. The way we are planning to use it is:
> 1. Following http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
> g-zero-data-loss.html
> 2. Saving to the same file with offsetRange as a part of the file. We hope
> that there are no partial writes/ overwriting is possible and offsetRanges
>
> However I have below doubts which I couldnt figure out from the code here
> - https://github.com/ippontech/spark-kafka-source/blob/master/
> src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
> 1. The brokerId is not part of the OffsetRange. How will just the
> partitionId:FromOffset stay unique in a cluster with multiple brokers and
> multiple partitions/topic.
> 2. Do we have to specify zkPath to include the partitionid. I tried using
> the ZookeeperOffsetStore as is and it required me to specify the
> partitionId:
>
> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
> "/consumers/topics/"+ topics + "/0")
>
> For our usecases it is too limiting to include partitionId in the path.
> To get it to work by automatically detecting the existing partitions for a 
> given topic, I changed it as below (inspired from 
> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
>
> /**
>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>   * groupID consumer group to get offsets for
>   * topic topic to get offsets for
>   * return - mapping of (topic and) partition to offset
>   */
> private def getOffsets(groupID :String, topic: String):Option[String] = {
>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
>   val topicSeq = List(topic).toSeq
>  // try {
> val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
> var partition:Object=null
> for (partition <- partitions) {
>   val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" + 
> partition;
>   val maybeOffset:Option[String] = ZkUtils.readDataMaybeNull(zkClient, 
> partitionOffsetPath)._1;
>   val offset:Long = if(maybeOffset.isDefined) maybeOffset.get.toLong else 
> 0L;
>   val topicAndPartition:TopicAndPartition  = new TopicAndPartition(topic, 
> Integer.parseInt(partition.toString));
>   offsets.put(topicAndPartition, offset)
> }
>   //}
> Option(offsets.mkString(","))
> }
>
> // Read the previously saved offsets from Zookeeper
> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>
>   LogHandler.log.info("Reading offsets from ZooKeeper")
>
>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
>   val start = System.currentTimeMillis()
>   offsetsRangesStrOpt match {
> case Some(offsetsRangesStr) =>
>   LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>
>   val offsets = offsetsRangesStr.split(",")
> .map(s => s.split(":"))
> .map { case Array(partitionStr, offsetStr) => 
> (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
> .toMap
>
>   LogHandler.log.info("Done reading offsets from ZooKeeper. Took " + 
> (System.currentTimeMillis() - start))
>
>   Some(offsets)
> case None =>
>   LogHandler.log.info("No offsets found in ZooKeeper. Took " + 
> (System.currentTimeMillis() - start))
>   None
>   }
>
> }
>
> However, I am concerned if the saveOffsets will work well with this
> approach. Thats when I realized we are not considering brokerIds which
> storing offsets and probably the OffsetRanges does not have it either. It
> can only provide Topic, partition, from and until offsets.
>
> I am probably missing something very basic. Probably the library works
> well by itself. Can someone/ Cody explain?
>
> Cody, Thanks a lot for sharing your work.
>
> regards
> Sunita
>
>
> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger 
> wrote:
>
>> See
>> https://github.com/koeninger/kafka-exactly-once
>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed" 
>> wrote:
>>
>>> Hi Experts,
>>>
>>> I am looking for some information on how to acheive zero data loss while
>>> working with kafka and Spark. I have searched online and blogs have
>>> different answer. Please let me know if anyone has idea on this.
>>>
>>> Blog 1:
>>> https://databricks.com/blog/2015/01/15/improved-driver-fault
>>> -tolerance-and-zero-data-loss-in-spark-streaming.html
>>>
>>>
>>> Blog2:
>>> 

Re: Getting the IP address of Spark Driver in yarn-cluster mode

2016-10-25 Thread Masood Krohy
Thanks Steve.

Here is the Python pseudo code that got it working for me:

import time
import urllib2

nodes = ({'worker1_hostname': 'worker1_ip', ...})
YARN_app_queue = 'default'
YARN_address = 'http://YARN_IP:8088'

# We allow 3,600 sec from start of the app up to this point
YARN_app_startedTimeBegin = str(int(time.time() - 3600))

requestedURL = (YARN_address +
    '/ws/v1/cluster/apps?states=RUNNING&applicationTypes=SPARK&limit=1' +
    '&queue=' + YARN_app_queue +
    '&startedTimeBegin=' + YARN_app_startedTimeBegin)
print 'Sent request to YARN: ' + requestedURL

response = urllib2.urlopen(requestedURL)
html = response.read()
amHost_start = html.find('amHostHttpAddress') + len('amHostHttpAddress":"')
amHost_length = len('worker1_hostname')
amHost = html[amHost_start : amHost_start + amHost_length]
print 'amHostHttpAddress is: ' + amHost

try:
    self.websock = ...
    print 'Connected to server running on %s' % nodes[amHost]
except:
    print 'Could not connect to server on %s' % nodes[amHost]



--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation




From: Steve Loughran 
To: Masood Krohy 
Cc: "user@spark.apache.org" 
Date: 2016-10-24 17:09
Subject: Re: Getting the IP address of Spark Driver in yarn-cluster mode




On 24 Oct 2016, at 19:34, Masood Krohy  wrote:

Hi everyone, 
Is there a way to set the IP address/hostname that the Spark Driver is 
going to be running on when launching a program through spark-submit in 
yarn-cluster mode (PySpark 1.6.0)? 
I do not see an option for this. If not, is there a way to get this IP 
address after the Spark app has started running? (through an API call at 
the beginning of the program to be used in the rest of the program). 
spark-submit outputs “ApplicationMaster host: 10.0.0.9” in the console 
(and changes on every run due to yarn cluster mode) and I am wondering if 
this can be accessed within the program. It does not seem to me that a 
YARN node label can be used to tie the Spark Driver/AM to a node, while 
allowing the Executors to run on all the nodes. 



you can grab it off the YARN API itself; there's a REST view as well as a 
fussier RPC level. That is, assuming you want the web view, which is what 
is registered. 

If you know the application ID, you can also construct a URL through the 
YARN proxy; any attempt to talk direct to the AM is going to get 302'd 
back there anyway so any kerberos credentials can be verified.
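
A minimal sketch (hypothetical ResourceManager address) of the proxy-URL option mentioned above; in the driver, the application ID is available as sc.applicationId:

// Sketch only: the RM web proxy forwards to whichever node the ApplicationMaster
// (and, in yarn-cluster mode, the driver) is running on.
val rmAddress  = "http://YARN_IP:8088"     // hypothetical ResourceManager web address
val appId      = sc.applicationId          // e.g. "application_1477410914051_0001"
val amProxyUrl = s"$rmAddress/proxy/$appId/"
println(s"ApplicationMaster proxy URL: $amProxyUrl")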





Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Hello Experts,

I am trying to use the saving to ZK design. Just saw Sudhir's comments that
it is old approach. Any reasons for that? Any issues observed with saving
to ZK. The way we are planning to use it is:
1. Following http://aseigneurin.github.io/2016/05/07/spark-kafka-
achieving-zero-data-loss.html
2. Saving to the same file with offsetRange as a part of the file. We hope
that there are no partial writes/ overwriting is possible and offsetRanges

However I have below doubts which I couldnt figure out from the code here -
https://github.com/ippontech/spark-kafka-source/blob/
master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
1. The brokerId is not part of the OffsetRange. How will just the
partitionId:FromOffset stay unique in a cluster with multiple brokers and
multiple partitions/topic.
2. Do we have to specify zkPath to include the partitionid. I tried using
the ZookeeperOffsetStore as is and it required me to specify the
partitionId:

val offsetsStore = new
ZooKeeperOffsetsStore(conf.getString("zkHosts"), "/consumers/topics/"+
topics + "/0")

For our usecases it is too limiting to include partitionId in the path.
To get it to work by automatically detecting the existing partitions
for a given topic, I changed it as below (inspired from
http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):

/**
  * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
  * groupID consumer group to get offsets for
  * topic topic to get offsets for
  * return - mapping of (topic and) partition to offset
  */
private def getOffsets(groupID :String, topic: String):Option[String] = {
  val topicDirs = new ZKGroupTopicDirs(groupID, topic)
  val offsets = new mutable.HashMap[TopicAndPartition,Long]()
  val topicSeq = List(topic).toSeq
 // try {
val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
var partition:Object=null
for (partition <- partitions) {
  val partitionOffsetPath:String = topicDirs.consumerOffsetDir +
"/" + partition;
  val maybeOffset:Option[String] =
ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1;
  val offset:Long = if(maybeOffset.isDefined)
maybeOffset.get.toLong else 0L;
  val topicAndPartition:TopicAndPartition  = new
TopicAndPartition(topic, Integer.parseInt(partition.toString));
  offsets.put(topicAndPartition, offset)
}
  //}
Option(offsets.mkString(","))
}

// Read the previously saved offsets from Zookeeper
override def readOffsets: Option[Map[TopicAndPartition, Long]] = {

  LogHandler.log.info("Reading offsets from ZooKeeper")

  val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
  val start = System.currentTimeMillis()
  offsetsRangesStrOpt match {
case Some(offsetsRangesStr) =>
  LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")

  val offsets = offsetsRangesStr.split(",")
.map(s => s.split(":"))
.map { case Array(partitionStr, offsetStr) =>
(TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
.toMap

  LogHandler.log.info("Done reading offsets from ZooKeeper. Took "
+ (System.currentTimeMillis() - start))

  Some(offsets)
case None =>
  LogHandler.log.info("No offsets found in ZooKeeper. Took " +
(System.currentTimeMillis() - start))
  None
  }

}

However, I am concerned if the saveOffsets will work well with this
approach. Thats when I realized we are not considering brokerIds which
storing offsets and probably the OffsetRanges does not have it either. It
can only provide Topic, partition, from and until offsets.
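
As a reference point, a minimal sketch of a saveOffsets counterpart to the getOffsets above (not the ippontech library's own implementation; it assumes the same zkClient, group and topic, and the spark-streaming-kafka 0.8 direct-stream API): brokerId is indeed not needed, since the (topic, partition) pair identifies an offset cluster-wide.

// Sketch only; assumes the zkClient, ZKGroupTopicDirs and direct-stream RDDs used above.
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.HasOffsetRanges

def saveOffsets(groupID: String, topic: String, rdd: RDD[_]): Unit = {
  val topicDirs = new ZKGroupTopicDirs(groupID, topic)
  // Only topic, partition and untilOffset are needed; brokerId does not matter.
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { osr =>
    val path = topicDirs.consumerOffsetDir + "/" + osr.partition
    ZkUtils.updatePersistentPath(zkClient, path, osr.untilOffset.toString)
  }
}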

I am probably missing something very basic. Probably the library works well
by itself. Can someone/ Cody explain?

Cody, Thanks a lot for sharing your work.

regards
Sunita

On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger  wrote:

> See
> https://github.com/koeninger/kafka-exactly-once
> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed" 
> wrote:
>
>> Hi Experts,
>>
>>> I am looking for some information on how to achieve zero data loss while
>> working with kafka and Spark. I have searched online and blogs have
>> different answer. Please let me know if anyone has idea on this.
>>
>> Blog 1:
>> https://databricks.com/blog/2015/01/15/improved-driver-fault
>> -tolerance-and-zero-data-loss-in-spark-streaming.html
>>
>>
>> Blog2:
>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>> g-zero-data-loss.html
>>
>>
>> Blog one simply says configuration change with checkpoint directory and
>> blog 2 give details about on how to save offsets to zoo keeper. can you
>> please help me out with right approach.
>>
>> Thanks,
>> Asmath
>>
>>
>>


Getting only results out of Spark Shell

2016-10-25 Thread Mich Talebzadeh
Is it possible, using the Spark shell, to print out only the actual output,
without the commands passed through?

Below, all I am interested in are the three numbers in red:

Spark context Web UI available at http://50.140.197.217:5
Spark context available as 'sc' (master = local, app id =
local-1477410914051).
Spark session available as 'spark'.
Loading ./stocks.scala...
import org.apache.spark.sql.functions._
import java.util.Calendar
import org.joda.time._
import java.util.Calendar
import org.joda.time._
ticker: String = tsco

today: org.joda.time.DateTime = 2016-10-25T16:55:17.261+01:00
df1: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 6 more fields]
defined class columns
df2: org.apache.spark.sql.Dataset[columns] = [Stock: string, Ticker: string ... 6 more fields]
changeToDate: (TradeDate: String)org.apache.spark.sql.Column
rs: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [AverageDailyPrice: double]
328.0
327.13
325.63

I can do it in shell but there must be a way of running the commands
silently?
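
One hedged option (assuming the script's final Dataset is the rs shown above): print the wanted values explicitly and suppress the REPL's per-statement echo, for example with the Scala REPL's :silent toggle, so only the explicit output remains.

// Sketch only, run inside spark-shell after :silent has been toggled on.
// rs has a single double column (AverageDailyPrice), so print just its values.
rs.collect().foreach(row => println(row.getDouble(0)))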

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Spark Sql - "broadcast-exchange-1" java.lang.OutOfMemoryError: Java heap space

2016-10-25 Thread Selvam Raman
Hi,

I need help figuring out and solving a heap space problem.

I have a query which joins 15+ tables, and when I try to print out the result
(just 23 rows) it throws a heap space error.

I tried the following command in standalone mode
(my Mac has 8 cores and 15 GB of RAM):

spark.conf().set("spark.sql.shuffle.partitions", 20);

./spark-submit --master spark://selva:7077 --executor-memory 2g
--total-executor-cores 4 --class MemIssue --conf
'spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseG1GC
-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
-XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark'
/Users/rs/Desktop/test.jar

This is my below query:

select concat(sf1.scode, ''-'', m.mcode, ''-'', rf.rnum) , sf1.scode ,
concat(p.lname,'', '',ci.pyear), at.atext Alias, m.mcode Method, mt.mcode,
v.vname, nd.vmeas " +

" from  result r " +

"  join  var v on v.vnum = r.vnum " +

"  join  numa nd on nd.rnum = r.num " +

"  join  feat  fa on fa.fnum = r.fnum " +

"  join  samp  sf1 on sf1.snum = fa.snum " +

"  join  spe  sp on sf1.snum = sp.snum and sp.mnum not in
(1,2)" +

"  join  act  a on a.anum = fa.anum " +

"  join  met  m on m.mnum = a.mnum " +

"  join  sampl  sfa on sfa.snum = sf1.snum " +

"  join  ann  at on at.anum = sfa.anum AND at.atypenum = 11 " +

"  join  data  dr on r.rnum = dr.rnum " +

"  join  cit  cd on dr.dnum = cd.dnum " +

"  join  cit  on cd.cnum = ci.cnum " +

"  join  aut  al on ci.cnum = al.cnum and al.aorder = 1 " +

"  join  per  p on al.pnum = p.pnum " +

"  left join  rel  rf on sf1.snum = rf.snum " +

"  left join  samp sf2 on rf.rnum = sf2.snum " +

"  left join  spe  s on s.snum = sf1.snum " +

"  left join  mat  mt on mt.mnum = s.mnum " +

" where sf1.sampling_feature_code = '1234test''" +

" order by 1,2


spark.sql(query).show


When I checked the whole-stage codegen output, it first reads all the data from
the tables. Why is it reading all the data from the tables and doing a
sort-merge join for 3 or 4 of them? Why is it not applying any filter values?


Though I have given the executors plenty of memory, it still throws the same
error. When Spark SQL does the joins, how does it use memory and cores?

Any guidelines would be greatly welcome.
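
One thing I have seen suggested for OOMs inside a broadcast exchange, which I
have not yet verified myself, is to stop the planner from broadcasting any of
the joined tables so it falls back to sort-merge joins (which can spill to disk
instead of materialising a whole table in memory). A sketch:

  // Disable automatic broadcast joins; -1 turns the size threshold off entirely.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

Increasing driver memory (e.g. --driver-memory) is the other knob I am aware of,
since the broadcast side is collected to the driver first.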
-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Transforming Spark SQL AST with extraOptimizations

2016-10-25 Thread Michael David Pedersen
Hi,

I'm wanting to take a SQL string as a user input, then transform it before
execution. In particular, I want to modify the top-level projection (select
clause), injecting additional columns to be retrieved by the query.

I was hoping to achieve this by hooking into Catalyst using
sparkSession.experimental.extraOptimizations. I know that what I'm
attempting isn't strictly speaking an optimisation (the transformation
changes the semantics of the SQL statement), but the API still seems
suitable. However, my transformation seems to be ignored by the query
executor.

Here is a minimal example to illustrate the issue I'm having. First define
a row case class:

case class TestRow(a: Int, b: Int, c: Int)

Then define an optimisation rule which simply discards any projection:

object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case x: Project => x.child
  }
}

Now create a dataset, register the optimisation, and run a SQL query:

// Create a dataset and register table.
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)

// Register "optimisation".
sparkSession.experimental.extraOptimizations = Seq(RemoveProjectOptimisationRule)

// Run query.
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a = 1")

// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)

Here is the output:

Query result:
[1]

== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
   +- 'UnresolvedRelation `testtable`

== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
   +- SubqueryAlias testtable
  +- LocalRelation [a#3, b#4, c#5]

== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]

== Physical Plan ==
*Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]

We see that the result is identical to that of the original SQL statement,
without the transformation applied. Yet, when printing the logical and
physical plans, the projection has indeed been removed. I've also confirmed
(through debug log output) that the transformation is indeed being invoked.

Any suggestions as to what's going on here? Maybe the optimiser simply
ignores "optimisations" that change semantics?

If using the optimisations isn't the way to go, can anybody suggest an
alternative? All I really want to do is parse the input SQL statement,
transform it, and pass the transformed AST to Spark for execution. But as
far as I can see, the APIs for doing this are private to the Spark sql
package. It may be possible to use reflection, but I'd like to avoid that.

Any pointers would be much appreciated.

Cheers,
Michael

PS: I've previously posted this on StackOverflow, here:
http://stackoverflow.com/questions/40235566/transforming-spark-sql-ast-with-extraoptimizations
.


How can I log the moment an action is called on a DataFrame?

2016-10-25 Thread coldhyll
Hello,

I'm building an ML Pipeline which extracts features from a DataFrame, and I'd
like it to behave like the following:

Log "Extracting feature 1"
Extract feature 1
Log "Extracting feature 2"
Extract feature 2
...
Log "Extracting feature n"
Extract feature n

The thing is, transformations being lazy, I end up with the following:

Log "Extracting feature 1"
Log "Extracting feature 2"
Log "Extracting feature n"
Extract feature 1
Extract feature 2
...
Extract feature n

My transform method looks a bit like that :

override def transform(dataset: DataFrame): DataFrame = {
  var joinedDataFrame = extract(dataset, featuresToExtract.head)

  for (featureToExtract <- featuresToExtract.tail) {
    // LOGGING HERE THAT I WANT CALLED JUST BEFORE THE CORRESPONDING TRANSFORMATION
    joinedDataFrame = joinedDataFrame.join(extract(dataset, featureToExtract), joinOn, "outer")
  }
  joinedDataFrame
}

Any idea on how to proceed ?
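
In case it clarifies what I am after, here is a rough sketch of the workaround
I am considering, reusing the names from the method above (`log` stands for
whatever logger the pipeline uses). It forces each extraction to run right
after its log line, at the cost of extra jobs:

  override def transform(dataset: DataFrame): DataFrame = {
    var joinedDataFrame = extract(dataset, featuresToExtract.head)

    for (featureToExtract <- featuresToExtract.tail) {
      log.info(s"Extracting feature $featureToExtract")  // logged just before the work
      val extracted = extract(dataset, featureToExtract).cache()
      extracted.count()  // action: forces the extraction to happen now
      joinedDataFrame = joinedDataFrame.join(extracted, joinOn, "outer")
    }
    joinedDataFrame
  }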

Thanks

[original question :
http://stackoverflow.com/questions/40157032/how-can-i-log-the-moment-an-action-is-called-on-a-dataframe]



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-log-the-moment-an-action-is-called-on-a-DataFrame-tp27962.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Need help with SVM

2016-10-25 Thread Aseem Bansal
Is there any labeled point with label 0 in your dataset?

On Tue, Oct 25, 2016 at 2:13 AM, aditya1702  wrote:

> Hello,
> I am using linear SVM to train my model and generate a line through my
> data.
> However my model always predicts 1 for all the feature examples. Here is my
> code:
>
> print data_rdd.take(5)
> [LabeledPoint(1.0, [1.9643,4.5957]), LabeledPoint(1.0, [2.2753,3.8589]),
> LabeledPoint(1.0, [2.9781,4.5651]), LabeledPoint(1.0, [2.932,3.5519]),
> LabeledPoint(1.0, [3.5772,2.856])]
>
> 
> 
> from pyspark.mllib.classification import SVMWithSGD
> from pyspark.mllib.linalg import Vectors
> from sklearn.svm import SVC
> data_rdd=x_df.map(lambda x:LabeledPoint(x[1],x[0]))
>
> model = SVMWithSGD.train(data_rdd, iterations=1000,regParam=1)
>
> X=x_df.map(lambda x:x[0]).collect()
> Y=x_df.map(lambda x:x[1]).collect()
>
> 
> 
> pred=[]
> for i in X:
>   pred.append(model.predict(i))
> print pred
>
> [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
> 1]
>
>
> My dataset is as follows:
>  file/n27955/Screen_Shot_2016-10-25_at_2.png>
>
>
> Can someone please help?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Need-help-with-SVM-tp27955.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark streaming communication with different versions of kafka

2016-10-25 Thread Cody Koeninger
Kafka consumers should be backwards compatible with Kafka brokers, so at
the very least you should be able to use the spark-streaming-kafka-0-10
integration to do what you're talking about.
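
For illustration only, the republish side could look roughly like this (a rough
sketch, not a tested MirrorMaker replacement; broker addresses and topic names
are placeholders, and `stream` is assumed to be a DStream[(String, String)] of
key/value pairs obtained from whichever Kafka integration matches the source
brokers):

  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
  import org.apache.spark.streaming.dstream.DStream

  def republish(stream: DStream[(String, String)]): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // One producer per partition, pointed at the destination cluster.
        val props = new Properties()
        props.put("bootstrap.servers", "target-broker:9092")
        props.put("key.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        records.foreach { case (k, v) =>
          producer.send(new ProducerRecord[String, String]("target-topic", k, v))
        }
        producer.close()
      }
    }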

On Tue, Oct 25, 2016 at 4:30 AM, Prabhu GS  wrote:
> Hi,
>
> I would like to know if the same spark streaming job can consume from kafka
> 0.8.1 and write the data to kafka 0.9. Just trying to replicate the kafka
> server.
>
> Yes, Kafka's MirrorMaker can be used to replicate, but was curious to know
> if that can be achieved by spark streaming.
>
> Please share your thoughts
>
> --
> Prabhu
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



i get the error of Py4JJavaError: An error occurred while calling o177.showString while running code below

2016-10-25 Thread muhammet pakyürek
I used Spark 2.0.1 and work with pyspark.sql DataFrames.


lower = arguments["lower"]
lower_udf = udf(lambda x: lower if x

Re: [Spark ML] Using GBTClassifier in OneVsRest

2016-10-25 Thread eliasah
Well, as of now the GBTClassifier is considered a Predictor and not a
Classifier. That's why you get that error. Unless you want to re-write your
own GBTClassifier that extends Classifier, there is no way for now to use the
OneVsRest estimator on it.

Nevertheless, there is an associated JIRA for this topic:
https://issues.apache.org/jira/browse/SPARK-16739. You can follow the issue
there.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ML-Using-GBTClassifier-in-OneVsRest-tp27933p27961.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 1.2

2016-10-25 Thread ayan guha
Thank you both.

On Tue, Oct 25, 2016 at 11:30 PM, Sean Owen  wrote:

> archive.apache.org will always have all the releases:
> http://archive.apache.org/dist/spark/
>
> On Tue, Oct 25, 2016 at 1:17 PM ayan guha  wrote:
>
>> Just in case, anyone knows how I can download Spark 1.2? It is not
>> showing up in Spark download page drop down
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>


-- 
Best Regards,
Ayan Guha


Re: Making more features in Logistic Regression

2016-10-25 Thread eliasah
Your question isn't clear. Would you care to elaborate?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Making-more-features-in-Logistic-Regression-tp27915p27960.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Proper saving/loading of MatrixFactorizationModel

2016-10-25 Thread eliasah
I know this hasn't been accepted yet, but is there any news on it? How can we
cache the product and user factors?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Proper-saving-loading-of-MatrixFactorizationModel-tp23952p27959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 1.2

2016-10-25 Thread Luciano Resende
All previous releases are available on the Release Archives

http://archive.apache.org/dist/spark/

On Tue, Oct 25, 2016 at 2:17 PM, ayan guha  wrote:

> Just in case, anyone knows how I can download Spark 1.2? It is not showing
> up in Spark download page drop down
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Luciano Resende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Spark 1.2

2016-10-25 Thread Sean Owen
archive.apache.org will always have all the releases:
http://archive.apache.org/dist/spark/

On Tue, Oct 25, 2016 at 1:17 PM ayan guha  wrote:

> Just in case, anyone knows how I can download Spark 1.2? It is not showing
> up in Spark download page drop down
>
>
> --
> Best Regards,
> Ayan Guha
>


Spark 1.2

2016-10-25 Thread ayan guha
Just in case anyone knows: how can I download Spark 1.2? It is not showing
up in the Spark download page drop-down.

-- 
Best Regards,
Ayan Guha


Re: Passing command line arguments to Spark-shell in Spark 2.0.1

2016-10-25 Thread Mich Talebzadeh
Hi,

The correct way of doing it for a String argument is to use echo '...',
passing the string directly as below:

spark-shell -i <(echo 'val ticker = "tsco"' ; cat stock.scala)
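
An alternative I have used (just a sketch, not part of the tip above) is to
export the value in the shell and read it inside the script itself:

  // Inside stock.scala: pick up the TICKER environment variable, with a default.
  val ticker = sys.env.getOrElse("TICKER", "tsco")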

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 25 October 2016 at 11:23, Mich Talebzadeh 
wrote:

> Hi guys,
>
> Besides using shell parameters is there anyway of passing a parameter to
> Spark-shell like in Zeppelin
>
> val ticker = z.input("Ticker to analyze? default MSFT", "msft").toString
>
> I gather this can be done in Spark shell
>
> export TICKER="msft"
> spark-shell -i <(echo val ticker = $TICKER ; cat )
>
>
> as describe here
> 
>
>
> Thanks
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Passing command line arguments to Spark-shell in Spark 2.0.1

2016-10-25 Thread Mich Talebzadeh
Hi guys,

Besides using shell parameters, is there any way of passing a parameter to
spark-shell, like in Zeppelin?

val ticker = z.input("Ticker to analyze? default MSFT", "msft").toString

I gather this can be done in Spark shell

export TICKER="msft"
spark-shell -i <(echo val ticker = $TICKER ; cat )


as described here



Thanks




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Spark streaming communication with InfluxDB

2016-10-25 Thread Gioacchino

Hi,


I would like to know if there is example code for writing data to InfluxDB
from Spark Streaming in Scala or Python.
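
To make the question concrete, the kind of thing I am looking for would be
along these lines (my own rough, untested sketch; host, database and
measurement names are placeholders, and it assumes the stream carries numeric
values that can be posted to InfluxDB's HTTP /write endpoint in line protocol):

  import java.net.{HttpURLConnection, URL}
  import org.apache.spark.streaming.dstream.DStream

  // POST a batch of line-protocol lines ("measurement[,tags] field=value [ts]").
  def postToInflux(lines: Seq[String]): Unit = {
    val conn = new URL("http://localhost:8086/write?db=mydb")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.getOutputStream.write(lines.mkString("\n").getBytes("UTF-8"))
    conn.getResponseCode   // InfluxDB answers 204 on success
    conn.disconnect()
  }

  def sinkToInflux(stream: DStream[Double]): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { values =>
        val lines = values.map(v => s"events value=$v").toSeq
        if (lines.nonEmpty) postToInflux(lines)
      }
    }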



Thanks in advance

Gioacchino


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming communication with different versions of kafka

2016-10-25 Thread Prabhu GS
Hi,

I would like to know if the same spark streaming job can consume from kafka
0.8.1 and write the data to kafka 0.9. Just trying to replicate the kafka
server.

Yes, Kafka's MirrorMaker can be used to replicate, but was curious to know
if that can be achieved by spark streaming.

Please share your thoughts

-- 
Prabhu


Re: Spark SQL is slower when DataFrame is cache in Memory

2016-10-25 Thread Chin Wei Low
Hi Kazuaki,

I print a debug log right before I call collect, and use that to compare
against the job-start log (which is available when debug logging is turned on).
Anyway, I tested that on Spark 2.0.1 and never saw it happen. But the query on
the cached DataFrame is still slightly slower than the one without caching when
running on Spark 2.0.1.

Regards,
Low Chin Wei

On Tue, Oct 25, 2016 at 3:39 AM, Kazuaki Ishizaki 
wrote:

> Hi Chin Wei,
> I am sorry for being late to reply.
>
> Got it. Interesting behavior. How did you measure the time between 1st and
> 2nd events?
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:Chin Wei Low 
> To:Kazuaki Ishizaki/Japan/IBM@IBMJP
> Cc:user@spark.apache.org
> Date:2016/10/10 11:33
>
> Subject:Re: Spark SQL is slower when DataFrame is cache in Memory
> --
>
>
>
> Hi Ishizaki san,
>
> Thanks for the reply.
>
> So, when I pre-cache the dataframe, the cache is being used during the job
> execution.
>
> Actually there are 3 events:
> 1. call res.collect
> 2. job started
> 3. job completed
>
> I am concerned about the longer time taken between the 1st and 2nd events. It
> seems like query planning and optimization take longer when querying the
> cached dataframe.
>
>
> Regards,
> Chin Wei
>
> On Fri, Oct 7, 2016 at 10:14 PM, Kazuaki Ishizaki <*ishiz...@jp.ibm.com*
> > wrote:
> Hi Chin Wei,
> Yes, since you force the cache to be created by executing df.count, Spark
> starts to get data from the cache for the following task:
> val res = sqlContext.sql("table1 union table2 union table3")
> res.collect()
>
> If you insert 'res.explain', you can confirm which resource you use to get
> data, cache or parquet?
> val res = sqlContext.sql("table1 union table2 union table3")
> res.explain(true)
> res.collect()
>
> Am I misunderstanding something?
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:Chin Wei Low <*lowchin...@gmail.com* >
> To:Kazuaki Ishizaki/Japan/IBM@IBMJP
> Cc:*user@spark.apache.org* 
> Date:2016/10/07 20:06
> Subject:Re: Spark SQL is slower when DataFrame is cache in Memory
>
> --
>
>
>
> Hi Ishizaki san,
>
> So there is a gap between res.collect
> and when I see this log:   spark.SparkContext: Starting job: collect at
> :26
>
> What you mean is, during this time Spark has already started to get data from
> the cache? Shouldn't it only get the data after the job is started and tasks
> are distributed?
>
> Regards,
> Chin Wei
>
>
> On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki <*ishiz...@jp.ibm.com*
> > wrote:
> Hi,
> I think that the result looks correct. The current Spark spends extra time
> getting data from a cache, for two reasons. One is the complicated path taken
> to reach the data; the other is decompression in the case of a primitive type.
> The new implementation (*https://github.com/apache/spark/pull/15219*
> ) is ready for review. It
> would achieve a 1.2x performance improvement for a compressed column and a
> much larger improvement for an uncompressed column.
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:Chin Wei Low <*lowchin...@gmail.com* >
> To:*user@spark.apache.org* 
> Date:2016/10/07 13:05
> Subject:Spark SQL is slower when DataFrame is cache in Memory
> --
>
>
>
>
> Hi,
>
> I am using Spark 1.6.0. I have a Spark application that creates and caches
> (in memory) DataFrames (around 50+, some on a single parquet file and some on
> a folder with a few parquet files) with the following code:
>
> val df = sqlContext.read.parquet
> df.persist
> df.count
>
> I union them into 3 DataFrames and register those as temp tables.
>
> Then I run the following code:
> val res = sqlContext.sql("table1 union table2 union table3")
> res.collect()
>
> The res.collect() is slower when I cache the DataFrame compared to without
> caching, e.g. 3 seconds vs 1 second.
>
> I turned on DEBUG logging and see there is a gap from the res.collect() call
> to the start of the Spark job.
>
> Is the extra time taken by the query planning & optimization? It does not
> show the gap when I do not cache the dataframe.
>
> Anything I am missing here?
>
> Regards,
> Chin Wei
>
>
>
>
>
>


Re: java.lang.NoSuchMethodError - GraphX

2016-10-25 Thread Brian Wilson
I have discovered that this Dijkstra's function was written for the Spark 1.6
era, i.e. an older Scala version; the remainder of my code is built against Scala 2.11.

I have checked the functions within the dijkstra function and can’t see any 
that are illegal. For example `mapVertices`, `aggregateMessages` and 
`outerJoinVertices` are all being used correctly.

What else could this be?
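
In case it helps anyone spot the problem, here is the direction I am checking
now. My working assumption (not confirmed) is a Scala binary-version mismatch,
since scala.runtime.ObjectRef.create only exists from Scala 2.11 onwards, so
classes compiled for 2.11 fail this way on a 2.10 runtime. A build.sbt sketch,
with version numbers as placeholders, keeping everything on the same Scala line
as the Spark distribution in use:

  scalaVersion := "2.11.8"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"   % "2.0.1" % "provided",
    "org.apache.spark" %% "spark-graphx" % "2.0.1" % "provided"
  )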

Thanks

Brian

> On 25 Oct 2016, at 08:47, Brian Wilson  wrote:
> 
> Thank you Michael! This looks perfect but I have a `NoSuchMethodError` that I 
> cannot understand. 
> 
> I am trying to implement a weighted shortest path algorithm from your `Spark 
> GraphX in Action` book. The part in question is Listing 6.4 "Executing the 
> shortest path algorithm that uses breadcrumbs"  from Chapter 6 [here][1].
> 
> I have my own graph that I create from two RDDs. There are `344436` vertices 
> and `772983` edges. I can perform an unweighted shortest path computation 
> using the native GraphX library and I'm confident in the graph construction. 
> 
> In this case I use their Dijkstra's implementation as follows:
> 
> val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, 
> edgesRDD).cache()
>   
>   def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
>   var g2 = g.mapVertices(
>   (vid,vd) => (false, if (vid == origin) 0 else 
> Double.MaxValue,
>   
> List[VertexId]()))
> 
>   for (i <- 1L to g.vertices.count-1) {
>   val currentVertexId =
>   g2.vertices.filter(!_._2._1)
>   
> .fold((0L,(false,Double.MaxValue,List[VertexId](((a,b) =>
>   if (a._2._2 < b._2._2) 
> a else b)
>   ._1
> 
>   val newDistances = 
> g2.aggregateMessages[(Double,List[VertexId])](
>   ctx => if (ctx.srcId == 
> currentVertexId)
>
> ctx.sendToDst((ctx.srcAttr._2 + ctx.attr,
>   
> ctx.srcAttr._3 :+ ctx.srcId)),
>   (a,b) => if (a._1 < b._1) a 
> else b)
> 
>   g2 = g2.outerJoinVertices(newDistances)((vid, 
> vd, newSum) => {
>   val newSumVal =
>   
> newSum.getOrElse((Double.MaxValue,List[VertexId]()))
>   (vd._1 || vid == currentVertexId,
>   math.min(vd._2, newSumVal._1),
>   if (vd._2 < newSumVal._1) vd._3 else 
> newSumVal._2)})
>   }
>   
>   g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
>   (vd, 
> dist.getOrElse((false,Double.MaxValue,List[VertexId]()))
>.productIterator.toList.tail))
>   }
> 
>   //  Path Finding - random node from which to find all paths
>   val v1 = 400028222916L
> 
> I then call their function with my graph and a random vertex ID. Previously I 
> had issues with `v1` not being recognised as `long` type and the `L` suffix 
> solved this.
> 
>   val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect
>   
>   println(results)
>   
> However, this returns the following:
> 
> Error: Exception in thread "main" java.lang.NoSuchMethodError: 
> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>   at GraphX$.dijkstra$1(GraphX.scala:51)
>   at GraphX$.main(GraphX.scala:85)
>   at GraphX.main(GraphX.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>   at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 
> Line 51 refers to the line `var g2 = g.mapVertices(`
> Line 85 refers to the line `val results = dijkstra(my_graph, 
> 1L).vertices.map(_._2).collect`
> 
> What method is this exception referring to? I am able to package with `sbt` 
> without error and I canno see what method I am 

Re: java.lang.NoSuchMethodError - GraphX

2016-10-25 Thread Brian Wilson
Thank you Michael! This looks perfect but I have a `NoSuchMethodError` that I 
cannot understand. 

I am trying to implement a weighted shortest path algorithm from your `Spark 
GraphX in Action` book. The part in question is Listing 6.4 "Executing the 
shortest path algorithm that uses breadcrumbs"  from Chapter 6 [here][1].

I have my own graph that I create from two RDDs. There are `344436` vertices 
and `772983` edges. I can perform an unweighted shortest path computation using 
the native GraphX library and I'm confident in the graph construction. 

In this case I use their Dijkstra's implementation as follows:

val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).cache()

def dijkstra[VD](g: Graph[VD,Double], origin: VertexId) = {
  var g2 = g.mapVertices(
    (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue,
                  List[VertexId]()))

  for (i <- 1L to g.vertices.count-1) {
    val currentVertexId =
      g2.vertices.filter(!_._2._1)
        .fold((0L, (false, Double.MaxValue, List[VertexId]())))((a, b) =>
          if (a._2._2 < b._2._2) a else b)
        ._1

    val newDistances = g2.aggregateMessages[(Double, List[VertexId])](
      ctx => if (ctx.srcId == currentVertexId)
               ctx.sendToDst((ctx.srcAttr._2 + ctx.attr,
                              ctx.srcAttr._3 :+ ctx.srcId)),
      (a, b) => if (a._1 < b._1) a else b)

    g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
      val newSumVal =
        newSum.getOrElse((Double.MaxValue, List[VertexId]()))
      (vd._1 || vid == currentVertexId,
       math.min(vd._2, newSumVal._1),
       if (vd._2 < newSumVal._1) vd._3 else newSumVal._2)})
  }

  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
    (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
           .productIterator.toList.tail))
}

//  Path Finding - random node from which to find all paths
val v1 = 400028222916L

I then call their function with my graph and a random vertex ID. Previously I 
had issues with `v1` not being recognised as `long` type and the `L` suffix 
solved this.

val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect

println(results)

However, this returns the following:

Error: Exception in thread "main" java.lang.NoSuchMethodError: 
scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at GraphX$.dijkstra$1(GraphX.scala:51)
at GraphX$.main(GraphX.scala:85)
at GraphX.main(GraphX.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Line 51 refers to the line `var g2 = g.mapVertices(`
Line 85 refers to the line `val results = dijkstra(my_graph, 
1L).vertices.map(_._2).collect`

What method is this exception referring to? I am able to package with `sbt`
without error, and I cannot see what method I am calling which does not exist.

Many thanks!

Brian

  [1]: https://www.manning.com/books/spark-graphx-in-action#downloads 


> On 24 Oct 2016, at 16:54, Michael Malak  wrote:
> 
> Chapter 6 of my book implements Dijkstra's Algorithm. The source code is 
> available to download for free. 
> https://www.manning.com/books/spark-graphx-in-action 
> 
> 
> 
> 
> 
> From: Brian Wilson 

Re: [Spark 2.0.1] Error in generated code, possible regression?

2016-10-25 Thread Kazuaki Ishizaki
Can you have a smaller program that can reproduce the same error? If you 
also create a JIRA entry, it would be great.

Kazuaki Ishizaki



From:   Efe Selcuk 
To: "user @spark" 
Date:   2016/10/25 10:23
Subject:[Spark 2.0.1] Error in generated code, possible 
regression?



I have an application that works in 2.0.0 but has been dying at runtime on 
the 2.0.1 distribution.

at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 30 more
Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 74, Column 145: Unknown variable or type "value4"

It also includes a massive 1800-line generated code output (which repeats 
over and over, even on 1 thread, which makes this a pain), but fortunately 
the error occurs early so I can give at least some context.

/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificMutableProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificMutableProjection extends 
org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private MutableRow mutableRow;
/* 009 */   private Object[] values;
... // many lines of class variables, mostly errMsg strings and Object[]
/* 071 */   private void apply2_7(InternalRow i) {
/* 072 */
/* 073 */ boolean isNull215 = false;
/* 074 */ final com.mypackage.MyThing value215 = isNull215 ? null : 
(com.mypackage.MyThing) value4._2();
/* 075 */ isNull215 = value215 == null;
/* 076 */
...

As you can see, on line 74 there's a reference to value4 but nothing 
called value4 has been defined. I have no idea of where to even begin 
looking for what caused this, or even whether it's my fault or a bug in 
the code generation. Any help is appreciated.

Efe





Grouping into Arrays

2016-10-25 Thread Matt Smith
I worked up the following for grouping a DataFrame by a key and aggregating
into arrays.  It works, but I think it is horrible.   Is there a better
way?  Especially one that does not require RDDs?  This is a common pattern
we need as we often want to explode JSON arrays, do something to enrich the
data, then collapse it back into a structure similar to pre-exploded, but
with the enriched data.  collect_list seems to be the pattern I am looking
for but it only works with Hive and only with primitives. Help?

thx.

  def groupToArray(df: DataFrame, groupByCols: Seq[String], arrayCol: String): DataFrame = {
    val sourceSchema = df.schema
    val arrayField = StructField(arrayCol, ArrayType(sourceSchema(arrayCol).dataType))
    val groupByIndexes = groupByCols.map( colName => sourceSchema.fieldIndex(colName))
    val arrayIndex = sourceSchema.fieldIndex(arrayCol)
    val destSchema = StructType(
      groupByCols.map( colName => sourceSchema(colName)) :+
      arrayField
    )
    val rowRdd = df
      .rdd
      .groupBy( r => groupByIndexes.map(r(_)) )
      .map{ case (_, rowsIter) =>
        val rowValues = rowsIter.head.toSeq
        val arr = rowsIter.map { r => r(arrayIndex) }
        val keys = groupByIndexes.map( ndx => rowValues(ndx))
        Row.fromSeq(keys :+ arr)
      }

    df.sqlContext.createDataFrame(rowRdd, destSchema)
  }
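
If a newer Spark is an option, the shape I was hoping for would be roughly the
following (a sketch only: I believe collect_list stopped requiring Hive
somewhere in the 2.x line and can take struct columns there, though I have not
verified the exact version; df is the input DataFrame and the column names are
placeholders):

  import org.apache.spark.sql.functions.{col, collect_list, struct}

  // Group by the key columns and fold the remaining columns of each row into an
  // array of structs, which can later be exploded again after enrichment.
  val grouped = df
    .groupBy(col("key1"), col("key2"))
    .agg(collect_list(struct(col("payload1"), col("payload2"))).as("items"))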


Help regarding reading text file within rdd operations

2016-10-25 Thread Rohit Verma
Hi Team,

Please help me with a scenario. I asked on Stack Overflow but got no response,
so excuse me for mailing this list.

I have two string lists containing text file paths, List a and List b. I want
to take the cartesian product of lists a and b to achieve a pairwise dataframe
comparison.

The way I am trying it is to first do the cartesian product, transfer it to a
pair RDD, and then apply an operation on each pair. These lists are small:
a has ~50 elements, b ~1000 elements.

 List<String> a = Lists.newList("/data/1.text", "/data/2.text", "/data/3.text");
 List<String> b = Lists.newList("/data/4.text", "/data/5.text", "/data/6.text");

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
List<Tuple2<String, String>> cartesian = cartesian(a, b);
jsc.parallelizePairs(cartesian).filter(new Function<Tuple2<String, String>, Boolean>() {
    @Override public Boolean call(Tuple2<String, String> tup) throws Exception {
        Dataset<Row> text1 = spark.read().text(tup._1); // <-- this throws NullPointerException
        Dataset<Row> text2 = spark.read().text(tup._2);
        return text1.first() == text2.first(); // <-- this is an indicative function only
    }
});
I could also use Spark to do the cartesian product, as below, but I believe
the Spark overhead is higher here.

JavaRDD<String> sourceRdd = jsc.parallelize(a);
JavaRDD<String> allRdd = jsc.parallelize(b);

sourceRdd.cache().cartesian(allRdd).filter(new Function<Tuple2<String, String>, Boolean>() {
    @Override public Boolean call(Tuple2<String, String> tup) throws Exception {
        Dataset<Row> text1 = spark.read().text(tup._1);  // <-- same issue
        Dataset<Row> text2 = spark.read().text(tup._2);
        return text1.first() == text2.first();
    }
});
Please suggest a good approach to handle this.
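
For comparison, here is the driver-only variant I am considering as a fallback
(a Scala sketch of the same idea; `spark` is the SparkSession, the paths are
the ones above, and the lists are small enough for this loop to stay on the
driver):

  val a = Seq("/data/1.text", "/data/2.text", "/data/3.text")
  val b = Seq("/data/4.text", "/data/5.text", "/data/6.text")

  // Read the first line of every file once, on the driver, then compare all pairs.
  val firstA = a.map(p => p -> spark.read.textFile(p).head()).toMap
  val firstB = b.map(p => p -> spark.read.textFile(p).head()).toMap

  val matches = for {
    p1 <- a
    p2 <- b
    if firstA(p1) == firstB(p2) // indicative comparison only, as above
  } yield (p1, p2)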

Regards
Rohit verma


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org