diff --git a/pgbouncemgr/constants.py b/pgbouncemgr/constants.py index 52f8bca..05390d6 100644 --- a/pgbouncemgr/constants.py +++ b/pgbouncemgr/constants.py @@ -7,9 +7,9 @@ DEFAULT_CONFIG = "/etc/pgbouncer/pgbouncemgr.yaml" DEFAULT_LOG_FACILITY = "LOG_LOCAL1" # Return values for the PgConnection.connect() method. -CONN_CONNECTED = 'CONNECTED' -CONN_REUSED = 'REUSED' -CONN_RECONNECTED = 'RECONNECTED' +CONN_CONNECTED = 'CONN_CONNECTED' +CONN_REUSED = 'CONN_REUSED' +CONN_RECONNECTED = 'CONN_RECONNECTED' # Backend node status. NODE_UNKNOWN = "NODE_UNKNOWN" diff --git a/pgbouncemgr/manager.py b/pgbouncemgr/manager.py index 77c47de..d4238fb 100644 --- a/pgbouncemgr/manager.py +++ b/pgbouncemgr/manager.py @@ -22,7 +22,7 @@ class Manager(): self.single_shot = args.single_shot self._create_logger(args) self._create_state() - self.node_poller = NodePoller(self.state) + self.node_poller = NodePoller(self.state, self.log) def _create_logger(self, args): self.log = Logger() diff --git a/pgbouncemgr/node_poller.py b/pgbouncemgr/node_poller.py index 7d5b1f0..2e4b5fb 100644 --- a/pgbouncemgr/node_poller.py +++ b/pgbouncemgr/node_poller.py @@ -1,7 +1,9 @@ # -*- coding: utf-8 -*- # no-pylint: disable=missing-docstring,too-few-public-methods +from pgbouncemgr.logger import format_ex from pgbouncemgr.postgres import PgReplicationConnection, PgException +from pgbouncemgr.constants import NODE_OFFLINE class NodePoller(): """The NodePoller is used to poll all the nodes that are available @@ -9,11 +11,12 @@ class NodePoller(): the results. The connection_class that can be provided is used for testing purposes (dependency injection).""" - def __init__(self, state, connection_class=None): + def __init__(self, state, log, connection_class=None): + self.state = state + self.log = log self.connection_class = PgReplicationConnection if connection_class is not None: self.connection_class = connection_class - self.state = state self._connections = {} def poll(self): @@ -22,21 +25,23 @@ class NodePoller(): self._poll_node(node) def _poll_node(self, node): - connection = self._get_connection_object(node) + connection = self._get_connection(node) try: - result = connection.connect() - print(result) status = connection.get_replication_status() node.status = status["status"] - node.timeline_id = status["timeline_id"] - node.system_id = status["system_id"] + if node.status != NODE_OFFLINE: + node.timeline_id = status["timeline_id"] + node.system_id = status["system_id"] except PgException as exception: - pass + node.status = NODE_OFFLINE + self.log.error( + "[%s] Error on polling node: %s" % + (node.node_id, format_ex(exception))) - def _get_connection_object(self, node): + def _get_connection(self, node): if node.node_id in self._connections: return self._connections[node.node_id] - connection = self.connection_class(node.config) + connection = self.connection_class(node.config, self.log) self._connections[node.node_id] = connection return connection diff --git a/pgbouncemgr/postgres.py b/pgbouncemgr/postgres.py index 2b0d5de..94deb49 100644 --- a/pgbouncemgr/postgres.py +++ b/pgbouncemgr/postgres.py @@ -20,7 +20,7 @@ class PgConnectionFailed(PgException): """Raised when connecting to the database server failed.""" def __init__(self, exception): super().__init__( - "Could not connect to %s: %s" % (format_ex(exception))) + "Could not connect to node: %s" % format_ex(exception)) class RetrievingPgReplicationStatusFailed(PgException): """Raised when the replication status cannot be determined.""" @@ -48,54 +48,91 @@ CONNECTION_PARAMS = [ 'connect_timeout' ] + +class NodeLogger(): + def __init__(self, node_config, log): + self.node_id = node_config.node_id + self.log = log + + def debug(self, msg): + self.log.debug("[%s] %s" % (self.node_id, msg)) + + def info(self, msg): + self.log.info("[%s] %s" % (self.node_id, msg)) + + def warning(self, msg): + self.log.warning("[%s] %s" % (self.node_id, msg)) + + def error(self, msg): + self.log.error("[%s] %s" % (self.node_id, msg)) + + class PgConnection(): - """Implements a connection to a PostgreSQL server.""" - def __init__(self, config): - self.conn_params = self._create_conn_params(config) + """Implements a connection to a PostgreSQL server. + The provided node_config must be a pgbouncemgr.NodeConfig object. + The psycopg2_module argument is used as a depencency injection + mechanism for testing purposes.""" + def __init__(self, node_config, log, psycopg2_module=psycopg2): + self.log = NodeLogger(node_config, log) + self.psycopg2 = psycopg2_module + + self.node_id = node_config.node_id + self.conn_params = self._create_conn_params(node_config) self.ping_query = "SELECT 1" self.conn = None + self.reconnecting = False - def _create_conn_params(self, config): + def _create_conn_params(self, node_config): """Extract all connection parameters from the provided configuration, that don't have value of None.""" conn_params = {} for key in CONNECTION_PARAMS: - if not hasattr(config, key): + if not hasattr(node_config, key): continue - value = getattr(config, key) + value = getattr(node_config, key) if value is None: continue conn_params[key] = value return conn_params def connect(self): - """Connect to the database server. When a connection exists, - then check if it is still oeprational. If yes, then reuse - this connection. If no, or when no connection exists, then - setup a new connection. + """Connect to the database server. When a connection exists, then + check if it is still operational. If yes, then reuse this + connection. If no, or when no connection exists, then setup a new + connection. Raises an exeption when the database connection cannot be setup. returns CONN_CONNECTED, CONN_REUSED or CONN_RECONNECTED when the connection was setup successfully.""" - reconnected = False if self.conn is not None: try: with self.conn.cursor() as cursor: cursor.execute(self.ping_query) + self.log.debug("reusing connection") return CONN_REUSED - except psycopg2.OperationalError: - reconnected = True + except psycopg2.OperationalError as exception: + self.log.warning( + "connection went away, reconnecting (got exception: %s)" % + format_ex(exception)) + self.reconnecting = True self.disconnect() try: - self.conn = psycopg2.connect(**self.conn_params) - return CONN_RECONNECTED if reconnected else CONN_CONNECTED + self.conn = self.psycopg2.connect(**self.conn_params) + if self.reconnecting: + self.log.info("reconnected successfully") + self.reconnecting = False + return CONN_RECONNECTED + self.log.info("connected successfully") + + return CONN_CONNECTED except psycopg2.OperationalError as exception: self.disconnect() + self.log.error("connection failed: %s" % format_ex(exception)) raise PgConnectionFailed(exception) def disconnect(self): """Disconnect from the database server.""" try: - if self.conn: + if self.conn is not None: self.conn.close() except Exception: pass @@ -106,14 +143,16 @@ class PgReplicationConnection(PgConnection): """This PostgresQL connection class is used to setup a replication connection to a PostgreSQL database server, which can be used to retrieve the replication status for the server.""" - def __init__(self, node_config): - super().__init__(node_config) + def __init__(self, node_config, log, psycopg2_module=psycopg2): + super().__init__(node_config, log, psycopg2_module) self.conn_params["connection_factory"] = LogicalReplicationConnection def get_replication_status(self): """Returns the replication status for a node. This is an array, containing the keys "status" (NODE_OFFLINE, NODE_PRIMARY or - NODE_STANDBY), "system_id" and "timeline_id".""" + NODE_STANDBY), "system_id" and "timeline_id". + Note: before calling this method, first setup the connection using + the connect() method.""" status = { "status": None, "system_id": None, @@ -134,7 +173,7 @@ class PgReplicationConnection(PgConnection): cursor.execute("SELECT pg_is_in_recovery()") in_recovery = cursor.fetchone()[0] status["status"] = NODE_STANDBY if in_recovery else NODE_PRIMARY - except psycopg2.InternalError as exception: + except psycopg2.OperationalError as exception: self.disconnect() raise RetrievingPgReplicationStatusFailed( "SELECT pg_is_in_recovery() failed: %s" % format_ex(exception)) @@ -147,11 +186,17 @@ class PgReplicationConnection(PgConnection): system_id, timeline_id, *_ = row status["system_id"] = system_id status["timeline_id"] = timeline_id - except psycopg2.InternalError as exception: + except psycopg2.OperationalError as exception: self.disconnect() + self.log.error( + "retrieving replication status failed: %s" % + format_ex(exception)) raise RetrievingPgReplicationStatusFailed( "IDENTIFY_SYSTEM failed: %s" % format_ex(exception)) + self.log.debug( + "node status: status=%s, system_id=%s, timeline_id=%s" % + (status["status"], status["system_id"], status["timeline_id"])) return status # The query that is used to check if pgbouncer is connected to the expected @@ -183,7 +228,7 @@ VERIFY_QUERY = """ class PgConnectionViaPgbouncer(PgConnection): """This PostgreSQL connection class is used to setup a connection to the PostgreSQL cluster, via the pgbouncer instance.""" - def __init__(self, node_config, pgbouncer_config): + def __init__(self, node_config, pgbouncer_config, log, psycopg2_module=psycopg2): """Instantiate a new connection. The node_config and the pgbouncer_config will be combined to get the connection parameters for connecting to the PostgreSQL server.""" @@ -191,7 +236,7 @@ class PgConnectionViaPgbouncer(PgConnection): # First, apply all the connection parameters as defined for the node. # This is fully handled by the parent class. - super().__init__(node_config) + super().__init__(node_config, log, psycopg2_module) # Secondly, override parameters to redirect the connection to # the pgbouncer instance. @@ -232,8 +277,8 @@ class PgConnectionViaPgbouncer(PgConnection): with self.conn.cursor() as cursor: try: cursor.execute(VERIFY_QUERY, { - "host": self.node_config["host"], - "port": self.node_config["port"] + "host": self.node_config.host, + "port": self.node_config.port }) result = cursor.fetchone()[0] self.disconnect() @@ -254,13 +299,16 @@ class PgConnectionViaPgbouncer(PgConnection): child_conn.close() proc = multiprocessing.Process(target=check_func, args=(report_func,)) proc.start() - proc.join(self.node_config["connect_timeout"]) + proc.join(self.node_config.connect_timeout) if proc.is_alive(): proc.terminate() proc.join() return (False, PgConnectionFailed("Connection attempt timed out")) result = parent_conn.recv() proc.join() + ##### DEBUG + raise Exception(repr(result)) + ##### /DEBUG return result @@ -270,23 +318,22 @@ class PgBouncerConsoleConnection(PgConnection): used to control the pgbouncer instance via admin commands. This connection is used by pgbouncemgr to reload the configuration of pgbouncer when the cluster state changes.""" - def __init__(self, pgbouncer_config): - super().__init__(pgbouncer_config) + def __init__(self, node_config, log, psycopg2_module=psycopg2): + super().__init__(node_config, log, psycopg2_module) # For the console connection, the database name "pgbouncer" # must be used. - self.conn_params["dbname"] = "pgbouncer" + self.conn_params["database"] = "pgbouncer" # The default ping query does not work when connected to the # pgbouncer console. Here's a simple replacement for it. self.ping_query = "SHOW VERSION" def connect(self): - """Connect to the pgbouncer console. After connecting, the autocommit - feature will be disabled for the connection, so the underlying - PostgreSQL client library won't automatically try to setup a - transaction. Transactions are not supported by the pgbouncer - console.""" + """Connect to the pgbouncer console. Enable the autocommit feature on + the connection, so the underlying PostgreSQL client library won't + automatically try to setup a transaction. Transactions are not + supported by the pgbouncer console.""" result = super().connect() self.conn.autocommit = True return result diff --git a/pgbouncemgr/state.py b/pgbouncemgr/state.py index 9adade5..c7845e2 100644 --- a/pgbouncemgr/state.py +++ b/pgbouncemgr/state.py @@ -4,8 +4,7 @@ import os from pgbouncemgr.node_config import NodeConfig from pgbouncemgr.constants import \ - LEADER_UNKNOWN, LEADER_STATUSES,\ - NODE_UNKNOWN, NODE_OFFLINE, NODE_PRIMARY, NODE_STANDBY, NODE_STATUSES + LEADER_UNKNOWN, LEADER_STATUSES, NODE_UNKNOWN, NODE_STATUSES class StateException(Exception): diff --git a/tests/stub_psycopg2.py b/tests/stub_psycopg2.py new file mode 100644 index 0000000..dbd15ac --- /dev/null +++ b/tests/stub_psycopg2.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- + +import psycopg2 + + +class StubPsycopg2(list): + """A stubbed version of the psycopg2 module, used to implement PostgreSQL + server interaction tests, without having to talk to a running + PostgreSQL server instance.""" + def __init__(self, *connections): + self.conn_params = None + super().__init__(connections) + + def add(self, connection): + self.append(connection) + return self + + def add_auth_failure(self): + return self.add(psycopg2.OperationalError( + 'FATAL: password authentication failed for user "test"')) + + def add_conn_timeout(self): + return self.add(psycopg2.OperationalError('timeout expired')) + + def add_conn_failure(self): + return self.add(psycopg2.OperationalError( + 'could not connect to server: No route to host / Is the ' + + 'server running on host "..." and accepting / TCP/IP ' + + 'connections on port ...?')) + return self + + def add_admin_startup(self): + return self.add(psycopg2.errors.OperationalError( + 'FATAL: the database system is starting up')) + + def add_admin_shutdown(self): + return self.add(psycopg2.errors.AdminShutdown( + 'terminating connection due to administrator command')) + + def add_connection(self, connection): + return self.add(connection) + + def connect(self, **conn_params): + self.conn_params = conn_params + response = self.pop(0) + if isinstance(response, Exception): + raise response + return response + + +class StubConnection(list): + """A stubbed version of a psycopg2 connection, used to implement + PostgreSQL server interaction tests, without having to talk to a + running PostgreSQL server instance.""" + def __init__(self, *cursors): + super().__init__(cursors) + self.connected = True + self.autocommit = False + + def close(self): + self.connected = False + + def cursor(self): + if not self.connected: + raise psycopg2.InterfaceError("connection already closed") + response = self.pop(0) + if isinstance(response, Exception): + raise response + return response + + +class StubCursor(list): + """A stubbed version of a psycopg2 cursor, used to implement PostgreSQL + server interaction tests, without having to talk to a running + PostgreSQL server instance.""" + def __init__(self, *results): + super().__init__(results) + self.results = list(results) + self.statusmessage = None + self.rows = None + self.query = None + + def __enter__(self): + return self + + def __exit__(self, a, b, c): + pass + + def execute(self, query): + self.query = query + response = self.results.pop(0) + if isinstance(response, Exception): + raise response + status, rows = response + self.statusmessage = status + self.rows = rows if rows is not None else [] + + def fetchone(self): + if self.rows is None: + raise psycopg2.ProgrammingError("no results to fetch") + try: + return self.rows.pop(0) + except IndexError: + return None diff --git a/tests/test_postgres.py b/tests/test_postgres.py new file mode 100644 index 0000000..285511e --- /dev/null +++ b/tests/test_postgres.py @@ -0,0 +1,246 @@ +# -*- coding: utf-8 -*- + +import unittest +import psycopg2 +from pgbouncemgr.postgres import * +from pgbouncemgr.logger import Logger +from pgbouncemgr.node_config import NodeConfig +from tests.stub_psycopg2 import * + + +class PgConnectionTests(unittest.TestCase): + def test_GivenAuthenticationFailure_PgConnection_RaisesException(self): + self._test_connect_exception( + PgConnection, StubPsycopg2().add_auth_failure(), + PgConnectionFailed, "authentication failed") + + def test_GivenConnectionTimeout_PgConnection_RaisesException(self): + self._test_connect_exception( + PgConnection, StubPsycopg2().add_conn_timeout(), + PgConnectionFailed, "timeout expired") + + def test_GivenConnectionFailure_PgConnection_RaisesException(self): + self._test_connect_exception( + PgConnection, StubPsycopg2().add_conn_failure(), + PgConnectionFailed, "could not connect") + + def test_GivenPostgresStartingUp_PgConnection_RaisesException(self): + self._test_connect_exception( + PgConnection, StubPsycopg2().add_admin_startup(), + PgConnectionFailed, "system is starting up") + + def test_GivenPostgresShuttingDown_PgConnection_RaisesException(self): + self._test_connect_exception( + PgConnection, StubPsycopg2().add_admin_shutdown(), + PgConnectionFailed, "AdminShutdown") + + def _test_connect_exception(self, test_class, stub_psycopg2, err, msg): + pg = test_class(NodeConfig(1), Logger(), stub_psycopg2) + with self.assertRaises(err) as context: + pg.connect() + self.assertIn(msg, str(context.exception)) + + def test_NodeConfig_IsAppliedToConnParams(self): + node_config = NodeConfig(1) + node_config.host = "1.1.1.1" + node_config.port = 9999 + pg = PgConnection(node_config, Logger(), None) + + self.assertEqual(1, pg.node_id) + self.assertEqual("1.1.1.1", pg.conn_params["host"]) + self.assertEqual(9999, pg.conn_params["port"]) + + def test_GivenNoneValueInNodeConfig_ValueIsOmittedInConnParams(self): + node_config = NodeConfig(2) + node_config.host = None + node_config.port = None + stub_psycopg2 = StubPsycopg2().add_connection(StubConnection()) + pg = PgConnection(node_config, Logger(), stub_psycopg2) + pg.connect() + + self.assertEqual(2, pg.node_id) + self.assertNotIn("host", pg.conn_params) + self.assertNotIn("port", pg.conn_params) + + def test_FirstConnect_SetsUpConnection(self): + stub_psycopg2 = StubPsycopg2().add_connection(StubConnection()) + pg = PgConnection(NodeConfig('a'), Logger(), stub_psycopg2) + + result = pg.connect() + + self.assertEqual("CONN_CONNECTED", result) + + def test_SecondConnect_PingsAndReusesConnection(self): + cursor = StubCursor(("SELECT", [[1]])) + conn = StubConnection(cursor) + stub_psycopg2 = StubPsycopg2().add_connection(conn) + pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2) + + result1 = pg.connect() + self.assertIs(None, cursor.query) + result2 = pg.connect() + self.assertEqual("SELECT 1", cursor.query) + self.assertEqual("CONN_CONNECTED", result1) + self.assertEqual("CONN_REUSED", result2) + + def test_SecondConnectPingFails_SetsUpNewConnection(self): + conn1 = StubConnection(StubCursor(psycopg2.OperationalError())) + conn2 = StubConnection() + stub_psycopg2 = StubPsycopg2(conn1, conn2) + pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2) + + result1 = pg.connect() # Connection OK + result2 = pg.connect() # Ping fails, reconnectt + + self.assertEqual("CONN_CONNECTED", result1) + self.assertEqual("CONN_RECONNECTED", result2) + + def test_Disconnect_ClosesConnection(self): + conn = StubConnection() + stub_psycopg2 = StubPsycopg2(conn) + pg = PgConnection(NodeConfig('disco'), Logger(), stub_psycopg2) + + pg.connect() + self.assertTrue(conn.connected) + pg.disconnect() + self.assertFalse(conn.connected) + + def test_BigConnectionFlow(self): + conn1 = StubConnection( + StubCursor(psycopg2.OperationalError())) + conn2 = StubConnection( + StubCursor(("SELECT", [])), + StubCursor(("SELECT", []))) + conn3 = StubConnection( + StubCursor(("SELECT", []))) + + stub_psycopg2 = StubPsycopg2(conn1, conn2, conn3) + pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2) + + result1 = pg.connect() # Connection 1 OK + result2 = pg.connect() # Ping fails, new connection 2 OK + result3 = pg.connect() # Ping 1 success + result4 = pg.connect() # Ping 2 success + pg.disconnect() # Explicit disconnect + result5 = pg.connect() # Connection 3 OK + result6 = pg.connect() # Ping success + + self.assertEqual("CONN_CONNECTED", result1) + self.assertEqual("CONN_RECONNECTED", result2) + self.assertEqual("CONN_REUSED", result3) + self.assertEqual("CONN_REUSED", result4) + self.assertEqual("CONN_CONNECTED", result5) + self.assertEqual("CONN_REUSED", result6) + + +class PgReplicationConnectionTests(unittest.TestCase): + def test_LogicalReplicationConnection_IsUsed(self): + conn = StubConnection() + stub_psycopg2 = StubPsycopg2().add_connection(conn) + pg = PgReplicationConnection( + NodeConfig("foo"), Logger(), stub_psycopg2) + + pg.connect() + self.assertEqual( + psycopg2.extras.LogicalReplicationConnection, + pg.conn_params["connection_factory"]) + + def test_GivenFailingConnection_ReplicationStatusIsOffline(self): + stub_psycopg2 = StubPsycopg2().add_auth_failure() + pg = PgReplicationConnection( + NodeConfig("foo"), Logger(), stub_psycopg2) + + status = pg.get_replication_status() + self.assertEqual({ + "status": "NODE_OFFLINE", + "system_id": None, + "timeline_id": None}, status) + + def test_GivenFailingStandbyQuery_ReplicationStatusRaisesException(self): + conn = StubConnection(StubCursor(psycopg2.OperationalError())) + pg = PgReplicationConnection( + NodeConfig("foo"), Logger(), StubPsycopg2(conn)) + + with self.assertRaises(RetrievingPgReplicationStatusFailed) as context: + pg.get_replication_status() + self.assertIn("pg_is_in_recovery() failed", str(context.exception)) + + def test_GivenFailingIdentifySystemQuery_ReplicationStatusRaisesException(self): + conn = StubConnection( + StubCursor(("SELECT", [[False]])), + psycopg2.OperationalError()) + pg = PgReplicationConnection( + NodeConfig("foo"), Logger(), StubPsycopg2(conn)) + + with self.assertRaises(RetrievingPgReplicationStatusFailed) as context: + pg.get_replication_status() + self.assertIn("IDENTIFY_SYSTEM failed", str(context.exception)) + + def test_GivenConnectionToPrimaryNode_ReplicationStatusIsPrimary(self): + conn = StubConnection( + StubCursor(("SELECT", [[False]])), + StubCursor(("IDENTIFY_SYSTEM", [["id", 1234, "other", "fields"]]))) + stub_psycopg2 = StubPsycopg2(conn) + pg = PgReplicationConnection( + NodeConfig("foo"), Logger(), stub_psycopg2) + + status = pg.get_replication_status() + self.assertEqual({ + "status": "NODE_PRIMARY", + "system_id": "id", + "timeline_id": 1234}, status) + + def test_GivenConnectionToStandbyNode_ReplicationStatusIsStandby(self): + conn = StubConnection( + StubCursor(("SELECT", [[True]])), + StubCursor(("IDENTIFY_SYSTEM", [["towel", 42, "other", "fields"]]))) + stub_psycopg2 = StubPsycopg2(conn) + pg = PgReplicationConnection( + NodeConfig("foo"), Logger(), stub_psycopg2) + + status = pg.get_replication_status() + self.assertEqual({ + "status": "NODE_STANDBY", + "system_id": "towel", + "timeline_id": 42}, status) + + +class PgConnectionViaPgbouncerTests(unittest.TestCase): + def test_NodeConfig_AndPgBouncerConfig_AreAppliedToConnParams(self): + node_config = NodeConfig(777) + node_config.host = "1.1.1.1" + node_config.port = 9999 + pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + + pg = PgConnectionViaPgbouncer( + node_config, pgbouncer_config, Logger(), None) + + self.assertEqual(777, pg.node_id) + self.assertEqual("template1", pg.conn_params["database"]) + self.assertEqual("192.168.0.1", pg.conn_params["host"]) + self.assertEqual(7654, pg.conn_params["port"]) + + def test_WhenVerifyQueryTimesOut_ExceptionIsRaised(self): + def sleeping_query(): + time.sleep(2) + conn = StubConnection(StubCursor(sleeping_query)) + stub_psycopg2 = StubPsycopg2(conn) + node_config = NodeConfig('xyz123') + node_config.connect_timeout = 1 + pgbouncer_config = {"host": "192.168.0.1", "port": 7654} + pg = PgConnectionViaPgbouncer( + node_config, pgbouncer_config, Logger(), stub_psycopg2) + pg.verify_connection() + ##### TODO + + +class PgBouncerConsoleConnectionTests(unittest.TestCase): + def test_OnConnect_AutoCommitIsEnabled(self): + conn = StubConnection() + stub_psycopg2 = StubPsycopg2().add_connection(conn) + pg = PgBouncerConsoleConnection( + NodeConfig("bob"), Logger(), stub_psycopg2) + + self.assertFalse(conn.autocommit) + pg.connect() + self.assertTrue(conn.autocommit) diff --git a/tests/test_state.py b/tests/test_state.py index cc5c255..0570cd9 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -4,7 +4,8 @@ import os import unittest from pgbouncemgr.logger import * from pgbouncemgr.state import * -from pgbouncemgr.constants import LEADER_UNKNOWN, LEADER_CONNECTED +from pgbouncemgr.constants import \ + LEADER_UNKNOWN, LEADER_CONNECTED, NODE_PRIMARY, NODE_STANDBY PGBOUNCER_CONFIG = os.path.join( diff --git a/tests/test_stub_psycopg2.py b/tests/test_stub_psycopg2.py new file mode 100644 index 0000000..ff7f479 --- /dev/null +++ b/tests/test_stub_psycopg2.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- + +import unittest +import psycopg2 +from tests.stub_psycopg2 import * + + +class StubPsycopg2Tests(unittest.TestCase): + def test_AuthenticationFailure(self): + with self.assertRaises(psycopg2.OperationalError) as context: + StubPsycopg2().add_auth_failure().connect() + self.assertIn("authentication failed", str(context.exception)) + + def test_ConnectionTimeout(self): + with self.assertRaises(psycopg2.OperationalError) as context: + StubPsycopg2().add_conn_timeout().connect() + self.assertIn("timeout expired", str(context.exception)) + + def test_ConnectionFailure(self): + with self.assertRaises(psycopg2.OperationalError) as context: + StubPsycopg2().add_conn_failure().connect() + self.assertIn("could not connect to server", str(context.exception)) + + def test_AdminStartup(self): + with self.assertRaises(psycopg2.OperationalError) as context: + StubPsycopg2().add_admin_startup().connect() + self.assertIn("system is starting up", str(context.exception)) + + def test_AdminShutdown(self): + with self.assertRaises(psycopg2.errors.AdminShutdown) as context: + StubPsycopg2().add_admin_shutdown().connect() + self.assertIn("terminating connection", str(context.exception)) + + def test_GivenNoFixtures_Connect_RaisesIndexError(self): + with self.assertRaises(IndexError) as context: + StubPsycopg2().connect() + + def test_SuccessfulConnect(self): + conn = StubConnection() + StubPsycopg2().add_connection(conn) + + +class StubConnectionTests(unittest.TestCase): + def test_GivenLiveConnection_Cursor_ReturnsCursor(self): + cursor = StubCursor() + conn = StubConnection(cursor) + self.assertIs(cursor, conn.cursor()) + + def test_GivenClosedConnection_Cursor_ReturnsInterfaceError(self): + conn = StubConnection() + conn.close() + with self.assertRaises(psycopg2.InterfaceError): + conn.cursor() + + def test_IntegrationWithPsycopg2Stub(self): + cursor1 = StubCursor() + cursor2 = StubCursor() + conn = StubConnection(cursor1, cursor2) + pg = StubPsycopg2(conn) + + conn_result = pg.connect() + cursor1_result = conn_result.cursor() + cursor2_result = conn_result.cursor() + + self.assertIs(conn, conn_result) + self.assertIs(cursor1, cursor1_result) + self.assertIs(cursor2, cursor2_result) + +class StubCursorTests(unittest.TestCase): + def test_GivenNoExecuteCalled_FetchOne_RaisesProgrammingError(self): + cursor = StubCursor() + with self.assertRaises(psycopg2.ProgrammingError): + cursor.fetchone() + + def test_GivenExecuteCalled_WithoutResultRows_FetchOneReturnsNone(self): + cursor = StubCursor(('OK', None)) + cursor.execute("SELECT query") + self.assertIs(None, cursor.fetchone()) + self.assertEqual("SELECT query", cursor.query) + self.assertEqual("OK", cursor.statusmessage) + + def test_GivenExecuteCalled_WithResultRows_FetchOneRetursRow(self): + cursor = StubCursor( + ("STATUS1", [['value1', 'value2'], ['value3', 'value4']]), + ("STATUS2", [['value5', 'value6'], ['value7', 'value8']])) + + cursor.execute("SELECT query 1") + self.assertEqual("STATUS1", cursor.statusmessage) + self.assertEqual("value1", cursor.fetchone()[0]) + self.assertEqual("value3", cursor.fetchone()[0]) + + cursor.execute("SELECT query 2") + self.assertEqual("STATUS2", cursor.statusmessage) + self.assertEqual("value6", cursor.fetchone()[1]) + self.assertEqual("value8", cursor.fetchone()[1]) + self.assertIs(None, cursor.fetchone())