From 31335197dcfa61da792bb11c671f0685329c5328 Mon Sep 17 00:00:00 2001 From: Maurice Makaay Date: Mon, 23 Dec 2019 17:30:07 +0100 Subject: [PATCH] Implemented node poller tests. --- pgbouncemgr/postgres.py | 1 + pgbouncemgr/state.py | 4 +- tests/stub_postgres.py | 27 ++++++ tests/test_node_poller.py | 185 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 215 insertions(+), 2 deletions(-) create mode 100644 tests/stub_postgres.py create mode 100644 tests/test_node_poller.py diff --git a/pgbouncemgr/postgres.py b/pgbouncemgr/postgres.py index 3538878..f507cb2 100644 --- a/pgbouncemgr/postgres.py +++ b/pgbouncemgr/postgres.py @@ -316,6 +316,7 @@ class PgConnectionViaPgbouncer(PgConnection): return (False, "Connection attempt timed out", None) result = parent_conn.recv() proc.join() + return result diff --git a/pgbouncemgr/state.py b/pgbouncemgr/state.py index c7845e2..bf818e4 100644 --- a/pgbouncemgr/state.py +++ b/pgbouncemgr/state.py @@ -101,9 +101,9 @@ class State(): @staticmethod def from_config(config, logger): state = State(logger) - for node_id, settings in config.nodes.items(): + for node_id, node_settings in config.nodes.items(): node_config = NodeConfig(node_id) - for key, value in settings.items(): + for key, value in node_settings.items(): setattr(node_config, key, value) state.add_node(node_config) return state diff --git a/tests/stub_postgres.py b/tests/stub_postgres.py new file mode 100644 index 0000000..c458759 --- /dev/null +++ b/tests/stub_postgres.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- + +from pgbouncemgr.postgres import * + + +class StubPgReplicationConnection(PgReplicationConnection): + def __init__(self, node_config, log, psycopg2_module=psycopg2): + super().__init__(node_config, log, psycopg2_module) + self.connected = False + self.node_config = node_config + + def connect(self): + self.connected = True + + def disconnect(self): + self.connected = False + + def get_replication_status(self): + self.connect() + if (not hasattr(self.node_config, "stub_replication_statuses") or + not self.node_config.stub_replication_statuses): + raise Exception( + "Node %s misses stub_replication_statuses" % __class__) + response = self.node_config.stub_replication_statuses.pop(0) + if isinstance(response, Exception): + raise response + return response diff --git a/tests/test_node_poller.py b/tests/test_node_poller.py new file mode 100644 index 0000000..c3a73a1 --- /dev/null +++ b/tests/test_node_poller.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- + +import unittest +from pgbouncemgr.constants import * +from pgbouncemgr.logger import * +from pgbouncemgr.node_poller import * +from pgbouncemgr.postgres import * +from pgbouncemgr.state import * +from tests.stub_postgres import * + + +class NodePollerTests(unittest.TestCase): + def test_GivenNoNodes_NothingHappens(self): + log = Logger() + state = State(log) + poller = NodePoller(state, log) + poller.poll() + + def test_ConnectionsAreReused(self): + log = Logger() + state = State(log) + node1_config = NodeConfig(1) + node1_config.stub_replication_statuses = [ + {"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 10}, + {"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 10}] + state.add_node(node1_config) + node2_config = NodeConfig(2) + node2_config.stub_replication_statuses = [ + {"status": NODE_STANDBY, "system_id": "A", "timeline_id": 9}, + {"status": NODE_STANDBY, "system_id": "A", "timeline_id": 9}] + state.add_node(node2_config) + poller = NodePoller(state, log, StubPgReplicationConnection) + + self.assertEqual({}, poller._connections) + poller.poll() + self.assertEqual(2, len(poller._connections)) + conn1 = poller._connections[1] + conn2 = poller._connections[2] + conn1.connected = False + conn1.connected = False + poller.poll() + self.assertEqual(conn1, poller._connections[1]) + self.assertTrue(conn1.connected) + self.assertEqual(conn2, poller._connections[2]) + self.assertTrue(conn2.connected) + + + def test_GivenSingleNode_NodeIsPolledAndStateIsUpdated(self): + log = Logger() + state = State(log) + node1_config = NodeConfig(1) + node1_config.stub_replication_statuses = [{ + "status": NODE_PRIMARY, + "system_id": "A", + "timeline_id": 10}] + state.add_node(node1_config) + state.modified = False + poller = NodePoller(state, log, StubPgReplicationConnection) + + poller.poll() + self.assertTrue(state.modified) + self.assertEqual(NODE_PRIMARY, state.nodes[1].status) + self.assertEqual("A", state.nodes[1].system_id) + self.assertEqual(10, state.nodes[1].timeline_id) + + def test_GivenMultipleNodes_NodesArePolledAndStateIsUpdated(self): + log = Logger() + state = State(log) + node1_config = NodeConfig(1) + node1_config.stub_replication_statuses = [{ + "status": NODE_PRIMARY, + "system_id": "A", + "timeline_id": 10}] + state.add_node(node1_config) + node2_config = NodeConfig(2) + node2_config.stub_replication_statuses = [{ + "status": NODE_STANDBY, + "system_id": "B", + "timeline_id": 11}] + state.add_node(node2_config) + state.modified = False + poller = NodePoller(state, log, StubPgReplicationConnection) + + poller.poll() + self.assertTrue(state.modified) + self.assertEqual(NODE_PRIMARY, state.nodes[1].status) + self.assertEqual("A", state.nodes[1].system_id) + self.assertEqual(10, state.nodes[1].timeline_id) + self.assertEqual(NODE_STANDBY, state.nodes[2].status) + self.assertEqual("B", state.nodes[2].system_id) + self.assertEqual(11, state.nodes[2].timeline_id) + + def test_GivenNodeWithPollError_NodeIsMarkedOffline(self): + log = Logger() + state = State(log) + node1_config = NodeConfig(1) + node1_config.stub_replication_statuses = [PgException("Breakage!")] + state.add_node(node1_config) + node2_config = NodeConfig(2) + node2_config.stub_replication_statuses = [{ + "status": NODE_PRIMARY, + "system_id": "C", + "timeline_id": 12}] + state.add_node(node2_config) + state.modified = False + poller = NodePoller(state, log, StubPgReplicationConnection) + + poller.poll() + self.assertTrue(state.modified) + self.assertEqual(NODE_OFFLINE, state.nodes[1].status) + self.assertEqual(None, state.nodes[1].system_id) + self.assertEqual(None, state.nodes[1].timeline_id) + self.assertEqual(NODE_PRIMARY, state.nodes[2].status) + self.assertEqual("C", state.nodes[2].system_id) + self.assertEqual(12, state.nodes[2].timeline_id) + + def test_GivenNodeWithNewStatus_StateIsUpdated(self): + log = Logger() + state = State(log) + node1_config = NodeConfig(1) + node1_config.stub_replication_statuses = [ + {"status": NODE_STANDBY, "system_id": "A", "timeline_id": 14}, + {"status": NODE_PRIMARY, "system_id": "B", "timeline_id": 15}] + state.add_node(node1_config) + state.modified = False + poller = NodePoller(state, log, StubPgReplicationConnection) + + poller.poll() + self.assertTrue(state.modified) + self.assertEqual(NODE_STANDBY, state.nodes[1].status) + self.assertEqual("A", state.nodes[1].system_id) + self.assertEqual(14, state.nodes[1].timeline_id) + state.modified = False + poller.poll() + self.assertTrue(state.modified) + self.assertEqual(NODE_PRIMARY, state.nodes[1].status) + self.assertEqual("B", state.nodes[1].system_id) + self.assertEqual(15, state.nodes[1].timeline_id) + + def test_GivenNodeWithPollError_PreviousSystemAndReplicationIdAreKept(self): + log = Logger() + state = State(log) + node1_config = NodeConfig(1) + node1_config.stub_replication_statuses = [ + {"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 14}, + PgException("Breakage!")] + state.add_node(node1_config) + state.modified = False + poller = NodePoller(state, log, StubPgReplicationConnection) + + poller.poll() + self.assertTrue(state.modified) + self.assertEqual(NODE_PRIMARY, state.nodes[1].status) + self.assertEqual("A", state.nodes[1].system_id) + self.assertEqual(14, state.nodes[1].timeline_id) + state.modified = False + poller.poll() + self.assertTrue(state.modified) + self.assertEqual(NODE_OFFLINE, state.nodes[1].status) + self.assertEqual("A", state.nodes[1].system_id) + self.assertEqual(14, state.nodes[1].timeline_id) + + def test_GivenNodeWithResultOffline_PreviousSystemAndReplicationIdAreKept(self): + log = Logger() + state = State(log) + node1_config = NodeConfig(1) + node1_config.stub_replication_statuses = [ + {"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 14}, + {"status": NODE_OFFLINE, "system_id": None, "timeline_id": None}] + state.add_node(node1_config) + state.modified = False + poller = NodePoller(state, log, StubPgReplicationConnection) + + poller.poll() + self.assertTrue(state.modified) + self.assertEqual(NODE_PRIMARY, state.nodes[1].status) + self.assertEqual("A", state.nodes[1].system_id) + self.assertEqual(14, state.nodes[1].timeline_id) + state.modified = False + poller.poll() + self.assertTrue(state.modified) + self.assertEqual(NODE_OFFLINE, state.nodes[1].status) + self.assertEqual("A", state.nodes[1].system_id) + self.assertEqual(14, state.nodes[1].timeline_id) +