first commit

This commit is contained in:
Maurice Makaay 2019-11-26 15:11:33 +01:00
commit a8e930e078
15 changed files with 1182 additions and 0 deletions

61
Makefile Normal file
View File

@ -0,0 +1,61 @@
all: build
lint:
pylint3 pgbouncemgr/*.py
pytest:
python3 setup.py test
test:
python3 -m unittest
run:
PYTHONPATH=./ python3 pgbouncemgr/manager.py
run-d:
PYTHONPATH=./ python3 pgbouncemgr/manager.py -d
run-v:
PYTHONPATH=./ python3 pgbouncemgr/manager.py -v
run-h:
PYTHONPATH=./ python3 pgbouncemgr/manager.py -h
clean:
@/bin/rm -fR dist
@/bin/rm -fR deb_dist
@/bin/rm -fR build
@/bin/rm -fR *.egg-info
@/bin/rm -f python3-pgbouncemgr-*.tar.gz
@/bin/rm -fR */__pycache__
dist-clean: clean
@/bin/rm -fR builds/*
runtime-deps:
apt-get install python3 python3-psycopg2 pgbouncer
# Create the default directory for the state file.
mkdir -p /var/lib/pgbouncemgr
chown -hR postgres:postgres /var/lib/pgbouncemgr
chmod 770 /var/lib/pgbouncemgr
chmod 660 /var/lib/pgbouncemgr/*
build-deps: runtime-deps
# I had to add python-all, even though we're building for
# python3. The sstdeb build at some point searches for
# that package. Without the package, the build will crash.
apt-get install \
pylint3 \
python3-setuptools \
python3-stdeb \
python-all
build: clean lint test
python3 setup.py --command-packages=stdeb.command bdist_deb
mkdir -p builds
/bin/rm -f builds/*.deb
mv deb_dist/*.deb builds/
make clean > /dev/null
upload: build
scp builds/*.deb mmakaay1@dpkg.xs4all.net:

Binary file not shown.

Binary file not shown.

41
pgbouncemgr/manager.py Normal file
View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""The manager implements the main process that keeps track of changes in the
PostgreSQL cluster and that reconfigures pgbouncer when needed."""
from argparse import ArgumentParser
DEFAULT_CONFIG = "/etc/pgbouncer/pgbouncemgr.yaml"
DEFAULT_LOG_FACILITY = "LOG_LOCAL1"
def main():
"""Starts the pgbouncemgr main application."""
args = _parse_arguments()
print(repr(args))
def _parse_arguments():
parser = ArgumentParser(description="pgbouncemgr")
parser.add_argument(
"-v", "--verbose",
default=False, action="store_true",
help="enable verbose output (default: disabled)")
parser.add_argument(
"-d", "--debug",
default=False, action="store_true",
help="enable debugging output (default: disabled)")
parser.add_argument(
"-f", "--log-facility",
default=DEFAULT_LOG_FACILITY,
help="syslog facility to use (default: %s)" % DEFAULT_LOG_FACILITY)
parser.add_argument(
"--config",
default=DEFAULT_CONFIG,
help="config file (default: %s)" % DEFAULT_CONFIG)
args = parser.parse_args()
return args
if __name__ == "__main__":
main()

View File

@ -0,0 +1,16 @@
class NodeConfig():
def __init__(self, node_id):
self.node_id = node_id
self.pgbouncer_config = None
self.host = None
self.port = None
def set_pgbouncer_config(self, path):
self.pgbouncer_config = path
def export(self):
return {
"pgbouncer_config": self.pgbouncer_config,
"host": self.host,
"port": self.port
}

355
pgbouncemgr/state.py Normal file
View File

@ -0,0 +1,355 @@
# -*- coding: utf-8 -*-
# no-pylint: disable=missing-docstring,broad-except,too-many-instance-attributes
from pgbouncemgr.node_config import NodeConfig
LEADER_UNKNOWN = "LEADER_UNKNOWN"
LEADER_CONNECTED = "LEADER_CONNECTED"
LEADER_DISCONNECTED = "LEADER_DISCONNECTED"
LEADER_STATUSES = [LEADER_UNKNOWN, LEADER_CONNECTED, LEADER_DISCONNECTED]
NODE_UNKNOWN = "NODE_UNKNOWN"
NODE_OFFLINE = "NODE_OFFLINE"
NODE_PRIMARY = "NODE_PRIMARY"
NODE_STANDBY = "NODE_STANDBY"
NODE_STATUSES = [NODE_UNKNOWN, NODE_OFFLINE, NODE_PRIMARY, NODE_STANDBY]
class StateException(Exception):
"""Used for all exceptions that are raised from pgbouncemgr.state."""
class DuplicateNodeAdded(StateException):
"""Raised when a new node is added to the state using a node_id that
already exists in the contained nodes."""
def __init__(self, node_id):
super().__init__(
"Node already exists in state (duplicate node_id=%s)" % node_id)
class UnknownNodeRequested(StateException):
"""Raised when a request is made for a node that is not contained
by the State."""
def __init__(self, node_id):
super().__init__(
"Unknown node requested from state (unknown node_id=%s)" %
node_id)
class NodeCannotBePromoted(StateException):
"""Raised when an attempt is made to promote a node, which cannot
(yet) be promoted."""
def __init__(self, node, reason):
super().__init__(
"Node '%s' cannot be promoted: %s" % (node.node_id, reason))
class InvalidSystemId(StateException):
"""Raised when an invalid system_id is used."""
def __init__(self, invalid):
display = "None" if invalid is None else "'%s'" % invalid
super().__init__(
"Invalid system_id provided; it must be a non-empty string " +
"(invalid system_id=%s)" % display)
class SystemIdChanged(StateException):
"""Raised when an attempt is made to change an already set system_id
for the state. This is not allowed, since it would mean that
pgboucer would be connected to a totally different dataset and
not a replica of the currently connected dataset."""
def __init__(self, old_id, new_id):
super().__init__(
"The system_id cannot be changed (from=%s, to=%s)" %
(old_id, new_id))
class InvalidTimelineId(StateException):
"""Raised when an invalid timeline_id is used. The timeline_id must
be an integer value or a value that can be case to an integer."""
def __init__(self, timeline_id):
super().__init__(
"Invalid timeline_id provided; it must be an integer or a " +
"value that can be cast to an integer (invalid value=%s)" %
timeline_id)
class TimelineIdLowered(StateException):
"""Raised when an attempt is made to lower the timeline_id.
The timeline_id indicates the version of our dataset and decrementing
this timeline_id would mean that we connect the pgbouncer to a
dataset that is older than the currently connected dataset."""
def __init__(self, old_id, new_id):
super().__init__(
"The timeline_id cannot be lowered (from=%d, to=%d)" %
(old_id, new_id))
class InvalidLeaderStatus(StateException):
"""Raised when leader_status is set to an invalid value."""
def __init__(self, status):
super().__init__(
("The leader_status cannot be set to '%s' " +
"(valid values are: %s)") % (status, ", ".join(LEADER_STATUSES)))
class InvalidNodeStatus(StateException):
"""Raised when the status for a node is set to an invalid value."""
def __init__(self, status):
super().__init__(
("The node status cannot be set to '%s' " +
"(valid values are: %s)") % (status, ", ".join(NODE_STATUSES)))
class State():
def __init__(self):
self.modified = False
self._system_id = None
self._timeline_id = None
self._pgbouncer_config = None
self._leader_node_id = None
self._leader_error = None
self._leader_status = LEADER_UNKNOWN
self.nodes = {}
def is_clean_slate(self):
"""Returns True when the state does not yet contain a PostgreSQL system_id."""
return self.system_id is None
@property
def system_id(self):
return self._system_id
@system_id.setter
def system_id(self, system_id):
"""Sets the PostgreSQL system_id in the state. This is the system_id
that the pgbouncemgr will use as the valid system_id to
connect pgbouncer to."""
if self._system_id is not None and self._system_id != system_id:
raise SystemIdChanged(self._system_id, system_id)
if system_id is None or system_id.strip() == "":
raise InvalidSystemId(system_id)
if self._system_id == system_id:
return
self.modified = True
self._system_id = system_id
@property
def timeline_id(self):
return self._timeline_id
@timeline_id.setter
def timeline_id(self, timeline_id):
"""Sets the PostgreSQL timeline_id in the state. This is the
timeline_id of the currently connected dataset."""
try:
timeline_id = int(timeline_id)
except ValueError:
raise InvalidTimelineId(timeline_id)
if self._timeline_id is not None and timeline_id < self._timeline_id:
raise TimelineIdLowered(self._timeline_id, timeline_id)
if self._timeline_id == timeline_id:
return
self.modified = True
self._timeline_id = timeline_id
def add_node(self, node):
"""Add a node to the state. Node can be a NodeConfig object or
a node_id. In case a node_id is provided, an NodeConfig object
will be created automatically."""
if not isinstance(node, NodeConfig):
node = NodeConfig(node)
if node.node_id in self.nodes:
raise DuplicateNodeAdded(node.node_id)
node_state = NodeState(node, self)
self.nodes[node.node_id] = node_state
self.modified = True
return node_state
def get_node(self, node_id):
"""Retrieve a node from the state, identified by the provided
node_id."""
if node_id not in self.nodes:
raise UnknownNodeRequested(node_id)
return self.nodes[node_id]
def promote_node(self, node):
"""Sets the provided node as the selected leader node for the
cluster. The leader status is reset to LEADER_UNKNOWN and should
after this be updated.
The system_id, timeline_id and pgbouncer_config for the State
object are inherted from the provided leader node, and must
therefore be set before calling this method."""
node = self.get_node(node.node_id)
if node.system_id is None:
raise NodeCannotBePromoted(node, "the node has no system_id")
if node.timeline_id is None:
raise NodeCannotBePromoted(node, "the node has no timeline_id")
if node.config.pgbouncer_config is None:
raise NodeCannotBePromoted(node, "the node has no pgbouncer_config")
if self._leader_node_id != node.node_id:
self._leader_node_id = node.node_id
self.modified = True
self.leader_status = LEADER_UNKNOWN
self.leader_error = None
self.system_id = node.system_id
self.timeline_id = node.timeline_id
self.pgbouncer_config = node.config.pgbouncer_config
@property
def leader_node_id(self):
return self._leader_node_id
@property
def leader_status(self):
return self._leader_status
@leader_status.setter
def leader_status(self, status):
"""Sets the pgbouncer connection status for the current leader
cluster node. Possible statuses are LEADER_UNKNOWN,
LEADER_CONNECTED and LEADER_DISCONNECTED."""
if self._leader_status == status:
return
if status not in LEADER_STATUSES:
raise InvalidLeaderStatus(status)
self._leader_status = status
self.modified = True
@property
def leader_error(self):
return self._leader_error
@leader_error.setter
def leader_error(self, err):
"""Sets the pgbouncer connection error for the current leader
cluster node. This error is used to inform about problems that
keep pgbouncer from serving the current leader cluster node."""
if self._leader_error == err:
return
self._leader_error = err
self.modified = True
@property
def pgbouncer_config(self):
return self._pgbouncer_config
@pgbouncer_config.setter
def pgbouncer_config(self, pgbouncer_config):
"""Sets the pgbouncer configuration file that must be activated
in order to connect pgbouncer to the correct backend
PostgreSQL server."""
if self._pgbouncer_config == pgbouncer_config:
return
self._pgbouncer_config = pgbouncer_config
self.modified = True
def export(self):
"""Exports the data for the state that is to be stored in the
state storage."""
return {
"system_id": self.system_id,
"timeline_id": self.timeline_id,
"pgbouncer_config": self.pgbouncer_config,
"leader_node_id": self.leader_node_id,
"leader_status": self.leader_status,
"leader_error": self.leader_error,
"nodes": dict((i, n.export()) for i, n in self.nodes.items())
}
class NodeState():
"""This class encapsulates the information for a single node in a
PostgreSQL cluster."""
def __init__(self, config, parent_state):
self.node_id = config.node_id
self.config = config
self.parent_state = parent_state
self._system_id = None
self._timeline_id = None
self.status = NODE_UNKNOWN
self.err = None
def reset(self):
"""Reset the data for the node."""
self.system_id = None
self.timeline_id = None
self.status = NODE_UNKNOWN
self.err = None
self.notify_parent()
def notify_parent(self):
self.parent_state.modified = True
@property
def system_id(self):
return self._system_id
@system_id.setter
def system_id(self, system_id):
"""Sets the PostgreSQL system_id, which is a cluster-wide identifier for
the database cluster data. When a PostgreSQL server uses a different
system_id than our cluster, then it does not serve the same dataset.
Possibly is was reinstalled or simply a new cluster which has not yet
been synced to the other cluster hosts."""
if system_id is None or system_id.strip() == "":
raise InvalidSystemId(system_id)
if self._system_id == system_id:
return
self._system_id = system_id
self.notify_parent()
@property
def timeline_id(self):
return self._timeline_id
@timeline_id.setter
def timeline_id(self, timeline_id):
"""Sets the PostgreSQL timeline_id, which is an identifier for the
version of the dataset that is served by a PostegreSQL server.
This version is incremented when a PIT recovery is done, or when a
database server is promoted to act as a primary server.
In pgbouncemgr, this timeline_id is used to make sure that we never
accidentally activate an old version of the dataset to be the new
primary dataset."""
try:
timeline_id = int(timeline_id)
except ValueError:
raise InvalidTimelineId(timeline_id)
if self._timeline_id == timeline_id:
return
self._timeline_id = timeline_id
self.notify_parent()
def set_status(self, status):
"""Set the connection status for the node. This is used to indicate
whether or not a connection can be setup to the node from the
pgbouncemgr application and if the node is running in primary
or fallback mode. Possible values are: NODE_UNKNOWN, NODE_OFFLINE,
NODE_PRIMARY and NODE_STANDBY."""
if self.status == status:
return
if status not in NODE_STATUSES:
raise InvalidNodeStatus(status)
self.status = status
self.notify_parent()
def set_error(self, err):
"""When there is some problem with this node, this method can be used
to set an error message for it, explaining the problem."""
if self.err == err:
return
self.err = err
self.notify_parent()
def promote(self):
"""Promote this node to be the leader of the cluster.
This means that pgbouncer will be reconfigured to make use of
this node as the primary backend."""
self.parent_state.promote_node(self)
def export(self):
"""Exports the data for the node that is to be stored in the
state storage."""
return {
"node_id": self.node_id,
"config": self.config.export(),
"system_id": self.system_id,
"timeline_id": self.timeline_id,
"is_leader": self.parent_state._leader_node_id == self.node_id,
"status": self.status,
"error": self.err,
}

View File

@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-
# no-pylint: disable=missing-docstring,broad-except,too-many-instance-attributes
import json
from datetime import datetime
from os import rename
from os.path import isfile
from pgbouncemgr.log import format_exception_message
STORE_ERROR = "error"
STORE_UPDATED = "updated"
STORE_NOCHANGE = "nochange"
class State():
system_id = None
timeline_id = None
pgbouncer_config = None
primary_node = None
primary_connected = False
primary_error = None
err = None
old_state = None
def __init__(self, path):
self.path = path
self.load()
def load(self):
"""Load the state from the state file.
When this fails, an exception will be thrown."""
# When no state file exists, we start with a clean slate.
if not isfile(self.path):
return
# Otherwise, read the state from the state file.
with open(self.path, 'r') as stream:
try:
state = json.load(stream)
except json.decoder.JSONDecodeError:
# JSON decoding failed. This is beyond repair.
# Start with a clean slate to have the state data reinitialized.
return
# Copy the state over to this state object.
self.system_id = state["system_id"]
self.timeline_id = state["timeline_id"]
self.pgbouncer_config = state["pgbouncer_config"]
self.primary_node = state["primary_node"]
# The folowing properties are always filled dynamically by the manager.
# These are not restored from the state file on startup.
self.primary_connected = False
self.nodes = {}
def is_clean_slate(self):
return self.system_id is None
def set_primary_connected(self, connected, err=None):
self.primary_connected = connected
self.primary_error = err
def store(self, nodes):
"""Store the current state in the state file.
Returns True when the state was stored successfully.
Returns False when it failed. The error can be found in the err property."""
stored_state = {
"system_id": self.system_id,
"timeline_id": self.timeline_id,
"pgbouncer_config": self.pgbouncer_config,
"primary_node": self.primary_node,
"primary_connected": self.primary_connected,
"primary_error": self.primary_error,
"nodes": dict((n.name, {
"host": n.conn_params["host"],
"port": n.conn_params["port"],
"status": n.status,
"system_id": n.system_id,
"timeline_id": n.timeline_id,
"error": n.err}) for n in nodes)
}
# When the state has not changed, then don't write out a new state.
new_state = json.dumps(stored_state, sort_keys=True, indent=2)
if self.old_state and self.old_state == new_state:
return STORE_NOCHANGE
self.old_state = new_state
try:
self.err = None
swap_path = "%s..SWAP" % self.path
with open(swap_path, "w") as file_handle:
print(new_state, file=file_handle)
rename(swap_path, self.path)
return STORE_UPDATED
except Exception as exception:
self.err = "Storing state to file (%s) failed: %s" % (
self.path, format_exception_message(exception))
return STORE_ERROR

67
setup.py Normal file
View File

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""
pgbouncemgr
===========
PostgreSQL has no support for a multi-primary HA setup. Instead, there is
always a single primary server, which can be used for reading and writing,
and one or more replicate servers, which can only be used for reading.
Updates on the primary server are streamed to the connected replica servers.
Therefore, when a client must connect to a PostgreSQL HA setup, it needs to
know what node to connect to (i.e. the currently active primary node).
Ideally, it has no knowledge about this, letting the client always connect
to the same endpoint.
This is where tools like haproxy and pgbouncer come in (possibly themselves
in an HA setup using automatic failover based on keepalived or one of its
friends). The client can connect to haproxy or bgbouncer, and those can be
configured to send the connection to the correct PostgreSQL node.
And this is where pgbouncemgr comes in. It actively monitors a PostgreSQL HA
cluster. Based on the monitoring data, it decides to what node clients must
connect and (re)configures pgbouncer to make that happen.
"""
import os
import re
from time import time
from setuptools import setup
def _get_version():
path = os.path.abspath(".")
pkginit = os.path.join(path, "pgbouncemgr", "__init__.py")
with open(pkginit, "r") as fh:
for line in fh.readlines():
m = re.match(r"^__version__\s*=\s*\"(.*)\"", line)
if (m is not None):
version = m.groups()[0]
return "%s.%d" % (version, time())
raise Exception("Unable to read version from %s" % pkginit)
setup(
name="pgbouncemgr",
version=_get_version(),
url="https://github.xs4all.net/XS4ALL/xs4all-pgbouncemgr",
license="BSD",
author="Maurice Makaay",
author_email="mmakaay1@xs4all.net",
description="Automatic (re)configuration of a pgbouncer server for a PostgreSQL HA cluster",
long_description=__doc__,
packages=[
"pgbouncemgr"
],
entry_points = {
'console_scripts': [
'pgbouncemgr=pgbouncemgr.manager:main']
},
data_files=[
("/etc/systemd/system", ["etc/pgbouncemgr.service"])
],
zip_safe=False,
include_package_data=True,
platforms="any",
test_suite="tests.suite"
)

7
tests/__init__.py Normal file
View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
import unittest
def suite():
test_loader = unittest.TestLoader()
return test_loader.discover('tests')

Binary file not shown.

Binary file not shown.

Binary file not shown.

54
tests/stubs.py Normal file
View File

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
from pgbouncer.manager import DEFAULT_CONFIG, DEFAULT_LOG_FACILITY
class ArgsStub():
config = DEFAULT_CONFIG
log_facility = DEFAULT_LOG_FACILITY
verbose = False
debug = False
class LoggerStub():
def __init(self, ident, facility, debug, verbose):
pass
def debug(self, msg):
pass
def info(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
def format_ex(self, exception):
return "FMTEX"
class StateStoreStub():
old_state = None
test_with_nr_of_errors = 0
test_with_state = None
def load(self)
state = State()
def store(self, state):
new_state = json.dumps(state_to_store, sort_keys=True, indent=2)
if self.old_state and self.old_state == new_state:
return STORE_NOCHANGE
sel.old_state = new_state
if test_with_nr_of_errors:
test_with_nr_of_errors -= 1
self.err = "Storing state failed: just testing"
return STORE_ERROR
return STORE_UPDATED

19
tests/test_node_config.py Normal file
View File

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
import unittest
from pgbouncemgr.node_config import *
class NodeConfigTests(unittest.TestCase):
def test_CreateNewNodeConfig(self):
config = NodeConfig("test")
self.assertEqual("test", config.node_id)
self.assertEqual(None, config.pgbouncer_config)
self.assertEqual(None, config.host)
self.assertEqual(None, config.port)
def test_SetPgbouncerConfig_UpdatesConfig(self):
config = NodeConfig("test")
config.set_pgbouncer_config("/path/to/config")

464
tests/test_state.py Normal file
View File

@ -0,0 +1,464 @@
# -*- coding: utf-8 -*-
import unittest
from pgbouncemgr.state import *
class StateTests(unittest.TestCase):
def test_GivenFreshState_DefaultsAreSetCorrectly(self):
state = State()
self.assertIsNone(state.system_id)
self.assertIsNone(state.timeline_id)
self.assertIsNone(state.pgbouncer_config)
self.assertIsNone(state.leader_node_id)
self.assertIsNone(state.leader_error)
self.assertEqual(LEADER_UNKNOWN, state.leader_status)
self.assertEqual({}, state.nodes)
self.assertFalse(state.modified)
def test_GivenFreshState_ModifiedFlagIsNotSet(self):
state = State()
self.assertFalse(state.modified)
def test_GivenFreshState_IsCleanSlate_ReturnsTrue(self):
state = State()
self.assertTrue(state.is_clean_slate())
def test_GivenStateWithSystemIdAssigned_IsCleanSlate_ReturnsFalse(self):
state = State()
state.system_id = "whatever"
self.assertFalse(state.is_clean_slate())
def test_GivenFreshState_SetSystemId_SetsSystemId(self):
state = State()
state.system_id = "booya"
self.assertEqual("booya", state.system_id)
def test_GivenStateWithSystemId_SetSameSystemId_IsOk(self):
state = State()
state.system_id = "booya"
state.modified = False
state.system_id = "booya"
self.assertFalse(state.modified)
def test_GivenStateWithSystemId_SetDifferentSystemId_RaisesException(self):
state = State()
state.system_id = "booya"
state.modified = False
with self.assertRaises(SystemIdChanged) as context:
state.system_id = "baayo"
self.assertIn("from=booya, to=baayo", str(context.exception))
def test_GivenFreshState_SetTimelineId_SetsTimelineId(self):
state = State()
state.timeline_id = 666
self.assertEqual(666, state.timeline_id)
self.assertTrue(state.modified)
def test_WithNonIntegerInput_SetTimelineId_RaisesException(self):
state = State()
with self.assertRaises(InvalidTimelineId) as context:
state.timeline_id = "one hundred"
self.assertIn("invalid value=one hundred", str(context.exception))
def test_GivenStateWithTimelineId_SetLowerTimelineId_RaisesException(self):
state = State()
state.timeline_id = "50"
with self.assertRaises(TimelineIdLowered) as context:
state.timeline_id = 49
self.assertIn("(from=50, to=49)", str(context.exception))
def test_GivenStateWithTimelineId_SetSameTimelineId_IsOk(self):
state = State()
state.timeline_id = 50
state.modified = False
state.timeline_id = "50"
self.assertFalse(state.modified)
def test_GivenStateWithTimelineId_SetHigherTimelineId_IsOk(self):
state = State()
state.timeline_id = 50
state.modified = False
state.timeline_id = "51"
self.assertEqual(51, state.timeline_id)
self.assertTrue(state.modified)
def test_SetLeaderStatus_SetsLeaderStatus(self):
state = State()
state.modified = False
state.leader_status = LEADER_CONNECTED
self.assertEqual(LEADER_CONNECTED, state.leader_status)
self.assertTrue(state.modified)
def test_SetLeaderStatus_ToNonExistentStatus_RaisesException(self):
state = State()
state.modified = False
with self.assertRaises(InvalidLeaderStatus) as context:
state.leader_status = "gosh"
self.assertIn("'gosh'", str(context.exception))
def test_SetLeaderError_SetsLeaderError(self):
state = State()
state.modified = False
state.leader_error = "God disappeared in a puff of logic"
self.assertEqual("God disappeared in a puff of logic", state.leader_error)
self.assertTrue(state.modified)
state.modified = False
state.leader_error = None
self.assertEqual(None, state.leader_error)
self.assertTrue(state.modified)
class NodeCollectionTests(unittest.TestCase):
def test_WithNodeNotYetInState_AddNode_AddsNodeState(self):
state = State()
node = state.add_node(1)
self.assertEqual(1, node.node_id)
self.assertTrue(state.modified)
def test_WithNodeAlreadyInState_AddNode_RaisesException(self):
state = State()
state.add_node(123)
with self.assertRaises(DuplicateNodeAdded) as context:
state.add_node(123)
self.assertIn("duplicate node_id=123", str(context.exception))
def test_WithNodeNotInState_getNode_RaisesException(self):
state = State()
with self.assertRaises(UnknownNodeRequested):
state.get_node("that does not exist")
def test_WithNodeInState_getNode_ReturnsNode(self):
state = State()
node = state.add_node("that does exist")
node_from_state = state.get_node("that does exist")
self.assertEqual(node, node_from_state)
def test_WithUnknownNode_SetLeaderNode_RaisesException(self):
state1 = State()
state2 = State()
node = state2.add_node(1337)
with self.assertRaises(UnknownNodeRequested) as context:
state1.promote_node(node)
self.assertIn("unknown node_id=1337", str(context.exception))
def test_WithNodeWithoutSystemId_SetLeaderNode_RaisesException(self):
state = State()
node = state.add_node(1)
node.timeline_id = 1
node.config.pgbouncer_config = "/some/path/to/config"
with self.assertRaises(NodeCannotBePromoted) as context:
node.promote()
self.assertIn("Node '1'", str(context.exception))
self.assertIn("node has no system_id", str(context.exception))
def test_WithNodeWithoutTimelineId_SetLeaderNode_RaisesException(self):
state = State()
node = state.add_node(1)
node.system_id = "system"
node.config.pgbouncer_config = "/some/path/to/config"
with self.assertRaises(NodeCannotBePromoted) as context:
node.promote()
self.assertIn("Node '1'", str(context.exception))
self.assertIn("node has no timeline_id", str(context.exception))
def test_WithNodeWithoutPgbouncerConfig_SetLeaderNode_RaisesException(self):
state = State()
node = state.add_node(1)
node.system_id = "a7d8a9347df789saghdfs"
node.timeline_id = 11111111111
with self.assertRaises(NodeCannotBePromoted) as context:
node.promote()
self.assertIn("Node '1'", str(context.exception))
self.assertIn("node has no pgbouncer_config", str(context.exception))
def test_SetLeaderNode_SetsLeaderNode_WithUnknownLeaderStatus(self):
state = State()
node = state.add_node(1)
node.config.pgbouncer_config = "/some/path/to/config"
node.system_id = "SuperCluster"
node.timeline_id = " 005 "
node.promote()
self.assertEqual("SuperCluster", state.system_id)
self.assertEqual(5, state.timeline_id)
self.assertEqual(1, state.leader_node_id)
self.assertEqual(LEADER_UNKNOWN, state.leader_status)
self.assertEqual(None, state.leader_error)
self.assertTrue(state.modified)
def test_SetLeaderNode_ToSameLeader_ResetsLeaderNode_WithUnknownLeaderStatus(self):
state = State()
node = state.add_node(1)
node.config.pgbouncer_config = "/some/path/to/config"
node.system_id = "1.2.3.4.5.6.7.8.9.10.11"
node.timeline_id = 12
node.promote()
state.leader_status = LEADER_CONNECTED
state.leader_error = "Just testin'!"
state.modified = False
node.promote()
self.assertEqual(None, state.leader_error)
self.assertEqual(LEADER_UNKNOWN, state.leader_status)
self.assertTrue(state.modified)
def test_SetLeaderNode_ToNodeWithDifferentSystemId_RaisesException(self):
state = State()
node1 = state.add_node(1)
node1.config.pgbouncer_config = "/some/path/to/config1"
node1.system_id = "systemA"
node1.timeline_id = 10
node2 = state.add_node(2)
node2.config.pgbouncer_config = "/some/path/to/config2"
node2.system_id = "systemB"
node2.timeline_id = 10
node1.promote()
with self.assertRaises(SystemIdChanged):
node2.promote()
def test_SetLeaderNode_ToNodeWithLowerTimelineId_RaisesException(self):
state = State()
node1 = state.add_node(1)
node1.config.pgbouncer_config = "/some/path/to/config1"
node1.system_id = "systemX"
node1.timeline_id = 10
node2 = state.add_node(2)
node2.config.pgbouncer_config = "/some/path/to/config2"
node2.system_id = "systemX"
node2.timeline_id = 9
node1.promote()
with self.assertRaises(TimelineIdLowered):
node2.promote()
def test_SetLeaderNode_ToNodeWithSameOrHigherTimelineId_IsOk(self):
state = State()
node1 = state.add_node(1)
node1.config.pgbouncer_config = "/some/path/to/config1"
node1.system_id = "systemX"
node1.timeline_id = 42
node2 = state.add_node(2)
node2.config.pgbouncer_config = "/some/path/to/config2"
node2.system_id = "systemX"
node2.timeline_id = 42
node3 = state.add_node(3)
node3.config.pgbouncer_config = "/some/path/to/config3"
node3.system_id = "systemX"
node3.timeline_id = 43
state.modified = False
node1.promote()
self.assertEqual(1, state.leader_node_id)
self.assertEqual(42, state.timeline_id)
self.assertTrue(state.modified)
state.modified = False
node2.promote()
self.assertEqual(2, state.leader_node_id)
self.assertEqual(42, state.timeline_id)
self.assertTrue(state.modified)
state.modified = False
node3.promote()
self.assertEqual(3, state.leader_node_id)
self.assertEqual(43, state.timeline_id)
self.assertTrue(state.modified)
class NodeTests(unittest.TestCase):
def test_WithNoneValue_SetSystemId_RaisesException(self):
state = State()
node = state.add_node("break me")
state.modified = False
with self.assertRaises(InvalidSystemId) as context:
node.system_id = None
self.assertIn("invalid system_id=None", str(context.exception))
self.assertFalse(state.modified)
def test_WithEmptyString_SetSystemId_RaisesException(self):
state = State()
node = state.add_node("break me")
state.modified = False
with self.assertRaises(InvalidSystemId) as context:
node.system_id = " \t\r\n\t "
self.assertIn("invalid system_id=' \t\r\n\t '", str(context.exception))
self.assertFalse(state.modified)
def test_SetSystemId_SetsSystemId_AndNotifiesChangeToState(self):
state = State()
node = state.add_node(1)
state.modified = False
node.system_id = "X"
self.assertEqual("X", node.system_id)
self.assertTrue(state.modified)
def test_WithNonIntegerInput_SetTimelineId_RaisesException(self):
state = State()
node = state.add_node("break me")
with self.assertRaises(InvalidTimelineId) as context:
node.timeline_id = "TARDIS"
self.assertIn("invalid value=TARDIS", str(context.exception))
def test_SetTimelineId_SetsTimelineId_AndNotifiesChangeToState(self):
state = State()
node = state.add_node(1)
state.modified = False
node.timeline_id = 25
self.assertEqual(25, node.timeline_id)
self.assertTrue(state.modified)
def test_SetStatus_SetsStatus_AndNotifiesChangeToState(self):
state = State()
node = state.add_node(1)
state.modified = False
node.set_status(NODE_PRIMARY)
self.assertEqual(NODE_PRIMARY, node.status)
self.assertTrue(state.modified)
def test_WithInvalidStatus_SetStatus_RaisesException(self):
state = State()
node = state.add_node(1)
state.modified = False
with self.assertRaises(InvalidNodeStatus) as context:
node.set_status("DERAILED")
self.assertIn("'DERAILED'", str(context.exception))
self.assertFalse(state.modified)
def test_SetError_ToString_SetsError_AndNotifiesChangeToState(self):
state = State()
node = state.add_node(1)
state.modified = False
node.set_error("Found some spare bits at IKEA.py line 141")
self.assertEqual("Found some spare bits at IKEA.py line 141", node.err)
self.assertTrue(state.modified)
def test_SetError_ToNone_ClearsError_AndNotifiesChangeToState(self):
state = State()
node = state.add_node(1)
node.set_error("Mouse lost its ball")
state.modified = False
node.set_error(None)
self.assertEqual(None, node.err)
self.assertTrue(state.modified)
def test_WhenNothingChanges_NoChangeIsNotifiedToState(self):
state = State()
node = state.add_node("x")
node.system_id = "aaaaaaa"
node.timeline_id = 55
node.set_status(NODE_PRIMARY)
node.set_error("Just testin'")
state.modified = False
node.system_id = "aaaaaaa"
node.timeline_id = 55
node.set_status(NODE_PRIMARY)
node.set_error("Just testin'")
self.assertFalse(state.modified)
class StateExportTests(unittest.TestCase):
def test_GivenFilledState_Export_ExportsDataForStore(self):
self.maxDiff = 5000;
state = State()
node1 = state.add_node(1)
node1.config.pgbouncer_config = "/some/path/to/config1"
node1.system_id = "System X"
node1.timeline_id = 555
node1.set_status(NODE_PRIMARY)
node1.set_error("Some error for 1")
node2 = state.add_node(2)
node2.config.pgbouncer_config = "/some/path/to/config2"
node2.system_id = "System Y"
node2.timeline_id = 1
node2.set_status(NODE_STANDBY)
node2.set_error("Some error for 2")
node1.promote()
state.leader_status = LEADER_CONNECTED
state.leader_error = "Some error for leader connection"
export = state.export()
self.assertEqual({
"system_id": "System X",
"timeline_id": 555,
"pgbouncer_config": "/some/path/to/config1",
"leader_node_id": 1,
"leader_status": LEADER_CONNECTED,
"leader_error": "Some error for leader connection",
"nodes": {
1: {
"node_id": 1,
"config": {
"pgbouncer_config": "/some/path/to/config1",
"host": None,
"port": None
},
"system_id": "System X",
"timeline_id": 555,
"is_leader": True,
"status": NODE_PRIMARY,
"error": "Some error for 1"
},
2: {
"node_id": 2,
"config": {
"pgbouncer_config": "/some/path/to/config2",
"host": None,
"port": None
},
"system_id": "System Y",
"timeline_id": 1,
"is_leader": False,
"status": NODE_STANDBY,
"error": "Some error for 2"
}
}
}, export)