Re: Static partitioning in partitionBy()

2019-05-07 Thread Felix Cheung
You could

df.filter(col("c") === "c1").write.partitionBy("c").save

It could run into some data skew problems, but it might work for you




From: Burak Yavuz 
Sent: Tuesday, May 7, 2019 9:35:10 AM
To: Shubham Chaurasia
Cc: dev; user@spark.apache.org
Subject: Re: Static partitioning in partitionBy()

It depends on the data source. Delta Lake (https://delta.io) allows you to do 
it with the .option("replaceWhere", "c = c1"). With other file formats, you can 
write directly into the partition directory (tablePath/c=c1), but you lose 
atomicity.

On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia  wrote:
Hi All,

Is there a way I can provide static partitions in partitionBy()?

Like:
df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

The above code gives the following error, as it tries to find a column `c=c1` in df.

org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found in 
schema struct;

Thanks,
Shubham


Create table from Avro-generated parquet files?

2019-05-07 Thread Coolbeth, Matthew
I have a “directory” in S3 containing Parquet files created from Avro using the 
AvroParquetWriter in the parquet-mr project.

I can load the contents of these files as a DataFrame using
val it = spark.read.parquet("s3a://coolbeth/file=testy")

but I have not found a way to define a permanent table based on these parquet 
files.

If I do a regular CREATE EXTERNAL TABLE STORED AS PARQUET, then the 
deserialization crashes at query time, I think because there is no Spark schema 
stored in the parquet metadata (there is instead an Avro schema).
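
For concreteness, the DDL I tried is roughly like the following (the table name
and column list are placeholders; only the S3 location is real), run through a
Hive-enabled SparkSession:

spark.sql("""
  CREATE EXTERNAL TABLE testy (id BIGINT, payload STRING)
  STORED AS PARQUET
  LOCATION 's3a://coolbeth/file=testy'
""")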

Is there a way to create the table I want from these Avro-generated Parquet 
files?

Thanks,

Matt Coolbeth
Software Engineer
Disney DTCI



Re: Static partitioning in partitionBy()

2019-05-07 Thread Burak Yavuz
It depends on the data source. Delta Lake (https://delta.io) allows you to
do it with the .option("replaceWhere", "c = c1"). With other file formats,
you can write directly into the partition directory (tablePath/c=c1), but
you lose atomicity.
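
For example, a rough Scala sketch of both approaches (df, tablePath, and the
partition value are placeholders, and I'm assuming c is a string column):

import org.apache.spark.sql.functions.col

// Delta Lake: atomically overwrite only the static partition c = c1
df.filter(col("c") === "c1")
  .write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "c = 'c1'")
  .save(tablePath)

// Other file formats: write straight into the partition directory.
// Drop the partition column since it is encoded in the path; note this
// is not atomic at the table level.
df.filter(col("c") === "c1")
  .drop("c")
  .write
  .mode("overwrite")
  .parquet(s"$tablePath/c=c1")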

On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
wrote:

> Hi All,
>
> Is there a way I can provide static partitions in partitionBy()?
>
> Like:
> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>
> The above code gives the following error, as it tries to find a column `c=c1` in df.
>
> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
> in schema struct;
>
> Thanks,
> Shubham
>


Structured Streaming Kafka - Weird behavior with performance and logs

2019-05-07 Thread Austin Weaver
Hey Spark Experts,

After listening to some of you, and the presentations at Spark Summit in
SF, I am transitioning from DStreams to Structured Streaming; however, I am
seeing some weird results.

My use case is as follows: I am reading in a stream from a kafka topic,
transforming a message, and writing the transformed message to another
kafka topic.

While running my stream, I can see the transformed messages on the output
topic so I know the basic structure of my stream seems to be running as
intended.

Inside my transformation, I am logging the total transform time as well as
the raw message being transformed. (Java by the way)

The 2 weird things I am seeing:
1) I am seeing that the consumer lag for this particular consumer group on
the input topic is increasing. This does not make sense to me - looking at
the transform time from the logs, it should easily be able to handle the
incoming feed. To give an example, the transform times are < 10 ms per
record and the sample of data does not contain > 100 messages per second.
The stream should be reducing consumer lag as it runs (especially
considering multiple workers and partitions).

2) I am seeing the same log transformation messages over and over in the
Dataproc Spark cluster logs. For example, I am currently looking at my logs
and the last 20+ log messages are exactly the same.

I thought 2 might be due to offsets not being handled correctly, but I am
seeing a reasonable range of transformed messages on the target topic, and
I'm using Spark's built-in checkpointing to handle the offsets for me.

In terms of 1, why would I be seeing the same log messages over and over?
It doesn't make sense to me - wouldn't each message only be transformed once
and its offset committed?

If anything stands out as incorrect, or matches something you've seen, please
let me know - this is rather new to me, and my code seems to follow the same
pattern as other examples I see across the net.

Here's a redacted snippet of my stream:

spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "X")
    .option("kafka.partition.assignment.strategy",
        RoundRobinAssignor.class.getName())
    .option("subscribe", "X")                  // input topic (redacted)
    .option("startingOffsets", "earliest")
    .load()
    .select("value")
    .as(Encoders.STRING())
    // transform each record; transform() is my own method
    .map((MapFunction<String, String>) value -> transform(value),
        Encoders.STRING())
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "X")
    .option("topic", "X")                      // output topic (redacted)
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/testCheckpoint")
    .start()
    .awaitTermination();

Thanks!
Austin


Static partitioning in partitionBy()

2019-05-07 Thread Shubham Chaurasia
Hi All,

Is there a way I can provide static partitions in partitionBy()?

Like:
df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

The above code gives the following error, as it tries to find a column `c=c1` in df.

org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
in schema struct;

Thanks,
Shubham


ThriftServer gc over exceed and memory problem

2019-05-07 Thread shicheng31...@gmail.com
Hi all:
My Spark version is 2.3.2. I start the Thrift Server with the default Spark 
config. On the other side, I use a Java application to query results via JDBC.
The query application has plenty of statements to execute. The earlier 
statements execute very quickly, but the later statements execute slower and 
slower. I tried to observe the 'Thrift JDBC Server' application on the web UI: 
I kept refreshing the page, but the page response got slower and slower.
Finally, it showed "GC overhead limit exceeded".
  Then I tried to increase the executor memory in spark-env.sh, and the 
executors' memory did increase, but the problem still exists.
  What puzzles me is that the JDBC Server application serves as the driver and 
only handles some code distribution and RPC connection work. Does it need so 
much memory? If so, how do I increase its memory?
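
If it helps, this is roughly what I changed, plus the kind of driver-side
setting I suspect I might need instead (the 4g values are just examples):

# what I changed in conf/spark-env.sh
export SPARK_EXECUTOR_MEMORY=4g

# driver-side memory, passed when starting the server
sbin/start-thriftserver.sh --driver-memory 4g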



shicheng31...@gmail.com


Re: Dynamic metric names

2019-05-07 Thread Roberto Coluccio
It would be a dream to have an easy-to-use dynamic metric system AND a
reliable counting system (accumulator-like) in Spark...

Thanks
Roberto

On Tue, May 7, 2019 at 3:54 AM Saisai Shao  wrote:

> I think the main reason why that was not merged is that Spark itself
> doesn't have such a requirement, and the metrics system is mainly used by
> Spark itself. Most of the needs are from custom sources/sinks, but
> Spark's MetricsSystem is not designed as a public API.
>
> I think we could revisit or improve that PR if there's a solid reason
> for it.
>
> Thanks
> Saisai
>
> Sergey Zhemzhitsky  wrote on Tue, May 7, 2019 at 5:49 AM:
>
>> Hi Saisai,
>>
>> Thanks a lot for the link! This is exactly what I need.
>> Just curious why this PR has not been merged, as it seems to implement a
>> rather natural requirement.
>>
>> There are a number of use cases which can benefit from this feature, e.g.
>> - collecting business metrics based on the data's attributes and
>> reporting them into the monitoring system as a side effect of the data
>> processing
>> - visualizing technical metrics by means of alternative software (e.g.
>> grafana) - currently it's hardly possible to know the actual number of
>> jobs, stages, tasks and their names and IDs in advance to register all the
>> corresponding metrics statically.
>>
>>
>> Kind Regards,
>> Sergey
>>
>>
>> On Mon, May 6, 2019, 16:07 Saisai Shao  wrote:
>>
>>> I remember there was a PR about doing a similar thing (
>>> https://github.com/apache/spark/pull/18406). From my understanding,
>>> this seems like a quite specific requirement; it may require code changes
>>> to support your needs.
>>>
>>> Thanks
>>> Saisai
>>>
>>> Sergey Zhemzhitsky  wrote on Sat, May 4, 2019 at 4:44 PM:
>>>
 Hello Spark Users!

 Just wondering whether it is possible to register a metric source
 without metrics known in advance and add the metrics themselves to this
 source later on?

 It seems that currently MetricsSystem puts all the metrics from the
 source's MetricRegistry into the shared MetricRegistry of the MetricsSystem
 during metric source registration [1].

 So if a new metric with a new name is added to the source's registry after
 the source has been registered, this new metric will not be reported to the
 sinks.
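
 To illustrate, a minimal Scala sketch of what I mean (the names are made up,
 and it assumes access to the private[spark] Source trait, e.g. by placing the
 class under an org.apache.spark package):

 import com.codahale.metrics.MetricRegistry
 import org.apache.spark.metrics.source.Source

 class BusinessMetricsSource extends Source {
   override val sourceName: String = "businessMetrics"
   override val metricRegistry: MetricRegistry = new MetricRegistry

   // Created before registration: copied into the shared registry when
   // MetricsSystem.registerSource() is called, so it reaches the sinks.
   metricRegistry.counter("records.total")

   // Created later, e.g. named after a data attribute seen at runtime: it only
   // lands in this source's own registry, which has already been copied, so it
   // never shows up in the sinks.
   def incrementFor(attribute: String): Unit =
     metricRegistry.counter(s"records.$attribute").inc()
 }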

 What I'd like to achieve is to be able to register new metrics with new
 names dynamically using a single metric source.
 Is it somehow possible?


 [1]
 https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162

>>>


Re: Spark SQL met "Block broadcast_xxx not found"

2019-05-07 Thread Jacek Laskowski
Hi,

I'm curious about "I found the bug code". Can you point me at it? Thanks.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Tue, May 7, 2019 at 9:34 AM Xilang Yan  wrote:

> Ok... I am sure it is a bug in Spark. I found the bug code, but that code was
> removed in 2.2.3, so I just upgraded Spark to fix the problem.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


Re: Spark SQL met "Block broadcast_xxx not found"

2019-05-07 Thread Xilang Yan
Ok... I am sure it is a bug in Spark. I found the bug code, but that code was
removed in 2.2.3, so I just upgraded Spark to fix the problem.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Spark structured streaming watermarks on nested attributes

2019-05-07 Thread Joe Ammann
Hi Yuanjian

On 5/7/19 4:55 AM, Yuanjian Li wrote:
> Hi Joe
> 
> I think you met this issue: https://issues.apache.org/jira/browse/SPARK-27340
> You can check the description in the Jira and the PR. We also hit this in our 
> production env and fixed it with the PR provided.
> 
> The PR is still in review. cc Langchang Zhu (zhuliangch...@baidu.com),
> who's the author of the fix.

Yes, this very much looks like the issue I'm having. As an exercise for me 
(I've never built Spark locally), I will try to build your branch and see if it 
fixes my issue.

-- 
CU, Joe
