[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-11 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r468554247



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
##
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.{List => JList, Map => JMap}
+
+import com.yammer.metrics.core.MetricsRegistry
+import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.request.BrokerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance}
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetch data from specified Pinot server.
+ */
+private[pinot] class PinotServerDataFetcher(
+    partitionId: Int,
+    pinotSplit: PinotSplit,
+    dataSourceOptions: PinotDataSourceReadOptions)
+  extends Logging {
+  private val sqlCompiler = new CalciteSqlCompiler()
+  private val brokerId = "apache_spark"

Review comment:
   I couldn't find any information about that yet. I'll look into it for the next release.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-09 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r467567841



##
File path: pinot-connectors/pinot-spark-connector/documentation/read_model.md
##
@@ -0,0 +1,145 @@
+
+# Read Model
+
+The connector can scan offline, hybrid and realtime tables. The `table` parameter has to be given as follows:

Review comment:
   Hmm, a table type option would be good. The table type option will be a required option alongside the table name. It will be clearer, thanks!
   
   Update: Implemented.
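A minimal sketch of what a separate table type option could resolve to, assuming the option accepts `offline`, `realtime` or `hybrid` and that physical Pinot table names carry the `_OFFLINE`/`_REALTIME` suffixes listed in `read_model.md`. `TableTypeOption`, `parse` and `physicalTables` are hypothetical names for illustration, not the connector's API:

```scala
// Hypothetical helper, for illustration only: resolves a raw table name plus
// an (assumed) table type option value into physical Pinot table names.
object TableTypeOption {
  sealed trait TableType
  case object Offline extends TableType
  case object Realtime extends TableType
  case object Hybrid extends TableType

  // The option is required, so reject unknown values instead of guessing.
  def parse(value: String): TableType = value.trim.toLowerCase match {
    case "offline"  => Offline
    case "realtime" => Realtime
    case "hybrid"   => Hybrid
    case other      => throw new IllegalArgumentException(s"Unknown table type: '$other'")
  }

  // A hybrid table is backed by both an offline and a realtime table.
  def physicalTables(rawTableName: String, tableType: TableType): Seq[String] =
    tableType match {
      case Offline  => Seq(s"${rawTableName}_OFFLINE")
      case Realtime => Seq(s"${rawTableName}_REALTIME")
      case Hybrid   => Seq(s"${rawTableName}_OFFLINE", s"${rawTableName}_REALTIME")
    }
}
```

Failing on an unknown value keeps the option explicit rather than silently falling back to one of the table types.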








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-09 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r467567841



##
File path: pinot-connectors/pinot-spark-connector/documentation/read_model.md
##
@@ -0,0 +1,145 @@
+
+# Read Model
+
+The connector can scan offline, hybrid and realtime tables. The `table` parameter has to be given as follows:

Review comment:
   Hmm, a table type option would be good. The table type option will be a required option alongside the table name. It will be clearer, thanks!








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-04 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464311850



##
File path: config/.scalafmt.conf
##
@@ -0,0 +1,17 @@
+version = "2.4.0"

Review comment:
   Defines rules for formatting Scala code.
   
   This file was removed because the Travis build failed: it required Maven 3.6.0.








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464543147



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
##
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.{List => JList, Map => JMap}
+
+import com.yammer.metrics.core.MetricsRegistry
+import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.request.BrokerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance}
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetch data from specified Pinot server.
+ */
+private[pinot] class PinotServerDataFetcher(
+    partitionId: Int,
+    pinotSplit: PinotSplit,
+    dataSourceOptions: PinotDataSourceReadOptions)
+  extends Logging {
+  private val sqlCompiler = new CalciteSqlCompiler()

Review comment:
   I've created a PR related to this, can you check it? @fx19880617 
   
   https://github.com/apache/incubator-pinot/pull/5791
   
   I've replaced the routing table endpoint (in `PinotClusterClient`) with the new endpoint from PR #5791.








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464543147



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
##
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.{List => JList, Map => JMap}
+
+import com.yammer.metrics.core.MetricsRegistry
+import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.request.BrokerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance}
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetch data from specified Pinot server.
+ */
+private[pinot] class PinotServerDataFetcher(
+    partitionId: Int,
+    pinotSplit: PinotSplit,
+    dataSourceOptions: PinotDataSourceReadOptions)
+  extends Logging {
+  private val sqlCompiler = new CalciteSqlCompiler()

Review comment:
   I've created a PR related to this, can you check it? @fx19880617 
   
   https://github.com/apache/incubator-pinot/pull/5791
   
   I've replaced the routing table endpoint with the new endpoint from PR #5791.








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464544165



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
##
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.{List => JList, Map => JMap}
+
+import com.yammer.metrics.core.MetricsRegistry
+import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.request.BrokerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance}
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetch data from specified Pinot server.
+ */
+private[pinot] class PinotServerDataFetcher(
+    partitionId: Int,
+    pinotSplit: PinotSplit,
+    dataSourceOptions: PinotDataSourceReadOptions)
+  extends Logging {
+  private val sqlCompiler = new CalciteSqlCompiler()
+  private val brokerId = "apache_spark"

Review comment:
   I'll look for more info about the executor.








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464522910



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
##
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.net.{URI, URLEncoder}
+import java.util.regex.Pattern
+
+import org.apache.pinot.connector.spark.decodeTo
+import org.apache.pinot.connector.spark.exceptions.{HttpStatusCodeException, PinotException}
+import org.apache.pinot.connector.spark.utils.{HttpUtils, Logging}
+
+import scala.util.{Failure, Success, Try}
+import io.circe.generic.auto._
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
+import org.apache.pinot.spi.data.Schema
+
+/**
+ * Client that read/write/prepare required data from/to Pinot.
+ */
+private[pinot] object PinotClusterClient extends Logging {
+
+  def getTableSchema(controllerUrl: String, tableName: String): Schema = {
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new URI(s"http://$controllerUrl/tables/$rawTableName/schema")
+      val response = HttpUtils.sendGetRequest(uri)
+      Schema.fromString(response)

Review comment:
   Yes, but I guess that requires one more HTTP request? Is this approach outdated? @fx19880617 
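The schema lookup quoted above builds a controller URL from the raw table name. A minimal sketch of that step, assuming the raw name is obtained by stripping an `_OFFLINE` or `_REALTIME` suffix (`rawTableName` here is a hypothetical stand-in for `PinotUtils.getRawTableName`, whose body is not shown in the diff) and URL-encoding it with `java.net.URLEncoder`, which the file already imports:

```scala
import java.net.{URI, URLEncoder}

// Hypothetical stand-ins for PinotUtils.getRawTableName and the URI
// construction in getTableSchema; only for illustration.
object SchemaUri {
  private val TypeSuffixes = Seq("_OFFLINE", "_REALTIME")

  // "airlineStats_OFFLINE" -> "airlineStats"; names without a suffix are unchanged.
  def rawTableName(tableName: String): String =
    TypeSuffixes.find(suffix => tableName.endsWith(suffix)) match {
      case Some(suffix) => tableName.dropRight(suffix.length)
      case None         => tableName
    }

  // http://<controller>/tables/<rawTableName>/schema, with the name URL-encoded.
  def schemaUri(controllerUrl: String, tableName: String): URI = {
    val encoded = URLEncoder.encode(rawTableName(tableName), "UTF-8")
    new URI(s"http://$controllerUrl/tables/$encoded/schema")
  }
}
```

For ordinary table names (letters, digits, underscores) the encoding step leaves the name unchanged.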








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464543147



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
##
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.{List => JList, Map => JMap}
+
+import com.yammer.metrics.core.MetricsRegistry
+import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.request.BrokerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance}
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetch data from specified Pinot server.
+ */
+private[pinot] class PinotServerDataFetcher(
+    partitionId: Int,
+    pinotSplit: PinotSplit,
+    dataSourceOptions: PinotDataSourceReadOptions)
+  extends Logging {
+  private val sqlCompiler = new CalciteSqlCompiler()

Review comment:
   I've created a PR related to this, can you check it? @fx19880617 
   
   https://github.com/apache/incubator-pinot/pull/5791








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464526782



##
File path: pinot-connectors/pinot-spark-connector/documentation/read_model.md
##
@@ -0,0 +1,145 @@
+
+# Read Model
+
+The connector can scan offline, hybrid and realtime tables. The `table` parameter has to be given as follows:

Review comment:
   Users cannot specify the table name without a type suffix if the table is not hybrid. If the table is hybrid, the connector checks the routing tables of the realtime and offline queries separately.
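The suffix rule described in this comment, where an `_OFFLINE` or `_REALTIME` suffix selects a single table and a bare name means hybrid, can be sketched as follows. `tableTypeOf` is a hypothetical function that mirrors how `PinotUtils.getTableType` is used in `planInputPartitions` (an empty result means the table is hybrid, so a time boundary has to be fetched); the real implementation is not shown in this thread.

```scala
// Hypothetical sketch of suffix-based table type detection.
object TableTypeFromName {
  sealed trait TableType
  case object Offline extends TableType
  case object Realtime extends TableType

  // Some(type) for "tbl_OFFLINE" / "tbl_REALTIME"; None for a bare name,
  // which this sketch treats as a hybrid table.
  def tableTypeOf(tableName: String): Option[TableType] =
    if (tableName.endsWith("_OFFLINE")) Some(Offline)
    else if (tableName.endsWith("_REALTIME")) Some(Realtime)
    else None

  // Only hybrid tables need a time boundary to split offline and realtime data.
  def needsTimeBoundary(tableName: String): Boolean =
    tableTypeOf(tableName).isEmpty
}
```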








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464524102



##
File path: pinot-connectors/pinot-spark-connector/documentation/read_model.md
##
@@ -0,0 +1,145 @@
+
+# Read Model
+
+The connector can scan offline, hybrid and realtime tables. The `table` parameter has to be given as follows:
+- For offline table `tbl_OFFLINE`
+- For realtime table `tbl_REALTIME`
+- For hybrid table `tbl`
+
+An example scan:
+
+```scala
+val df = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .load()
+```
+
+A custom schema can be specified directly. If no schema is specified, the connector reads the table schema from the Pinot controller and converts it to a Spark schema.
+
+
+### Architecture
+
+The connector reads data from `Pinot Servers` directly. First, it creates a query from the given filters (if filter push down is enabled) and columns, then finds the routing table for that query. Based on the routing table and `segmentsPerSplit` (explained in detail below), it creates Pinot splits that contain **ONE PINOT SERVER and ONE OR MORE SEGMENTS per Spark partition**. Finally, each partition reads data from its Pinot server in parallel.
+
+![Spark-Pinot Connector Architecture](images/spark-pinot-connector-executor-server-interaction.jpg)
+
+
+Each Spark partition opens a connection to a Pinot server and reads data. For example, assume that the routing table for a given query looks like this:
+
+```
+- realtime ->
+   - realtimeServer1 -> (segment1, segment2, segment3)
+   - realtimeServer2 -> (segment4)
+- offline ->
+   - offlineServer10 -> (segment10, segment20)
+```
+
+If `segmentsPerSplit` is 3, 3 Spark partitions will be created:
+
+| Spark Partition  | Queried Pinot Server/Segments |
+| - | - |
+| partition1  | realtimeServer1 / segment1, segment2, segment3  |
+| partition2  | realtimeServer2 / segment4  |
+| partition3  | offlineServer10 / segment10, segment20 |
+
+
+If `segmentsPerSplit` is 1, 6 Spark partitions will be created:
+
+| Spark Partition  | Queried Pinot Server/Segments |
+| - | - |
+| partition1  | realtimeServer1 / segment1 |
+| partition2  | realtimeServer1 / segment2  |
+| partition3  | realtimeServer1 / segment3 |
+| partition4  | realtimeServer2 / segment4 |
+| partition5  | offlineServer10 / segment10 |
+| partition6  | offlineServer10 / segment20 |
+
+
+A low `segmentsPerSplit` value means more parallelism, but it also means that many connections will be opened to the Pinot servers, increasing the QPS on those servers.
+
+A high `segmentsPerSplit` value means less parallelism: each Pinot server scans more segments per request.
+
+**Note:** Pinot servers prune segments based on segment metadata when a query arrives. In some cases (for example, when filtering on certain columns), some servers may return no data, so some Spark partitions will be empty. In such cases, `repartition()` can be applied after loading the data into Spark for more efficient analysis.
+
+
+### Filter And Column Push Down
+The connector supports filter and column push down: filters and columns are pushed to the Pinot servers. This improves read performance because it minimizes data transfer between Pinot and Spark. Filter push down is enabled by default. To apply filters in Spark instead, set `usePushDownFilters` to `false`.
+
+The connector currently supports `Equal, In, LessThan, LessThanOrEqual, Greater, GreaterThan, Not, TEXT_MATCH, And, Or` filters.

Review comment:
   The filters section of the README is outdated. I replaced PQL with SQL but forgot to update the supported filters section. The connector supports all SQL filters now. I'll fix it.
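Since filters are now pushed down as SQL, compiling them amounts to a recursive translation into a `WHERE` clause. The sketch below uses a small stand-in ADT whose case names follow a subset of Spark's `org.apache.spark.sql.sources` filters so it runs without Spark on the classpath. It is not the connector's `FilterPushDown.compileFiltersToSqlWhereClause`, and its quoting rules (double quotes for columns, single quotes for strings) are assumptions.

```scala
// Hypothetical sketch: compiling a filter tree into a SQL WHERE clause.
object WhereClauseSketch {
  // Stand-in for a subset of Spark's data source filters.
  sealed trait Filter
  case class EqualTo(attr: String, value: Any) extends Filter
  case class GreaterThan(attr: String, value: Any) extends Filter
  case class LessThanOrEqual(attr: String, value: Any) extends Filter
  case class In(attr: String, values: Seq[Any]) extends Filter
  case class Not(child: Filter) extends Filter
  case class And(left: Filter, right: Filter) extends Filter
  case class Or(left: Filter, right: Filter) extends Filter

  private def col(name: String): String = "\"" + name + "\""

  // Strings are single-quoted with embedded quotes doubled; other values as-is.
  private def lit(value: Any): String = value match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other     => other.toString
  }

  def compile(f: Filter): String = f match {
    case EqualTo(a, v)         => s"${col(a)} = ${lit(v)}"
    case GreaterThan(a, v)     => s"${col(a)} > ${lit(v)}"
    case LessThanOrEqual(a, v) => s"${col(a)} <= ${lit(v)}"
    case In(a, vs)             => s"${col(a)} IN (${vs.map(lit).mkString(", ")})"
    case Not(c)                => s"NOT (${compile(c)})"
    case And(l, r)             => s"(${compile(l)} AND ${compile(r)})"
    case Or(l, r)              => s"(${compile(l)} OR ${compile(r)})"
  }

  // Spark passes the reader an array of filters that are implicitly ANDed.
  def whereClause(filters: Seq[Filter]): Option[String] =
    if (filters.isEmpty) None
    else Some(filters.map(compile).mkString(" AND "))
}
```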








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464522910



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
##
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.net.{URI, URLEncoder}
+import java.util.regex.Pattern
+
+import org.apache.pinot.connector.spark.decodeTo
+import org.apache.pinot.connector.spark.exceptions.{HttpStatusCodeException, PinotException}
+import org.apache.pinot.connector.spark.utils.{HttpUtils, Logging}
+
+import scala.util.{Failure, Success, Try}
+import io.circe.generic.auto._
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
+import org.apache.pinot.spi.data.Schema
+
+/**
+ * Client that read/write/prepare required data from/to Pinot.
+ */
+private[pinot] object PinotClusterClient extends Logging {
+
+  def getTableSchema(controllerUrl: String, tableName: String): Schema = {
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new URI(s"http://$controllerUrl/tables/$rawTableName/schema")
+      val response = HttpUtils.sendGetRequest(uri)
+      Schema.fromString(response)

Review comment:
   Yes, but I guess that requires one more HTTP request? Is this approach outdated? @fx19880617 








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464513043



##
File path: 
pinot-connectors/pinot-spark-connector/src/test/resources/schema/pinot-schema.json
##
@@ -0,0 +1,57 @@
+{

Review comment:
   yes








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464504242



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
##
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.datasource
+
+import java.util.{List => JList}
+
+import org.apache.pinot.connector.spark.connector.query.SQLSelectionQueryGenerator
+import org.apache.pinot.connector.spark.connector.{
+  FilterPushDown,
+  PinotClusterClient,
+  PinotSplitter,
+  PinotUtils
+}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{
+  DataSourceReader,
+  InputPartition,
+  SupportsPushDownFilters,
+  SupportsPushDownRequiredColumns
+}
+import org.apache.spark.sql.types._
+
+import scala.collection.JavaConverters._
+
+class PinotDataSourceReader(options: DataSourceOptions, userSchema: Option[StructType] = None)
+  extends DataSourceReader
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns {
+
+  private val pinotDataSourceOptions = PinotDataSourceReadOptions.from(options)
+  private var acceptedFilters: Array[Filter] = Array.empty
+  private var currentSchema: StructType = _
+
+  override def readSchema(): StructType = {
+    if (currentSchema == null) {
+      currentSchema = userSchema.getOrElse {
+        val pinotTableSchema = PinotClusterClient.getTableSchema(
+          pinotDataSourceOptions.controller,
+          pinotDataSourceOptions.tableName
+        )
+        PinotUtils.pinotSchemaToSparkSchema(pinotTableSchema)
+      }
+    }
+    currentSchema
+  }
+
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    val schema = readSchema()
+    val tableType = PinotUtils.getTableType(pinotDataSourceOptions.tableName)
+
+    // Time boundary is used when table is hybrid to ensure that the overlap
+    // between realtime and offline segment data is queried exactly once
+    val timeBoundaryInfo =
+      if (tableType.isDefined) {
+        None
+      } else {
+        PinotClusterClient.getTimeBoundaryInfo(
+          pinotDataSourceOptions.broker,
+          pinotDataSourceOptions.tableName
+        )
+      }
+
+    val whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(this.acceptedFilters)
+    val generatedSQLs = SQLSelectionQueryGenerator.generate(
+      pinotDataSourceOptions.tableName,
+      timeBoundaryInfo,
+      schema.fieldNames,
+      whereCondition
+    )
+
+    val routingTable =
+      PinotClusterClient.getRoutingTable(pinotDataSourceOptions.broker, generatedSQLs)

Review comment:
   The routing table is required. Spark reads data in parallel, the same way the broker does: each server and its segments are represented as a Spark partition. This is the most efficient way to receive data from Pinot, and the Presto Pinot connector uses the same architecture.
   
   If the broker is not available, the connector throws an exception. To avoid this, we could create a corresponding API on the controller side and use it like the metadata manager in other systems.
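A simplified sketch of the partitioning described above: each server's segment list from the routing table becomes one or more Spark partitions. `RoutingTablePartitioning` and `Partition` are hypothetical names, not the connector's classes; the `Server_<host>_<port>` instance-name pattern is the one `PinotSplitter` uses in this PR, and the routing-table shape (`tableType -> server -> segments`) follows its `generatePinotSplits` signature.

```scala
import java.util.regex.Pattern

// Hypothetical, simplified model (not the connector's PinotSplitter): turns a
// broker routing table into Spark-partition-sized units of work.
object RoutingTablePartitioning {
  // Same instance-name pattern PinotSplitter uses: Server_<host>_<port>
  private val ServerPattern = Pattern.compile("Server_(.*)_(\\d+)")

  final case class Partition(tableType: String, host: String, port: Int, segments: List[String])

  // routingTable: tableType -> (server instance name -> segment names)
  def toPartitions(
      routingTable: Map[String, Map[String, List[String]]],
      segmentsPerSplit: Int): List[Partition] = {
    require(segmentsPerSplit > 0, "segmentsPerSplit must be positive")
    routingTable.toList.flatMap { case (tableType, serversToSegments) =>
      serversToSegments.toList.flatMap { case (server, segments) =>
        val matcher = ServerPattern.matcher(server)
        require(matcher.matches(), s"Unexpected server instance name: $server")
        val host = matcher.group(1)
        val port = matcher.group(2).toInt
        // One partition per group of at most segmentsPerSplit segments on this server;
        // an empty segment list simply produces no partitions.
        segments.grouped(segmentsPerSplit).map(group => Partition(tableType, host, port, group)).toList
      }
    }
  }
}
```

Grouping by `segmentsPerSplit` directly is a deliberate choice here: `grouped(n)` already returns a single group when `n` exceeds the list size, and a `Math.min(segments.size, segmentsPerSplit)` bound would pass `0` to `grouped` (which throws) for a server with no segments.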





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464326792



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
##
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.{List => JList, Map => JMap}
+
+import com.yammer.metrics.core.MetricsRegistry
+import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.request.BrokerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance}
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetch data from specified Pinot server.
+ */
+private[pinot] class PinotServerDataFetcher(
+    partitionId: Int,
+    pinotSplit: PinotSplit,
+    dataSourceOptions: PinotDataSourceReadOptions)
+  extends Logging {
+  private val sqlCompiler = new CalciteSqlCompiler()

Review comment:
   Yeah, I caught one thing. The connector uses the broker's `debug/routingTable?pql` API to get the routing table for a query (for segment pruning). But in the connector I've changed PQL to SQL, so I'll add a SQL endpoint to get the routing table.
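A minimal sketch of building the broker URL for the routing-table lookup mentioned above. The `debug/routingTable?pql=` path comes from the comment; the SQL variant's `debug/routingTable/sql` path and `query` parameter are assumptions, since the SQL endpoint was only planned at this point. `RoutingTableUrls` is a hypothetical helper, not part of the connector.

```scala
import java.net.{URI, URLEncoder}
import java.nio.charset.StandardCharsets

// Hypothetical helper for illustration; not the connector's PinotClusterClient.
object RoutingTableUrls {
  private def encode(query: String): String =
    URLEncoder.encode(query, StandardCharsets.UTF_8.name())

  // PQL variant: the broker debug endpoint named in the review comment.
  def pqlRoutingTableUri(brokerHostPort: String, pql: String): URI =
    new URI(s"http://$brokerHostPort/debug/routingTable?pql=${encode(pql)}")

  // SQL variant: path and parameter name are ASSUMPTIONS for illustration.
  def sqlRoutingTableUri(brokerHostPort: String, sql: String): URI =
    new URI(s"http://$brokerHostPort/debug/routingTable/sql?query=${encode(sql)}")
}
```

`URLEncoder` produces `application/x-www-form-urlencoded` output, so spaces in the query become `+`.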
   
   








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464326792



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
##
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.{List => JList, Map => JMap}
+
+import com.yammer.metrics.core.MetricsRegistry
+import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.request.BrokerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance}
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetch data from specified Pinot server.
+ */
+private[pinot] class PinotServerDataFetcher(
+    partitionId: Int,
+    pinotSplit: PinotSplit,
+    dataSourceOptions: PinotDataSourceReadOptions)
+  extends Logging {
+  private val sqlCompiler = new CalciteSqlCompiler()

Review comment:
   A SQL endpoint?
   
   SQL is only used in query generation. Users can build their queries with many Spark methods, with filter push-down etc. On the executor side, the SQL query is converted to a broker request, and the data is then received through the `pinot.core.transport` APIs using that broker request.








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464324725



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/query/SQLSelectionQueryGenerator.scala
##
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector.query
+
+import org.apache.pinot.connector.spark.connector.Constants.{PinotTableType, PinotTableTypes}
+import org.apache.pinot.connector.spark.connector.{PinotUtils, TimeBoundaryInfo}
+
+/**
+ * Generate realtime and offline SQL queries for specified table with given columns and filters.
+ */
+private[pinot] class SQLSelectionQueryGenerator(
+    tableNameWithType: String,
+    timeBoundaryInfo: Option[TimeBoundaryInfo],
+    columns: Array[String],
+    whereClause: Option[String]) {
+  private val columnsExpression = columnsAsExpression()
+  private val rawTableName = PinotUtils.getRawTableName(tableNameWithType)
+  private val tableType = PinotUtils.getTableType(tableNameWithType)
+
+  def generatePQLs(): GeneratedSQLs = {
+    val offlineSelectQuery = buildSelectQuery(PinotTableTypes.OFFLINE)
+    val realtimeSelectQuery = buildSelectQuery(PinotTableTypes.REALTIME)
+    GeneratedSQLs(
+      rawTableName,
+      tableType,
+      offlineSelectQuery,
+      realtimeSelectQuery
+    )
+  }
+
+  /**
+   * Get all columns if selecting columns empty(eg: resultDataFrame.count())
+   */
+  private def columnsAsExpression(): String = {
+    if (columns.isEmpty) "*" else columns.mkString(",")
+  }
+
+  /**
+   * Build realtime or offline PQL selection query.
+   */
+  private def buildSelectQuery(tableType: PinotTableType): String = {
+    val tableNameWithType = s"${rawTableName}_$tableType"
+    val queryBuilder = new StringBuilder(s"SELECT $columnsExpression FROM $tableNameWithType")
+
+    // add where clause if exists
+    whereClause.foreach { x =>
+      queryBuilder.append(s" WHERE $x")
+    }
+
+    // add time boundary filter if exists
+    timeBoundaryInfo.foreach { tbi =>
+      val timeBoundaryFilter =
+        if (tableType == PinotTableTypes.OFFLINE) {
+          tbi.getOfflinePredicate
+        } else {
+          tbi.getRealtimePredicate
+        }
+
+      if (whereClause.isEmpty) {
+        queryBuilder.append(s" WHERE $timeBoundaryFilter")
+      } else {
+        queryBuilder.append(s" AND $timeBoundaryFilter")
+      }
+    }
+
+    // query will be converted to Pinot 'BrokerRequest' with PQL compiler
+    // pinot set limit to 10 automatically
+    // to prevent this add limit to query
+    queryBuilder.append(s" LIMIT ${Int.MaxValue}")

Review comment:
   I'm not sure. The connector supports only selection queries, to read and re-index data etc. In this case I think `LIMIT` is unnecessary, except in one case: if we support `ORDER BY` in selection queries, `LIMIT` can be used. But that is not good practice when working with Spark.
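A minimal sketch of the query building done by `buildSelectQuery` in the diff above, with the `LIMIT` made optional. It is a standalone re-implementation under assumptions: `SelectQuerySketch` and `TimeBoundary` are hypothetical names, and the `<` / `>=` operators for the offline/realtime time-boundary predicates are assumed rather than taken from the connector's `TimeBoundaryInfo`.

```scala
// Hypothetical re-implementation for illustration only; names and the
// time-boundary operators below are ASSUMPTIONS, not the connector's API.
object SelectQuerySketch {
  final case class TimeBoundary(timeColumn: String, value: String) {
    def offlinePredicate: String = s"$timeColumn < $value"
    def realtimePredicate: String = s"$timeColumn >= $value"
  }

  def buildSelectQuery(
      rawTableName: String,
      tableType: String, // "OFFLINE" or "REALTIME"
      columns: Seq[String],
      whereClause: Option[String],
      timeBoundary: Option[TimeBoundary],
      limit: Option[Int]): String = {
    val columnsExpression = if (columns.isEmpty) "*" else columns.mkString(",")
    val timeFilter = timeBoundary.map { tb =>
      if (tableType == "OFFLINE") tb.offlinePredicate else tb.realtimePredicate
    }
    // Parenthesize each condition so that an OR inside the pushed-down clause
    // cannot escape the time-boundary predicate (AND binds tighter than OR).
    val conditions = (whereClause.toList ++ timeFilter.toList).map(c => s"($c)")
    val where = if (conditions.isEmpty) "" else conditions.mkString(" WHERE ", " AND ", "")
    val limitSql = limit.map(n => s" LIMIT $n").getOrElse("")
    s"SELECT $columnsExpression FROM ${rawTableName}_$tableType$where$limitSql"
  }
}
```

Unlike the diff, this sketch wraps the pushed-down clause in parentheses: appending ` AND <time filter>` to an unparenthesized `a = 1 OR b = 2` would apply the time boundary only to the `b = 2` branch, unless the filter compiler already parenthesizes its output.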








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464317075



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
##
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.regex.{Matcher, Pattern}
+
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableType
+import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+
+private[pinot] object PinotSplitter extends Logging {
+  private val PINOT_SERVER_PATTERN = Pattern.compile("Server_(.*)_(\\d+)")
+
+  def generatePinotSplits(
+      generatedSQLs: GeneratedSQLs,
+      routingTable: Map[String, Map[String, List[String]]],
+      segmentsPerSplit: Int): List[PinotSplit] = {
+    routingTable.flatMap {
+      case (tableType, serversToSegments) =>
+        serversToSegments
+          .map { case (server, segments) => parseServerInput(server, segments) }
+          .flatMap {
+            case (matcher, segments) =>
+              createPinotSplitsFromSubSplits(
+                tableType,
+                generatedSQLs,
+                matcher,
+                segments,
+                segmentsPerSplit
+              )
+          }
+    }.toList
+  }
+
+  private def parseServerInput(server: String, segments: List[String]): (Matcher, List[String]) = {
+    val matcher = PINOT_SERVER_PATTERN.matcher(server)
+    if (matcher.matches() && matcher.groupCount() == 2) matcher -> segments
+    else throw PinotException(s"'$server' did not match!?")
+  }
+
+  private def createPinotSplitsFromSubSplits(
+      tableType: PinotTableType,
+      generatedSQLs: GeneratedSQLs,
+      serverMatcher: Matcher,
+      segments: List[String],
+      segmentsPerSplit: Int): Iterator[PinotSplit] = {
+    val serverHost = serverMatcher.group(1)
+    val serverPort = serverMatcher.group(2)
+    val maxSegmentCount = Math.min(segments.size, segmentsPerSplit)
+    segments.grouped(maxSegmentCount).map { subSegments =>
+      val serverAndSegments =
+        PinotServerAndSegments(serverHost, serverPort, subSegments, tableType)
+      PinotSplit(generatedSQLs, serverAndSegments)
+    }
+  }
+}
+
+private[pinot] case class PinotSplit(
+    generatedSQLs: GeneratedSQLs,
+    serverAndSegments: PinotServerAndSegments)
+
+private[pinot] case class PinotServerAndSegments(
+    serverHost: String,
+    serverPort: String,
+    segments: List[String],
+    serverType: PinotTableType) {

Review comment:
   `GeneratedSQLs` contains the realtime and offline queries. `PinotSplit` contains the generated queries together with the server and segment information. On the executor side, we send the query to a specific server. The connector checks `serverType` to learn which query should be compiled and sent to that server, so we need `serverType` in the current design. This is how it is used:
   
   ```scala
   val pinotServerAsyncQueryResponse = pinotSplit.serverAndSegments.serverType match {
     case TableType.REALTIME =>
       val realtimeBrokerRequest =
         sqlCompiler.compileToBrokerRequest(pinotSplit.generatedSQLs.realtimeSelectQuery)
       submitRequestToPinotServer(null, null, realtimeBrokerRequest, routingTableForRequest)
     case TableType.OFFLINE =>
       val offlineBrokerRequest =
         sqlCompiler.compileToBrokerRequest(pinotSplit.generatedSQLs.offlineSelectQuery)
       submitRequestToPinotServer(offlineBrokerRequest, routingTableForRequest, null, null)
   }
   ```
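A self-contained sketch of the same dispatch with Pinot's compiler and transport left out: the split's server type selects which generated query goes to that server. `SplitQuerySelection`, `ServerType`, and the trimmed-down `GeneratedSQLs` / `Split` classes are hypothetical simplifications, not the connector's types.

```scala
// Hypothetical simplification of the executor-side query selection.
object SplitQuerySelection {
  sealed trait ServerType
  case object Offline extends ServerType
  case object Realtime extends ServerType

  // Only the fields needed for query selection are kept.
  final case class GeneratedSQLs(offlineSelectQuery: String, realtimeSelectQuery: String)
  final case class Split(
      sqls: GeneratedSQLs,
      host: String,
      port: Int,
      segments: List[String],
      serverType: ServerType)

  // Pick the query matching the server's table type. Because ServerType is
  // sealed, the compiler warns if a case is missing from this match.
  def queryFor(split: Split): String = split.serverType match {
    case Offline  => split.sqls.offlineSelectQuery
    case Realtime => split.sqls.realtimeSelectQuery
  }
}
```

Modelling the server type as a sealed ADT gives an exhaustiveness check on the match; a string-typed table type would not get that check.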






[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-03 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464311850



##
File path: config/.scalafmt.conf
##
@@ -0,0 +1,17 @@
+version = "2.4.0"

Review comment:
   Defines the rules for formatting the Scala code.








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-01 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r463983068



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
##
@@ -0,0 +1 @@
+org.apache.pinot.connector.spark.datasource.PinotDataSourceV2

Review comment:
   Version.
   
   Spark has two data source read/write APIs, V1 and V2. The V2 API supports many features, such as filter push-down and columnar reads. For more details, see [Spark DatasourceV2](https://www.slideshare.net/databricks/apache-spark-data-source-v2-with-wenchen-fan-and-gengliang-wang).








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-01 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r463983154



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
##
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.net.{URI, URLEncoder}
+import java.util.regex.Pattern
+
+import org.apache.pinot.connector.spark.decodeTo
+import org.apache.pinot.connector.spark.exceptions.{HttpStatusCodeException, PinotException}
+import org.apache.pinot.connector.spark.utils.{HttpUtils, Logging}
+
+import scala.util.{Failure, Success, Try}
+import io.circe.generic.auto._
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.connector.query.GeneratedPQLs
+import org.apache.pinot.spi.data.Schema
+
+/**
+ * Client that read/write/prepare required data from/to Pinot.
+ */
+private[pinot] object PinotClusterClient extends Logging {
+
+  def getTableSchema(controllerUrl: String, tableName: String): Schema = {
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new URI(s"http://$controllerUrl/tables/$rawTableName/schema")
+      val response = HttpUtils.sendGetRequest(uri)
+      Schema.fromString(response)
+    } match {
+      case Success(response) =>
+        logDebug(s"Pinot schema received successfully for table '$rawTableName'")
+        response
+      case Failure(exception) =>
+        throw PinotException(
+          s"An error occurred while getting Pinot schema for table '$rawTableName'",
+          exception
+        )
+    }
+  }
+
+  /**
+   * Get available broker urls(host:port) for given table.
+   * This method is used when if broker instances not defined in the datasource options.
+   */
+  def getBrokerInstances(controllerUrl: String, tableName: String): List[String] = {
+    val brokerPattern = Pattern.compile("Broker_(.*)_(\\d+)")
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new URI(s"http://$controllerUrl/tables/$rawTableName/instances")

Review comment:
   What are the changes? How can I use them?








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-01 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r463983521



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceV2.scala
##
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.datasource
+
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
+import org.apache.spark.sql.types.StructType
+
+class PinotDataSourceV2 extends DataSourceV2 with ReadSupport with DataSourceRegister {

Review comment:
   Version 2. In Spark, a V1 or V2 suffix is generally added to the data source class name to indicate which data source API it uses.








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-01 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r463983068



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
##
@@ -0,0 +1 @@
+org.apache.pinot.connector.spark.datasource.PinotDataSourceV2

Review comment:
   Spark has two data source read/write APIs, V1 and V2. The V2 API supports many features, such as filter push-down and columnar reads. For more details, see [Spark DatasourceV2](https://www.slideshare.net/databricks/apache-spark-data-source-v2-with-wenchen-fan-and-gengliang-wang).








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-01 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r463990289



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/query/GeneratedPQLs.scala
##
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector.query
+
+import org.apache.pinot.connector.spark.connector.Constants.{PinotTableType, PinotTableTypes}
+
+private[pinot] case class GeneratedPQLs(

Review comment:
   PQL was replaced with SQL.








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-01 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r463983521



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceV2.scala
##
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.datasource
+
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
+import org.apache.spark.sql.types.StructType
+
+class PinotDataSourceV2 extends DataSourceV2 with ReadSupport with DataSourceRegister {

Review comment:
   Version 2. In Spark, a V1 or V2 suffix is generally added to the data source class name.








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-01 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r463983285



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/query/GeneratedPQLs.scala
##
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector.query
+
+import org.apache.pinot.connector.spark.connector.Constants.{PinotTableType, PinotTableTypes}
+
+private[pinot] case class GeneratedPQLs(

Review comment:
   We can convert PQL to SQL. I'll update the supported filters and query generation for that, and use the SQL compiler instead of the PQL compiler.
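The comment above mentions updating the supported filters for SQL generation. A minimal sketch of compiling pushed-down filters into a SQL `WHERE` clause, under stated assumptions: the small `Filter` ADT here is hypothetical and only mirrors a few cases of Spark's `org.apache.spark.sql.sources.Filter` (`EqualTo`, `GreaterThan`, `In`, `And`, `Or`, `Not`) so the example runs without Spark. It is not the connector's `FilterPushDown` implementation.

```scala
// Hypothetical mini filter ADT mirroring a few Spark source-filter cases.
object SqlWhereClauseSketch {
  sealed trait Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter
  final case class GreaterThan(attribute: String, value: Any) extends Filter
  final case class In(attribute: String, values: Seq[Any]) extends Filter
  final case class And(left: Filter, right: Filter) extends Filter
  final case class Or(left: Filter, right: Filter) extends Filter
  final case class Not(child: Filter) extends Filter

  // Render a literal: strings are single-quoted with embedded quotes doubled
  // (standard SQL quoting, assumed to be accepted by the target dialect).
  private def literal(v: Any): String = v match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other     => other.toString
  }

  def compile(f: Filter): String = f match {
    case EqualTo(a, v)     => s"$a = ${literal(v)}"
    case GreaterThan(a, v) => s"$a > ${literal(v)}"
    case In(a, vs)         => s"$a IN (${vs.map(literal).mkString(", ")})"
    case And(l, r)         => s"(${compile(l)}) AND (${compile(r)})"
    case Or(l, r)          => s"(${compile(l)}) OR (${compile(r)})"
    case Not(c)            => s"NOT (${compile(c)})"
  }

  // Top-level pushed filters must all hold, so they are joined with AND;
  // None means no WHERE clause is needed.
  def compileFiltersToWhereClause(filters: Seq[Filter]): Option[String] =
    if (filters.isEmpty) None
    else Some(filters.map(f => s"(${compile(f)})").mkString(" AND "))
}
```

Spark passes `pushFilters` an array of filters that must all hold, which is why the top-level filters are joined with `AND`; each one is parenthesized so an inner `OR` keeps its grouping.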








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-01 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r463983154



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
##
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.net.{URI, URLEncoder}
+import java.util.regex.Pattern
+
+import org.apache.pinot.connector.spark.decodeTo
+import org.apache.pinot.connector.spark.exceptions.{HttpStatusCodeException, PinotException}
+import org.apache.pinot.connector.spark.utils.{HttpUtils, Logging}
+
+import scala.util.{Failure, Success, Try}
+import io.circe.generic.auto._
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.connector.query.GeneratedPQLs
+import org.apache.pinot.spi.data.Schema
+
+/**
+ * Client that reads, writes, and prepares required data from/to Pinot.
+ */
+private[pinot] object PinotClusterClient extends Logging {
+
+  def getTableSchema(controllerUrl: String, tableName: String): Schema = {
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new URI(s"http://$controllerUrl/tables/$rawTableName/schema")
+      val response = HttpUtils.sendGetRequest(uri)
+      Schema.fromString(response)
+    } match {
+      case Success(response) =>
+        logDebug(s"Pinot schema received successfully for table '$rawTableName'")
+        response
+      case Failure(exception) =>
+        throw PinotException(
+          s"An error occurred while getting Pinot schema for table '$rawTableName'",
+          exception
+        )
+    }
+  }
+
+  /**
+   * Get available broker URLs (host:port) for the given table.
+   * This method is used when broker instances are not defined in the datasource options.
+   */
+  def getBrokerInstances(controllerUrl: String, tableName: String): List[String] = {
+    val brokerPattern = Pattern.compile("Broker_(.*)_(\\d+)")
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new URI(s"http://$controllerUrl/tables/$rawTableName/instances")

Review comment:
   I'll check it. The connector was written against Pinot 0.4.0; I can update it.
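The broker-discovery code in the diff above matches instance names against `Broker_(.*)_(\\d+)`. A minimal sketch of that parsing step as a pure function follows. It assumes the instance names (for example `Broker_localhost_8099`) have already been extracted from the controller's `/tables/{table}/instances` response. The response's JSON shape is not shown in the diff and is not modeled here. `BrokerInstanceParsing`, `toHostPort`, and `brokerUrls` are hypothetical names.

```scala
import java.util.regex.Pattern

// Hypothetical helper illustrating the Broker_<host>_<port> parsing step from the diff.
object BrokerInstanceParsing {
  // Same pattern as in getBrokerInstances.
  private val BrokerPattern = Pattern.compile("Broker_(.*)_(\\d+)")

  /** Convert a broker instance id such as "Broker_host_8099" to "host:8099"; None if it is not a broker. */
  def toHostPort(instanceName: String): Option[String] = {
    val matcher = BrokerPattern.matcher(instanceName)
    if (matcher.matches()) Some(s"${matcher.group(1)}:${matcher.group(2)}") else None
  }

  /** Keep only broker instances, convert them to host:port, and drop duplicates. */
  def brokerUrls(instanceNames: Seq[String]): List[String] =
    instanceNames.flatMap(toHostPort).distinct.toList

  def main(args: Array[String]): Unit = {
    val names = Seq("Broker_pinot-broker-0.local_8099", "Server_pinot-server-0_8098", "Broker_10.0.0.5_8099")
    brokerUrls(names).foreach(println)
  }
}
```

Because the greedy `(.*)` group backtracks to the last `_` followed by digits, host names that themselves contain underscores still split correctly.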








[GitHub] [incubator-pinot] mangrrua commented on a change in pull request #5787: [Feature] - Spark Pinot Connector

2020-08-01 Thread GitBox


mangrrua commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r463983068



##
File path: 
pinot-connectors/pinot-spark-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
##
@@ -0,0 +1 @@
+org.apache.pinot.connector.spark.datasource.PinotDataSourceV2

Review comment:
   Spark has two data source read/write APIs, V1 and V2. The V2 API supports many features, such as filter pushdown and columnar reads. For more details, see [Spark DataSourceV2](https://www.slideshare.net/databricks/apache-spark-data-source-v2-with-wenchen-fan-and-gengliang-wang).
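To make the filter-pushdown point concrete, here is a minimal sketch of compiling pushed filters into a SQL `WHERE` clause. It uses a small, hypothetical stand-in for Spark's `org.apache.spark.sql.sources.Filter` classes so the example runs without Spark. The split into pushed and non-pushed filters mirrors the idea behind the V2 `SupportsPushDownFilters` interface: filters the source cannot handle are returned to Spark, which evaluates them after the scan. The set of supported operators here is an assumption.

```scala
// Hypothetical, self-contained stand-in for Spark's Filter hierarchy.
object FilterPushdownSketch {
  sealed trait Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter
  final case class GreaterThan(attribute: String, value: Any) extends Filter
  final case class LessThan(attribute: String, value: Any) extends Filter
  final case class In(attribute: String, values: Seq[Any]) extends Filter
  final case class IsNull(attribute: String) extends Filter
  final case class And(left: Filter, right: Filter) extends Filter
  final case class Or(left: Filter, right: Filter) extends Filter
  final case class Not(child: Filter) extends Filter

  /** Render a literal: quote strings (doubling single quotes), print other values as-is. */
  private def literal(v: Any): String = v match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other     => other.toString
  }

  /** Compile one filter to SQL; None means it is unsupported (IsNull is deliberately left out here). */
  def compile(f: Filter): Option[String] = f match {
    case EqualTo(a, v)            => Some(s"$a = ${literal(v)}")
    case GreaterThan(a, v)        => Some(s"$a > ${literal(v)}")
    case LessThan(a, v)           => Some(s"$a < ${literal(v)}")
    case In(a, vs) if vs.nonEmpty => Some(s"$a IN (${vs.map(literal).mkString(", ")})")
    case And(l, r)                => for { ls <- compile(l); rs <- compile(r) } yield s"($ls AND $rs)"
    case Or(l, r)                 => for { ls <- compile(l); rs <- compile(r) } yield s"($ls OR $rs)"
    case Not(c)                   => compile(c).map(cs => s"NOT ($cs)")
    case _                        => None
  }

  /** Split into (pushed, notPushed); notPushed would be handed back to Spark. */
  def split(filters: Seq[Filter]): (Seq[Filter], Seq[Filter]) =
    filters.partition(f => compile(f).isDefined)

  /** AND together all compilable filters into a single WHERE fragment. */
  def whereClause(filters: Seq[Filter]): Option[String] = {
    val parts = filters.flatMap(f => compile(f))
    if (parts.isEmpty) None else Some(parts.mkString(" AND "))
  }

  def main(args: Array[String]): Unit = {
    val filters = Seq(
      EqualTo("Carrier", "AA"),
      Or(GreaterThan("ArrDelay", 15), IsNull("ArrDelay")),
      In("Origin", Seq("SFO", "JFK")))
    val (pushed, notPushed) = split(filters)
    println(s"WHERE: ${whereClause(pushed).getOrElse("<none>")}")
    println(s"Evaluated by Spark: $notPushed")
  }
}
```

An `Or` with one unsupported side cannot be pushed at all, because pushing only the supported side would drop rows that the other side would have matched.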




