[ 
https://issues.apache.org/jira/browse/BAHIR-168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514738#comment-16514738
 ] 

Luciano Resende commented on BAHIR-168:
---------------------------------------

Sorry for the delay in responding; I have been traveling.

Detailed steps for contributing a new extension are documented at 
[http://bahir.apache.org/contributing-extensions/]

I would recommend opening a PR against the Bahir repository. However, we are in 
the process of migrating the extensions to Data Source V2, so it would be best 
if the contribution were already based on Data Source V2. Otherwise, it cannot 
be included in the Bahir release that supports Spark 2.3.

> Kinesis support in Structured Streaming
> ---------------------------------------
>
>                 Key: BAHIR-168
>                 URL: https://issues.apache.org/jira/browse/BAHIR-168
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark Structured Streaming Connectors
>    Affects Versions: Spark-2.3
>            Reporter: Takako Shimamoto
>            Priority: Major
>
> Implement Kinesis based sources and sinks for Structured Streaming
> h2. Kinesis Sources
> I hope that [this|https://github.com/qubole/kinesis-sql] will be contributed 
> to Apache Bahir, as mentioned in the comments on SPARK-18165.
> h2. Kinesis Sinks
> I've implemented a Sink here: 
> [https://github.com/shimamoto/bahir/tree/kinesis-writer/sql-kinesis]
>  This requires Spark 2.3 and the Data Source V2 API. I plan to open a PR, 
> but Bahir does not support Spark 2.3 yet, so BAHIR-167 needs to be handled first.
> A brief overview follows.
> h3. Description
> Add a new Kinesis Sink and Kinesis Relation for writing streaming and batch 
> queries, respectively, to AWS Kinesis.
> The DataFrame being written to Kinesis should have the following columns in 
> its schema:
> ||Column||Type||
> |partitionKey (optional)|string|
> |data (required)|string or binary|
> If the partitionKey column is missing, a SHA-256 digest of data, encoded as a 
> hex string, is automatically used as the partition key.
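> As a minimal sketch of the fallback described above (not the code from the 
> linked branch), the default key could be derived as shown here. The object 
> and method names are hypothetical, and lowercase hex over the raw bytes of 
> data is an assumption.
> {code}
> import java.nio.charset.StandardCharsets
> import java.security.MessageDigest
>
> // Hypothetical helper for illustration; not part of the linked implementation.
> object PartitionKeys {
>   // SHA-256 digest of the payload, rendered as a lowercase hex string.
>   def sha256Hex(data: Array[Byte]): String =
>     MessageDigest.getInstance("SHA-256")
>       .digest(data)
>       .map(b => f"${b & 0xff}%02x")
>       .mkString
>
>   // Use the supplied key when present, otherwise fall back to the digest.
>   def resolve(partitionKey: Option[String], data: Array[Byte]): String =
>     partitionKey.getOrElse(sha256Hex(data))
> }
> {code}
> With this sketch, PartitionKeys.resolve(None, bytes) yields a 64-character 
> hex string, while an explicit key is passed through unchanged.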
> h4. Streaming Kinesis Sink
> {code}
> val df = inputStream.toDS().toDF("partitionKey", "data")
> // start() returns a StreamingQuery handle for the running query
> val query = df.writeStream
>   .format("kinesis")
>   .option("streamName", "test-stream")
>   .option("region", "us-east-1")
>   .option("checkpointLocation", checkpointDir.getCanonicalPath)
>   .start()
> {code}
> h4. Batch Kinesis Sink
> {code}
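> // toDF on a Seq needs the session implicits (SparkSession assumed to be named spark)
> import spark.implicits._
> val df = Seq("partitionKey-1" -> "data1", "partitionKey-2" -> "data2")
>   .toDF("partitionKey", "data")
> df.write
>   .format("kinesis")
>   .option("streamName", "test-stream")
>   .option("region", "us-east-1")
>   .save()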
> {code}
> h3. Configuration
> The following options must be set for both batch and streaming queries.
> ||Option||value||default||meaning||
> |streamName|string|-|The stream name associated with the Sink.|
> |region|string|-|The region name for Kinesis Stream.|
> The following configurations are optional.
> ||Option||value||default||meaning||
> |chunksize|int|50|Maximum number of records sent per PutRecords request.|
> |endpoint|string|(none)|Only set this when using a non-standard service endpoint.|
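> To illustrate the chunksize option, here is a rough sketch of how records 
> might be grouped before each PutRecords call. The helper name is 
> hypothetical, and the linked implementation may batch differently. Note that 
> the Kinesis PutRecords API itself accepts at most 500 records per request.
> {code}
> // Hypothetical helper for illustration; not part of the linked implementation.
> object Chunking {
>   // Default taken from the chunksize row in the options table.
>   val DefaultChunkSize = 50
>
>   // Split records into groups of at most chunkSize, one group per request.
>   def chunks[A](records: Iterator[A],
>                 chunkSize: Int = DefaultChunkSize): Iterator[Seq[A]] = {
>     require(chunkSize > 0, "chunksize must be positive")
>     records.grouped(chunkSize)
>   }
> }
> {code}
> With the default, 120 records would be sent as three requests of 50, 50 and 
> 20 records.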



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
