Viewing file: test_async.py (15.39 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
#!/usr/bin/env python # -*- coding: utf-8 -*-
# test__async.py - unit test for _asynchronous API # # Copyright (C) 2010-2011 Jan UrbaĆski <wulczer@wulczer.org> # # psycopg2 is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # In addition, as a special exception, the copyright holders give # permission to link this program with the OpenSSL library (or with # modified versions of OpenSSL that use the same license as OpenSSL), # and distribute linked combinations including the two. # # You must obey the GNU Lesser General Public License in all respects for # all of the code used other than OpenSSL. # # psycopg2 is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details.
from .testutils import unittest, skip_before_postgres
import psycopg2 from psycopg2 import extensions
import time import select import io
from .testutils import ConnectingTestCase
class PollableStub(object): """A 'pollable' wrapper allowing analysis of the `poll()` calls.""" def __init__(self, pollable): self.pollable = pollable self.polls = []
def fileno(self): return self.pollable.fileno()
def poll(self): rv = self.pollable.poll() self.polls.append(rv) return rv
class AsyncTests(ConnectingTestCase):
def setUp(self): ConnectingTestCase.setUp(self)
self.sync_conn = self.conn self.conn = self.connect(_async=True)
self.wait(self.conn)
curs = self.conn.cursor() curs.execute(''' CREATE TEMPORARY TABLE table1 ( id int PRIMARY KEY )''') self.wait(curs)
def wait(self, cur_or_conn): pollable = cur_or_conn if not hasattr(pollable, 'poll'): pollable = cur_or_conn.connection while True: state = pollable.poll() if state == psycopg2.extensions.POLL_OK: break elif state == psycopg2.extensions.POLL_READ: select.select([pollable], [], [], 10) elif state == psycopg2.extensions.POLL_WRITE: select.select([], [pollable], [], 10) else: raise Exception("Unexpected result from poll: %r", state)
def test_connection_setup(self): cur = self.conn.cursor() sync_cur = self.sync_conn.cursor()
self.assertTrue(self.conn._async) self.assertTrue(not self.sync_conn._async)
# the _async connection should be in isolevel 0 self.assertEqual(self.conn.isolation_level, 0)
# check other properties to be found on the connection self.assertTrue(self.conn.server_version) self.assertTrue(self.conn.protocol_version in (2,3)) self.assertTrue(self.conn.encoding in psycopg2.extensions.encodings)
def test__async_named_cursor(self): self.assertRaises(psycopg2.ProgrammingError, self.conn.cursor, "name")
def test__async_select(self): cur = self.conn.cursor() self.assertFalse(self.conn.isexecuting()) cur.execute("select 'a'") self.assertTrue(self.conn.isexecuting())
self.wait(cur)
self.assertFalse(self.conn.isexecuting()) self.assertEqual(cur.fetchone()[0], "a")
@skip_before_postgres(8, 2) def test__async_callproc(self): cur = self.conn.cursor() cur.callproc("pg_sleep", (0.1, )) self.assertTrue(self.conn.isexecuting())
self.wait(cur) self.assertFalse(self.conn.isexecuting()) self.assertEqual(cur.fetchall()[0][0], '')
def test__async_after__async(self): cur = self.conn.cursor() cur2 = self.conn.cursor()
cur.execute("insert into table1 values (1)")
# an _async execute after an _async one raises an exception self.assertRaises(psycopg2.ProgrammingError, cur.execute, "select * from table1") # same for callproc self.assertRaises(psycopg2.ProgrammingError, cur.callproc, "version") # but after you've waited it should be good self.wait(cur) cur.execute("select * from table1") self.wait(cur)
self.assertEqual(cur.fetchall()[0][0], 1)
cur.execute("delete from table1") self.wait(cur)
cur.execute("select * from table1") self.wait(cur)
self.assertEqual(cur.fetchone(), None)
def test_fetch_after__async(self): cur = self.conn.cursor() cur.execute("select 'a'")
# a fetch after an _asynchronous query should raise an error self.assertRaises(psycopg2.ProgrammingError, cur.fetchall) # but after waiting it should work self.wait(cur) self.assertEqual(cur.fetchall()[0][0], "a")
def test_rollback_while__async(self): cur = self.conn.cursor()
cur.execute("select 'a'")
# a rollback should not work in _asynchronous mode self.assertRaises(psycopg2.ProgrammingError, self.conn.rollback)
def test_commit_while__async(self): cur = self.conn.cursor()
cur.execute("begin") self.wait(cur)
cur.execute("insert into table1 values (1)")
# a commit should not work in _asynchronous mode self.assertRaises(psycopg2.ProgrammingError, self.conn.commit) self.assertTrue(self.conn.isexecuting())
# but a manual commit should self.wait(cur) cur.execute("commit") self.wait(cur)
cur.execute("select * from table1") self.wait(cur) self.assertEqual(cur.fetchall()[0][0], 1)
cur.execute("delete from table1") self.wait(cur)
cur.execute("select * from table1") self.wait(cur) self.assertEqual(cur.fetchone(), None)
def test_set_parameters_while__async(self): cur = self.conn.cursor()
cur.execute("select 'c'") self.assertTrue(self.conn.isexecuting())
# getting transaction status works self.assertEqual(self.conn.get_transaction_status(), extensions.TRANSACTION_STATUS_ACTIVE) self.assertTrue(self.conn.isexecuting())
# setting connection encoding should fail self.assertRaises(psycopg2.ProgrammingError, self.conn.set_client_encoding, "LATIN1")
# same for transaction isolation self.assertRaises(psycopg2.ProgrammingError, self.conn.set_isolation_level, 1)
def test_reset_while__async(self): cur = self.conn.cursor() cur.execute("select 'c'") self.assertTrue(self.conn.isexecuting())
# a reset should fail self.assertRaises(psycopg2.ProgrammingError, self.conn.reset)
def test__async_iter(self): cur = self.conn.cursor()
cur.execute("begin") self.wait(cur) cur.execute(""" insert into table1 values (1); insert into table1 values (2); insert into table1 values (3); """) self.wait(cur) cur.execute("select id from table1 order by id")
# iteration fails if a query is underway self.assertRaises(psycopg2.ProgrammingError, list, cur)
# but after it's done it should work self.wait(cur) self.assertEqual(list(cur), [(1, ), (2, ), (3, )]) self.assertFalse(self.conn.isexecuting())
def test_copy_while__async(self): cur = self.conn.cursor() cur.execute("select 'a'")
# copy should fail self.assertRaises(psycopg2.ProgrammingError, cur.copy_from, io.StringIO("1\n3\n5\n\\.\n"), "table1")
def test_lobject_while__async(self): # large objects should be prohibited self.assertRaises(psycopg2.ProgrammingError, self.conn.lobject)
def test__async_executemany(self): cur = self.conn.cursor() self.assertRaises( psycopg2.ProgrammingError, cur.executemany, "insert into table1 values (%s)", [1, 2, 3])
def test__async_scroll(self): cur = self.conn.cursor() cur.execute(""" insert into table1 values (1); insert into table1 values (2); insert into table1 values (3); """) self.wait(cur) cur.execute("select id from table1 order by id")
# scroll should fail if a query is underway self.assertRaises(psycopg2.ProgrammingError, cur.scroll, 1) self.assertTrue(self.conn.isexecuting())
# but after it's done it should work self.wait(cur) cur.scroll(1) self.assertEqual(cur.fetchall(), [(2, ), (3, )])
cur = self.conn.cursor() cur.execute("select id from table1 order by id") self.wait(cur)
cur2 = self.conn.cursor() self.assertRaises(psycopg2.ProgrammingError, cur2.scroll, 1)
self.assertRaises(psycopg2.ProgrammingError, cur.scroll, 4)
cur = self.conn.cursor() cur.execute("select id from table1 order by id") self.wait(cur) cur.scroll(2) cur.scroll(-1) self.assertEqual(cur.fetchall(), [(2, ), (3, )])
def test_scroll(self): cur = self.sync_conn.cursor() cur.execute("create table table1 (id int)") cur.execute(""" insert into table1 values (1); insert into table1 values (2); insert into table1 values (3); """) cur.execute("select id from table1 order by id") cur.scroll(2) cur.scroll(-1) self.assertEqual(cur.fetchall(), [(2, ), (3, )])
def test__async_dont_read_all(self): cur = self.conn.cursor() cur.execute("select repeat('a', 10000); select repeat('b', 10000)")
# fetch the result self.wait(cur)
# it should be the result of the second query self.assertEqual(cur.fetchone()[0], "b" * 10000)
def test__async_subclass(self): class MyConn(psycopg2.extensions.connection): def __init__(self, dsn, _async=0): psycopg2.extensions.connection.__init__(self, dsn, _async=_async)
conn = self.connect(connection_factory=MyConn, _async=True) self.assertTrue(isinstance(conn, MyConn)) self.assertTrue(conn._async) conn.close()
def test_flush_on_write(self): # a very large query requires a flush loop to be sent to the backend curs = self.conn.cursor() for mb in 1, 5, 10, 20, 50: size = mb * 1024 * 1024 stub = PollableStub(self.conn) curs.execute("select %s;", ('x' * size,)) self.wait(stub) self.assertEqual(size, len(curs.fetchone()[0])) if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1: return
# This is more a testing glitch than an error: it happens # on high load on linux: probably because the kernel has more # buffers ready. A warning may be useful during development, # but an error is bad during regression testing. import warnings warnings.warn("sending a large query didn't trigger block on write.")
def test_sync_poll(self): cur = self.sync_conn.cursor() cur.execute("select 1") # polling with a sync query works cur.connection.poll() self.assertEqual(cur.fetchone()[0], 1)
def test_notify(self): cur = self.conn.cursor() sync_cur = self.sync_conn.cursor()
sync_cur.execute("listen test_notify") self.sync_conn.commit() cur.execute("notify test_notify") self.wait(cur)
self.assertEqual(self.sync_conn.notifies, [])
pid = self.conn.get_backend_pid() for _ in range(5): self.wait(self.sync_conn) if not self.sync_conn.notifies: time.sleep(0.5) continue self.assertEqual(len(self.sync_conn.notifies), 1) self.assertEqual(self.sync_conn.notifies.pop(), (pid, "test_notify")) return self.fail("No NOTIFY in 2.5 seconds")
def test__async_fetch_wrong_cursor(self): cur1 = self.conn.cursor() cur2 = self.conn.cursor() cur1.execute("select 1")
self.wait(cur1) self.assertFalse(self.conn.isexecuting()) # fetching from a cursor with no results is an error self.assertRaises(psycopg2.ProgrammingError, cur2.fetchone) # fetching from the correct cursor works self.assertEqual(cur1.fetchone()[0], 1)
def test_error(self): cur = self.conn.cursor() cur.execute("insert into table1 values (%s)", (1, )) self.wait(cur) cur.execute("insert into table1 values (%s)", (1, )) # this should fail self.assertRaises(psycopg2.IntegrityError, self.wait, cur) cur.execute("insert into table1 values (%s); " "insert into table1 values (%s)", (2, 2)) # this should fail as well self.assertRaises(psycopg2.IntegrityError, self.wait, cur) # but this should work cur.execute("insert into table1 values (%s)", (2, )) self.wait(cur) # and the cursor should be usable afterwards cur.execute("insert into table1 values (%s)", (3, )) self.wait(cur) cur.execute("select * from table1 order by id") self.wait(cur) self.assertEqual(cur.fetchall(), [(1, ), (2, ), (3, )]) cur.execute("delete from table1") self.wait(cur)
def test_error_two_cursors(self): cur = self.conn.cursor() cur2 = self.conn.cursor() cur.execute("select * from no_such_table") self.assertRaises(psycopg2.ProgrammingError, self.wait, cur) cur2.execute("select 1") self.wait(cur2) self.assertEqual(cur2.fetchone()[0], 1)
def test_notices(self): del self.conn.notices[:] cur = self.conn.cursor() if self.conn.server_version >= 90300: cur.execute("set client_min_messages=debug1") self.wait(cur) cur.execute("create temp table chatty (id serial primary key);") self.wait(cur) self.assertEqual("CREATE TABLE", cur.statusmessage) self.assertTrue(self.conn.notices)
def test__async_cursor_gone(self): import gc cur = self.conn.cursor() cur.execute("select 42;"); del cur gc.collect() self.assertRaises(psycopg2.InterfaceError, self.wait, self.conn)
# The connection is still usable cur = self.conn.cursor() cur.execute("select 42;"); self.wait(self.conn) self.assertEqual(cur.fetchone(), (42,))
def test__async_connection_error_message(self): try: cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', _async=True) self.wait(cnn) except psycopg2.Error as e: self.assertNotEqual(str(e), "_asynchronous connection failed", "connection error reason lost") else: self.fail("no exception raised")
def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__": unittest.main()
|