Transforming json string in structured streaming problem

2018-04-13 Thread Junfeng Chen
Hi all,

I need to read some string data in JSON format from Kafka, convert it to a
DataFrame, and finally write it to Parquet files.
But now I have hit a problem: spark.readStream().json() can only read JSON
files from a specified location; it cannot accept a Dataset<String> the way
spark.read().json() can.
I found a potential solution at
https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming
but it requires constructing a StructType, while the structure of my JSON
data is variable.

So how to solve it?

Thanks!


Regard,
Junfeng Chen


Re: Nullpointerexception error when in repartition

2018-04-12 Thread Junfeng Chen
Hi,
I know that, but my purpose is to transform the JSON strings in a Dataset<String>
into a Dataset<Row>, while spark.readStream can only read JSON files from a
specified path.
https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming
gives the essential method, but the format of each JSON record is not the same.
Also, the Spark Java API does not seem to support syntax like

.select(from_json($"value", colourSchema))
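
For what it's worth, org.apache.spark.sql.functions does expose a
from_json(Column, StructType) helper that is callable from Java; below is a
minimal sketch with a purely illustrative schema (df stands for the Kafka
DataFrame with its value column, and the real obstacle remains that my schema
varies):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// illustrative schema only
StructType colourSchema = new StructType()
        .add("colour", DataTypes.StringType)
        .add("count", DataTypes.LongType);

// parse the JSON string column and expand the resulting struct
Dataset<Row> parsed = df
        .select(from_json(col("value"), colourSchema).as("data"))
        .selectExpr("data.*");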



Regard,
Junfeng Chen

On Fri, Apr 13, 2018 at 7:09 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Have you read through the documentation of Structured Streaming?
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
>
> One of the basic mistakes you are making is defining the dataset with
> `spark.read()`. You define a streaming Dataset with `spark.readStream()`.
>
> On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen <darou...@gmail.com> wrote:
>
>> Hi, Tathagata
>>
>> I have tried Structured Streaming, but the line
>>
>>> Dataset<Row> rowDataset = spark.read().json(jsonDataset);
>>
>>
>> always throws
>>
>>> Queries with streaming sources must be executed with writeStream.start()
>>
>>
>> But what I need to do in this step is only to transform the JSON string data
>> into a Dataset<Row>. How can I fix it?
>>
>> Thanks!
>>
>>
>> Regard,
>> Junfeng Chen
>>
>> On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> It's not very surprising that doing this sort of RDD to DF conversion
>>> inside DStream.foreachRDD has weird corner cases like this. In fact, you
>>> are going to have additional problems with partial parquet files (when
>>> there are failures) in this approach. I strongly suggest that you use
>>> Structured Streaming, which is designed to do this sort of processing. It
>>> will take care of tracking the written parquet files correctly.
>>>
>>> TD
>>>
>>> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darou...@gmail.com>
>>> wrote:
>>>
>>>> I wrote a program to read some JSON data from Kafka, intending to save
>>>> it to Parquet files on HDFS.
>>>> Here is my code:
>>>>
>>>>> JavaInputDStream stream = ...
>>>>> JavaDStream<String> rdd = stream.map...
>>>>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>>>>>     Dataset<Row> df = spark.read().json(stringjavardd);            // convert json to df
>>>>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...                  // add some new fields
>>>>>     StructType type = df.schema()...;                              // construct new type for the added fields
>>>>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type);  // create new dataframe
>>>>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>>>>         .partitionBy("appname").parquet(savepath);                 // save to parquet
>>>>> });
>>>>
>>>>
>>>>
>>>> However, if I remove the repartition call on newdf in the Parquet writing
>>>> stage, the program always throws a NullPointerException at the JSON
>>>> conversion line:
>>>>
>>>> java.lang.NullPointerException
>>>>>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
>>>>> ...
>>>>
>>>>
>>>> This makes no sense to me, since the Parquet writing operation should be
>>>> in a different stage from the JSON transformation.
>>>> So how to solve it? Thanks!
>>>>
>>>> Regard,
>>>> Junfeng Chen
>>>>
>>>
>>>
>>
>


Re: Nullpointerexception error when in repartition

2018-04-12 Thread Junfeng Chen
Hi, Tathagata

I have tried Structured Streaming, but the line

> Dataset<Row> rowDataset = spark.read().json(jsonDataset);


always throws

> Queries with streaming sources must be executed with writeStream.start()


But what I need to do in this step is only to transform the JSON string data
into a Dataset<Row>. How can I fix it?

Thanks!


Regard,
Junfeng Chen

On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> It's not very surprising that doing this sort of RDD to DF conversion
> inside DStream.foreachRDD has weird corner cases like this. In fact, you
> are going to have additional problems with partial parquet files (when
> there are failures) in this approach. I strongly suggest that you use
> Structured Streaming, which is designed to do this sort of processing. It
> will take care of tracking the written parquet files correctly.
>
> TD
>
> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darou...@gmail.com> wrote:
>
>> I wrote a program to read some JSON data from Kafka, intending to save
>> it to Parquet files on HDFS.
>> Here is my code:
>>
>>> JavaInputDStream stream = ...
>>> JavaDStream<String> rdd = stream.map...
>>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>>>     Dataset<Row> df = spark.read().json(stringjavardd);            // convert json to df
>>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...                  // add some new fields
>>>     StructType type = df.schema()...;                              // construct new type for the added fields
>>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type);  // create new dataframe
>>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>>         .partitionBy("appname").parquet(savepath);                 // save to parquet
>>> });
>>
>>
>>
>> However, if I remove the repartition call on newdf in the Parquet writing
>> stage, the program always throws a NullPointerException at the JSON
>> conversion line:
>>
>> java.lang.NullPointerException
>>>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
>>> ...
>>
>>
>> This makes no sense to me, since the Parquet writing operation should be
>> in a different stage from the JSON transformation.
>> So how to solve it? Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>
>


Nullpointerexception error when in repartition

2018-04-11 Thread Junfeng Chen
I wrote a program to read some JSON data from Kafka, intending to save it to
Parquet files on HDFS.
Here is my code:

> JavaInputDStream stream = ...
> JavaDStream<String> rdd = stream.map...
> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>     Dataset<Row> df = spark.read().json(stringjavardd);            // convert json to df
>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...                  // add some new fields
>     StructType type = df.schema()...;                              // construct new type for the added fields
>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type);  // create new dataframe
>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>         .partitionBy("appname").parquet(savepath);                 // save to parquet
> });



However, if I remove the repartition call on newdf in the Parquet writing
stage, the program always throws a NullPointerException at the JSON conversion
line:

java.lang.NullPointerException
>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
> ...


This makes no sense to me, since the Parquet writing operation should be in a
different stage from the JSON transformation.
So how to solve it? Thanks!

Regard,
Junfeng Chen


Re: spark application running in yarn client mode is slower than in local mode.

2018-04-10 Thread Junfeng Chen
But I still have one question: I see that the number of tasks in the stage is
3. Where does this 3 come from? How can I increase the parallelism?
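
As an aside, the executor knobs Saisai mentions below can also be set on the
SparkConf instead of the spark-submit command line; a rough sketch (the numbers
are purely illustrative, not recommendations):

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
        .setAppName("my-streaming-app")          // illustrative name
        .set("spark.executor.instances", "3")    // same knob as --num-executors
        .set("spark.executor.cores", "4")        // same knob as --executor-cores
        .set("spark.default.parallelism", "12"); // default partition count for RDD shuffles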


Regard,
Junfeng Chen

On Tue, Apr 10, 2018 at 11:31 AM, Junfeng Chen <darou...@gmail.com> wrote:

> Yeah, I have increased the executor number and executor cores, and it runs
> normally now. The HDP Spark 2 defaults to only 2 executors and 1 core per
> executor.
>
>
> Regard,
> Junfeng Chen
>
> On Tue, Apr 10, 2018 at 10:19 AM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> In yarn mode, only two executors are assigned to process the tasks; since
>>> each executor can process only one task at a time, they need 6 min in total.
>>>
>>
>> This is not true. You should set --executor-cores/--num-executors to
>> increase the task parallelism per executor. To be fair, the Spark application
>> should have the same resources (CPU/memory) when comparing local and
>> yarn mode.
>>
>> 2018-04-10 10:05 GMT+08:00 Junfeng Chen <darou...@gmail.com>:
>>
>>> I found the potential reason.
>>>
>>> In local mode, all tasks in one stage run concurrently, while tasks in
>>> yarn mode run in sequence.
>>>
>>> For example, say each task in a stage costs 3 min. In local mode they all
>>> run together, so the stage costs 3 min in total. In yarn mode only two
>>> executors are assigned to process the tasks, and since each executor can
>>> process only one task at a time, they need 6 min in total.
>>>
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>> On Mon, Apr 9, 2018 at 2:12 PM, Jörn Franke <jornfra...@gmail.com>
>>> wrote:
>>>
>>>> Probably network / shuffling cost? Or broadcast variables? Can you
>>>> provide more details on what you do and some timings?
>>>>
>>>> > On 9. Apr 2018, at 07:07, Junfeng Chen <darou...@gmail.com> wrote:
>>>> >
>>>> > I have written a Spark Streaming application that reads Kafka data,
>>>> > converts the JSON data to Parquet, and saves it to HDFS.
>>>> > What puzzles me is that the processing time of the app in yarn mode is
>>>> > 20% to 50% longer than in local mode. My cluster has three nodes with
>>>> > three node managers, and all three hosts have the same hardware: 40
>>>> > cores and 256GB memory.
>>>> >
>>>> > Why? How to solve it?
>>>> >
>>>> > Regard,
>>>> > Junfeng Chen
>>>>
>>>
>>>
>>
>


Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
Yeah, I have increased the executor number and executor cores, and it runs
normally now. The HDP Spark 2 defaults to only 2 executors and 1 core per
executor.


Regard,
Junfeng Chen

On Tue, Apr 10, 2018 at 10:19 AM, Saisai Shao <sai.sai.s...@gmail.com>
wrote:

> In yarn mode, only two executors are assigned to process the tasks; since
>> each executor can process only one task at a time, they need 6 min in total.
>>
>
> This is not true. You should set --executor-cores/--num-executors to
> increase the task parallelism per executor. To be fair, the Spark application
> should have the same resources (CPU/memory) when comparing local and
> yarn mode.
>
> 2018-04-10 10:05 GMT+08:00 Junfeng Chen <darou...@gmail.com>:
>
>> I found the potential reason.
>>
>> In local mode, all tasks in one stage run concurrently, while tasks in
>> yarn mode run in sequence.
>>
>> For example, say each task in a stage costs 3 min. In local mode they all
>> run together, so the stage costs 3 min in total. In yarn mode only two
>> executors are assigned to process the tasks, and since each executor can
>> process only one task at a time, they need 6 min in total.
>>
>>
>> Regard,
>> Junfeng Chen
>>
>> On Mon, Apr 9, 2018 at 2:12 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>>
>>> Probably network / shuffling cost? Or broadcast variables? Can you
>>> provide more details on what you do and some timings?
>>>
>>> > On 9. Apr 2018, at 07:07, Junfeng Chen <darou...@gmail.com> wrote:
>>> >
>>> > I have written a Spark Streaming application that reads Kafka data,
>>> > converts the JSON data to Parquet, and saves it to HDFS.
>>> > What puzzles me is that the processing time of the app in yarn mode is
>>> > 20% to 50% longer than in local mode. My cluster has three nodes with
>>> > three node managers, and all three hosts have the same hardware: 40
>>> > cores and 256GB memory.
>>> >
>>> > Why? How to solve it?
>>> >
>>> > Regard,
>>> > Junfeng Chen
>>>
>>
>>
>


Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
I found the potential reason.

In local mode, all tasks in one stage run concurrently, while tasks in
yarn mode run in sequence.

For example, say each task in a stage costs 3 min. In local mode they all run
together, so the stage costs 3 min in total. In yarn mode only two executors
are assigned to process the tasks, and since each executor can process only
one task at a time, they need 6 min in total.


Regard,
Junfeng Chen

On Mon, Apr 9, 2018 at 2:12 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Probably network / shuffling cost? Or broadcast variables? Can you provide
> more details on what you do and some timings?
>
> > On 9. Apr 2018, at 07:07, Junfeng Chen <darou...@gmail.com> wrote:
> >
> > I have written a Spark Streaming application that reads Kafka data,
> > converts the JSON data to Parquet, and saves it to HDFS.
> > What puzzles me is that the processing time of the app in yarn mode is
> > 20% to 50% longer than in local mode. My cluster has three nodes with
> > three node managers, and all three hosts have the same hardware: 40
> > cores and 256GB memory.
> >
> > Why? How to solve it?
> >
> > Regard,
> > Junfeng Chen
>


Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
Hi Jorn,

I checked the log info of my application:
The ResultStage 3 (Parquet writing) takes a very long time, nearly 300s, while
the total processing time of this loop is 6 min.


Regard,
Junfeng Chen

On Mon, Apr 9, 2018 at 2:12 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Probably network / shuffling cost? Or broadcast variables? Can you provide
> more details on what you do and some timings?
>
> > On 9. Apr 2018, at 07:07, Junfeng Chen <darou...@gmail.com> wrote:
> >
> > I have written a Spark Streaming application that reads Kafka data,
> > converts the JSON data to Parquet, and saves it to HDFS.
> > What puzzles me is that the processing time of the app in yarn mode is
> > 20% to 50% longer than in local mode. My cluster has three nodes with
> > three node managers, and all three hosts have the same hardware: 40
> > cores and 256GB memory.
> >
> > Why? How to solve it?
> >
> > Regard,
> > Junfeng Chen
>


Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
hi,

My Kafka topic has three partitions. By the time cost I mentioned, I mean that
each streaming loop costs more time in yarn client mode. For example, yarn
mode costs 300 seconds to process some data, while local mode costs just 200
seconds to process a similar amount of data.


Regard,
Junfeng Chen

On Mon, Apr 9, 2018 at 2:20 PM, Gopala Krishna Manchukonda <
gopala_krishna_manchuko...@apple.com> wrote:

> Hi Junfeng ,
>
> Is your kafka topic partitioned?
>
> Are you referring to the duration or the CPU time spent by the job as
> being 20% - 50% higher than running in local?
>
> Thanks & Regards
> Gopal
>
>
> > On 09-Apr-2018, at 11:42 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> >
> > Probably network / shuffling cost? Or broadcast variables? Can you
> > provide more details on what you do and some timings?
> >
> >> On 9. Apr 2018, at 07:07, Junfeng Chen <darou...@gmail.com> wrote:
> >>
> >> I have written a Spark Streaming application that reads Kafka data,
> >> converts the JSON data to Parquet, and saves it to HDFS.
> >> What puzzles me is that the processing time of the app in yarn mode is
> >> 20% to 50% longer than in local mode. My cluster has three nodes with
> >> three node managers, and all three hosts have the same hardware: 40
> >> cores and 256GB memory.
> >>
> >> Why? How to solve it?
> >>
> >> Regard,
> >> Junfeng Chen
> >
> >
>
>


Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Junfeng Chen
I read JSON string values from Kafka, then transform them to a DataFrame:

Dataset<Row> df = spark.read().json(stringjavaRDD);


Then I add some new data to each row:

> JavaRDD<Row> rowJavaRDD = df.javaRDD().map(...);
> StructType type = df.schema().add(...);
> Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type);


...

At last I write the dataset to a Parquet file:

newdf.write().mode(SaveMode.Append).partitionBy("stream","appname","year","month","day","hour").parquet(savePath);


How can I determine whether it is caused by shuffle or broadcast?


Regard,
Junfeng Chen

On Mon, Apr 9, 2018 at 2:12 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Probably network / shuffling cost? Or broadcast variables? Can you provide
> more details on what you do and some timings?
>
> > On 9. Apr 2018, at 07:07, Junfeng Chen <darou...@gmail.com> wrote:
> >
> > I have written a Spark Streaming application that reads Kafka data,
> > converts the JSON data to Parquet, and saves it to HDFS.
> > What puzzles me is that the processing time of the app in yarn mode is
> > 20% to 50% longer than in local mode. My cluster has three nodes with
> > three node managers, and all three hosts have the same hardware: 40
> > cores and 256GB memory.
> >
> > Why? How to solve it?
> >
> > Regard,
> > Junfeng Chen
>


spark application running in yarn client mode is slower than in local mode.

2018-04-08 Thread Junfeng Chen
I have written a Spark Streaming application that reads Kafka data, converts
the JSON data to Parquet, and saves it to HDFS.
What puzzles me is that the processing time of the app in yarn mode is 20% to
50% longer than in local mode. My cluster has three nodes with three node
managers, and all three hosts have the same hardware: 40 cores and 256GB
memory.

Why? How to solve it?

Regard,
Junfeng Chen


Re: How to delete empty columns in df when writing to parquet?

2018-04-07 Thread Junfeng Chen
Hi,
Thanks for explaining!


Regard,
Junfeng Chen

On Wed, Apr 4, 2018 at 7:43 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> I do not think that in a columnar database it makes much of a difference.
> The amount of data that you will be parsing will not be much anyways.
>
> Regards,
> Gourav Sengupta
>
> On Wed, Apr 4, 2018 at 11:02 AM, Junfeng Chen <darou...@gmail.com> wrote:
>
>> Our users ask for it
>>
>>
>> Regard,
>> Junfeng Chen
>>
>> On Wed, Apr 4, 2018 at 5:45 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Junfeng,
>>>
>>> can I ask why it is important to remove the empty column?
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Tue, Apr 3, 2018 at 4:28 AM, Junfeng Chen <darou...@gmail.com> wrote:
>>>
>>>> I am trying to read data from Kafka and write it in Parquet format
>>>> via Spark Streaming.
>>>> The problem is that the data from Kafka have a variable structure. For
>>>> example, app one has columns A,B,C and app two has columns B,C,D, so the
>>>> DataFrame I read from Kafka has all columns A,B,C,D. When I write the
>>>> DataFrame to Parquet files partitioned by app name,
>>>> the Parquet files of app one also contain column D, which is empty and
>>>> actually contains no data. So how can I filter out the empty columns when
>>>> writing the DataFrame to Parquet?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> Regard,
>>>> Junfeng Chen
>>>>
>>>
>>>
>>
>


Re: How to delete empty columns in df when writing to parquet?

2018-04-04 Thread Junfeng Chen
Our users ask for it


Regard,
Junfeng Chen

On Wed, Apr 4, 2018 at 5:45 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Junfeng,
>
> can I ask why it is important to remove the empty column?
>
> Regards,
> Gourav Sengupta
>
> On Tue, Apr 3, 2018 at 4:28 AM, Junfeng Chen <darou...@gmail.com> wrote:
>
>> I am trying to read data from Kafka and write it in Parquet format
>> via Spark Streaming.
>> The problem is that the data from Kafka have a variable structure. For
>> example, app one has columns A,B,C and app two has columns B,C,D, so the
>> DataFrame I read from Kafka has all columns A,B,C,D. When I write the
>> DataFrame to Parquet files partitioned by app name,
>> the Parquet files of app one also contain column D, which is empty and
>> actually contains no data. So how can I filter out the empty columns when
>> writing the DataFrame to Parquet?
>>
>> Thanks!
>>
>>
>> Regard,
>> Junfeng Chen
>>
>
>


Re: How to delete empty columns in df when writing to parquet?

2018-04-03 Thread Junfeng Chen
You mean I should start two Spark Streaming applications and read the topics
separately?
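
Presumably that would look something like one application driving two queries,
as in the rough sketch below (broker list, topic names, paths, and checkpoint
locations are all placeholders):

Dataset<Row> appOne = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", "app_one_topic").load()
        .selectExpr("CAST(value AS STRING) AS value");
Dataset<Row> appTwo = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", "app_two_topic").load()
        .selectExpr("CAST(value AS STRING) AS value");

appOne.writeStream().format("parquet")
        .option("path", pathOne).option("checkpointLocation", checkpointOne).start();
appTwo.writeStream().format("parquet")
        .option("path", pathTwo).option("checkpointLocation", checkpointTwo).start();

// keep the driver alive while both queries run
spark.streams().awaitAnyTermination();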


Regard,
Junfeng Chen

On Tue, Apr 3, 2018 at 10:31 PM, naresh Goud <nareshgoud.du...@gmail.com>
wrote:

> I don’t see any option other than starting two individual queries. It’s
> just a thought.
>
> Thank you,
> Naresh
>
> On Mon, Apr 2, 2018 at 10:29 PM Junfeng Chen <darou...@gmail.com> wrote:
>
>> I am trying to read data from Kafka and write it in Parquet format
>> via Spark Streaming.
>> The problem is that the data from Kafka have a variable structure. For
>> example, app one has columns A,B,C and app two has columns B,C,D, so the
>> DataFrame I read from Kafka has all columns A,B,C,D. When I write the
>> DataFrame to Parquet files partitioned by app name,
>> the Parquet files of app one also contain column D, which is empty and
>> actually contains no data. So how can I filter out the empty columns when
>> writing the DataFrame to Parquet?
>>
>> Thanks!
>>
>>
>> Regard,
>> Junfeng Chen
>>
> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>


How to delete empty columns in df when writing to parquet?

2018-04-02 Thread Junfeng Chen
I am trying to read data from Kafka and write it in Parquet format via
Spark Streaming.
The problem is that the data from Kafka have a variable structure. For
example, app one has columns A,B,C and app two has columns B,C,D, so the
DataFrame I read from Kafka has all columns A,B,C,D. When I write the
DataFrame to Parquet files partitioned by app name,
the Parquet files of app one also contain column D, which is empty and
actually contains no data. So how can I filter out the empty columns when
writing the DataFrame to Parquet?
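
One direction I am considering, sketched below purely as an assumption (the
app and column names are made up): split the frame by app name first, then
drop any column whose non-null count is zero before writing that app's slice.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

Dataset<Row> appOne = df.filter(col("appname").equalTo("appOne"));

// one pass to count the non-null values of each candidate column
String[] names = {"A", "B", "C", "D"};
Row counts = appOne.select(count(col("A")), count(col("B")),
        count(col("C")), count(col("D"))).first();

Dataset<Row> trimmed = appOne;
for (int i = 0; i < names.length; i++) {
    if (counts.getLong(i) == 0) {        // column is entirely null for this app
        trimmed = trimmed.drop(names[i]);
    }
}
trimmed.write().mode(SaveMode.Append).parquet(savePath + "/appname=appOne");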

Thanks!


Regard,
Junfeng Chen


[Spark Java] Add new column in DataSet based on existed column

2018-03-28 Thread Junfeng Chen
I am working on adding a date-derived field to an existing dataset.

The current dataset contains a column named timestamp in ISO format. I want
to parse this field into a Joda time type, and then extract the year, month,
day, and hour info as new columns attached to the original dataset.
I have tried the df.withColumn function, but it seems to only support simple
expressions rather than a customized function like a MapFunction.
How can I solve this?
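
For what it's worth, a sketch of the kind of thing I am after using only the
built-in SQL functions (this assumes the ISO string can be cast to a Spark
timestamp, so Joda would not be needed):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// cast the ISO-formatted string to a timestamp, then derive the date parts
Dataset<Row> withDate = df
        .withColumn("ts", col("timestamp").cast("timestamp"))
        .withColumn("year", year(col("ts")))
        .withColumn("month", month(col("ts")))
        .withColumn("day", dayofmonth(col("ts")))
        .withColumn("hour", hour(col("ts")));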

Thanks!



Regard,
Junfeng Chen


Queries with streaming sources must be executed with writeStream.start();;

2018-03-27 Thread Junfeng Chen
I am reading some data from Kafka and want to save it to Parquet on
HDFS with Structured Streaming.
The data from Kafka is in JSON format. I try to convert it to a
Dataset<Row> with spark.read().json(). However, I get the exception:
>
> Queries with streaming sources must be executed with
> writeStream.start()

Here is my code:
>
> Dataset<Row> df = spark.readStream().format("kafka")...
> Dataset<String> jsonDataset = df.selectExpr("CAST(value AS STRING)").map...
> Dataset<Row> rowDataset = spark.read().json(jsonDataset);
>
> rowDataset.writeStream().outputMode(OutputMode.Append()).partitionBy("appname")
>     .format("parquet").option("path", savePath).start().awaitTermination();



How to solve it?
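
From what I can tell, the exception comes from calling the batch
spark.read().json() on data that originates from a streaming source; the
streaming-friendly route appears to be from_json with an explicit schema,
roughly as sketched below (the schema is illustrative, and having to know it
up front is exactly the difficulty raised in the later follow-up thread;
checkpointPath is a placeholder):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()            // illustrative; must be known in advance
        .add("appname", DataTypes.StringType)
        .add("payload", DataTypes.StringType);

Dataset<Row> parsed = df.selectExpr("CAST(value AS STRING) AS value")
        .select(from_json(col("value"), schema).as("data"))
        .selectExpr("data.*");

parsed.writeStream()
        .outputMode(OutputMode.Append())
        .partitionBy("appname")
        .format("parquet")
        .option("path", savePath)
        .option("checkpointLocation", checkpointPath)  // required by the file sink
        .start()
        .awaitTermination();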

Thanks!

Regard,
Junfeng Chen


DataSet save to parquet partition problem

2018-03-08 Thread Junfeng Chen
I am trying to save a Dataset object to a Parquet file via

> df.write().partitionBy("...").parquet(path)


while this dataset contains the following struct:
time: struct
-dayOfMonth
-monthOfYear
...

Can I use a child field like time.monthOfYear in partitionBy as above? If
yes, how?
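
If not directly, one workaround I can think of, sketched under the struct
layout above, is to copy the nested fields into top-level columns first and
partition by those:

import static org.apache.spark.sql.functions.col;

df.withColumn("monthOfYear", col("time.monthOfYear"))
  .withColumn("dayOfMonth", col("time.dayOfMonth"))
  .write()
  .partitionBy("monthOfYear", "dayOfMonth")
  .parquet(path);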

Thanks!

Regard,
Junfeng Chen


Re: Reading kafka and save to parquet problem

2018-03-07 Thread Junfeng Chen
I have tried using readStream and writeStream before, but they throw a "Uri
without authority: hdfs:/data/_spark_metadata" exception, which is not seen
in normal read mode.
The Parquet path I specified is hdfs:///data
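
One guess, offered only as an assumption: since the message complains about a
missing authority in hdfs:/data, spelling out a fully qualified URI for both
the output path and the checkpoint location might behave differently (df
stands for the streaming Dataset being written; the namenode host and port
are placeholders):

df.writeStream()
        .format("parquet")
        .option("path", "hdfs://namenode:8020/data")                       // fully qualified, with authority
        .option("checkpointLocation", "hdfs://namenode:8020/checkpoints")  // placeholder location
        .start();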


Regard,
Junfeng Chen

On Thu, Mar 8, 2018 at 9:38 AM, naresh Goud <nareshgoud.du...@gmail.com>
wrote:

> change it to readStream instead of read as below
>
> val df = spark
>   .readStream
> .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
>
>
> Check if this is helpful:
>
> https://github.com/ndulam/KafkaSparkStreams/blob/master/SampleStreamApp/src/main/scala/com/naresh/org/SensorDataSave.scala
>
>
>
>
>
>
>
>
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>
>
> On Wed, Mar 7, 2018 at 7:33 PM Junfeng Chen <darou...@gmail.com> wrote:
>
>> I am struggling to read data from Kafka and save it to Parquet files on
>> HDFS using Spark Streaming, following this post:
>> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>>
>> My code is similar to the following:
>>
>> val df = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>   .option("subscribe", "topic1")
>>   .load()
>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>>   .as[(String, String)]
>>
>>   .write.parquet("hdfs://data.parquet")
>>
>>
>> The difference is that I am writing in Java.
>>
>> But in practice, this code just runs once and then exits gracefully.
>> Although it produces the Parquet file successfully and no exception is
>> thrown, it runs like a normal Spark program rather than a Spark Streaming
>> program.
>>
>> What should I do if I want to read from Kafka and save to Parquet in batches?
>>
>> Regard,
>> Junfeng Chen
>>
>


Reading kafka and save to parquet problem

2018-03-07 Thread Junfeng Chen
I am struggling to read data from Kafka and save it to Parquet files on HDFS
using Spark Streaming, following this post:
https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet

My code is similar to the following:

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

  .write.parquet("hdfs://data.parquet")


The difference is that I am writing in Java.

But in practice, this code just runs once and then exits gracefully. Although
it produces the Parquet file successfully and no exception is thrown, it runs
like a normal Spark program rather than a Spark Streaming program.

What should I do if I want to read from Kafka and save to Parquet in batches?

Regard,
Junfeng Chen


Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Junfeng Chen
Spark 2.1.1.

Actually it is a warning rather than an exception, so there is no stack
trace, just many repetitions of this line:

> CachedKafkaConsumer: CachedKafkaConsumer is not running in
> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
> interrupted because of KAFKA-1894.



Regard,
Junfeng Chen

On Wed, Mar 7, 2018 at 3:34 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Which version of Spark are you using? And can you give us the full stack
> trace of the exception?
>
> On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen <darou...@gmail.com> wrote:
>
>> I am trying to read from Kafka and save the data as Parquet files on HDFS
>> according to this:
>> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>>
>>
>> The code is similar to:
>>
>> val df = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>   .option("subscribe", "topic1")
>>   .load()
>>
>> though I am writing in Java.
>>
>> However, I keep getting the following warning:
>> CachedKafkaConsumer: CachedKafkaConsumer is not running in
>> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
>> interrupted because of KAFKA-1894.
>>
>> How to solve it? Thanks!
>>
>>
>> Regard,
>> Junfeng Chen
>>
>
>


CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Junfeng Chen
I am trying to read from Kafka and save the data as Parquet files on HDFS
according to this:
https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet


The code is similar to :

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

though I am writing in Java.

However, I keep getting the following warning:
CachedKafkaConsumer: CachedKafkaConsumer is not running in
UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
interrupted because of KAFKA-1894.

How to solve it? Thanks!


Regard,
Junfeng Chen


spark streaming with flume: cannot assign requested address error

2017-12-13 Thread Junfeng Chen
I am trying to connect Spark Streaming with Flume in pull mode.

I have three machines, and each one runs Spark and a Flume agent at the same
time; they are master, slave1, and slave2.
I have set the Flume sink on slave1, port 6689. Telnet to slave1 6689 from the
other two machines works well.

In my code I call FlumeUtils.createStream(ssc, "slave1", 6689), and submit it
on the master machine with --master local[2].

Then it throws this error:
ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting
receiver 0 - org.jboss.netty.channel.ChannelException: Failed to bind to:
slave1/10.25.*.*:6689

Caused by: java.net.BindException: Cannot assign requested address

I have read this thread
https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Streaming-Fails-on-Cluster-mode-Flume-as-source/m-p/25577/highlight/true#M621
but I am sure no process uses this port, having checked with netstat -anp | grep
6689. Also, the IP address 10.25.*.* is not a routable address.

So can someone help me solve it?
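
For reference, the pull-based receiver in the Flume integration guide is the
polling one rather than createStream (which binds a listener locally); a
sketch reusing the same host and port:

import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

// pull mode: Spark connects to the Spark sink running inside the Flume agent on slave1,
// instead of trying to bind a receiver socket on that address itself
JavaReceiverInputDStream<SparkFlumeEvent> events =
        FlumeUtils.createPollingStream(ssc, "slave1", 6689);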


Regard,
Junfeng Chen


Re: Add snappy support for spark in Windows

2017-12-04 Thread Junfeng Chen
I have put winutils and hadoop.dll within HADOOP_HOME, and Spark works well
with it, but the Snappy decompression function throws the above exception.


Regard,
Junfeng Chen

On Mon, Dec 4, 2017 at 7:07 PM, Qiao, Richard <richard.q...@capitalone.com>
wrote:

> Junfeng, it is worth a try to start your local Spark with the
> hadoop.dll/winutils.exe Hadoop Windows support package in HADOOP_HOME, if
> you didn’t do that yet.
>
>
>
> Best Regards
>
> Richard
>
>
>
>
>
> *From: *Junfeng Chen <darou...@gmail.com>
> *Date: *Monday, December 4, 2017 at 3:53 AM
> *To: *"Qiao, Richard" <richard.q...@capitalone.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Re: Add snappy support for spark in Windows
>
>
>
> But I am working on my local development machine, so it should have nothing
> to do with workers/executors.
>
>
>
> I found some documents about enabling Snappy on Hadoop. If I want to use
> Snappy with Spark, do I need to configure Spark the same way as Hadoop, or is
> there an easier way to access it?
>
>
>
>
> Regard,
> Junfeng Chen
>
>
>
> On Mon, Dec 4, 2017 at 4:12 PM, Qiao, Richard <richard.q...@capitalone.com>
> wrote:
>
> It seems a common mistake that the path is not accessible by
> workers/executors.
>
>
>
> Best regards
>
> Richard
>
> Sent from my iPhone
>
>
> On Dec 3, 2017, at 22:32, Junfeng Chen <darou...@gmail.com> wrote:
>
> I am working on importing a Snappy-compressed JSON file into a Spark RDD or
> dataset. However, I get this error: java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>
> I have set the following configuration:
>
> SparkConf conf = new SparkConf()
>
> .setAppName("normal spark")
>
> .setMaster("local")
>
> .set("spark.io.compression.codec", 
> "org.apache.spark.io.SnappyCompressionCodec")
>
> 
> .set("spark.driver.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
>
> 
> .set("spark.driver.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
>
> 
> .set("spark.executor.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
>
> 
> .set("spark.executor.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
>
> ;
>
> Where D:\Downloads\spark-2.2.0-bin-hadoop2.7 is the path where I unpacked
> Spark, and I can find the Snappy jar files snappy-0.2.jar and
> snappy-java-1.1.2.6.jar in
>
> D:\Downloads\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\jars\
>
> However, nothing works and the error message does not even change.
>
> How can I fix it?
>
>
>
> ref of stackoverflow: https://stackoverflow.com/questions/47626012/config-snappy-support-for-spark-in-windows
>
>
>
>
>
>
> Regard,
> Junfeng Chen
>
>


Re: Add snappy support for spark in Windows

2017-12-04 Thread Junfeng Chen
But I am working on my local development machine, so it should have nothing to
do with workers/executors.

I found some documents about enabling Snappy on Hadoop. If I want to use
Snappy with Spark, do I need to configure Spark the same way as Hadoop, or is
there an easier way to access it?


Regard,
Junfeng Chen

On Mon, Dec 4, 2017 at 4:12 PM, Qiao, Richard <richard.q...@capitalone.com>
wrote:

> It seems a common mistake that the path is not accessible by
> workers/executors.
>
> Best regards
> Richard
>
> Sent from my iPhone
>
> On Dec 3, 2017, at 22:32, Junfeng Chen <darou...@gmail.com> wrote:
>
> I am working on importing a Snappy-compressed JSON file into a Spark RDD or
> dataset. However, I get this error: java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>
> I have set the following configuration:
>
> SparkConf conf = new SparkConf()
> .setAppName("normal spark")
> .setMaster("local")
> .set("spark.io.compression.codec", 
> "org.apache.spark.io.SnappyCompressionCodec")
> 
> .set("spark.driver.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
> 
> .set("spark.driver.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
> 
> .set("spark.executor.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
> 
> .set("spark.executor.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
> ;
>
> Where D:\Downloads\spark-2.2.0-bin-hadoop2.7 is the path where I unpacked
> Spark, and I can find the Snappy jar files snappy-0.2.jar and
> snappy-java-1.1.2.6.jar in
>
> D:\Downloads\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\jars\
>
> However, nothing works and the error message does not even change.
>
> How can I fix it?
>
>
> ref of stackoverflow: https://stackoverflow.com/questions/47626012/config-snappy-support-for-spark-in-windows
>
>
>
> Regard,
> Junfeng Chen
>
>


Add snappy support for spark in Windows

2017-12-03 Thread Junfeng Chen
I am working on importing a Snappy-compressed JSON file into a Spark RDD or
dataset. However, I get this error: java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z

I have set the following configuration:

SparkConf conf = new SparkConf()
.setAppName("normal spark")
.setMaster("local")
.set("spark.io.compression.codec",
"org.apache.spark.io.SnappyCompressionCodec")

.set("spark.driver.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")

.set("spark.driver.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")

.set("spark.executor.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")

.set("spark.executor.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
;

Where D:\Downloads\spark-2.2.0-bin-hadoop2.7 is the path where I unpacked
Spark, and I can find the Snappy jar files snappy-0.2.jar and
snappy-java-1.1.2.6.jar in

D:\Downloads\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\jars\

However, nothing works and the error message does not even change.

How can I fix it?


ref of stackoverflow: https://stackoverflow.com/questions/47626012/config-snappy-support-for-spark-in-windows



Regard,
Junfeng Chen