Moved all constants into their own pgbouncemgr.constants module.

This commit is contained in:
Maurice Makaay 2019-12-13 14:17:15 +01:00
parent a3a97a9721
commit 4f8f1473a1
10 changed files with 107 additions and 47 deletions

View File

@ -13,10 +13,13 @@ run:
PYTHONPATH=./ python3 pgbouncemgr/manager.py
run-d:
PYTHONPATH=./ python3 pgbouncemgr/manager.py -d
PYTHONPATH=./ python3 pgbouncemgr/manager.py --debug
run-s:
PYTHONPATH=./ python3 pgbouncemgr/manager.py --debug --single-shot
run-v:
PYTHONPATH=./ python3 pgbouncemgr/manager.py -v
PYTHONPATH=./ python3 pgbouncemgr/manager.py --verbose
run-h:
PYTHONPATH=./ python3 pgbouncemgr/manager.py -h

View File

@ -55,7 +55,7 @@ class Config():
"connect_timeout": 1,
"user": "pgbouncemgr",
"password": None,
"name": "template1",
"database": "template1",
}
self.pgbouncer = {
"pgbouncer_config": "/etc/pgbouncer/pgbouncer.ini",

25
pgbouncemgr/constants.py Normal file
View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
"""Constants as used by the pgbouncemgr modules."""
# Configuration defaults.
DEFAULT_CONFIG = "/etc/pgbouncer/pgbouncemgr.yaml"
DEFAULT_LOG_FACILITY = "LOG_LOCAL1"
# Return values for the PgConnection.connect() method.
CONN_CONNECTED = 'CONNECTED'
CONN_REUSED = 'REUSED'
CONN_RECONNECTED = 'RECONNECTED'
# Backend node status.
NODE_UNKNOWN = "NODE_UNKNOWN"
NODE_OFFLINE = "NODE_OFFLINE"
NODE_PRIMARY = "NODE_PRIMARY"
NODE_STANDBY = "NODE_STANDBY"
NODE_STATUSES = [NODE_UNKNOWN, NODE_OFFLINE, NODE_PRIMARY, NODE_STANDBY]
# Leader node status.
LEADER_UNKNOWN = "LEADER_UNKNOWN"
LEADER_CONNECTED = "LEADER_CONNECTED"
LEADER_DISCONNECTED = "LEADER_DISCONNECTED"
LEADER_STATUSES = [LEADER_UNKNOWN, LEADER_CONNECTED, LEADER_DISCONNECTED]

View File

@ -6,6 +6,7 @@
from time import sleep
from argparse import ArgumentParser
from pgbouncemgr.constants import DEFAULT_CONFIG, DEFAULT_LOG_FACILITY
from pgbouncemgr.logger import Logger, ConsoleLog, SyslogLog
from pgbouncemgr.config import Config
from pgbouncemgr.state import State
@ -14,14 +15,11 @@ from pgbouncemgr.state_store import StateStore
from pgbouncemgr.node_poller import NodePoller
DEFAULT_CONFIG = "/etc/pgbouncer/pgbouncemgr.yaml"
DEFAULT_LOG_FACILITY = "LOG_LOCAL1"
class Manager():
def __init__(self, argv=None):
args = _parse_arguments(argv)
self.config = Config(args.config)
self.single_shot = args.single_shot
self._create_logger(args)
self._create_state()
self.node_poller = NodePoller(self.state)
@ -42,6 +40,8 @@ class Manager():
drop_privileges(self.config.run_user, self.config.run_group)
while True:
self.node_poller.poll()
if self.single_shot:
return
sleep(self.config.poll_interval_in_sec)
@ -55,6 +55,11 @@ def _parse_arguments(args):
"-d", "--debug",
default=False, action="store_true",
help="enable debugging console output (default: disabled)")
parser.add_argument(
"-s", "--single-shot",
default=False, action="store_true",
help="do only a single run, instead of running continuously " +
"(default: disabled)")
parser.add_argument(
"-f", "--log-facility",
default=DEFAULT_LOG_FACILITY,

View File

@ -14,7 +14,7 @@ class NodeConfig():
self.host = None
self.port = None
self.connect_timeout = 1
self.name = 'template1'
self.database = 'template1'
self.user = 'pgbouncemgr'
self.password = None

View File

@ -1,13 +1,47 @@
# -*- coding: utf-8 -*-
# no-pylint: disable=missing-docstring,too-few-public-methods
from pgbouncemgr.postgres import PgReplicationConnection, PgException
class NodePoller():
"""The NodePoller is used to poll all the nodes that are available
in the state object, and to update their status according to
the results."""
def __init__(self, state):
the results.
The connection_class that can be provided is used for testing
purposes (dependency injection)."""
def __init__(self, state, connection_class=None):
self.connection_class = PgReplicationConnection
if connection_class is not None:
self.connection_class = connection_class
self.state = state
self._connections = {}
def poll(self):
"""Check the status of all backend nodes and update their status."""
for node in self.state.nodes.values():
print(repr(node.config))
self._poll_node(node)
def _poll_node(self, node):
connection = self._get_connection_object(node)
try:
result = connection.connect()
print(result)
status = connection.get_replication_status()
node.status = status["status"]
node.timeline_id = status["timeline_id"]
node.system_id = status["system_id"]
except PgException as exception:
pass
def _get_connection_object(self, node):
if node.node_id in self._connections:
return self._connections[node.node_id]
connection = self.connection_class(node.config)
self._connections[node.node_id] = connection
return connection
def reset(self):
"""Reset all database connections."""
for connection in self._connections:
connection.disconnect()
self._connections = {}

View File

@ -8,6 +8,9 @@ import multiprocessing
import psycopg2
from psycopg2.extras import LogicalReplicationConnection
from pgbouncemgr.logger import format_ex
from pgbouncemgr.constants import \
NODE_OFFLINE, NODE_STANDBY, NODE_PRIMARY,\
CONN_REUSED, CONN_RECONNECTED, CONN_CONNECTED
class PgException(Exception):
@ -34,14 +37,16 @@ class ConnectedToWrongBackend(PgException):
"backend service: %s" % msg)
# Return values for the PgConnection.connect() method.
CONNECTED = 'CONNECTED'
REUSED = 'REUSED'
RECONNECTED = 'RECONNECTED'
# The properties that can be used in a configuration object to
# define connection parameters for psycopg2.
CONNECTION_PARAMS = ['host', 'port', 'connect_timeout', 'user', 'password']
CONNECTION_PARAMS = [
'host',
'port',
'user',
'password',
'database',
'connect_timeout'
]
class PgConnection():
"""Implements a connection to a PostgreSQL server."""
@ -69,20 +74,20 @@ class PgConnection():
this connection. If no, or when no connection exists, then
setup a new connection.
Raises an exeption when the database connection cannot be setup.
returns CONNECTED, REUSED or RECONNECTED when the connection
was setup successfully."""
returns CONN_CONNECTED, CONN_REUSED or CONN_RECONNECTED when
the connection was setup successfully."""
reconnected = False
if self.conn is not None:
try:
with self.conn.cursor() as cursor:
cursor.execute(self.ping_query)
return REUSED
return CONN_REUSED
except psycopg2.OperationalError:
reconnected = True
self.disconnect()
try:
self.conn = psycopg2.connect(**self.conn_params)
return RECONNECTED if reconnected else CONNECTED
return CONN_RECONNECTED if reconnected else CONN_CONNECTED
except psycopg2.OperationalError as exception:
self.disconnect()
raise PgConnectionFailed(exception)
@ -97,11 +102,6 @@ class PgConnection():
self.conn = None
# Return values for the PgReplicationConnection status.
OFFLINE = "OFFLINE"
PRIMARY = "PRIMARY"
STANDBY = "STANDBY"
class PgReplicationConnection(PgConnection):
"""This PostgresQL connection class is used to setup a replication
connection to a PostgreSQL database server, which can be used
@ -112,19 +112,20 @@ class PgReplicationConnection(PgConnection):
def get_replication_status(self):
"""Returns the replication status for a node. This is an array,
containing the keys "status" (OFFLINE, PRIMARY or STANDBY),
"system_id" and "timeline_id"."""
containing the keys "status" (NODE_OFFLINE, NODE_PRIMARY or
NODE_STANDBY), "system_id" and "timeline_id"."""
status = {
"status": None,
"system_id": None,
"timeline_id": None
}
# Try to connect to the node. If this fails, the node is OFFLINE.
# Try to connect to the node. If this fails, the status is
# set to NODE_OFFLINE.
try:
self.connect()
except PgConnectionFailed:
status["status"] = OFFLINE
status["status"] = NODE_OFFLINE
return status
# Check if the node is running in primary or standby mode.
@ -132,7 +133,7 @@ class PgReplicationConnection(PgConnection):
with self.conn.cursor() as cursor:
cursor.execute("SELECT pg_is_in_recovery()")
in_recovery = cursor.fetchone()[0]
status["status"] = STANDBY if in_recovery else PRIMARY
status["status"] = NODE_STANDBY if in_recovery else NODE_PRIMARY
except psycopg2.InternalError as exception:
self.disconnect()
raise RetrievingPgReplicationStatusFailed(

View File

@ -3,18 +3,9 @@
import os
from pgbouncemgr.node_config import NodeConfig
LEADER_UNKNOWN = "LEADER_UNKNOWN"
LEADER_CONNECTED = "LEADER_CONNECTED"
LEADER_DISCONNECTED = "LEADER_DISCONNECTED"
LEADER_STATUSES = [LEADER_UNKNOWN, LEADER_CONNECTED, LEADER_DISCONNECTED]
NODE_UNKNOWN = "NODE_UNKNOWN"
NODE_OFFLINE = "NODE_OFFLINE"
NODE_PRIMARY = "NODE_PRIMARY"
NODE_STANDBY = "NODE_STANDBY"
NODE_STATUSES = [NODE_UNKNOWN, NODE_OFFLINE, NODE_PRIMARY, NODE_STANDBY]
from pgbouncemgr.constants import \
LEADER_UNKNOWN, LEADER_STATUSES,\
NODE_UNKNOWN, NODE_OFFLINE, NODE_PRIMARY, NODE_STANDBY, NODE_STATUSES
class StateException(Exception):

View File

@ -52,7 +52,7 @@ class ConfigTests(unittest.TestCase):
self.assertEqual(1, config.db_connection_defaults["connect_timeout"])
self.assertEqual("pgbouncemgr", config.db_connection_defaults["user"])
self.assertEqual(None, config.db_connection_defaults["password"])
self.assertEqual("template1", config.db_connection_defaults["name"])
self.assertEqual("template1", config.db_connection_defaults["database"])
# pgbouncer
self.assertEqual("/etc/pgbouncer/pgbouncer.ini", config.pgbouncer["pgbouncer_config"])
self.assertEqual(6432, config.pgbouncer["port"])
@ -64,7 +64,7 @@ class ConfigTests(unittest.TestCase):
self.assertEqual({
"connect_timeout": 1,
"host": "0.0.0.0",
"name": "template1",
"database": "template1",
"password": "Wilmaaaaa!!!",
"pgbouncer_config": "/etc/pgbouncer/pgbouncer.ini",
"port": 6432,
@ -74,7 +74,7 @@ class ConfigTests(unittest.TestCase):
"nodeA": {
"connect_timeout": 1,
"host": "1.2.3.4",
"name": "template1",
"database": "template1",
"password": "Wilmaaaaa!!!",
"port": 8888,
"user": "pgbouncemgr"
@ -82,7 +82,7 @@ class ConfigTests(unittest.TestCase):
"nodeB": {
"connect_timeout": 1,
"host": "2.3.4.5",
"name": "template1",
"database": "template1",
"password": "Wilmaaaaa!!!",
"port": 7777,
"user": "pgbouncemgr"

View File

@ -4,6 +4,7 @@ import os
import unittest
from pgbouncemgr.logger import *
from pgbouncemgr.state import *
from pgbouncemgr.constants import LEADER_UNKNOWN, LEADER_CONNECTED
PGBOUNCER_CONFIG = os.path.join(