From 475e20a588def3507a0d82809929c4e21078b634 Mon Sep 17 00:00:00 2001 From: Maurice Makaay Date: Fri, 20 Dec 2019 09:41:56 +0100 Subject: [PATCH] Backup work in progress on postgres tests. --- .gitignore | 2 +- pgbouncemgr/postgres.py | 67 +++++++++++++++------------------- tests/stub_psycopg2.py | 6 ++- tests/test_postgres.py | 81 ++++++++++++++++++++++++++++++++++++----- 4 files changed, 107 insertions(+), 49 deletions(-) diff --git a/.gitignore b/.gitignore index fffc64d..3975598 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ __pycache__ -*.swp +*.sw? diff --git a/pgbouncemgr/postgres.py b/pgbouncemgr/postgres.py index 94deb49..cda703d 100644 --- a/pgbouncemgr/postgres.py +++ b/pgbouncemgr/postgres.py @@ -18,9 +18,8 @@ class PgException(Exception): class PgConnectionFailed(PgException): """Raised when connecting to the database server failed.""" - def __init__(self, exception): - super().__init__( - "Could not connect to node: %s" % format_ex(exception)) + def __init__(self, msg): + super().__init__("Could not connect to node: %s" % msg) class RetrievingPgReplicationStatusFailed(PgException): """Raised when the replication status cannot be determined.""" @@ -28,15 +27,6 @@ class RetrievingPgReplicationStatusFailed(PgException): class ReloadingPgbouncerFailed(PgException): """Raised when reloading the pgbouncer configuration fails.""" -class ConnectedToWrongBackend(PgException): - """Raised when the pgbouncer instance is not connected to the - correct PostgreSQL backend service.""" - def __init__(self, msg): - super().__init__( - "The pgbouncer is not connected to the expected PostgreSQL " + - "backend service: %s" % msg) - - # The properties that can be used in a configuration object to # define connection parameters for psycopg2. CONNECTION_PARAMS = [ @@ -127,7 +117,7 @@ class PgConnection(): except psycopg2.OperationalError as exception: self.disconnect() self.log.error("connection failed: %s" % format_ex(exception)) - raise PgConnectionFailed(exception) + raise PgConnectionFailed(format_ex(exception)) def disconnect(self): """Disconnect from the database server.""" @@ -267,35 +257,39 @@ class PgConnectionViaPgbouncer(PgConnection): # right away. We must be prepared for the odd case out though, since # we're going for HA here.""" def check_func(report_func): - # Setup the database connection try: + # Setup the database connection self.connect() + + # Check if we're connected to the requested node. + with self.conn.cursor() as cursor: + try: + cursor.execute(VERIFY_QUERY, { + "host": self.node_config.host, + "port": self.node_config.port + }) + result = cursor.fetchone()[0] + self.disconnect() + + if result is not None: + return report_func( + False, + "The pgbouncer is not connected to the expected PostgreSQL " + + "backend service: %s" % result, + cursor) + except Exception as exception: + self.disconnect() + return report_func(False, format_ex(exception), cursor) except Exception as exception: - return report_func(False, exception) - - # Check if we're connected to the requested node. - with self.conn.cursor() as cursor: - try: - cursor.execute(VERIFY_QUERY, { - "host": self.node_config.host, - "port": self.node_config.port - }) - result = cursor.fetchone()[0] - self.disconnect() - - if result is not None: - raise ConnectedToWrongBackend(result) - except Exception as exception: - self.disconnect() - return report_func(False, exception) + return report_func(False, format_ex(exception), None) # When the verify query did not return an error message, then we # are in the green. - return report_func(True, None) + return report_func(True, None, cursor) parent_conn, child_conn = multiprocessing.Pipe() - def report_func(true_or_false, exception): - child_conn.send([true_or_false, exception]) + def report_func(true_or_false, exception, cursor): + child_conn.send([true_or_false, exception, cursor]) child_conn.close() proc = multiprocessing.Process(target=check_func, args=(report_func,)) proc.start() @@ -303,12 +297,9 @@ class PgConnectionViaPgbouncer(PgConnection): if proc.is_alive(): proc.terminate() proc.join() - return (False, PgConnectionFailed("Connection attempt timed out")) + return (False, "Connection attempt timed out", None) result = parent_conn.recv() proc.join() - ##### DEBUG - raise Exception(repr(result)) - ##### /DEBUG return result diff --git a/tests/stub_psycopg2.py b/tests/stub_psycopg2.py index dbd15ac..fb6de10 100644 --- a/tests/stub_psycopg2.py +++ b/tests/stub_psycopg2.py @@ -79,6 +79,7 @@ class StubCursor(list): self.statusmessage = None self.rows = None self.query = None + self.params = None def __enter__(self): return self @@ -86,11 +87,14 @@ class StubCursor(list): def __exit__(self, a, b, c): pass - def execute(self, query): + def execute(self, query, params=()): self.query = query + self.params = params response = self.results.pop(0) if isinstance(response, Exception): raise response + if callable(response): + return response(query) status, rows = response self.statusmessage = status self.rows = rows if rows is not None else [] diff --git a/tests/test_postgres.py b/tests/test_postgres.py index 285511e..633f258 100644 --- a/tests/test_postgres.py +++ b/tests/test_postgres.py @@ -2,6 +2,7 @@ import unittest import psycopg2 +import time from pgbouncemgr.postgres import * from pgbouncemgr.logger import Logger from pgbouncemgr.node_config import NodeConfig @@ -206,7 +207,7 @@ class PgReplicationConnectionTests(unittest.TestCase): class PgConnectionViaPgbouncerTests(unittest.TestCase): - def test_NodeConfig_AndPgBouncerConfig_AreAppliedToConnParams(self): + def test_NodeConfigAndPgBouncerConfig_AreMergedInConnParams(self): node_config = NodeConfig(777) node_config.host = "1.1.1.1" node_config.port = 9999 @@ -220,19 +221,81 @@ class PgConnectionViaPgbouncerTests(unittest.TestCase): self.assertEqual("192.168.0.1", pg.conn_params["host"]) self.assertEqual(7654, pg.conn_params["port"]) - def test_WhenVerifyQueryTimesOut_ExceptionIsRaised(self): - def sleeping_query(): - time.sleep(2) - conn = StubConnection(StubCursor(sleeping_query)) - stub_psycopg2 = StubPsycopg2(conn) + def test_WhenVerifyQueryCannotConnect_ExceptionIsRaised(self): + stub_psycopg2 = StubPsycopg2().add_auth_failure() node_config = NodeConfig('xyz123') - node_config.connect_timeout = 1 pgbouncer_config = {"host": "192.168.0.1", "port": 7654} pg = PgConnectionViaPgbouncer( node_config, pgbouncer_config, Logger(), stub_psycopg2) - pg.verify_connection() - ##### TODO + ok, err, cursor = pg.verify_connection() + self.assertFalse(ok) + self.assertIs(None, cursor) + self.assertIn("password authentication failed", str(err)) + + def test_WhenVerifyQueryTimesOut_ExceptionIsRaised(self): + def sleeping_query(query): + time.sleep(1) + conn = StubConnection(StubCursor(sleeping_query)) + stub_psycopg2 = StubPsycopg2(conn) + node_config = NodeConfig('xyz123') + node_config.connect_timeout = 0.01 + pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pg = PgConnectionViaPgbouncer( + node_config, pgbouncer_config, Logger(), stub_psycopg2) + + ok, err, cursor = pg.verify_connection() + self.assertFalse(ok) + self.assertIs(None, cursor) + self.assertIn("Connection attempt timed out", str(err)) + + def test_WhenVerifyQueryRuns_NodeHostAndPortAreQueried(self): + conn = StubConnection(StubCursor(('SELECT', [[None]]))) + stub_psycopg2 = StubPsycopg2(conn) + node_config = NodeConfig('uh') + node_config.host = "111.222.111.222" + node_config.port = 1122 + pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pg = PgConnectionViaPgbouncer( + node_config, pgbouncer_config, Logger(), stub_psycopg2) + + ok, err, cursor = pg.verify_connection() + self.assertIn("inet_server_addr()", cursor.query) + self.assertIn("%(host)s", cursor.query) + self.assertIn("inet_server_port()", cursor.query) + self.assertIn("%(port)s", cursor.query) + self.assertEqual({ + "host": "111.222.111.222", + "port": 1122 + }, cursor.params) + + def test_WhenVerifyQueryReturnsIssue_IssueIsReported(self): + cursor = StubCursor(('SELECT', [['Bokito has escaped']])) + conn = StubConnection(cursor) + stub_psycopg2 = StubPsycopg2(conn) + node_config = NodeConfig('xyz123') + node_config.host = "111.222.111.222" + node_config.port = 1122 + pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pg = PgConnectionViaPgbouncer( + node_config, pgbouncer_config, Logger(), stub_psycopg2) + + ok, err, _ = pg.verify_connection() + self.assertFalse(ok) + self.assertIn("not connected to the expected PostgreSQL backend", err) + self.assertIn("Bokito has escaped", err) + + def test_WhenVerifyQueryReturnsNoIssue_OkIsReported(self): + conn = StubConnection(StubCursor(('SELECT', [[None]]))) + stub_psycopg2 = StubPsycopg2(conn) + node_config = NodeConfig('xyz123') + pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pg = PgConnectionViaPgbouncer( + node_config, pgbouncer_config, Logger(), stub_psycopg2) + + ok, err, _ = pg.verify_connection() + self.assertTrue(ok) + self.assertIs(None, err) class PgBouncerConsoleConnectionTests(unittest.TestCase): def test_OnConnect_AutoCommitIsEnabled(self):