Re: [Structured Streaming] Metrics or logs of events that are ignored due to watermark

2018-07-02 Thread Jungtaek Lim
Hi,

I tried it via https://github.com/apache/spark/pull/21617, but I soon
realized it does not give an accurate count of late input rows, because Spark
applies the watermark lazily and discards rows at the stateful operator(s),
whose inputs are not necessarily the same as the original input rows (some
have already been filtered out, and multiple rows may have been aggregated
into one).

To get an accurate count (or the rows themselves) of late input rows, we
would have to filter them out in the first phase of the query. That is less
flexible (most notably, a derived field could no longer serve as the
watermark field), but the majority of streaming frameworks adopt this policy
and expose late input rows on that basis.
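
As a rough illustration of that first-phase filtering, here is a minimal
spark-shell sketch (the eventTime column name, the rate source and the fixed
10-minute lateness are all assumptions, and the cutoff is computed from
processing time rather than from Spark's internal watermark, so it only
approximates what the state operator would actually drop):

import org.apache.spark.sql.functions._

// Assumed input: any streaming Dataset with an event-time column named "eventTime".
val input = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "eventTime")

// Split off "too late" rows before any aggregation sees them.
val cutoff   = expr("current_timestamp() - interval 10 minutes")
val lateRows = input.where(col("eventTime") < cutoff)   // count these or write them to a side sink
val onTime   = input.where(col("eventTime") >= cutoff)

// The aggregation only ever sees rows that passed the up-front filter.
val counts = onTime
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"))
  .count()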

So I think this is worth addressing, and I'm planning to work on it, but it
would be fine if someone else gets to it earlier.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, Jul 3, 2018 at 3:39 AM, subramgr wrote:

> Hi all,
>
> Do we have any logs, or any metrics recorded in log files or sent to a
> metrics sink, about the number of events that are ignored due to the
> watermark in Structured Streaming?
>
> Thanks


Re: Spark Druid Ingestion

2018-07-02 Thread gosoy
Hi Nayan,

Were you able to resolve this issue? Is it because of some file/folder
permission problems?






How to avoid duplicate column names after join with multiple conditions

2018-07-02 Thread Nirav Patel
The join expression is `df1(a) === df2(a) and df1(b) === df2(c)`.

How can I avoid the duplicate column 'a' in the result? I don't see an API
that handles both conditions while deduplicating the shared column. Do I have
to rename it manually?
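
For what it's worth, here is a minimal spark-shell sketch of one option (the
toy DataFrames are only stand-ins for df1/df2 above): since drop accepts a
specific Column reference, the right-hand copy of the shared key can be
dropped after the join:

import spark.implicits._

// Toy DataFrames standing in for the ones in the question.
val df1 = Seq((1, 10), (2, 20)).toDF("a", "b")
val df2 = Seq((1, 10), (2, 99)).toDF("a", "c")

val joined = df1
  .join(df2, df1("a") === df2("a") && df1("b") === df2("c"), "inner")
  .drop(df2("a"))   // removes df2's copy of "a"; only df1("a") stays in the schema

// Alternative: df1.join(df2, Seq("a")) keeps a single "a" automatically, but the
// extra b === c condition then has to be applied as a separate filter.
joined.show()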


[Structured Streaming] Metrics or logs of events that are ignored due to watermark

2018-07-02 Thread subramgr
Hi all, 

Do we have any logs, or any metrics recorded in log files or sent to a
metrics sink, about the number of events that are ignored due to the
watermark in Structured Streaming?

Thanks







Question about Spark, Inner Join and Delegation to a Parquet Table

2018-07-02 Thread Mike Buck
I have a question about Spark and how it delegates filters to a Parquet-based
table. I have two tables in Hive in Parquet format. Table1 has four columns
of type double and table2 has two columns of type double. I am doing an INNER
JOIN as follows:

SELECT table1.name FROM table1 INNER JOIN table2 ON table2.x BETWEEN 
table1.xmin AND table1.xmax AND table2.y BETWEEN table1.ymin AND table1.ymax

I noticed that the execution plan as reported by Spark only delegates the
IsNotNull filters to the tables and not any of the other filters:

== Physical Plan ==
*Project [name#0]
+- BroadcastNestedLoopJoin BuildRight, Inner, ((((x#36 >= xmin#13) && (x#36 <= xmax#15)) && (y#37 >= ymin#14)) && (y#37 <= ymax#16))
   :- *Project [name#0, xmin#13, ymin#14, xmax#15, ymax#16]
   :  +- *Filter (((isnotnull(xmin#13) && isnotnull(ymin#14)) && isnotnull(ymax#16)) && isnotnull(xmax#15))
   :     +- *FileScan parquet [name#0,xmin#13,ymin#14,xmax#15,ymax#16] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), IsNotNull(ymin), IsNotNull(ymax), IsNotNull(xmax)], ReadSchema: struct<...>
   +- BroadcastExchange IdentityBroadcastMode
      +- *Project [x#36, y#37]
         +- *Filter (isnotnull(y#37) && isnotnull(x#36))
            +- *FileScan parquet [x#36,y#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(y), IsNotNull(x)], ReadSchema: struct<...>

However, when doing a filter against a single table the filter is delegated to 
the table:

SELECT name FROM table1 where table1.xmin > -73.4454183678

== Physical Plan ==
CollectLimit 21
+- *Project [pbkey#150]
   +- *Filter (isnotnull(xmin#163) && (xmin#163 > -73.4454183678))
      +- *FileScan parquet [pbkey#150,xmin#163] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), GreaterThan(xmin,-73.4454183678)], ReadSchema: struct<...>

So the question is: does Spark delegate filters in a join condition to a 
Parquet table or is this an error in the "explain plan" output?

Thanks.
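
The short answer appears to be that this is not an explain bug: the BETWEEN
predicates reference columns from both tables, so they can only be evaluated
at the join; only single-table predicates with literal bounds (here, the
derived IsNotNull checks) ever reach PushedFilters. A hedged sketch of a
possible workaround, assuming table2's value range is cheap to compute
(table and column names follow the query above; the min/max pruning step is
an assumption about the data, not something Spark does automatically):

import org.apache.spark.sql.functions._

// Compute global bounds of table2's x and y once.
val b = spark.table("table2")
  .agg(min("x") as "xlo", max("x") as "xhi", min("y") as "ylo", max("y") as "yhi")
  .first()

// Literal single-table predicates like these do show up under PushedFilters
// on table1's Parquet scan.
val prunedTable1 = spark.table("table1")
  .filter(col("xmax") >= b.getDouble(0) && col("xmin") <= b.getDouble(1) &&
          col("ymax") >= b.getDouble(2) && col("ymin") <= b.getDouble(3))

prunedTable1.explain(true)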



Error while doing stream-stream inner join (java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access)

2018-07-02 Thread kant kodali
Hi All,

I get the error below quite often when I do a stream-stream inner join on
two data frames. After running several experiments, stream-stream joins
don't look stable enough for production yet. Any advice on this?

Thanks!

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
18/07/02 09:32:14 INFO LineBufferedStream: stdout:   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
18/07/02 09:32:14 INFO LineBufferedStream: stdout:   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
18/07/02 09:32:14 INFO LineBufferedStream: stdout:   at org.apache.spark.sql.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:301)


union of multiple twitter streams [spark-streaming-twitter_2.11]

2018-07-02 Thread Imran Rajjad
Hello,

Has anybody tried to union two streams of Twitter Statuses? I am
instantiating two Twitter streams with two different sets of credentials and
passing them through a union, but the console shows no activity and there
are no errors.


--static function that returns JavaReceiverInputDStream<Status>--

// imports needed by the snippets below
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.conf.ConfigurationBuilder;

public static JavaReceiverInputDStream<Status> getTwitterStream(
    JavaStreamingContext spark, String consumerKey, String consumerSecret,
    String accessToken, String accessTokenSecret, String[] filter) {
  // Enable OAuth for this credential set
  ConfigurationBuilder cb = new ConfigurationBuilder();
  cb.setDebugEnabled(false)
    .setOAuthConsumerKey(consumerKey)
    .setOAuthConsumerSecret(consumerSecret)
    .setOAuthAccessToken(accessToken)
    .setOAuthAccessTokenSecret(accessTokenSecret)
    .setJSONStoreEnabled(true);
  TwitterFactory tf = new TwitterFactory(cb.build());
  Twitter twitter = tf.getInstance();

  // Create the receiver stream using this account's authorization
  return TwitterUtils.createStream(spark, twitter.getAuthorization(), filter);
}
---trying to union two twitter streams---

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(5));

jssc.sparkContext().setLogLevel("ERROR");

JavaReceiverInputDStream<Status> twitterStreamByHashtag =
    TwitterUtil.getTwitterStream(jssc, consumerKey1, consumerSecret1,
        accessToken1, accessTokenSecret1, new String[] {"#Twitter"});
JavaReceiverInputDStream<Status> twitterStreamByUser =
    TwitterUtil.getTwitterStream(jssc, consumerKey2, consumerSecret2,
        accessToken2, accessTokenSecret2, new String[] {"@Twitter"});

// Union the two receiver streams and map each Status to its text
JavaDStream<String> statuses = twitterStreamByHashtag
    .union(twitterStreamByUser)
    .map(s -> s.getText());


regards,
Imran

-- 
I.R


Re: Dataframe reader does not read microseconds, but TimestampType supports microseconds

2018-07-02 Thread Jörn Franke
How do you read the files? Do you have some source code? It could be related
to the JSON data source.

What Spark version do you use?

> On 2. Jul 2018, at 09:03, Colin Williams  
> wrote:
> 
> I'm confused as to why Spark's DataFrame reader does not read JSON (or
> similar) microsecond timestamps with microsecond precision, but instead
> truncates them to milliseconds.
> 
> This seems strange when the TimestampType supports microseconds.
> 
> For example create a schema for a json object with a column of TimestampType. 
> Then read data from that column with timestamps with microseconds like 
> 
> 2018-05-13 20:25:34.153712
> 
> 2018-05-13T20:25:37.348006
> 
> You will end up with timestamps with millisecond precision. 
> 
> E.G. 2018-05-13 20:25:34.153
> 
> 
> 
> When reading about TimestampType: The data type representing 
> java.sql.Timestamp values. Please use the singleton DataTypes.TimestampType. 
> 
> java.sql.Timestamp provides a method that parses timestamps like
> Timestamp.valueOf("2018-05-13 20:25:37.348006"), including the microseconds.
> 
> So why does Spark's DataFrame reader drop the ball on this?


Dataframe reader does not read microseconds, but TimestampType supports microseconds

2018-07-02 Thread Colin Williams
I'm confused as to why Spark's DataFrame reader does not read JSON (or
similar) microsecond timestamps with microsecond precision, but instead
truncates them to milliseconds.

This seems strange when the TimestampType supports microseconds.

For example, create a schema for a JSON object with a column of
TimestampType, then read data into that column from timestamps that carry
microseconds, such as

2018-05-13 20:25:34.153712

2018-05-13T20:25:37.348006

You will end up with timestamps with millisecond precision.

E.G. 2018-05-13 20:25:34.153



The documentation for TimestampType says: "The data type representing
java.sql.Timestamp values. Please use the singleton DataTypes.TimestampType."


java.sql.Timestamp provides a method that parses timestamps like
Timestamp.valueOf("2018-05-13 20:25:37.348006"), including the microseconds.
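
A minimal spark-shell sketch of the two observations (the file path and the
single-column schema are assumptions): java.sql.Timestamp itself keeps the
sub-millisecond digits, while the same kind of string read back through the
DataFrame reader comes out at millisecond precision, as described above:

import java.sql.Timestamp
import org.apache.spark.sql.types._

// Timestamp.valueOf parses the full fractional part: getNanos returns 348006000.
Timestamp.valueOf("2018-05-13 20:25:37.348006").getNanos

// Reading the same kind of string through the DataFrame reader into a TimestampType column.
val schema = StructType(Seq(StructField("ts", TimestampType)))
val df = spark.read.schema(schema)
  .json("/tmp/events.json")   // assumed file containing e.g. {"ts": "2018-05-13 20:25:34.153712"}

df.show(false)                // shows 2018-05-13 20:25:34.153 (millisecond precision), as reported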

So why does Spark's DataFrame reader drop the ball on this?