Spark 2.4 lifetime

2020-11-10 Thread Netanel Malka
Hi folks,
Do you know how long Spark will continue to maintain version 2.4?

Thanks.

-- 
Best regards,
Netanel Malka.


Pyspark application hangs (no error messages) on Python RDD .map

2020-11-10 Thread Daniel Stojanov
Hi,

The code below hangs indefinitely at the last line (the .map()).
Interestingly, if I run the same code at the beginning of my application
(removing the .write step), it executes as expected. In my actual application
the code appears further along, and that is where it hangs. The debugging
message "I saw a row" never appears in the executors' standard output.

Note that this occurs when running on a YARN cluster, but not on a
standalone cluster or in local mode. I have tried running with num-cores=1
and a single executor.

I have been working on this for a long time, any clues would be appreciated.

Regards,


def map_to_keys(row):
    # Runs on the executors; the print should show up in executor stdout.
    print("I saw a row", row["id"])
    return (hash(row["id"]), row)

df.write.mode("overwrite").format("orc").save("/tmp/df_full")
df = spark.read.format("orc").load("/tmp/df_full")
rdd = df.rdd.map(map_to_keys)
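
A minimal sketch (not part of the original message), assuming the same spark
session and path: since RDD.map is lazy, adding an explicit action ties the
executor-side "I saw a row" output, or the hang, to a concrete job that can be
inspected in the Spark UI.

# Sketch only: force the map to run with an explicit action.
df_check = spark.read.format("orc").load("/tmp/df_full")
keyed = df_check.rdd.map(map_to_keys)
print(keyed.count())   # first action: triggers the ORC read and map_to_keys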


Blacklisting in Spark Stateful Structured Streaming

2020-11-10 Thread Eric Beabes
We currently have a stateful Spark Structured Streaming job that computes
aggregates for each ID. I need to implement a new requirement: if the number
of incoming messages for a particular ID exceeds a certain value, add that ID
to a blacklist and remove its state. From then on, no state is created for a
blacklisted ID; its messages are simply filtered out.

What's the best way to implement this in Spark Structured Streaming?
Essentially, we need a distributed HashSet that is updated intermittently and
made available to all executors so that they can filter out unwanted messages.

Any pointers would be greatly appreciated. Is the only option to use a
3rd-party distributed cache such as EhCache, Redis, etc.?
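
For illustration, a minimal sketch of the filtering step, assuming Spark 2.4+,
a Kafka source, and a blacklist that some separate process persists to Parquet
(all names, paths, and thresholds below are assumptions, and the stateful
aggregation itself is not shown): foreachBatch re-reads the blacklist every
micro-batch and drops blacklisted IDs with a left anti join, which gives each
executor an up-to-date view of the set without an external cache.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("blacklist-filter-sketch").getOrCreate()

events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092")
          .option("subscribe", "events")
          .load()
          .selectExpr("CAST(key AS STRING) AS id", "CAST(value AS STRING) AS payload"))

def drop_blacklisted(batch_df, batch_id):
    # Re-read the blacklist each micro-batch; another job appends IDs whose
    # message counts exceeded the threshold.
    blacklist = spark.read.parquet("/tmp/blacklist")            # single column: id
    kept = batch_df.join(blacklist, on="id", how="left_anti")   # drop blacklisted IDs
    kept.write.mode("append").parquet("/tmp/filtered_events")   # downstream processing goes here

query = (events.writeStream
         .foreachBatch(drop_blacklisted)
         .option("checkpointLocation", "/tmp/chk/blacklist-filter")
         .start())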


Re: Cannot perform operation after producer has been closed

2020-11-10 Thread Eric Beabes
BTW, we are seeing this message as well: "org.apache.kafka.common.KafkaException:
Producer closed while send in progress". I am assuming this happens
because of the previous issue ("producer has been closed"), right? Or are
they unrelated? Please advise. Thanks.

On Tue, Nov 10, 2020 at 11:17 AM Eric Beabes 
wrote:

> Thanks for the reply. We are on Spark 2.4. Is there no way to get this
> fixed in Spark 2.4?
>
> On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim 
> wrote:
>
>> Which Spark version do you use? There's a known issue with the Kafka producer
>> pool in Spark 2.x which was fixed in Spark 3.0, so you may want to check
>> whether your case is bound to that known issue or not.
>>
>> https://issues.apache.org/jira/browse/SPARK-21869
>>
>>
>> On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes 
>> wrote:
>>
>>> I know this is related to Kafka, but it happens during a Spark
>>> Structured Streaming job, which is why I am asking on this mailing list.
>>>
>>> How would you debug this or get around this in Spark Structured
>>> Streaming? Any tips would be appreciated. Thanks.
>>>
>>>
>>> java.lang.IllegalStateException: Cannot perform operation after producer has been closed
>>>   at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
>>>   at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
>>>   at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
>>>   at org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
>>>   at org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)
>>>
>>


Re: Cannot perform operation after producer has been closed

2020-11-10 Thread Eric Beabes
Thanks for the reply. We are on Spark 2.4. Is there no way to get this
fixed in Spark 2.4?

On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim 
wrote:

> Which Spark version do you use? There's a known issue with the Kafka producer
> pool in Spark 2.x which was fixed in Spark 3.0, so you may want to check
> whether your case is bound to that known issue or not.
>
> https://issues.apache.org/jira/browse/SPARK-21869
>
>
> On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes 
> wrote:
>
>> I know this is related to Kafka, but it happens during a Spark
>> Structured Streaming job, which is why I am asking on this mailing list.
>>
>> How would you debug this or get around this in Spark Structured
>> Streaming? Any tips would be appreciated. Thanks.
>>
>>
>> java.lang.IllegalStateException: Cannot perform operation after producer has been closed
>>   at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
>>   at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
>>   at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
>>   at org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
>>   at org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)
>>
>


DStreams stop consuming from Kafka

2020-11-10 Thread Razvan-Daniel Mihai
Hello,

I have a use case where I have to stream events from Kafka to a JDBC sink.
The Kafka producers write events in hourly bursts.

I started with a Structured Streaming approach, but it turns out that
Structured Streaming has no built-in JDBC sink. I found an implementation in
Apache Bahir, but it is buggy and looks abandoned.

So I reimplemented the job using DStreams, and everything works fine except
that the executors stop consuming once they have reached the latest offsets.
All later events are discarded. The last INFO-level messages are along the
lines of:

20/11/10 16:19:02 INFO KafkaRDD: Beginning offset 7908480 is the same as
ending offset skipping dev_applogs 10

Here dev_applogs is the topic being consumed and 10 is the partition number.

I played with different values of "auto.offset.reset" and
"enable.auto.commit", but they all lead to the same behaviour. The settings
I actually need for my use case are:

auto.offset.reset=latest
enable.auto.commit=true

I use Spark 2.4.0 and Kafka 2.2.1.

Is this the expected behavior? Shouldn't the Spark executors poll the
Kafka partitions continuously for new offsets? This is the behaviour with
DataStreamReader, and it is what I expected to find with DStreams as well.

Thanks,
R.
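
On the "no JDBC sink" point above: one common workaround in Spark 2.4+
Structured Streaming is foreachBatch, which exposes each micro-batch as a
regular DataFrame that can use the batch JDBC writer. A minimal sketch, where
the broker, topic, connection details, and table name are assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-sink-sketch").getOrCreate()

# Assumed Kafka source; broker, topic, and projected columns are placeholders.
events_df = (spark.readStream.format("kafka")
             .option("kafka.bootstrap.servers", "broker1:9092")
             .option("subscribe", "applogs")
             .load()
             .selectExpr("CAST(value AS STRING) AS payload", "timestamp"))

def write_batch_to_jdbc(batch_df, batch_id):
    # Each micro-batch is a plain DataFrame, so the batch JDBC writer applies.
    (batch_df.write
     .mode("append")
     .jdbc(url="jdbc:postgresql://db-host:5432/appdb",
           table="applog_events",
           properties={"user": "app", "password": "secret",
                       "driver": "org.postgresql.Driver"}))

query = (events_df.writeStream
         .foreachBatch(write_batch_to_jdbc)
         .option("checkpointLocation", "/tmp/chk/jdbc-sink")
         .start())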


Spark Parquet file size

2020-11-10 Thread Tzahi File
Hi,

We have many Spark jobs that create lots of small files. We would like to
improve read performance for analysts, so I am testing for the optimal
Parquet file size. From what I've read, the optimal file size should be
around 1GB, and not less than 128MB, depending on the size of the data.

I took one job to examine. It uses 600 shuffle partitions, which creates
files of about 11MB each. I added a repartition step to produce fewer files -
roughly 12 files of ~600MB each. After testing it (select * from table
where ...), I saw that the old version (with more files) ran faster than the
new one. I increased the number of files to 40 - about 130MB per file - and
it still runs slower.

I would appreciate hearing about your experience with file sizes, and how you
optimize the number and size of output files.

Thanks,
Tzahi
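
For reference, two common ways to control Parquet output file count and size
from a DataFrame df (a sketch; the file count, record cap, and path are
illustrative, not a recommendation for this workload):

# Option 1: fix the number of output files directly (one file per partition).
(df.repartition(40)
   .write.mode("overwrite")
   .parquet("/path/to/output"))

# Option 2: cap rows per file (write option available since Spark 2.2) and let
# Spark split large partitions; tune the cap until files land near 128MB-1GB.
(df.write.mode("overwrite")
   .option("maxRecordsPerFile", 5000000)
   .parquet("/path/to/output"))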


Creating hive table through df.write.mode("overwrite").saveAsTable("DB.TABLE")

2020-11-10 Thread Mich Talebzadeh
Hi,

In Spark I explicitly specify the format of the table to be created:

sqltext = """
CREATE TABLE test.randomDataPy(
    ID INT
  , CLUSTERED INT
  , SCATTERED INT
  , RANDOMISED INT
  , RANDOM_STRING VARCHAR(50)
  , SMALL_VC VARCHAR(50)
  , PADDING VARCHAR(4000)
)
STORED AS PARQUET
"""
sqlContext.sql(sqltext)

However, I can also create the table through

df.write.mode("overwrite").saveAsTable("test.ABCD")

The table created through the above is stored as Parquet.

desc test.abcd
+----------------+------------+----------+
| col_name       | data_type  | comment  |
+----------------+------------+----------+
| id             | bigint     |          |
| clustered      | double     |          |
| scattered      | double     |          |
| randomised     | double     |          |
| random_string  | string     |          |
| small_vc       | string     |          |
| padding        | string     |          |
+----------------+------------+----------+


Is this the expected default behaviour?
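
One way to check what saveAsTable actually created (a sketch, assuming a
SparkSession named spark): DESCRIBE FORMATTED shows the provider/SerDe and
location, and the format can also be pinned explicitly instead of relying on
the default data source (spark.sql.sources.default, which is parquet unless
overridden).

# Inspect the table created by saveAsTable:
spark.sql("DESCRIBE FORMATTED test.abcd").show(100, truncate=False)

# Or set the format explicitly when saving:
df.write.mode("overwrite").format("parquet").saveAsTable("test.ABCD")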

Thanks

Mich



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Distribution of spark 3.0.1 with Hive1.2

2020-11-10 Thread Dmitry
Hi all, I am trying to build a Spark 3.0.1 distribution using:

./dev/make-distribution.sh --name spark3-hive12 --pip --tgz -Phive-1.2 -Phadoop-2.7 -Pyarn

The problem is that Maven can't find the right Hive profile, so the build
ends without the Hive jars:
++ /Users/reireirei/spark/spark/build/mvn help:evaluate
-Dexpression=project.activeProfiles -pl sql/hive -Phive-1.2 -Phadoop-2.7
-Pyarn
++ grep -v INFO
++ grep -v WARNING
++ fgrep --count 'hive'
++ echo -n
+ SPARK_HIVE=0
+ '[' spark3-hive12 == none ']'
+ echo 'Spark version is 3.0.1'
Spark version is 3.0.1

What is the correct way to include the Hive 1.2 jars in a Spark distribution?


spark cassandra questiom

2020-11-10 Thread adfel70
I am very new to both Spark and Spark Structured Streaming. I have to write
an application that receives very large CSV files in an HDFS folder. For each
row in a file, the app must read a few rows from a Cassandra database (not
many rows are returned per CSV row), perform some simple calculations, update
the rows it read with the results, and save the updated rows back to
Cassandra.

I have Spark 2.4 and must use Python.

Is this a suitable scenario for Spark Structured Streaming?
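
A rough sketch of the shape this could take with Structured Streaming's file
source plus foreachBatch, assuming the DataStax spark-cassandra-connector is
on the classpath; the keyspace, table, columns, join key, and calculation are
all assumptions, and for simplicity the whole Cassandra table is read each
micro-batch rather than doing per-key lookups:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = (SparkSession.builder.appName("csv-to-cassandra-sketch")
         .config("spark.cassandra.connection.host", "cassandra-host")
         .getOrCreate())

csv_schema = StructType([StructField("key", StringType()),
                         StructField("amount", DoubleType())])

# Stream new CSV files as they land in the HDFS folder.
csv_stream = spark.readStream.schema(csv_schema).csv("hdfs:///data/incoming")

def enrich_and_save(batch_df, batch_id):
    # Read the matching Cassandra rows (here: a running total per key).
    cass = (spark.read.format("org.apache.spark.sql.cassandra")
            .options(keyspace="my_ks", table="totals").load())      # columns: key, total
    joined = batch_df.join(cass, on="key")
    updated = joined.withColumn("total", joined["total"] + joined["amount"])
    (updated.select("key", "total")
            .write.format("org.apache.spark.sql.cassandra")
            .options(keyspace="my_ks", table="totals")
            .mode("append").save())

query = (csv_stream.writeStream
         .foreachBatch(enrich_and_save)
         .option("checkpointLocation", "hdfs:///chk/csv-to-cassandra")
         .start())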

thanks


