diff --git a/pgbouncemgr/postgres.py b/pgbouncemgr/postgres.py index cda703d..3538878 100644 --- a/pgbouncemgr/postgres.py +++ b/pgbouncemgr/postgres.py @@ -240,8 +240,8 @@ class PgConnectionViaPgbouncer(PgConnection): # allows for doing some extra checks. def verify_connection(self): - """Check if the connection via pgbouncer ends up with the - configured node.""" + """This function checks if the connection via pgbouncer ends up at + the currently known primary node.""" # This is done in a somewhat convoluted way with a subprocess and a # timer. This is done, because a connection is made through # pgbouncer, and pgbouncer will try really hard to connect the user @@ -258,10 +258,7 @@ class PgConnectionViaPgbouncer(PgConnection): # we're going for HA here.""" def check_func(report_func): try: - # Setup the database connection self.connect() - - # Check if we're connected to the requested node. with self.conn.cursor() as cursor: try: cursor.execute(VERIFY_QUERY, { @@ -269,28 +266,47 @@ class PgConnectionViaPgbouncer(PgConnection): "port": self.node_config.port }) result = cursor.fetchone()[0] - self.disconnect() - - if result is not None: - return report_func( - False, - "The pgbouncer is not connected to the expected PostgreSQL " + - "backend service: %s" % result, - cursor) except Exception as exception: self.disconnect() - return report_func(False, format_ex(exception), cursor) + return report_func( + False, + "Error running query: %s %s" % + (repr(cursor), format_ex(exception)), + cursor) except Exception as exception: - return report_func(False, format_ex(exception), None) + self.disconnect() + return report_func( + False, + "Error connecting: %s" % format_ex(exception), + None) + self.disconnect() + + # When the verify query returned an error message, the + # verification failed. + if result is not None: + return report_func( + False, + "The pgbouncer is not connected to the expected " + + "PostgreSQL backend service: %s" % result, + cursor) # When the verify query did not return an error message, then we # are in the green. return report_func(True, None, cursor) - parent_conn, child_conn = multiprocessing.Pipe() def report_func(true_or_false, exception, cursor): + """This function is used to report back results from the + subprocess that runs the check_func.""" + # The cursor is included in the results for unit testing purposes. + # The cursor is modified, but since we're running in a subprocess, + # we wouldn't be able to see these modifications otherwise. child_conn.send([true_or_false, exception, cursor]) child_conn.close() + + # Start the subprocess and wait for it to finish within the + # configured connect timeout. When the connect timeout is not + # met, the subprocess will be killed off. + parent_conn, child_conn = multiprocessing.Pipe() proc = multiprocessing.Process(target=check_func, args=(report_func,)) proc.start() proc.join(self.node_config.connect_timeout) diff --git a/tests/stub_psycopg2.py b/tests/stub_psycopg2.py index fb6de10..e45d82c 100644 --- a/tests/stub_psycopg2.py +++ b/tests/stub_psycopg2.py @@ -75,7 +75,6 @@ class StubCursor(list): PostgreSQL server instance.""" def __init__(self, *results): super().__init__(results) - self.results = list(results) self.statusmessage = None self.rows = None self.query = None @@ -90,7 +89,7 @@ class StubCursor(list): def execute(self, query, params=()): self.query = query self.params = params - response = self.results.pop(0) + response = self.pop(0) if isinstance(response, Exception): raise response if callable(response): diff --git a/tests/test_postgres.py b/tests/test_postgres.py index 633f258..f42c631 100644 --- a/tests/test_postgres.py +++ b/tests/test_postgres.py @@ -1,5 +1,12 @@ # -*- coding: utf-8 -*- +"""The tests in this module all make use of a fully stubbed psycopg2 + connection to test the various PostgreSQL connection classes. + Using the stubs, all logic routes can be tested and we can spy on + the output of the various functions, without having to connect to + an actual PostgreSQL server.""" + + import unittest import psycopg2 import time @@ -13,27 +20,27 @@ class PgConnectionTests(unittest.TestCase): def test_GivenAuthenticationFailure_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_auth_failure(), - PgConnectionFailed, "authentication failed") + PgConnectionFailed, 'authentication failed') def test_GivenConnectionTimeout_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_conn_timeout(), - PgConnectionFailed, "timeout expired") + PgConnectionFailed, 'timeout expired') def test_GivenConnectionFailure_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_conn_failure(), - PgConnectionFailed, "could not connect") + PgConnectionFailed, 'could not connect') def test_GivenPostgresStartingUp_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_admin_startup(), - PgConnectionFailed, "system is starting up") + PgConnectionFailed, 'system is starting up') def test_GivenPostgresShuttingDown_PgConnection_RaisesException(self): self._test_connect_exception( PgConnection, StubPsycopg2().add_admin_shutdown(), - PgConnectionFailed, "AdminShutdown") + PgConnectionFailed, 'AdminShutdown') def _test_connect_exception(self, test_class, stub_psycopg2, err, msg): pg = test_class(NodeConfig(1), Logger(), stub_psycopg2) @@ -43,13 +50,13 @@ class PgConnectionTests(unittest.TestCase): def test_NodeConfig_IsAppliedToConnParams(self): node_config = NodeConfig(1) - node_config.host = "1.1.1.1" + node_config.host = '1.1.1.1' node_config.port = 9999 pg = PgConnection(node_config, Logger(), None) self.assertEqual(1, pg.node_id) - self.assertEqual("1.1.1.1", pg.conn_params["host"]) - self.assertEqual(9999, pg.conn_params["port"]) + self.assertEqual('1.1.1.1', pg.conn_params['host']) + self.assertEqual(9999, pg.conn_params['port']) def test_GivenNoneValueInNodeConfig_ValueIsOmittedInConnParams(self): node_config = NodeConfig(2) @@ -60,8 +67,8 @@ class PgConnectionTests(unittest.TestCase): pg.connect() self.assertEqual(2, pg.node_id) - self.assertNotIn("host", pg.conn_params) - self.assertNotIn("port", pg.conn_params) + self.assertNotIn('host', pg.conn_params) + self.assertNotIn('port', pg.conn_params) def test_FirstConnect_SetsUpConnection(self): stub_psycopg2 = StubPsycopg2().add_connection(StubConnection()) @@ -69,10 +76,10 @@ class PgConnectionTests(unittest.TestCase): result = pg.connect() - self.assertEqual("CONN_CONNECTED", result) + self.assertEqual('CONN_CONNECTED', result) def test_SecondConnect_PingsAndReusesConnection(self): - cursor = StubCursor(("SELECT", [[1]])) + cursor = StubCursor(('SELECT', [[1]])) conn = StubConnection(cursor) stub_psycopg2 = StubPsycopg2().add_connection(conn) pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2) @@ -80,9 +87,9 @@ class PgConnectionTests(unittest.TestCase): result1 = pg.connect() self.assertIs(None, cursor.query) result2 = pg.connect() - self.assertEqual("SELECT 1", cursor.query) - self.assertEqual("CONN_CONNECTED", result1) - self.assertEqual("CONN_REUSED", result2) + self.assertEqual('SELECT 1', cursor.query) + self.assertEqual('CONN_CONNECTED', result1) + self.assertEqual('CONN_REUSED', result2) def test_SecondConnectPingFails_SetsUpNewConnection(self): conn1 = StubConnection(StubCursor(psycopg2.OperationalError())) @@ -93,8 +100,8 @@ class PgConnectionTests(unittest.TestCase): result1 = pg.connect() # Connection OK result2 = pg.connect() # Ping fails, reconnectt - self.assertEqual("CONN_CONNECTED", result1) - self.assertEqual("CONN_RECONNECTED", result2) + self.assertEqual('CONN_CONNECTED', result1) + self.assertEqual('CONN_RECONNECTED', result2) def test_Disconnect_ClosesConnection(self): conn = StubConnection() @@ -110,10 +117,10 @@ class PgConnectionTests(unittest.TestCase): conn1 = StubConnection( StubCursor(psycopg2.OperationalError())) conn2 = StubConnection( - StubCursor(("SELECT", [])), - StubCursor(("SELECT", []))) + StubCursor(('SELECT', [])), + StubCursor(('SELECT', []))) conn3 = StubConnection( - StubCursor(("SELECT", []))) + StubCursor(('SELECT', []))) stub_psycopg2 = StubPsycopg2(conn1, conn2, conn3) pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2) @@ -126,12 +133,12 @@ class PgConnectionTests(unittest.TestCase): result5 = pg.connect() # Connection 3 OK result6 = pg.connect() # Ping success - self.assertEqual("CONN_CONNECTED", result1) - self.assertEqual("CONN_RECONNECTED", result2) - self.assertEqual("CONN_REUSED", result3) - self.assertEqual("CONN_REUSED", result4) - self.assertEqual("CONN_CONNECTED", result5) - self.assertEqual("CONN_REUSED", result6) + self.assertEqual('CONN_CONNECTED', result1) + self.assertEqual('CONN_RECONNECTED', result2) + self.assertEqual('CONN_REUSED', result3) + self.assertEqual('CONN_REUSED', result4) + self.assertEqual('CONN_CONNECTED', result5) + self.assertEqual('CONN_REUSED', result6) class PgReplicationConnectionTests(unittest.TestCase): @@ -139,157 +146,185 @@ class PgReplicationConnectionTests(unittest.TestCase): conn = StubConnection() stub_psycopg2 = StubPsycopg2().add_connection(conn) pg = PgReplicationConnection( - NodeConfig("foo"), Logger(), stub_psycopg2) + NodeConfig('foo'), Logger(), stub_psycopg2) pg.connect() self.assertEqual( psycopg2.extras.LogicalReplicationConnection, - pg.conn_params["connection_factory"]) + pg.conn_params['connection_factory']) def test_GivenFailingConnection_ReplicationStatusIsOffline(self): stub_psycopg2 = StubPsycopg2().add_auth_failure() pg = PgReplicationConnection( - NodeConfig("foo"), Logger(), stub_psycopg2) + NodeConfig('foo'), Logger(), stub_psycopg2) status = pg.get_replication_status() self.assertEqual({ - "status": "NODE_OFFLINE", - "system_id": None, - "timeline_id": None}, status) + 'status': 'NODE_OFFLINE', + 'system_id': None, + 'timeline_id': None}, status) def test_GivenFailingStandbyQuery_ReplicationStatusRaisesException(self): conn = StubConnection(StubCursor(psycopg2.OperationalError())) pg = PgReplicationConnection( - NodeConfig("foo"), Logger(), StubPsycopg2(conn)) + NodeConfig('foo'), Logger(), StubPsycopg2(conn)) with self.assertRaises(RetrievingPgReplicationStatusFailed) as context: pg.get_replication_status() - self.assertIn("pg_is_in_recovery() failed", str(context.exception)) + self.assertIn('pg_is_in_recovery() failed', str(context.exception)) def test_GivenFailingIdentifySystemQuery_ReplicationStatusRaisesException(self): conn = StubConnection( - StubCursor(("SELECT", [[False]])), + StubCursor(('SELECT', [[False]])), psycopg2.OperationalError()) pg = PgReplicationConnection( - NodeConfig("foo"), Logger(), StubPsycopg2(conn)) + NodeConfig('foo'), Logger(), StubPsycopg2(conn)) with self.assertRaises(RetrievingPgReplicationStatusFailed) as context: pg.get_replication_status() - self.assertIn("IDENTIFY_SYSTEM failed", str(context.exception)) + self.assertIn('IDENTIFY_SYSTEM failed', str(context.exception)) def test_GivenConnectionToPrimaryNode_ReplicationStatusIsPrimary(self): conn = StubConnection( - StubCursor(("SELECT", [[False]])), - StubCursor(("IDENTIFY_SYSTEM", [["id", 1234, "other", "fields"]]))) + StubCursor(('SELECT', [[False]])), + StubCursor(('IDENTIFY_SYSTEM', [['id', 1234, 'other', 'fields']]))) stub_psycopg2 = StubPsycopg2(conn) pg = PgReplicationConnection( - NodeConfig("foo"), Logger(), stub_psycopg2) + NodeConfig('foo'), Logger(), stub_psycopg2) status = pg.get_replication_status() self.assertEqual({ - "status": "NODE_PRIMARY", - "system_id": "id", - "timeline_id": 1234}, status) + 'status': 'NODE_PRIMARY', + 'system_id': 'id', + 'timeline_id': 1234}, status) def test_GivenConnectionToStandbyNode_ReplicationStatusIsStandby(self): conn = StubConnection( - StubCursor(("SELECT", [[True]])), - StubCursor(("IDENTIFY_SYSTEM", [["towel", 42, "other", "fields"]]))) + StubCursor(('SELECT', [[True]])), + StubCursor(('IDENTIFY_SYSTEM', [['towel', 42, 'other', 'fields']]))) stub_psycopg2 = StubPsycopg2(conn) pg = PgReplicationConnection( - NodeConfig("foo"), Logger(), stub_psycopg2) + NodeConfig('foo'), Logger(), stub_psycopg2) status = pg.get_replication_status() self.assertEqual({ - "status": "NODE_STANDBY", - "system_id": "towel", - "timeline_id": 42}, status) + 'status': 'NODE_STANDBY', + 'system_id': 'towel', + 'timeline_id': 42}, status) class PgConnectionViaPgbouncerTests(unittest.TestCase): def test_NodeConfigAndPgBouncerConfig_AreMergedInConnParams(self): node_config = NodeConfig(777) - node_config.host = "1.1.1.1" + node_config.host = '1.1.1.1' node_config.port = 9999 - pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), None) self.assertEqual(777, pg.node_id) - self.assertEqual("template1", pg.conn_params["database"]) - self.assertEqual("192.168.0.1", pg.conn_params["host"]) - self.assertEqual(7654, pg.conn_params["port"]) + self.assertEqual('template1', pg.conn_params['database']) + self.assertEqual('192.168.0.1', pg.conn_params['host']) + self.assertEqual(7654, pg.conn_params['port']) - def test_WhenVerifyQueryCannotConnect_ExceptionIsRaised(self): + def test_WhenVerifyQueryCannotConnect_ErrorIsReported(self): stub_psycopg2 = StubPsycopg2().add_auth_failure() node_config = NodeConfig('xyz123') - pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, cursor = pg.verify_connection() self.assertFalse(ok) self.assertIs(None, cursor) - self.assertIn("password authentication failed", str(err)) + self.assertIn('password authentication failed', str(err)) - def test_WhenVerifyQueryTimesOut_ExceptionIsRaised(self): + def test_WhenVerifyQueryCannotCreateCursor_ErrorIsReported(self): + conn = StubConnection(psycopg2.OperationalError('borked cursor')) + stub_psycopg2 = StubPsycopg2(conn) + node_config = NodeConfig('xyz123') + pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} + pg = PgConnectionViaPgbouncer( + node_config, pgbouncer_config, Logger(), stub_psycopg2) + + ok, err, cursor = pg.verify_connection() + self.assertFalse(ok) + self.assertIs(None, cursor) + self.assertIn('Error connecting', str(err)) + self.assertIn('borked cursor', str(err)) + + def test_WhenVerifyQueryFails_ErrorIsReported(self): + cursor = StubCursor(psycopg2.OperationalError('borked query')) + conn = StubConnection(cursor) + stub_psycopg2 = StubPsycopg2(conn) + node_config = NodeConfig('xyz123') + pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} + pg = PgConnectionViaPgbouncer( + node_config, pgbouncer_config, Logger(), stub_psycopg2) + + ok, err, cursor = pg.verify_connection() + self.assertFalse(ok) + self.assertIn('Error running query', str(err)) + self.assertIn('borked query', str(err)) + + def test_WhenVerifyQueryTimesOut_ErrorIsReported(self): def sleeping_query(query): time.sleep(1) conn = StubConnection(StubCursor(sleeping_query)) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('xyz123') node_config.connect_timeout = 0.01 - pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, cursor = pg.verify_connection() self.assertFalse(ok) self.assertIs(None, cursor) - self.assertIn("Connection attempt timed out", str(err)) + self.assertIn('Connection attempt timed out', str(err)) def test_WhenVerifyQueryRuns_NodeHostAndPortAreQueried(self): conn = StubConnection(StubCursor(('SELECT', [[None]]))) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('uh') - node_config.host = "111.222.111.222" + node_config.host = '111.222.111.222' node_config.port = 1122 - pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, cursor = pg.verify_connection() - self.assertIn("inet_server_addr()", cursor.query) - self.assertIn("%(host)s", cursor.query) - self.assertIn("inet_server_port()", cursor.query) - self.assertIn("%(port)s", cursor.query) + self.assertIn('inet_server_addr()', cursor.query) + self.assertIn('%(host)s', cursor.query) + self.assertIn('inet_server_port()', cursor.query) + self.assertIn('%(port)s', cursor.query) self.assertEqual({ - "host": "111.222.111.222", - "port": 1122 + 'host': '111.222.111.222', + 'port': 1122 }, cursor.params) - def test_WhenVerifyQueryReturnsIssue_IssueIsReported(self): + def test_WhenVerifyQueryReturnsIssue_ErrorIsReported(self): cursor = StubCursor(('SELECT', [['Bokito has escaped']])) conn = StubConnection(cursor) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('xyz123') - node_config.host = "111.222.111.222" + node_config.host = '111.222.111.222' node_config.port = 1122 - pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) ok, err, _ = pg.verify_connection() self.assertFalse(ok) - self.assertIn("not connected to the expected PostgreSQL backend", err) - self.assertIn("Bokito has escaped", err) + self.assertIn('not connected to the expected PostgreSQL backend', err) + self.assertIn('Bokito has escaped', err) def test_WhenVerifyQueryReturnsNoIssue_OkIsReported(self): conn = StubConnection(StubCursor(('SELECT', [[None]]))) stub_psycopg2 = StubPsycopg2(conn) node_config = NodeConfig('xyz123') - pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pgbouncer_config = {'host': '192.168.0.1', 'port': 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) @@ -297,13 +332,69 @@ class PgConnectionViaPgbouncerTests(unittest.TestCase): self.assertTrue(ok) self.assertIs(None, err) + class PgBouncerConsoleConnectionTests(unittest.TestCase): - def test_OnConnect_AutoCommitIsEnabled(self): + def test_OnConnect_DatabaseIsPgbouncerAndAutoCommitIsEnabled(self): conn = StubConnection() stub_psycopg2 = StubPsycopg2().add_connection(conn) pg = PgBouncerConsoleConnection( - NodeConfig("bob"), Logger(), stub_psycopg2) + NodeConfig('bob'), Logger(), stub_psycopg2) + # Autocommit must be enabled, because the console does + # not support transactions. self.assertFalse(conn.autocommit) pg.connect() self.assertTrue(conn.autocommit) + + # To connect to the console, the database 'pgbouncer' must be used. + self.assertEqual('pgbouncer', pg.conn_params['database']) + + def test_OnReconnect_ShowVersionIsUsedForPinging(self): + '''Because the pgbouncer console does not support SELECT queries, + we have to make use of another query to check if the console + connection is still alive.''' + cursor = StubCursor(('SHOW', [['the version']])) + conn = StubConnection(cursor) + stub_psycopg2 = StubPsycopg2(conn) + pg = PgBouncerConsoleConnection( + NodeConfig('bar'), Logger(), stub_psycopg2) + + pg.connect() + result = pg.connect() + self.assertEqual('CONN_REUSED', result) + self.assertEqual('SHOW VERSION', cursor.query) + + def test_WhenReloadFails_ExceptionIsRaised(self): + conn = StubConnection(psycopg2.OperationalError('borked reload')) + stub_psycopg2 = StubPsycopg2(conn) + pg = PgBouncerConsoleConnection( + NodeConfig(1001), Logger(), stub_psycopg2) + + with self.assertRaises(ReloadingPgbouncerFailed) as context: + pg.reload() + self.assertIn('borked reload', str(context.exception)) + + def test_WhenReloadReturnsAnUnexpectedStatusMessage_ExceptionIsRaised(self): + cursor = StubCursor(('WEIRD', [[None]])) + conn = StubConnection(cursor) + stub_psycopg2 = StubPsycopg2(conn) + pg = PgBouncerConsoleConnection( + NodeConfig(1002), Logger(), stub_psycopg2) + + with self.assertRaises(ReloadingPgbouncerFailed) as context: + pg.reload() + self.assertIn( + 'Unexpected status message: WEIRD', + str(context.exception)) + + def test_WhenReloadIsSuccessful_NoExceptionIsRaised(self): + cursor = StubCursor(('RELOAD', [[None]])) + conn = StubConnection(cursor) + stub_psycopg2 = StubPsycopg2(conn) + pg = PgBouncerConsoleConnection( + NodeConfig(1002), Logger(), stub_psycopg2) + + pg.reload() + self.assertEqual('RELOAD', cursor.query) + self.assertEqual('RELOAD', cursor.statusmessage) +