On 20/11/11 19:14, Steve Singer wrote:
On 11-10-15 07:28 PM, Jan Urbański wrote:
Hi,

attached is a patch implementing the usage of SPI cursors in PL/Python.
Currently when trying to process a large table in PL/Python you have to
slurp it all into memory (that's what plpy.execute does).

J
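
To make the difference concrete, here is a minimal sketch in plain Python that runs outside the backend. FakeCursor is a hypothetical stand-in for the object returned by plpy.cursor(); it only mimics the fetch() and iteration behaviour described in the documentation part of the patch and is not the patch's implementation.

```python
import itertools


class FakeCursor:
    """Hypothetical stand-in for a plpy.cursor() result (not the real object)."""

    def __init__(self, rows):
        self._rows = iter(rows)

    def fetch(self, count):
        # Return the next batch, never larger than count; empty once exhausted.
        return list(itertools.islice(self._rows, count))

    def __iter__(self):
        return self

    def __next__(self):
        # Yield one row at a time; raises StopIteration when exhausted.
        return next(self._rows)


def count_odd_fetch(cursor, batch_size):
    """Count odd 'num' values while holding at most batch_size rows at once."""
    odd = 0
    while True:
        rows = cursor.fetch(batch_size)
        if not rows:
            break
        odd += sum(1 for row in rows if row["num"] % 2)
    return odd


def count_odd_iterator(cursor):
    """Same count, using the iterator interface instead of fetch()."""
    return sum(1 for row in cursor if row["num"] % 2)
```

Either function only ever keeps one batch (or one row) alive, which is the property plpy.execute cannot offer since it materializes the whole result set.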

I found a few bugs (see my testing section below) that will need fixing
+ a few questions about the code

Responding now to all questions and attaching a revised patch based on your comments.

Do we like the name plpy.cursor or would we rather call it something like
plpy.execute_cursor(...) or plpy.cursor_open(...) or
plpy.create_cursor(...)?
Since we will be mostly stuck with the API once we release 9.2, this is
worth gathering some opinions on. I like cursor() but if anyone disagrees
now is the time.

We use plpy.subtransaction() to create Subxact objects, so I thought plpy.cursor() would be most appropriate.

This patch does not provide a wrapper around SPI_cursor_move. The patch
is useful without that and I don't see anything that precludes someone else
adding that later if they see a need.

My idea is to add keyword arguments to plpy.cursor() that will allow you to decide whether you want a scrollable cursor and after that provide a move() method.

The patch includes documentation updates that describe the new feature.
The Database Access page doesn't provide an API-style list of database
access functions like the plperl
http://www.postgresql.org/docs/9.1/interactive/plperl-builtins.html page
does. I think the organization of the perl page is
clearer than the python one and we should think about doing some
documentation refactoring. That should be done as a separate patch and
shouldn't be a barrier to committing this one.

Yeah, the PL/Python docs are a bit chaotic right now. I haven't yet summoned the strength to overhaul them.

in PLy_cursor_plan line 4080
+ PG_TRY();
+ {
+ Portal portal;
+ char *volatile nulls;
+ volatile int j;

I am probably not seeing a code path or misunderstanding something
about the setjmp/longjmp usage, but I don't see why nulls and j need to be
volatile here.

It looked like you could drop volatile there (and in PLy_spi_execute_plan, which this is copied from; did I mention there's quite a bit of code duplication in PL/Python?), but digging in git I found this commit:

http://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=2789b7278c11785750dd9d2837856510ffc67000

that added the original volatile qualification, so I guess there's a reason.

line 444
PLy_cursor(PyObject *self, PyObject *args)
+ {
+ char *query;
+ PyObject *plan;
+ PyObject *planargs = NULL;
+
+ if (PyArg_ParseTuple(args, "s", &query))
+ return PLy_cursor_query(query);
+

Should query be freed with PyMem_Free()?

No, with the "s" format PyArg_ParseTuple stores a pointer to a buffer owned by the Python string object, so the caller must not free it. I checked that repeatedly creating a cursor with a plan argument does not leak memory and that adding PyMem_Free there promptly leads to a segfault.


I tested both python 2.6 and 3 on a Linux system

[test cases demonstrating bugs]

Turns out it's a really bad idea to store pointers to Portal structures, because they get invalidated by the subtransaction abort hooks.

I switched to storing the cursor name and looking it up in the appropriate hash table every time it's used. The examples you sent (which I included as regression tests) now cause a ValueError to be raised with a message stating that the cursor has been created in an aborted subtransaction.

Not sure about the wording of the error message, though.
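
The lookup-by-name approach can be sketched in plain Python as follows. This is only an illustration under stated assumptions: PortalRegistry is a hypothetical stand-in for the backend's portal hash table, NamedCursor for the cursor object, and the "portal_N" names are made up. The real code uses GetPortalByName and PortalIsValid, as seen in the patch.

```python
import itertools


class PortalRegistry:
    """Hypothetical stand-in for the backend's portal hash table."""

    def __init__(self):
        self._portals = {}
        self._counter = 0

    def open(self, rows):
        # Register a new portal and hand back only its name.
        self._counter += 1
        name = "portal_%d" % self._counter  # made-up naming scheme
        self._portals[name] = iter(rows)
        return name

    def lookup(self, name):
        # Returns None when the portal no longer exists.
        return self._portals.get(name)

    def drop(self, name):
        # What a subtransaction abort would do to portals created inside it.
        self._portals.pop(name, None)


class NamedCursor:
    """Stores the portal name rather than a reference to the portal itself."""

    def __init__(self, registry, rows):
        self._registry = registry
        self.portalname = registry.open(rows)

    def fetch(self, count):
        # Look the portal up on every use instead of trusting a saved pointer.
        portal = self._registry.lookup(self.portalname)
        if portal is None:
            raise ValueError(
                "iterating a cursor from an aborted subtransaction")
        return list(itertools.islice(portal, count))
```

Because the cursor never holds the portal directly, a portal destroyed behind its back turns into a clean ValueError on the next fetch rather than a dangling pointer dereference.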

Thanks again for the review!

Cheers,
Jan
diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml
index eda2bbf..d08c3d1 100644
--- a/doc/src/sgml/plpython.sgml
+++ b/doc/src/sgml/plpython.sgml
@@ -892,6 +892,15 @@ $$ LANGUAGE plpythonu;
   </para>
 
   <para>
+    Note that calling <literal>plpy.execute</literal> will cause the entire
+    result set to be read into memory. Only use that function when you are sure
+    that the result set will be relatively small.  If you don't want to risk
+    excessive memory usage when fetching large results,
+    use <literal>plpy.cursor</literal> rather
+    than <literal>plpy.execute</literal>.
+  </para>
+
+  <para>
    For example:
 <programlisting>
 rv = plpy.execute("SELECT * FROM my_table", 5)
@@ -958,6 +967,77 @@ $$ LANGUAGE plpythonu;
 
   </sect2>
 
+  <sect2>
+    <title>Accessing data with cursors</title>
+
+  <para>
+    The <literal>plpy.cursor</literal> function accepts the same arguments
+    as <literal>plpy.execute</literal> (except for <literal>limit</literal>)
+    and returns a cursor object, which allows you to process large result sets
+    in smaller chunks.  As with <literal>plpy.execute</literal>, either a query
+    string or a plan object along with a list of arguments can be used.  The
+    cursor object provides a <literal>fetch</literal> method that accepts an
+    integer parameter and returns a result object.  Each time you
+    call <literal>fetch</literal>, the returned object will contain the next
+    batch of rows, never larger than the parameter value.  Once all rows are
+    exhausted, <literal>fetch</literal> starts returning an empty result
+    object.  Cursor objects also provide an
+    <ulink url="http://docs.python.org/library/stdtypes.html#iterator-types">iterator
+    interface</ulink>, yielding one row at a time until all rows are exhausted.
+    Data fetched that way is not returned as result objects, but rather as
+    dictionaries, each dictionary corresponding to a single result row.
+  </para>
+
+  <para>
+    Cursors are automatically disposed of, but if you want to explicitly
+    release all resources held by a cursor, use the <literal>close</literal>
+    method.  Once closed, a cursor cannot be fetched from anymore.
+  </para>
+
+   <note>
+    <para>
+      Do not confuse objects created by <literal>plpy.cursor</literal> with
+      DBAPI cursors as defined by
+      the <ulink url="http://www.python.org/dev/peps/pep-0249/">Python Database API specification</ulink>.
+      They don't have anything in common except for the name.
+    </para>
+   </note>
+
+  <para>
+    An example of two ways of processing data from a large table would be:
+<programlisting>
+CREATE FUNCTION count_odd_iterator() RETURNS integer AS $$
+odd = 0
+for row in plpy.cursor("select num from largetable"):
+    if row['num'] % 2:
+        odd += 1
+return odd
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION count_odd_fetch(batch_size integer) RETURNS integer AS $$
+odd = 0
+cursor = plpy.cursor("select num from largetable")
+while True:
+    rows = cursor.fetch(batch_size)
+    if not rows:
+        break
+    for row in rows:
+        if row['num'] % 2:
+            odd += 1
+return odd
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION count_odd_prepared() RETURNS integer AS $$
+odd = 0
+plan = plpy.prepare("select num from largetable where num % $1 != 0", ["integer"])
+rows = list(plpy.cursor(plan, [2]))
+
+return len(rows)
+$$ LANGUAGE plpythonu;
+</programlisting>
+  </para>
+  </sect2>
+
   <sect2 id="plpython-trapping">
    <title>Trapping Errors</title>
 
diff --git a/src/pl/plpython/expected/plpython_spi.out b/src/pl/plpython/expected/plpython_spi.out
index 7f4ae5c..3b4d7a3 100644
--- a/src/pl/plpython/expected/plpython_spi.out
+++ b/src/pl/plpython/expected/plpython_spi.out
@@ -133,3 +133,154 @@ CONTEXT:  PL/Python function "result_nrows_test"
                  2
 (1 row)
 
+-- cursor objects
+CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+does = 0
+for row in res:
+    if row['lname'] == 'doe':
+        does += 1
+return does
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+res.close()
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+assert len(res.fetch(3)) == 3
+assert len(res.fetch(3)) == 1
+assert len(res.fetch(3)) == 0
+assert len(res.fetch(3)) == 0
+try:
+    # use next() or __next__(), the method name changed in
+    # http://www.python.org/dev/peps/pep-3114/
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except StopIteration:
+    pass
+else:
+    assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users order by fname")
+assert len(res.fetch(2)) == 2
+
+item = None
+try:
+    item = res.next()
+except AttributeError:
+    item = res.__next__()
+assert item['fname'] == 'rick'
+
+assert len(res.fetch(2)) == 1
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+    res.fetch(1)
+except ValueError:
+    pass
+else:
+    assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION next_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except ValueError:
+    pass
+else:
+    assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users where false")
+assert len(res.fetch(1)) == 0
+try:
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except StopIteration:
+    pass
+else:
+    assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+plan = plpy.prepare(
+    "select fname, lname from users where fname like $1 || '%' order by fname",
+    ["text"])
+for row in plpy.cursor(plan, ["w"]):
+    yield row['fname']
+for row in plpy.cursor(plan, ["j"]):
+    yield row['fname']
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+                    ["text"])
+c = plpy.cursor(plan, ["a", "b"])
+$$ LANGUAGE plpythonu;
+SELECT simple_cursor_test();
+ simple_cursor_test 
+--------------------
+                  3
+(1 row)
+
+SELECT double_cursor_close();
+ double_cursor_close 
+---------------------
+                    
+(1 row)
+
+SELECT cursor_fetch();
+ cursor_fetch 
+--------------
+             
+(1 row)
+
+SELECT cursor_mix_next_and_fetch();
+ cursor_mix_next_and_fetch 
+---------------------------
+                          
+(1 row)
+
+SELECT fetch_after_close();
+ fetch_after_close 
+-------------------
+                  
+(1 row)
+
+SELECT next_after_close();
+ next_after_close 
+------------------
+                 
+(1 row)
+
+SELECT cursor_fetch_next_empty();
+ cursor_fetch_next_empty 
+-------------------------
+                        
+(1 row)
+
+SELECT cursor_plan();
+ cursor_plan 
+-------------
+ willem
+ jane
+ john
+(3 rows)
+
+SELECT cursor_plan_wrong_args();
+ERROR:  TypeError: Expected sequence of 1 argument, got 2: ['a', 'b']
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "cursor_plan_wrong_args", line 4, in <module>
+    c = plpy.cursor(plan, ["a", "b"])
+PL/Python function "cursor_plan_wrong_args"
diff --git a/src/pl/plpython/expected/plpython_subtransaction.out b/src/pl/plpython/expected/plpython_subtransaction.out
index 515b0bb..774d103 100644
--- a/src/pl/plpython/expected/plpython_subtransaction.out
+++ b/src/pl/plpython/expected/plpython_subtransaction.out
@@ -409,3 +409,69 @@ SELECT * FROM subtransaction_tbl;
 (1 row)
 
 DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+    cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+    cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+        cur.fetch(10);
+	plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(10)
+    return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        plpy.execute('create temporary table tmp(i) '
+                     'as select generate_series(1, 10)')
+        plan = plpy.prepare("select i from tmp")
+        cur = plpy.cursor(plan)
+	plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(5)
+    return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor('select 1')
+	plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    cur.close()
+    return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+SELECT cursor_in_subxact();
+ cursor_in_subxact 
+-------------------
+                16
+(1 row)
+
+SELECT cursor_aborted_subxact();
+ERROR:  ValueError: iterating a cursor from an aborted subtransaction
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "cursor_aborted_subxact", line 8, in <module>
+    fetched = cur.fetch(10)
+PL/Python function "cursor_aborted_subxact"
+SELECT cursor_plan_aborted_subxact();
+ERROR:  ValueError: iterating a cursor from an aborted subtransaction
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "cursor_plan_aborted_subxact", line 10, in <module>
+    fetched = cur.fetch(5)
+PL/Python function "cursor_plan_aborted_subxact"
+SELECT cursor_close_aborted_subxact();
+ERROR:  ValueError: closing a cursor from an aborted subtransaction
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "cursor_close_aborted_subxact", line 7, in <module>
+    cur.close()
+PL/Python function "cursor_close_aborted_subxact"
diff --git a/src/pl/plpython/expected/plpython_subtransaction_0.out b/src/pl/plpython/expected/plpython_subtransaction_0.out
index 4017c41..a593263 100644
--- a/src/pl/plpython/expected/plpython_subtransaction_0.out
+++ b/src/pl/plpython/expected/plpython_subtransaction_0.out
@@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl;
 (0 rows)
 
 DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+    cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+    cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_in_subxact"
+DETAIL:  SyntaxError: invalid syntax (line 3)
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+        cur.fetch(10);
+	plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(10)
+    return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (line 4)
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        plpy.execute('create temporary table tmp(i) '
+                     'as select generate_series(1, 10)')
+        plan = plpy.prepare("select i from tmp")
+        cur = plpy.cursor(plan)
+	plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(5)
+    return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_plan_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (line 4)
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor('select 1')
+	plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    cur.close()
+    return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_close_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (line 4)
+SELECT cursor_in_subxact();
+ERROR:  function cursor_in_subxact() does not exist
+LINE 1: SELECT cursor_in_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_aborted_subxact();
+ERROR:  function cursor_aborted_subxact() does not exist
+LINE 1: SELECT cursor_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_plan_aborted_subxact();
+ERROR:  function cursor_plan_aborted_subxact() does not exist
+LINE 1: SELECT cursor_plan_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_close_aborted_subxact();
+ERROR:  function cursor_close_aborted_subxact() does not exist
+LINE 1: SELECT cursor_close_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
diff --git a/src/pl/plpython/expected/plpython_subtransaction_5.out b/src/pl/plpython/expected/plpython_subtransaction_5.out
index 9216151..a71df67 100644
--- a/src/pl/plpython/expected/plpython_subtransaction_5.out
+++ b/src/pl/plpython/expected/plpython_subtransaction_5.out
@@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl;
 (0 rows)
 
 DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+    cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+    cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_in_subxact"
+DETAIL:  SyntaxError: invalid syntax (<string>, line 3)
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+        cur.fetch(10);
+	plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(10)
+    return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (<string>, line 4)
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        plpy.execute('create temporary table tmp(i) '
+                     'as select generate_series(1, 10)')
+        plan = plpy.prepare("select i from tmp")
+        cur = plpy.cursor(plan)
+	plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(5)
+    return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_plan_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (<string>, line 4)
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor('select 1')
+	plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    cur.close()
+    return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_close_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (<string>, line 4)
+SELECT cursor_in_subxact();
+ERROR:  function cursor_in_subxact() does not exist
+LINE 1: SELECT cursor_in_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_aborted_subxact();
+ERROR:  function cursor_aborted_subxact() does not exist
+LINE 1: SELECT cursor_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_plan_aborted_subxact();
+ERROR:  function cursor_plan_aborted_subxact() does not exist
+LINE 1: SELECT cursor_plan_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_close_aborted_subxact();
+ERROR:  function cursor_close_aborted_subxact() does not exist
+LINE 1: SELECT cursor_close_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index f2dda66..a884fc0 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,9 +43,9 @@ contents.sort()
 return ", ".join(contents)
 $$ LANGUAGE plpythonu;
 select module_contents();
-                                                                           module_contents                                                                            
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Error, Fatal, SPIError, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
+                                                                               module_contents                                                                                
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Error, Fatal, SPIError, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
 (1 row)
 
 CREATE FUNCTION elog_test() RETURNS void
diff --git a/src/pl/plpython/plpython.c b/src/pl/plpython/plpython.c
index 93e8043..10bd30f 100644
--- a/src/pl/plpython/plpython.c
+++ b/src/pl/plpython/plpython.c
@@ -134,6 +134,11 @@ typedef int Py_ssize_t;
 		PyObject_HEAD_INIT(type) size,
 #endif
 
+/* Python 3 removed the Py_TPFLAGS_HAVE_ITER flag */
+#if PY_MAJOR_VERSION >= 3
+#define Py_TPFLAGS_HAVE_ITER 0
+#endif
+
 /* define our text domain for translations */
 #undef TEXTDOMAIN
 #define TEXTDOMAIN PG_TEXTDOMAIN("plpython")
@@ -310,6 +315,14 @@ typedef struct PLySubtransactionObject
 	bool		exited;
 } PLySubtransactionObject;
 
+typedef struct PLyCursorObject
+{
+	PyObject_HEAD
+	char		*portalname;
+	PLyTypeInfo result;
+	bool		closed;
+} PLyCursorObject;
+
 /* A list of all known exceptions, generated from backend/utils/errcodes.txt */
 typedef struct ExceptionMap
 {
@@ -486,6 +499,10 @@ static char PLy_subtransaction_doc[] = {
 	"PostgreSQL subtransaction context manager"
 };
 
+static char PLy_cursor_doc[] = {
+	"Wrapper around a PostgreSQL cursor"
+};
+
 
 /*
  * the function definitions
@@ -2963,6 +2980,14 @@ static void PLy_subtransaction_dealloc(PyObject *);
 static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *);
 static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *);
 
+static PyObject *PLy_cursor(PyObject *, PyObject *);
+static PyObject *PLy_cursor_query(char *);
+static PyObject *PLy_cursor_plan(PyObject *, PyObject *);
+static void PLy_cursor_dealloc(PyObject *);
+static PyObject *PLy_cursor_iternext(PyObject *);
+static PyObject *PLy_cursor_fetch(PyObject *, PyObject *);
+static PyObject *PLy_cursor_close(PyObject *, PyObject *);
+
 
 static PyMethodDef PLy_plan_methods[] = {
 	{"status", PLy_plan_status, METH_VARARGS, NULL},
@@ -3099,6 +3124,47 @@ static PyTypeObject PLy_SubtransactionType = {
 	PLy_subtransaction_methods, /* tp_tpmethods */
 };
 
+static PyMethodDef PLy_cursor_methods[] = {
+	{"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
+	{"close", PLy_cursor_close, METH_NOARGS, NULL},
+	{NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLy_CursorType = {
+	PyVarObject_HEAD_INIT(NULL, 0)
+	"PLyCursor",		/* tp_name */
+	sizeof(PLyCursorObject),	/* tp_size */
+	0,							/* tp_itemsize */
+
+	/*
+	 * methods
+	 */
+	PLy_cursor_dealloc, 		/* tp_dealloc */
+	0,							/* tp_print */
+	0,							/* tp_getattr */
+	0,							/* tp_setattr */
+	0,							/* tp_compare */
+	0,							/* tp_repr */
+	0,							/* tp_as_number */
+	0,							/* tp_as_sequence */
+	0,							/* tp_as_mapping */
+	0,							/* tp_hash */
+	0,							/* tp_call */
+	0,							/* tp_str */
+	0,							/* tp_getattro */
+	0,							/* tp_setattro */
+	0,							/* tp_as_buffer */
+	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,	/* tp_flags */
+	PLy_cursor_doc,				/* tp_doc */
+	0,							/* tp_traverse */
+	0,							/* tp_clear */
+	0,							/* tp_richcompare */
+	0,							/* tp_weaklistoffset */
+	PyObject_SelfIter,			/* tp_iter */
+	PLy_cursor_iternext,		/* tp_iternext */
+	PLy_cursor_methods,			/* tp_tpmethods */
+};
+
 static PyMethodDef PLy_methods[] = {
 	/*
 	 * logging methods
@@ -3133,6 +3199,11 @@ static PyMethodDef PLy_methods[] = {
 	 */
 	{"subtransaction", PLy_subtransaction, METH_NOARGS, NULL},
 
+	/*
+	 * create a cursor
+	 */
+	{"cursor", PLy_cursor, METH_VARARGS, NULL},
+
 	{NULL, NULL, 0, NULL}
 };
 
@@ -3833,6 +3904,576 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
 	return (PyObject *) result;
 }
 
+/*
+ * c = plpy.cursor("select * from largetable")
+ * c = plpy.cursor(plan, [])
+ */
+static PyObject *
+PLy_cursor(PyObject *self, PyObject *args)
+{
+	char			*query;
+	PyObject		*plan;
+	PyObject   		*planargs = NULL;
+
+	if (PyArg_ParseTuple(args, "s", &query))
+		return PLy_cursor_query(query);
+
+	PyErr_Clear();
+
+	if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
+		return PLy_cursor_plan(plan, planargs);
+
+	PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
+	return NULL;
+}
+
+
+static PyObject *
+PLy_cursor_query(char *query)
+{
+	PLyCursorObject	*cursor;
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
+
+	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+		return NULL;
+	cursor->portalname = NULL;
+	cursor->closed = false;
+	PLy_typeinfo_init(&cursor->result);
+
+	oldcontext = CurrentMemoryContext;
+	oldowner = CurrentResourceOwner;
+
+	BeginInternalSubTransaction(NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		SPIPlanPtr	plan;
+		Portal		portal;
+
+		pg_verifymbstr(query, strlen(query), false);
+
+		plan = SPI_prepare(query, 0, NULL);
+		if (plan == NULL)
+			elog(ERROR, "SPI_prepare failed: %s",
+				 SPI_result_code_string(SPI_result));
+
+		portal = SPI_cursor_open(NULL, plan, NULL, NULL,
+								 PLy_curr_procedure->fn_readonly);
+		SPI_freeplan(plan);
+
+		if (portal == NULL)
+			elog(ERROR, "SPI_cursor_open() failed:%s",
+				 SPI_result_code_string(SPI_result));
+
+		cursor->portalname = PLy_strdup(portal->name);
+
+		/* Commit the inner transaction, return to outer xact context */
+		ReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		/*
+		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+		 * in case it did, make sure we remain connected.
+		 */
+		SPI_restore_connection();
+	}
+	PG_CATCH();
+	{
+		ErrorData  *edata;
+		PLyExceptionEntry *entry;
+		PyObject   *exc;
+
+		/* Save error info */
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* Abort the inner transaction */
+		RollbackAndReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		Py_DECREF(cursor);
+
+		/*
+		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+		 * have left us in a disconnected state.  We need this hack to return
+		 * to connected state.
+		 */
+		SPI_restore_connection();
+
+		/* Look up the correct exception */
+		entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+							HASH_FIND, NULL);
+		/* We really should find it, but just in case have a fallback */
+		Assert(entry != NULL);
+		exc = entry ? entry->exc : PLy_exc_spi_error;
+		/* Make Python raise the exception */
+		PLy_spi_exception_set(exc, edata);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	Assert(cursor->portalname != NULL);
+	return (PyObject *) cursor;
+}
+
+static PyObject *
+PLy_cursor_plan(PyObject *ob, PyObject *args)
+{
+	PLyCursorObject	*cursor;
+	volatile int nargs;
+	int			i;
+	PLyPlanObject *plan;
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
+
+	if (args != NULL)
+	{
+		if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
+		{
+			PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
+			return NULL;
+		}
+		nargs = PySequence_Length(args);
+	}
+	else
+		nargs = 0;
+
+	plan = (PLyPlanObject *) ob;
+
+	if (nargs != plan->nargs)
+	{
+		char	   *sv;
+		PyObject   *so = PyObject_Str(args);
+
+		if (!so)
+			PLy_elog(ERROR, "could not execute plan");
+		sv = PyString_AsString(so);
+		PLy_exception_set_plural(PyExc_TypeError,
+								 "Expected sequence of %d argument, got %d: %s",
+								 "Expected sequence of %d arguments, got %d: %s",
+								 plan->nargs,
+								 plan->nargs, nargs, sv);
+		Py_DECREF(so);
+
+		return NULL;
+	}
+
+	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+		return NULL;
+	cursor->portalname = NULL;
+	cursor->closed = false;
+	PLy_typeinfo_init(&cursor->result);
+
+	oldcontext = CurrentMemoryContext;
+	oldowner = CurrentResourceOwner;
+
+	BeginInternalSubTransaction(NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		Portal		portal;
+		char	   *volatile nulls;
+		volatile int j;
+
+		if (nargs > 0)
+			nulls = palloc(nargs * sizeof(char));
+		else
+			nulls = NULL;
+
+		for (j = 0; j < nargs; j++)
+		{
+			PyObject   *elem;
+
+			elem = PySequence_GetItem(args, j);
+			if (elem != Py_None)
+			{
+				PG_TRY();
+				{
+					plan->values[j] =
+						plan->args[j].out.d.func(&(plan->args[j].out.d),
+												 -1,
+												 elem);
+				}
+				PG_CATCH();
+				{
+					Py_DECREF(elem);
+					PG_RE_THROW();
+				}
+				PG_END_TRY();
+
+				Py_DECREF(elem);
+				nulls[j] = ' ';
+			}
+			else
+			{
+				Py_DECREF(elem);
+				plan->values[j] =
+					InputFunctionCall(&(plan->args[j].out.d.typfunc),
+									  NULL,
+									  plan->args[j].out.d.typioparam,
+									  -1);
+				nulls[j] = 'n';
+			}
+		}
+
+		portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
+								 PLy_curr_procedure->fn_readonly);
+		if (portal == NULL)
+			elog(ERROR, "SPI_cursor_open() failed:%s",
+				 SPI_result_code_string(SPI_result));
+
+		cursor->portalname = PLy_strdup(portal->name);
+
+		/* Commit the inner transaction, return to outer xact context */
+		ReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		/*
+		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+		 * in case it did, make sure we remain connected.
+		 */
+		SPI_restore_connection();
+	}
+	PG_CATCH();
+	{
+		int			k;
+		ErrorData  *edata;
+		PLyExceptionEntry *entry;
+		PyObject   *exc;
+
+		/* Save error info */
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* cleanup plan->values array */
+		for (k = 0; k < nargs; k++)
+		{
+			if (!plan->args[k].out.d.typbyval &&
+				(plan->values[k] != PointerGetDatum(NULL)))
+			{
+				pfree(DatumGetPointer(plan->values[k]));
+				plan->values[k] = PointerGetDatum(NULL);
+			}
+		}
+
+		/* Abort the inner transaction */
+		RollbackAndReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		Py_DECREF(cursor);
+
+		/*
+		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+		 * have left us in a disconnected state.  We need this hack to return
+		 * to connected state.
+		 */
+		SPI_restore_connection();
+
+		/* Look up the correct exception */
+		entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+							HASH_FIND, NULL);
+		/* We really should find it, but just in case have a fallback */
+		Assert(entry != NULL);
+		exc = entry ? entry->exc : PLy_exc_spi_error;
+		/* Make Python raise the exception */
+		PLy_spi_exception_set(exc, edata);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	for (i = 0; i < nargs; i++)
+	{
+		if (!plan->args[i].out.d.typbyval &&
+			(plan->values[i] != PointerGetDatum(NULL)))
+		{
+			pfree(DatumGetPointer(plan->values[i]));
+			plan->values[i] = PointerGetDatum(NULL);
+		}
+	}
+
+	Assert(cursor->portalname != NULL);
+	return (PyObject *) cursor;
+}
+
+static void
+PLy_cursor_dealloc(PyObject *arg)
+{
+	PLyCursorObject *cursor;
+	Portal 			portal;
+
+	cursor = (PLyCursorObject *) arg;
+
+	if (!cursor->closed)
+	{
+		portal = GetPortalByName(cursor->portalname);
+
+		if (PortalIsValid(portal))
+			SPI_cursor_close(portal);
+	}
+
+	PLy_free(cursor->portalname);
+	cursor->portalname = NULL;
+
+	PLy_typeinfo_dealloc(&cursor->result);
+	arg->ob_type->tp_free(arg);
+}
+
+static PyObject *
+PLy_cursor_iternext(PyObject *self)
+{
+	PLyCursorObject *cursor;
+	PyObject		*ret;
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
+	Portal 			portal;
+
+	cursor = (PLyCursorObject *) self;
+
+	if (cursor->closed)
+	{
+		PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
+		return NULL;
+	}
+
+	portal = GetPortalByName(cursor->portalname);
+	if (!PortalIsValid(portal))
+	{
+		PLy_exception_set(PyExc_ValueError,
+						  "iterating a cursor from an aborted subtransaction");
+		return NULL;
+	}
+
+	oldcontext = CurrentMemoryContext;
+	oldowner = CurrentResourceOwner;
+
+	BeginInternalSubTransaction(NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		SPI_cursor_fetch(portal, true, 1);
+		if (SPI_processed == 0)
+		{
+			PyErr_SetNone(PyExc_StopIteration);
+			ret = NULL;
+		}
+		else
+		{
+			if (cursor->result.is_rowtype != 1)
+				PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+			ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
+									SPI_tuptable->tupdesc);
+		}
+
+		SPI_freetuptable(SPI_tuptable);
+
+		/* Commit the inner transaction, return to outer xact context */
+		ReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		/*
+		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+		 * in case it did, make sure we remain connected.
+		 */
+		SPI_restore_connection();
+	}
+	PG_CATCH();
+	{
+		ErrorData  *edata;
+		PLyExceptionEntry *entry;
+		PyObject   *exc;
+
+		/* Save error info */
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* Abort the inner transaction */
+		RollbackAndReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		SPI_freetuptable(SPI_tuptable);
+
+		/*
+		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+		 * have left us in a disconnected state.  We need this hack to return
+		 * to connected state.
+		 */
+		SPI_restore_connection();
+
+		/* Look up the correct exception */
+		entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+							HASH_FIND, NULL);
+		/* We really should find it, but just in case have a fallback */
+		Assert(entry != NULL);
+		exc = entry ? entry->exc : PLy_exc_spi_error;
+		/* Make Python raise the exception */
+		PLy_spi_exception_set(exc, edata);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	return ret;
+}
+
+static PyObject *
+PLy_cursor_fetch(PyObject *self, PyObject *args)
+{
+	PLyCursorObject *cursor;
+	int				count;
+	PLyResultObject	*ret;
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
+	Portal			portal;
+
+	if (!PyArg_ParseTuple(args, "i", &count))
+		return NULL;
+
+	cursor = (PLyCursorObject *) self;
+
+	if (cursor->closed)
+	{
+		PLy_exception_set(PyExc_ValueError, "fetch on a closed cursor");
+		return NULL;
+	}
+
+	portal = GetPortalByName(cursor->portalname);
+	if (!PortalIsValid(portal))
+	{
+		PLy_exception_set(PyExc_ValueError,
+						  "iterating a cursor from an aborted subtransaction");
+		return NULL;
+	}
+
+	ret = (PLyResultObject *) PLy_result_new();
+	if (ret == NULL)
+		return NULL;
+
+	oldcontext = CurrentMemoryContext;
+	oldowner = CurrentResourceOwner;
+
+	BeginInternalSubTransaction(NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		SPI_cursor_fetch(portal, true, count);
+
+		if (cursor->result.is_rowtype != 1)
+			PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+		Py_DECREF(ret->status);
+		ret->status = PyInt_FromLong(SPI_OK_FETCH);
+
+		Py_DECREF(ret->nrows);
+		ret->nrows = PyInt_FromLong(SPI_processed);
+
+		if (SPI_processed != 0)
+		{
+			int	i;
+
+			Py_DECREF(ret->rows);
+			ret->rows = PyList_New(SPI_processed);
+
+			for (i = 0; i < SPI_processed; i++)
+			{
+				PyObject   *row = PLyDict_FromTuple(&cursor->result,
+													SPI_tuptable->vals[i],
+													SPI_tuptable->tupdesc);
+				PyList_SetItem(ret->rows, i, row);
+			}
+		}
+
+		SPI_freetuptable(SPI_tuptable);
+
+		/* Commit the inner transaction, return to outer xact context */
+		ReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		/*
+		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+		 * in case it did, make sure we remain connected.
+		 */
+		SPI_restore_connection();
+	}
+	PG_CATCH();
+	{
+		ErrorData  *edata;
+		PLyExceptionEntry *entry;
+		PyObject   *exc;
+
+		/* Save error info */
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* Abort the inner transaction */
+		RollbackAndReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		SPI_freetuptable(SPI_tuptable);
+
+		/*
+		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+		 * have left us in a disconnected state.  We need this hack to return
+		 * to connected state.
+		 */
+		SPI_restore_connection();
+
+		/* Look up the correct exception */
+		entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+							HASH_FIND, NULL);
+		/* We really should find it, but just in case have a fallback */
+		Assert(entry != NULL);
+		exc = entry ? entry->exc : PLy_exc_spi_error;
+		/* Make Python raise the exception */
+		PLy_spi_exception_set(exc, edata);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	return (PyObject *) ret;
+}
+
+static PyObject *
+PLy_cursor_close(PyObject *self, PyObject *unused)
+{
+	PLyCursorObject *cursor = (PLyCursorObject *) self;
+
+	if (!cursor->closed)
+	{
+		Portal portal = GetPortalByName(cursor->portalname);
+
+		if (!PortalIsValid(portal))
+		{
+			PLy_exception_set(PyExc_ValueError,
+							  "closing a cursor "
+							  "from an aborted subtransaction");
+			return NULL;
+		}
+
+		SPI_cursor_close(portal);
+		cursor->closed = true;
+	}
+
+	Py_INCREF(Py_None);
+	return Py_None;
+}
+
 /* s = plpy.subtransaction() */
 static PyObject *
 PLy_subtransaction(PyObject *self, PyObject *unused)
@@ -4184,6 +4825,8 @@ PLy_init_plpy(void)
 		elog(ERROR, "could not initialize PLy_ResultType");
 	if (PyType_Ready(&PLy_SubtransactionType) < 0)
 		elog(ERROR, "could not initialize PLy_SubtransactionType");
+	if (PyType_Ready(&PLy_CursorType) < 0)
+		elog(ERROR, "could not initialize PLy_CursorType");
 
 #if PY_MAJOR_VERSION >= 3
 	PyModule_Create(&PLy_module);
diff --git a/src/pl/plpython/sql/plpython_spi.sql b/src/pl/plpython/sql/plpython_spi.sql
index 7f8f6a3..874b31e 100644
--- a/src/pl/plpython/sql/plpython_spi.sql
+++ b/src/pl/plpython/sql/plpython_spi.sql
@@ -105,3 +105,119 @@ else:
 $$ LANGUAGE plpythonu;
 
 SELECT result_nrows_test();
+
+
+-- cursor objects
+
+CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+does = 0
+for row in res:
+    if row['lname'] == 'doe':
+        does += 1
+return does
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+res.close()
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+assert len(res.fetch(3)) == 3
+assert len(res.fetch(3)) == 1
+assert len(res.fetch(3)) == 0
+assert len(res.fetch(3)) == 0
+try:
+    # use next() or __next__(), the method name changed in
+    # http://www.python.org/dev/peps/pep-3114/
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except StopIteration:
+    pass
+else:
+    assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users order by fname")
+assert len(res.fetch(2)) == 2
+
+item = None
+try:
+    item = res.next()
+except AttributeError:
+    item = res.__next__()
+assert item['fname'] == 'rick'
+
+assert len(res.fetch(2)) == 1
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+    res.fetch(1)
+except ValueError:
+    pass
+else:
+    assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION next_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except ValueError:
+    pass
+else:
+    assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users where false")
+assert len(res.fetch(1)) == 0
+try:
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except StopIteration:
+    pass
+else:
+    assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+plan = plpy.prepare(
+    "select fname, lname from users where fname like $1 || '%' order by fname",
+    ["text"])
+for row in plpy.cursor(plan, ["w"]):
+    yield row['fname']
+for row in plpy.cursor(plan, ["j"]):
+    yield row['fname']
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+                    ["text"])
+c = plpy.cursor(plan, ["a", "b"])
+$$ LANGUAGE plpythonu;
+
+SELECT simple_cursor_test();
+SELECT double_cursor_close();
+SELECT cursor_fetch();
+SELECT cursor_mix_next_and_fetch();
+SELECT fetch_after_close();
+SELECT next_after_close();
+SELECT cursor_fetch_next_empty();
+SELECT cursor_plan();
+SELECT cursor_plan_wrong_args();
diff --git a/src/pl/plpython/sql/plpython_subtransaction.sql b/src/pl/plpython/sql/plpython_subtransaction.sql
index a19cad5..4173cd7 100644
--- a/src/pl/plpython/sql/plpython_subtransaction.sql
+++ b/src/pl/plpython/sql/plpython_subtransaction.sql
@@ -242,3 +242,55 @@ SELECT pk_violation_inside_subtransaction();
 SELECT * FROM subtransaction_tbl;
 
 DROP TABLE subtransaction_tbl;
+
+-- cursor/subtransactions interactions
+
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+    cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+    cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+        cur.fetch(10);
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(10)
+    return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        plpy.execute('create temporary table tmp(i) '
+                     'as select generate_series(1, 10)')
+        plan = plpy.prepare("select i from tmp")
+        cur = plpy.cursor(plan)
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(5)
+    return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor('select 1')
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    cur.close()
+    return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+
+SELECT cursor_in_subxact();
+SELECT cursor_aborted_subxact();
+SELECT cursor_plan_aborted_subxact();
+SELECT cursor_close_aborted_subxact();
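
For readers who want to try the fetch()/next()/close() semantics exercised by the regression tests above without a running server, here is a rough pure-Python model. It is only a sketch: FakeCursor is a hypothetical stand-in, not the PL/Python implementation, and the four sample rows are made up to match the assertions in the tests (four users, with 'rick' third when ordered by fname). It does not model portals or subtransactions.

```python
import itertools


class FakeCursor:
    """Hypothetical stand-in that mimics the cursor behaviour tested above."""

    def __init__(self, rows):
        self._rows = iter(rows)
        self.closed = False

    def fetch(self, count):
        # Return up to `count` rows; an exhausted cursor yields an empty batch.
        if self.closed:
            raise ValueError("fetch on a closed cursor")
        return list(itertools.islice(self._rows, count))

    def __iter__(self):
        return self

    def __next__(self):
        # Raises StopIteration once the rows run out, like any iterator.
        if self.closed:
            raise ValueError("iterating a closed cursor")
        return next(self._rows)

    next = __next__  # Python 2 spelling of the iterator method

    def close(self):
        # Closing an already closed cursor is a no-op.
        self.closed = True


# Made-up sample data, already ordered by fname.
users = [{"fname": name} for name in ("jane", "john", "rick", "willem")]

cur = FakeCursor(users)
assert len(cur.fetch(2)) == 2          # first batch
assert next(cur)["fname"] == "rick"    # iteration continues where fetch stopped
assert len(cur.fetch(2)) == 1          # only one row left
assert len(cur.fetch(2)) == 0          # exhausted cursor returns nothing

cur.close()
cur.close()                            # double close is accepted
```

The point of the model is that fetch() and iteration share one position in the result set, so the two can be mixed freely, and that a closed cursor rejects both with ValueError.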
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
