On 2020/05/07 13:15, Fujii Masao wrote:


On 2020/05/02 11:29, Michael Paquier wrote:
On Thu, Apr 30, 2020 at 11:40:59PM +0900, Fujii Masao wrote:
Also the number of bytes can be added into and substracted from LSN using the
<literal>+(pg_lsn,numeric)</literal> and <literal>-(pg_lsn,numeric)</literal>
operators, respectively. Note that the calculated LSN should be in the range
of <type>pg_lsn</type> type, i.e., between <literal>0/0</literal> and
<literal>FFFFFFFF/FFFFFFFF</literal>.
-----------------

That reads fine.

Ok, I will update the docs in that way.

Done.



+   /* XXX would it be better to return NULL? */
+   if (NUMERIC_IS_NAN(num))
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("cannot convert NaN to pg_lsn")));
That would be good to test, and an error sounds fine to me.

You mean that we should add the test that goes through this code block,
into the regression test?

Yes, that looks worth making sure to track, especially if the behavior
of this code changes in the future.

Ok, I will add that regression test.

Done. Attached is the updated version of the patch!

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/doc/src/sgml/datatype.sgml b/doc/src/sgml/datatype.sgml
index a8d0780387..a86b794ce0 100644
--- a/doc/src/sgml/datatype.sgml
+++ b/doc/src/sgml/datatype.sgml
@@ -4823,7 +4823,13 @@ SELECT * FROM pg_attribute
     standard comparison operators, like <literal>=</literal> and
     <literal>&gt;</literal>.  Two LSNs can be subtracted using the
     <literal>-</literal> operator; the result is the number of bytes separating
-    those write-ahead log locations.
+    those write-ahead log locations.  Also the number of bytes can be
+    added into and subtracted from LSN using the
+    <literal>+(pg_lsn,numeric)</literal> and
+    <literal>-(pg_lsn,numeric)</literal> operators, respectively. Note that
+    the calculated LSN should be in the range of <type>pg_lsn</type> type,
+    i.e., between <literal>0/0</literal> and
+    <literal>FFFFFFFF/FFFFFFFF</literal>.
    </para>
   </sect1>
 
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 9986132b45..19f300205b 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -41,6 +41,7 @@
 #include "utils/guc.h"
 #include "utils/int8.h"
 #include "utils/numeric.h"
