407 lines
15 KiB
Python
407 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
# no-pylint: disable=missing-docstring,broad-except,too-many-instance-attributes
|
|
|
|
import os
|
|
from pgbouncemgr.node_config import NodeConfig
|
|
|
|
|
|
# Connection status of pgbouncer towards the current leader node
# (see State.leader_status).
LEADER_UNKNOWN = "LEADER_UNKNOWN"
LEADER_CONNECTED = "LEADER_CONNECTED"
LEADER_DISCONNECTED = "LEADER_DISCONNECTED"
LEADER_STATUSES = [LEADER_UNKNOWN, LEADER_CONNECTED, LEADER_DISCONNECTED]

# Status of an individual cluster node (see NodeState.set_status).
NODE_UNKNOWN = "NODE_UNKNOWN"
NODE_OFFLINE = "NODE_OFFLINE"
NODE_PRIMARY = "NODE_PRIMARY"
NODE_STANDBY = "NODE_STANDBY"
NODE_STATUSES = [NODE_UNKNOWN, NODE_OFFLINE, NODE_PRIMARY, NODE_STANDBY]
|
|
|
|
|
|
class StateException(Exception):
    """Used for all exceptions that are raised from pgbouncemgr.state."""
|
|
|
|
class DuplicateNodeAdded(StateException):
    """Raised when a new node is added to the state using a node_id that
    already exists in the contained nodes."""

    def __init__(self, node_id):
        """Build the error message for the duplicate node_id."""
        # The argument is wrapped in a tuple, so that a node_id that is
        # itself a tuple cannot break the string formatting.
        super().__init__(
            "Node already exists in state (duplicate node_id=%s)" %
            (node_id,))
|
|
|
|
class UnknownNodeRequested(StateException):
    """Raised when a request is made for a node that is not contained
    by the State."""

    def __init__(self, node_id):
        """Build the error message for the unknown node_id."""
        # The argument is wrapped in a tuple, so that a node_id that is
        # itself a tuple cannot break the string formatting.
        super().__init__(
            "Unknown node requested from state (unknown node_id=%s)" %
            (node_id,))
|
|
|
|
class NodeCannotBePromoted(StateException):
    """Raised when an attempt is made to promote a node, which cannot
    (yet) be promoted."""

    def __init__(self, node, reason):
        """Build the error message.

        node is the object whose node_id attribute identifies the node
        that could not be promoted; reason is a description of why."""
        super().__init__(
            "Node '%s' cannot be promoted: %s" % (node.node_id, reason))
|
|
|
|
class InvalidSystemId(StateException):
    """Raised when an invalid system_id is used."""

    def __init__(self, invalid):
        """Build the error message for the rejected system_id value."""
        # None is shown as-is, every other value is shown using repr(),
        # so that for example an empty string is clearly recognizable.
        display = invalid if invalid is None else repr(invalid)
        super().__init__(
            "Invalid system_id provided; it must be a non-empty string " +
            "(invalid system_id=%s)" % display)
|
|
|
|
class SystemIdChanged(StateException):
    """Raised when an attempt is made to change an already set system_id
    for the state. This is not allowed, since it would mean that
    pgbouncer would be connected to a totally different dataset and
    not a replica of the currently connected dataset."""

    def __init__(self, old_id, new_id):
        """Build the error message from the current and the rejected
        system_id."""
        super().__init__(
            "The system_id cannot be changed (from=%s, to=%s)" %
            (old_id, new_id))
|
|
|
|
class InvalidTimelineId(StateException):
    """Raised when an invalid timeline_id is used. The timeline_id must
    be an integer value or a value that can be cast to an integer."""

    def __init__(self, timeline_id):
        """Build the error message for the rejected timeline_id value."""
        # The value can be of any type (it is invalid, after all). It is
        # wrapped in a tuple, so that a tuple value cannot break the
        # string formatting with a TypeError.
        super().__init__(
            "Invalid timeline_id provided; it must be an integer or a " +
            "value that can be cast to an integer (invalid value=%s)" %
            (timeline_id,))
|
|
|
|
class TimelineIdLowered(StateException):
    """Raised when an attempt is made to lower the timeline_id.
    The timeline_id indicates the version of our dataset and decrementing
    this timeline_id would mean that we connect the pgbouncer to a
    dataset that is older than the currently connected dataset."""

    def __init__(self, old_id, new_id):
        """Build the error message from the current and the rejected
        timeline_id. Both are formatted using %d, so they must be
        numbers."""
        super().__init__(
            "The timeline_id cannot be lowered (from=%d, to=%d)" %
            (old_id, new_id))
|
|
|
|
class InvalidLeaderStatus(StateException):
    """Raised when leader_status is set to an invalid value."""

    def __init__(self, status):
        """Build the error message, listing the rejected status and all
        values from LEADER_STATUSES."""
        super().__init__(
            ("The leader_status cannot be set to '%s' " +
             "(valid values are: %s)") % (status, ", ".join(LEADER_STATUSES)))
