Re: OutOfDirectMemoryError for Spark 2.2

2018-03-07 Thread Chawla,Sumit
Hi

Anybody got any pointers on this one?

Regards
Sumit Chawla


On Tue, Mar 6, 2018 at 8:58 AM, Chawla,Sumit  wrote:

> No, this is the only stack trace I get. I have tried DEBUG but didn't
> notice much of a log change.
>
> Yes, I have tried bumping MaxDirectMemorySize to get rid of this error.
> It does work if I throw 4G+ at it. However, I am trying to
> understand this behavior so that I can set this number to an appropriate
> value.
>
> Regards
> Sumit Chawla
>
>
> On Tue, Mar 6, 2018 at 8:07 AM, Vadim Semenov  wrote:
>
>> Do you have a trace? i.e. what's the source of `io.netty.*` calls?
>>
>> And have you tried bumping `-XX:MaxDirectMemorySize`?
>>
>> On Tue, Mar 6, 2018 at 12:45 AM, Chawla,Sumit 
>> wrote:
>>
>>> Hi All
>>>
>>> I have a job which processes a large dataset. All items in the dataset
>>> are unrelated. To save on cluster resources, I process these items in
>>> chunks. Since chunks are independent of each other, I start and shut down
>>> the Spark context for each chunk. This allows me to keep the DAG smaller and
>>> avoid retrying the entire DAG in case of failures. This mechanism used to work
>>> fine with Spark 1.6. Now that we have moved to 2.2, the job has started
>>> failing with an OutOfDirectMemoryError.
>>>
>>> 2018-03-03 22:00:59,687 WARN  [rpc-server-48-1] server.TransportChannelHandler (TransportChannelHandler.java:exceptionCaught(78)) - Exception in connection from /10.66.73.27:60374
>>>
>>> io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 8388608 byte(s) of direct memory (used: 1023410176, max: 1029177344)
>>>
>>> at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:506)
>>> at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:460)
>>> at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:701)
>>> at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:690)
>>> at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
>>> at io.netty.buffer.PoolArena.allocate(PoolArena.java:213)
>>> at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
>>> at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
>>> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
>>> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
>>> at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
>>> at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:564)
>>>
>>> I got some clue on what is causing this from
>>> https://github.com/netty/netty/issues/6343, however I am not able to make
>>> the numbers add up to explain what is filling 1 GB of direct memory.
>>>
>>> Output from jmap (columns: rank, instance count, bytes, class name):
>>>
>>>
>>> 7: 22230 1422720 io.netty.buffer.PoolSubpage
>>>
>>> 12: 1370 804640 io.netty.buffer.PoolSubpage[]
>>>
>>> 41: 3600 144000 io.netty.buffer.PoolChunkList
>>>
>>> 98: 1440 46080 io.netty.buffer.PoolThreadCache$SubPageMemoryRegionCache
>>>
>>> 113: 300 40800 io.netty.buffer.PoolArena$HeapArena
>>>
>>> 114: 300 40800 io.netty.buffer.PoolArena$DirectArena
>>>
>>> 192: 198 15840 io.netty.buffer.PoolChunk
>>>
>>> 274: 120 8320 io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
>>>
>>> 406: 120 3840 io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
>>>
>>> 422: 72 3552 io.netty.buffer.PoolArena[]
>>>
>>> 458: 30 2640 io.netty.buffer.PooledUnsafeDirectByteBuf
>>>
>>> 500: 36 2016 io.netty.buffer.PooledByteBufAllocator
>>>
>>> 529: 32 1792 io.netty.buffer.UnpooledUnsafeHeapByteBuf
>>>
>>> 589: 20 1440 io.netty.buffer.PoolThreadCache
>>>
>>> 630: 37 1184 io.netty.buffer.EmptyByteBuf
>>>
>>> 703: 36 864 io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
>>>
>>> 852: 22 528 io.netty.buffer.AdvancedLeakAwareByteBuf
>>>
>>> 889: 10 480 io.netty.buffer.SlicedAbstractByteBuf
>>>
>>> 917: 8 448 io.netty.buffer.UnpooledHeapByteBuf
>>>
>>> 1018: 20 320 io.netty.buffer.PoolThreadCache$1
>>>
>>> 1305: 4 128 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>>
>>> 1404: 1 80 io.netty.buffer.PooledUnsafeHeapByteBuf
>>>
>>> 1473: 3 72 io.netty.buffer.PoolArena$SizeClass
>>>
>>> 1529: 1 64 io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
>>>
>>> 1541: 2 64 io.netty.buffer.CompositeByteBuf$Component
>>>
>>> 1568: 1 56 io.netty.buffer.CompositeByteBuf
>>>
>>> 1896: 1 32 io.netty.buffer.PoolArena$SizeClass[]
>>>
>>> 2042: 1 24 io.netty.buffer.PooledUnsafeDirectByteBuf$1
>>>
>>> 2046: 1 24 
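
For sizing the direct-memory pool, the knobs usually involved are the JVM direct-memory cap and Spark's preference for direct buffers in its Netty-based shuffle/RPC layer. A hedged example of passing them at submit time (the 2g value is purely illustrative, not a recommendation):

spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:MaxDirectMemorySize=2g" \
  --conf "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2g" \
  --conf spark.shuffle.io.preferDirectBufs=false \
  ...

Setting spark.shuffle.io.preferDirectBufs=false asks the shuffle Netty allocator to favor heap buffers, which can shrink the direct-memory footprint at some GC cost.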

is there a way to catch exceptions on executor level

2018-03-07 Thread Chethan Bhawarlal
Hi Dev,

I am doing Spark operations at the RDD level, for each row, like this:

private def obj(row: org.apache.spark.sql.Row): Put = {

  row.schema.fields.foreach(x => {

    x.dataType match {

      case StringType => // some operation


So when I get an empty or garbage value, my code fails, and I am not able
to catch the exceptions because these failures occur on the executors.


Is there a way I can catch these exceptions, accumulate them, and print
them to the driver logs?


Any sample examples would be of great help.


Thanks,

Chethan.
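
A common pattern for this (a minimal sketch under stated assumptions: processRow and the input path are hypothetical placeholders, not the poster's code) is to wrap the per-row logic in a Try and collect failures in a CollectionAccumulator that the driver reads afterwards:

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().appName("row-error-capture").getOrCreate()

// Accumulator that gathers error messages produced on the executors.
val rowErrors = spark.sparkContext.collectionAccumulator[String]("rowErrors")

def processRow(row: Row): Unit = ???   // hypothetical per-row logic that may throw

val df = spark.read.parquet("/some/input")   // placeholder input
df.rdd.foreach { row =>
  Try(processRow(row)) match {
    case Success(_)  => // row handled normally
    case Failure(ex) => rowErrors.add(s"Bad row $row: ${ex.getMessage}")
  }
}

// Back on the driver: log whatever the executors accumulated.
rowErrors.value.asScala.foreach(msg => println(s"row failure: $msg"))

Accumulator contents are only reliable when updated inside actions (as with foreach here), not inside lazy transformations.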



Spark Streaming logging on Yarn : issue with rolling in yarn-client mode for driver log

2018-03-07 Thread chandan prakash
Hi All,
I am running my Spark Streaming application in yarn-client mode.
I want to enable log rolling and aggregation in the NodeManager container.
I am using the configs suggested in the Spark docs:
${spark.yarn.app.container.log.dir}/spark.log in log4j.properties

Also, for aggregation on YARN I have enabled these properties:
spark.yarn.rolledLog.includePattern=spark*
yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds=3600
on the Spark and YARN side respectively.

On the executors, my logs are getting rolled and aggregated every hour as
expected.
*But the issue is:*
For the driver in yarn-client mode, ${spark.yarn.app.container.log.dir} is not
defined when the driver starts (the driver does not run in a YARN container),
so I am not able to see the driver logs in the YARN app container directory.
My restrictions are:
1. I want to use yarn-client mode only.
2. I want logging to go to the YARN container directory only, so that it is
aggregated and backed up by YARN every hour to HDFS/S3.

*How can I work around this to enable rolling and aggregation for the driver
logs as well?*

Any pointers will be helpful.
thanks in advance.
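
A partial, hedged workaround (it rolls the driver log at a fixed local path; it does not put client-mode driver logs under YARN log aggregation, which only covers container logs) is to give the driver its own log4j configuration at submit time. Paths and sizes below are illustrative:

# driver-log4j.properties (illustrative)
log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.File=/var/log/spark/driver-spark.log
log4j.appender.rolling.MaxFileSize=128MB
log4j.appender.rolling.MaxBackupIndex=10
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# submit with, e.g.:
#   spark-submit --master yarn --deploy-mode client \
#     --driver-java-options "-Dlog4j.configuration=file:/path/to/driver-log4j.properties" ...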

-- 
Chandan Prakash


Re: Spark StreamingContext Question

2018-03-07 Thread रविशंकर नायर
Got it, thanks

On Wed, Mar 7, 2018 at 4:32 AM, Gerard Maas  wrote:

> Hi,
>
> You can run as many jobs in your cluster as you want, provided you have
> enough capacity.
> The one-streaming-context constraint is per job (per JVM).
>
> You can submit several jobs for Flume and some others for Twitter, Kafka,
> etc...
>
> If you are getting started with streaming in Spark, I'd recommend you look
> into Structured Streaming first.
> In Structured Streaming, you can have many streaming queries running under
> the same Spark session.
> Yet, that does not mean you need to put them all in the same job. You can
> (and should) still submit different jobs for different application concerns.
>
> kind regards, Gerard.
>
>
>
> On Wed, Mar 7, 2018 at 4:56 AM, ☼ R Nair (रविशंकर नायर) <
> ravishankar.n...@gmail.com> wrote:
>
>> Hi all,
>>
>> I understand from the documentation that only one streaming context can be
>> active in a JVM at the same time.
>>
>> Hence, in an enterprise cluster, how can we manage/handle multiple users
>> who have many different streaming applications, one ingesting data
>> from Flume, another from Twitter, etc.? Is this not available now?
>>
>> Best,
>> Passion
>>
>
>


Re: Reading kafka and save to parquet problem

2018-03-07 Thread Junfeng Chen
I have tried using readStream and writeStream, but it throws a "Uri
without authority: hdfs:/data/_spark_metadata" exception, which I do not
see in normal (batch) read mode.
The parquet path I specified is  hdfs:///data


Regard,
Junfeng Chen
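
For reference, a minimal streaming sketch (brokers, paths and the namenode address are placeholders). Two things commonly matter here: the file sink needs a checkpointLocation, and "hdfs:///data" has an empty authority, which is the usual source of "Uri without authority" errors from the _spark_metadata log; a fully qualified hdfs://<namenode>/... URI, or a plain /data path relying on fs.defaultFS, avoids it:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "hdfs://namenode:8020/data")
  .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/topic1")
  .start()
  .awaitTermination()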

On Thu, Mar 8, 2018 at 9:38 AM, naresh Goud 
wrote:

> Change it to readStream instead of read, as below:
>
> val df = spark
>   .readStream
> .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
>
>
> Check if this is helpful:
>
> https://github.com/ndulam/KafkaSparkStreams/blob/master/SampleStreamApp/src/main/scala/com/naresh/org/SensorDataSave.scala
>
>
>
>
>
>
>
>
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>
>
> On Wed, Mar 7, 2018 at 7:33 PM Junfeng Chen  wrote:
>
>> I am struggling to read data from Kafka and save it to parquet files on
>> HDFS using Spark Streaming, following this post:
>> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>>
>> My code is similar to the following:
>>
>> val df = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>   .option("subscribe", "topic1")
>>   .load()
>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>>   .as[(String, String)]
>>
>>   .write.parquet("hdfs://data.parquet")
>>
>>
>> The difference is that I am writing it in Java.
>>
>> But in practice, this code runs only once and then exits gracefully.
>> Although it produces the parquet file successfully and no exception is
>> thrown, it runs like a normal Spark program rather than a Spark
>> Streaming program.
>>
>> What should I do if I want to read from Kafka and save to parquet in batches?
>>
>> Regard,
>> Junfeng Chen
>>
>


Re: Reading kafka and save to parquet problem

2018-03-07 Thread naresh Goud
Change it to readStream instead of read, as below:

val df = spark
  .readStream
.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()


Check if this is helpful:

https://github.com/ndulam/KafkaSparkStreams/blob/master/SampleStreamApp/src/main/scala/com/naresh/org/SensorDataSave.scala








Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/



On Wed, Mar 7, 2018 at 7:33 PM Junfeng Chen  wrote:

> I am struggling to read data from Kafka and save it to parquet files on
> HDFS using Spark Streaming, following this post:
> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>
> My code is similar to the following:
>
> val df = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>
>   .write.parquet("hdfs://data.parquet")
>
>
> The difference is that I am writing it in Java.
>
> But in practice, this code runs only once and then exits gracefully.
> Although it produces the parquet file successfully and no exception is
> thrown, it runs like a normal Spark program rather than a Spark
> Streaming program.
>
> What should I do if I want to read from Kafka and save to parquet in batches?
>
> Regard,
> Junfeng Chen
>


Reading kafka and save to parquet problem

2018-03-07 Thread Junfeng Chen
I am struggling to read data from Kafka and save it to parquet files on
HDFS using Spark Streaming, following this post:
https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet

My code is similar to the following:

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

  .write.parquet("hdfs://data.parquet")


The difference is that I am writing it in Java.

But in practice, this code runs only once and then exits gracefully. Although
it produces the parquet file successfully and no exception is thrown, it runs
like a normal Spark program rather than a Spark Streaming program.

What should I do if I want to read from Kafka and save to parquet in batches?

Regard,
Junfeng Chen


Re: dependencies conflict in oozie spark action for spark 2

2018-03-07 Thread Lian Jiang
I found the following version mismatches between the Oozie and Spark2 jars:

jackson-core-2.4.4.jar oozie
jackson-core-2.6.5.jar spark2

jackson-databind-2.4.4.jar oozie
jackson-databind-2.6.5.jar spark2

jackson-annotations-2.4.0.jar oozie
jackson-annotations-2.6.5.jar spark2

I removed the lower-version jars from Oozie. After that, the Spark job cannot
communicate with YARN due to this error:

18/03/07 18:24:30 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.yarn.proto.YarnProtos$ResourceProto$Builder.setMemory(J)Lorg/apache/hadoop/yarn/proto/YarnProtos$ResourceProto$Builder;
at org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl.setMemorySize(ResourcePBImpl.java:78)
at org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl.setMemory(ResourcePBImpl.java:72)
at org.apache.hadoop.yarn.api.records.Resource.newInstance(Resource.java:58)
at org.apache.spark.deploy.yarn.YarnAllocator.<init>(YarnAllocator.scala:140)
at org.apache.spark.deploy.yarn.YarnRMClient.register(YarnRMClient.scala:77)
at org.apache.spark.deploy.yarn.ApplicationMaster.registerAM(ApplicationMaster.scala:387)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:430)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:282)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:768)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:766)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)


Any idea?
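
One hedged thing to check (not a verified fix for this particular trace): make sure the action resolves only the spark2 sharelib, so the older Oozie-side Hadoop and Jackson jars are not mixed onto the classpath, for example in job.properties:

# job.properties -- restrict the Spark action to the spark2 sharelib
oozie.use.system.libpath=true
oozie.action.sharelib.for.spark=spark2

A NoSuchMethodError on YarnProtos...setMemory(J) usually indicates mixed Hadoop/YARN jar versions on the container classpath, so checking which hadoop-yarn jars actually get localized is also worth doing.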


On Tue, Mar 6, 2018 at 4:17 PM, Lian Jiang  wrote:

> I am using HDP 2.6.4 and have followed
> https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_spark-component-guide/content/ch_oozie-spark-action.html
> to make Oozie use Spark2.
>
> After this, I found there are still a bunch of issues:
>
> 1. Oozie and Spark try to add the same jars to the cache multiple times.
> This is resolved by removing the duplicate jars from the
> /user/oozie/share/lib/lib_20180303065325/spark2/ folder.
>
> 2. A jar conflict, which is not resolved. The exception is below:
>
> 18/03/06 23:51:18 ERROR ApplicationMaster: User class threw exception: java.lang.NoSuchFieldError: USE_DEFAULTS
> java.lang.NoSuchFieldError: USE_DEFAULTS
> at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findSerializationInclusion(JacksonAnnotationIntrospector.java:498)
> at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findSerializationInclusion(AnnotationIntrospectorPair.java:332)
> at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findSerializationInclusion(AnnotationIntrospectorPair.java:332)
> at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findSerializationInclusion(BasicBeanDescription.java:381)
> at com.fasterxml.jackson.databind.ser.PropertyBuilder.<init>(PropertyBuilder.java:41)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.constructPropertyBuilder(BeanSerializerFactory.java:507)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.findBeanProperties(BeanSerializerFactory.java:558)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.constructBeanSerializer(BeanSerializerFactory.java:361)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.findBeanSerializer(BeanSerializerFactory.java:272)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:225)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153)
> at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203)
> at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157)
> at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481)
> at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679)
> at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107)
> at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)

Issues with large schema tables

2018-03-07 Thread Ballas, Ryan W
Hello All,

Our team is having a lot of issues with the Spark API, particularly with
tables that have large schemas. We currently have a program written in Scala
that uses the Apache Spark API to create two tables from raw files. We have
one particularly large raw data file that contains around ~4700 columns and
~200,000 rows. Every week we get a new file that shows the updates, inserts
and deletes that happened in the last week. Our program creates two outputs:
a master table and a history table. The master table is the most up-to-date
version of the table, while the history table shows all the inserts and
updates that happened to it and what changed. For example, if we have the
following schema, where A and B are unique:

Week 1             Week 2
A  B  C            A  B  C
1  2  3            1  2  4

Then the master table will now be:

A  B  C
1  2  4

and the history table will be:

A  B  change_column  change_type  old_value  new_value
1  2  C              Update       3          4

This process is working flawlessly for shorter schema tables. We have a table 
that has 300 columns but over 100,000,000 rows and this code still runs. The 
process above for the larger schema table runs for around 15 hours, and then 
crashes with the following error:

Exception in thread "main" java.lang.StackOverflowError
at scala.collection.generic.Growable$class.loop$1(Growable.scala:52)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:57)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
…

Some other notes… This is running on a very large MAPR cluster. We have tried 
running the job with upwards of ½ a TB of RAM and this still happens. All of 
our other smaller schema tables run except for this one.

Here is a code example that takes around 4 hours to run for this larger table, 
but runs in 20 seconds for other tables:

var dataframe_result = dataframe1.join(broadcast(dataframe2), Seq(listOfUniqueIds:_*)).repartition(100).cache()

We have tried all of the following with no success:

  *   Using broadcast hash joins (dataframe2 is smaller, dataframe1 is huge)
  *   Repartitioning to different partition counts, as well as not repartitioning at all
  *   Caching the result of the dataframe (we originally did not do this).

What is causing this error and how do we go about fixing it? This code just 
takes in 1 parameter (the table to run) so it’s the exact same code for every 
table. It runs flawlessly for every other table except for this one. The only 
thing different between this table and all the other ones is the number of 
columns. This has the most columns at 4700 where the second most is 800.

If anyone has any ideas on how to fix this it would be greatly appreciated. 
Thank you in advance for the help!!
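
Two hedged avenues that are commonly tried for a StackOverflowError on very wide schemas (neither is guaranteed for this case): raise the JVM thread stack size, since the error is thrown on the driver's main thread while the plan is being processed, and/or truncate the lineage before the join. The option values and checkpoint directory below are illustrative; dataframe1, dataframe2 and listOfUniqueIds are the names from the snippet above:

// Submit-time options (illustrative sizes):
//   --conf spark.driver.extraJavaOptions=-Xss64m
//   --conf spark.executor.extraJavaOptions=-Xss64m

// Truncating lineage so the join starts from a materialized plan:
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")   // placeholder dir
val df1Checkpointed = dataframe1.checkpoint()                   // eager by default
var dataframe_result = df1Checkpointed
  .join(broadcast(dataframe2), Seq(listOfUniqueIds: _*))
  .repartition(100)
  .cache()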




Spark-submit Py-files with EMR add step?

2018-03-07 Thread Afshin, Bardia
I'm writing to reach out to the community to demystify the py-files parameter
when working with spark-submit and Python projects.

Currently I have a project, say

Src/

  *   Main.py
  *   Modules/module1.py

When I zip up the src directory and submit it to Spark via an EMR add-step, the
namespacing is lost.

Main.py example:
from Modules.module1 import SomeClass

My code returns an error that it cannot find this class. This works if I
go to the instance, download my project, and submit it to Spark from within the
EMR instance via spark-submit, but not when adding it as a step in EMR from an
external call.


Help?

Best,
Bardia
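
A frequent cause of this is zipping the enclosing src directory, which makes the package importable as src.Modules rather than Modules. A hedged sketch (file and bucket names are placeholders) that keeps the package at the zip root and ships it with --py-files:

# Build the zip so Modules/ sits at the archive root:
cd src
zip -r ../modules.zip Modules/

# Submit with the archive on the Python path:
spark-submit --py-files modules.zip main.py

# The same --py-files argument belongs in the spark-submit arguments of the
# EMR step when the step is added externally (e.g. via aws emr add-steps).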


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-07 Thread kant kodali
It looks to me that the StateStore described in this doc actually has full
outer join, and every other join is a filter of that. Also, the doc talks
about update mode, but it looks like Spark 2.3 ended up with append mode?
Anyway, the moment it is in master I am ready to test, so JIRA tickets on
this would help to keep track. Please let me know.

Thanks!

On Tue, Mar 6, 2018 at 9:16 PM, kant kodali  wrote:

> Sorry I meant Spark 2.4 in my previous email
>
> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali  wrote:
>
>> Hi TD,
>>
>> I agree I think we are better off either with a full fix or no fix. I am
>> ok with the complete fix being available in master or some branch. I guess
>> the solution for me is to just build from the source.
>>
>> On a similar note, I am not finding any JIRA tickets related to full
>> outer joins and update mode for, say, Spark 2.3. I wonder how hard it is
>> to implement both of these? It turns out that update mode and full outer
>> join are very useful and required in my case, therefore I'm just asking.
>>
>> Thanks!
>>
>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> I thought about it.
>>> I am not 100% sure whether this fix should go into 2.3.1.
>>>
>>> There are two parts to this bug fix to enable self-joins.
>>>
>>> 1. Enabling deduping of leaf logical nodes by extending
>>> MultInstanceRelation
>>>   - This is safe to be backported into the 2.3 branch as it does not
>>> touch production code paths.
>>>
>>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>>> micro-batch plan is spliced into the streaming plan.
>>>   - This touches core production code paths and therefore, may not safe
>>> to backport.
>>>
>>> Part 1 enables self-joins in all but a small fraction of self-join
>>> queries. That small fraction can produce incorrect results, and part 2
>>> avoids that.
>>>
>>> So for 2.3.1, we can enable self-joins by merging only part 1, but it
>>> can give wrong results in some cases. I think that is strictly worse than
>>> no fix.
>>>
>>> TD
>>>
>>>
>>>
>>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:
>>>
 Hi TD,

 I pulled your commit that is listed on this ticket
 https://issues.apache.org/jira/browse/SPARK-23406 specifically I did
 the following steps and self joins work after I cherry-pick your commit!
 Good Job! I was hoping it will be part of 2.3.0 but looks like it is
 targeted for 2.3.1 :(

 git clone https://github.com/apache/spark.git
 cd spark
 git fetch
 git checkout branch-2.3
 git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
 export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
 ./build/mvn -DskipTests compile
 ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn


 On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> Hey,
>
> Thanks for testing out stream-stream joins and reporting this issue. I
> am going to take a look at this.
>
> TD
>
>
>
> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali 
> wrote:
>
>> if I change it to the below code it works. However, I don't believe
>> it is the solution I am looking for. I want to be able to do it in raw
>> SQL and moreover, If a user gives a big chained raw spark SQL join query 
>> I
>> am not even sure how to make copies of the dataframe to achieve the
>> self-join. Is there any other way here?
>>
>>
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>
>> jdf.createOrReplaceTempView("table")
>> jdf1.createOrReplaceTempView("table1")
>>
>> val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")
>>
>> resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>>
>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali 
>> wrote:
>>
>>> If I change it to this
>>>
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali 
>>> wrote:
>>>
 Hi All,

 I have the following code

 import 

Re: Do values adjacent to exploded columns get duplicated?

2018-03-07 Thread Anshul Sachdeva
All the columns except the exploded column are duplicated after explode, since
each value of the exploded array is paired with the values of the other columns.

Hope it clears.

Regards
Ansh
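
A quick way to see the logical result in spark-shell (a minimal sketch):

import org.apache.spark.sql.functions.explode
import spark.implicits._

val df = Seq((1, Seq("x1", "x2", "x3", "x4"))).toDF("a", "b")
df.withColumn("b", explode($"b")).show()
// +---+---+
// |  a|  b|
// +---+---+
// |  1| x1|
// |  1| x2|
// |  1| x3|
// |  1| x4|
// +---+---+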

On Mar 7, 2018 4:54 PM, "Vitaliy Pisarev" 
wrote:

> This is a fairly basic question but I did not find an answer to it
> anywhere online:
>
> Suppose I have the following data frame (a and b are column names):
>
> a  |   b
> ---
> 1  |[x1,x2,x3,x4] # this is an array
>
>
> Now I explode column b and logically get:
>
> a  |   b
> ---
> 1  |  x1
> 1  |  x2
> 1  |  x3
> 1  |  x4
>
> Are the values in the adjacent columns *actually* duplicated?
>
>


Do values adjacent to exploded columns get duplicated?

2018-03-07 Thread Vitaliy Pisarev
This is a fairly basic question but I did not find an answer to it anywhere
online:

Suppose I have the following data frame (a and b are column names):

a  |   b
---
1  |[x1,x2,x3,x4] # this is an array


Now I explode column b and logically get:

a  |   b
---
1  |  x1
1  |  x2
1  |  x3
1  |  x4

Are the values in the adjacent columns *actually* duplicated?


Thrift server - ODBC

2018-03-07 Thread Paulo Maia da Costa Ribeiro
Hello,

I have Spark 2.2 installed, but not Hive, and I would like to expose Spark
tables through ODBC. I am able to start the Thrift server with apparently no
errors, and my ODBC driver is able to connect to the Thrift server, but it
can't see any Spark tables. Do I need to have Hive installed for my ODBC
applications to access the Spark tables?

Thanks 
Paulo 
Sent from my iPhone

Re: Spark StreamingContext Question

2018-03-07 Thread Gerard Maas
Hi,

You can run as many jobs in your cluster as you want, provided you have
enough capacity.
The one-streaming-context constraint is per job (per JVM).

You can submit several jobs for Flume and some others for Twitter, Kafka,
etc...

If you are getting started with streaming in Spark, I'd recommend you look
into Structured Streaming first.
In Structured Streaming, you can have many streaming queries running under
the same Spark session.
Yet, that does not mean you need to put them all in the same job. You can
(and should) still submit different jobs for different application concerns.

kind regards, Gerard.
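
A minimal sketch of the several-queries-per-session pattern (topics, brokers and paths are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("multi-query").getOrCreate()

def kafkaSource(topic: String) = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", topic)
  .load()

// Two independent streaming queries under the same SparkSession.
val q1 = kafkaSource("events_a").writeStream
  .format("parquet")
  .option("path", "/data/events_a")
  .option("checkpointLocation", "/checkpoints/events_a")
  .start()

val q2 = kafkaSource("events_b").writeStream
  .format("console")
  .option("checkpointLocation", "/checkpoints/events_b")
  .start()

spark.streams.awaitAnyTermination()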



On Wed, Mar 7, 2018 at 4:56 AM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> Hi all,
>
> I understand from the documentation that only one streaming context can be
> active in a JVM at the same time.
>
> Hence, in an enterprise cluster, how can we manage/handle multiple users
> who have many different streaming applications, one ingesting data
> from Flume, another from Twitter, etc.? Is this not available now?
>
> Best,
> Passion
>


Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-07 Thread Tathagata Das
These issues have likely been solved in future versions. Please use the
latest release - Spark 2.3.0.

On Tue, Mar 6, 2018 at 5:11 PM, Junfeng Chen  wrote:

> Spark 2.1.1.
>
> Actually it is a warning rather than an exception, so there is no stack
> trace. Just many copies of this line:
>
>> CachedKafkaConsumer: CachedKafkaConsumer is not running in
>> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
>> interrupted because of KAFKA-1894.
>
>
>
> Regard,
> Junfeng Chen
>
> On Wed, Mar 7, 2018 at 3:34 AM, Tathagata Das  > wrote:
>
>> Which version of Spark are you using? And can you give us the full stack
>> trace of the exception?
>>
>> On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen  wrote:
>>
>>> I am trying to read Kafka and save the data as parquet files on HDFS
>>> according to this:
>>> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>>>
>>>
>>>
>>> The code is similar to :
>>>
>>> val df = spark
>>>   .read
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>>   .option("subscribe", "topic1")
>>>   .load()
>>>
>>> while I am writing in Java.
>>>
>>> However, I keep throwing the following warning:
>>> CachedKafkaConsumer: CachedKafkaConsumer is not running in
>>> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
>>> interrupted because of KAFKA-1894.
>>>
>>> How to solve it? Thanks!
>>>
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>
>>
>


Re: Spark StreamingContext Question

2018-03-07 Thread sagar grover
Hi,
You can have multiple streams under the same streaming context and process
them accordingly.

With regards,
Sagar Grover
Phone - 7022175584
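
For the DStream API specifically, a minimal sketch of two streams under one StreamingContext (the socket sources, hosts and ports are placeholders standing in for Flume/Twitter receivers):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("multi-stream")
val ssc = new StreamingContext(conf, Seconds(10))

val stream1 = ssc.socketTextStream("host1", 9991)   // stand-in for a Flume stream
val stream2 = ssc.socketTextStream("host2", 9992)   // stand-in for a Twitter stream

stream1.foreachRDD(rdd => println(s"stream1 batch size: ${rdd.count()}"))
stream2.foreachRDD(rdd => println(s"stream2 batch size: ${rdd.count()}"))

ssc.start()
ssc.awaitTermination()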

On Wed, Mar 7, 2018 at 9:26 AM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> Hi all,
>
> I understand from the documentation that only one streaming context can be
> active in a JVM at the same time.
>
> Hence, in an enterprise cluster, how can we manage/handle multiple users
> who have many different streaming applications, one ingesting data
> from Flume, another from Twitter, etc.? Is this not available now?
>
> Best,
> Passion
>