+#include "utils/pg_lsn.h"
 #include "utils/sortsupport.h"
 
 /* ----------
@@ -472,6 +473,7 @@ static void apply_typmod(NumericVar *var, int32 typmod);
 static bool numericvar_to_int32(const NumericVar *var, int32 *result);
 static bool numericvar_to_int64(const NumericVar *var, int64 *result);
 static void int64_to_numericvar(int64 val, NumericVar *var);
+static bool numericvar_to_uint64(const NumericVar *var, uint64 *result);
 #ifdef HAVE_INT128
 static bool numericvar_to_int128(const NumericVar *var, int128 *result);
 static void int128_to_numericvar(int128 val, NumericVar *var);
@@ -3688,6 +3690,31 @@ numeric_float4(PG_FUNCTION_ARGS)
 }
 
 
+Datum
+numeric_pg_lsn(PG_FUNCTION_ARGS)
+{
+       Numeric         num = PG_GETARG_NUMERIC(0);
+       NumericVar      x;
+       XLogRecPtr              result;
+
+       /* XXX would it be better to return NULL? */
+       if (NUMERIC_IS_NAN(num))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot convert NaN to pg_lsn")));
+
+       /* Convert to variable format and thence to pg_lsn */
+       init_var_from_num(num, &x);
+
+       if (!numericvar_to_uint64(&x, (uint64 *) &result))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("pg_lsn out of range")));
+
+       PG_RETURN_LSN(result);
+}
+
+
 /* ----------------------------------------------------------------------
  *
  * Aggregate functions
@@ -6739,6 +6766,78 @@ int64_to_numericvar(int64 val, NumericVar *var)
        var->weight = ndigits - 1;
 }
 
+/*
+ * Convert numeric to uint64, rounding if needed.
+ *
+ * If overflow, return false (no error is raised).  Return true if okay.
+ */
+static bool
+numericvar_to_uint64(const NumericVar *var, uint64 *result)
+{
+       NumericDigit *digits;
+       int                     ndigits;
+       int                     weight;
+       int                     i;
+       uint64          val;
+       NumericVar      rounded;
+
+       /* Round to nearest integer */
+       init_var(&rounded);
+       set_var_from_var(var, &rounded);
+       round_var(&rounded, 0);
+
+       /* Check for zero input */
+       strip_var(&rounded);
+       ndigits = rounded.ndigits;
+       if (ndigits == 0)
+       {
+               *result = 0;
+               free_var(&rounded);
+               return true;
+       }
+
+       /* Check for negative input */
+       if (rounded.sign == NUMERIC_NEG)
+       {
+               free_var(&rounded);
+               return false;
+       }
+
+       /*
+        * For input like 10000000000, we must treat stripped digits as real. So
+        * the loop assumes there are weight+1 digits before the decimal point.
+        */
+       weight = rounded.weight;
+       Assert(weight >= 0 && ndigits <= weight + 1);
+
+       /* Construct the result */
+       digits = rounded.digits;
+       val = digits[0];
+       for (i = 1; i <= weight; i++)
+       {
+               if (unlikely(pg_mul_u64_overflow(val, NBASE, &val)))
+               {
+                       free_var(&rounded);
+                       return false;
+               }
+
+               if (i < ndigits)
+               {
+                       if (unlikely(pg_add_u64_overflow(val, digits[i], &val)))
+                       {
+                               free_var(&rounded);
+                               return false;
+                       }
+               }
+       }
+
+       free_var(&rounded);
+
+       *result = val;
+
+       return true;
+}
+
 #ifdef HAVE_INT128
 /*
  * Convert numeric to int128, rounding if needed.
diff --git a/src/backend/utils/adt/pg_lsn.c b/src/backend/utils/adt/pg_lsn.c
index d9754a7778..d3e289964c 100644
--- a/src/backend/utils/adt/pg_lsn.c
+++ b/src/backend/utils/adt/pg_lsn.c
@@ -16,6 +16,7 @@
 #include "funcapi.h"
 #include "libpq/pqformat.h"
 #include "utils/builtins.h"
+#include "utils/numeric.h"
 #include "utils/pg_lsn.h"
 
 #define MAXPG_LSNLEN                   17
@@ -248,3 +249,59 @@ pg_lsn_mi(PG_FUNCTION_ARGS)
 
        return result;
 }
+
+/*
+ * Add the number of bytes to pg_lsn, giving a new pg_lsn.
+ * Must handle both positive and negative numbers of bytes.
+ */
+Datum
+pg_lsn_pli(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr      lsn = PG_GETARG_LSN(0);
+       Datum   nbytes = PG_GETARG_DATUM(1);
+       Datum   num;
+       Datum   res;
+       char            buf[256];
+
+       /* Convert to numeric */
+       snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn);
+       num = DirectFunctionCall3(numeric_in,
+                                                         CStringGetDatum(buf),
+                                                         ObjectIdGetDatum(0),
+                                                         Int32GetDatum(-1));
+
+       /* Add two numerics */
+       res = DirectFunctionCall2(numeric_add,
+                                                          
NumericGetDatum(num), nbytes);
+
+       /* Convert to pg_lsn */
+       return DirectFunctionCall1(numeric_pg_lsn, res);
+}
+
+/*
+ * Substract the number of bytes from pg_lsn, giving a new pg_lsn.
+ * Must handle both positive and negative numbers of bytes.
+ */
+Datum
+pg_lsn_mii(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr      lsn = PG_GETARG_LSN(0);
+       Datum   nbytes = PG_GETARG_DATUM(1);
+       Datum   num;
+       Datum   res;
+       char            buf[256];
+
+       /* Convert to numeric */
+       snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn);
+       num = DirectFunctionCall3(numeric_in,
+                                                         CStringGetDatum(buf),
+                                                         ObjectIdGetDatum(0),
+                                                         Int32GetDatum(-1));
+
+       /* Add two numerics */
+       res = DirectFunctionCall2(numeric_sub,
+                                                          
NumericGetDatum(num), nbytes);
+
+       /* Convert to pg_lsn */
+       return DirectFunctionCall1(numeric_pg_lsn, res);
+}
diff --git a/src/include/catalog/pg_operator.dat 
b/src/include/catalog/pg_operator.dat
index 00ada7e48f..15dfa1497a 100644
--- a/src/include/catalog/pg_operator.dat
+++ b/src/include/catalog/pg_operator.dat
@@ -2909,6 +2909,17 @@
 { oid => '3228', descr => 'minus',
   oprname => '-', oprleft => 'pg_lsn', oprright => 'pg_lsn',
   oprresult => 'numeric', oprcode => 'pg_lsn_mi' },
