Re: Spark streaming with Confluent Kafka

2020-07-03 Thread Gabor Somogyi
The error is clear:
Caused by: java.lang.IllegalArgumentException: No serviceName defined in
either JAAS or Kafka config
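
In the quoted setup below, this error usually points at the SASL mechanism
falling back to the default GSSAPI (Kerberos), which is the only mechanism
that needs a serviceName: the read uses kafka.sasl.mechanisms (the librdkafka
spelling), while the Java client property is sasl.mechanism, so PLAIN never
takes effect. A minimal sketch of a corrected read for spark-shell, with the
credentials passed per source via kafka.sasl.jaas.config (broker, topic and
the XXX credentials are the placeholders from the original mail):

import org.apache.spark.sql.functions.col

// Assumes spark-shell started with the spark-sql-kafka-0-10 package, so
// `spark` is already in scope; XXX values are placeholders.
val jaas =
  """org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";"""

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092")
  .option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS")
  .option("kafka.security.protocol", "SASL_SSL")
  // Singular property name; the plural spelling is not a Java-client setting.
  .option("kafka.sasl.mechanism", "PLAIN")
  // Per-source JAAS entry instead of a JVM-wide kafka_jaas.conf file.
  .option("kafka.sasl.jaas.config", jaas)
  .option("startingOffsets", "earliest")
  .load()
  .select(col("value").cast("string").alias("value"))

With kafka.sasl.jaas.config set per source, the --files kafka_jaas.conf and
the java.security.auth.login.config JVM options should no longer be needed.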

On Fri, 3 Jul 2020, 15:40 dwgw,  wrote:

> Hi
>
> I am trying to stream a Confluent Kafka topic in the Spark shell. For that I
> have invoked the Spark shell using the following command.
>
> # spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
> --conf
>
> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf"
> --driver-java-options
> "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" --files
> /home/spark/kafka_jaas.conf
>
> kafka_jaas.conf
> -
>
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="XXX"
>   password="XXX";
> };
>
> Readstream
> -
>
> scala> val df = spark.
> | readStream.
> | format("kafka").
> | option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
> | option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
> | option("kafka.sasl.mechanisms", "PLAIN").
> | option("kafka.security.protocol", "SASL_SSL").
> | option("startingOffsets", "earliest").
> | load.
> | select($"value".cast("string").alias("value"))
> df: org.apache.spark.sql.DataFrame = [value: string]
>
> Writestream
> --
>
> scala> df.writeStream.
> | format("console").
> | outputMode("append").
> | trigger(Trigger.ProcessingTime("20 seconds")).
> | start
> 2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
> checkpoint location created which is deleted normally when the query didn't
> fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
> to delete it under any circumstances, please set
> spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
> know deleting temp checkpoint folder is best effort.
> res0: org.apache.spark.sql.streaming.StreamingQuery =
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741
>
> scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
> attempt 1 getting Kafka offsets:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
> at
>
> org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
>
> org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
> at
>
> org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
> at scala.Option.getOrElse(Option.scala:189)
> at
>
> org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
> at
>
> org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
> at
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
> at scala.Option.getOrElse(Option.scala:189)
> at
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
> at
>
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
> at
>
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
> at
>
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
> at 

Spark streaming with Confluent Kafka

2020-07-03 Thread dwgw
Hi

I am trying to stream a Confluent Kafka topic in the Spark shell. For that I
have invoked the Spark shell using the following command.

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
--conf
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf"
--driver-java-options
"-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" --files
/home/spark/kafka_jaas.conf

kafka_jaas.conf
-

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="XXX"
  password="XXX";
};

Readstream
-

scala> val df = spark.
| readStream.
| format("kafka").
| option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
| option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
| option("kafka.sasl.mechanisms", "PLAIN").
| option("kafka.security.protocol", "SASL_SSL").
| option("startingOffsets", "earliest").
| load.
| select($"value".cast("string").alias("value"))
df: org.apache.spark.sql.DataFrame = [value: string]

Writestream
--

scala> df.writeStream.
| format("console").
| outputMode("append").
| trigger(Trigger.ProcessingTime("20 seconds")).
| start
2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
checkpoint location created which is deleted normally when the query didn't
fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
to delete it under any circumstances, please set
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
know deleting temp checkpoint folder is best effort.
res0: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741

scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at
org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at

Re: REST Structured Streaming Sink

2020-07-03 Thread Sam Elamin
Hi Folks,

Great discussion! I will take rate limiting into account and make it
configurable for the HTTP requests as well.

I was wondering if there is anything I might have overlooked that would make
it technically impossible, or at least difficult enough not to warrant the
effort. Also, would this be useful to people?

My thinking, from a business perspective, is: why are we making them wait
until the next scheduled batch run for data that is already available from
an API? You could run a job every minute or hour, but that in itself sounds
like a streaming use case.
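
A minimal sketch of the foreachBatch route suggested downthread, assuming an
existing streaming DataFrame df and a hypothetical JSON ingest endpoint (the
URL, the driver-side HTTP client and the one-request-per-record loop are
illustrative only, not a proposed API):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

import org.apache.spark.sql.DataFrame

// Hypothetical endpoint; java.net.http requires Java 11+.
val endpoint = "https://example.com/api/ingest"

val query = df.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Runs on the driver once per micro-batch; fine as a sketch, but a real
    // sink would add rate limiting, retries and batched requests.
    val client = HttpClient.newHttpClient()
    batch.toJSON.collect().foreach { json =>
      val request = HttpRequest.newBuilder(URI.create(endpoint))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(json))
        .build()
      client.send(request, HttpResponse.BodyHandlers.ofString())
    }
  }
  .start()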

Thoughts?

Regards
Sam

On Thu, Jul 2, 2020 at 3:31 AM Burak Yavuz  wrote:

> Well, the difference is, a technical user writes the UDF and a
> non-technical user may use this built-in thing (misconfigure it) and shoot
> themselves in the foot.
>
> On Wed, Jul 1, 2020, 6:40 PM Andrew Melo  wrote:
>
>> On Wed, Jul 1, 2020 at 8:13 PM Burak Yavuz  wrote:
>> >
>> > I'm not sure having a built-in sink that allows you to DDOS servers is
>> the best idea either. foreachWriter is typically used for such use cases,
>> not foreachBatch. It's also pretty hard to guarantee exactly-once, rate
>> limiting, etc.
>>
>> If you control the machines and can run arbitrary code, you can DDOS
>> whatever you want. What's the difference between this proposal and
>> writing a UDF that opens 1,000 connections to a target machine?
>>
>> > Best,
>> > Burak
>> >
>> > On Wed, Jul 1, 2020 at 5:54 PM Holden Karau 
>> wrote:
>> >>
>> >> I think adding something like this (if it doesn't already exist) could
>> help make structured streaming easier to use; foreachBatch is not the best
>> API.
>> >>
>> >> On Wed, Jul 1, 2020 at 2:21 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>> >>>
>> >>> I guess the method, query parameters, headers, and the payload would
>> all be different for almost every use case - that makes it hard to
>> generalize and would require the implementation to be quite complicated to
>> be flexible enough.
>> >>>
>> >>> I'm not aware of any custom sink implementing REST, so your best bet
>> would be simply implementing your own with foreachBatch, but someone
>> might jump in and provide a pointer if there is something in the Spark
>> ecosystem.
>> >>>
>> >>> Thanks,
>> >>> Jungtaek Lim (HeartSaVioR)
>> >>>
>> >>> On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin 
>> wrote:
>> 
>>  Hi All,
>> 
>> 
>>  We ingest a lot of RESTful APIs into our lake and I'm wondering if it
>> is at all possible to create a REST sink in Structured Streaming?
>> 
>>  For now I'm only focusing on RESTful services that have an
>> incremental ID so my sink can just poll for new data and then ingest it.
>> 
>>  I can't seem to find a connector that does this and my gut instinct
>> tells me it's probably because it isn't possible due to something
>> completely obvious that I am missing
>> 
>>  I know some RESTful APIs obfuscate the IDs into a hash of strings and
>> that could be a problem, but since I'm planning on focusing on just
>> numerical IDs that get incremented, I think I won't be facing that issue.
>> 
>> 
>>  Can anyone let me know if this sounds like a daft idea? Will I need
>> something like Kafka or Kinesis as a buffer and for redundancy, or am I
>> overthinking this?
>> 
>> 
>>  I would love to bounce ideas around with people who run Structured
>> Streaming jobs in production.
>> 
>> 
>>  Kind regards
>>  Sam
>> 
>> 
>> >>
>> >>
>> >> --
>> >> Twitter: https://twitter.com/holdenkarau
>> >> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9
>> >> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>