Completed the postgres module tests.

This commit is contained in:
Maurice Makaay 2019-12-20 13:44:16 +01:00
parent 475e20a588
commit 5e84e88bc1
3 changed files with 200 additions and 94 deletions

View File

@ -240,8 +240,8 @@ class PgConnectionViaPgbouncer(PgConnection):
# allows for doing some extra checks.
def verify_connection(self):
"""Check if the connection via pgbouncer ends up with the
configured node."""
"""This function checks if the connection via pgbouncer ends up at
the currently known primary node."""
# This is done in a somewhat convoluted way with a subprocess and a
# timer. This is done, because a connection is made through
# pgbouncer, and pgbouncer will try really hard to connect the user
@ -258,10 +258,7 @@ class PgConnectionViaPgbouncer(PgConnection):
# we're going for HA here."""
def check_func(report_func):
try:
# Setup the database connection
self.connect()
# Check if we're connected to the requested node.
with self.conn.cursor() as cursor:
try:
cursor.execute(VERIFY_QUERY, {
@ -269,28 +266,47 @@ class PgConnectionViaPgbouncer(PgConnection):
"port": self.node_config.port
})
result = cursor.fetchone()[0]
self.disconnect()
if result is not None:
return report_func(
False,
"The pgbouncer is not connected to the expected PostgreSQL " +
"backend service: %s" % result,
cursor)
except Exception as exception:
self.disconnect()
return report_func(False, format_ex(exception), cursor)
return report_func(
False,
"Error running query: %s %s" %
(repr(cursor), format_ex(exception)),
cursor)
except Exception as exception:
return report_func(False, format_ex(exception), None)
self.disconnect()
return report_func(
False,
"Error connecting: %s" % format_ex(exception),
None)
self.disconnect()
# When the verify query returned an error message, the
# verification failed.
if result is not None:
return report_func(
False,
"The pgbouncer is not connected to the expected " +
"PostgreSQL backend service: %s" % result,
cursor)
# When the verify query did not return an error message, then we
# are in the green.
return report_func(True, None, cursor)
parent_conn, child_conn = multiprocessing.Pipe()
def report_func(true_or_false, exception, cursor):
"""This function is used to report back results from the
subprocess that runs the check_func."""
# The cursor is included in the results for unit testing purposes.
# The cursor is modified, but since we're running in a subprocess,
# we wouldn't be able to see these modifications otherwise.
child_conn.send([true_or_false, exception, cursor])
child_conn.close()
# Start the subprocess and wait for it to finish within the
# configured connect timeout. When the connect timeout is not
# met, the subprocess will be killed off.
parent_conn, child_conn = multiprocessing.Pipe()
proc = multiprocessing.Process(target=check_func, args=(report_func,))
proc.start()
proc.join(self.node_config.connect_timeout)

View File

