Re: Using Spark as a fail-over platform for Java app

2021-03-12 Thread Jungtaek Lim
That's what resource managers provide. You could code against a resource
manager directly, but I assume you're looking for a way to avoid dealing with
one and to let Spark handle it instead.

I admit I have no direct experience here (I did something similar with Apache
Storm on a standalone setup 5+ years ago), but the question can simply be
reframed as "making the driver fault-tolerant", since your app logic can run
inside the driver even if you don't do any computation with Spark. There seem
to be plenty of answers on Google for that reframed question, including this
older one:
https://stackoverflow.com/questions/26618464/what-happens-if-the-driver-program-crashes
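
A minimal sketch of what such a wrapper could look like, assuming a Spark
standalone cluster where "--supervise" restarts a failed driver (the class,
package and resource numbers below are hypothetical, not from this thread):

// Wrapper whose only job is to run the existing plain-Java app inside the
// Spark driver JVM; no Spark jobs are submitted at all.
object FailoverWrapper {
  def main(args: Array[String]): Unit = {
    com.example.LegacyApp.main(args)  // hypothetical entry point of the existing app
  }
}

// Submitted roughly as follows: cluster deploy mode runs the driver on a worker,
// --supervise restarts it on failure, and --driver-cores/--driver-memory reserve
// the guaranteed resources for that JVM:
//   spark-submit --master spark://master:7077 --deploy-mode cluster --supervise \
//     --driver-cores 4 --driver-memory 8g \
//     --class FailoverWrapper failover-wrapper.jar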


Re: Using Spark as a fail-over platform for Java app

2021-03-12 Thread Lalwani, Jayesh
Can I cut a steak with a hammer? Sure you can, but the steak would taste awful

Do you have organizational/bureaucratic issues with using a Load Balancer? 
Because that’s what you really need. Run your application on multiple nodes 
with a load balancer in front. When a node crashes, the load balancer will 
shift the traffic to the healthy node until the crashed node recovers.


Using Spark as a fail-over platform for Java app

2021-03-12 Thread Sergey Oboguev
I have an existing plain-Java (non-Spark) application that needs to run in
a fault-tolerant way, i.e. if the node crashes the application is restarted
on another node, and if the application crashes because of an internal
fault, it is restarted too.

Normally I would run it in Kubernetes, but in this specific case Kubernetes
is unavailable because of organizational/bureaucratic issues, and the only
execution platform available in the domain is Spark.

Is it possible to wrap the application in a Spark-based launcher that will
take care of executing the application and restarting it?

Execution must be in a separate JVM, apart from other apps.

And for optimum performance, the application also needs to be assigned
guaranteed resources, i.e. the number of cores and amount of RAM required
for it, so it would be great if the launcher could take care of this too.

Thanks for advice.


Re: How to upgrade kafka client in spark_streaming_kafka 2.2

2021-03-12 Thread Renu Yadav
Ok, thanks for the clarification.
I will try to migrate my project to Structured Streaming.

Regards,
Renu


Re: How to upgrade kafka client in spark_streaming_kafka 2.2

2021-03-12 Thread Gabor Somogyi
Mainly bugfixes, and no breaking changes AFAIK.

As a side note, there have been intentions to close DStreams development and
discontinue it as-is. That hasn't happened yet, but it's on the road, so I
strongly recommend migrating to Structured Streaming...
We simply can't support two streaming engines indefinitely.

G
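
For reference, a minimal Structured Streaming Kafka consumer looks roughly
like the sketch below (a sketch only: broker address, topic and checkpoint
path are placeholders, and the spark-sql-kafka-0-10 connector is assumed to
be on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-structured-streaming").getOrCreate()

// The topic is read as an unbounded DataFrame instead of a DStream.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "my-topic")
  .load()

// Kafka keys/values arrive as binary; cast them before downstream processing.
val records = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

val query = records.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/my-topic-demo")
  .start()

query.awaitTermination()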


Re: How to upgrade kafka client in spark_streaming_kafka 2.2

2021-03-12 Thread Renu Yadav
Hi Gabor,

It seems like it is better to upgrade my Spark version.

Are there major changes in terms of streaming from Spark 2.2 to Spark 2.4?

PS: I am using the KafkaUtils API to create the stream.

Thanks & Regards,
Renu Yadav
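
For context, a DStream-based KafkaUtils consumer has roughly the following
shape (a sketch only, with placeholder broker, topic and group id, assuming
the spark-streaming-kafka-0-10 API and an existing SparkContext named sc):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "example-group",
  "auto.offset.reset"  -> "latest"
)

val ssc = new StreamingContext(sc, Seconds(10))

// Direct stream: each Kafka partition maps to one Spark partition.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

stream.map(record => (record.key, record.value)).print()

ssc.start()
ssc.awaitTermination()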


Re: How to upgrade kafka client in spark_streaming_kafka 2.2

2021-03-12 Thread Renu Yadav
Thanks Gabor,
This is very useful.

Regards,
Renu Yadav


Fwd: compile spark 3.1.1 error

2021-03-12 Thread Attila Zsolt Piros
Hi!

The Zinc cache can be cleaned by the shutdown command I showed you earlier:
./build/zinc-0.3.15/bin/zinc -shutdown

But as I have just seen in SPARK-34539, it is absolutely not needed anymore,
as the standalone Zinc is not used by the compiler plugin from v3.0.0 on.

Regarding make-distribution.sh with hadoop-2.7 I can confirm it is working
on my machine:

$ SPARK_HOME=$PWD ./dev/make-distribution.sh --name custom-spark --pip
 --tgz -Phive -Phive-thriftserver -Pyarn -Phadoop-2.7  -DskipTests
...
[INFO] Reactor Summary for Spark Project Parent POM 3.1.1:
[INFO]
[INFO] Spark Project Parent POM ... SUCCESS [  2.251 s]
[INFO] Spark Project Tags . SUCCESS [  4.513 s]
[INFO] Spark Project Sketch ... SUCCESS [  4.878 s]
[INFO] Spark Project Local DB . SUCCESS [  1.259 s]
[INFO] Spark Project Networking ... SUCCESS [  3.173 s]
[INFO] Spark Project Shuffle Streaming Service  SUCCESS [  1.364 s]
[INFO] Spark Project Unsafe ... SUCCESS [  6.672 s]
[INFO] Spark Project Launcher . SUCCESS [  1.782 s]
[INFO] Spark Project Core . SUCCESS [01:48 min]
[INFO] Spark Project ML Local Library . SUCCESS [ 33.861 s]
[INFO] Spark Project GraphX ... SUCCESS [ 30.114 s]
[INFO] Spark Project Streaming  SUCCESS [ 42.267 s]
[INFO] Spark Project Catalyst . SUCCESS [02:15 min]
[INFO] Spark Project SQL .. SUCCESS [03:05 min]
[INFO] Spark Project ML Library ... SUCCESS [02:16 min]
[INFO] Spark Project Tools  SUCCESS [  7.109 s]
[INFO] Spark Project Hive . SUCCESS [01:20 min]
[INFO] Spark Project REPL . SUCCESS [ 20.758 s]
[INFO] Spark Project YARN Shuffle Service . SUCCESS [ 10.377 s]
[INFO] Spark Project YARN . SUCCESS [ 47.571 s]
[INFO] Spark Project Hive Thrift Server ... SUCCESS [ 36.327 s]
[INFO] Spark Project Assembly . SUCCESS [  4.618 s]
[INFO] Kafka 0.10+ Token Provider for Streaming ... SUCCESS [ 18.895 s]
[INFO] Spark Integration for Kafka 0.10 ... SUCCESS [ 28.380 s]
[INFO] Kafka 0.10+ Source for Structured Streaming  SUCCESS [ 02:00 h]
[INFO] Spark Project Examples . SUCCESS [26:28 min]
[INFO] Spark Integration for Kafka 0.10 Assembly .. SUCCESS [  4.884 s]
[INFO] Spark Avro . SUCCESS [ 35.016 s]
[INFO]

[INFO] BUILD SUCCESS
[INFO]

[INFO] Total time:  02:43 h
[INFO] Finished at: 2021-03-11T08:12:02+01:00
[INFO]

...
Creating tar archive
removing 'pyspark-3.1.1' (and everything under it)
+ popd
+ '[' false == true ']'
+ echo 'Skipping building R source package'
Skipping building R source package
+ mkdir /Users/attilazsoltpiros/git/attilapiros/spark/dist/conf
+ cp
/Users/attilazsoltpiros/git/attilapiros/spark/conf/fairscheduler.xml.template
/Users/attilazsoltpiros/git/attilapiros/spark/conf/log4j.properties.template
/Users/attilazsoltpiros/git/attilapiros/spark/conf/metrics.properties.template
/Users/attilazsoltpiros/git/attilapiros/spark/conf/spark-defaults.conf.template
/Users/attilazsoltpiros/git/attilapiros/spark/conf/spark-env.sh.template
/Users/attilazsoltpiros/git/attilapiros/spark/conf/workers.template
/Users/attilazsoltpiros/git/attilapiros/spark/dist/conf
+ cp /Users/attilazsoltpiros/git/attilapiros/spark/README.md
/Users/attilazsoltpiros/git/attilapiros/spark/dist
+ cp -r /Users/attilazsoltpiros/git/attilapiros/spark/bin
/Users/attilazsoltpiros/git/attilapiros/spark/dist
+ cp -r /Users/attilazsoltpiros/git/attilapiros/spark/python
/Users/attilazsoltpiros/git/attilapiros/spark/dist
+ '[' true == true ']'
+ rm -f
/Users/attilazsoltpiros/git/attilapiros/spark/dist/python/dist/pyspark-3.1.1.tar.gz
/Users/attilazsoltpiros/git/attilapiros/spark/dist/python/dist/pyspark-3.2.0.dev0.tar.gz
+ cp -r /Users/attilazsoltpiros/git/attilapiros/spark/sbin
/Users/attilazsoltpiros/git/attilapiros/spark/dist
+ '[' -d /Users/attilazsoltpiros/git/attilapiros/spark/R/lib/SparkR ']'
+ mkdir -p /Users/attilazsoltpiros/git/attilapiros/spark/dist/R/lib
+ cp -r /Users/attilazsoltpiros/git/attilapiros/spark/R/lib/SparkR
/Users/attilazsoltpiros/git/attilapiros/spark/dist/R/lib
+ cp /Users/attilazsoltpiros/git/attilapiros/spark/R/lib/sparkr.zip
/Users/attilazsoltpiros/git/attilapiros/spark/dist/R/lib
+ '[' true == true ']'
+ T

Re: Issue while consuming message in kafka using structured streaming

2021-03-12 Thread Gabor Somogyi
Please see that the driver side, for example, was resolved only in 3.1.0...

G


Re: How to upgrade kafka client in spark_streaming_kafka 2.2

2021-03-12 Thread Gabor Somogyi
Upgrading the Kafka client is not a trivial change and may or may not work,
since new versions can contain incompatible API and/or behavior changes.
I've collected how Spark evolved in terms of its Kafka client, and gathered
the breaking changes there to make life easier.
Have a look and make your choice based on that:
https://gist.github.com/gaborgsomogyi/3476c32d69ff2087ed5d7d031653c7a9

As a general suggestion, it would be best to upgrade Spark as a whole,
because we've added many fixes for issues you could otherwise run into...

Hope this helps!

G
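
As a rough illustration of that suggestion in build terms, a hypothetical sbt
fragment might look like this (versions are only examples; the exact
kafka-clients version you end up with is whatever the chosen Spark connector
pulls in transitively):

libraryDependencies ++= Seq(
  // Upgrading Spark as a whole, e.g. to the 2.4 line, brings the matching
  // Kafka connector and its kafka-clients dependency along with it.
  "org.apache.spark" %% "spark-core"                 % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8"
)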


Re: Issue while consuming message in kafka using structured streaming

2021-03-12 Thread Sachit Murarka
Hi Gabor,

Thanks a lot for the response. I am using Spark 3.0.1 and this is Spark
Structured Streaming.

Kind Regards,
Sachit Murarka


Re: Issue while consuming message in kafka using structured streaming

2021-03-12 Thread Gabor Somogyi
Since you've not provided any version, I guess you're using 2.x and hitting
this issue: https://issues.apache.org/jira/browse/SPARK-28367
The executor side should be resolved out of the box in the latest Spark
version; on the driver side, however, one must set
"spark.sql.streaming.kafka.useDeprecatedOffsetFetching=false" to use the new
way of fetching.

If that doesn't solve your problem, then the Kafka side must be checked to
see why it's not returning...

Hope this helps!

G
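
A minimal sketch of setting that option when building the session (assuming a
Spark version where the option exists, i.e. 3.1.0 or later):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-offset-fetching")
  // Switch the driver to the newer, non-deprecated offset fetching path.
  .config("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")
  .getOrCreate()

The same setting can also be passed at submit time via
--conf spark.sql.streaming.kafka.useDeprecatedOffsetFetching=false.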


Issue while consuming message in kafka using structured streaming

2021-03-12 Thread Sachit Murarka
Hi All,

I am getting the following error in Spark Structured Streaming while
connecting to Kafka:

Main issue from logs::
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
6ms expired before the position for partition my-topic-1 could be
determined

Current Committed Offsets: {KafkaV2[Subscribe[my-topic]]:
{"my-topic":{"1":1498,"0":1410}}}
Current Available Offsets: {KafkaV2[Subscribe[my-topic]]:
{"my-topic":{"1":1499,"0":1410}}}


Full logs::

21/03/12 11:04:35 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times;
aborting job
21/03/12 11:04:35 ERROR WriteToDataSourceV2Exec: Data source write support
org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1eff441c
is aborting.
21/03/12 11:04:35 ERROR WriteToDataSourceV2Exec: Data source write support
org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1eff441c
aborted.
21/03/12 11:04:35 ERROR MicroBatchExecution: Query [id =
2d788a3a-f0ee-4903-9679-0d13bc401e12, runId =
1b387c28-c8e3-4336-9c9f-57db16aa8132] terminated with error
org.apache.spark.SparkException: Writing job aborted.
at
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
at
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
at
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
at
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
at
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:575)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:570)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org
$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 0.0 failed 4 times, most recent failure

How to upgrade kafka client in spark_streaming_kafka 2.2

2021-03-12 Thread Renu Yadav
Hi Team,
I am using Spark 2.2 and spark_streaming_kafka 2.2, which is pointing to
kafka-client 0.10. How can I upgrade the Kafka client to Kafka 2.2.0?

Thanks & Regards,
Renu Yadav