+{ oid => '5025', descr => 'add',
+  oprname => '+', oprleft => 'pg_lsn', oprright => 'numeric',
+  oprresult => 'pg_lsn', oprcom => '+(numeric,pg_lsn)',
+  oprcode => 'pg_lsn_pli' },
+{ oid => '5026', descr => 'add',
+  oprname => '+', oprleft => 'numeric', oprright => 'pg_lsn',
+  oprresult => 'pg_lsn', oprcom => '+(pg_lsn,numeric)',
+  oprcode => 'numeric_pl_pg_lsn' },
+{ oid => '5027', descr => 'subtract',
+  oprname => '-', oprleft => 'pg_lsn', oprright => 'numeric',
+  oprresult => 'pg_lsn', oprcode => 'pg_lsn_mii' },
 
 # enum operators
 { oid => '3516', descr => 'equal',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4bce3ad8de..0eb94d92e3 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -4398,6 +4398,9 @@
 { oid => '1783', descr => 'convert numeric to int2',
   proname => 'int2', prorettype => 'int2', proargtypes => 'numeric',
   prosrc => 'numeric_int2' },
+{ oid => '6103', descr => 'convert numeric to pg_lsn',
+  proname => 'pg_lsn', prorettype => 'pg_lsn', proargtypes => 'numeric',
+  prosrc => 'numeric_pg_lsn' },
 
 { oid => '3556', descr => 'convert jsonb to boolean',
   proname => 'bool', prorettype => 'bool', proargtypes => 'jsonb',
@@ -8578,6 +8581,15 @@
 { oid => '4188', descr => 'smaller of two',
   proname => 'pg_lsn_smaller', prorettype => 'pg_lsn',
   proargtypes => 'pg_lsn pg_lsn', prosrc => 'pg_lsn_smaller' },
+{ oid => '5022',
+  proname => 'pg_lsn_pli', prorettype => 'pg_lsn',
+  proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_pli' },
+{ oid => '5023',
+  proname => 'numeric_pl_pg_lsn', prolang => 'sql', prorettype => 'pg_lsn',
+  proargtypes => 'numeric pg_lsn', prosrc => 'select $2 + $1' },
+{ oid => '5024',
+  proname => 'pg_lsn_mii', prorettype => 'pg_lsn',
+  proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_mii' },
 
 # enum related procs
 { oid => '3504', descr => 'I/O',
diff --git a/src/test/regress/expected/numeric.out 
b/src/test/regress/expected/numeric.out
index c7fe63d037..978d38b729 100644
--- a/src/test/regress/expected/numeric.out
+++ b/src/test/regress/expected/numeric.out
@@ -2315,3 +2315,30 @@ FROM (VALUES (0::numeric, 0::numeric),
 
 SELECT lcm(9999 * (10::numeric)^131068 + (10::numeric^131068 - 1), 2); -- 
overflow
 ERROR:  value overflows numeric format
+--
+-- Tests for pg_lsn()
+--
+SELECT pg_lsn(23783416::numeric);
+  pg_lsn   
+-----------
+ 0/16AE7F8
+(1 row)
+
+SELECT pg_lsn(0::numeric);
+ pg_lsn 
+--------
+ 0/0
+(1 row)
+
+SELECT pg_lsn(18446744073709551615::numeric);
+      pg_lsn       
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT pg_lsn(-1::numeric);
+ERROR:  pg_lsn out of range
+SELECT pg_lsn(18446744073709551616::numeric);
+ERROR:  pg_lsn out of range
+SELECT pg_lsn('NaN'::numeric);
+ERROR:  cannot convert NaN to pg_lsn
diff --git a/src/test/regress/expected/pg_lsn.out 
b/src/test/regress/expected/pg_lsn.out
index 64d41dfdad..7eb6a387b1 100644
--- a/src/test/regress/expected/pg_lsn.out
+++ b/src/test/regress/expected/pg_lsn.out
@@ -71,6 +71,52 @@ SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn;
         1
 (1 row)
 
+SELECT '0/16AE7F7'::pg_lsn + 16::numeric;
+ ?column?  
+-----------
+ 0/16AE807
+(1 row)
+
+SELECT 16::numeric + '0/16AE7F7'::pg_lsn;
+ ?column?  
+-----------
+ 0/16AE807
+(1 row)
+
+SELECT '0/16AE7F7'::pg_lsn - 16::numeric;
+ ?column?  
+-----------
+ 0/16AE7E7
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric;
+     ?column?      
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error
+ERROR:  pg_lsn out of range
+SELECT '0/1'::pg_lsn - 1::numeric;
+ ?column? 
+----------
+ 0/0
+(1 row)
+
+SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error
+ERROR:  pg_lsn out of range
+SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+     ?column?      
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - 
'0/0'::pg_lsn);
+ ?column? 
+----------
+ 0/0
+(1 row)
+
 -- Check btree and hash opclasses
 EXPLAIN (COSTS OFF)
 SELECT DISTINCT (i || '/' || j)::pg_lsn f
diff --git a/src/test/regress/sql/numeric.sql b/src/test/regress/sql/numeric.sql
index 41475a9a24..fdfe6fa3c7 100644
--- a/src/test/regress/sql/numeric.sql
+++ b/src/test/regress/sql/numeric.sql
@@ -1111,3 +1111,13 @@ FROM (VALUES (0::numeric, 0::numeric),
              (4232.820::numeric, 132.72000::numeric)) AS v(a, b);
 
 SELECT lcm(9999 * (10::numeric)^131068 + (10::numeric^131068 - 1), 2); -- 
overflow
+
+--
+-- Tests for pg_lsn()
+--
+SELECT pg_lsn(23783416::numeric);
+SELECT pg_lsn(0::numeric);
+SELECT pg_lsn(18446744073709551615::numeric);
+SELECT pg_lsn(-1::numeric);
+SELECT pg_lsn(18446744073709551616::numeric);
+SELECT pg_lsn('NaN'::numeric);
diff --git a/src/test/regress/sql/pg_lsn.sql b/src/test/regress/sql/pg_lsn.sql
index 2c143c82ff..74d9fb4e6a 100644
--- a/src/test/regress/sql/pg_lsn.sql
+++ b/src/test/regress/sql/pg_lsn.sql
@@ -27,6 +27,15 @@ SELECT '0/16AE7F7' < '0/16AE7F8'::pg_lsn;
 SELECT '0/16AE7F8' > pg_lsn '0/16AE7F7';
 SELECT '0/16AE7F7'::pg_lsn - '0/16AE7F8'::pg_lsn;
 SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn;
+SELECT '0/16AE7F7'::pg_lsn + 16::numeric;
+SELECT 16::numeric + '0/16AE7F7'::pg_lsn;
+SELECT '0/16AE7F7'::pg_lsn - 16::numeric;
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric;
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error
+SELECT '0/1'::pg_lsn - 1::numeric;
+SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error
+SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - 
'0/0'::pg_lsn);
 
 -- Check btree and hash opclasses
 EXPLAIN (COSTS OFF)

Reply via email to