@ -75,7 +75,6 @@ class StubCursor(list):
PostgreSQL server instance."""
def __init__(self, *results):
super().__init__(results)
self.results = list(results)
self.statusmessage = None
self.rows = None
self.query = None
@ -90,7 +89,7 @@ class StubCursor(list):
def execute(self, query, params=()):
self.query = query
self.params = params
response = self.results.pop(0)
response = self.pop(0)
if isinstance(response, Exception):
raise response
if callable(response):

View File

@ -1,5 +1,12 @@
# -*- coding: utf-8 -*-
"""The tests in this module all make use of a fully stubbed psycopg2
connection to test the various PostgreSQL connection classes.
Using the stubs, all logic routes can be tested and we can spy on
the output of the various functions, without having to connect to
an actual PostgreSQL server."""
import unittest
import psycopg2
import time
@ -13,27 +20,27 @@ class PgConnectionTests(unittest.TestCase):
def test_GivenAuthenticationFailure_PgConnection_RaisesException(self):
self._test_connect_exception(
PgConnection, StubPsycopg2().add_auth_failure(),
PgConnectionFailed, "authentication failed")
PgConnectionFailed, 'authentication failed')
def test_GivenConnectionTimeout_PgConnection_RaisesException(self):
self._test_connect_exception(
PgConnection, StubPsycopg2().add_conn_timeout(),
PgConnectionFailed, "timeout expired")
PgConnectionFailed, 'timeout expired')
def test_GivenConnectionFailure_PgConnection_RaisesException(self):
self._test_connect_exception(
PgConnection, StubPsycopg2().add_conn_failure(),
PgConnectionFailed, "could not connect")
PgConnectionFailed, 'could not connect')
def test_GivenPostgresStartingUp_PgConnection_RaisesException(self):
self._test_connect_exception(
PgConnection, StubPsycopg2().add_admin_startup(),
PgConnectionFailed, "system is starting up")
PgConnectionFailed, 'system is starting up')
def test_GivenPostgresShuttingDown_PgConnection_RaisesException(self):
self._test_connect_exception(
PgConnection, StubPsycopg2().add_admin_shutdown(),
PgConnectionFailed, "AdminShutdown")
PgConnectionFailed, 'AdminShutdown')
def _test_connect_exception(self, test_class, stub_psycopg2, err, msg):
pg = test_class(NodeConfig(1), Logger(), stub_psycopg2)
@ -43,13 +50,13 @@ class PgConnectionTests(unittest.TestCase):
def test_NodeConfig_IsAppliedToConnParams(self):
node_config = NodeConfig(1)
node_config.host = "1.1.1.1"
node_config.host = '1.1.1.1'
node_config.port = 9999
pg = PgConnection(node_config, Logger(), None)
self.assertEqual(1, pg.node_id)
self.assertEqual("1.1.1.1", pg.conn_params["host"])
self.assertEqual(9999, pg.conn_params["port"])
self.assertEqual('1.1.1.1', pg.conn_params['host'])
self.assertEqual(9999, pg.conn_params['port'])
def test_GivenNoneValueInNodeConfig_ValueIsOmittedInConnParams(self):
node_config = NodeConfig(2)
@ -60,8 +67,8 @@ class PgConnectionTests(unittest.TestCase):
pg.connect()
self.assertEqual(2, pg.node_id)
self.assertNotIn("host", pg.conn_params)
self.assertNotIn("port", pg.conn_params)
self.assertNotIn('host', pg.conn_params)
self.assertNotIn('port', pg.conn_params)
def test_FirstConnect_SetsUpConnection(self):
stub_psycopg2 = StubPsycopg2().add_connection(StubConnection())
@ -69,10 +76,10 @@ class PgConnectionTests(unittest.TestCase):
result = pg.connect()
self.assertEqual("CONN_CONNECTED", result)
self.assertEqual('CONN_CONNECTED', result)
def test_SecondConnect_PingsAndReusesConnection(self):
cursor = StubCursor(("SELECT", [[1]]))
cursor = StubCursor(('SELECT', [[1]]))
conn = StubConnection(cursor)
stub_psycopg2 = StubPsycopg2().add_connection(conn)
pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2)
@ -80,9 +87,9 @@ class PgConnectionTests(unittest.TestCase):
result1 = pg.connect()
self.assertIs(None, cursor.query)
result2 = pg.connect()
self.assertEqual("SELECT 1", cursor.query)
self.assertEqual("CONN_CONNECTED", result1)
self.assertEqual("CONN_REUSED", result2)
self.assertEqual('SELECT 1', cursor.query)
self.assertEqual('CONN_CONNECTED', result1)
self.assertEqual('CONN_REUSED', result2)
def test_SecondConnectPingFails_SetsUpNewConnection(self):
conn1 = StubConnection(StubCursor(psycopg2.OperationalError()))
@ -93,8 +100,8 @@ class PgConnectionTests(unittest.TestCase):
result1 = pg.connect() # Connection OK
result2 = pg.connect() # Ping fails, reconnectt
self.assertEqual("CONN_CONNECTED", result1)
self.assertEqual("CONN_RECONNECTED", result2)
self.assertEqual('CONN_CONNECTED', result1)
self.assertEqual('CONN_RECONNECTED', result2)
def test_Disconnect_ClosesConnection(self):
conn = StubConnection()
@ -110,10 +117,10 @@ class PgConnectionTests(unittest.TestCase):
conn1 = StubConnection(
StubCursor(psycopg2.OperationalError()))
conn2 = StubConnection(
StubCursor(("SELECT", [])),
StubCursor(("SELECT", [])))
StubCursor(('SELECT', [])),
StubCursor(('SELECT', [])))
conn3 = StubConnection(
StubCursor(("SELECT", [])))
StubCursor(('SELECT', [])))
stub_psycopg2 = StubPsycopg2(conn1, conn2, conn3)
pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2)
@ -126,12 +133,12 @@ class PgConnectionTests(unittest.TestCase):
result5 = pg.connect() # Connection 3 OK
result6 = pg.connect() # Ping success
self.assertEqual("CONN_CONNECTED", result1)
self.assertEqual("CONN_RECONNECTED", result2)
self.assertEqual("CONN_REUSED", result3)
self.assertEqual("CONN_REUSED", result4)
self.assertEqual("CONN_CONNECTED", result5)
self.assertEqual("CONN_REUSED", result6)
self.assertEqual('CONN_CONNECTED', result1)
self.assertEqual('CONN_RECONNECTED', result2)
self.assertEqual('CONN_REUSED', result3)
self.assertEqual('CONN_REUSED', result4)
self.assertEqual('CONN_CONNECTED', result5)
self.assertEqual('CONN_REUSED', result6)
class PgReplicationConnectionTests(unittest.TestCase):
@ -139,157 +146,185 @@ class PgReplicationConnectionTests(unittest.TestCase):
conn = StubConnection()
stub_psycopg2 = StubPsycopg2().add_connection(conn)
pg = PgReplicationConnection(
NodeConfig("foo"), Logger(), stub_psycopg2)
NodeConfig('foo'), Logger(), stub_psycopg2)
pg.connect()
self.assertEqual(
psycopg2.extras.LogicalReplicationConnection,
pg.conn_params["connection_factory"])
pg.conn_params['connection_factory'])
def test_GivenFailingConnection_ReplicationStatusIsOffline(self):
stub_psycopg2 = StubPsycopg2().add_auth_failure()
pg = PgReplicationConnection(
NodeConfig("foo"), Logger(), stub_psycopg2)
NodeConfig('foo'), Logger(), stub_psycopg2)
status = pg.get_replication_status()
self.assertEqual({
"status": "NODE_OFFLINE",
"system_id": None,
"timeline_id": None}, status)
'status': 'NODE_OFFLINE',
'system_id': None,
'timeline_id': None}, status)
def test_GivenFailingStandbyQuery_ReplicationStatusRaisesException(self):
conn = StubConnection(StubCursor(psycopg2.OperationalError()))
pg = PgReplicationConnection(
NodeConfig("foo"), Logger(), StubPsycopg2(conn))
NodeConfig('foo'), Logger(), StubPsycopg2(conn))
with self.assertRaises(RetrievingPgReplicationStatusFailed) as context:
pg.get_replication_status()
self.assertIn("pg_is_in_recovery() failed", str(context.exception))
self.assertIn('pg_is_in_recovery() failed', str(context.exception))
def test_GivenFailingIdentifySystemQuery_ReplicationStatusRaisesException(self):
conn = StubConnection(
StubCursor(("SELECT", [[False]])),
StubCursor(('SELECT', [[False]])),
psycopg2.OperationalError())
pg = PgReplicationConnection(
NodeConfig("foo"), Logger(), StubPsycopg2(conn))
NodeConfig('foo'), Logger(), StubPsycopg2(conn))
with self.assertRaises(RetrievingPgReplicationStatusFailed) as context:
pg.get_replication_status()
self.assertIn("IDENTIFY_SYSTEM failed", str(context.exception))
self.assertIn('IDENTIFY_SYSTEM failed', str(context.exception))
def test_GivenConnectionToPrimaryNode_ReplicationStatusIsPrimary(self):
conn = StubConnection(
StubCursor(("SELECT", [[False]])),
StubCursor(("IDENTIFY_SYSTEM", [["id", 1234, "other", "fields"]])))
StubCursor(('SELECT', [[False]])),
StubCursor(('IDENTIFY_SYSTEM', [['id', 1234, 'other', 'fields']])))
stub_psycopg2 = StubPsycopg2(conn)
pg = PgReplicationConnection(
NodeConfig("foo"), Logger(), stub_psycopg2)
NodeConfig('foo'), Logger(), stub_psycopg2)
status = pg.get_replication_status()
self.assertEqual({
"status": "NODE_PRIMARY",
"system_id": "id",
"timeline_id": 1234}, status)
'status': 'NODE_PRIMARY',
'system_id': 'id',
'timeline_id': 1234}, status)
def test_GivenConnectionToStandbyNode_ReplicationStatusIsStandby(self):
conn = StubConnection(
StubCursor(("SELECT", [[True]])),
StubCursor(("IDENTIFY_SYSTEM", [["towel", 42, "other", "fields"]])))
StubCursor(('SELECT', [[True]])),
StubCursor(('IDENTIFY_SYSTEM', [['towel', 42, 'other', 'fields']])))
stub_psycopg2 = StubPsycopg2(conn)
pg = PgReplicationConnection(
NodeConfig("foo"), Logger(), stub_psycopg2)
NodeConfig('foo'), Logger(), stub_psycopg2)
status = pg.get_replication_status()
self.assertEqual({
"status": "NODE_STANDBY",
"system_id": "towel",
"timeline_id": 42}, status)
'status': 'NODE_STANDBY',
'system_id': 'towel',
'timeline_id': 42}, status)
class PgConnectionViaPgbouncerTests(unittest.TestCase):
def test_NodeConfigAndPgBouncerConfig_AreMergedInConnParams(self):
node_config = NodeConfig(777)
node_config.host = "1.1.1.1"
node_config.host = '1.1.1.1'
node_config.port = 9999
pgbouncer_config = {"host": "192.168.0.1", "port": 7654}
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
pg = PgConnectionViaPgbouncer(
node_config, pgbouncer_config, Logger(), None)
self.assertEqual(777, pg.node_id)
self.assertEqual("template1", pg.conn_params["database"])
self.assertEqual("192.168.0.1", pg.conn_params["host"])
self.assertEqual(7654, pg.conn_params["port"])
self.assertEqual('template1', pg.conn_params['database'])
self.assertEqual('192.168.0.1', pg.conn_params['host'])
self.assertEqual(7654, pg.conn_params['port'])
def test_WhenVerifyQueryCannotConnect_ExceptionIsRaised(self):
def test_WhenVerifyQueryCannotConnect_ErrorIsReported(self):
stub_psycopg2 = StubPsycopg2().add_auth_failure()
node_config = NodeConfig('xyz123')
pgbouncer_config = {"host": "192.168.0.1", "port": 7654}
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
pg = PgConnectionViaPgbouncer(
node_config, pgbouncer_config, Logger(), stub_psycopg2)
ok, err, cursor = pg.verify_connection()
self.assertFalse(ok)
self.assertIs(None, cursor)
self.assertIn("password authentication failed", str(err))
self.assertIn('password authentication failed', str(err))
def test_WhenVerifyQueryTimesOut_ExceptionIsRaised(self):
def test_WhenVerifyQueryCannotCreateCursor_ErrorIsReported(self):
conn = StubConnection(psycopg2.OperationalError('borked cursor'))
stub_psycopg2 = StubPsycopg2(conn)
node_config = NodeConfig('xyz123')
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
pg = PgConnectionViaPgbouncer(
node_config, pgbouncer_config, Logger(), stub_psycopg2)
ok, err, cursor = pg.verify_connection()
self.assertFalse(ok)
self.assertIs(None, cursor)
self.assertIn('Error connecting', str(err))
self.assertIn('borked cursor', str(err))
def test_WhenVerifyQueryFails_ErrorIsReported(self):
cursor = StubCursor(psycopg2.OperationalError('borked query'))
conn = StubConnection(cursor)
stub_psycopg2 = StubPsycopg2(conn)
node_config = NodeConfig('xyz123')
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
pg = PgConnectionViaPgbouncer(
node_config, pgbouncer_config, Logger(), stub_psycopg2)
ok, err, cursor = pg.verify_connection()
self.assertFalse(ok)
self.assertIn('Error running query', str(err))
self.assertIn('borked query', str(err))
def test_WhenVerifyQueryTimesOut_ErrorIsReported(self):
def sleeping_query(query):
time.sleep(1)
conn = StubConnection(StubCursor(sleeping_query))
stub_psycopg2 = StubPsycopg2(conn)
node_config = NodeConfig('xyz123')
node_config.connect_timeout = 0.01
pgbouncer_config = {"host": "192.168.0.1", "port": 7654}
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
pg = PgConnectionViaPgbouncer(
node_config, pgbouncer_config, Logger(), stub_psycopg2)
ok, err, cursor = pg.verify_connection()
self.assertFalse(ok)
self.assertIs(None, cursor)
self.assertIn("Connection attempt timed out", str(err))
self.assertIn('Connection attempt timed out', str(err))
def test_WhenVerifyQueryRuns_NodeHostAndPortAreQueried(self):
conn = StubConnection(StubCursor(('SELECT', [[None]])))
stub_psycopg2 = StubPsycopg2(conn)
node_config = NodeConfig('uh')
node_config.host = "111.222.111.222"
node_config.host = '111.222.111.222'
node_config.port = 1122
pgbouncer_config = {"host": "192.168.0.1", "port": 7654}
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
pg = PgConnectionViaPgbouncer(
node_config, pgbouncer_config, Logger(), stub_psycopg2)
ok, err, cursor = pg.verify_connection()
self.assertIn("inet_server_addr()", cursor.query)
self.assertIn("%(host)s", cursor.query)
self.assertIn("inet_server_port()", cursor.query)
self.assertIn("%(port)s", cursor.query)
self.assertIn('inet_server_addr()', cursor.query)
self.assertIn('%(host)s', cursor.query)
self.assertIn('inet_server_port()', cursor.query)
self.assertIn('%(port)s', cursor.query)
self.assertEqual({
"host": "111.222.111.222",
"port": 1122
'host': '111.222.111.222',
'port': 1122
}, cursor.params)
def test_WhenVerifyQueryReturnsIssue_IssueIsReported(self):
def test_WhenVerifyQueryReturnsIssue_ErrorIsReported(self):
cursor = StubCursor(('SELECT', [['Bokito has escaped']]))
conn = StubConnection(cursor)
stub_psycopg2 = StubPsycopg2(conn)
node_config = NodeConfig('xyz123')
node_config.host = "111.222.111.222"
node_config.host = '111.222.111.222'
node_config.port = 1122
pgbouncer_config = {"host": "192.168.0.1", "port": 7654}
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
pg = PgConnectionViaPgbouncer(
node_config, pgbouncer_config, Logger(), stub_psycopg2)
ok, err, _ = pg.verify_connection()
self.assertFalse(ok)
self.assertIn("not connected to the expected PostgreSQL backend", err)
self.assertIn("Bokito has escaped", err)
self.assertIn('not connected to the expected PostgreSQL backend', err)
self.assertIn('Bokito has escaped', err)
def test_WhenVerifyQueryReturnsNoIssue_OkIsReported(self):
conn = StubConnection(StubCursor(('SELECT', [[None]])))
stub_psycopg2 = StubPsycopg2(conn)
node_config = NodeConfig('xyz123')
pgbouncer_config = {"host": "192.168.0.1", "port": 7654}
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
pg = PgConnectionViaPgbouncer(
node_config, pgbouncer_config, Logger(), stub_psycopg2)
@ -297,13 +332,69 @@ class PgConnectionViaPgbouncerTests(unittest.TestCase):
self.assertTrue(ok)
self.assertIs(None, err)
class PgBouncerConsoleConnectionTests(unittest.TestCase):
def test_OnConnect_AutoCommitIsEnabled(self):
def test_OnConnect_DatabaseIsPgbouncerAndAutoCommitIsEnabled(self):
conn = StubConnection()
stub_psycopg2 = StubPsycopg2().add_connection(conn)
pg = PgBouncerConsoleConnection(
NodeConfig("bob"), Logger(), stub_psycopg2)
NodeConfig('bob'), Logger(), stub_psycopg2)
# Autocommit must be enabled, because the console does
# not support transactions.
self.assertFalse(conn.autocommit)
pg.connect()
self.assertTrue(conn.autocommit)
# To connect to the console, the database 'pgbouncer' must be used.
self.assertEqual('pgbouncer', pg.conn_params['database'])
def test_OnReconnect_ShowVersionIsUsedForPinging(self):
'''Because the pgbouncer console does not support SELECT queries,
we have to make use of another query to check if the console
connection is still alive.'''
cursor = StubCursor(('SHOW', [['the version']]))
conn = StubConnection(cursor)
stub_psycopg2 = StubPsycopg2(conn)
pg = PgBouncerConsoleConnection(
NodeConfig('bar'), Logger(), stub_psycopg2)
pg.connect()
result = pg.connect()
self.assertEqual('CONN_REUSED', result)
self.assertEqual('SHOW VERSION', cursor.query)
def test_WhenReloadFails_ExceptionIsRaised(self):
conn = StubConnection(psycopg2.OperationalError('borked reload'))
stub_psycopg2 = StubPsycopg2(conn)
pg = PgBouncerConsoleConnection(
NodeConfig(1001), Logger(), stub_psycopg2)
with self.assertRaises(ReloadingPgbouncerFailed) as context:
pg.reload()
self.assertIn('borked reload', str(context.exception))
def test_WhenReloadReturnsAnUnexpectedStatusMessage_ExceptionIsRaised(self):
cursor = StubCursor(('WEIRD', [[None]]))
conn = StubConnection(cursor)
stub_psycopg2 = StubPsycopg2(conn)
pg = PgBouncerConsoleConnection(
NodeConfig(1002), Logger(), stub_psycopg2)
with self.assertRaises(ReloadingPgbouncerFailed) as context:
pg.reload()
self.assertIn(
'Unexpected status message: WEIRD',
str(context.exception))
def test_WhenReloadIsSuccessful_NoExceptionIsRaised(self):
cursor = StubCursor(('RELOAD', [[None]]))
conn = StubConnection(cursor)
stub_psycopg2 = StubPsycopg2(conn)
pg = PgBouncerConsoleConnection(
NodeConfig(1002), Logger(), stub_psycopg2)
pg.reload()
self.assertEqual('RELOAD', cursor.query)
self.assertEqual('RELOAD', cursor.statusmessage)