|
|
|
|
class UnknownLeaderNodeId(StateException):
    """Raised when leader_node_id is set to a non-existing node_id value."""

    def __init__(self, leader_node_id):
        """Build the error message for the unknown leader_node_id."""
        # The argument is wrapped in a tuple, so that a tuple value
        # cannot break the string formatting.
        super().__init__(
            "The leader_node_id does not exist " +
            "(leader_node_id=%s)" % (leader_node_id,))
|
|
|
|
class ActivePgbouncerConfigDoesNotExist(StateException):
    """Raised when active_pgbouncer_config is set to a non-existing path."""

    def __init__(self, active_pgbouncer_config):
        """Build the error message for the non-existing path."""
        # The argument is wrapped in a tuple, so that a tuple value
        # cannot break the string formatting.
        super().__init__(
            "The active_pgbouncer_config file does not exist " +
            "(path=%s)" % (active_pgbouncer_config,))
|
|
|
|
class InvalidNodeStatus(StateException):
    """Raised when the status for a node is set to an invalid value."""

    def __init__(self, status):
        """Build the error message, listing the rejected status and all
        values from NODE_STATUSES."""
        super().__init__(
            ("The node status cannot be set to '%s' " +
             "(valid values are: %s)") % (status, ", ".join(NODE_STATUSES)))
