[ https://issues.apache.org/jira/browse/GEARPUMP-24?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manu Zhang updated GEARPUMP-24:
-------------------------------
    Description: 
From [https://github.com/gearpump/gearpump/issues/2013]:

The current DataSource API

{code}
import io.gearpump.{Message, TimeStamp}
import io.gearpump.streaming.task.TaskContext

trait DataSource extends java.io.Serializable {

  /**
   * open connection to data source
   * invoked in onStart() method of [[io.gearpump.streaming.source.DataSourceTask]]
   * @param context is the task context at runtime
   * @param startTime is the start time of system
   */
  def open(context: TaskContext, startTime: Option[TimeStamp]): Unit

  /**
   * read a number of messages from data source.
   * invoked in each onNext() method of [[io.gearpump.streaming.source.DataSourceTask]]
   * @param batchSize max number of messages to read
   * @return a list of messages wrapped in [[io.gearpump.Message]]
   */
  def read(batchSize: Int): List[Message]

  /**
   * close connection to data source.
   * invoked in onStop() method of [[io.gearpump.streaming.source.DataSourceTask]]
   */
  def close(): Unit
}
{code}

has several issues:

1. read returns a Scala List of Message, which is unfriendly to Java DataSources. The same goes for the Option parameter in open.
2. The number of messages read may differ from the batchSize passed in, which leaves users uncertain about the size of the returned list (they may index past its end).
3. Returning a list may require an extra buffer inside read (e.g. in KafkaSource), which hurts performance.
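A minimal sketch of one way to address all three points, assuming read() returns a single message (or null when nothing is available) and the start time is passed as a java.time.Instant instead of Option[TimeStamp]. DataSourceSketch, SeqSource and drain are hypothetical names, and Message and TaskContext are simplified stand-ins so the snippet compiles on its own; this is an illustration, not a settled API.

{code}
import java.time.Instant

// Hypothetical stand-ins so the sketch compiles on its own; the real
// io.gearpump.Message and TaskContext carry more than this.
final case class Message(msg: Any, timestamp: Long)
trait TaskContext

// Hypothetical Java-friendly source interface: no Scala List or Option in
// the signatures, and read() returns at most one message per call.
trait DataSourceSketch extends java.io.Serializable {
  // plain java.time.Instant instead of Option[TimeStamp]
  def open(context: TaskContext, startTime: Instant): Unit
  // next message, or null when none is available right now
  def read(): Message
  def close(): Unit
}

// Toy source over an in-memory sequence; skips messages before startTime.
class SeqSource(data: Seq[Message]) extends DataSourceSketch {
  // transient: the iterator is rebuilt in open() after deserialization
  @transient private var it: Iterator[Message] = Iterator.empty

  override def open(context: TaskContext, startTime: Instant): Unit =
    it = data.iterator.filter(_.timestamp >= startTime.toEpochMilli)

  override def read(): Message = if (it.hasNext) it.next() else null

  override def close(): Unit = it = Iterator.empty
}

// Caller-side batching (e.g. inside the task's onNext): read until the
// source has nothing more or maxPerBatch messages have been emitted.
def drain(source: DataSourceSketch, maxPerBatch: Int)(emit: Message => Unit): Int = {
  var count = 0
  var exhausted = false
  while (!exhausted && count < maxPerBatch) {
    val msg = source.read()
    if (msg == null) exhausted = true
    else {
      emit(msg)
      count += 1
    }
  }
  count
}
{code}

Because the task decides how many read() calls make up one batch, the source never builds an intermediate list, and a Java implementation only has to return a Message or null.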

Update:

I'd like to add a "getWatermark" interface, which would help fix GEARPUMP-32.
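A minimal sketch of what getWatermark could look like, assuming the watermark is an Instant promising that no message read afterwards will carry an earlier timestamp. The toy source emits messages in timestamp order, so it can report the timestamp of the last emitted message and jump to Instant.MAX once its bounded input is exhausted. WatermarkedSource and OrderedSeqSource are hypothetical names, and an out-of-order source would need a different estimate.

{code}
import java.time.Instant

// Hypothetical stand-in for io.gearpump.Message.
final case class Message(msg: Any, timestamp: Long)

// Hypothetical source interface extended with a watermark query.
trait WatermarkedSource {
  // next message, or null when none is available
  def read(): Message
  // promise: no message read after this call has an earlier timestamp
  def getWatermark: Instant
}

// Toy bounded source whose messages are in non-decreasing timestamp order,
// so the last emitted timestamp is a valid watermark.
class OrderedSeqSource(data: Seq[Message]) extends WatermarkedSource {
  private val it = data.iterator
  // nothing has been read yet, so nothing can be promised
  private var watermark: Instant = Instant.MIN

  override def read(): Message =
    if (it.hasNext) {
      val m = it.next()
      watermark = Instant.ofEpochMilli(m.timestamp)
      m
    } else {
      // bounded input is exhausted: no further messages will ever arrive
      watermark = Instant.MAX
      null
    }

  override def getWatermark: Instant = watermark
}
{code}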

  was:
From [https://github.com/gearpump/gearpump/issues/2013]:

The current DataSource API

{code}
import io.gearpump.{Message, TimeStamp}
import io.gearpump.streaming.task.TaskContext

trait DataSource extends java.io.Serializable {

  /**
   * open connection to data source
   * invoked in onStart() method of [[io.gearpump.streaming.source.DataSourceTask]]
   * @param context is the task context at runtime
   * @param startTime is the start time of system
   */
  def open(context: TaskContext, startTime: Option[TimeStamp]): Unit

  /**
   * read a number of messages from data source.
   * invoked in each onNext() method of [[io.gearpump.streaming.source.DataSourceTask]]
   * @param batchSize max number of messages to read
   * @return a list of messages wrapped in [[io.gearpump.Message]]
   */
  def read(batchSize: Int): List[Message]

  /**
   * close connection to data source.
   * invoked in onStop() method of [[io.gearpump.streaming.source.DataSourceTask]]
   */
  def close(): Unit
}
{code}

has several issues:

1. read returns a Scala List of Message, which is unfriendly to Java DataSources. The same goes for the Option parameter in open.
2. The number of messages read may differ from the batchSize passed in, which leaves users uncertain about the size of the returned list (they may index past its end).
3. Returning a list may require an extra buffer inside read (e.g. in KafkaSource), which hurts performance.


> refactor DataSource API
> -----------------------
>
>                 Key: GEARPUMP-24
>                 URL: https://issues.apache.org/jira/browse/GEARPUMP-24
>             Project: Apache Gearpump
>          Issue Type: Improvement
>          Components: streaming
>    Affects Versions: 0.8.0
>            Reporter: Manu Zhang
>            Assignee: Manu Zhang
>             Fix For: 0.8.1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
