Implemented the state store.
This commit is contained in:
parent
b4755f9f81
commit
6f00ca34cd
|
@ -97,8 +97,8 @@ class State():
|
||||||
self.modified = False
|
self.modified = False
|
||||||
self._system_id = None
|
self._system_id = None
|
||||||
self._timeline_id = None
|
self._timeline_id = None
|
||||||
self._pgbouncer_config = None
|
self._active_pgbouncer_config = None
|
||||||
self._leader_node_id = None
|
self.leader_node_id = None
|
||||||
self._leader_error = None
|
self._leader_error = None
|
||||||
self._leader_status = LEADER_UNKNOWN
|
self._leader_status = LEADER_UNKNOWN
|
||||||
self.nodes = {}
|
self.nodes = {}
|
||||||
|
@ -169,7 +169,7 @@ class State():
|
||||||
cluster. The leader status is reset to LEADER_UNKNOWN and should
|
cluster. The leader status is reset to LEADER_UNKNOWN and should
|
||||||
after this be updated.
|
after this be updated.
|
||||||
|
|
||||||
The system_id, timeline_id and pgbouncer_config for the State
|
The system_id, timeline_id and active_pgbouncer_config for the State
|
||||||
object are inherted from the provided leader node, and must
|
object are inherted from the provided leader node, and must
|
||||||
therefore be set before calling this method."""
|
therefore be set before calling this method."""
|
||||||
node = self.get_node(node.node_id)
|
node = self.get_node(node.node_id)
|
||||||
|
@ -179,18 +179,14 @@ class State():
|
||||||
raise NodeCannotBePromoted(node, "the node has no timeline_id")
|
raise NodeCannotBePromoted(node, "the node has no timeline_id")
|
||||||
if node.config.pgbouncer_config is None:
|
if node.config.pgbouncer_config is None:
|
||||||
raise NodeCannotBePromoted(node, "the node has no pgbouncer_config")
|
raise NodeCannotBePromoted(node, "the node has no pgbouncer_config")
|
||||||
if self._leader_node_id != node.node_id:
|
if self.leader_node_id != node.node_id:
|
||||||
self._leader_node_id = node.node_id
|
self.leader_node_id = node.node_id
|
||||||
self.modified = True
|
self.modified = True
|
||||||
self.leader_status = LEADER_UNKNOWN
|
self.leader_status = LEADER_UNKNOWN
|
||||||
self.leader_error = None
|
self.leader_error = None
|
||||||
self.system_id = node.system_id
|
self.system_id = node.system_id
|
||||||
self.timeline_id = node.timeline_id
|
self.timeline_id = node.timeline_id
|
||||||
self.pgbouncer_config = node.config.pgbouncer_config
|
self.active_pgbouncer_config = node.config.pgbouncer_config
|
||||||
|
|
||||||
@property
|
|
||||||
def leader_node_id(self):
|
|
||||||
return self._leader_node_id
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def leader_status(self):
|
def leader_status(self):
|
||||||
|
@ -223,17 +219,17 @@ class State():
|
||||||
self.modified = True
|
self.modified = True
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pgbouncer_config(self):
|
def active_pgbouncer_config(self):
|
||||||
return self._pgbouncer_config
|
return self._active_pgbouncer_config
|
||||||
|
|
||||||
@pgbouncer_config.setter
|
@active_pgbouncer_config.setter
|
||||||
def pgbouncer_config(self, pgbouncer_config):
|
def active_pgbouncer_config(self, pgbouncer_config):
|
||||||
"""Sets the pgbouncer configuration file that must be activated
|
"""Sets the pgbouncer configuration file that must be activated
|
||||||
in order to connect pgbouncer to the correct backend
|
in order to connect pgbouncer to the correct backend
|
||||||
PostgreSQL server."""
|
PostgreSQL server."""
|
||||||
if self._pgbouncer_config == pgbouncer_config:
|
if self._active_pgbouncer_config == pgbouncer_config:
|
||||||
return
|
return
|
||||||
self._pgbouncer_config = pgbouncer_config
|
self._active_pgbouncer_config = pgbouncer_config
|
||||||
self.modified = True
|
self.modified = True
|
||||||
|
|
||||||
def export(self):
|
def export(self):
|
||||||
|
@ -242,7 +238,7 @@ class State():
|
||||||
return {
|
return {
|
||||||
"system_id": self.system_id,
|
"system_id": self.system_id,
|
||||||
"timeline_id": self.timeline_id,
|
"timeline_id": self.timeline_id,
|
||||||
"pgbouncer_config": self.pgbouncer_config,
|
"active_pgbouncer_config": self.active_pgbouncer_config,
|
||||||
"leader_node_id": self.leader_node_id,
|
"leader_node_id": self.leader_node_id,
|
||||||
"leader_status": self.leader_status,
|
"leader_status": self.leader_status,
|
||||||
"leader_error": self.leader_error,
|
"leader_error": self.leader_error,
|
||||||
|
|
|
@ -7,92 +7,58 @@ from os import rename
|
||||||
from os.path import isfile
|
from os.path import isfile
|
||||||
from pgbouncemgr.logger import format_ex
|
from pgbouncemgr.logger import format_ex
|
||||||
|
|
||||||
STORE_ERROR = "error"
|
|
||||||
STORE_UPDATED = "updated"
|
|
||||||
STORE_NOCHANGE = "nochange"
|
|
||||||
|
|
||||||
class State():
|
class StateStoreException(Exception):
|
||||||
system_id = None
|
"""Used for all exceptions that are raised from pgbouncemgr.state_store."""
|
||||||
timeline_id = None
|
|
||||||
pgbouncer_config = None
|
|
||||||
primary_node = None
|
|
||||||
primary_connected = False
|
|
||||||
primary_error = None
|
|
||||||
err = None
|
|
||||||
old_state = None
|
|
||||||
|
|
||||||
def __init__(self, path):
|
|
||||||
|
class StateStore():
|
||||||
|
def __init__(self, path, state):
|
||||||
self.path = path
|
self.path = path
|
||||||
|
self.state = state
|
||||||
self.load()
|
self.load()
|
||||||
|
|
||||||
def load(self):
|
def load(self):
|
||||||
"""Load the state from the state file.
|
"""Load state from the state file into the state object.
|
||||||
When this fails, an exception will be thrown."""
|
When this fails, an exception will be thrown.
|
||||||
# When no state file exists, we start with a clean slate.
|
Note that not all of the stored state is restored.
|
||||||
|
Only those parts that are required to be carried
|
||||||
|
on between restarts."""
|
||||||
|
# When no state file exists, there is no state to restore.
|
||||||
|
# Keep the state object as-is.
|
||||||
if not isfile(self.path):
|
if not isfile(self.path):
|
||||||
return
|
return
|
||||||
|
|
||||||
# Otherwise, read the state from the state file.
|
# Otherwise, read the state from the state file.
|
||||||
with open(self.path, 'r') as stream:
|
with open(self.path, 'r') as stream:
|
||||||
try:
|
try:
|
||||||
state = json.load(stream)
|
loaded_state = json.load(stream)
|
||||||
except json.decoder.JSONDecodeError:
|
except json.decoder.JSONDecodeError:
|
||||||
# JSON decoding failed. This is beyond repair.
|
# JSON decoding failed. This is beyond repair.
|
||||||
# Start with a clean slate to have the state data reinitialized.
|
# Start with a clean slate to have the state data reinitialized.
|
||||||
return
|
return
|
||||||
|
|
||||||
# Copy the state over to this state object.
|
# Copy the state over to the state object.
|
||||||
self.system_id = state["system_id"]
|
for key in [
|
||||||
self.timeline_id = state["timeline_id"]
|
"system_id",
|
||||||
self.pgbouncer_config = state["pgbouncer_config"]
|
"timeline_id",
|
||||||
self.primary_node = state["primary_node"]
|
"active_pgbouncer_config",
|
||||||
|
"leader_node_id"]:
|
||||||
# The folowing properties are always filled dynamically by the manager.
|
setattr(self.state, key, loaded_state[key])
|
||||||
# These are not restored from the state file on startup.
|
|
||||||
self.primary_connected = False
|
|
||||||
self.nodes = {}
|
|
||||||
|
|
||||||
def is_clean_slate(self):
|
|
||||||
return self.system_id is None
|
|
||||||
|
|
||||||
def set_primary_connected(self, connected, err=None):
|
|
||||||
self.primary_connected = connected
|
|
||||||
self.primary_error = err
|
|
||||||
|
|
||||||
def store(self, nodes):
|
|
||||||
"""Store the current state in the state file.
|
|
||||||
Returns True when the state was stored successfully.
|
|
||||||
Returns False when it failed. The error can be found in the err property."""
|
|
||||||
stored_state = {
|
|
||||||
"system_id": self.system_id,
|
|
||||||
"timeline_id": self.timeline_id,
|
|
||||||
"pgbouncer_config": self.pgbouncer_config,
|
|
||||||
"primary_node": self.primary_node,
|
|
||||||
"primary_connected": self.primary_connected,
|
|
||||||
"primary_error": self.primary_error,
|
|
||||||
"nodes": dict((n.name, {
|
|
||||||
"host": n.conn_params["host"],
|
|
||||||
"port": n.conn_params["port"],
|
|
||||||
"status": n.status,
|
|
||||||
"system_id": n.system_id,
|
|
||||||
"timeline_id": n.timeline_id,
|
|
||||||
"error": n.err}) for n in nodes)
|
|
||||||
}
|
|
||||||
|
|
||||||
# When the state has not changed, then don't write out a new state.
|
|
||||||
new_state = json.dumps(stored_state, sort_keys=True, indent=2)
|
|
||||||
if self.old_state and self.old_state == new_state:
|
|
||||||
return STORE_NOCHANGE
|
|
||||||
self.old_state = new_state
|
|
||||||
|
|
||||||
|
def store(self):
|
||||||
|
"""Store the current state in the state file. Returns True when
|
||||||
|
the state was stored successfully. Returns False when it failed.
|
||||||
|
The error can be found in the err property."""
|
||||||
|
new_state = json.dumps(self.state.export(), sort_keys=True, indent=2)
|
||||||
try:
|
try:
|
||||||
self.err = None
|
self.err = None
|
||||||
swap_path = "%s..SWAP" % self.path
|
swap_path = "%s..SWAP" % self.path
|
||||||
with open(swap_path, "w") as file_handle:
|
with open(swap_path, "w") as file_handle:
|
||||||
print(new_state, file=file_handle)
|
print(new_state, file=file_handle)
|
||||||
rename(swap_path, self.path)
|
rename(swap_path, self.path)
|
||||||
return STORE_UPDATED
|
return True
|
||||||
except Exception as exception:
|
except Exception as exception:
|
||||||
self.err = "Storing state to file (%s) failed: %s" % (
|
self.err = "Storing state to file (%s) failed: %s" % (
|
||||||
self.path, format_exception_message(exception))
|
self.path, format_ex(exception))
|
||||||
return STORE_ERROR
|
return False
|
||||||
|
|
|
@ -15,7 +15,7 @@ class StateTests(unittest.TestCase):
|
||||||
state = State()
|
state = State()
|
||||||
self.assertIsNone(state.system_id)
|
self.assertIsNone(state.system_id)
|
||||||
self.assertIsNone(state.timeline_id)
|
self.assertIsNone(state.timeline_id)
|
||||||
self.assertIsNone(state.pgbouncer_config)
|
self.assertIsNone(state.active_pgbouncer_config)
|
||||||
self.assertIsNone(state.leader_node_id)
|
self.assertIsNone(state.leader_node_id)
|
||||||
self.assertIsNone(state.leader_error)
|
self.assertIsNone(state.leader_error)
|
||||||
self.assertEqual(LEADER_UNKNOWN, state.leader_status)
|
self.assertEqual(LEADER_UNKNOWN, state.leader_status)
|
||||||
|
@ -434,7 +434,7 @@ class StateExportTests(unittest.TestCase):
|
||||||
self.assertEqual({
|
self.assertEqual({
|
||||||
"system_id": "System X",
|
"system_id": "System X",
|
||||||
"timeline_id": 555,
|
"timeline_id": 555,
|
||||||
"pgbouncer_config": PGBOUNCER_CONFIG,
|
"active_pgbouncer_config": PGBOUNCER_CONFIG,
|
||||||
"leader_node_id": 1,
|
"leader_node_id": 1,
|
||||||
"leader_status": LEADER_CONNECTED,
|
"leader_status": LEADER_CONNECTED,
|
||||||
"leader_error": "Some error for leader connection",
|
"leader_error": "Some error for leader connection",
|
||||||
|
|
|
@ -0,0 +1,61 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import unittest
|
||||||
|
from tempfile import NamedTemporaryFile
|
||||||
|
from pgbouncemgr.state import *
|
||||||
|
from pgbouncemgr.state_store import *
|
||||||
|
|
||||||
|
|
||||||
|
def make_test_file_path(filename):
|
||||||
|
return os.path.join(
|
||||||
|
os.path.dirname(os.path.realpath(__file__)),
|
||||||
|
"testfiles", filename)
|
||||||
|
|
||||||
|
class StateStoreTests(unittest.TestCase):
|
||||||
|
def test_GivenNonExistingStateFile_OnLoad_StateStoreDoesNotLoadState(self):
|
||||||
|
state = State()
|
||||||
|
before = state.export()
|
||||||
|
StateStore("/non-existent/state.json", state)
|
||||||
|
after = state.export()
|
||||||
|
|
||||||
|
self.assertEqual(before, after)
|
||||||
|
|
||||||
|
def test_GivenExistingStateFile_ContainingInvalidJson_OnLoad_StateStoreDoesNotLoadState(self):
|
||||||
|
state = State()
|
||||||
|
before = state.export()
|
||||||
|
StateStore(make_test_file_path("invalid.json"), state)
|
||||||
|
after = state.export()
|
||||||
|
|
||||||
|
self.assertEqual(before, after)
|
||||||
|
|
||||||
|
def test_GivenExistingStateFile_OnLoad_StateStoreUpdatesState(self):
|
||||||
|
state = State()
|
||||||
|
|
||||||
|
# These are the fields as
|
||||||
|
expected = state.export()
|
||||||
|
expected["system_id"] = "A"
|
||||||
|
expected["timeline_id"] = 42
|
||||||
|
expected["leader_node_id"] = "NODE1"
|
||||||
|
expected["active_pgbouncer_config"] = "/my/config.ini"
|
||||||
|
|
||||||
|
StateStore(make_test_file_path("state.json"), state)
|
||||||
|
|
||||||
|
self.assertEqual(expected, state.export())
|
||||||
|
|
||||||
|
def test_GivenState_OnSave_StateStoreStoresState(self):
|
||||||
|
try:
|
||||||
|
state = State()
|
||||||
|
tmpfile = NamedTemporaryFile(delete=False)
|
||||||
|
StateStore(tmpfile.name, state).store()
|
||||||
|
|
||||||
|
with open(tmpfile.name, 'r') as stream:
|
||||||
|
stored = json.load(stream)
|
||||||
|
self.assertEqual(state.export(), stored)
|
||||||
|
except Exception as exception:
|
||||||
|
self.fail("Unexpected exception: %s" % str(exception))
|
||||||
|
finally:
|
||||||
|
if tmpfile and os.path.exists(tmpfile.name):
|
||||||
|
os.unlink(tmpfile.name)
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
this
|
||||||
|
is
|
||||||
|
an
|
||||||
|
invalid
|
||||||
|
JSON
|
||||||
|
file
|
|
@ -0,0 +1,10 @@
|
||||||
|
{
|
||||||
|
"active_pgbouncer_config": "/my/config.ini",
|
||||||
|
"system_id": "A",
|
||||||
|
"timeline_id": 42,
|
||||||
|
"leader_node_id": "NODE1",
|
||||||
|
"any": "other",
|
||||||
|
"state": "properties",
|
||||||
|
"are": "ignored",
|
||||||
|
"on": "restore"
|
||||||
|
}
|
Loading…
Reference in New Issue