|
|
|
|
class State():
    """Holds the state that pgbouncemgr keeps for a PostgreSQL cluster.

    The state consists of the system_id and timeline_id of the dataset,
    the active pgbouncer configuration file, the leader node (with its
    connection status and error) and a NodeState object per cluster node.
    The 'modified' attribute is set to True whenever one of these
    values is changed."""

    @staticmethod
    def fromConfig(config, logger):
        """Create a State, containing a node for every entry in
        config.nodes.

        Each key in config.nodes is used as the node_id. All key/value
        pairs from the accompanying settings are set as attributes on
        the NodeConfig object for that node."""
        state = State(logger)
        for node_id, settings in config.nodes.items():
            node_config = NodeConfig(node_id)
            for k, v in settings.items():
                setattr(node_config, k, v)
            state.add_node(node_config)
        return state

    def __init__(self, logger):
        """Create an empty state: no system_id, no timeline_id, no leader
        and no nodes. The logger is used for logging warnings."""
        self.log = logger
        self.modified = False
        self._system_id = None
        self._timeline_id = None
        self._active_pgbouncer_config = None
        self._leader_node_id = None
        self._leader_error = None
        self._leader_status = LEADER_UNKNOWN
        self.nodes = {}

    def is_clean_slate(self):
        """Returns True when the state does not yet contain a PostgreSQL system_id."""
        return self.system_id is None

    @property
    def system_id(self):
        """The PostgreSQL system_id of the state (None when not yet set)."""
        return self._system_id

    @system_id.setter
    def system_id(self, system_id):
        """Sets the PostgreSQL system_id in the state. This is the system_id
        that the pgbouncemgr will use as the valid system_id to
        connect pgbouncer to.

        Raises SystemIdChanged when a different system_id was already
        set, and InvalidSystemId when the value is not a non-empty
        string."""
        if self._system_id is not None and self._system_id != system_id:
            raise SystemIdChanged(self._system_id, system_id)
        # The isinstance check also covers None, and it prevents an
        # AttributeError on values that have no strip() method.
        if not isinstance(system_id, str) or system_id.strip() == "":
            raise InvalidSystemId(system_id)
        if self._system_id == system_id:
            return
        self.modified = True
        self._system_id = system_id

    @property
    def timeline_id(self):
        """The PostgreSQL timeline_id of the state (None when not yet set)."""
        return self._timeline_id

    @timeline_id.setter
    def timeline_id(self, timeline_id):
        """Sets the PostgreSQL timeline_id in the state. This is the
        timeline_id of the currently connected dataset.

        Raises InvalidTimelineId when the value cannot be cast to an
        integer, and TimelineIdLowered when the value is lower than the
        timeline_id that is currently set."""
        try:
            timeline_id = int(timeline_id)
        except (TypeError, ValueError) as err:
            # int() raises TypeError for values like None, and ValueError
            # for values like "abc". Both mean: not a valid timeline_id.
            raise InvalidTimelineId(timeline_id) from err
        if self._timeline_id is not None and timeline_id < self._timeline_id:
            raise TimelineIdLowered(self._timeline_id, timeline_id)
        if self._timeline_id == timeline_id:
            return
        self.modified = True
        self._timeline_id = timeline_id

    def add_node(self, node):
        """Add a node to the state. Node can be a NodeConfig object or
        a node_id. In case a node_id is provided, a NodeConfig object
        will be created automatically.

        Returns the NodeState object that was created for the node.
        Raises DuplicateNodeAdded when the node_id is already in use."""
        if not isinstance(node, NodeConfig):
            node = NodeConfig(node)
        if node.node_id in self.nodes:
            raise DuplicateNodeAdded(node.node_id)
        node_state = NodeState(node, self)
        self.nodes[node.node_id] = node_state
        self.modified = True
        return node_state

    def get_node(self, node_id):
        """Retrieve a node from the state, identified by the provided
        node_id. Raises UnknownNodeRequested when there is no such node."""
        if node_id not in self.nodes:
            raise UnknownNodeRequested(node_id)
        return self.nodes[node_id]

    def promote_node(self, node):
        """Sets the provided node as the selected leader node for the
        cluster. The leader status is reset to LEADER_UNKNOWN and should
        after this be updated.

        The system_id, timeline_id and active_pgbouncer_config for the State
        object are inherited from the provided leader node, and must
        therefore be set before calling this method.

        Raises NodeCannotBePromoted when one of these values is missing
        for the node. The exceptions from the respective setters can be
        raised as well; at that point the leader fields have already
        been updated."""
        # Always work with the NodeState that is registered in this state.
        node = self.get_node(node.node_id)
        if node.system_id is None:
            raise NodeCannotBePromoted(node, "the node has no system_id")
        if node.timeline_id is None:
            raise NodeCannotBePromoted(node, "the node has no timeline_id")
        if node.config.pgbouncer_config is None:
            raise NodeCannotBePromoted(node, "the node has no pgbouncer_config")
        # NOTE(review): the extent of this 'if' body was reconstructed from
        # the docstring (the leader status is always reset) -- confirm.
        if self.leader_node_id != node.node_id:
            self.leader_node_id = node.node_id
            self.modified = True
        self.leader_status = LEADER_UNKNOWN
        self.leader_error = None
        self.system_id = node.system_id
        self.timeline_id = node.timeline_id
        self.active_pgbouncer_config = node.config.pgbouncer_config

    @property
    def leader_status(self):
        """The pgbouncer connection status for the current leader node."""
        return self._leader_status

    @leader_status.setter
    def leader_status(self, status):
        """Sets the pgbouncer connection status for the current leader
        cluster node. Possible statuses are LEADER_UNKNOWN,
        LEADER_CONNECTED and LEADER_DISCONNECTED.

        Raises InvalidLeaderStatus for any other value."""
        if self._leader_status == status:
            return
        if status not in LEADER_STATUSES:
            raise InvalidLeaderStatus(status)
        self._leader_status = status
        self.modified = True

    @property
    def leader_node_id(self):
        """The id of the leader node (None when no leader is selected)."""
        return self._leader_node_id

    @leader_node_id.setter
    def leader_node_id(self, leader_node_id):
        """Sets the id of the leader node.

        When the id does not point to a node in the state, a warning is
        logged and UnknownLeaderNodeId is raised."""
        try:
            # Only the existence of the node is of interest here.
            self.get_node(leader_node_id)
        except UnknownNodeRequested as err:
            self.log.warning(
                "The leader_node_id value points to a non-existing " +
                "node id: node config changed? (leader_node_id=%s)" %
                (leader_node_id,))
            raise UnknownLeaderNodeId(leader_node_id) from err
        if self._leader_node_id == leader_node_id:
            return
        self._leader_node_id = leader_node_id
        self.modified = True

    @property
    def leader_error(self):
        """The pgbouncer connection error for the current leader node."""
        return self._leader_error

    @leader_error.setter
    def leader_error(self, err):
        """Sets the pgbouncer connection error for the current leader
        cluster node. This error is used to inform about problems that
        keep pgbouncer from serving the current leader cluster node."""
        if self._leader_error == err:
            return
        self._leader_error = err
        self.modified = True

    @property
    def active_pgbouncer_config(self):
        """The path of the pgbouncer configuration file that must be active."""
        return self._active_pgbouncer_config

    @active_pgbouncer_config.setter
    def active_pgbouncer_config(self, active_pgbouncer_config):
        """Sets the pgbouncer configuration file that must be activated
        in order to connect pgbouncer to the correct backend
        PostgreSQL server.

        When the active config is set to a non-existing file (this can
        happen when the node configuration changes), a warning will be
        logged and ActivePgbouncerConfigDoesNotExist is raised."""
        if not os.path.exists(active_pgbouncer_config):
            self.log.warning(
                "The active_pgbouncer_config value points to a non-existing " +
                "file: node config changed? (path=%s)" %
                (active_pgbouncer_config,))
            raise ActivePgbouncerConfigDoesNotExist(active_pgbouncer_config)
        if self._active_pgbouncer_config == active_pgbouncer_config:
            return
        self._active_pgbouncer_config = active_pgbouncer_config
        self.modified = True

    def export(self):
        """Exports the data for the state that is to be stored in the
        state storage. The nodes are exported as a dict, mapping each
        node_id to the export of the related NodeState."""
        return {
            "system_id": self.system_id,
            "timeline_id": self.timeline_id,
            "active_pgbouncer_config": self.active_pgbouncer_config,
            "leader_node_id": self.leader_node_id,
            "leader_status": self.leader_status,
            "leader_error": self.leader_error,
            "nodes": {i: n.export() for i, n in self.nodes.items()}
        }
