asfgit closed pull request #6597: [FLINK-10136] [table] Add REPEAT support in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6597
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork's pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 24d8d70080b..759cf2f8db0 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2401,6 +2401,18 @@ RTRIM(string)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight text %}
+REPEAT(string, integer)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string that repeats the base <i>string</i> <i>integer</i> 
times.</p> 
+        <p>E.g., <code>REPEAT('This is a test String.', 2)</code> returns 
"This is a test String.This is a test String.".</p>
+      </td>
+    </tr>
+
     <tr>
       <td>
         {% highlight text %}
@@ -2614,6 +2626,18 @@ STRING.rtrim()
         <p>E.g., <code>'This is a test String. '.rtrim()</code> returns "This 
is a test String.".</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string that repeats the base <i>STRING</i> <i>INT</i> 
times.</p> 
+        <p>E.g., <code>"This is a test String.".repeat(2)</code> returns 
"This is a test String.This is a test String.".</p>
+      </td>
+    </tr>    
     
     <tr>
       <td>
@@ -2830,6 +2854,18 @@ STRING.rtrim()
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+STRING.repeat(INT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string that repeats the base <i>STRING</i> <i>INT</i> 
times.</p> 
+        <p>E.g., <code>"This is a test String.".repeat(2)</code> returns 
"This is a test String.This is a test String.".</p>
+      </td>
+    </tr> 
+
     <tr>
       <td>
         {% highlight scala %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 8b08af68117..a0cfac65923 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -573,6 +573,11 @@ trait ImplicitExpressionOperations {
     */
   def rtrim() = RTrim(expr)
 
+  /**
+    * Returns a string that repeats the base string n times.
+    */
+  def repeat(n: Expression) = Repeat(expr, n)
+
   // Temporal operations
 
   /**
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 7eb91d3806d..899cb0ff35a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -141,4 +141,10 @@ object BuiltInMethods {
   val HEX_STRING: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", 
classOf[String])
 
   val UUID: Method = Types.lookupMethod(classOf[ScalarFunctions], "uuid")
+
+  val REPEAT: Method = Types.lookupMethod(
+    classOf[ScalarFunctions],
+    "repeat",
+    classOf[String],
+    classOf[Int])
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 7c328c98be1..c7eb869a5b5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -176,6 +176,12 @@ object FunctionGenerator {
     STRING_TYPE_INFO,
     BuiltInMethod.RTRIM.method)
 
+  addSqlFunctionMethod(
+    REPEAT,
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.REPEAT)
+
   // 
----------------------------------------------------------------------------------------------
   // Arithmetic functions
   // 
----------------------------------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index b2d7a3deb18..70794407cc0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -459,3 +459,31 @@ case class RTrim(child: Expression) extends 
UnaryExpression with InputTypeSpec {
 
   override def toString = s"($child).rtrim"
 }
+
+/**
+  * Returns a string that repeats the base str n times.
+  */
+case class Repeat(str: Expression, n: Expression) extends Expression with 
InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO)
+
+  override private[flink] def children: Seq[Expression] = Seq(str, n)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+    relBuilder.call(ScalarSqlFunctions.REPEAT, str.toRexNode, n.toRexNode)
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && n.resultType == INT_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Repeat operator requires (String, Int) input, " +
+        s"but ($str, $n) is of type (${str.resultType}, ${n.resultType})")
+    }
+  }
+
+  override def toString: String = s"($str).repeat($n)"
+}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index d41900027e5..db67e3985db 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -222,4 +222,12 @@ object ScalarSqlFunctions {
     OperandTypes.STRING,
     SqlFunctionCategory.STRING)
 
+  val REPEAT = new SqlFunction(
+    "REPEAT",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), 
SqlTypeTransforms.TO_NULLABLE),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER),
+    SqlFunctionCategory.STRING)
+
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index d92af7aaae2..1aadf3140da 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -21,6 +21,7 @@ import java.lang.{StringBuilder, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.commons.codec.binary.{Base64, Hex}
+import org.apache.commons.lang3.StringUtils
 
 import scala.annotation.varargs
 
@@ -227,4 +228,10 @@ object ScalarFunctions {
     * Returns an UUID string using Java utilities.
     */
   def uuid(): String = java.util.UUID.randomUUID().toString
+
+  /**
+    * Returns a string that repeats the base string n times.
+    */
+  def repeat(base: String, n: Int): String = StringUtils.repeat(base, n)
+
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 67b8a0e5d5e..fe508aa1f4a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -207,6 +207,7 @@ object FunctionCatalog {
     "uuid" -> classOf[UUID],
     "ltrim" -> classOf[LTrim],
     "rtrim" -> classOf[RTrim],
+    "repeat" -> classOf[Repeat],
 
     // math functions
     "plus" -> classOf[Plus],
@@ -459,6 +460,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable 
{
     ScalarSqlFunctions.UUID,
     ScalarSqlFunctions.LTRIM,
     ScalarSqlFunctions.RTRIM,
+    ScalarSqlFunctions.REPEAT,
 
     // EXTENSIONS
     BasicOperatorTable.TUMBLE,
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 27b8afbd100..503825475e7 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -680,6 +680,45 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "null")
   }
 
+  @Test
+  def testRepeat(): Unit = {
+    testAllApis(
+      'f0.repeat(1),
+      "f0.repeat(1)",
+      "REPEAT(f0, 1)",
+      "This is a test String.")
+
+    testAllApis(
+      'f0.repeat(2),
+      "f0.repeat(2)",
+      "REPEAT(f0, 2)",
+      "This is a test String.This is a test String.")
+
+    testAllApis(
+      'f0.repeat(0),
+      "f0.repeat(0)",
+      "REPEAT(f0, 0)",
+      "")
+
+    testAllApis(
+      'f0.repeat(-1),
+      "f0.repeat(-1)",
+      "REPEAT(f0, -1)",
+      "")
+
+    testAllApis(
+      'f33.repeat(2),
+      "f33.repeat(2)",
+      "REPEAT(f33, 2)",
+      "null")
+
+    testAllApis(
+      "".repeat(1),
+      "''.repeat(1)",
+      "REPEAT('', 2)",
+      "")
+  }
+
   // 
----------------------------------------------------------------------------------------------
   // Math functions
   // 
----------------------------------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 94fabbebe12..4a79a61d977 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -151,6 +151,9 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("RPAD('hi',4,'??')", "hi??")
     testSqlApi("FROM_BASE64('aGVsbG8gd29ybGQ=')", "hello world")
     testSqlApi("TO_BASE64('hello world')", "aGVsbG8gd29ybGQ=")
+    testSqlApi(
+      "REPEAT('This is a test String.', 2)",
+      "This is a test String.This is a test String.")
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to