Any help on the above?

On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark....@gmail.com>
wrote:

> Hi,
>
> I've made some progress on the topic above:
>
> 1) I am feeding a CSV file into a Kafka topic.
> 2) I am reading the Kafka topic with readStream, as TD's article suggests.
> 3) I am then simply trying to show the streaming dataframe, using
> queryName('XYZ') in the writeStream and running a SQL query on top of it,
> but that doesn't show anything.
> 4) Once all the above problems are resolved, I want to perform a
> stream-stream join.
>
> The CSV file I'm ingesting into Kafka has -
>
> id,first_name,last_name
> 1,Kellyann,Moyne
> 2,Morty,Blacker
> 3,Tobit,Robardley
> 4,Wilona,Kells
> 5,Reggy,Comizzoli
>
>
> My test code -
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>     # ssc = StreamingContext(spark, 20)
>
>     table1_stream = (spark.readStream.format("kafka")
>                      .option("startingOffsets", "earliest")
>                      .option("kafka.bootstrap.servers", "localhost:9092")
>                      .option("subscribe", "test1")
>                      .load())
>
>     # table2_stream = (spark.readStream.format("kafka")
>     #                  .option("kafka.bootstrap.servers", "localhost:9092")
>     #                  .option("subscribe", "test2")
>     #                  .load())
>
>     # joined_Stream = table1_stream.join(table2_stream, "Id")
>     #
>     # joined_Stream.show()
>
>     query = table1_stream.writeStream.format("console").queryName("table_A").start()  # or .format("memory")
>     # spark.sql("select * from table_A").show()
>     # time.sleep(10)  # sleep 10 seconds
>     # query.stop()
>     query.awaitTermination()
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py
>
>
> The output I'm getting (whereas I simply want to show() my dataframe) -
>
> +----+--------------------+-----+---------+------+--------------------+-------------+
> | key|               value|topic|partition|offset|           timestamp|timestampType|
> +----+--------------------+-----+---------+------+--------------------+-------------+
> |null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
> |null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
> |null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
> |null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
> |null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
> |null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
> +----+--------------------+-----+---------+------+--------------------+-------------+
>
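
A note on the output above: the rows do seem to be arriving. The Kafka source exposes `value` as binary, so the console sink prints the raw bytes of each CSV line rather than text. The stdlib-only sketch below decodes the hex prefixes visible in the table and splits a full line into fields. The function names and the `HEADER` constant are made up for illustration, and the trailing half-byte of each truncated cell is dropped.

```python
# Hex prefixes copied from the `value` column of the console output above.
# The console sink truncates each cell, so only the first bytes are visible.
TRUNCATED_VALUES = [
    "69 64 2C 66 69",
    "31 2C 4B 65 6C",
    "32 2C 4D 6F 72",
    "33 2C 54 6F 62",
    "34 2C 57 69 6C",
    "35 2C 52 65 67",
]

# Header line of the CSV file; it is sent to Kafka as an ordinary record.
HEADER = "id,first_name,last_name"


def decode_hex_prefix(hex_bytes):
    """Turn a space-separated hex dump into the UTF-8 text it encodes."""
    return bytes.fromhex(hex_bytes.replace(" ", "")).decode("utf-8")


def parse_record(raw):
    """Decode one Kafka value and split it into named fields.

    Returns None for the header line so callers can skip it.
    """
    line = raw.decode("utf-8").strip()
    if line == HEADER:
        return None
    record_id, first_name, last_name = line.split(",")
    return {"id": int(record_id), "first_name": first_name, "last_name": last_name}


if __name__ == "__main__":
    for cell in TRUNCATED_VALUES:
        print(decode_hex_prefix(cell))
    print(parse_record(b"1,Kellyann,Moyne"))
```

On the Spark side, the corresponding step would be to cast the column before showing it, for example `table1_stream.selectExpr("CAST(value AS STRING)")`, and then split the string on commas. The first record (offset 5226) is the CSV header and would need to be filtered out.
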
> 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
>   "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
>   "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
>   "name" : "table_A",
>   "timestamp" : "2018-03-15T10:18:07.218Z",
>   "numInputRows" : 6,
>   "inputRowsPerSecond" : 461.53846153846155,
>   "processedRowsPerSecond" : 14.634146341463415,
>   "durationMs" : {
>     "addBatch" : 241,
>     "getBatch" : 15,
>     "getOffset" : 2,
>     "queryPlanning" : 2,
>     "triggerExecution" : 410,
>     "walCommit" : 135
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test1]]",
>     "startOffset" : {
>       "test1" : {
>         "0" : 5226
>       }
>     },
>     "endOffset" : {
>       "test1" : {
>         "0" : 5232
>       }
>     },
>     "numInputRows" : 6,
>     "inputRowsPerSecond" : 461.53846153846155,
>     "processedRowsPerSecond" : 14.634146341463415
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
>   }
> }
>
> P.S. If I add the line below to the code, it doesn't print the actual
> table as a DataFrame.
>
> spark.sql("select * from table_A").show()
>
>
> Any help?
>
>
> Thanks,
> Aakash.
>
> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark....@gmail.com>
> wrote:
>
>> Thanks to TD, the savior!
>>
>> Shall look into it.
>>
>> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>>>
>>> This is a true stream-stream join, which will automatically buffer delayed
>>> data and join it appropriately with SQL join semantics. Please check it
>>> out :)
>>>
>>> TD
>>>
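
As a rough picture of what "buffering delayed data" means here, below is a toy, single-process sketch of a symmetric inner join. It is not Spark's implementation: the class name, the `max_delay_s` parameter and the sample payloads are all made up for illustration. Each side keeps its rows in a buffer so that a row arriving later on the other side can still find its match, and rows older than the allowed delay are dropped to keep the buffers bounded.

```python
class BufferedInnerJoin:
    """Toy symmetric join: each side buffers rows so late matches still join."""

    def __init__(self, max_delay_s):
        # How far a row may lag behind the newest event time before eviction.
        self.max_delay_s = max_delay_s
        # side -> key -> list of (event_time, payload)
        self.buffers = {"left": {}, "right": {}}
        self.max_event_time = float("-inf")

    def on_row(self, side, key, event_time, payload):
        """Buffer the row and return every joined tuple it completes."""
        other = "right" if side == "left" else "left"
        self.buffers[side].setdefault(key, []).append((event_time, payload))
        self.max_event_time = max(self.max_event_time, event_time)

        matches = []
        for _, other_payload in self.buffers[other].get(key, []):
            if side == "left":
                matches.append((key, payload, other_payload))
            else:
                matches.append((key, other_payload, payload))

        self._evict()
        return matches

    def _evict(self):
        """Drop buffered rows older than the newest event time minus the delay."""
        cutoff = self.max_event_time - self.max_delay_s
        for side_buffer in self.buffers.values():
            for key in list(side_buffer):
                kept = [(t, p) for t, p in side_buffer[key] if t >= cutoff]
                if kept:
                    side_buffer[key] = kept
                else:
                    del side_buffer[key]


# The second topic delivers its row 15 minutes (900 s) after the first one.
join = BufferedInnerJoin(max_delay_s=30 * 60)
first = join.on_row("left", 1, 0, "row from topic 1")
second = join.on_row("right", 1, 15 * 60, "row from topic 2")
print(first)   # nothing to join with yet
print(second)  # the buffered left row is matched
```

The allowed delay is the trade-off: a larger value keeps more rows in memory, a smaller one risks dropping a row before its match arrives. The linked post describes how Spark 2.3 uses watermarks and time constraints to bound this state.
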
>>>
>>>
>>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com>
>>> wrote:
>>>
>>>> I misread it, and thought that your question was whether pyspark
>>>> supports kafka lol. Sorry!
>>>>
>>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <
>>>> aakash.spark....@gmail.com> wrote:
>>>>
>>>>> Hey Dylan,
>>>>>
>>>>> Great!
>>>>>
>>>>> Can you reply to my initial mail and also the latest one?
>>>>>
>>>>> Thanks,
>>>>> Aakash.
>>>>>
>>>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I've been using Kafka with pyspark since 2.1.
>>>>>>
>>>>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
>>>>>> aakash.spark....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm yet to.
>>>>>>>
>>>>>>> I just want to know: when will Spark 2.3 with the 0.10 Kafka Spark
>>>>>>> package support Python? I read somewhere that, as of now, Scala and
>>>>>>> Java are the languages to be used.
>>>>>>>
>>>>>>> Please correct me if I am wrong.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Aakash.
>>>>>>>
>>>>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Did you try spark 2.3 with structured streaming? There watermarking
>>>>>>>> and plain sql might be really interesting for you.
>>>>>>>> Aakash Basu <aakash.spark....@gmail.com> wrote on Wed, Mar 14,
>>>>>>>> 2018 at 14:57:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>>>>>>
>>>>>>>>> *Spark 2.2.1*
>>>>>>>>> *Kafka 1.0.1*
>>>>>>>>>
>>>>>>>>> As of now, I am feeding paragraphs into the Kafka console producer,
>>>>>>>>> and my Spark job, which is acting as a receiver, is printing the
>>>>>>>>> flattened words, which is a pure RDD operation.
>>>>>>>>>
>>>>>>>>> *My goal is to continuously read two tables (which are being updated)
>>>>>>>>> as two distinct Kafka topics, load them as two Spark Dataframes, join
>>>>>>>>> them on a key and produce the output.* (I am from a Spark-SQL
>>>>>>>>> background, pardon my Spark-SQL-ish writing)
>>>>>>>>>
>>>>>>>>> *It may happen that the first topic receives new data 15 mins
>>>>>>>>> before the second topic. In that scenario, how should I proceed? I
>>>>>>>>> should not lose any data.*
>>>>>>>>>
>>>>>>>>> As of now, I simply want to pass paragraphs, read them as an RDD,
>>>>>>>>> convert it to a DF and then join to get the common keys as the
>>>>>>>>> output (just for R&D).
>>>>>>>>>
>>>>>>>>> Started using Spark Streaming and Kafka today itself.
>>>>>>>>>
>>>>>>>>> Please help!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Aakash.
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>>
>>
>
