Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16918#discussion_r100937332
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -119,6 +119,122 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
     </div>
     </div>
     
    +### Creating a Kafka Source Batch
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Subscribe to 1 topic defaults to the earliest and latest offsets
    +val ds1 = spark
    +  .read
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("subscribe", "topic1")
    +  .load()
    +ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .as[(String, String)]
    +
    +// Subscribe to multiple topics, specifying explicit Kafka offsets
    +val ds2 = spark
    +  .read
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("subscribe", "topic1,topic2")
    +  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
    +  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
    +  .load()
    +ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .as[(String, String)]
    +
    +// Subscribe to a pattern, at the earliest and latest offsets
    +val ds3 = spark
    +  .read
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("subscribePattern", "topic.*")
    +  .option("startingOffsets", "earliest")
    +  .option("endingOffsets", "latest")
    +  .load()
    +ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .as[(String, String)]
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Subscribe to 1 topic defaults to the earliest and latest offsets
    +Dataset<Row> ds1 = spark
    +  .read()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("subscribe", "topic1")
    +  .load()
    +ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +
    +// Subscribe to multiple topics, specifying explicit Kafka offsets
    +Dataset<Row> ds2 = spark
    +  .read()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("subscribe", "topic1,topic2")
    +  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
    +  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
    +  .load()
    --- End diff --
    
    nit: missing ";"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to