Implemented node poller tests.
This commit is contained in:
parent
5e84e88bc1
commit
31335197dc
|
@ -316,6 +316,7 @@ class PgConnectionViaPgbouncer(PgConnection):
|
|||
return (False, "Connection attempt timed out", None)
|
||||
result = parent_conn.recv()
|
||||
proc.join()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
|
|
|
@ -101,9 +101,9 @@ class State():
|
|||
@staticmethod
|
||||
def from_config(config, logger):
|
||||
state = State(logger)
|
||||
for node_id, settings in config.nodes.items():
|
||||
for node_id, node_settings in config.nodes.items():
|
||||
node_config = NodeConfig(node_id)
|
||||
for key, value in settings.items():
|
||||
for key, value in node_settings.items():
|
||||
setattr(node_config, key, value)
|
||||
state.add_node(node_config)
|
||||
return state
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from pgbouncemgr.postgres import *
|
||||
|
||||
|
||||
class StubPgReplicationConnection(PgReplicationConnection):
|
||||
def __init__(self, node_config, log, psycopg2_module=psycopg2):
|
||||
super().__init__(node_config, log, psycopg2_module)
|
||||
self.connected = False
|
||||
self.node_config = node_config
|
||||
|
||||
def connect(self):
|
||||
self.connected = True
|
||||
|
||||
def disconnect(self):
|
||||
self.connected = False
|
||||
|
||||
def get_replication_status(self):
|
||||
self.connect()
|
||||
if (not hasattr(self.node_config, "stub_replication_statuses") or
|
||||
not self.node_config.stub_replication_statuses):
|
||||
raise Exception(
|
||||
"Node %s misses stub_replication_statuses" % __class__)
|
||||
response = self.node_config.stub_replication_statuses.pop(0)
|
||||
if isinstance(response, Exception):
|
||||
raise response
|
||||
return response
|
|
@ -0,0 +1,185 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import unittest
|
||||
from pgbouncemgr.constants import *
|
||||
from pgbouncemgr.logger import *
|
||||
from pgbouncemgr.node_poller import *
|
||||
from pgbouncemgr.postgres import *
|
||||
from pgbouncemgr.state import *
|
||||
from tests.stub_postgres import *
|
||||
|
||||
|
||||
class NodePollerTests(unittest.TestCase):
|
||||
def test_GivenNoNodes_NothingHappens(self):
|
||||
log = Logger()
|
||||
state = State(log)
|
||||
poller = NodePoller(state, log)
|
||||
poller.poll()
|
||||
|
||||
def test_ConnectionsAreReused(self):
|
||||
log = Logger()
|
||||
state = State(log)
|
||||
node1_config = NodeConfig(1)
|
||||
node1_config.stub_replication_statuses = [
|
||||
{"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 10},
|
||||
{"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 10}]
|
||||
state.add_node(node1_config)
|
||||
node2_config = NodeConfig(2)
|
||||
node2_config.stub_replication_statuses = [
|
||||
{"status": NODE_STANDBY, "system_id": "A", "timeline_id": 9},
|
||||
{"status": NODE_STANDBY, "system_id": "A", "timeline_id": 9}]
|
||||
state.add_node(node2_config)
|
||||
poller = NodePoller(state, log, StubPgReplicationConnection)
|
||||
|
||||
self.assertEqual({}, poller._connections)
|
||||
poller.poll()
|
||||
self.assertEqual(2, len(poller._connections))
|
||||
conn1 = poller._connections[1]
|
||||
conn2 = poller._connections[2]
|
||||
conn1.connected = False
|
||||
conn1.connected = False
|
||||
poller.poll()
|
||||
self.assertEqual(conn1, poller._connections[1])
|
||||
self.assertTrue(conn1.connected)
|
||||
self.assertEqual(conn2, poller._connections[2])
|
||||
self.assertTrue(conn2.connected)
|
||||
|
||||
|
||||
def test_GivenSingleNode_NodeIsPolledAndStateIsUpdated(self):
|
||||
log = Logger()
|
||||
state = State(log)
|
||||
node1_config = NodeConfig(1)
|
||||
node1_config.stub_replication_statuses = [{
|
||||
"status": NODE_PRIMARY,
|
||||
"system_id": "A",
|
||||
"timeline_id": 10}]
|
||||
state.add_node(node1_config)
|
||||
state.modified = False
|
||||
poller = NodePoller(state, log, StubPgReplicationConnection)
|
||||
|
||||
poller.poll()
|
||||
self.assertTrue(state.modified)
|
||||
self.assertEqual(NODE_PRIMARY, state.nodes[1].status)
|
||||
self.assertEqual("A", state.nodes[1].system_id)
|
||||
self.assertEqual(10, state.nodes[1].timeline_id)
|
||||
|
||||
def test_GivenMultipleNodes_NodesArePolledAndStateIsUpdated(self):
|
||||
log = Logger()
|
||||
state = State(log)
|
||||
node1_config = NodeConfig(1)
|
||||
node1_config.stub_replication_statuses = [{
|
||||
"status": NODE_PRIMARY,
|
||||
"system_id": "A",
|
||||
"timeline_id": 10}]
|
||||
state.add_node(node1_config)
|
||||
node2_config = NodeConfig(2)
|
||||
node2_config.stub_replication_statuses = [{
|
||||
"status": NODE_STANDBY,
|
||||
"system_id": "B",
|
||||
"timeline_id": 11}]
|
||||
state.add_node(node2_config)
|
||||
state.modified = False
|
||||
poller = NodePoller(state, log, StubPgReplicationConnection)
|
||||
|
||||
poller.poll()
|
||||
self.assertTrue(state.modified)
|
||||
self.assertEqual(NODE_PRIMARY, state.nodes[1].status)
|
||||
self.assertEqual("A", state.nodes[1].system_id)
|
||||
self.assertEqual(10, state.nodes[1].timeline_id)
|
||||
self.assertEqual(NODE_STANDBY, state.nodes[2].status)
|
||||
self.assertEqual("B", state.nodes[2].system_id)
|
||||
self.assertEqual(11, state.nodes[2].timeline_id)
|
||||
|
||||
def test_GivenNodeWithPollError_NodeIsMarkedOffline(self):
|
||||
log = Logger()
|
||||
state = State(log)
|
||||
node1_config = NodeConfig(1)
|
||||
node1_config.stub_replication_statuses = [PgException("Breakage!")]
|
||||
state.add_node(node1_config)
|
||||
node2_config = NodeConfig(2)
|
||||
node2_config.stub_replication_statuses = [{
|
||||
"status": NODE_PRIMARY,
|
||||
"system_id": "C",
|
||||
"timeline_id": 12}]
|
||||
state.add_node(node2_config)
|
||||
state.modified = False
|
||||
poller = NodePoller(state, log, StubPgReplicationConnection)
|
||||
|
||||
poller.poll()
|
||||
self.assertTrue(state.modified)
|
||||
self.assertEqual(NODE_OFFLINE, state.nodes[1].status)
|
||||
self.assertEqual(None, state.nodes[1].system_id)
|
||||
self.assertEqual(None, state.nodes[1].timeline_id)
|
||||
self.assertEqual(NODE_PRIMARY, state.nodes[2].status)
|
||||
self.assertEqual("C", state.nodes[2].system_id)
|
||||
self.assertEqual(12, state.nodes[2].timeline_id)
|
||||
|
||||
def test_GivenNodeWithNewStatus_StateIsUpdated(self):
|
||||
log = Logger()
|
||||
state = State(log)
|
||||
node1_config = NodeConfig(1)
|
||||
node1_config.stub_replication_statuses = [
|
||||
{"status": NODE_STANDBY, "system_id": "A", "timeline_id": 14},
|
||||
{"status": NODE_PRIMARY, "system_id": "B", "timeline_id": 15}]
|
||||
state.add_node(node1_config)
|
||||
state.modified = False
|
||||
poller = NodePoller(state, log, StubPgReplicationConnection)
|
||||
|
||||
poller.poll()
|
||||
self.assertTrue(state.modified)
|
||||
self.assertEqual(NODE_STANDBY, state.nodes[1].status)
|
||||
self.assertEqual("A", state.nodes[1].system_id)
|
||||
self.assertEqual(14, state.nodes[1].timeline_id)
|
||||
state.modified = False
|
||||
poller.poll()
|
||||
self.assertTrue(state.modified)
|
||||
self.assertEqual(NODE_PRIMARY, state.nodes[1].status)
|
||||
self.assertEqual("B", state.nodes[1].system_id)
|
||||
self.assertEqual(15, state.nodes[1].timeline_id)
|
||||
|
||||
def test_GivenNodeWithPollError_PreviousSystemAndReplicationIdAreKept(self):
|
||||
log = Logger()
|
||||
state = State(log)
|
||||
node1_config = NodeConfig(1)
|
||||
node1_config.stub_replication_statuses = [
|
||||
{"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 14},
|
||||
PgException("Breakage!")]
|
||||
state.add_node(node1_config)
|
||||
state.modified = False
|
||||
poller = NodePoller(state, log, StubPgReplicationConnection)
|
||||
|
||||
poller.poll()
|
||||
self.assertTrue(state.modified)
|
||||
self.assertEqual(NODE_PRIMARY, state.nodes[1].status)
|
||||
self.assertEqual("A", state.nodes[1].system_id)
|
||||
self.assertEqual(14, state.nodes[1].timeline_id)
|
||||
state.modified = False
|
||||
poller.poll()
|
||||
self.assertTrue(state.modified)
|
||||
self.assertEqual(NODE_OFFLINE, state.nodes[1].status)
|
||||
self.assertEqual("A", state.nodes[1].system_id)
|
||||
self.assertEqual(14, state.nodes[1].timeline_id)
|
||||
|
||||
def test_GivenNodeWithResultOffline_PreviousSystemAndReplicationIdAreKept(self):
|
||||
log = Logger()
|
||||
state = State(log)
|
||||
node1_config = NodeConfig(1)
|
||||
node1_config.stub_replication_statuses = [
|
||||
{"status": NODE_PRIMARY, "system_id": "A", "timeline_id": 14},
|
||||
{"status": NODE_OFFLINE, "system_id": None, "timeline_id": None}]
|
||||
state.add_node(node1_config)
|
||||
state.modified = False
|
||||
poller = NodePoller(state, log, StubPgReplicationConnection)
|
||||
|
||||
poller.poll()
|
||||
self.assertTrue(state.modified)
|
||||
self.assertEqual(NODE_PRIMARY, state.nodes[1].status)
|
||||
self.assertEqual("A", state.nodes[1].system_id)
|
||||
self.assertEqual(14, state.nodes[1].timeline_id)
|
||||
state.modified = False
|
||||
poller.poll()
|
||||
self.assertTrue(state.modified)
|
||||
self.assertEqual(NODE_OFFLINE, state.nodes[1].status)
|
||||
self.assertEqual("A", state.nodes[1].system_id)
|
||||
self.assertEqual(14, state.nodes[1].timeline_id)
|
||||
|
Loading…
Reference in New Issue