Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-27 Thread Gourav Sengupta
Hi,


As per the documentation at:
https://spark.apache.org/docs/latest/configuration.html


spark.local.dir (default: /tmp): Directory to use for "scratch" space in Spark,
including map output files and RDDs that get stored on disk. This should be
on a fast, local disk in your system. It can also be a comma-separated list
of multiple directories on different disks. NOTE: In Spark 1.0 and later
this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or
LOCAL_DIRS (YARN) environment variables set by the cluster manager.

Regards,
Gourav Sengupta
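
For reference, a minimal sketch of setting the scratch directory programmatically;
the app name and path below are placeholders, and, as the NOTE above says, on YARN
the effective directories come from yarn.nodemanager.local-dirs (via LOCAL_DIRS)
instead, which is the fix described further down this thread.

```
import org.apache.spark.sql.SparkSession

// Minimal sketch: spark.local.dir can be set when the session is built,
// e.g. for local or standalone deployments. On YARN it is overridden by
// LOCAL_DIRS, which YARN derives from yarn.nodemanager.local-dirs.
val spark = SparkSession.builder()
  .appName("scratch-dir-example")                   // placeholder
  .config("spark.local.dir", "/data/spark-scratch") // placeholder path
  .getOrCreate()
```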





On Mon, Mar 26, 2018 at 8:28 PM, Michael Shtelma  wrote:

> Hi Keith,
>
> Thanks  for the suggestion!
> I have solved this already.
> The problem was that the YARN process was not responding to
> start/stop commands and had not applied my configuration changes.
> I killed it and restarted my cluster, and after that YARN started
> using the yarn.nodemanager.local-dirs parameter defined in
> yarn-site.xml.
> After this change, -Djava.io.tmpdir for the Spark executor was set
> correctly, according to the yarn.nodemanager.local-dirs parameter.
>
> Best,
> Michael
>
>
> On Mon, Mar 26, 2018 at 9:15 PM, Keith Chapman 
> wrote:
> > Hi Michael,
> >
> > Sorry for the late reply. I guess you may have to set it through the HDFS
> > core-site.xml file. The property you need to set is "hadoop.tmp.dir",
> > which defaults to "/tmp/hadoop-${user.name}".
> >
> > Regards,
> > Keith.
> >
> > http://keith-chapman.com
> >
> > On Mon, Mar 19, 2018 at 1:05 PM, Michael Shtelma 
> wrote:
> >>
> >> Hi Keith,
> >>
> >> Thank you for the idea!
> >> I have tried it; the executor command now looks like this:
> >>
> >> /bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m
> >> '-Djava.io.tmpdir=my_prefered_path'
> >>
> >> -Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/
> msh/appcache/application_1521110306769_0041/container_
> 1521110306769_0041_01_04/tmp
> >>
> >> The JVM is using the second -Djava.io.tmpdir parameter and writing
> >> everything to the same directory as before.
> >>
> >> Best,
> >> Michael
> >> Sincerely,
> >> Michael Shtelma
> >>
> >>
> >> On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman  >
> >> wrote:
> >> > Can you try setting spark.executor.extraJavaOptions to have
> >> > -Djava.io.tmpdir=someValue
> >> >
> >> > Regards,
> >> > Keith.
> >> >
> >> > http://keith-chapman.com
> >> >
> >> > On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma  >
> >> > wrote:
> >> >>
> >> >> Hi Keith,
> >> >>
> >> >> Thank you for your answer!
> >> I have done this, and it is working for the Spark driver.
> >> I would like to do something like this for the executors as well, so
> >> that the setting will be used on all the nodes where I have executors
> >> running.
> >> >>
> >> >> Best,
> >> >> Michael
> >> >>
> >> >>
> >> >> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman
> >> >> 
> >> >> wrote:
> >> >> > Hi Michael,
> >> >> >
> >> >> > You could either set spark.local.dir through spark conf or
> >> >> > java.io.tmpdir
> >> >> > system property.
> >> >> >
> >> >> > Regards,
> >> >> > Keith.
> >> >> >
> >> >> > http://keith-chapman.com
> >> >> >
> >> >> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <
> mshte...@gmail.com>
> >> >> > wrote:
> >> >> >>
> >> >> >> Hi everybody,
> >> >> >>
> >> >> I am running a Spark job on YARN, and my problem is that the blockmgr-*
> >> >> folders are being created under
> >> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
> >> >> This folder can grow to a significant size and does not fit into the
> >> >> /tmp file system for a single job, which is a real problem for my
> >> >> installation.
> >> >> >> I have redefined hadoop.tmp.dir in core-site.xml and
> >> >> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
> >> >> >> location and expected that the block manager will create the files
> >> >> >> there and not under /tmp, but this is not the case. The files are
> >> >> >> created under /tmp.
> >> >> >>
> >> >> I am wondering if there is a way to make Spark not use /tmp at all
> >> >> and to configure it to create all the files somewhere else?
> >> >> >>
> >> >> >> Any assistance would be greatly appreciated!
> >> >> >>
> >> >> >> Best,
> >> >> >> Michael
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >
> >> >
> >> >
> >
> >
>
>
>


Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-27 Thread Mina Aslani
Hi Naresh,

Thank you for the quick response, appreciate it.
Removing the option("header","true") and trying

df = spark.read.parquet("test.parquet")

now works; I can read the parquet.
However, I would like to find a way to have the data in CSV/readable form.
I still cannot save df as CSV, as it throws:

java.lang.UnsupportedOperationException: CSV data source does not support
struct,values:array> data type.

Any idea?


Best regards,

Mina
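
For the archive, a hedged sketch of one possible direction, assuming the
unsupported struct column is an ML Vector produced by OneHotEncoder and that
the column is called "features" (both assumptions): render the vector as a
plain string before writing CSV. The sketch is in Scala; the same pattern
works from pyspark with a Python UDF.

```
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

// Assumption: "features" is the Vector column produced by OneHotEncoder.
// CSV cannot store the vector struct, so render it as a plain string first.
val vectorToString = udf((v: Vector) => if (v == null) null else v.toString)

val df = spark.read.parquet("test.parquet")
val csvReady = df.withColumn("features", vectorToString(df("features")))

csvReady.coalesce(1)
  .write.option("header", "true")
  .mode("overwrite")
  .csv("output.csv")
```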


On Tue, Mar 27, 2018 at 10:51 PM, naresh Goud 
wrote:

> When storing as a parquet file, I don't think the header
> option("header","true") is required.
>
> Give it a try by removing the header option and then try to read it. I
> haven't tried it; just a thought.
>
> Thank you,
> Naresh
>
>
> On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani  wrote:
>
>> Hi,
>>
>>
>> I am using pyspark. To transform my sample data and create model, I use
>> stringIndexer and OneHotEncoder.
>>
>>
>> However, when I try to write data as csv using below command
>>
>> df.coalesce(1).write.option("header","true").mode("
>> overwrite").csv("output.csv")
>>
>>
>> I get UnsupportedOperationException
>>
>> java.lang.UnsupportedOperationException: CSV data source does not
>> support struct,values:array>
>> data type.
>>
>> Therefore, to save data and avoid getting the error I use
>>
>>
>> df.coalesce(1).write.option("header","true").mode("
>> overwrite").save("output")
>>
>>
>> The above command saves data but it's in parquet format.
>> How can I read parquet file and convert to csv to observe the data?
>>
>> When I use
>>
>> df = spark.read.parquet("1.parquet"), it throws:
>>
>> ERROR RetryingBlockFetcher: Exception while beginning fetch of 1
>> outstanding blocks
>>
>> Your input is appreciated.
>>
>>
>> Best regards,
>>
>> Mina
>>
>>
>>
>> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>


Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-27 Thread naresh Goud
When storing as a parquet file, I don't think the header
option("header","true") is required.

Give it a try by removing the header option and then try to read it. I
haven't tried it; just a thought.

Thank you,
Naresh


On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani  wrote:

> Hi,
>
>
> I am using pyspark. To transform my sample data and create model, I use
> stringIndexer and OneHotEncoder.
>
>
> However, when I try to write data as csv using below command
>
>
> df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")
>
>
> I get UnsupportedOperationException
>
> java.lang.UnsupportedOperationException: CSV data source does not support
> struct,values:array> data
> type.
>
> Therefore, to save data and avoid getting the error I use
>
>
>
> df.coalesce(1).write.option("header","true").mode("overwrite").save("output")
>
>
> The above command saves data but it's in parquet format.
> How can I read parquet file and convert to csv to observe the data?
>
> When I use
>
> df = spark.read.parquet("1.parquet"), it throws:
>
> ERROR RetryingBlockFetcher: Exception while beginning fetch of 1
> outstanding blocks
>
> Your input is appreciated.
>
>
> Best regards,
>
> Mina
>
>
>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-27 Thread Mina Aslani
Hi,


I am using pyspark. To transform my sample data and create a model, I use
StringIndexer and OneHotEncoder.


However, when I try to write the data as CSV using the command below

df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")


I get an UnsupportedOperationException:

java.lang.UnsupportedOperationException: CSV data source does not support
struct,values:array> data type.

Therefore, to save the data and avoid the error, I use


df.coalesce(1).write.option("header","true").mode("overwrite").save("output")


The above command saves the data, but in parquet format.
How can I read the parquet file and convert it to CSV to observe the data?

When I use

df = spark.read.parquet("1.parquet"), it throws:

ERROR RetryingBlockFetcher: Exception while beginning fetch of 1
outstanding blocks

Your input is appreciated.


Best regards,

Mina


Re: ORC native in Spark 2.3, with zlib, gives java.nio.BufferUnderflowException during read

2018-03-27 Thread Dongjoon Hyun
You may hit SPARK-23355 (convertMetastore should not ignore table properties).

Since it's a known Spark issue for all Hive tables (Parquet/ORC), could you 
check that too?

Bests,
Dongjoon.
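
For readers of the archive, a quick spark-shell sketch of one way to check
whether the metastore conversion path is involved; the table name is a
placeholder, and this only narrows the problem down rather than fixing it.

```
// Read the same Hive table with the metastore ORC conversion turned off,
// so the Hive SerDe path is used instead of Spark's native ORC reader.
sql("set spark.sql.hive.convertMetastoreOrc=false")
spark.table("my_orc_table").show()   // placeholder table name

// Then re-enable conversion and compare the behaviour.
sql("set spark.sql.hive.convertMetastoreOrc=true")
spark.table("my_orc_table").show()
```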

On 2018/03/28 01:00:55, Dongjoon Hyun  wrote: 
> Hi, Eric.
> 
> For me, Spark 2.3 works correctly like the following. Could you give us some 
> reproducible example?
> 
> ```
> scala> sql("set spark.sql.orc.impl=native")
> 
> scala> sql("set spark.sql.orc.compression.codec=zlib")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> 
> scala> spark.range(10).write.orc("/tmp/zlib_test")
> 
> scala> spark.read.orc("/tmp/zlib_test").show
> +---+
> | id|
> +---+
> |  8|
> |  9|
> |  5|
> |  0|
> |  3|
> |  4|
> |  6|
> |  7|
> |  1|
> |  2|
> +---+
> 
> scala> sc.version
> res4: String = 2.3.0
> ```
> 
> Bests,
> Dongjoon.
> 
> 
> On 2018/03/23 15:03:29, Eirik Thorsnes  wrote: 
> > Hi all,
> > 
> > I'm trying the new ORC native in Spark 2.3
> > (org.apache.spark.sql.execution.datasources.orc).
> > 
> > I've compiled Spark 2.3 from the git branch-2.3 as of March 20th.
> > I also get the same error for the Spark 2.2 from Hortonworks HDP 2.6.4.
> > 
> > *NOTE*: the error only occurs with zlib compression, and I see that with
> > Snappy I get an extra log-line saying "OrcCodecPool: Got brand-new codec
> > SNAPPY". Perhaps zlib codec is never loaded/triggered in the new code?
> > 
> > I can write using the new native codepath without errors, but when *reading*
> > zlib-compressed ORC, either the newly written ORC files *or* older
> > ORC files written with Spark 2.2/1.6, I get the following exception.
> > 
> > === cut =
> > 2018-03-23 10:36:08,249 INFO FileScanRDD: Reading File path:
> > hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc,
> > range: 0-134217728, partition values: [1999]
> > 2018-03-23 10:36:08,326 INFO ReaderImpl: Reading ORC rows from
> > hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc
> > with {include: [true, true, true, true, true, true, true, true, true],
> > offset: 0, length: 134217728}
> > 2018-03-23 10:36:08,326 INFO RecordReaderImpl: Reader schema not
> > provided -- using file schema
> > struct
> > 
> > 2018-03-23 10:36:08,824 ERROR Executor: Exception in task 0.0 in stage
> > 1.0 (TID 1)
> > java.nio.BufferUnderflowException
> > at java.nio.Buffer.nextGetIndex(Buffer.java:500)
> > at java.nio.DirectByteBuffer.get(DirectByteBuffer.java:249)
> > at
> > org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:248)
> > at
> > org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(RunLengthIntegerReaderV2.java:58)
> > at
> > org.apache.orc.impl.RunLengthIntegerReaderV2.next(RunLengthIntegerReaderV2.java:323)
> > at
> > org.apache.orc.impl.TreeReaderFactory$TimestampTreeReader.nextVector(TreeReaderFactory.java:976)
> > at
> > org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:1815)
> > at
> > org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1184)
> > at
> > org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextBatch(OrcColumnarBatchReader.scala:186)
> > at
> > org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextKeyValue(OrcColumnarBatchReader.scala:114)
> > at
> > org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> > at
> > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> > at
> > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> > at
> > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> > at
> > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
> > Source)
> > at
> > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> > Source)
> > at
> > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> > at
> > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> > at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
> > at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> > at
> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

Re: ORC native in Spark 2.3, with zlib, gives java.nio.BufferUnderflowException during read

2018-03-27 Thread Dongjoon Hyun
Hi, Eric.

For me, Spark 2.3 works correctly like the following. Could you give us some 
reproducible example?

```
scala> sql("set spark.sql.orc.impl=native")

scala> sql("set spark.sql.orc.compression.codec=zlib")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.range(10).write.orc("/tmp/zlib_test")

scala> spark.read.orc("/tmp/zlib_test").show
+---+
| id|
+---+
|  8|
|  9|
|  5|
|  0|
|  3|
|  4|
|  6|
|  7|
|  1|
|  2|
+---+

scala> sc.version
res4: String = 2.3.0
```

Bests,
Dongjoon.


On 2018/03/23 15:03:29, Eirik Thorsnes  wrote: 
> Hi all,
> 
> I'm trying the new ORC native in Spark 2.3
> (org.apache.spark.sql.execution.datasources.orc).
> 
> I've compiled Spark 2.3 from the git branch-2.3 as of March 20th.
> I also get the same error for the Spark 2.2 from Hortonworks HDP 2.6.4.
> 
> *NOTE*: the error only occurs with zlib compression, and I see that with
> Snappy I get an extra log-line saying "OrcCodecPool: Got brand-new codec
> SNAPPY". Perhaps zlib codec is never loaded/triggered in the new code?
> 
> I can write using the new native codepath without errors, but when *reading*
> zlib-compressed ORC, either the newly written ORC files *or* older
> ORC files written with Spark 2.2/1.6, I get the following exception.
> 
> === cut =
> 2018-03-23 10:36:08,249 INFO FileScanRDD: Reading File path:
> hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc,
> range: 0-134217728, partition values: [1999]
> 2018-03-23 10:36:08,326 INFO ReaderImpl: Reading ORC rows from
> hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc
> with {include: [true, true, true, true, true, true, true, true, true],
> offset: 0, length: 134217728}
> 2018-03-23 10:36:08,326 INFO RecordReaderImpl: Reader schema not
> provided -- using file schema
> struct
> 
> 2018-03-23 10:36:08,824 ERROR Executor: Exception in task 0.0 in stage
> 1.0 (TID 1)
> java.nio.BufferUnderflowException
> at java.nio.Buffer.nextGetIndex(Buffer.java:500)
> at java.nio.DirectByteBuffer.get(DirectByteBuffer.java:249)
> at
> org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:248)
> at
> org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(RunLengthIntegerReaderV2.java:58)
> at
> org.apache.orc.impl.RunLengthIntegerReaderV2.next(RunLengthIntegerReaderV2.java:323)
> at
> org.apache.orc.impl.TreeReaderFactory$TimestampTreeReader.nextVector(TreeReaderFactory.java:976)
> at
> org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:1815)
> at
> org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1184)
> at
> org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextBatch(OrcColumnarBatchReader.scala:186)
> at
> org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextKeyValue(OrcColumnarBatchReader.scala:114)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)

Re: ORC native in Spark 2.3, with zlib, gives java.nio.BufferUnderflowException during read

2018-03-27 Thread Xiao Li
Hi, Eirik,

Yes, please open a JIRA.

Thanks,

Xiao

2018-03-23 8:03 GMT-07:00 Eirik Thorsnes :

> Hi all,
>
> I'm trying the new ORC native in Spark 2.3
> (org.apache.spark.sql.execution.datasources.orc).
>
> I've compiled Spark 2.3 from the git branch-2.3 as of March 20th.
> I also get the same error for the Spark 2.2 from Hortonworks HDP 2.6.4.
>
> *NOTE*: the error only occurs with zlib compression, and I see that with
> Snappy I get an extra log-line saying "OrcCodecPool: Got brand-new codec
> SNAPPY". Perhaps zlib codec is never loaded/triggered in the new code?
>
> I can write using the new native codepath without errors, but when *reading*
> zlib-compressed ORC, either the newly written ORC files *or* older
> ORC files written with Spark 2.2/1.6, I get the following exception.
>
> === cut =
> 2018-03-23 10:36:08,249 INFO FileScanRDD: Reading File path:
> hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-
> 37dc216b8a99.orc,
> range: 0-134217728, partition values: [1999]
> 2018-03-23 10:36:08,326 INFO ReaderImpl: Reading ORC rows from
> hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc
> with {include: [true, true, true, true, true, true, true, true, true],
> offset: 0, length: 134217728}
> 2018-03-23 10:36:08,326 INFO RecordReaderImpl: Reader schema not
> provided -- using file schema
> struct v10:smallint,lcc:smallint,mcc:smallint,hcc:smallint>
>
> 2018-03-23 10:36:08,824 ERROR Executor: Exception in task 0.0 in stage
> 1.0 (TID 1)
> java.nio.BufferUnderflowException
> at java.nio.Buffer.nextGetIndex(Buffer.java:500)
> at java.nio.DirectByteBuffer.get(DirectByteBuffer.java:249)
> at
> org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:248)
> at
> org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(
> RunLengthIntegerReaderV2.java:58)
> at
> org.apache.orc.impl.RunLengthIntegerReaderV2.next(
> RunLengthIntegerReaderV2.java:323)
> at
> org.apache.orc.impl.TreeReaderFactory$TimestampTreeReader.
> nextVector(TreeReaderFactory.java:976)
> at
> org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(
> TreeReaderFactory.java:1815)
> at
> org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1184)
> at
> org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.
> nextBatch(OrcColumnarBatchReader.scala:186)
> at
> org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.
> nextKeyValue(OrcColumnarBatchReader.scala:114)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(
> RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:177)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.scan_nextBatch$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:234)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:228)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:827)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:827)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> === cut =
>
> I have the following set in spark-defaults.conf:
>
> spark.sql.hive.convertMetastoreOrc true
> spark.sql.orc.char.enabled true
> spark.sql.orc.enabled true
> spark.sql.orc.filterPushdown true
> spark.sql.orc.impl native
> spark.sql.orc.enableVectorizedReader true
>
>
> If I set these to false and use the old hive reader (or specify the full

closure issues: wholeTextFiles

2018-03-27 Thread Gourav Sengupta
Hi,

I can understand facing closure issues while executing this code:



package spark

// this package is about understanding closures, as mentioned in:
// http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-


import org.apache.spark.sql.SparkSession


object understandClosures extends  App {

  var counter = 0

  // the error that is thrown goes away if we use local[*] as master
  val sparkSession = SparkSession
.builder
.master("spark://Gouravs-iMac:7077")
//.master("local[*]")
.appName("test")
.getOrCreate()


  val valueRDD = sparkSession.sparkContext.parallelize(1 until 100)

  println(valueRDD.count())

  valueRDD.foreach(x => counter += x)

  // but even if we use local[*] as master, the total appears as
  // some random number like -1234435435
  println("the value is " + counter.toString())


  sparkSession.close()

}





Can anyone explain why I am facing a closure issue while executing this
code?

package spark

import org.apache.spark.sql.SparkSession
// Import this utility for working with URLs. Unlike Java, the semicolon ';'
// is not required.
import java.net.URL
// Use {...} to provide a list of things to import, when you don't want to
// import everything in a package and you don't want to write a separate
// line for each type.
import java.io.{File, BufferedInputStream, BufferedOutputStream,
FileOutputStream}




object justenoughTest extends App {

  val sparkSession = SparkSession
.builder
.master("spark://Gouravs-iMac:7077")
//.master("local[*]")
.appName("test")
.getOrCreate()

  println(sparkSession.version)

  println("Spark version:  " + sparkSession.version)
  println("Spark master:   " + sparkSession.sparkContext.master)
  println("Running 'locally'?: " + sparkSession.sparkContext.isLocal)

  val pathSeparator = File.separator

  // The target directory, which we'll now create, if necessary.
  val shakespeare = new File("/Users/gouravsengupta/Development/data/shakespeare")

  println(sparkSession.version)

  // val fileContents = sparkSession.read.text("file:///Users/gouravsengupta/Development/data/shakespeare/")
  //val fileContents = sparkSession.read.text(shakespeare.toString)
  val fileContents =
sparkSession.sparkContext.wholeTextFiles(shakespeare.toString)
  println(fileContents.count())

  // I am facing the closure issue below

  val testThis = fileContents.foreach(x => "printing value" + x._1)


sparkSession.close()

}


Regards,
Gourav Sengupta
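
For reference, the closures section of the RDD programming guide linked in
the first program recommends an accumulator for that counter pattern; a
minimal sketch, reusing the sparkSession from the code above:

```
// Minimal sketch: use an accumulator instead of mutating a driver-side var
// inside foreach, which is the pattern the closures documentation warns about.
val counterAcc = sparkSession.sparkContext.longAccumulator("counter")
val valueRDD = sparkSession.sparkContext.parallelize(1 until 100)

valueRDD.foreach(x => counterAcc.add(x))
println("the value is " + counterAcc.value)
```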


unsubscribe

2018-03-27 Thread Andrei Balici
-- 
Andrei Balici
Student at the School of Computer Science,
University of Manchester


PySpark Structured Streaming : Writing to DB in Python and Foreach Sink.

2018-03-27 Thread Ramaswamy, Muthuraman
Hi All,

I am exploring PySpark Structured Streaming and the documentation says the 
Foreach Sink is not supported in Python and is available only with Java/Scala. 
Given the unavailability of this sink, what options are there for the following:


  1. Will there be support for the Foreach sink in Python in a future Spark
     Structured Streaming release?
  2. What options are there to write streaming query output to a database?
     - In other words, the streaming query output should be written to a
       database at every trigger interval.
     - I cannot use the Memory sink, as it is recommended only for debugging.

Any suggestions for writing to a database from PySpark Structured Streaming will
help. I appreciate your time.

Thank you,

Muthu Ramaswamy
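
For reference, the Scala/Java Foreach sink the question mentions looks roughly
like the sketch below. The JDBC URL, table, and column layout are assumptions;
in Spark 2.3 the Python API indeed lacks this sink, so one common pattern is to
implement the writer in Scala/Java, or to write to files/Kafka and load into
the database separately.

```
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

// Sketch of a JDBC ForeachWriter (Scala/Java only in Spark 2.3).
// URL, table, and column layout are placeholders.
class JdbcSinkWriter(url: String) extends ForeachWriter[Row] {
  private var conn: Connection = _
  private var stmt: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = DriverManager.getConnection(url)
    stmt = conn.prepareStatement("INSERT INTO events(key, value) VALUES (?, ?)")
    true
  }

  override def process(row: Row): Unit = {
    stmt.setString(1, row.getString(0))
    stmt.setString(2, row.getString(1))
    stmt.executeUpdate()
  }

  override def close(errorCause: Throwable): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

// Usage sketch (streamingDF is any streaming Dataset[Row]):
// streamingDF.writeStream.foreach(new JdbcSinkWriter("jdbc:postgresql://...")).start()
```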


Re: unsubscribe

2018-03-27 Thread Romero, Saul
unsubscribe

On Tue, Mar 27, 2018 at 1:15 PM, Nicholas Sharkey  wrote:

>
>


unsubscribe

2018-03-27 Thread Nicholas Sharkey



Spark on K8s resource staging server timeout

2018-03-27 Thread Jenna Hoole
So I'm running into an issue with my resource staging server that's
producing a stacktrace like Issue 342, but I don't think for the same
reasons. What's happening is that every time after I
think for the same reasons. What's happening is that every time after I
start up a resource staging server, the first job submitted that uses it
will fail with a java.net.SocketTimeoutException: timeout, and then every
subsequent job will run perfectly. Including with different jars and
different users. It's only ever the first job that fails and it always
fails. I know I'm also running into Issue 577 in that it takes
about three minutes before the resource staging server is accessible, but
I'm still failing waiting over ten minutes or in one case overnight. And
I'm just using the examples jar, so it's not a super large jar like in
Issue 342.

This isn't great for our CI process, so has anyone seen anything like this
before or know how to increase the timeout if it just takes a while on
initial contact? Using spark.network.timeout has no effect.

[jhoole@nid6 spark]$ kubectl get pods | grep jhoole-spark

jhoole-spark-resource-staging-server-6475c8-w5cdm   1/1   Running
0  13m

[jhoole@nid6 spark]$ kubectl get svc | grep jhoole-spark

jhoole-spark-resource-staging-service   NodePort10.96.143.55
  1:30622/TCP 13m

[jhoole@nid6 spark]$ bin/spark-submit --class
org.apache.spark.examples.SparkPi --conf spark.app.name=spark-pi --conf
spark.kubernetes.resourceStagingServer.uri=http://192.168.0.1:30622
./examples/target/scala-2.11/jars/spark-examples_2.11-2.2.0-k8s-0.5.0.jar

2018-03-27 12:30:13 WARN  NativeCodeLoader:62 - Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable

2018-03-27 12:30:13 INFO  UserGroupInformation:966 - Login successful for
user jhoole@local using keytab file /security/secrets/jhoole.keytab

2018-03-27 12:30:14 INFO  HadoopStepsOrchestrator:54 - Hadoop Conf
directory: /etc/hadoop/conf

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing view acls to: jhoole

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing modify acls to:
jhoole

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing view acls groups to:


2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing modify acls groups
to:

2018-03-27 12:30:14 INFO  SecurityManager:54 - SecurityManager:
authentication disabled; ui acls disabled; users  with view permissions:
Set(jhoole); groups with view permissions: Set(); users  with modify
permissions: Set(jhoole); groups with modify permissions: Set()

Exception in thread "main" java.net.SocketTimeoutException: timeout

at okio.Okio$4.newTimeoutException(Okio.java:230)

at okio.AsyncTimeout.exit(AsyncTimeout.java:285)

at okio.AsyncTimeout$2.read(AsyncTimeout.java:241)

at okio.RealBufferedSource.indexOf(RealBufferedSource.java:345)

at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:217)

at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:211)

at
okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)

at
okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at
okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)

at okhttp3.RealCall.execute(RealCall.java:69)

at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)

at
org.apache.spark.deploy.k8s.submit.SubmittedDependencyUploaderImpl.getTypedResponseResult(SubmittedDependencyUploaderImpl.scala:101)

at
org.apache.spark.deploy.k8s.submit.SubmittedDependencyUploaderImpl.doUpload(SubmittedDependencyUploaderImpl.scala:97)

at
org.apache.spark.deploy.k8s.submit.SubmittedDependencyUploaderImpl.uploadJars(SubmittedDependencyUploaderImpl.scala:70)

at
org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.SubmittedResourcesInitContainerConfigurationStep.configureInitContainer(

Re: Class cast exception while using Data Frames

2018-03-27 Thread Nikhil Goyal
You can run this in the spark shell.

CODE:

case class InstanceData(service: String, metric: String, zone:
String, source: String, time: Long, value: Double )

val seq = sc.parallelize(Seq(
  InstanceData("serviceA", "metricA", "zoneA", "sourceA", 1000L,
1.0),
  InstanceData("serviceA", "metricA", "zoneA", "hostA", 1000L, 1.0),
  InstanceData("serviceD", "metricA", "zoneB", "hostB", 1000L, 2.0),
  InstanceData("serviceA", "metricF", "zoneA", "hostB", 1000L, 1.0)
))

val instData =  spark.createDataFrame(seq)

def makeMap = udf((service: String, metric: String, value: Double)
=> Map((service, metric) -> value))

val instDF = instData.withColumn("metricMap", makeMap($"service",
$"metric", $"value"))

def avgMapValueUDF = udf((newMap: Map[(String, String), Double],
count: Long) => {
  newMap.keys
.map { keyTuple =>
  val sum = newMap.getOrElse(keyTuple, 0.0)
  (keyTuple, sum / count.toDouble)
}.toMap
})

instDF.withColumn("customMap", avgMapValueUDF(col("metricMap"),
lit(1))).show
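
A hedged sketch of one workaround that has worked for this kind of
ClassCastException: Spark hands struct map keys to a Scala UDF as Rows rather
than Tuple2s, so declaring the key type as Row and pulling the fields out
explicitly avoids the cast. This is an illustration, not a claim about the
root cause in this exact setup.

```
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, lit, udf}

// Assumption: the incoming map keys arrive as GenericRowWithSchema, so type them as Row.
def avgMapValueRowUDF = udf((newMap: Map[Row, Double], count: Int) => {
  newMap.map { case (key, sum) =>
    // Rebuild the (service, metric) tuple from the struct key's fields.
    ((key.getString(0), key.getString(1)), sum / count.toDouble)
  }
})

instDF.withColumn("customMap", avgMapValueRowUDF(col("metricMap"), lit(1))).show
```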



On Mon, Mar 26, 2018 at 11:51 PM, Shmuel Blitz 
wrote:

> Hi Nikhil,
>
> Can you please put a code snippet that reproduces the issue?
>
> Shmuel
>
> On Tue, Mar 27, 2018 at 12:55 AM, Nikhil Goyal 
> wrote:
>
>>  |-- myMap: map (nullable = true)
>>  ||-- key: struct
>>  ||-- value: double (valueContainsNull = true)
>>  |||-- _1: string (nullable = true)
>>  |||-- _2: string (nullable = true)
>>  |-- count: long (nullable = true)
>>
>> On Mon, Mar 26, 2018 at 1:41 PM, Gauthier Feuillen > > wrote:
>>
>>> Can you give the output of “printSchema” ?
>>>
>>>
>>> On 26 Mar 2018, at 22:39, Nikhil Goyal  wrote:
>>>
>>> Hi guys,
>>>
>>> I have a Map[(String, String), Double] as one of my columns. Using
>>>
>>> input.getAs[Map[(String, String), Double]](0)
>>>
>>>  throws exception: Caused by: java.lang.ClassCastException: 
>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be 
>>> cast to scala.Tuple2
>>>
>>> Even the schema says that key is of type struct of (string, string).
>>>
>>> Any idea why this is happening?
>>>
>>>
>>> Thanks
>>>
>>> Nikhil
>>>
>>>
>>>
>>
>
>
> --
> Shmuel Blitz
> Big Data Developer
> Email: shmuel.bl...@similarweb.com
> www.similarweb.com
> 
> 
> 
>


[Spark R] Proposal: Exposing RBackend in RRunner

2018-03-27 Thread Jeremy Liu
Spark Users,

In SparkR, RBackend is created in RRunner.main(). This in particular makes
it difficult to control or use the RBackend. For my use case, I am looking
to access the JVMObjectTracker that RBackend maintains for SparkR
dataframes.

Analogously, pyspark starts a py4j.GatewayServer in PythonRunner.main().
It's then possible to start a ClientServer that then has access to the
object bindings between Python/Java.

Is there something similar for SparkR? Or a reasonable way to expose
RBackend?

Thanks!


-- 
-
Jeremy Liu
jeremy.jl@gmail.com


Re: Using CBO on Spark 2.3 with analyzed hive tables

2018-03-27 Thread Michael Shtelma
Hi,

The JIRA bug is here: https://issues.apache.org/jira/browse/SPARK-23799
I have also created the PR for the issue:
https://github.com/apache/spark/pull/20913
With this fix, it is working for me really well.

Best,
Michael
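
For context, a tiny spark-shell illustration of the failure mode described in
the quoted messages below; the guard mirrors the simple NaN check mentioned
there and is only a sketch.

```
// Converting NaN to BigDecimal fails, matching the top of the stack trace
// (scala.math.BigDecimal$.decimal -> java.math.BigDecimal.<init>):
val selectivity = Double.NaN
// BigDecimal(selectivity)              // throws java.lang.NumberFormatException

// A simple guard of the kind discussed below: fall back when the value is NaN.
val safeSelectivity = if (selectivity.isNaN) 1.0 else selectivity
BigDecimal(safeSelectivity)             // BigDecimal(1.0)
```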


On Sat, Mar 24, 2018 at 12:39 AM, Takeshi Yamamuro
 wrote:
> Can you file a jira if this is a bug?
> Thanks!
>
> On Sat, Mar 24, 2018 at 1:23 AM, Michael Shtelma  wrote:
>>
>> Hi Maropu,
>>
>> the problem seems to be in FilterEstimation.scala on lines 50 and 52:
>>
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala?utf8=✓#L50-L52
>>
>> val filterSelectivity =
>> calculateFilterSelectivity(plan.condition).getOrElse(1.0)
>> val filteredRowCount: BigInt =
>> ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
>>
>> The problem is that filterSelectivity gets a NaN value in my case, and
>> NaN cannot be converted to BigDecimal.
>> I can try adding a simple if that checks for NaN and test whether this helps.
>> I will also try to understand why I am getting NaN in my case.
>>
>> Best,
>> Michael
>>
>>
>> On Fri, Mar 23, 2018 at 1:51 PM, Takeshi Yamamuro 
>> wrote:
>> > hi,
>> >
>> > What's a query to reproduce this?
>> > It seems when casting double to BigDecimal, it throws the exception.
>> >
>> > // maropu
>> >
>> > On Fri, Mar 23, 2018 at 6:20 PM, Michael Shtelma 
>> > wrote:
>> >>
>> >> Hi all,
>> >>
>> >> I am using Spark 2.3 with activated cost-based optimizer and a couple
>> >> of hive tables, that were analyzed previously.
>> >>
>> >> I am getting the following exception for different queries:
>> >>
>> >> java.lang.NumberFormatException
>> >>
>> >> at java.math.BigDecimal.(BigDecimal.java:494)
>> >>
>> >> at java.math.BigDecimal.(BigDecimal.java:824)
>> >>
>> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
>> >>
>> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)
>> >>
>> >> at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation.estimate(FilterEstimation.scala:52)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:30)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
>> >>
>> >> at scala.Option.getOrElse(Option.scala:121)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>> >>
>> >> at
>> >>
>> >> scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
>> >>
>> >> at
>> >>
>> >> scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)
>> >>
>> >> at scala.collection.mutable.WrappedArray.forall(WrappedArray.scala:35)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$.rowCountsExist(EstimationUtils.scala:32)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ProjectEstimation$.estimate(ProjectEstimation.scala:27)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:63)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:37)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stat

java spark udf error

2018-03-27 Thread 崔苗
Hi,
I defined a UDF in Java to mark empty strings, like this:

 public class MarkUnknown implements UDF2<String, String, String> {
@Override
public String call(String processor,String fillContent){
if(processor.trim().equals("")){
logger.info("find empty string");
return fillContent;
}
else{
return processor;
}
}
}
and registered it via the SparkSession:
 spark.udf().register("markUnknown", markUnknown, StringType);

but when I use the UDF in SQL: "select markUnknown(useId,'unknown') FROM
table", I get an exception:

Exception in thread "main" java.lang.ClassCastException: 
org.apache.spark.sql.UDFRegistration$$anonfun$27 cannot be cast to 
scala.Function2
at 
org.apache.spark.sql.catalyst.expressions.ScalaUDF.(ScalaUDF.scala:97)
at 
org.apache.spark.sql.UDFRegistration$$anonfun$register$26.apply(UDFRegistration.scala:515)
at 
org.apache.spark.sql.UDFRegistration$$anonfun$register$26.apply(UDFRegistration.scala:515)
at 
org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:91)
at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1165)
at 
org.apache.spark.sql.hive.HiveSessionCatalog.org$apache$spark$sql$hive$HiveSessionCatalog$$super$lookupFunction(HiveSessionCatalog.scala:129)
at 
org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$3.apply(HiveSessionCatalog.scala:129)
at 
org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$3.apply(HiveSessionCatalog.scala:129)
at scala.util.Try$.apply(Try.scala:192)
at 
org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction0(HiveSessionCatalog.scala:129)
at 
org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction(HiveSessionCatalog.scala:122)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1189)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1189)

I replaced the string "unknown" with another column: "select
markUnknown(useId,companyId) FROM table", and still got the same exception.
So how should the UDF be defined in Java?


Thanks for any reply.







Re: Calculate co-occurring terms

2018-03-27 Thread Donni Khan
Hi again,

I found an example in Scala, but I don't have any experience with Scala.
Can anyone convert it to Java, please?

Thank you,
Donni
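
For the archive, a short Scala sketch of the pair-counting approach such
examples typically use, based on the DataFrame described in the quoted message
below. The whitespace tokenizer is an assumption; the same flatMap/reduceByKey
structure translates mechanically to the Java RDD API.

```
import org.apache.spark.sql.DataFrame

// Count how often each unordered pair of significant terms co-occurs in a document.
// Assumes preProcessedDF has the ("ID", "text") schema from the quoted message and
// that documents are tokenized by splitting on whitespace.
def cooccurrences(preProcessedDF: DataFrame, terms: Set[String]) = {
  preProcessedDF.rdd
    .flatMap { row =>
      val present = row.getAs[String]("text")
        .toLowerCase.split("\\s+").toSet
        .intersect(terms)
      for (a <- present; b <- present if a < b) yield ((a, b), 1L)
    }
    .reduceByKey(_ + _)   // ((term_i, term_j), co-occurrence count)
}
```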

On Fri, Mar 23, 2018 at 8:57 AM, Donni Khan 
wrote:

> Hi,
>
> I have a collection of text documents, and I extracted the list of significant
> terms from that collection.
> I want to calculate a co-occurrence matrix for the extracted terms using
> Spark.
>
> I actually stored the collection of text documents in a DataFrame:
>
> StructType schema = new StructType(new StructField[] {
> new StructField("ID", DataTypes.StringType, false, Metadata.empty()),
> new StructField("text", DataTypes.StringType, false, Metadata.empty()) });
>
> // Create a DataFrame wrt the new schema
> DataFrame preProcessedDF = sqlContext.createDataFrame(jrdd, schema);
>
> I can extract the list of terms from "preProcessedDF" into a List, RDD,
> or DataFrame.
> For each (term_i, term_j) I want to calculate the related frequency from
> the original dataset "preProcessedDF".
>
> Does anyone have a scalable solution?
>
> thank you,
> Donni
>
>
>
>
>
>
>
>
>


Queries with streaming sources must be executed with writeStream.start();;

2018-03-27 Thread Junfeng Chen
I am reading some data from Kafka and want to save it to parquet on
HDFS with Structured Streaming.
The data from Kafka is in JSON format. I try to convert it to a
Dataset<Row> with spark.read.json(). However, I get the exception:
>
> Queries with streaming sources must be executed with
> writeStream.start()

Here is my code:
>
> Dataset<Row> df = spark.readStream().format("kafka")...
> Dataset<String> jsonDataset = df.selectExpr("CAST(value AS STRING)").map...
> Dataset<Row> rowDataset = spark.read().json(jsonDataset);
>
> rowDataset.writeStream().outputMode(OutputMode.Append()).partitionBy("appname").format("parquet").option("path",savePath).start().awaitTermination();



How can I solve this?

Thanks!

Regard,
Junfeng Chen
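
A hedged Scala sketch of the usual way around this: spark.read.json is a batch
API and cannot be mixed into a streaming query, so parse the Kafka value with
from_json against an explicit schema instead. The schema, field names, topic,
broker, and paths below are placeholders; the same functions exist in the Java
API used in the question.

```
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Placeholder schema: replace with the real structure of the Kafka JSON payload.
val schema = new StructType()
  .add("appname", StringType)
  .add("payload", StringType)

val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")    // placeholder
  .option("subscribe", "mytopic")                      // placeholder
  .load()

// Parse the JSON string column while keeping the query streaming.
val rows = df.selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")

rows.writeStream
  .outputMode("append")
  .partitionBy("appname")
  .format("parquet")
  .option("path", "/tmp/parquet-out")                  // placeholder
  .option("checkpointLocation", "/tmp/parquet-chk")    // required for file sinks
  .start()
  .awaitTermination()
```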