# -*- coding: utf-8 -*- """The tests in this module all make use of a fully stubbed psycopg2 connection to test the various PostgreSQL connection classes. Using the stubs, all logic routes can be tested and we can spy on the output of the various functions, without having to connect to an actual PostgreSQL server.""" import unittest import psycopg2 import time from pgbouncemgr.postgres import * from pgbouncemgr.logger import Logger from pgbouncemgr.node_config import NodeConfig from tests.stub_psycopg2 import * class PgConnectionTests(unittest.TestCase): def test_GivenAuthenticationFailure_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_auth_failure(), PgConnectionFailed, 'authentication failed') def test_GivenConnectionTimeout_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_conn_timeout(), PgConnectionFailed, 'timeout expired') def test_GivenConnectionFailure_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_conn_failure(), PgConnectionFailed, 'could not connect') def test_GivenPostgresStartingUp_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_admin_startup(), PgConnectionFailed, 'system is starting up') def test_GivenPostgresShuttingDown_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_admin_shutdown(), PgConnectionFailed, 'AdminShutdown') def _test_connect_exception(self, test_class, stub_psycopg2, err, msg): pg = test_class(NodeConfig(1), Logger(), stub_psycopg2) with self.assertRaises(err) as context: pg.connect() self.assertIn(msg, str(context.exception)) def test_NodeConfig_IsAppliedToConnParams(self): node_config = NodeConfig(1) node_config.host = '1.1.1.1' node_config.port = 9999 pg = PgConnection(node_config, Logger(), None) self.assertEqual(1, pg.node_id) self.assertEqual('1.1.1.1', pg.conn_params['host']) self.assertEqual(9999, pg.conn_params['port']) def test_GivenNoneValueInNodeConfig_ValueIsOmittedInConnParams(self): node_config = NodeConfig(2) node_config.host = None node_config.port = None stub_psycopg2 = StubPsycopg2().add_connection(StubConnection()) pg = PgConnection(node_config, Logger(), stub_psycopg2) pg.connect() self.assertEqual(2, pg.node_id) self.assertNotIn('host', pg.conn_params) self.assertNotIn('port', pg.conn_params) def test_FirstConnect_SetsUpConnection(self): stub_psycopg2 = StubPsycopg2().add_connection(StubConnection()) pg = PgConnection(NodeConfig('a'), Logger(), stub_psycopg2) result = pg.connect() self.assertEqual('CONN_CONNECTED', result) def test_SecondConnect_PingsAndReusesConnection(self): cursor = StubCursor(('SELECT', [[1]])) conn = StubConnection(cursor) stub_psycopg2 = StubPsycopg2().add_connection(conn) pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2) result1 = pg.connect() self.assertIs(None, cursor.query) result2 = pg.connect() self.assertEqual('SELECT 1', cursor.query) self.assertEqual('CONN_CONNECTED', result1) self.assertEqual('CONN_REUSED', result2) def test_SecondConnectPingFails_SetsUpNewConnection(self): conn1 = StubConnection(StubCursor(psycopg2.OperationalError())) conn2 = StubConnection() stub_psycopg2 = StubPsycopg2(conn1, conn2) pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2) result1 = pg.connect() # Connection OK result2 = pg.connect() # Ping fails, reconnectt self.assertEqual('CONN_CONNECTED', result1) self.assertEqual('CONN_RECONNECTED', result2) def test_Disconnect_ClosesConnection(self): conn = StubConnection() stub_psycopg2 = StubPsycopg2(conn) pg = PgConnection(NodeConfig('disco'), Logger(), stub_psycopg2) pg.connect() self.assertTrue(conn.connected) pg.disconnect() self.assertFalse(conn.connected) def test_BigConnectionFlow(self): conn1 = StubConnection( StubCursor(psycopg2.OperationalError())) conn2 = StubConnection( StubCursor(('SELECT', [])), StubCursor(('SELECT', []))) conn3 = StubConnection( StubCursor(('SELECT', []))) stub_psycopg2 = StubPsycopg2(conn1, conn2, conn3) pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2) result1 = pg.connect() # Connection 1 OK result2 = pg.connect() # Ping fails, new connection 2 OK result3 = pg.connect() # Ping 1 success result4 = pg.connect() # Ping 2 success pg.disconnect() # Explicit disconnect result5 = pg.connect() # Connection 3 OK result6 = pg.connect() # Ping success self.assertEqual('CONN_CONNECTED', result1) self.assertEqual('CONN_RECONNECTED', result2) self.assertEqual('CONN_REUSED', result3) self.assertEqual('CONN_REUSED', result4) self.assertEqual('CONN_CONNECTED', result5) self.assertEqual('CONN_REUSED', result6) class PgReplicationConnectionTests(unittest.TestCase): def test_LogicalReplicationConnection_IsUsed(self): conn = StubConnection() stub_psycopg2 = StubPsycopg2().add_connection(conn) pg = PgReplicationConnection( NodeConfig('foo'), Logger(), stub_psycopg2) pg.connect() self.assertEqual( psycopg2.extras.LogicalReplicationConnection, pg.conn_params['connection_factory']) def test_GivenFailingConnection_ReplicationStatusIsOffline(self): stub_psycopg2 = StubPsycopg2().add_auth_failure() pg = PgReplicationConnection( NodeConfig('foo'), Logger(), stub_psycopg2) status = pg.get_replication_status() self.assertEqual({ 'status': 'NODE_OFFLINE', 'system_id': None, 'timeline_id': None}, status) def test_GivenFailingStandbyQuery_ReplicationStatusRaisesException(self): conn = StubConnection(StubCursor(psycopg2.OperationalError())) pg = PgReplicationConnection( NodeConfig('foo'), Logger(), StubPsycopg2(conn)) with self.assertRaises(RetrievingPgReplicationStatusFailed) as context: pg.get_replication_status() self.assertIn('pg_is_in_recovery() failed', str(context.exception)) def test_GivenFailingIdentifySystemQuery_ReplicationStatusRaisesException(self): conn = StubConnection( StubCursor(('SELECT', [[False]])), psycopg2.OperationalError()) pg = PgReplicationConnection( NodeConfig('foo'), Logger(), StubPsycopg2(conn)) with self.assertRaises(RetrievingPgReplicationStatusFailed) as context: pg.get_replication_status() self.assertIn('IDENTIFY_SYSTEM failed', str(context.exception)) def test_GivenConnectionToPrimaryNode_ReplicationStatusIsPrimary(self): conn = StubConnection( StubCursor(('SELECT', [[False]])), StubCursor(('IDENTIFY_SYSTEM', [['id', 1234, 'other', 'fields']]))) stub_psycopg2 = StubPsycopg2(conn) pg = PgReplicationConnection( NodeConfig('foo'), Logger(), stub_psycopg2) status = pg.get_replication_status() self.assertEqual({ 'status': 'NODE_PRIMARY', 'system_id': 'id', 'timeline_id': 1234}, status) def test_GivenConnectionToStandbyNode_ReplicationStatusIsStandby(self): conn = StubConnection( StubCursor(('SELECT', [[True]])), StubCursor(('IDENTIFY_SYSTEM', [['towel', 42, 'other', 'fields']]))) stub_psycopg2 = StubPsycopg2(conn) pg = PgReplicationConnection( NodeConfig('foo'), Logger(), stub_psycopg2) status = pg.get_replication_status() self.assertEqual({ 'status': 'NODE_STANDBY', 'system_id': 'towel', 'timeline_id': 42}, status) class PgConnectionViaPgbouncerTests(unittest.TestCase): def test_NodeConfigAndPgBouncerConfig_AreMergedInConnParams(self): node_config = NodeConfig(777) node_config.host = '1.1.1.1' node_config.port = 9999 pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), None) self.assertEqual(777, pg.node_id) self.assertEqual('template1', pg.conn_params['database']) self.assertEqual('192.168.0.1', pg.conn_params['host']) self.assertEqual(7654, pg.conn_params['port']) def test_WhenVerifyQueryCannotConnect_ErrorIsReported(self): stub_psycopg2 = StubPsycopg2().add_auth_failure() node_config = NodeConfig('xyz123') pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, cursor = pg.verify_connection() self.assertFalse(ok) self.assertIs(None, cursor) self.assertIn('password authentication failed', str(err)) def test_WhenVerifyQueryCannotCreateCursor_ErrorIsReported(self): conn = StubConnection(psycopg2.OperationalError('borked cursor')) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('xyz123') pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, cursor = pg.verify_connection() self.assertFalse(ok) self.assertIs(None, cursor) self.assertIn('Error connecting', str(err)) self.assertIn('borked cursor', str(err)) def test_WhenVerifyQueryFails_ErrorIsReported(self): cursor = StubCursor(psycopg2.OperationalError('borked query')) conn = StubConnection(cursor) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('xyz123') pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, cursor = pg.verify_connection() self.assertFalse(ok) self.assertIn('Error running query', str(err)) self.assertIn('borked query', str(err)) def test_WhenVerifyQueryTimesOut_ErrorIsReported(self): def sleeping_query(query): time.sleep(1) conn = StubConnection(StubCursor(sleeping_query)) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('xyz123') node_config.connect_timeout = 0.01 pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, cursor = pg.verify_connection() self.assertFalse(ok) self.assertIs(None, cursor) self.assertIn('Connection attempt timed out', str(err)) def test_WhenVerifyQueryRuns_NodeHostAndPortAreQueried(self): conn = StubConnection(StubCursor(('SELECT', [[None]]))) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('uh') node_config.host = '111.222.111.222' node_config.port = 1122 pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, cursor = pg.verify_connection() self.assertIn('inet_server_addr()', cursor.query) self.assertIn('%(host)s', cursor.query) self.assertIn('inet_server_port()', cursor.query) self.assertIn('%(port)s', cursor.query) self.assertEqual({ 'host': '111.222.111.222', 'port': 1122 }, cursor.params) def test_WhenVerifyQueryReturnsIssue_ErrorIsReported(self): cursor = StubCursor(('SELECT', [['Bokito has escaped']])) conn = StubConnection(cursor) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('xyz123') node_config.host = '111.222.111.222' node_config.port = 1122 pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, _ = pg.verify_connection() self.assertFalse(ok) self.assertIn('not connected to the expected PostgreSQL backend', err) self.assertIn('Bokito has escaped', err) def test_WhenVerifyQueryReturnsNoIssue_OkIsReported(self): conn = StubConnection(StubCursor(('SELECT', [[None]]))) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('xyz123') pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, _ = pg.verify_connection() self.assertTrue(ok) self.assertIs(None, err) class PgBouncerConsoleConnectionTests(unittest.TestCase): def test_OnConnect_DatabaseIsPgbouncerAndAutoCommitIsEnabled(self): conn = StubConnection() stub_psycopg2 = StubPsycopg2().add_connection(conn) pg = PgBouncerConsoleConnection( NodeConfig('bob'), Logger(), stub_psycopg2) # Autocommit must be enabled, because the console does # not support transactions. self.assertFalse(conn.autocommit) pg.connect() self.assertTrue(conn.autocommit) # To connect to the console, the database 'pgbouncer' must be used. self.assertEqual('pgbouncer', pg.conn_params['database']) def test_OnReconnect_ShowVersionIsUsedForPinging(self): '''Because the pgbouncer console does not support SELECT queries, we have to make use of another query to check if the console connection is still alive.''' cursor = StubCursor(('SHOW', [['the version']])) conn = StubConnection(cursor) stub_psycopg2 = StubPsycopg2(conn) pg = PgBouncerConsoleConnection( NodeConfig('bar'), Logger(), stub_psycopg2) pg.connect() result = pg.connect() self.assertEqual('CONN_REUSED', result) self.assertEqual('SHOW VERSION', cursor.query) def test_WhenReloadFails_ExceptionIsRaised(self): conn = StubConnection(psycopg2.OperationalError('borked reload')) stub_psycopg2 = StubPsycopg2(conn) pg = PgBouncerConsoleConnection( NodeConfig(1001), Logger(), stub_psycopg2) with self.assertRaises(ReloadingPgbouncerFailed) as context: pg.reload() self.assertIn('borked reload', str(context.exception)) def test_WhenReloadReturnsAnUnexpectedStatusMessage_ExceptionIsRaised(self): cursor = StubCursor(('WEIRD', [[None]])) conn = StubConnection(cursor) stub_psycopg2 = StubPsycopg2(conn) pg = PgBouncerConsoleConnection( NodeConfig(1002), Logger(), stub_psycopg2) with self.assertRaises(ReloadingPgbouncerFailed) as context: pg.reload() self.assertIn( 'Unexpected status message: WEIRD', str(context.exception)) def test_WhenReloadIsSuccessful_NoExceptionIsRaised(self): cursor = StubCursor(('RELOAD', [[None]])) conn = StubConnection(cursor) stub_psycopg2 = StubPsycopg2(conn) pg = PgBouncerConsoleConnection( NodeConfig(1002), Logger(), stub_psycopg2) pg.reload() self.assertEqual('RELOAD', cursor.query) self.assertEqual('RELOAD', cursor.statusmessage)