Count() not working on streaming dataframe/structured streaming

2018-12-29 Thread Ritesh Shah
I have written this simple code to try streaming aggregation in Spark 2.4. 
Somehow, the job keeps running but never returns any result. If I remove the 
groupBy and count aggregation, it does return the three columns JobType, 
Timestamp and TS.

Would really appreciate any help.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import spark.implicits._
import scala.concurrent.duration._

val edgeDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "edgemw")
  .load()

val edgeSelectDF = edgeDF
  .select(
    get_json_object($"value".cast("string"), "$.after.JOBTYPE").alias("JobType"),
    get_json_object($"value".cast("string"), "$.current_ts").alias("Timestamp"))
  .select($"JobType", $"Timestamp", from_utc_timestamp($"Timestamp", "UTC").alias("TS"))
  .groupBy("JobType")
  .count()


val query = edgeSelectDF
  .writeStream
  .format("console")
  .outputMode(OutputMode.Complete())
  .trigger(Trigger.ProcessingTime(5.seconds))
  .start()

query.awaitTermination()
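(For reference, a rough equivalent of the select above that parses the value once 
with from_json and an explicit schema; the schema below is only assumed from the 
JSON paths used in the code, it is not the actual message layout.)

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Schema assumed from the JSON paths above ($.after.JOBTYPE and $.current_ts)
val msgSchema = new StructType()
  .add("after", new StructType().add("JOBTYPE", StringType))
  .add("current_ts", StringType)

val parsedDF = edgeDF
  .select(from_json($"value".cast("string"), msgSchema).alias("msg"))
  .select($"msg.after.JOBTYPE".alias("JobType"), $"msg.current_ts".alias("Timestamp"))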

Thanks & regards,
Ritesh






RE: What are the alternatives to nested DataFrames?

2018-12-29 Thread email
1 - I am not sure how I can do what you suggest for #1, because I use the 
entries in the initial df to build the query, and from that query I get the second 
df. Could you explain more?

 

2 - I also thought about doing what you suggest in #2, but if I am not 
mistaken, if I use regular Scala data structures it won't be distributed and it 
might run out of memory?

 

 

I also tried collecting the second dataframe to a Seq, but it also produced 
the null pointer.

 

From: Shahab Yunus  
Sent: Friday, December 28, 2018 11:21 PM
To: em...@yeikel.com
Cc: Andrew Melo ; user 
Subject: Re: What are the alternatives to nested DataFrames?

 

2 options I can think of:

 

1- Can you perform a union of the dfs returned by the Elasticsearch queries? It 
would still be distributed, but I don't know whether you will run into a limit on 
how many union operations you can perform at a time. (See the sketch after these 
two options.)

 

2- Can you use some other API method of Elasticsearch, other than the one which 
returns a dataframe?
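
A minimal sketch of option 1, assuming the per-city queries can be built on the 
driver first and reusing the sqlContext.esDF helper used elsewhere in this thread 
(index name and queries are illustrative):

// Sketch only: run each Elasticsearch query from the driver, get one
// DataFrame per query, and combine them with union so the data itself
// stays distributed. Assumes every query returns the same schema.
import org.elasticsearch.spark.sql._
import org.apache.spark.sql.DataFrame

val queries: Seq[String] = Seq("?q=New York", "?q=Michigan")

val perQueryDfs: Seq[DataFrame] = queries.map(q => sqlContext.esDF("cities/docs", q))

val combined: DataFrame = perQueryDfs.reduce(_ union _)
combined.show()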

 

On Fri, Dec 28, 2018 at 10:30 PM mailto:em...@yeikel.com> > 
wrote:

I could, but only if I had it beforehand. I do not know what the dataframe is 
until I pass the query parameter and receive the resultant dataframe inside the 
iteration.

 

The steps are:

 

Original DF -> Iterate -> Pass every element to a function that takes the 
element of the original DF and returns a new dataframe including all the 
matching terms

 

 

From: Andrew Melo <andrew.m...@gmail.com>
Sent: Friday, December 28, 2018 8:48 PM
To: em...@yeikel.com
Cc: Shahab Yunus <shahab.yu...@gmail.com>; user <user@spark.apache.org>
Subject: Re: What are the alternatives to nested DataFrames?

 

Could you join() the DFs on a common key?
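
(For illustration, a minimal sketch of what such a join could look like; the 
column name, index and the assumption that the Elasticsearch results can be read 
as a single DataFrame are all made up, and spark.implicits._ is assumed to be in 
scope as in spark-shell.)

// Sketch only: read the matching documents as one DataFrame, then join it
// back to the cities DataFrame on a shared column instead of querying per row.
val citiesDF  = Seq("New York", "Michigan").toDF("name")
val resultsDF = sqlContext.esDF("cities/docs")        // assumed to expose a "name" column

val joined = citiesDF.join(resultsDF, Seq("name"))    // inner join on the common key
joined.show()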

 

On Fri, Dec 28, 2018 at 18:35 <em...@yeikel.com> wrote:

Shahab, I am not sure what you are trying to say. Could you please give me an 
example? The result of the query is a Dataframe that is created after 
iterating, so I am not sure how I could map that to a column without iterating 
and getting the values.

 

I have a Dataframe that contains a list of cities which I would like to 
iterate over and search for in Elasticsearch. This list is stored in a Dataframe 
because it contains hundreds of thousands of elements with multiple properties 
that would not fit on a single machine.

 

The issue is that the elasticsearch-spark connector returns a Dataframe as well, 
which leads to creating a Dataframe within a Dataframe.

 

The only solution I found is to store the list of cities in a regular Scala 
Seq and iterate over that, but as far as I know this would make the Seq centralized 
instead of distributed (run at the executor only?)

 

Full example:

val cities = Seq("New York", "Michigan")

cities.foreach(r => {
  val qb = QueryBuilders.matchQuery("name", r).operator(Operator.AND)
  print(qb.toString)

  // Returns a dataframe for each city
  val dfs = sqlContext.esDF("cities/docs", qb.toString)

  // Works as expected. It prints the individual dataframe with the result of the query
  dfs.show()
})

 

 

val cities = Seq("New York", "Michigan").toDF()

cities.foreach(r => {
  val city = r.getString(0)

  val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)
  print(qb.toString)

  val dfs = sqlContext.esDF("cities/docs", qb.toString) // null pointer

  dfs.show()
})

 

 

From: Shahab Yunus <shahab.yu...@gmail.com>
Sent: Friday, December 28, 2018 12:34 PM
To: em...@yeikel.com
Cc: user <user@spark.apache.org>
Subject: Re: What are the alternatives to nested DataFrames?

 

Can you have a dataframe with a column which stores JSON (type string)? Or you 
could have a column of array type in which you store all cities matching 
your query.
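
(A minimal illustration of the array-column idea with made-up data; collect_list 
gathers the matches for each query into one array-typed column.)

import org.apache.spark.sql.functions.collect_list
import spark.implicits._

// Hypothetical matches: one row per (query, matching city)
val matches = Seq(
  ("q1", "New York"),
  ("q1", "Newark"),
  ("q2", "Michigan City")
).toDF("query", "city")

// One row per query, with every matching city kept in an array column
val nested = matches.groupBy("query").agg(collect_list("city").alias("cities"))
nested.show(false)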

 

 

 

On Fri, Dec 28, 2018 at 2:48 AM <em...@yeikel.com> wrote:

Hi community,

 

As shown in other answers online, Spark does not support the nesting of 
DataFrames, but what are the options?

 

I have the following scenario :

 

dataFrame1 = List of Cities

 

dataFrame2 = Created after searching in ElasticSearch for each city in 
dataFrame1

 

I've tried:

val cities = sc.parallelize(Seq("New York")).toDF()

cities.foreach(r => {
  val companyName = r.getString(0)
  println(companyName)

  // returns a DataFrame consisting of all the cities matching the entry in cities
  val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)
})

 

Which triggers the expected null pointer exception

 

java.lang.NullPointerException
  at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)
  at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)
  at org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)
  at Main$$anonfun$main$1.apply(Main.scala:43)
  at Main$$anonfun$main$1.apply(Main.scala:39)
  at 

Postgres Read JDBC with COPY TO STDOUT

2018-12-29 Thread Nicolas Paris
Hi

The Spark postgres JDBC reader is limited because it relies on basic
SELECT statements with a fetchsize, and it crashes on large tables even if
multiple partitions are set up with lower/upper bounds.
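
(For context, a hedged sketch of the stock partitioned JDBC read being referred 
to above; the option names are the standard Spark JDBC data source options, while 
the connection details and table are made up.)

// Stock Spark JDBC reader with lower/upper-bound partitioning; under the hood
// it still issues plain SELECT statements per partition with a fetchsize.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host/db")
  .option("dbtable", "big_table")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "8")
  .option("fetchsize", "10000")
  .load()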

I am about to write a new postgres JDBC reader based on "COPY TO STDOUT".
It would stream the data and produce CSV on the filesystem (HDFS or
local). The CSV would then be parsed with the Spark CSV reader to
produce a dataframe. It would send multiple "COPY TO STDOUT" statements,
one for each executor.

Right now, I am able to loop over an output stream and write the string
somewhere. I am wondering what would be the best way to process the
resulting string stream, in particular the best way to direct it to an
HDFS folder, or maybe to parse it on the fly into a dataframe.
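
For what it's worth, a minimal sketch of the COPY TO STDOUT idea with the stock
pgjdbc CopyManager; the connection string, table, WHERE clause and paths are made
up, error handling is omitted, and writing to HDFS would swap the FileOutputStream
for a Hadoop FileSystem output stream.

// Dump one "partition" of a table to a local CSV file via COPY, then read it
// back with the regular Spark CSV reader.
import java.io.{BufferedOutputStream, FileOutputStream}
import java.sql.DriverManager
import org.postgresql.PGConnection

val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "password")
val copyApi = conn.unwrap(classOf[PGConnection]).getCopyAPI

val out = new BufferedOutputStream(new FileOutputStream("/tmp/big_table_part0.csv"))
try {
  // COPY streams rows server-side; no large result set is held on the client
  copyApi.copyOut(
    "COPY (SELECT * FROM big_table WHERE id % 4 = 0) TO STDOUT WITH (FORMAT CSV)",
    out)
} finally {
  out.close()
  conn.close()
}

// Parse the produced CSV into a dataframe
val df = spark.read.option("header", "false").csv("/tmp/big_table_part0.csv")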

Thanks,

-- 
nicolas




Re: [spark-sql] Hive failing on insert empty array into parquet table

2018-12-29 Thread 李斌松
https://issues.apache.org/jira/browse/HIVE-13632

李斌松 wrote on Saturday, December 29, 2018, at 4:08 PM:

> Hive has fixed this problem, but the fix is not included in
> hive-exec-1.2.1.spark2.jar
>
> [image: image.png]
>
>