# -*- coding: utf-8 -*- import unittest from pgbouncemgr.constants import * from pgbouncemgr.logger import * from pgbouncemgr.node_poller import * from pgbouncemgr.postgres import * from pgbouncemgr.state import * from tests.stub_postgres import * class NodePollerTests(unittest.TestCase): def test_GivenNoNodes_NothingHappens(self): log = Logger() state = State(log) poller = NodePoller(state, log) poller.poll() def test_ConnectionsAreReused(self): log = Logger() state = State(log) node1_config = NodeConfig(1) node1_config.stub_replication_statuses = [ {"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 10}, {"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 10}] state.add_node(node1_config) node2_config = NodeConfig(2) node2_config.stub_replication_statuses = [ {"status": NODE_STANDBY, "system_id": "A", "timeline_id": 9}, {"status": NODE_STANDBY, "system_id": "A", "timeline_id": 9}] state.add_node(node2_config) poller = NodePoller(state, log, StubPgReplicationConnection) self.assertEqual({}, poller._connections) poller.poll() self.assertEqual(2, len(poller._connections)) conn1 = poller._connections[1] conn2 = poller._connections[2] conn1.connected = False conn1.connected = False poller.poll() self.assertEqual(conn1, poller._connections[1]) self.assertTrue(conn1.connected) self.assertEqual(conn2, poller._connections[2]) self.assertTrue(conn2.connected) def test_GivenSingleNode_NodeIsPolledAndStateIsUpdated(self): log = Logger() state = State(log) node1_config = NodeConfig(1) node1_config.stub_replication_statuses = [{ "status": NODE_PRIMARY, "system_id": "A", "timeline_id": 10}] state.add_node(node1_config) state.modified = False poller = NodePoller(state, log, StubPgReplicationConnection) poller.poll() self.assertTrue(state.modified) self.assertEqual(NODE_PRIMARY, state.nodes[1].status) self.assertEqual("A", state.nodes[1].system_id) self.assertEqual(10, state.nodes[1].timeline_id) def test_GivenMultipleNodes_NodesArePolledAndStateIsUpdated(self): log = Logger() state = State(log) node1_config = NodeConfig(1) node1_config.stub_replication_statuses = [{ "status": NODE_PRIMARY, "system_id": "A", "timeline_id": 10}] state.add_node(node1_config) node2_config = NodeConfig(2) node2_config.stub_replication_statuses = [{ "status": NODE_STANDBY, "system_id": "B", "timeline_id": 11}] state.add_node(node2_config) state.modified = False poller = NodePoller(state, log, StubPgReplicationConnection) poller.poll() self.assertTrue(state.modified) self.assertEqual(NODE_PRIMARY, state.nodes[1].status) self.assertEqual("A", state.nodes[1].system_id) self.assertEqual(10, state.nodes[1].timeline_id) self.assertEqual(NODE_STANDBY, state.nodes[2].status) self.assertEqual("B", state.nodes[2].system_id) self.assertEqual(11, state.nodes[2].timeline_id) def test_GivenNodeWithPollError_NodeIsMarkedOffline(self): log = Logger() state = State(log) node1_config = NodeConfig(1) node1_config.stub_replication_statuses = [PgException("Breakage!")] state.add_node(node1_config) node2_config = NodeConfig(2) node2_config.stub_replication_statuses = [{ "status": NODE_PRIMARY, "system_id": "C", "timeline_id": 12}] state.add_node(node2_config) state.modified = False poller = NodePoller(state, log, StubPgReplicationConnection) poller.poll() self.assertTrue(state.modified) self.assertEqual(NODE_OFFLINE, state.nodes[1].status) self.assertEqual(None, state.nodes[1].system_id) self.assertEqual(None, state.nodes[1].timeline_id) self.assertEqual(NODE_PRIMARY, state.nodes[2].status) self.assertEqual("C", state.nodes[2].system_id) self.assertEqual(12, state.nodes[2].timeline_id) def test_GivenNodeWithNewStatus_StateIsUpdated(self): log = Logger() state = State(log) node1_config = NodeConfig(1) node1_config.stub_replication_statuses = [ {"status": NODE_STANDBY, "system_id": "A", "timeline_id": 14}, {"status": NODE_PRIMARY, "system_id": "B", "timeline_id": 15}] state.add_node(node1_config) state.modified = False poller = NodePoller(state, log, StubPgReplicationConnection) poller.poll() self.assertTrue(state.modified) self.assertEqual(NODE_STANDBY, state.nodes[1].status) self.assertEqual("A", state.nodes[1].system_id) self.assertEqual(14, state.nodes[1].timeline_id) state.modified = False poller.poll() self.assertTrue(state.modified) self.assertEqual(NODE_PRIMARY, state.nodes[1].status) self.assertEqual("B", state.nodes[1].system_id) self.assertEqual(15, state.nodes[1].timeline_id) def test_GivenNodeWithPollError_PreviousSystemAndReplicationIdAreKept(self): log = Logger() state = State(log) node1_config = NodeConfig(1) node1_config.stub_replication_statuses = [ {"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 14}, PgException("Breakage!")] state.add_node(node1_config) state.modified = False poller = NodePoller(state, log, StubPgReplicationConnection) poller.poll() self.assertTrue(state.modified) self.assertEqual(NODE_PRIMARY, state.nodes[1].status) self.assertEqual("A", state.nodes[1].system_id) self.assertEqual(14, state.nodes[1].timeline_id) state.modified = False poller.poll() self.assertTrue(state.modified) self.assertEqual(NODE_OFFLINE, state.nodes[1].status) self.assertEqual("A", state.nodes[1].system_id) self.assertEqual(14, state.nodes[1].timeline_id) def test_GivenNodeWithResultOffline_PreviousSystemAndReplicationIdAreKept(self): log = Logger() state = State(log) node1_config = NodeConfig(1) node1_config.stub_replication_statuses = [ {"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 14}, {"status": NODE_OFFLINE, "system_id": None, "timeline_id": None}] state.add_node(node1_config) state.modified = False poller = NodePoller(state, log, StubPgReplicationConnection) poller.poll() self.assertTrue(state.modified) self.assertEqual(NODE_PRIMARY, state.nodes[1].status) self.assertEqual("A", state.nodes[1].system_id) self.assertEqual(14, state.nodes[1].timeline_id) state.modified = False poller.poll() self.assertTrue(state.modified) self.assertEqual(NODE_OFFLINE, state.nodes[1].status) self.assertEqual("A", state.nodes[1].system_id) self.assertEqual(14, state.nodes[1].timeline_id)