Transforming json string in structured streaming problem
Hi all,

I need to read some JSON-formatted string data from Kafka, convert it to a DataFrame, and finally write it to Parquet files, but I have run into some problems. spark.readStream().json() only supports JSON files at a specified location; unlike spark.read.json, it cannot take a Dataset as input. I found a potential solution at https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming , but it requires constructing a StructType, while the structure of my JSON data varies. How can I solve this? Thanks!

Regards,
Junfeng Chen
Re: Nullpointerexception error when in repartition
Hi,

I know, but my goal is to transform a Dataset of JSON strings into a Dataset of rows, while spark.readStream can only read JSON files from a specified path. https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming gives the essential method, but the format of each JSON record is different. Also, the Spark Java API does not seem to support syntax like .select(from_json($"value", colourSchema)).

Regards,
Junfeng Chen

On Fri, Apr 13, 2018 at 7:09 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> Have you read through the documentation of Structured Streaming?
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
>
> One of the basic mistakes you are making is defining the dataset with
> `spark.read()`. You define a streaming Dataset with `spark.readStream()`.
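In the Java API, the Scala `$"value"` syntax is replaced by `functions.col("value")`, and `functions.from_json(Column, StructType)` is available from Java as well. A minimal sketch, assuming a superset schema covering every field any app may send can be inferred once from static sample files (the sample path is a hypothetical parameter); fields missing from a given record simply come out as null:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class JsonStreamParsing {

    // Infers a superset schema once from static sample JSON files.
    // samplePath is a hypothetical location holding representative records.
    public static StructType inferSchema(SparkSession spark, String samplePath) {
        return spark.read().json(samplePath).schema();
    }

    // Java counterpart of Scala's .select(from_json($"value", schema)).
    // Works on batch and streaming Datasets; absent fields become null.
    public static Dataset<Row> parseJsonColumn(Dataset<Row> input, StructType schema) {
        return input
                .selectExpr("CAST(value AS STRING) AS value")
                .select(from_json(col("value"), schema).as("data"))
                .select("data.*");
    }
}
```

Because a streaming query's schema is fixed when it starts, a record type that appears later with new fields would need the schema (and the query) to be updated.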
Re: Nullpointerexception error when in repartition
Hi Tathagata,

I have tried Structured Streaming, but the line

    Dataset rowDataset = spark.read().json(jsondataset);

always throws

    Queries with streaming sources must be executed with writeStream.start()

All I need to do in this step is transform the JSON string data into a Dataset. How can I fix it? Thanks!

Regards,
Junfeng Chen

On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> It's not very surprising that doing this sort of RDD to DF conversion
> inside DStream.foreachRDD has weird corner cases like this. In fact, you
> are going to have additional problems with partial parquet files (when
> there are failures) in this approach. I strongly suggest that you use
> Structured Streaming, which is designed to do this sort of processing. It
> will take care of tracking the written parquet files correctly.
>
> TD
Nullpointerexception error when in repartition
I wrote a program that reads some JSON data from Kafka and is meant to save it as Parquet files on HDFS. Here is my code:

    JavaInputDStream stream = ...
    JavaDStream rdd = stream.map...
    rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
        Dataset<Row> df = spark.read().json(stringjavardd); // convert JSON to df
        JavaRDD<Row> rowJavaRDD = df.javaRDD().map... // add some new fields
        StructType type = df.schema()...; // construct new type for the newly added fields
        Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new DataFrame
        newdf.repartition(taskNum).write().mode(SaveMode.Append).partitionBy("appname").parquet(savepath); // save to Parquet
    });

However, if I remove the repartition call on newdf in the Parquet-writing stage, the program always throws a NullPointerException at the JSON conversion line:

    java.lang.NullPointerException
        at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
        ...

This makes no sense to me, since the Parquet write should be in a different stage from the JSON transformation. How can I solve it? Thanks!

Regards,
Junfeng Chen
Re: spark application running in yarn client mode is slower than in local mode.
But I still have one question: the number of tasks in the stage is 3. Where does this 3 come from, and how can I increase the parallelism?

Regards,
Junfeng Chen
Re: spark application running in yarn client mode is slower than in local mode.
Yeah, I have increased the number of executors and executor cores, and it runs normally now. HDP Spark 2 uses only 2 executors with 1 core each by default.

Regards,
Junfeng Chen

On Tue, Apr 10, 2018 at 10:19 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>> In yarn mode, only two executor are assigned to process the task, since
>> one executor can process one task only, they need 6 min in total.
>
> This is not true. You should set --executor-cores/--num-executors to
> increase the task parallelism for executor. To be fair, Spark application
> should have same resources (cpu/memory) when comparing between local and
> yarn mode.
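Saisai's suggestion can be sketched as a spark-submit invocation. The class name, jar name, and all numbers below are hypothetical placeholders for a three-node cluster; only the flags themselves (--num-executors, --executor-cores, --executor-memory) are standard spark-submit options, and --num-executors applies to YARN.

```shell
# Hypothetical values: 3 executors (one per node), 8 concurrent task slots each.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --num-executors 3 \
  --executor-cores 8 \
  --executor-memory 16g \
  --class com.example.KafkaToParquet \
  kafka-to-parquet.jar
```

Extra cores only help if each batch has enough tasks: with one task per Kafka partition, a three-partition topic yields three tasks per stage unless the data is repartitioned.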
Re: spark application running in yarn client mode is slower than in local mode.
I found a possible reason.

In local mode, all tasks in a stage run concurrently, while in YARN mode the tasks run in sequence.

For example, suppose each task in a stage takes 3 minutes. In local mode they run together and take 3 minutes in total. In YARN mode, only two executors are assigned to process the tasks, and since one executor can process only one task at a time, they need 6 minutes in total.

Regards,
Junfeng Chen

On Mon, Apr 9, 2018 at 2:12 PM, Jörn Franke <jornfra...@gmail.com> wrote:
> Probably network / shuffling cost? Or broadcast variables? Can you provide
> more details what you do and some timings?
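The arithmetic behind this example can be sketched in plain Java. Following Saisai's correction, an executor runs as many tasks at once as it has cores, so the number of concurrent slots is executors × cores (assuming the default of one CPU per task), and a stage needs ceil(tasks / slots) sequential waves. The sketch assumes three tasks per stage (one per Kafka partition, as in this thread) and equal task durations, which real stages rarely have:

```java
public final class StageTimeEstimate {

    // Concurrent task slots = executors * cores per executor
    // (assumes spark.task.cpus = 1, the default).
    public static int slots(int executors, int coresPerExecutor) {
        return executors * coresPerExecutor;
    }

    // Number of sequential "waves" needed to run all tasks of a stage.
    public static int waves(int tasks, int slots) {
        return (tasks + slots - 1) / slots; // ceiling division
    }

    // Rough wall-clock estimate, assuming every task takes the same time.
    public static int stageMinutes(int tasks, int minutesPerTask, int slots) {
        return waves(tasks, slots) * minutesPerTask;
    }

    public static void main(String[] args) {
        int tasks = 3; // assumed: one task per Kafka partition
        System.out.println(stageMinutes(tasks, 3, slots(2, 1))); // 2 executors x 1 core: prints 6
        System.out.println(stageMinutes(tasks, 3, slots(3, 4))); // 3 executors x 4 cores: prints 3
    }
}
```

With 2 executors × 1 core (the default mentioned above), three 3-minute tasks need two waves; any configuration with at least three slots finishes them in one.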
Re: spark application running in yarn client mode is slower than in local mode.
Hi Jörn,

I checked the log of my application: ResultStage 3 (the Parquet write) takes a very long time, nearly 300 s, while the total processing time of this streaming batch is 6 minutes.

Regards,
Junfeng Chen
Re: spark application running in yarn client mode is slower than in local mode.
Hi,

My Kafka topic has three partitions. By the time cost I mean that each streaming batch takes longer in YARN client mode. For example, YARN mode takes 300 seconds to process some data, while local mode takes only 200 seconds to process a similar amount of data.

Regards,
Junfeng Chen

On Mon, Apr 9, 2018 at 2:20 PM, Gopala Krishna Manchukonda <gopala_krishna_manchuko...@apple.com> wrote:
> Hi Junfeng,
>
> Is your kafka topic partitioned?
>
> Are you referring to the duration or the CPU time spent by the job as
> being 20% - 50% higher than running in local?
>
> Thanks & Regards
> Gopal
Re: spark application running in yarn client mode is slower than in local mode.
I read JSON string values from Kafka and then transform them into a DataFrame:

    Dataset df = spark.read().json(stringjavaRDD);

Then I add some new data to each row:

    JavaRDD rowJavaRDD = df.javaRDD().map(...);
    StructType type = df.schema().add(...);
    Dataset newdf = spark.createDataFrame(rowJavaRDD, type);
    ...

Finally, I write the dataset to Parquet files:

    newdf.write().mode(SaveMode.Append).partitionBy("stream", "appname", "year", "month", "day", "hour").parquet(savePath);

How can I determine whether the slowdown is caused by shuffling or by broadcast variables?

Regards,
Junfeng Chen
spark application running in yarn client mode is slower than in local mode.
I have written a Spark Streaming application that reads Kafka data, converts the JSON data to Parquet, and saves it to HDFS. What puzzles me is that the processing time of the app in YARN mode is 20% to 50% longer than in local mode. My cluster has three nodes running three NodeManagers, and all three hosts have the same hardware: 40 cores and 256 GB of memory.

Why? How can I solve it?

Regards,
Junfeng Chen
Re: How to delete empty columns in df when writing to parquet?
Hi,

Thanks for explaining!

Regards,
Junfeng Chen

On Wed, Apr 4, 2018 at 7:43 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> Hi,
>
> I do not think that in a columnar database it makes much of a difference.
> The amount of data that you will be parsing will not be much anyways.
>
> Regards,
> Gourav Sengupta
Re: How to delete empty columns in df when writing to parquet?
Our users asked for it.

Regards,
Junfeng Chen

On Wed, Apr 4, 2018 at 5:45 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> Hi Junfeng,
>
> can I ask why it is important to remove the empty column?
>
> Regards,
> Gourav Sengupta
Re: How to delete empty columns in df when writing to parquet?
You mean I should start two Spark Streaming applications and read the topics separately?

Regards,
Junfeng Chen

On Tue, Apr 3, 2018 at 10:31 PM, naresh Goud <nareshgoud.du...@gmail.com> wrote:
> I don't see any option other than starting two individual queries. It's
> just a thought.
>
> Thank you,
> Naresh
> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
How to delete empty columns in df when writing to parquet?
I am trying to read data from Kafka and write it in Parquet format via Spark Streaming. The problem is that the data from Kafka has a variable structure. For example, app one has columns A, B, C, and app two has columns B, C, D, so the DataFrame I read from Kafka has all four columns A, B, C, D. When I write the DataFrame to Parquet files partitioned by app name, the Parquet files of app one also contain column D, even though column D is empty and contains no data. How can I filter out the empty columns when writing the DataFrame to Parquet?

Thanks!

Regards,
Junfeng Chen
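One way to detect empty columns is to count non-null values per column: `functions.count(col)` skips nulls, so a count of 0 marks a column that is empty in this slice of data. A minimal sketch, assuming plain top-level column names (names containing dots would need backtick quoting) and that it is called on an already-materialized batch, for example inside `foreachRDD`, because it runs a Spark job:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class DropEmptyColumns {

    // Returns df without the columns whose values are null in every row.
    public static Dataset<Row> dropAllNullColumns(Dataset<Row> df) {
        String[] names = df.columns();
        if (names.length == 0) {
            return df;
        }
        // One aggregate per column: number of non-null values.
        Column[] counts = new Column[names.length];
        for (int i = 0; i < names.length; i++) {
            counts[i] = count(col(names[i])).as(names[i]);
        }
        Row nonNull = df.agg(counts[0], Arrays.copyOfRange(counts, 1, counts.length)).first();

        // Keep only columns with at least one non-null value.
        List<Column> keep = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            if (nonNull.getLong(i) > 0) {
                keep.add(col(names[i]));
            }
        }
        return df.select(keep.toArray(new Column[0]));
    }
}
```

Because a single `partitionBy` write gives every partition directory the same schema, this would have to be applied to each app's subset separately (e.g. after `df.filter(col("appname").equalTo(app))`) and each subset written on its own; readers may then need schema merging.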
[Spark Java] Add new column in DataSet based on existed column
I am working on adding a date-derived field to an existing Dataset. The current Dataset contains a column named timestamp in ISO format. I want to parse this field into a Joda-Time type and then extract the year, month, day, and hour as new columns attached to the original Dataset. I have tried the df.withColumn function, but it seems to support only simple expressions rather than a custom function such as a MapFunction. How can I solve this? Thanks!

Regards,
Junfeng Chen
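For this particular case a custom MapFunction or Joda-Time may not be needed: Spark's built-in column functions `year`, `month`, `dayofmonth`, and `hour` can be used inside `withColumn`. A minimal sketch, assuming the ISO strings are in a form Spark's string-to-timestamp cast understands (such as `2018-04-12T10:15:30`); note that `hour` is evaluated in the session time zone. For genuinely custom logic, a UDF registered through `spark.udf()` is the usual route.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.dayofmonth;
import static org.apache.spark.sql.functions.hour;
import static org.apache.spark.sql.functions.month;
import static org.apache.spark.sql.functions.year;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class DateParts {

    // Adds year/month/day/hour columns derived from an ISO-8601 string column.
    public static Dataset<Row> withDateParts(Dataset<Row> df, String tsColumn) {
        // Built-in cast from string to timestamp (no Joda-Time involved).
        Column ts = col(tsColumn).cast("timestamp");
        return df
                .withColumn("year", year(ts))
                .withColumn("month", month(ts))
                .withColumn("day", dayofmonth(ts))
                .withColumn("hour", hour(ts));
    }
}
```

The new columns can then be passed directly to `partitionBy("year", "month", "day", "hour")`.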
Queries with streaming sources must be executed with writeStream.start();;
I am reading some data from Kafka and want to save it to Parquet on HDFS with Structured Streaming. The data from Kafka is in JSON format. I tried to convert it to a Dataset with spark.read().json(), but I get the exception:

    Queries with streaming sources must be executed with writeStream.start()

Here is my code:

    Dataset df = spark.readStream().format("kafka")...
    Dataset jsonDataset = df.selectExpr("CAST(value AS STRING)").map...
    Dataset rowDataset = spark.read().json(jsonDataset);

    rowDataset.writeStream().outputMode(OutputMode.Append()).partitionBy("appname").format("parquet").option("path", savePath).start().awaitTermination();

How can I solve it? Thanks!

Regards,
Junfeng Chen
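As far as I can tell, `spark.read().json(dataset)` is a batch operation that scans its input (for schema inference), which is what fails on a streaming Dataset. A sketch of the same pipeline that stays streaming end to end, assuming the schema is known up front and `source` is the Dataset returned by `spark.readStream().format("kafka")...load()`; the checkpoint path is an extra, assumed parameter that the file sink needs to track committed files:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;

public class KafkaJsonToParquet {

    public static StreamingQuery start(Dataset<Row> source, StructType schema,
                                       String savePath, String checkpointPath) throws Exception {
        // from_json replaces spark.read().json(): it is a column expression,
        // so it can be applied to a streaming Dataset.
        Dataset<Row> rowDataset = source
                .selectExpr("CAST(value AS STRING) AS value")
                .select(from_json(col("value"), schema).as("data"))
                .select("data.*");

        return rowDataset.writeStream()
                .outputMode(OutputMode.Append())
                .format("parquet")
                .partitionBy("appname")
                .option("path", savePath)
                // Checkpoint directory for the file sink's progress and metadata.
                .option("checkpointLocation", checkpointPath)
                .start();
    }
}
```

The caller would then block on `start(...).awaitTermination()` as in the original code.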
DataSet save to parquet partition problem
I am trying to save a Dataset object to Parquet files via

    df.write().partitionBy("...").parquet(path)

while this Dataset contains the following struct:

    time: struct
     -dayOfMonth
     -monthOfYear
     ...

Can I use a child field such as time.monthOfYear as the partition column? If so, how? Thanks!

Regards,
Junfeng Chen
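As far as I know, `partitionBy()` only accepts top-level column names, so a nested field has to be copied into its own column first. A minimal sketch, assuming the struct layout shown above; the new column name `monthOfYear` is an arbitrary choice and becomes the directory key (e.g. `monthOfYear=4`):

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class PartitionByNestedField {

    // Promotes time.monthOfYear to a top-level column and partitions by it.
    public static void write(Dataset<Row> df, String path) {
        df.withColumn("monthOfYear", col("time.monthOfYear"))
          .write()
          .mode(SaveMode.Append)
          .partitionBy("monthOfYear")
          .parquet(path);
    }
}
```

The original `time` struct is still stored in the files; only the copied column is moved into the directory names.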
Re: Reading kafka and save to parquet problem
I have tried using readStream and writeStream, but it throws a "Uri without authority: hdfs:/data/_spark_metadata" exception, which I do not see in normal read mode. The Parquet path I specified is hdfs:///data.

Regards,
Junfeng Chen

On Thu, Mar 8, 2018 at 9:38 AM, naresh Goud <nareshgoud.du...@gmail.com> wrote:
> change it to readStream instead of read as below
>
> val df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
>
> Check is this helpful
>
> https://github.com/ndulam/KafkaSparkStreams/blob/master/SampleStreamApp/src/main/scala/com/naresh/org/SensorDataSave.scala
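My reading of this error (an assumption, not confirmed in the thread) is that the streaming file sink's `_spark_metadata` log is resolved through Hadoop's FileContext API, which does not fill in the default NameNode for a URI like `hdfs:///data` that has a scheme but an empty authority. Passing a fully qualified URI, or a scheme-less path such as `/data` resolved against `fs.defaultFS`, may avoid it. A small plain-Java sketch that qualifies such a path; `namenode:8020` is a placeholder for the cluster's real `fs.defaultFS` authority:

```java
import java.net.URI;

public final class HdfsPaths {

    // Rewrites "hdfs:///data" (scheme but no authority) into
    // "hdfs://<authority>/data". Paths that already carry an authority,
    // or use another scheme or no scheme, are returned unchanged.
    public static String withAuthority(String path, String authority) {
        URI uri = URI.create(path);
        if ("hdfs".equals(uri.getScheme()) && uri.getAuthority() == null) {
            return "hdfs://" + authority + uri.getPath();
        }
        return path;
    }

    public static void main(String[] args) {
        // "namenode:8020" is a hypothetical authority.
        System.out.println(withAuthority("hdfs:///data", "namenode:8020")); // prints hdfs://namenode:8020/data
    }
}
```

The qualified string would then be used for both the `path` and `checkpointLocation` options of `writeStream()`.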
Reading kafka and save to parquet problem
I am struggling to read data from Kafka and save it to Parquet files on HDFS using Spark Streaming, following this post: https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet

My code is similar to the following:

    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .write.parquet("hdfs://data.parquet")

The difference is that I am writing in Java.

In practice, this code runs only once and then exits gracefully. Although it produces the Parquet file successfully and throws no exception, it behaves like a normal Spark program rather than a Spark Streaming program.

What should I do if I want to keep reading from Kafka and saving to Parquet batch by batch?

Regards,
Junfeng Chen
Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning
Spark 2.1.1. Actually, it is a warning rather than an exception, so there is no stack trace, just many lines like this:

    CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's method are interrupted because of KAFKA-1894.

Regards,
Junfeng Chen

On Wed, Mar 7, 2018 at 3:34 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> Which version of Spark are you using? And can you give us the full stack
> trace of the exception?
CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning
I am trying to read from Kafka and save the data as Parquet files on HDFS, following https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet

The code is similar to:

    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()

although I am writing in Java.

However, I keep getting the following warning:

    CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's method are interrupted because of KAFKA-1894.

How can I solve it? Thanks!

Regards,
Junfeng Chen
spark streaming with flume: cannot assign requested address error
I am trying to connect Spark Streaming to Flume in pull mode. I have three machines (master, slave1, slave2), and each one runs both Spark and a Flume agent. I have set the Flume sink to slave1 on port 6689, and telnet slave1 6689 works from the other two machines. In my code, I call FlumeUtils.createStream(ssc, "slave1", 6689) and submit it on the master machine with --master local[2]. Then it throws the error:

    ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.jboss.netty.channel.ChannelException: Failed to bind to: slave1/10.25.*.*:6689
    Caused by: java.net.BindException: Cannot assign requested address

I have read this thread https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Streaming-Fails-on-Cluster-mode-Flume-as-source/m-p/25577/highlight/true#M621 , but I am sure no process is using this port (checked with netstat -anp | grep 6689). Also, the IP address 10.25.** is not a routable address. Can someone help me solve it?

Regards,
Junfeng Chen
Re: Add snappy support for spark in Windows
I have put winutils.exe and hadoop.dll in HADOOP_HOME, and Spark works fine with them, but Snappy decompression still throws the same java.lang.UnsatisfiedLinkError (org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z).

Regards,
Junfeng Chen

On Mon, Dec 4, 2017 at 7:07 PM, Qiao, Richard <richard.q...@capitalone.com> wrote:
> Junfeng, it is worth trying to start your local Spark with the Hadoop Windows support package (hadoop.dll, winutils.exe, etc.) in HADOOP_HOME, if you haven't done that yet.
>
> Best Regards
> Richard
>
> --
> The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
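A quick way to double-check the HADOOP_HOME setup described in this message is to verify that the files sit where Hadoop expects them. The sketch below is a hypothetical helper (NativeHadoopSetupCheck is not part of Spark or Hadoop) that uses only the JDK. It assumes the usual Windows layout, with winutils.exe and hadoop.dll directly in %HADOOP_HOME%\bin. One point it checks explicitly: Hadoop's NativeCodeLoader loads the native library with System.loadLibrary("hadoop"), which searches java.library.path (on Windows this normally includes PATH), not HADOOP_HOME itself, so having hadoop.dll in HADOOP_HOME alone may not be enough.

```java
import java.io.File;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of Spark or Hadoop: sanity-checks a local Windows Hadoop setup. */
public class NativeHadoopSetupCheck {

    /** Returns the problems found; an empty list means nothing obvious is missing. */
    public static List<String> findProblems(String hadoopHome, String javaLibraryPath) {
        List<String> problems = new ArrayList<>();
        if (hadoopHome == null || hadoopHome.isEmpty()) {
            problems.add("HADOOP_HOME is not set");
            return problems;
        }
        Path bin = Paths.get(hadoopHome, "bin").toAbsolutePath().normalize();

        // Assumed layout: both files live directly in %HADOOP_HOME%\bin.
        for (String file : new String[] {"winutils.exe", "hadoop.dll"}) {
            if (!Files.isRegularFile(bin.resolve(file))) {
                problems.add("missing " + bin.resolve(file));
            }
        }

        // System.loadLibrary("hadoop") searches java.library.path, not HADOOP_HOME.
        if (!isOnPath(bin, javaLibraryPath)) {
            problems.add(bin + " is not on java.library.path");
        }
        return problems;
    }

    /** True if dir appears (after normalization) in a path-separator-delimited list. */
    private static boolean isOnPath(Path dir, String pathList) {
        if (pathList == null) {
            return false;
        }
        for (String entry : pathList.split(Pattern.quote(File.pathSeparator))) {
            if (entry.trim().isEmpty()) {
                continue;
            }
            try {
                if (Paths.get(entry.trim()).toAbsolutePath().normalize().equals(dir)) {
                    return true;
                }
            } catch (InvalidPathException e) {
                // Skip entries that are not valid paths on this platform.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> problems = findProblems(
                System.getenv("HADOOP_HOME"), System.getProperty("java.library.path"));
        if (problems.isEmpty()) {
            System.out.println("No obvious problems found");
        }
        problems.forEach(p -> System.out.println("Problem: " + p));
    }
}
```

Running it with the same environment as the Spark application shows, for example, whether %HADOOP_HOME%\bin needs to be added to PATH before the JVM starts.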
Re: Add snappy support for spark in Windows
But I am working on my local development machine, so this should have nothing to do with workers/executors.

I found some documents about enabling Snappy in Hadoop. If I want to use Snappy with Spark, do I need to configure Spark the same way as Hadoop, or is there an easier way?

Regards,
Junfeng Chen

On Mon, Dec 4, 2017 at 4:12 PM, Qiao, Richard <richard.q...@capitalone.com> wrote:
> It seems to be a common mistake that the path is not accessible to the workers/executors.
>
> Best regards
> Richard
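With a "local" master, the driver and the executor run in the same JVM, so the question becomes whether that one JVM can load the native Hadoop library. A minimal way to check, sketched below as a hypothetical JDK-only helper (NativeLoadProbe is not part of Spark or Hadoop), is to print java.library.path and call System.loadLibrary directly; the name "hadoop" resolves to hadoop.dll on Windows. It is meant to be run as a separate program with the same JVM options and PATH as the Spark application. If the load fails here, Hadoop's native Snappy code path will most likely fail too; if it succeeds and the error persists, a hadoop.dll built without Snappy support or not matching the Hadoop 2.7 jars bundled with Spark would be a plausible next suspect (an assumption, not confirmed in this thread).

```java
/** Hypothetical helper, not part of Spark or Hadoop: probes native library loading in this JVM. */
public class NativeLoadProbe {

    /** Tries System.loadLibrary(name); returns null on success, or the error message on failure. */
    public static String tryLoad(String name) {
        try {
            // Looks for System.mapLibraryName(name), e.g. hadoop.dll on Windows.
            System.loadLibrary(name);
            return null;
        } catch (UnsatisfiedLinkError e) {
            return String.valueOf(e.getMessage());
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "hadoop";
        System.out.println("java.library.path = " + System.getProperty("java.library.path"));
        System.out.println("looking for file  = " + System.mapLibraryName(name));
        String error = tryLoad(name);
        System.out.println(error == null ? "loaded " + name : "failed: " + error);
    }
}
```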
Add snappy support for spark in Windows
I am working on importing a Snappy-compressed JSON file into a Spark RDD or Dataset. However, I get this error:

java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z

I have set the following configuration:

    SparkConf conf = new SparkConf()
            .setAppName("normal spark")
            .setMaster("local")
            .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
            .set("spark.driver.extraLibraryPath", "D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
            .set("spark.driver.extraClassPath", "D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
            .set("spark.executor.extraLibraryPath", "D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
            .set("spark.executor.extraClassPath", "D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars");

Here D:\Downloads\spark-2.2.0-bin-hadoop2.7 is where I unpacked Spark, and I can find the Snappy jar files snappy-0.2.jar and snappy-java-1.1.2.6.jar in

D:\Downloads\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\jars\

However, nothing works, and even the error message does not change.

How can I fix it?

Stack Overflow reference: https://stackoverflow.com/questions/47626012/config-snappy-support-for-spark-in-windows

Regards,
Junfeng Chen
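One detail in the configuration above: the extraLibraryPath settings point at Spark's jars folder, but a library path is searched for native library files (on Windows, hadoop.dll, which is what System.mapLibraryName("hadoop") returns there), not for .jar files, which belong on the classpath. Separately, spark.io.compression.codec controls how Spark compresses its own internal data (shuffle outputs, broadcast variables, RDD partitions, event logs); reading .snappy input files goes through Hadoop's compression codec instead. The hypothetical JDK-only helper below (NativeLibraryLocator is not part of Spark) scans a path-style list of directories, such as the value passed to spark.driver.extraLibraryPath, and reports which one, if any, actually contains the native library file. It only checks for the file; it does not reproduce every detail of how the JVM searches for libraries.

```java
import java.io.File;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of Spark: finds which directory in a path list holds a native library. */
public class NativeLibraryLocator {

    /** Returns the library file in the first directory that contains it, or null if none does. */
    public static Path locate(String libraryName, String pathList) {
        // "hadoop" maps to "hadoop.dll" on Windows and "libhadoop.so" on Linux.
        String fileName = System.mapLibraryName(libraryName);
        if (pathList == null) {
            return null;
        }
        for (String entry : pathList.split(Pattern.quote(File.pathSeparator))) {
            if (entry.trim().isEmpty()) {
                continue;
            }
            try {
                Path candidate = Paths.get(entry.trim()).resolve(fileName);
                if (Files.isRegularFile(candidate)) {
                    return candidate;
                }
            } catch (InvalidPathException e) {
                // Ignore entries that are not valid paths on this platform.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Default taken from the configuration above; pass your own path list as an argument.
        String configured = args.length > 0 ? args[0]
                : "D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars";
        Path found = locate("hadoop", configured);
        System.out.println(found == null
                ? "no " + System.mapLibraryName("hadoop") + " in " + configured
                : "found " + found);
    }
}
```

Pointed at the jars folder from the question, this would be expected to report that no hadoop.dll is present there, which is consistent with the error message not changing.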