401 lines
16 KiB
Python
401 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""The tests in this module all make use of a fully stubbed psycopg2
|
|
connection to test the various PostgreSQL connection classes.
|
|
Using the stubs, all logic routes can be tested and we can spy on
|
|
the output of the various functions, without having to connect to
|
|
an actual PostgreSQL server."""
|
|
|
|
|
|
import unittest
|
|
import psycopg2
|
|
import time
|
|
from pgbouncemgr.postgres import *
|
|
from pgbouncemgr.logger import Logger
|
|
from pgbouncemgr.node_config import NodeConfig
|
|
from tests.stub_psycopg2 import *
|
|
|
|
|
|
class PgConnectionTests(unittest.TestCase):
|
|
def test_GivenAuthenticationFailure_PgConnection_RaisesException(self):
|
|
self._test_connect_exception(
|
|
PgConnection, StubPsycopg2().add_auth_failure(),
|
|
PgConnectionFailed, 'authentication failed')
|
|
|
|
def test_GivenConnectionTimeout_PgConnection_RaisesException(self):
|
|
self._test_connect_exception(
|
|
PgConnection, StubPsycopg2().add_conn_timeout(),
|
|
PgConnectionFailed, 'timeout expired')
|
|
|
|
def test_GivenConnectionFailure_PgConnection_RaisesException(self):
|
|
self._test_connect_exception(
|
|
PgConnection, StubPsycopg2().add_conn_failure(),
|
|
PgConnectionFailed, 'could not connect')
|
|
|
|
def test_GivenPostgresStartingUp_PgConnection_RaisesException(self):
|
|
self._test_connect_exception(
|
|
PgConnection, StubPsycopg2().add_admin_startup(),
|
|
PgConnectionFailed, 'system is starting up')
|
|
|
|
def test_GivenPostgresShuttingDown_PgConnection_RaisesException(self):
|
|
self._test_connect_exception(
|
|
PgConnection, StubPsycopg2().add_admin_shutdown(),
|
|
PgConnectionFailed, 'AdminShutdown')
|
|
|
|
def _test_connect_exception(self, test_class, stub_psycopg2, err, msg):
|
|
pg = test_class(NodeConfig(1), Logger(), stub_psycopg2)
|
|
with self.assertRaises(err) as context:
|
|
pg.connect()
|
|
self.assertIn(msg, str(context.exception))
|
|
|
|
def test_NodeConfig_IsAppliedToConnParams(self):
|
|
node_config = NodeConfig(1)
|
|
node_config.host = '1.1.1.1'
|
|
node_config.port = 9999
|
|
pg = PgConnection(node_config, Logger(), None)
|
|
|
|
self.assertEqual(1, pg.node_id)
|
|
self.assertEqual('1.1.1.1', pg.conn_params['host'])
|
|
self.assertEqual(9999, pg.conn_params['port'])
|
|
|
|
def test_GivenNoneValueInNodeConfig_ValueIsOmittedInConnParams(self):
|
|
node_config = NodeConfig(2)
|
|
node_config.host = None
|
|
node_config.port = None
|
|
stub_psycopg2 = StubPsycopg2().add_connection(StubConnection())
|
|
pg = PgConnection(node_config, Logger(), stub_psycopg2)
|
|
pg.connect()
|
|
|
|
self.assertEqual(2, pg.node_id)
|
|
self.assertNotIn('host', pg.conn_params)
|
|
self.assertNotIn('port', pg.conn_params)
|
|
|
|
def test_FirstConnect_SetsUpConnection(self):
|
|
stub_psycopg2 = StubPsycopg2().add_connection(StubConnection())
|
|
pg = PgConnection(NodeConfig('a'), Logger(), stub_psycopg2)
|
|
|
|
result = pg.connect()
|
|
|
|
self.assertEqual('CONN_CONNECTED', result)
|
|
|
|
def test_SecondConnect_PingsAndReusesConnection(self):
|
|
cursor = StubCursor(('SELECT', [[1]]))
|
|
conn = StubConnection(cursor)
|
|
stub_psycopg2 = StubPsycopg2().add_connection(conn)
|
|
pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2)
|
|
|
|
result1 = pg.connect()
|
|
self.assertIs(None, cursor.query)
|
|
result2 = pg.connect()
|
|
self.assertEqual('SELECT 1', cursor.query)
|
|
self.assertEqual('CONN_CONNECTED', result1)
|
|
self.assertEqual('CONN_REUSED', result2)
|
|
|
|
def test_SecondConnectPingFails_SetsUpNewConnection(self):
|
|
conn1 = StubConnection(StubCursor(psycopg2.OperationalError()))
|
|
conn2 = StubConnection()
|
|
stub_psycopg2 = StubPsycopg2(conn1, conn2)
|
|
pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2)
|
|
|
|
result1 = pg.connect() # Connection OK
|
|
result2 = pg.connect() # Ping fails, reconnectt
|
|
|
|
self.assertEqual('CONN_CONNECTED', result1)
|
|
self.assertEqual('CONN_RECONNECTED', result2)
|
|
|
|
def test_Disconnect_ClosesConnection(self):
|
|
conn = StubConnection()
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
pg = PgConnection(NodeConfig('disco'), Logger(), stub_psycopg2)
|
|
|
|
pg.connect()
|
|
self.assertTrue(conn.connected)
|
|
pg.disconnect()
|
|
self.assertFalse(conn.connected)
|
|
|
|
def test_BigConnectionFlow(self):
|
|
conn1 = StubConnection(
|
|
StubCursor(psycopg2.OperationalError()))
|
|
conn2 = StubConnection(
|
|
StubCursor(('SELECT', [])),
|
|
StubCursor(('SELECT', [])))
|
|
conn3 = StubConnection(
|
|
StubCursor(('SELECT', [])))
|
|
|
|
stub_psycopg2 = StubPsycopg2(conn1, conn2, conn3)
|
|
pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2)
|
|
|
|
result1 = pg.connect() # Connection 1 OK
|
|
result2 = pg.connect() # Ping fails, new connection 2 OK
|
|
result3 = pg.connect() # Ping 1 success
|
|
result4 = pg.connect() # Ping 2 success
|
|
pg.disconnect() # Explicit disconnect
|
|
result5 = pg.connect() # Connection 3 OK
|
|
result6 = pg.connect() # Ping success
|
|
|
|
self.assertEqual('CONN_CONNECTED', result1)
|
|
self.assertEqual('CONN_RECONNECTED', result2)
|
|
self.assertEqual('CONN_REUSED', result3)
|
|
self.assertEqual('CONN_REUSED', result4)
|
|
self.assertEqual('CONN_CONNECTED', result5)
|
|
self.assertEqual('CONN_REUSED', result6)
|
|
|
|
|
|
class PgReplicationConnectionTests(unittest.TestCase):
|
|
def test_LogicalReplicationConnection_IsUsed(self):
|
|
conn = StubConnection()
|
|
stub_psycopg2 = StubPsycopg2().add_connection(conn)
|
|
pg = PgReplicationConnection(
|
|
NodeConfig('foo'), Logger(), stub_psycopg2)
|
|
|
|
pg.connect()
|
|
self.assertEqual(
|
|
psycopg2.extras.LogicalReplicationConnection,
|
|
pg.conn_params['connection_factory'])
|
|
|
|
def test_GivenFailingConnection_ReplicationStatusIsOffline(self):
|
|
stub_psycopg2 = StubPsycopg2().add_auth_failure()
|
|
pg = PgReplicationConnection(
|
|
NodeConfig('foo'), Logger(), stub_psycopg2)
|
|
|
|
status = pg.get_replication_status()
|
|
self.assertEqual({
|
|
'status': 'NODE_OFFLINE',
|
|
'system_id': None,
|
|
'timeline_id': None}, status)
|
|
|
|
def test_GivenFailingStandbyQuery_ReplicationStatusRaisesException(self):
|
|
conn = StubConnection(StubCursor(psycopg2.OperationalError()))
|
|
pg = PgReplicationConnection(
|
|
NodeConfig('foo'), Logger(), StubPsycopg2(conn))
|
|
|
|
with self.assertRaises(RetrievingPgReplicationStatusFailed) as context:
|
|
pg.get_replication_status()
|
|
self.assertIn('pg_is_in_recovery() failed', str(context.exception))
|
|
|
|
def test_GivenFailingIdentifySystemQuery_ReplicationStatusRaisesException(self):
|
|
conn = StubConnection(
|
|
StubCursor(('SELECT', [[False]])),
|
|
psycopg2.OperationalError())
|
|
pg = PgReplicationConnection(
|
|
NodeConfig('foo'), Logger(), StubPsycopg2(conn))
|
|
|
|
with self.assertRaises(RetrievingPgReplicationStatusFailed) as context:
|
|
pg.get_replication_status()
|
|
self.assertIn('IDENTIFY_SYSTEM failed', str(context.exception))
|
|
|
|
def test_GivenConnectionToPrimaryNode_ReplicationStatusIsPrimary(self):
|
|
conn = StubConnection(
|
|
StubCursor(('SELECT', [[False]])),
|
|
StubCursor(('IDENTIFY_SYSTEM', [['id', 1234, 'other', 'fields']])))
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
pg = PgReplicationConnection(
|
|
NodeConfig('foo'), Logger(), stub_psycopg2)
|
|
|
|
status = pg.get_replication_status()
|
|
self.assertEqual({
|
|
'status': 'NODE_PRIMARY',
|
|
'system_id': 'id',
|
|
'timeline_id': 1234}, status)
|
|
|
|
def test_GivenConnectionToStandbyNode_ReplicationStatusIsStandby(self):
|
|
conn = StubConnection(
|
|
StubCursor(('SELECT', [[True]])),
|
|
StubCursor(('IDENTIFY_SYSTEM', [['towel', 42, 'other', 'fields']])))
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
pg = PgReplicationConnection(
|
|
NodeConfig('foo'), Logger(), stub_psycopg2)
|
|
|
|
status = pg.get_replication_status()
|
|
self.assertEqual({
|
|
'status': 'NODE_STANDBY',
|
|
'system_id': 'towel',
|
|
'timeline_id': 42}, status)
|
|
|
|
|
|
class PgConnectionViaPgbouncerTests(unittest.TestCase):
|
|
def test_NodeConfigAndPgBouncerConfig_AreMergedInConnParams(self):
|
|
node_config = NodeConfig(777)
|
|
node_config.host = '1.1.1.1'
|
|
node_config.port = 9999
|
|
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
|
|
|
|
pg = PgConnectionViaPgbouncer(
|
|
node_config, pgbouncer_config, Logger(), None)
|
|
|
|
self.assertEqual(777, pg.node_id)
|
|
self.assertEqual('template1', pg.conn_params['database'])
|
|
self.assertEqual('192.168.0.1', pg.conn_params['host'])
|
|
self.assertEqual(7654, pg.conn_params['port'])
|
|
|
|
def test_WhenVerifyQueryCannotConnect_ErrorIsReported(self):
|
|
stub_psycopg2 = StubPsycopg2().add_auth_failure()
|
|
node_config = NodeConfig('xyz123')
|
|
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
|
|
pg = PgConnectionViaPgbouncer(
|
|
node_config, pgbouncer_config, Logger(), stub_psycopg2)
|
|
|
|
ok, err, cursor = pg.verify_connection()
|
|
self.assertFalse(ok)
|
|
self.assertIs(None, cursor)
|
|
self.assertIn('password authentication failed', str(err))
|
|
|
|
def test_WhenVerifyQueryCannotCreateCursor_ErrorIsReported(self):
|
|
conn = StubConnection(psycopg2.OperationalError('borked cursor'))
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
node_config = NodeConfig('xyz123')
|
|
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
|
|
pg = PgConnectionViaPgbouncer(
|
|
node_config, pgbouncer_config, Logger(), stub_psycopg2)
|
|
|
|
ok, err, cursor = pg.verify_connection()
|
|
self.assertFalse(ok)
|
|
self.assertIs(None, cursor)
|
|
self.assertIn('Error connecting', str(err))
|
|
self.assertIn('borked cursor', str(err))
|
|
|
|
def test_WhenVerifyQueryFails_ErrorIsReported(self):
|
|
cursor = StubCursor(psycopg2.OperationalError('borked query'))
|
|
conn = StubConnection(cursor)
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
node_config = NodeConfig('xyz123')
|
|
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
|
|
pg = PgConnectionViaPgbouncer(
|
|
node_config, pgbouncer_config, Logger(), stub_psycopg2)
|
|
|
|
ok, err, cursor = pg.verify_connection()
|
|
self.assertFalse(ok)
|
|
self.assertIn('Error running query', str(err))
|
|
self.assertIn('borked query', str(err))
|
|
|
|
def test_WhenVerifyQueryTimesOut_ErrorIsReported(self):
|
|
def sleeping_query(query):
|
|
time.sleep(1)
|
|
conn = StubConnection(StubCursor(sleeping_query))
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
node_config = NodeConfig('xyz123')
|
|
node_config.connect_timeout = 0.01
|
|
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
|
|
pg = PgConnectionViaPgbouncer(
|
|
node_config, pgbouncer_config, Logger(), stub_psycopg2)
|
|
|
|
ok, err, cursor = pg.verify_connection()
|
|
self.assertFalse(ok)
|
|
self.assertIs(None, cursor)
|
|
self.assertIn('Connection attempt timed out', str(err))
|
|
|
|
def test_WhenVerifyQueryRuns_NodeHostAndPortAreQueried(self):
|
|
conn = StubConnection(StubCursor(('SELECT', [[None]])))
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
node_config = NodeConfig('uh')
|
|
node_config.host = '111.222.111.222'
|
|
node_config.port = 1122
|
|
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
|
|
pg = PgConnectionViaPgbouncer(
|
|
node_config, pgbouncer_config, Logger(), stub_psycopg2)
|
|
|
|
ok, err, cursor = pg.verify_connection()
|
|
self.assertIn('inet_server_addr()', cursor.query)
|
|
self.assertIn('%(host)s', cursor.query)
|
|
self.assertIn('inet_server_port()', cursor.query)
|
|
self.assertIn('%(port)s', cursor.query)
|
|
self.assertEqual({
|
|
'host': '111.222.111.222',
|
|
'port': 1122
|
|
}, cursor.params)
|
|
|
|
def test_WhenVerifyQueryReturnsIssue_ErrorIsReported(self):
|
|
cursor = StubCursor(('SELECT', [['Bokito has escaped']]))
|
|
conn = StubConnection(cursor)
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
node_config = NodeConfig('xyz123')
|
|
node_config.host = '111.222.111.222'
|
|
node_config.port = 1122
|
|
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
|
|
pg = PgConnectionViaPgbouncer(
|
|
node_config, pgbouncer_config, Logger(), stub_psycopg2)
|
|
|
|
ok, err, _ = pg.verify_connection()
|
|
self.assertFalse(ok)
|
|
self.assertIn('not connected to the expected PostgreSQL backend', err)
|
|
self.assertIn('Bokito has escaped', err)
|
|
|
|
def test_WhenVerifyQueryReturnsNoIssue_OkIsReported(self):
|
|
conn = StubConnection(StubCursor(('SELECT', [[None]])))
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
node_config = NodeConfig('xyz123')
|
|
pgbouncer_config = {'host': '192.168.0.1', 'port': 7654}
|
|
pg = PgConnectionViaPgbouncer(
|
|
node_config, pgbouncer_config, Logger(), stub_psycopg2)
|
|
|
|
ok, err, _ = pg.verify_connection()
|
|
self.assertTrue(ok)
|
|
self.assertIs(None, err)
|
|
|
|
|
|
class PgBouncerConsoleConnectionTests(unittest.TestCase):
|
|
def test_OnConnect_DatabaseIsPgbouncerAndAutoCommitIsEnabled(self):
|
|
conn = StubConnection()
|
|
stub_psycopg2 = StubPsycopg2().add_connection(conn)
|
|
pg = PgBouncerConsoleConnection(
|
|
NodeConfig('bob'), Logger(), stub_psycopg2)
|
|
|
|
# Autocommit must be enabled, because the console does
|
|
# not support transactions.
|
|
self.assertFalse(conn.autocommit)
|
|
pg.connect()
|
|
self.assertTrue(conn.autocommit)
|
|
|
|
# To connect to the console, the database 'pgbouncer' must be used.
|
|
self.assertEqual('pgbouncer', pg.conn_params['database'])
|
|
|
|
def test_OnReconnect_ShowVersionIsUsedForPinging(self):
|
|
'''Because the pgbouncer console does not support SELECT queries,
|
|
we have to make use of another query to check if the console
|
|
connection is still alive.'''
|
|
cursor = StubCursor(('SHOW', [['the version']]))
|
|
conn = StubConnection(cursor)
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
pg = PgBouncerConsoleConnection(
|
|
NodeConfig('bar'), Logger(), stub_psycopg2)
|
|
|
|
pg.connect()
|
|
result = pg.connect()
|
|
self.assertEqual('CONN_REUSED', result)
|
|
self.assertEqual('SHOW VERSION', cursor.query)
|
|
|
|
def test_WhenReloadFails_ExceptionIsRaised(self):
|
|
conn = StubConnection(psycopg2.OperationalError('borked reload'))
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
pg = PgBouncerConsoleConnection(
|
|
NodeConfig(1001), Logger(), stub_psycopg2)
|
|
|
|
with self.assertRaises(ReloadingPgbouncerFailed) as context:
|
|
pg.reload()
|
|
self.assertIn('borked reload', str(context.exception))
|
|
|
|
def test_WhenReloadReturnsAnUnexpectedStatusMessage_ExceptionIsRaised(self):
|
|
cursor = StubCursor(('WEIRD', [[None]]))
|
|
conn = StubConnection(cursor)
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
pg = PgBouncerConsoleConnection(
|
|
NodeConfig(1002), Logger(), stub_psycopg2)
|
|
|
|
with self.assertRaises(ReloadingPgbouncerFailed) as context:
|
|
pg.reload()
|
|
self.assertIn(
|
|
'Unexpected status message: WEIRD',
|
|
str(context.exception))
|
|
|
|
def test_WhenReloadIsSuccessful_NoExceptionIsRaised(self):
|
|
cursor = StubCursor(('RELOAD', [[None]]))
|
|
conn = StubConnection(cursor)
|
|
stub_psycopg2 = StubPsycopg2(conn)
|
|
pg = PgBouncerConsoleConnection(
|
|
NodeConfig(1002), Logger(), stub_psycopg2)
|
|
|
|
pg.reload()
|
|
self.assertEqual('RELOAD', cursor.query)
|
|
self.assertEqual('RELOAD', cursor.statusmessage)
|
|
|