|
|
|
|
|
|
class NodeState():
    """This class encapsulates the information for a single node in a
    PostgreSQL cluster."""

    def __init__(self, config, parent_state):
        """Create the state for the node that is described by config.

        The node_id is taken from config.node_id. The parent_state is
        the object of which the 'modified' attribute is set to True on
        every change to this node."""
        self.node_id = config.node_id
        self.config = config
        self.parent_state = parent_state
        self._system_id = None
        self._timeline_id = None
        self.status = NODE_UNKNOWN
        self.err = None

    def reset(self):
        """Reset the data for the node."""
        # The private attributes are assigned directly. The property
        # setters reject None (InvalidSystemId / InvalidTimelineId),
        # which would make it impossible to reset the node.
        self._system_id = None
        self._timeline_id = None
        self.status = NODE_UNKNOWN
        self.err = None
        self.notify_parent()

    def notify_parent(self):
        """Flag the parent state as modified."""
        self.parent_state.modified = True

    @property
    def system_id(self):
        """The PostgreSQL system_id of the node (None when unknown)."""
        return self._system_id

    @system_id.setter
    def system_id(self, system_id):
        """Sets the PostgreSQL system_id, which is a cluster-wide identifier for
        the database cluster data. When a PostgreSQL server uses a different
        system_id than our cluster, then it does not serve the same dataset.
        Possibly it was reinstalled or simply a new cluster which has not yet
        been synced to the other cluster hosts.

        Raises InvalidSystemId when the value is not a non-empty string."""
        # The isinstance check also covers None, and it prevents an
        # AttributeError on values that have no strip() method.
        if not isinstance(system_id, str) or system_id.strip() == "":
            raise InvalidSystemId(system_id)
        if self._system_id == system_id:
            return
        self._system_id = system_id
        self.notify_parent()

    @property
    def timeline_id(self):
        """The PostgreSQL timeline_id of the node (None when unknown)."""
        return self._timeline_id

    @timeline_id.setter
    def timeline_id(self, timeline_id):
        """Sets the PostgreSQL timeline_id, which is an identifier for the
        version of the dataset that is served by a PostgreSQL server.
        This version is incremented when a PIT recovery is done, or when a
        database server is promoted to act as a primary server.
        In pgbouncemgr, this timeline_id is used to make sure that we never
        accidentally activate an old version of the dataset to be the new
        primary dataset.

        Raises InvalidTimelineId when the value cannot be cast to an
        integer."""
        try:
            timeline_id = int(timeline_id)
        except (TypeError, ValueError) as err:
            # int() raises TypeError for values like None, and ValueError
            # for values like "abc". Both mean: not a valid timeline_id.
            raise InvalidTimelineId(timeline_id) from err
        if self._timeline_id == timeline_id:
            return
        self._timeline_id = timeline_id
        self.notify_parent()

    def set_status(self, status):
        """Set the connection status for the node. This is used to indicate
        whether or not a connection can be setup to the node from the
        pgbouncemgr application and if the node is running in primary
        or fallback mode. Possible values are: NODE_UNKNOWN, NODE_OFFLINE,
        NODE_PRIMARY and NODE_STANDBY.

        Raises InvalidNodeStatus for any other value."""
        if self.status == status:
            return
        if status not in NODE_STATUSES:
            raise InvalidNodeStatus(status)
        self.status = status
        self.notify_parent()

    def set_error(self, err):
        """When there is some problem with this node, this method can be used
        to set an error message for it, explaining the problem."""
        if self.err == err:
            return
        self.err = err
        self.notify_parent()

    def promote(self):
        """Promote this node to be the leader of the cluster.
        This means that pgbouncer will be reconfigured to make use of
        this node as the primary backend."""
        self.parent_state.promote_node(self)

    def export(self):
        """Exports the data for the node that is to be stored in the
        state storage. The 'is_leader' field tells if this node is the
        node that the parent state has registered as its leader."""
        return {
            "node_id": self.node_id,
            "config": self.config.export(),
            "system_id": self.system_id,
            "timeline_id": self.timeline_id,
            "is_leader": self.parent_state.leader_node_id == self.node_id,
            "status": self.status,
            "error": self.err,
        }
|