Transforming json string in structured streaming problem

2018-04-13 Thread Junfeng Chen
some potential solution in https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming, but it requires constructing the StructType, while the structure of my json data is variable. So how can I solve it? Thanks! Regard, Junfeng Chen
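
For reference, a minimal sketch (Java, Spark 2.x) of the StructType construction that the linked StackOverflow approach relies on; the field names and types below are hypothetical, and with a variable JSON structure the schema would have to cover the superset of expected fields, all nullable:

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // Placeholder fields; the real names and types depend on the incoming JSON.
    StructType schema = DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("appname", DataTypes.StringType, true),
            DataTypes.createStructField("payload", DataTypes.StringType, true)
    });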

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Junfeng Chen
method, but the formats of the json data are not all the same. Also, the Spark Java API does not seem to support grammar like .select(from_json($"value", colourSchema)) Regard, Junfeng Chen On Fri, Apr 13, 2018 at 7:09 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > Ha
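
The Scala $-syntax does have a Java-API equivalent; a minimal sketch, assuming a Dataset<Row> `df` with a binary or string `value` column and a pre-built StructType `schema`:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.from_json;

    // from_json(Column, StructType) is available in the Java API;
    // selecting "data.*" flattens the parsed struct back into top-level columns.
    Dataset<Row> parsed = df
            .select(from_json(col("value").cast("string"), schema).alias("data"))
            .select("data.*");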

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Junfeng Chen
ataset. How to fix it? Thanks! Regard, Junfeng Chen On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > It's not very surprising that doing this sort of RDD to DF conversion > inside DStream.foreachRDD has weird corner cases like this. In fact, you &

Nullpointerexception error when in repartition

2018-04-11 Thread Junfeng Chen
ram always throws a nullpointerexception error in the json convert line: java.lang.NullPointerException > at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783) > ... This seems to make no sense, since the parquet writing operation should be in a different stage from the json transforming operation. So how can I solve it? Thanks! Regard, Junfeng Chen
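
For context, the Spark Streaming programming guide recommends obtaining a singleton SparkSession from the RDD's SparkConf inside foreachRDD when doing this kind of JSON-to-DataFrame conversion. A hedged sketch of that pattern, assuming `stream` is a JavaDStream<String> and `savePath` is defined elsewhere:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.streaming.Time;

    stream.foreachRDD((JavaRDD<String> rdd, Time time) -> {
        // Reuse a single SparkSession per JVM instead of capturing driver-side objects in the closure.
        SparkSession spark = SparkSession.builder()
                .config(rdd.context().getConf())
                .getOrCreate();

        // Convert the batch of JSON strings to a DataFrame, then append to parquet.
        Dataset<String> jsonStrings = spark.createDataset(rdd.rdd(), Encoders.STRING());
        Dataset<Row> df = spark.read().json(jsonStrings);

        df.write().mode(SaveMode.Append).parquet(savePath);
    });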

Re: spark application running in yarn client mode is slower than in local mode.

2018-04-10 Thread Junfeng Chen
But I still have one question. I find the task number in the stage is 3. Where does this 3 come from? How can I increase the parallelism? Regard, Junfeng Chen On Tue, Apr 10, 2018 at 11:31 AM, Junfeng Chen <darou...@gmail.com> wrote: > Yeah, I have increase the executor number and execu
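
With a direct Kafka stream, the task count of the input stage usually matches the number of topic partitions (three here). A small sketch of widening the parallelism by repartitioning before the expensive stage; `df`, `savePath`, and the target count of 12 are placeholders (roughly num-executors times executor-cores):

    // Spread this micro-batch over more tasks than the 3 Kafka partitions give by default.
    int targetPartitions = 12;  // placeholder; tune to the total executor cores available
    Dataset<Row> widened = df.repartition(targetPartitions);
    widened.write().mode(SaveMode.Append).parquet(savePath);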

Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
Yeah, I have increased the executor number and executor cores, and it runs normally now. The HDP Spark 2 has only 2 executors and 1 executor core by default. Regard, Junfeng Chen On Tue, Apr 10, 2018 at 10:19 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote: > In yarn mode, only two
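
For reference, an illustrative spark-submit for yarn client mode showing where the executor count and cores are raised; the class, jar, and resource numbers below are placeholders, not recommendations:

    # placeholder class/jar; resource values are illustrative only
    spark-submit \
      --master yarn \
      --deploy-mode client \
      --num-executors 6 \
      --executor-cores 4 \
      --executor-memory 8g \
      --class com.example.StreamingApp \
      streaming-app.jar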

Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
to process the tasks; since one executor can process only one task at a time, they need 6 min in total. Regard, Junfeng Chen On Mon, Apr 9, 2018 at 2:12 PM, Jörn Franke <jornfra...@gmail.com> wrote: > Probably network / shuffling cost? Or broadcast variables? Can you provide > more details

Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
Hi Jörn, I checked the log info of my application: ResultStage3 (parquet writing) costs a very long time, nearly 300s, while the total processing time of this loop is 6 mins. Regard, Junfeng Chen On Mon, Apr 9, 2018 at 2:12 PM, Jörn Franke <jornfra...@gmail.com> wrote: > Probabl

Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
Hi, My kafka topic has three partitions. The time cost I mentioned means that each streaming loop costs more time in yarn client mode. For example, yarn mode costs 300 seconds to process some data, while local mode costs just 200 seconds to process a similar amount of data. Regard, Junfeng Chen

Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
aRDD, type); ... At last, write the dataset to a parquet file: newdf.write().mode(SaveMode.Append).partitionBy("stream","appname","year","month","day","hour").parquet(savePath); How can I determine whether it is caused by shuffle or broadcast

spark application running in yarn client mode is slower than in local mode.

2018-04-08 Thread Junfeng Chen
hosts have the same hardware, 40 cores and 256GB memory. Why? How can I solve it? Regard, Junfeng Chen

Re: How to delete empty columns in df when writing to parquet?

2018-04-07 Thread Junfeng Chen
Hi, Thanks for explaining! Regard, Junfeng Chen On Wed, Apr 4, 2018 at 7:43 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote: > Hi, > > I do not think that in a columnar database it makes much of a difference. > The amount of data that you will be parsing will n

Re: How to delete empty columns in df when writing to parquet?

2018-04-04 Thread Junfeng Chen
Our users ask for it Regard, Junfeng Chen On Wed, Apr 4, 2018 at 5:45 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote: > Hi Junfeng, > > can I ask why it is important to remove the empty column? > > Regards, > Gourav Sengupta > > On Tue, Apr 3, 2018 a

Re: How to delete empty columns in df when writing to parquet?

2018-04-03 Thread Junfeng Chen
You mean I should start two spark streaming applications and read the topics separately? Regard, Junfeng Chen On Tue, Apr 3, 2018 at 10:31 PM, naresh Goud <nareshgoud.du...@gmail.com> wrote: > I don’t see any option other than starting two individual queries. It’s > just a thought. &

How to delete empty columns in df when writing to parquet?

2018-04-02 Thread Junfeng Chen
to write the dataframe to a parquet file partitioned by app name, the parquet file of app one also contains column D, where column D is empty and actually contains no data. So how can I filter out the empty columns when writing the dataframe to parquet? Thanks! Regard, Junfeng Chen
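
One possible approach (a sketch in Java, not an answer given in this thread): count the non-null values per column in a single pass and drop the columns whose count is zero before writing. `df` and `savePath` are assumed to exist:

    import java.util.Arrays;
    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.count;

    // count() ignores nulls, so an all-null column yields 0.
    Column[] countCols = Arrays.stream(df.columns())
            .map(c -> count(col(c)).alias(c))
            .toArray(Column[]::new);
    Row counts = df.select(countCols).head();

    Dataset<Row> pruned = df;
    for (String c : df.columns()) {
        if (counts.getLong(counts.fieldIndex(c)) == 0L) {
            pruned = pruned.drop(c);
        }
    }
    pruned.write().mode(SaveMode.Append).partitionBy("appname").parquet(savePath);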

[Spark Java] Add new column in DataSet based on existed column

2018-03-28 Thread Junfeng Chen
df.withColumn function, but it seems to only support simple Column expressions rather than a customized function like MapFunction. How can I solve it? Thanks! Regard, Junfeng Chen
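
The usual workaround is to wrap the custom logic in a UDF instead of a MapFunction, since withColumn only accepts Column expressions. A minimal sketch; the UDF name, column names, and logic are hypothetical, and `spark`/`df` are assumed to exist:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;
    import static org.apache.spark.sql.functions.callUDF;
    import static org.apache.spark.sql.functions.col;

    // Register a one-argument UDF; any serializable Java logic can go inside.
    spark.udf().register("normalizeName",
            (UDF1<String, String>) s -> s == null ? null : s.trim().toLowerCase(),
            DataTypes.StringType);

    // Invoke the UDF through callUDF so it becomes a Column expression for withColumn.
    Dataset<Row> result = df.withColumn("name_normalized", callUDF("normalizeName", col("name")));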

Queries with streaming sources must be executed with writeStream.start();;

2018-03-27 Thread Junfeng Chen
tMode(OutputMode.Append()).partitionBy("appname").format("parquet").option("path",savePath).start().awaitTermination(); How to solve it? Thanks! Regard, Junfeng Chen
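
That error usually means a batch-style action (show(), count(), a plain write(), etc.) was applied somewhere to a Dataset that originates from readStream; every output from a streaming Dataset has to go through writeStream ... start(). A sketch of the streaming file-sink pattern for comparison, with `parsedDf`, `savePath`, and `checkpointPath` as placeholders (the file sink requires a checkpoint location):

    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.streaming.StreamingQuery;

    StreamingQuery query = parsedDf.writeStream()
            .outputMode(OutputMode.Append())
            .partitionBy("appname")
            .format("parquet")
            .option("path", savePath)
            .option("checkpointLocation", checkpointPath)  // required by the file sink
            .start();
    query.awaitTermination();  // throws StreamingQueryException; handle or declare it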

DataSet save to parquet partition problem

2018-03-08 Thread Junfeng Chen
? If yes, how? Thanks! Regard, Junfeng Chen

Re: Reading kafka and save to parquet problem

2018-03-07 Thread Junfeng Chen
I have tried to use readStream and writeStream, but it throws a "Uri without authority: hdfs:/data/_spark_metadata" exception, which is not seen in normal read mode. The parquet path I specified is hdfs:///data Regard, Junfeng Chen On Thu, Mar 8, 2018 at 9:38 AM, naresh Goud <
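
One thing worth trying (an assumption, not a confirmed fix from this thread): give the sink and checkpoint a fully qualified URI that includes the namenode authority, so the derived _spark_metadata path cannot end up in the authority-less form "hdfs:/data". The authority below is a placeholder; alternatively, keep a plain "/data" path and make sure fs.defaultFS is set on the client:

    StreamingQuery query = parsedDf.writeStream()
            .format("parquet")
            .option("path", "hdfs://namenode-host:8020/data")                   // placeholder authority
            .option("checkpointLocation", "hdfs://namenode-host:8020/checkpoints/myapp")
            .start();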

Reading kafka and save to parquet problem

2018-03-07 Thread Junfeng Chen
ogram. What should I do if I want to read kafka and save the data to parquet in batch? Regard, Junfeng Chen
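
For a batch (non-streaming) job, the Kafka source can also be used with spark.read plus explicit offset bounds. A minimal sketch, with the broker list and topic as placeholders and `spark` assumed to be an existing SparkSession:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    Dataset<Row> kafkaBatch = spark.read()
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder brokers
            .option("subscribe", "my_topic")                    // placeholder topic
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .load();

    // Keep only the message payload as a string, then append to parquet.
    kafkaBatch.selectExpr("CAST(value AS STRING) AS value")
            .write()
            .mode(SaveMode.Append)
            .parquet("hdfs:///data");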

Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Junfeng Chen
Regard, Junfeng Chen On Wed, Mar 7, 2018 at 3:34 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > Which version of Spark are you using? And can you give us the full stack > trace of the exception? > > On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen <darou...@gmai

CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Junfeng Chen
CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894. How can I solve it? Thanks! Regard, Junfeng Chen

spark streaming with flume: cannot assign requested address error

2017-12-13 Thread Junfeng Chen
y checking netstat -anp | grep 6689. Also, the IP address 10.25.** is not a routable address. So can someone help me solve it? Regard, Junfeng Chen

Re: Add snappy support for spark in Windows

2017-12-04 Thread Junfeng Chen
I have put winutils and hadoop.dll within HADOOP_HOME, and spark works well with it, but the snappy decompress function throws the above exception. Regard, Junfeng Chen On Mon, Dec 4, 2017 at 7:07 PM, Qiao, Richard <richard.q...@capitalone.com> wrote: > Junfeng, it's worth a try to start y

Re: Add snappy support for spark in Windows

2017-12-04 Thread Junfeng Chen
But I am working on my local development machine, so it should have nothing to do with workers/executors. I found some documents about enabling snappy on hadoop. If I want to use snappy with spark, do I need to configure spark the same way as hadoop, or is there an easier way to access it? Regard, Junfeng Chen On Mon
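
A hedged sketch of the usual Windows setup (an assumption, not a confirmed fix from this thread): besides putting the jars on the classpath, the Hadoop native DLLs have to be visible to the JVM through PATH or java.library.path. The paths below are placeholders:

    rem D:\hadoop is a placeholder for a Hadoop build that actually ships hadoop.dll and the native codec DLLs
    set HADOOP_HOME=D:\hadoop
    set PATH=%HADOOP_HOME%\bin;%PATH%

    rem make the native library directory visible to the driver JVM
    spark-submit --driver-java-options "-Djava.library.path=%HADOOP_HOME%\bin" ^
      --class com.example.MyApp my-app.jar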

Add snappy support for spark in Windows

2017-12-03 Thread Junfeng Chen
ile snappy-0.2.jar and snappy-java-1.1.2.6.jar in D:\Downloads\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\jars\ However, nothing works and the error message does not even change. How can I fix it? Ref on stackoverflow: https://stackoverflow.com/questions/47626012/config-snappy-support-for-spark-in-windows Regard, Junfeng Chen