commit a8e930e078c541a533db8a889ac06d379fff2599 Author: Maurice Makaay Date: Tue Nov 26 15:11:33 2019 +0100 first commit diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d366f79 --- /dev/null +++ b/Makefile @@ -0,0 +1,61 @@ +all: build + +lint: + pylint3 pgbouncemgr/*.py + +pytest: + python3 setup.py test + +test: + python3 -m unittest + +run: + PYTHONPATH=./ python3 pgbouncemgr/manager.py + +run-d: + PYTHONPATH=./ python3 pgbouncemgr/manager.py -d + +run-v: + PYTHONPATH=./ python3 pgbouncemgr/manager.py -v + +run-h: + PYTHONPATH=./ python3 pgbouncemgr/manager.py -h + +clean: + @/bin/rm -fR dist + @/bin/rm -fR deb_dist + @/bin/rm -fR build + @/bin/rm -fR *.egg-info + @/bin/rm -f python3-pgbouncemgr-*.tar.gz + @/bin/rm -fR */__pycache__ + +dist-clean: clean + @/bin/rm -fR builds/* + +runtime-deps: + apt-get install python3 python3-psycopg2 pgbouncer + # Create the default directory for the state file. + mkdir -p /var/lib/pgbouncemgr + chown -hR postgres:postgres /var/lib/pgbouncemgr + chmod 770 /var/lib/pgbouncemgr + chmod 660 /var/lib/pgbouncemgr/* + +build-deps: runtime-deps + # I had to add python-all, even though we're building for + # python3. The stdeb build at some point searches for + # that package. Without the package, the build will crash. 
+ apt-get install \ + pylint3 \ + python3-setuptools \ + python3-stdeb \ + python-all + +build: clean lint test + python3 setup.py --command-packages=stdeb.command bdist_deb + mkdir -p builds + /bin/rm -f builds/*.deb + mv deb_dist/*.deb builds/ + make clean > /dev/null + +upload: build + scp builds/*.deb mmakaay1@dpkg.xs4all.net: diff --git a/pgbouncemgr/__pycache__/node_config.cpython-36.pyc b/pgbouncemgr/__pycache__/node_config.cpython-36.pyc new file mode 100644 index 0000000..346d10b Binary files /dev/null and b/pgbouncemgr/__pycache__/node_config.cpython-36.pyc differ diff --git a/pgbouncemgr/__pycache__/state.cpython-36.pyc b/pgbouncemgr/__pycache__/state.cpython-36.pyc new file mode 100644 index 0000000..6cb6835 Binary files /dev/null and b/pgbouncemgr/__pycache__/state.cpython-36.pyc differ diff --git a/pgbouncemgr/manager.py b/pgbouncemgr/manager.py new file mode 100644 index 0000000..10c1fc6 --- /dev/null +++ b/pgbouncemgr/manager.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +"""The manager implements the main process that keeps track of changes in the + PostgreSQL cluster and that reconfigures pgbouncer when needed.""" + +from argparse import ArgumentParser + + +DEFAULT_CONFIG = "/etc/pgbouncer/pgbouncemgr.yaml" +DEFAULT_LOG_FACILITY = "LOG_LOCAL1" + + +def main(): + """Starts the pgbouncemgr main application.""" + args = _parse_arguments() + print(repr(args)) + + +def _parse_arguments(): + parser = ArgumentParser(description="pgbouncemgr") + parser.add_argument( + "-v", "--verbose", + default=False, action="store_true", + help="enable verbose output (default: disabled)") + parser.add_argument( + "-d", "--debug", + default=False, action="store_true", + help="enable debugging output (default: disabled)") + parser.add_argument( + "-f", "--log-facility", + default=DEFAULT_LOG_FACILITY, + help="syslog facility to use (default: %s)" % DEFAULT_LOG_FACILITY) + parser.add_argument( + "--config", + default=DEFAULT_CONFIG, + help="config file (default: %s)" % 
DEFAULT_CONFIG) + args = parser.parse_args() + return args + + +if __name__ == "__main__": + main() diff --git a/pgbouncemgr/node_config.py b/pgbouncemgr/node_config.py new file mode 100644 index 0000000..0050d3c --- /dev/null +++ b/pgbouncemgr/node_config.py @@ -0,0 +1,16 @@ +class NodeConfig(): + def __init__(self, node_id): + self.node_id = node_id + self.pgbouncer_config = None + self.host = None + self.port = None + + def set_pgbouncer_config(self, path): + self.pgbouncer_config = path + + def export(self): + return { + "pgbouncer_config": self.pgbouncer_config, + "host": self.host, + "port": self.port + } diff --git a/pgbouncemgr/state.py b/pgbouncemgr/state.py new file mode 100644 index 0000000..4dfe731 --- /dev/null +++ b/pgbouncemgr/state.py @@ -0,0 +1,355 @@ +# -*- coding: utf-8 -*- +# no-pylint: disable=missing-docstring,broad-except,too-many-instance-attributes + +from pgbouncemgr.node_config import NodeConfig + + +LEADER_UNKNOWN = "LEADER_UNKNOWN" +LEADER_CONNECTED = "LEADER_CONNECTED" +LEADER_DISCONNECTED = "LEADER_DISCONNECTED" +LEADER_STATUSES = [LEADER_UNKNOWN, LEADER_CONNECTED, LEADER_DISCONNECTED] + +NODE_UNKNOWN = "NODE_UNKNOWN" +NODE_OFFLINE = "NODE_OFFLINE" +NODE_PRIMARY = "NODE_PRIMARY" +NODE_STANDBY = "NODE_STANDBY" +NODE_STATUSES = [NODE_UNKNOWN, NODE_OFFLINE, NODE_PRIMARY, NODE_STANDBY] + + +class StateException(Exception): + """Used for all exceptions that are raised from pgbouncemgr.state.""" + +class DuplicateNodeAdded(StateException): + """Raised when a new node is added to the state using a node_id that + already exists in the contained nodes.""" + def __init__(self, node_id): + super().__init__( + "Node already exists in state (duplicate node_id=%s)" % node_id) + +class UnknownNodeRequested(StateException): + """Raised when a request is made for a node that is not contained + by the State.""" + def __init__(self, node_id): + super().__init__( + "Unknown node requested from state (unknown node_id=%s)" % + node_id) + +class 
NodeCannotBePromoted(StateException): + """Raised when an attempt is made to promote a node, which cannot + (yet) be promoted.""" + def __init__(self, node, reason): + super().__init__( + "Node '%s' cannot be promoted: %s" % (node.node_id, reason)) + +class InvalidSystemId(StateException): + """Raised when an invalid system_id is used.""" + def __init__(self, invalid): + display = "None" if invalid is None else "'%s'" % invalid + super().__init__( + "Invalid system_id provided; it must be a non-empty string " + + "(invalid system_id=%s)" % display) + +class SystemIdChanged(StateException): + """Raised when an attempt is made to change an already set system_id + for the state. This is not allowed, since it would mean that + pgbouncer would be connected to a totally different dataset and + not a replica of the currently connected dataset.""" + def __init__(self, old_id, new_id): + super().__init__( + "The system_id cannot be changed (from=%s, to=%s)" % + (old_id, new_id)) + +class InvalidTimelineId(StateException): + """Raised when an invalid timeline_id is used. The timeline_id must + be an integer value or a value that can be cast to an integer.""" + def __init__(self, timeline_id): + super().__init__( + "Invalid timeline_id provided; it must be an integer or a " + + "value that can be cast to an integer (invalid value=%s)" % + timeline_id) + +class TimelineIdLowered(StateException): + """Raised when an attempt is made to lower the timeline_id. 
+ The timeline_id indicates the version of our dataset and decrementing + this timeline_id would mean that we connect the pgbouncer to a + dataset that is older than the currently connected dataset.""" + def __init__(self, old_id, new_id): + super().__init__( + "The timeline_id cannot be lowered (from=%d, to=%d)" % + (old_id, new_id)) + +class InvalidLeaderStatus(StateException): + """Raised when leader_status is set to an invalid value.""" + def __init__(self, status): + super().__init__( + ("The leader_status cannot be set to '%s' " + + "(valid values are: %s)") % (status, ", ".join(LEADER_STATUSES))) + +class InvalidNodeStatus(StateException): + """Raised when the status for a node is set to an invalid value.""" + def __init__(self, status): + super().__init__( + ("The node status cannot be set to '%s' " + + "(valid values are: %s)") % (status, ", ".join(NODE_STATUSES))) + +class State(): + def __init__(self): + self.modified = False + self._system_id = None + self._timeline_id = None + self._pgbouncer_config = None + self._leader_node_id = None + self._leader_error = None + self._leader_status = LEADER_UNKNOWN + self.nodes = {} + + def is_clean_slate(self): + """Returns True when the state does not yet contain a PostgreSQL system_id.""" + return self.system_id is None + + @property + def system_id(self): + return self._system_id + + @system_id.setter + def system_id(self, system_id): + """Sets the PostgreSQL system_id in the state. 
This is the system_id + that the pgbouncemgr will use as the valid system_id to + connect pgbouncer to.""" + if self._system_id is not None and self._system_id != system_id: + raise SystemIdChanged(self._system_id, system_id) + if system_id is None or system_id.strip() == "": + raise InvalidSystemId(system_id) + if self._system_id == system_id: + return + self.modified = True + self._system_id = system_id + + @property + def timeline_id(self): + return self._timeline_id + + @timeline_id.setter + def timeline_id(self, timeline_id): + """Sets the PostgreSQL timeline_id in the state. This is the + timeline_id of the currently connected dataset.""" + try: + timeline_id = int(timeline_id) + except ValueError: + raise InvalidTimelineId(timeline_id) + if self._timeline_id is not None and timeline_id < self._timeline_id: + raise TimelineIdLowered(self._timeline_id, timeline_id) + if self._timeline_id == timeline_id: + return + self.modified = True + self._timeline_id = timeline_id + + def add_node(self, node): + """Add a node to the state. Node can be a NodeConfig object or + a node_id. In case a node_id is provided, an NodeConfig object + will be created automatically.""" + if not isinstance(node, NodeConfig): + node = NodeConfig(node) + if node.node_id in self.nodes: + raise DuplicateNodeAdded(node.node_id) + node_state = NodeState(node, self) + self.nodes[node.node_id] = node_state + self.modified = True + return node_state + + def get_node(self, node_id): + """Retrieve a node from the state, identified by the provided + node_id.""" + if node_id not in self.nodes: + raise UnknownNodeRequested(node_id) + return self.nodes[node_id] + + def promote_node(self, node): + """Sets the provided node as the selected leader node for the + cluster. The leader status is reset to LEADER_UNKNOWN and should + after this be updated. 
+ + The system_id, timeline_id and pgbouncer_config for the State + object are inherited from the provided leader node, and must + therefore be set before calling this method.""" + node = self.get_node(node.node_id) + if node.system_id is None: + raise NodeCannotBePromoted(node, "the node has no system_id") + if node.timeline_id is None: + raise NodeCannotBePromoted(node, "the node has no timeline_id") + if node.config.pgbouncer_config is None: + raise NodeCannotBePromoted(node, "the node has no pgbouncer_config") + if self._leader_node_id != node.node_id: + self._leader_node_id = node.node_id + self.modified = True + self.leader_status = LEADER_UNKNOWN + self.leader_error = None + self.system_id = node.system_id + self.timeline_id = node.timeline_id + self.pgbouncer_config = node.config.pgbouncer_config + + @property + def leader_node_id(self): + return self._leader_node_id + + @property + def leader_status(self): + return self._leader_status + + @leader_status.setter + def leader_status(self, status): + """Sets the pgbouncer connection status for the current leader + cluster node. Possible statuses are LEADER_UNKNOWN, + LEADER_CONNECTED and LEADER_DISCONNECTED.""" + if self._leader_status == status: + return + if status not in LEADER_STATUSES: + raise InvalidLeaderStatus(status) + self._leader_status = status + self.modified = True + + @property + def leader_error(self): + return self._leader_error + + @leader_error.setter + def leader_error(self, err): + """Sets the pgbouncer connection error for the current leader + cluster node. 
This error is used to inform about problems that + keep pgbouncer from serving the current leader cluster node.""" + if self._leader_error == err: + return + self._leader_error = err + self.modified = True + + @property + def pgbouncer_config(self): + return self._pgbouncer_config + + @pgbouncer_config.setter + def pgbouncer_config(self, pgbouncer_config): + """Sets the pgbouncer configuration file that must be activated + in order to connect pgbouncer to the correct backend + PostgreSQL server.""" + if self._pgbouncer_config == pgbouncer_config: + return + self._pgbouncer_config = pgbouncer_config + self.modified = True + + def export(self): + """Exports the data for the state that is to be stored in the + state storage.""" + return { + "system_id": self.system_id, + "timeline_id": self.timeline_id, + "pgbouncer_config": self.pgbouncer_config, + "leader_node_id": self.leader_node_id, + "leader_status": self.leader_status, + "leader_error": self.leader_error, + "nodes": dict((i, n.export()) for i, n in self.nodes.items()) + } + + +class NodeState(): + """This class encapsulates the information for a single node in a + PostgreSQL cluster.""" + + def __init__(self, config, parent_state): + self.node_id = config.node_id + self.config = config + self.parent_state = parent_state + self._system_id = None + self._timeline_id = None + self.status = NODE_UNKNOWN + self.err = None + + def reset(self): + """Reset the data for the node.""" + self.system_id = None + self.timeline_id = None + self.status = NODE_UNKNOWN + self.err = None + self.notify_parent() + + def notify_parent(self): + self.parent_state.modified = True + + @property + def system_id(self): + return self._system_id + + @system_id.setter + def system_id(self, system_id): + """Sets the PostgreSQL system_id, which is a cluster-wide identifier for + the database cluster data. When a PostgreSQL server uses a different + system_id than our cluster, then it does not serve the same dataset. 
+ Possibly it was reinstalled or simply a new cluster which has not yet + been synced to the other cluster hosts.""" + if system_id is None or system_id.strip() == "": + raise InvalidSystemId(system_id) + if self._system_id == system_id: + return + self._system_id = system_id + self.notify_parent() + + @property + def timeline_id(self): + return self._timeline_id + + @timeline_id.setter + def timeline_id(self, timeline_id): + """Sets the PostgreSQL timeline_id, which is an identifier for the + version of the dataset that is served by a PostgreSQL server. + This version is incremented when a PIT recovery is done, or when a + database server is promoted to act as a primary server. + In pgbouncemgr, this timeline_id is used to make sure that we never + accidentally activate an old version of the dataset to be the new + primary dataset.""" + try: + timeline_id = int(timeline_id) + except ValueError: + raise InvalidTimelineId(timeline_id) + if self._timeline_id == timeline_id: + return + self._timeline_id = timeline_id + self.notify_parent() + + def set_status(self, status): + """Set the connection status for the node. This is used to indicate + whether or not a connection can be setup to the node from the + pgbouncemgr application and if the node is running in primary + or fallback mode. Possible values are: NODE_UNKNOWN, NODE_OFFLINE, + NODE_PRIMARY and NODE_STANDBY.""" + if self.status == status: + return + if status not in NODE_STATUSES: + raise InvalidNodeStatus(status) + self.status = status + self.notify_parent() + + def set_error(self, err): + """When there is some problem with this node, this method can be used + to set an error message for it, explaining the problem.""" + if self.err == err: + return + self.err = err + self.notify_parent() + + def promote(self): + """Promote this node to be the leader of the cluster. 
+ This means that pgbouncer will be reconfigured to make use of + this node as the primary backend.""" + self.parent_state.promote_node(self) + + def export(self): + """Exports the data for the node that is to be stored in the + state storage.""" + return { + "node_id": self.node_id, + "config": self.config.export(), + "system_id": self.system_id, + "timeline_id": self.timeline_id, + "is_leader": self.parent_state._leader_node_id == self.node_id, + "status": self.status, + "error": self.err, + } diff --git a/pgbouncemgr/state_store.py b/pgbouncemgr/state_store.py new file mode 100644 index 0000000..f2b1fbc --- /dev/null +++ b/pgbouncemgr/state_store.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# no-pylint: disable=missing-docstring,broad-except,too-many-instance-attributes + +import json +from datetime import datetime +from os import rename +from os.path import isfile +from pgbouncemgr.log import format_exception_message + +STORE_ERROR = "error" +STORE_UPDATED = "updated" +STORE_NOCHANGE = "nochange" + +class State(): + system_id = None + timeline_id = None + pgbouncer_config = None + primary_node = None + primary_connected = False + primary_error = None + err = None + old_state = None + + def __init__(self, path): + self.path = path + self.load() + + def load(self): + """Load the state from the state file. + When this fails, an exception will be thrown.""" + # When no state file exists, we start with a clean slate. + if not isfile(self.path): + return + + # Otherwise, read the state from the state file. + with open(self.path, 'r') as stream: + try: + state = json.load(stream) + except json.decoder.JSONDecodeError: + # JSON decoding failed. This is beyond repair. + # Start with a clean slate to have the state data reinitialized. + return + + # Copy the state over to this state object. 
+ self.system_id = state["system_id"] + self.timeline_id = state["timeline_id"] + self.pgbouncer_config = state["pgbouncer_config"] + self.primary_node = state["primary_node"] + + # The following properties are always filled dynamically by the manager. + # These are not restored from the state file on startup. + self.primary_connected = False + self.nodes = {} + + def is_clean_slate(self): + return self.system_id is None + + def set_primary_connected(self, connected, err=None): + self.primary_connected = connected + self.primary_error = err + + def store(self, nodes): + """Store the current state in the state file. + Returns STORE_UPDATED when the state was stored and STORE_NOCHANGE when it was unchanged. + Returns STORE_ERROR when it failed. The error can be found in the err property.""" + stored_state = { + "system_id": self.system_id, + "timeline_id": self.timeline_id, + "pgbouncer_config": self.pgbouncer_config, + "primary_node": self.primary_node, + "primary_connected": self.primary_connected, + "primary_error": self.primary_error, + "nodes": dict((n.name, { + "host": n.conn_params["host"], + "port": n.conn_params["port"], + "status": n.status, + "system_id": n.system_id, + "timeline_id": n.timeline_id, + "error": n.err}) for n in nodes) + } + + # When the state has not changed, then don't write out a new state. 
+ new_state = json.dumps(stored_state, sort_keys=True, indent=2) + if self.old_state and self.old_state == new_state: + return STORE_NOCHANGE + self.old_state = new_state + + try: + self.err = None + swap_path = "%s..SWAP" % self.path + with open(swap_path, "w") as file_handle: + print(new_state, file=file_handle) + rename(swap_path, self.path) + return STORE_UPDATED + except Exception as exception: + self.err = "Storing state to file (%s) failed: %s" % ( + self.path, format_exception_message(exception)) + return STORE_ERROR diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..ba6d6c8 --- /dev/null +++ b/setup.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +""" +pgbouncemgr +=========== + +PostgreSQL has no support for a multi-primary HA setup. Instead, there is +always a single primary server, which can be used for reading and writing, +and one or more replica servers, which can only be used for reading. +Updates on the primary server are streamed to the connected replica servers. + +Therefore, when a client must connect to a PostgreSQL HA setup, it needs to +know what node to connect to (i.e. the currently active primary node). +Ideally, it has no knowledge about this, letting the client always connect +to the same endpoint. + +This is where tools like haproxy and pgbouncer come in (possibly themselves +in an HA setup using automatic failover based on keepalived or one of its +friends). The client can connect to haproxy or pgbouncer, and those can be +configured to send the connection to the correct PostgreSQL node. + +And this is where pgbouncemgr comes in. It actively monitors a PostgreSQL HA +cluster. Based on the monitoring data, it decides to what node clients must +connect and (re)configures pgbouncer to make that happen. 
+""" + +import os +import re +from time import time +from setuptools import setup + + +def _get_version(): + path = os.path.abspath(".") + pkginit = os.path.join(path, "pgbouncemgr", "__init__.py") + with open(pkginit, "r") as fh: + for line in fh.readlines(): + m = re.match(r"^__version__\s*=\s*\"(.*)\"", line) + if (m is not None): + version = m.groups()[0] + return "%s.%d" % (version, time()) + raise Exception("Unable to read version from %s" % pkginit) + + +setup( + name="pgbouncemgr", + version=_get_version(), + url="https://github.xs4all.net/XS4ALL/xs4all-pgbouncemgr", + license="BSD", + author="Maurice Makaay", + author_email="mmakaay1@xs4all.net", + description="Automatic (re)configuration of a pgbouncer server for a PostgreSQL HA cluster", + long_description=__doc__, + packages=[ + "pgbouncemgr" + ], + entry_points = { + 'console_scripts': [ + 'pgbouncemgr=pgbouncemgr.manager:main'] + }, + data_files=[ + ("/etc/systemd/system", ["etc/pgbouncemgr.service"]) + ], + zip_safe=False, + include_package_data=True, + platforms="any", + test_suite="tests.suite" +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..8ed8c31 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- + +import unittest + +def suite(): + test_loader = unittest.TestLoader() + return test_loader.discover('tests') diff --git a/tests/__pycache__/__init__.cpython-36.pyc b/tests/__pycache__/__init__.cpython-36.pyc new file mode 100644 index 0000000..953bf6d Binary files /dev/null and b/tests/__pycache__/__init__.cpython-36.pyc differ diff --git a/tests/__pycache__/test_node_config.cpython-36.pyc b/tests/__pycache__/test_node_config.cpython-36.pyc new file mode 100644 index 0000000..7da08f4 Binary files /dev/null and b/tests/__pycache__/test_node_config.cpython-36.pyc differ diff --git a/tests/__pycache__/test_state.cpython-36.pyc b/tests/__pycache__/test_state.cpython-36.pyc new file mode 100644 index 0000000..95bbc03 Binary files 
/dev/null and b/tests/__pycache__/test_state.cpython-36.pyc differ diff --git a/tests/stubs.py b/tests/stubs.py new file mode 100644 index 0000000..fd72235 --- /dev/null +++ b/tests/stubs.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- + +from pgbouncer.manager import DEFAULT_CONFIG, DEFAULT_LOG_FACILITY + + +class ArgsStub(): + config = DEFAULT_CONFIG + log_facility = DEFAULT_LOG_FACILITY + verbose = False + debug = False + + +class LoggerStub(): + def __init(self, ident, facility, debug, verbose): + pass + + def debug(self, msg): + pass + + def info(self, msg): + pass + + def warning(self, msg): + pass + + def error(self, msg): + pass + + def format_ex(self, exception): + return "FMTEX" + + +class StateStoreStub(): + old_state = None + test_with_nr_of_errors = 0 + test_with_state = None + + def load(self) + state = State() + + def store(self, state): + new_state = json.dumps(state_to_store, sort_keys=True, indent=2) + if self.old_state and self.old_state == new_state: + return STORE_NOCHANGE + sel.old_state = new_state + + if test_with_nr_of_errors: + test_with_nr_of_errors -= 1 + self.err = "Storing state failed: just testing" + return STORE_ERROR + + return STORE_UPDATED + + diff --git a/tests/test_node_config.py b/tests/test_node_config.py new file mode 100644 index 0000000..b60212d --- /dev/null +++ b/tests/test_node_config.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- + +import unittest +from pgbouncemgr.node_config import * + + +class NodeConfigTests(unittest.TestCase): + def test_CreateNewNodeConfig(self): + config = NodeConfig("test") + + self.assertEqual("test", config.node_id) + self.assertEqual(None, config.pgbouncer_config) + self.assertEqual(None, config.host) + self.assertEqual(None, config.port) + + def test_SetPgbouncerConfig_UpdatesConfig(self): + config = NodeConfig("test") + config.set_pgbouncer_config("/path/to/config") + diff --git a/tests/test_state.py b/tests/test_state.py new file mode 100644 index 0000000..7f0deae --- /dev/null +++ 
b/tests/test_state.py @@ -0,0 +1,464 @@ +# -*- coding: utf-8 -*- + +import unittest +from pgbouncemgr.state import * + + +class StateTests(unittest.TestCase): + def test_GivenFreshState_DefaultsAreSetCorrectly(self): + state = State() + self.assertIsNone(state.system_id) + self.assertIsNone(state.timeline_id) + self.assertIsNone(state.pgbouncer_config) + self.assertIsNone(state.leader_node_id) + self.assertIsNone(state.leader_error) + self.assertEqual(LEADER_UNKNOWN, state.leader_status) + self.assertEqual({}, state.nodes) + self.assertFalse(state.modified) + + def test_GivenFreshState_ModifiedFlagIsNotSet(self): + state = State() + + self.assertFalse(state.modified) + + def test_GivenFreshState_IsCleanSlate_ReturnsTrue(self): + state = State() + + self.assertTrue(state.is_clean_slate()) + + def test_GivenStateWithSystemIdAssigned_IsCleanSlate_ReturnsFalse(self): + state = State() + state.system_id = "whatever" + + self.assertFalse(state.is_clean_slate()) + + def test_GivenFreshState_SetSystemId_SetsSystemId(self): + state = State() + state.system_id = "booya" + + self.assertEqual("booya", state.system_id) + + def test_GivenStateWithSystemId_SetSameSystemId_IsOk(self): + state = State() + state.system_id = "booya" + state.modified = False + state.system_id = "booya" + + self.assertFalse(state.modified) + + def test_GivenStateWithSystemId_SetDifferentSystemId_RaisesException(self): + state = State() + state.system_id = "booya" + state.modified = False + + with self.assertRaises(SystemIdChanged) as context: + state.system_id = "baayo" + self.assertIn("from=booya, to=baayo", str(context.exception)) + + def test_GivenFreshState_SetTimelineId_SetsTimelineId(self): + state = State() + state.timeline_id = 666 + + self.assertEqual(666, state.timeline_id) + self.assertTrue(state.modified) + + def test_WithNonIntegerInput_SetTimelineId_RaisesException(self): + state = State() + with self.assertRaises(InvalidTimelineId) as context: + state.timeline_id = "one hundred" + 
self.assertIn("invalid value=one hundred", str(context.exception)) + + def test_GivenStateWithTimelineId_SetLowerTimelineId_RaisesException(self): + state = State() + state.timeline_id = "50" + + with self.assertRaises(TimelineIdLowered) as context: + state.timeline_id = 49 + self.assertIn("(from=50, to=49)", str(context.exception)) + + def test_GivenStateWithTimelineId_SetSameTimelineId_IsOk(self): + state = State() + state.timeline_id = 50 + state.modified = False + + state.timeline_id = "50" + + self.assertFalse(state.modified) + + def test_GivenStateWithTimelineId_SetHigherTimelineId_IsOk(self): + state = State() + state.timeline_id = 50 + state.modified = False + + state.timeline_id = "51" + + self.assertEqual(51, state.timeline_id) + self.assertTrue(state.modified) + + def test_SetLeaderStatus_SetsLeaderStatus(self): + state = State() + state.modified = False + + state.leader_status = LEADER_CONNECTED + + self.assertEqual(LEADER_CONNECTED, state.leader_status) + self.assertTrue(state.modified) + + def test_SetLeaderStatus_ToNonExistentStatus_RaisesException(self): + state = State() + state.modified = False + + with self.assertRaises(InvalidLeaderStatus) as context: + state.leader_status = "gosh" + self.assertIn("'gosh'", str(context.exception)) + + def test_SetLeaderError_SetsLeaderError(self): + state = State() + state.modified = False + + state.leader_error = "God disappeared in a puff of logic" + + self.assertEqual("God disappeared in a puff of logic", state.leader_error) + self.assertTrue(state.modified) + + state.modified = False + state.leader_error = None + + self.assertEqual(None, state.leader_error) + self.assertTrue(state.modified) + + +class NodeCollectionTests(unittest.TestCase): + def test_WithNodeNotYetInState_AddNode_AddsNodeState(self): + state = State() + node = state.add_node(1) + + self.assertEqual(1, node.node_id) + self.assertTrue(state.modified) + + def test_WithNodeAlreadyInState_AddNode_RaisesException(self): + state = State() + 
state.add_node(123) + + with self.assertRaises(DuplicateNodeAdded) as context: + state.add_node(123) + self.assertIn("duplicate node_id=123", str(context.exception)) + + def test_WithNodeNotInState_getNode_RaisesException(self): + state = State() + + with self.assertRaises(UnknownNodeRequested): + state.get_node("that does not exist") + + def test_WithNodeInState_getNode_ReturnsNode(self): + state = State() + node = state.add_node("that does exist") + + node_from_state = state.get_node("that does exist") + + self.assertEqual(node, node_from_state) + + def test_WithUnknownNode_SetLeaderNode_RaisesException(self): + state1 = State() + state2 = State() + node = state2.add_node(1337) + with self.assertRaises(UnknownNodeRequested) as context: + state1.promote_node(node) + self.assertIn("unknown node_id=1337", str(context.exception)) + + def test_WithNodeWithoutSystemId_SetLeaderNode_RaisesException(self): + state = State() + node = state.add_node(1) + node.timeline_id = 1 + node.config.pgbouncer_config = "/some/path/to/config" + with self.assertRaises(NodeCannotBePromoted) as context: + node.promote() + self.assertIn("Node '1'", str(context.exception)) + self.assertIn("node has no system_id", str(context.exception)) + + def test_WithNodeWithoutTimelineId_SetLeaderNode_RaisesException(self): + state = State() + node = state.add_node(1) + node.system_id = "system" + node.config.pgbouncer_config = "/some/path/to/config" + with self.assertRaises(NodeCannotBePromoted) as context: + node.promote() + self.assertIn("Node '1'", str(context.exception)) + self.assertIn("node has no timeline_id", str(context.exception)) + + def test_WithNodeWithoutPgbouncerConfig_SetLeaderNode_RaisesException(self): + state = State() + node = state.add_node(1) + node.system_id = "a7d8a9347df789saghdfs" + node.timeline_id = 11111111111 + with self.assertRaises(NodeCannotBePromoted) as context: + node.promote() + self.assertIn("Node '1'", str(context.exception)) + self.assertIn("node has no 
pgbouncer_config", str(context.exception)) + + def test_SetLeaderNode_SetsLeaderNode_WithUnknownLeaderStatus(self): + state = State() + node = state.add_node(1) + node.config.pgbouncer_config = "/some/path/to/config" + node.system_id = "SuperCluster" + node.timeline_id = " 005 " + node.promote() + + self.assertEqual("SuperCluster", state.system_id) + self.assertEqual(5, state.timeline_id) + self.assertEqual(1, state.leader_node_id) + self.assertEqual(LEADER_UNKNOWN, state.leader_status) + self.assertEqual(None, state.leader_error) + self.assertTrue(state.modified) + + def test_SetLeaderNode_ToSameLeader_ResetsLeaderNode_WithUnknownLeaderStatus(self): + state = State() + node = state.add_node(1) + node.config.pgbouncer_config = "/some/path/to/config" + node.system_id = "1.2.3.4.5.6.7.8.9.10.11" + node.timeline_id = 12 + node.promote() + state.leader_status = LEADER_CONNECTED + state.leader_error = "Just testin'!" + state.modified = False + + node.promote() + self.assertEqual(None, state.leader_error) + self.assertEqual(LEADER_UNKNOWN, state.leader_status) + self.assertTrue(state.modified) + + def test_SetLeaderNode_ToNodeWithDifferentSystemId_RaisesException(self): + state = State() + node1 = state.add_node(1) + node1.config.pgbouncer_config = "/some/path/to/config1" + node1.system_id = "systemA" + node1.timeline_id = 10 + node2 = state.add_node(2) + node2.config.pgbouncer_config = "/some/path/to/config2" + node2.system_id = "systemB" + node2.timeline_id = 10 + node1.promote() + + with self.assertRaises(SystemIdChanged): + node2.promote() + + def test_SetLeaderNode_ToNodeWithLowerTimelineId_RaisesException(self): + state = State() + node1 = state.add_node(1) + node1.config.pgbouncer_config = "/some/path/to/config1" + node1.system_id = "systemX" + node1.timeline_id = 10 + node2 = state.add_node(2) + node2.config.pgbouncer_config = "/some/path/to/config2" + node2.system_id = "systemX" + node2.timeline_id = 9 + node1.promote() + + with 
self.assertRaises(TimelineIdLowered): + node2.promote() + + def test_SetLeaderNode_ToNodeWithSameOrHigherTimelineId_IsOk(self): + state = State() + node1 = state.add_node(1) + node1.config.pgbouncer_config = "/some/path/to/config1" + node1.system_id = "systemX" + node1.timeline_id = 42 + node2 = state.add_node(2) + node2.config.pgbouncer_config = "/some/path/to/config2" + node2.system_id = "systemX" + node2.timeline_id = 42 + node3 = state.add_node(3) + node3.config.pgbouncer_config = "/some/path/to/config3" + node3.system_id = "systemX" + node3.timeline_id = 43 + state.modified = False + + node1.promote() + self.assertEqual(1, state.leader_node_id) + self.assertEqual(42, state.timeline_id) + self.assertTrue(state.modified) + + state.modified = False + node2.promote() + self.assertEqual(2, state.leader_node_id) + self.assertEqual(42, state.timeline_id) + self.assertTrue(state.modified) + + state.modified = False + node3.promote() + self.assertEqual(3, state.leader_node_id) + self.assertEqual(43, state.timeline_id) + self.assertTrue(state.modified) + + +class NodeTests(unittest.TestCase): + def test_WithNoneValue_SetSystemId_RaisesException(self): + state = State() + node = state.add_node("break me") + state.modified = False + + with self.assertRaises(InvalidSystemId) as context: + node.system_id = None + self.assertIn("invalid system_id=None", str(context.exception)) + self.assertFalse(state.modified) + + def test_WithEmptyString_SetSystemId_RaisesException(self): + state = State() + node = state.add_node("break me") + state.modified = False + + with self.assertRaises(InvalidSystemId) as context: + node.system_id = " \t\r\n\t " + self.assertIn("invalid system_id=' \t\r\n\t '", str(context.exception)) + self.assertFalse(state.modified) + + def test_SetSystemId_SetsSystemId_AndNotifiesChangeToState(self): + state = State() + node = state.add_node(1) + state.modified = False + + node.system_id = "X" + + self.assertEqual("X", node.system_id) + 
self.assertTrue(state.modified) + + def test_WithNonIntegerInput_SetTimelineId_RaisesException(self): + state = State() + node = state.add_node("break me") + + with self.assertRaises(InvalidTimelineId) as context: + node.timeline_id = "TARDIS" + self.assertIn("invalid value=TARDIS", str(context.exception)) + + def test_SetTimelineId_SetsTimelineId_AndNotifiesChangeToState(self): + state = State() + node = state.add_node(1) + state.modified = False + + node.timeline_id = 25 + + self.assertEqual(25, node.timeline_id) + self.assertTrue(state.modified) + + def test_SetStatus_SetsStatus_AndNotifiesChangeToState(self): + state = State() + node = state.add_node(1) + state.modified = False + + node.set_status(NODE_PRIMARY) + + self.assertEqual(NODE_PRIMARY, node.status) + self.assertTrue(state.modified) + + def test_WithInvalidStatus_SetStatus_RaisesException(self): + state = State() + node = state.add_node(1) + state.modified = False + + with self.assertRaises(InvalidNodeStatus) as context: + node.set_status("DERAILED") + self.assertIn("'DERAILED'", str(context.exception)) + self.assertFalse(state.modified) + + def test_SetError_ToString_SetsError_AndNotifiesChangeToState(self): + state = State() + node = state.add_node(1) + state.modified = False + + node.set_error("Found some spare bits at IKEA.py line 141") + + self.assertEqual("Found some spare bits at IKEA.py line 141", node.err) + self.assertTrue(state.modified) + + def test_SetError_ToNone_ClearsError_AndNotifiesChangeToState(self): + state = State() + node = state.add_node(1) + node.set_error("Mouse lost its ball") + state.modified = False + + node.set_error(None) + + self.assertEqual(None, node.err) + self.assertTrue(state.modified) + + def test_WhenNothingChanges_NoChangeIsNotifiedToState(self): + state = State() + node = state.add_node("x") + node.system_id = "aaaaaaa" + node.timeline_id = 55 + node.set_status(NODE_PRIMARY) + node.set_error("Just testin'") + state.modified = False + + node.system_id = "aaaaaaa" 
+ node.timeline_id = 55 + node.set_status(NODE_PRIMARY) + node.set_error("Just testin'") + + self.assertFalse(state.modified) + + +class StateExportTests(unittest.TestCase): + + def test_GivenFilledState_Export_ExportsDataForStore(self): + self.maxDiff = 5000; + state = State() + + node1 = state.add_node(1) + node1.config.pgbouncer_config = "/some/path/to/config1" + node1.system_id = "System X" + node1.timeline_id = 555 + node1.set_status(NODE_PRIMARY) + node1.set_error("Some error for 1") + + node2 = state.add_node(2) + node2.config.pgbouncer_config = "/some/path/to/config2" + node2.system_id = "System Y" + node2.timeline_id = 1 + node2.set_status(NODE_STANDBY) + node2.set_error("Some error for 2") + + node1.promote() + + state.leader_status = LEADER_CONNECTED + state.leader_error = "Some error for leader connection" + + export = state.export() + + self.assertEqual({ + "system_id": "System X", + "timeline_id": 555, + "pgbouncer_config": "/some/path/to/config1", + "leader_node_id": 1, + "leader_status": LEADER_CONNECTED, + "leader_error": "Some error for leader connection", + "nodes": { + 1: { + "node_id": 1, + "config": { + "pgbouncer_config": "/some/path/to/config1", + "host": None, + "port": None + }, + "system_id": "System X", + "timeline_id": 555, + "is_leader": True, + "status": NODE_PRIMARY, + "error": "Some error for 1" + }, + 2: { + "node_id": 2, + "config": { + "pgbouncer_config": "/some/path/to/config2", + "host": None, + "port": None + }, + "system_id": "System Y", + "timeline_id": 1, + "is_leader": False, + "status": NODE_STANDBY, + "error": "Some error for 2" + } + } + }, export) +