Backup work on unit tests using a psycopg2 stub.
This commit is contained in:
parent
4f8f1473a1
commit
c8595f0a55
|
@ -7,9 +7,9 @@ DEFAULT_CONFIG = "/etc/pgbouncer/pgbouncemgr.yaml"
|
||||||
DEFAULT_LOG_FACILITY = "LOG_LOCAL1"
|
DEFAULT_LOG_FACILITY = "LOG_LOCAL1"
|
||||||
|
|
||||||
# Return values for the PgConnection.connect() method.
|
# Return values for the PgConnection.connect() method.
|
||||||
CONN_CONNECTED = 'CONNECTED'
|
CONN_CONNECTED = 'CONN_CONNECTED'
|
||||||
CONN_REUSED = 'REUSED'
|
CONN_REUSED = 'CONN_REUSED'
|
||||||
CONN_RECONNECTED = 'RECONNECTED'
|
CONN_RECONNECTED = 'CONN_RECONNECTED'
|
||||||
|
|
||||||
# Backend node status.
|
# Backend node status.
|
||||||
NODE_UNKNOWN = "NODE_UNKNOWN"
|
NODE_UNKNOWN = "NODE_UNKNOWN"
|
||||||
|
|
|
@ -22,7 +22,7 @@ class Manager():
|
||||||
self.single_shot = args.single_shot
|
self.single_shot = args.single_shot
|
||||||
self._create_logger(args)
|
self._create_logger(args)
|
||||||
self._create_state()
|
self._create_state()
|
||||||
self.node_poller = NodePoller(self.state)
|
self.node_poller = NodePoller(self.state, self.log)
|
||||||
|
|
||||||
def _create_logger(self, args):
|
def _create_logger(self, args):
|
||||||
self.log = Logger()
|
self.log = Logger()
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# no-pylint: disable=missing-docstring,too-few-public-methods
|
# no-pylint: disable=missing-docstring,too-few-public-methods
|
||||||
|
|
||||||
|
from pgbouncemgr.logger import format_ex
|
||||||
from pgbouncemgr.postgres import PgReplicationConnection, PgException
|
from pgbouncemgr.postgres import PgReplicationConnection, PgException
|
||||||
|
from pgbouncemgr.constants import NODE_OFFLINE
|
||||||
|
|
||||||
class NodePoller():
|
class NodePoller():
|
||||||
"""The NodePoller is used to poll all the nodes that are available
|
"""The NodePoller is used to poll all the nodes that are available
|
||||||
|
@ -9,11 +11,12 @@ class NodePoller():
|
||||||
the results.
|
the results.
|
||||||
The connection_class that can be provided is used for testing
|
The connection_class that can be provided is used for testing
|
||||||
purposes (dependency injection)."""
|
purposes (dependency injection)."""
|
||||||
def __init__(self, state, connection_class=None):
|
def __init__(self, state, log, connection_class=None):
|
||||||
|
self.state = state
|
||||||
|
self.log = log
|
||||||
self.connection_class = PgReplicationConnection
|
self.connection_class = PgReplicationConnection
|
||||||
if connection_class is not None:
|
if connection_class is not None:
|
||||||
self.connection_class = connection_class
|
self.connection_class = connection_class
|
||||||
self.state = state
|
|
||||||
self._connections = {}
|
self._connections = {}
|
||||||
|
|
||||||
def poll(self):
|
def poll(self):
|
||||||
|
@ -22,21 +25,23 @@ class NodePoller():
|
||||||
self._poll_node(node)
|
self._poll_node(node)
|
||||||
|
|
||||||
def _poll_node(self, node):
|
def _poll_node(self, node):
|
||||||
connection = self._get_connection_object(node)
|
connection = self._get_connection(node)
|
||||||
try:
|
try:
|
||||||
result = connection.connect()
|
|
||||||
print(result)
|
|
||||||
status = connection.get_replication_status()
|
status = connection.get_replication_status()
|
||||||
node.status = status["status"]
|
node.status = status["status"]
|
||||||
node.timeline_id = status["timeline_id"]
|
if node.status != NODE_OFFLINE:
|
||||||
node.system_id = status["system_id"]
|
node.timeline_id = status["timeline_id"]
|
||||||
|
node.system_id = status["system_id"]
|
||||||
except PgException as exception:
|
except PgException as exception:
|
||||||
pass
|
node.status = NODE_OFFLINE
|
||||||
|
self.log.error(
|
||||||
|
"[%s] Error on polling node: %s" %
|
||||||
|
(node.node_id, format_ex(exception)))
|
||||||
|
|
||||||
def _get_connection_object(self, node):
|
def _get_connection(self, node):
|
||||||
if node.node_id in self._connections:
|
if node.node_id in self._connections:
|
||||||
return self._connections[node.node_id]
|
return self._connections[node.node_id]
|
||||||
connection = self.connection_class(node.config)
|
connection = self.connection_class(node.config, self.log)
|
||||||
self._connections[node.node_id] = connection
|
self._connections[node.node_id] = connection
|
||||||
return connection
|
return connection
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ class PgConnectionFailed(PgException):
|
||||||
"""Raised when connecting to the database server failed."""
|
"""Raised when connecting to the database server failed."""
|
||||||
def __init__(self, exception):
|
def __init__(self, exception):
|
||||||
super().__init__(
|
super().__init__(
|
||||||
"Could not connect to %s: %s" % (format_ex(exception)))
|
"Could not connect to node: %s" % format_ex(exception))
|
||||||
|
|
||||||
class RetrievingPgReplicationStatusFailed(PgException):
|
class RetrievingPgReplicationStatusFailed(PgException):
|
||||||
"""Raised when the replication status cannot be determined."""
|
"""Raised when the replication status cannot be determined."""
|
||||||
|
@ -48,54 +48,91 @@ CONNECTION_PARAMS = [
|
||||||
'connect_timeout'
|
'connect_timeout'
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class NodeLogger():
|
||||||
|
def __init__(self, node_config, log):
|
||||||
|
self.node_id = node_config.node_id
|
||||||
|
self.log = log
|
||||||
|
|
||||||
|
def debug(self, msg):
|
||||||
|
self.log.debug("[%s] %s" % (self.node_id, msg))
|
||||||
|
|
||||||
|
def info(self, msg):
|
||||||
|
self.log.info("[%s] %s" % (self.node_id, msg))
|
||||||
|
|
||||||
|
def warning(self, msg):
|
||||||
|
self.log.warning("[%s] %s" % (self.node_id, msg))
|
||||||
|
|
||||||
|
def error(self, msg):
|
||||||
|
self.log.error("[%s] %s" % (self.node_id, msg))
|
||||||
|
|
||||||
|
|
||||||
class PgConnection():
|
class PgConnection():
|
||||||
"""Implements a connection to a PostgreSQL server."""
|
"""Implements a connection to a PostgreSQL server.
|
||||||
def __init__(self, config):
|
The provided node_config must be a pgbouncemgr.NodeConfig object.
|
||||||
self.conn_params = self._create_conn_params(config)
|
The psycopg2_module argument is used as a depencency injection
|
||||||
|
mechanism for testing purposes."""
|
||||||
|
def __init__(self, node_config, log, psycopg2_module=psycopg2):
|
||||||
|
self.log = NodeLogger(node_config, log)
|
||||||
|
self.psycopg2 = psycopg2_module
|
||||||
|
|
||||||
|
self.node_id = node_config.node_id
|
||||||
|
self.conn_params = self._create_conn_params(node_config)
|
||||||
self.ping_query = "SELECT 1"
|
self.ping_query = "SELECT 1"
|
||||||
self.conn = None
|
self.conn = None
|
||||||
|
self.reconnecting = False
|
||||||
|
|
||||||
def _create_conn_params(self, config):
|
def _create_conn_params(self, node_config):
|
||||||
"""Extract all connection parameters from the provided configuration,
|
"""Extract all connection parameters from the provided configuration,
|
||||||
that don't have value of None."""
|
that don't have value of None."""
|
||||||
conn_params = {}
|
conn_params = {}
|
||||||
for key in CONNECTION_PARAMS:
|
for key in CONNECTION_PARAMS:
|
||||||
if not hasattr(config, key):
|
if not hasattr(node_config, key):
|
||||||
continue
|
continue
|
||||||
value = getattr(config, key)
|
value = getattr(node_config, key)
|
||||||
if value is None:
|
if value is None:
|
||||||
continue
|
continue
|
||||||
conn_params[key] = value
|
conn_params[key] = value
|
||||||
return conn_params
|
return conn_params
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
"""Connect to the database server. When a connection exists,
|
"""Connect to the database server. When a connection exists, then
|
||||||
then check if it is still oeprational. If yes, then reuse
|
check if it is still operational. If yes, then reuse this
|
||||||
this connection. If no, or when no connection exists, then
|
connection. If no, or when no connection exists, then setup a new
|
||||||
setup a new connection.
|
connection.
|
||||||
Raises an exeption when the database connection cannot be setup.
|
Raises an exeption when the database connection cannot be setup.
|
||||||
returns CONN_CONNECTED, CONN_REUSED or CONN_RECONNECTED when
|
returns CONN_CONNECTED, CONN_REUSED or CONN_RECONNECTED when
|
||||||
the connection was setup successfully."""
|
the connection was setup successfully."""
|
||||||
reconnected = False
|
|
||||||
if self.conn is not None:
|
if self.conn is not None:
|
||||||
try:
|
try:
|
||||||
with self.conn.cursor() as cursor:
|
with self.conn.cursor() as cursor:
|
||||||
cursor.execute(self.ping_query)
|
cursor.execute(self.ping_query)
|
||||||
|
self.log.debug("reusing connection")
|
||||||
return CONN_REUSED
|
return CONN_REUSED
|
||||||
except psycopg2.OperationalError:
|
except psycopg2.OperationalError as exception:
|
||||||
reconnected = True
|
self.log.warning(
|
||||||
|
"connection went away, reconnecting (got exception: %s)" %
|
||||||
|
format_ex(exception))
|
||||||
|
self.reconnecting = True
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
try:
|
try:
|
||||||
self.conn = psycopg2.connect(**self.conn_params)
|
self.conn = self.psycopg2.connect(**self.conn_params)
|
||||||
return CONN_RECONNECTED if reconnected else CONN_CONNECTED
|
if self.reconnecting:
|
||||||
|
self.log.info("reconnected successfully")
|
||||||
|
self.reconnecting = False
|
||||||
|
return CONN_RECONNECTED
|
||||||
|
self.log.info("connected successfully")
|
||||||
|
|
||||||
|
return CONN_CONNECTED
|
||||||
except psycopg2.OperationalError as exception:
|
except psycopg2.OperationalError as exception:
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
self.log.error("connection failed: %s" % format_ex(exception))
|
||||||
raise PgConnectionFailed(exception)
|
raise PgConnectionFailed(exception)
|
||||||
|
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
"""Disconnect from the database server."""
|
"""Disconnect from the database server."""
|
||||||
try:
|
try:
|
||||||
if self.conn:
|
if self.conn is not None:
|
||||||
self.conn.close()
|
self.conn.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
@ -106,14 +143,16 @@ class PgReplicationConnection(PgConnection):
|
||||||
"""This PostgresQL connection class is used to setup a replication
|
"""This PostgresQL connection class is used to setup a replication
|
||||||
connection to a PostgreSQL database server, which can be used
|
connection to a PostgreSQL database server, which can be used
|
||||||
to retrieve the replication status for the server."""
|
to retrieve the replication status for the server."""
|
||||||
def __init__(self, node_config):
|
def __init__(self, node_config, log, psycopg2_module=psycopg2):
|
||||||
super().__init__(node_config)
|
super().__init__(node_config, log, psycopg2_module)
|
||||||
self.conn_params["connection_factory"] = LogicalReplicationConnection
|
self.conn_params["connection_factory"] = LogicalReplicationConnection
|
||||||
|
|
||||||
def get_replication_status(self):
|
def get_replication_status(self):
|
||||||
"""Returns the replication status for a node. This is an array,
|
"""Returns the replication status for a node. This is an array,
|
||||||
containing the keys "status" (NODE_OFFLINE, NODE_PRIMARY or
|
containing the keys "status" (NODE_OFFLINE, NODE_PRIMARY or
|
||||||
NODE_STANDBY), "system_id" and "timeline_id"."""
|
NODE_STANDBY), "system_id" and "timeline_id".
|
||||||
|
Note: before calling this method, first setup the connection using
|
||||||
|
the connect() method."""
|
||||||
status = {
|
status = {
|
||||||
"status": None,
|
"status": None,
|
||||||
"system_id": None,
|
"system_id": None,
|
||||||
|
@ -134,7 +173,7 @@ class PgReplicationConnection(PgConnection):
|
||||||
cursor.execute("SELECT pg_is_in_recovery()")
|
cursor.execute("SELECT pg_is_in_recovery()")
|
||||||
in_recovery = cursor.fetchone()[0]
|
in_recovery = cursor.fetchone()[0]
|
||||||
status["status"] = NODE_STANDBY if in_recovery else NODE_PRIMARY
|
status["status"] = NODE_STANDBY if in_recovery else NODE_PRIMARY
|
||||||
except psycopg2.InternalError as exception:
|
except psycopg2.OperationalError as exception:
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
raise RetrievingPgReplicationStatusFailed(
|
raise RetrievingPgReplicationStatusFailed(
|
||||||
"SELECT pg_is_in_recovery() failed: %s" % format_ex(exception))
|
"SELECT pg_is_in_recovery() failed: %s" % format_ex(exception))
|
||||||
|
@ -147,11 +186,17 @@ class PgReplicationConnection(PgConnection):
|
||||||
system_id, timeline_id, *_ = row
|
system_id, timeline_id, *_ = row
|
||||||
status["system_id"] = system_id
|
status["system_id"] = system_id
|
||||||
status["timeline_id"] = timeline_id
|
status["timeline_id"] = timeline_id
|
||||||
except psycopg2.InternalError as exception:
|
except psycopg2.OperationalError as exception:
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
self.log.error(
|
||||||
|
"retrieving replication status failed: %s" %
|
||||||
|
format_ex(exception))
|
||||||
raise RetrievingPgReplicationStatusFailed(
|
raise RetrievingPgReplicationStatusFailed(
|
||||||
"IDENTIFY_SYSTEM failed: %s" % format_ex(exception))
|
"IDENTIFY_SYSTEM failed: %s" % format_ex(exception))
|
||||||
|
|
||||||
|
self.log.debug(
|
||||||
|
"node status: status=%s, system_id=%s, timeline_id=%s" %
|
||||||
|
(status["status"], status["system_id"], status["timeline_id"]))
|
||||||
return status
|
return status
|
||||||
|
|
||||||
# The query that is used to check if pgbouncer is connected to the expected
|
# The query that is used to check if pgbouncer is connected to the expected
|
||||||
|
@ -183,7 +228,7 @@ VERIFY_QUERY = """
|
||||||
class PgConnectionViaPgbouncer(PgConnection):
|
class PgConnectionViaPgbouncer(PgConnection):
|
||||||
"""This PostgreSQL connection class is used to setup a connection
|
"""This PostgreSQL connection class is used to setup a connection
|
||||||
to the PostgreSQL cluster, via the pgbouncer instance."""
|
to the PostgreSQL cluster, via the pgbouncer instance."""
|
||||||
def __init__(self, node_config, pgbouncer_config):
|
def __init__(self, node_config, pgbouncer_config, log, psycopg2_module=psycopg2):
|
||||||
"""Instantiate a new connection. The node_config and the
|
"""Instantiate a new connection. The node_config and the
|
||||||
pgbouncer_config will be combined to get the connection parameters
|
pgbouncer_config will be combined to get the connection parameters
|
||||||
for connecting to the PostgreSQL server."""
|
for connecting to the PostgreSQL server."""
|
||||||
|
@ -191,7 +236,7 @@ class PgConnectionViaPgbouncer(PgConnection):
|
||||||
|
|
||||||
# First, apply all the connection parameters as defined for the node.
|
# First, apply all the connection parameters as defined for the node.
|
||||||
# This is fully handled by the parent class.
|
# This is fully handled by the parent class.
|
||||||
super().__init__(node_config)
|
super().__init__(node_config, log, psycopg2_module)
|
||||||
|
|
||||||
# Secondly, override parameters to redirect the connection to
|
# Secondly, override parameters to redirect the connection to
|
||||||
# the pgbouncer instance.
|
# the pgbouncer instance.
|
||||||
|
@ -232,8 +277,8 @@ class PgConnectionViaPgbouncer(PgConnection):
|
||||||
with self.conn.cursor() as cursor:
|
with self.conn.cursor() as cursor:
|
||||||
try:
|
try:
|
||||||
cursor.execute(VERIFY_QUERY, {
|
cursor.execute(VERIFY_QUERY, {
|
||||||
"host": self.node_config["host"],
|
"host": self.node_config.host,
|
||||||
"port": self.node_config["port"]
|
"port": self.node_config.port
|
||||||
})
|
})
|
||||||
result = cursor.fetchone()[0]
|
result = cursor.fetchone()[0]
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
@ -254,13 +299,16 @@ class PgConnectionViaPgbouncer(PgConnection):
|
||||||
child_conn.close()
|
child_conn.close()
|
||||||
proc = multiprocessing.Process(target=check_func, args=(report_func,))
|
proc = multiprocessing.Process(target=check_func, args=(report_func,))
|
||||||
proc.start()
|
proc.start()
|
||||||
proc.join(self.node_config["connect_timeout"])
|
proc.join(self.node_config.connect_timeout)
|
||||||
if proc.is_alive():
|
if proc.is_alive():
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
proc.join()
|
proc.join()
|
||||||
return (False, PgConnectionFailed("Connection attempt timed out"))
|
return (False, PgConnectionFailed("Connection attempt timed out"))
|
||||||
result = parent_conn.recv()
|
result = parent_conn.recv()
|
||||||
proc.join()
|
proc.join()
|
||||||
|
##### DEBUG
|
||||||
|
raise Exception(repr(result))
|
||||||
|
##### /DEBUG
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@ -270,23 +318,22 @@ class PgBouncerConsoleConnection(PgConnection):
|
||||||
used to control the pgbouncer instance via admin commands.
|
used to control the pgbouncer instance via admin commands.
|
||||||
This connection is used by pgbouncemgr to reload the configuration
|
This connection is used by pgbouncemgr to reload the configuration
|
||||||
of pgbouncer when the cluster state changes."""
|
of pgbouncer when the cluster state changes."""
|
||||||
def __init__(self, pgbouncer_config):
|
def __init__(self, node_config, log, psycopg2_module=psycopg2):
|
||||||
super().__init__(pgbouncer_config)
|
super().__init__(node_config, log, psycopg2_module)
|
||||||
|
|
||||||
# For the console connection, the database name "pgbouncer"
|
# For the console connection, the database name "pgbouncer"
|
||||||
# must be used.
|
# must be used.
|
||||||
self.conn_params["dbname"] = "pgbouncer"
|
self.conn_params["database"] = "pgbouncer"
|
||||||
|
|
||||||
# The default ping query does not work when connected to the
|
# The default ping query does not work when connected to the
|
||||||
# pgbouncer console. Here's a simple replacement for it.
|
# pgbouncer console. Here's a simple replacement for it.
|
||||||
self.ping_query = "SHOW VERSION"
|
self.ping_query = "SHOW VERSION"
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
"""Connect to the pgbouncer console. After connecting, the autocommit
|
"""Connect to the pgbouncer console. Enable the autocommit feature on
|
||||||
feature will be disabled for the connection, so the underlying
|
the connection, so the underlying PostgreSQL client library won't
|
||||||
PostgreSQL client library won't automatically try to setup a
|
automatically try to setup a transaction. Transactions are not
|
||||||
transaction. Transactions are not supported by the pgbouncer
|
supported by the pgbouncer console."""
|
||||||
console."""
|
|
||||||
result = super().connect()
|
result = super().connect()
|
||||||
self.conn.autocommit = True
|
self.conn.autocommit = True
|
||||||
return result
|
return result
|
||||||
|
|
|
@ -4,8 +4,7 @@
|
||||||
import os
|
import os
|
||||||
from pgbouncemgr.node_config import NodeConfig
|
from pgbouncemgr.node_config import NodeConfig
|
||||||
from pgbouncemgr.constants import \
|
from pgbouncemgr.constants import \
|
||||||
LEADER_UNKNOWN, LEADER_STATUSES,\
|
LEADER_UNKNOWN, LEADER_STATUSES, NODE_UNKNOWN, NODE_STATUSES
|
||||||
NODE_UNKNOWN, NODE_OFFLINE, NODE_PRIMARY, NODE_STANDBY, NODE_STATUSES
|
|
||||||
|
|
||||||
|
|
||||||
class StateException(Exception):
|
class StateException(Exception):
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
|
||||||
|
class StubPsycopg2(list):
|
||||||
|
"""A stubbed version of the psycopg2 module, used to implement PostgreSQL
|
||||||
|
server interaction tests, without having to talk to a running
|
||||||
|
PostgreSQL server instance."""
|
||||||
|
def __init__(self, *connections):
|
||||||
|
self.conn_params = None
|
||||||
|
super().__init__(connections)
|
||||||
|
|
||||||
|
def add(self, connection):
|
||||||
|
self.append(connection)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def add_auth_failure(self):
|
||||||
|
return self.add(psycopg2.OperationalError(
|
||||||
|
'FATAL: password authentication failed for user "test"'))
|
||||||
|
|
||||||
|
def add_conn_timeout(self):
|
||||||
|
return self.add(psycopg2.OperationalError('timeout expired'))
|
||||||
|
|
||||||
|
def add_conn_failure(self):
|
||||||
|
return self.add(psycopg2.OperationalError(
|
||||||
|
'could not connect to server: No route to host / Is the ' +
|
||||||
|
'server running on host "..." and accepting / TCP/IP ' +
|
||||||
|
'connections on port ...?'))
|
||||||
|
return self
|
||||||
|
|
||||||
|
def add_admin_startup(self):
|
||||||
|
return self.add(psycopg2.errors.OperationalError(
|
||||||
|
'FATAL: the database system is starting up'))
|
||||||
|
|
||||||
|
def add_admin_shutdown(self):
|
||||||
|
return self.add(psycopg2.errors.AdminShutdown(
|
||||||
|
'terminating connection due to administrator command'))
|
||||||
|
|
||||||
|
def add_connection(self, connection):
|
||||||
|
return self.add(connection)
|
||||||
|
|
||||||
|
def connect(self, **conn_params):
|
||||||
|
self.conn_params = conn_params
|
||||||
|
response = self.pop(0)
|
||||||
|
if isinstance(response, Exception):
|
||||||
|
raise response
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
class StubConnection(list):
|
||||||
|
"""A stubbed version of a psycopg2 connection, used to implement
|
||||||
|
PostgreSQL server interaction tests, without having to talk to a
|
||||||
|
running PostgreSQL server instance."""
|
||||||
|
def __init__(self, *cursors):
|
||||||
|
super().__init__(cursors)
|
||||||
|
self.connected = True
|
||||||
|
self.autocommit = False
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.connected = False
|
||||||
|
|
||||||
|
def cursor(self):
|
||||||
|
if not self.connected:
|
||||||
|
raise psycopg2.InterfaceError("connection already closed")
|
||||||
|
response = self.pop(0)
|
||||||
|
if isinstance(response, Exception):
|
||||||
|
raise response
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
class StubCursor(list):
|
||||||
|
"""A stubbed version of a psycopg2 cursor, used to implement PostgreSQL
|
||||||
|
server interaction tests, without having to talk to a running
|
||||||
|
PostgreSQL server instance."""
|
||||||
|
def __init__(self, *results):
|
||||||
|
super().__init__(results)
|
||||||
|
self.results = list(results)
|
||||||
|
self.statusmessage = None
|
||||||
|
self.rows = None
|
||||||
|
self.query = None
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, a, b, c):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def execute(self, query):
|
||||||
|
self.query = query
|
||||||
|
response = self.results.pop(0)
|
||||||
|
if isinstance(response, Exception):
|
||||||
|
raise response
|
||||||
|
status, rows = response
|
||||||
|
self.statusmessage = status
|
||||||
|
self.rows = rows if rows is not None else []
|
||||||
|
|
||||||
|
def fetchone(self):
|
||||||
|
if self.rows is None:
|
||||||
|
raise psycopg2.ProgrammingError("no results to fetch")
|
||||||
|
try:
|
||||||
|
return self.rows.pop(0)
|
||||||
|
except IndexError:
|
||||||
|
return None
|
|
@ -0,0 +1,246 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import psycopg2
|
||||||
|
from pgbouncemgr.postgres import *
|
||||||
|
from pgbouncemgr.logger import Logger
|
||||||
|
from pgbouncemgr.node_config import NodeConfig
|
||||||
|
from tests.stub_psycopg2 import *
|
||||||
|
|
||||||
|
|
||||||
|
class PgConnectionTests(unittest.TestCase):
|
||||||
|
def test_GivenAuthenticationFailure_PgConnection_RaisesException(self):
|
||||||
|
self._test_connect_exception(
|
||||||
|
PgConnection, StubPsycopg2().add_auth_failure(),
|
||||||
|
PgConnectionFailed, "authentication failed")
|
||||||
|
|
||||||
|
def test_GivenConnectionTimeout_PgConnection_RaisesException(self):
|
||||||
|
self._test_connect_exception(
|
||||||
|
PgConnection, StubPsycopg2().add_conn_timeout(),
|
||||||
|
PgConnectionFailed, "timeout expired")
|
||||||
|
|
||||||
|
def test_GivenConnectionFailure_PgConnection_RaisesException(self):
|
||||||
|
self._test_connect_exception(
|
||||||
|
PgConnection, StubPsycopg2().add_conn_failure(),
|
||||||
|
PgConnectionFailed, "could not connect")
|
||||||
|
|
||||||
|
def test_GivenPostgresStartingUp_PgConnection_RaisesException(self):
|
||||||
|
self._test_connect_exception(
|
||||||
|
PgConnection, StubPsycopg2().add_admin_startup(),
|
||||||
|
PgConnectionFailed, "system is starting up")
|
||||||
|
|
||||||
|
def test_GivenPostgresShuttingDown_PgConnection_RaisesException(self):
|
||||||
|
self._test_connect_exception(
|
||||||
|
PgConnection, StubPsycopg2().add_admin_shutdown(),
|
||||||
|
PgConnectionFailed, "AdminShutdown")
|
||||||
|
|
||||||
|
def _test_connect_exception(self, test_class, stub_psycopg2, err, msg):
|
||||||
|
pg = test_class(NodeConfig(1), Logger(), stub_psycopg2)
|
||||||
|
with self.assertRaises(err) as context:
|
||||||
|
pg.connect()
|
||||||
|
self.assertIn(msg, str(context.exception))
|
||||||
|
|
||||||
|
def test_NodeConfig_IsAppliedToConnParams(self):
|
||||||
|
node_config = NodeConfig(1)
|
||||||
|
node_config.host = "1.1.1.1"
|
||||||
|
node_config.port = 9999
|
||||||
|
pg = PgConnection(node_config, Logger(), None)
|
||||||
|
|
||||||
|
self.assertEqual(1, pg.node_id)
|
||||||
|
self.assertEqual("1.1.1.1", pg.conn_params["host"])
|
||||||
|
self.assertEqual(9999, pg.conn_params["port"])
|
||||||
|
|
||||||
|
def test_GivenNoneValueInNodeConfig_ValueIsOmittedInConnParams(self):
|
||||||
|
node_config = NodeConfig(2)
|
||||||
|
node_config.host = None
|
||||||
|
node_config.port = None
|
||||||
|
stub_psycopg2 = StubPsycopg2().add_connection(StubConnection())
|
||||||
|
pg = PgConnection(node_config, Logger(), stub_psycopg2)
|
||||||
|
pg.connect()
|
||||||
|
|
||||||
|
self.assertEqual(2, pg.node_id)
|
||||||
|
self.assertNotIn("host", pg.conn_params)
|
||||||
|
self.assertNotIn("port", pg.conn_params)
|
||||||
|
|
||||||
|
def test_FirstConnect_SetsUpConnection(self):
|
||||||
|
stub_psycopg2 = StubPsycopg2().add_connection(StubConnection())
|
||||||
|
pg = PgConnection(NodeConfig('a'), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
result = pg.connect()
|
||||||
|
|
||||||
|
self.assertEqual("CONN_CONNECTED", result)
|
||||||
|
|
||||||
|
def test_SecondConnect_PingsAndReusesConnection(self):
|
||||||
|
cursor = StubCursor(("SELECT", [[1]]))
|
||||||
|
conn = StubConnection(cursor)
|
||||||
|
stub_psycopg2 = StubPsycopg2().add_connection(conn)
|
||||||
|
pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
result1 = pg.connect()
|
||||||
|
self.assertIs(None, cursor.query)
|
||||||
|
result2 = pg.connect()
|
||||||
|
self.assertEqual("SELECT 1", cursor.query)
|
||||||
|
self.assertEqual("CONN_CONNECTED", result1)
|
||||||
|
self.assertEqual("CONN_REUSED", result2)
|
||||||
|
|
||||||
|
def test_SecondConnectPingFails_SetsUpNewConnection(self):
|
||||||
|
conn1 = StubConnection(StubCursor(psycopg2.OperationalError()))
|
||||||
|
conn2 = StubConnection()
|
||||||
|
stub_psycopg2 = StubPsycopg2(conn1, conn2)
|
||||||
|
pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
result1 = pg.connect() # Connection OK
|
||||||
|
result2 = pg.connect() # Ping fails, reconnectt
|
||||||
|
|
||||||
|
self.assertEqual("CONN_CONNECTED", result1)
|
||||||
|
self.assertEqual("CONN_RECONNECTED", result2)
|
||||||
|
|
||||||
|
def test_Disconnect_ClosesConnection(self):
|
||||||
|
conn = StubConnection()
|
||||||
|
stub_psycopg2 = StubPsycopg2(conn)
|
||||||
|
pg = PgConnection(NodeConfig('disco'), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
pg.connect()
|
||||||
|
self.assertTrue(conn.connected)
|
||||||
|
pg.disconnect()
|
||||||
|
self.assertFalse(conn.connected)
|
||||||
|
|
||||||
|
def test_BigConnectionFlow(self):
|
||||||
|
conn1 = StubConnection(
|
||||||
|
StubCursor(psycopg2.OperationalError()))
|
||||||
|
conn2 = StubConnection(
|
||||||
|
StubCursor(("SELECT", [])),
|
||||||
|
StubCursor(("SELECT", [])))
|
||||||
|
conn3 = StubConnection(
|
||||||
|
StubCursor(("SELECT", [])))
|
||||||
|
|
||||||
|
stub_psycopg2 = StubPsycopg2(conn1, conn2, conn3)
|
||||||
|
pg = PgConnection(NodeConfig('b'), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
result1 = pg.connect() # Connection 1 OK
|
||||||
|
result2 = pg.connect() # Ping fails, new connection 2 OK
|
||||||
|
result3 = pg.connect() # Ping 1 success
|
||||||
|
result4 = pg.connect() # Ping 2 success
|
||||||
|
pg.disconnect() # Explicit disconnect
|
||||||
|
result5 = pg.connect() # Connection 3 OK
|
||||||
|
result6 = pg.connect() # Ping success
|
||||||
|
|
||||||
|
self.assertEqual("CONN_CONNECTED", result1)
|
||||||
|
self.assertEqual("CONN_RECONNECTED", result2)
|
||||||
|
self.assertEqual("CONN_REUSED", result3)
|
||||||
|
self.assertEqual("CONN_REUSED", result4)
|
||||||
|
self.assertEqual("CONN_CONNECTED", result5)
|
||||||
|
self.assertEqual("CONN_REUSED", result6)
|
||||||
|
|
||||||
|
|
||||||
|
class PgReplicationConnectionTests(unittest.TestCase):
|
||||||
|
def test_LogicalReplicationConnection_IsUsed(self):
|
||||||
|
conn = StubConnection()
|
||||||
|
stub_psycopg2 = StubPsycopg2().add_connection(conn)
|
||||||
|
pg = PgReplicationConnection(
|
||||||
|
NodeConfig("foo"), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
pg.connect()
|
||||||
|
self.assertEqual(
|
||||||
|
psycopg2.extras.LogicalReplicationConnection,
|
||||||
|
pg.conn_params["connection_factory"])
|
||||||
|
|
||||||
|
def test_GivenFailingConnection_ReplicationStatusIsOffline(self):
|
||||||
|
stub_psycopg2 = StubPsycopg2().add_auth_failure()
|
||||||
|
pg = PgReplicationConnection(
|
||||||
|
NodeConfig("foo"), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
status = pg.get_replication_status()
|
||||||
|
self.assertEqual({
|
||||||
|
"status": "NODE_OFFLINE",
|
||||||
|
"system_id": None,
|
||||||
|
"timeline_id": None}, status)
|
||||||
|
|
||||||
|
def test_GivenFailingStandbyQuery_ReplicationStatusRaisesException(self):
|
||||||
|
conn = StubConnection(StubCursor(psycopg2.OperationalError()))
|
||||||
|
pg = PgReplicationConnection(
|
||||||
|
NodeConfig("foo"), Logger(), StubPsycopg2(conn))
|
||||||
|
|
||||||
|
with self.assertRaises(RetrievingPgReplicationStatusFailed) as context:
|
||||||
|
pg.get_replication_status()
|
||||||
|
self.assertIn("pg_is_in_recovery() failed", str(context.exception))
|
||||||
|
|
||||||
|
def test_GivenFailingIdentifySystemQuery_ReplicationStatusRaisesException(self):
|
||||||
|
conn = StubConnection(
|
||||||
|
StubCursor(("SELECT", [[False]])),
|
||||||
|
psycopg2.OperationalError())
|
||||||
|
pg = PgReplicationConnection(
|
||||||
|
NodeConfig("foo"), Logger(), StubPsycopg2(conn))
|
||||||
|
|
||||||
|
with self.assertRaises(RetrievingPgReplicationStatusFailed) as context:
|
||||||
|
pg.get_replication_status()
|
||||||
|
self.assertIn("IDENTIFY_SYSTEM failed", str(context.exception))
|
||||||
|
|
||||||
|
def test_GivenConnectionToPrimaryNode_ReplicationStatusIsPrimary(self):
|
||||||
|
conn = StubConnection(
|
||||||
|
StubCursor(("SELECT", [[False]])),
|
||||||
|
StubCursor(("IDENTIFY_SYSTEM", [["id", 1234, "other", "fields"]])))
|
||||||
|
stub_psycopg2 = StubPsycopg2(conn)
|
||||||
|
pg = PgReplicationConnection(
|
||||||
|
NodeConfig("foo"), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
status = pg.get_replication_status()
|
||||||
|
self.assertEqual({
|
||||||
|
"status": "NODE_PRIMARY",
|
||||||
|
"system_id": "id",
|
||||||
|
"timeline_id": 1234}, status)
|
||||||
|
|
||||||
|
def test_GivenConnectionToStandbyNode_ReplicationStatusIsStandby(self):
|
||||||
|
conn = StubConnection(
|
||||||
|
StubCursor(("SELECT", [[True]])),
|
||||||
|
StubCursor(("IDENTIFY_SYSTEM", [["towel", 42, "other", "fields"]])))
|
||||||
|
stub_psycopg2 = StubPsycopg2(conn)
|
||||||
|
pg = PgReplicationConnection(
|
||||||
|
NodeConfig("foo"), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
status = pg.get_replication_status()
|
||||||
|
self.assertEqual({
|
||||||
|
"status": "NODE_STANDBY",
|
||||||
|
"system_id": "towel",
|
||||||
|
"timeline_id": 42}, status)
|
||||||
|
|
||||||
|
|
||||||
|
class PgConnectionViaPgbouncerTests(unittest.TestCase):
|
||||||
|
def test_NodeConfig_AndPgBouncerConfig_AreAppliedToConnParams(self):
|
||||||
|
node_config = NodeConfig(777)
|
||||||
|
node_config.host = "1.1.1.1"
|
||||||
|
node_config.port = 9999
|
||||||
|
pgbouncer_config = {"host": "192.168.0.1", "port": 7654}
|
||||||
|
|
||||||
|
pg = PgConnectionViaPgbouncer(
|
||||||
|
node_config, pgbouncer_config, Logger(), None)
|
||||||
|
|
||||||
|
self.assertEqual(777, pg.node_id)
|
||||||
|
self.assertEqual("template1", pg.conn_params["database"])
|
||||||
|
self.assertEqual("192.168.0.1", pg.conn_params["host"])
|
||||||
|
self.assertEqual(7654, pg.conn_params["port"])
|
||||||
|
|
||||||
|
def test_WhenVerifyQueryTimesOut_ExceptionIsRaised(self):
|
||||||
|
def sleeping_query():
|
||||||
|
time.sleep(2)
|
||||||
|
conn = StubConnection(StubCursor(sleeping_query))
|
||||||
|
stub_psycopg2 = StubPsycopg2(conn)
|
||||||
|
node_config = NodeConfig('xyz123')
|
||||||
|
node_config.connect_timeout = 1
|
||||||
|
pgbouncer_config = {"host": "192.168.0.1", "port": 7654}
|
||||||
|
pg = PgConnectionViaPgbouncer(
|
||||||
|
node_config, pgbouncer_config, Logger(), stub_psycopg2)
|
||||||
|
pg.verify_connection()
|
||||||
|
##### TODO
|
||||||
|
|
||||||
|
|
||||||
|
class PgBouncerConsoleConnectionTests(unittest.TestCase):
|
||||||
|
def test_OnConnect_AutoCommitIsEnabled(self):
|
||||||
|
conn = StubConnection()
|
||||||
|
stub_psycopg2 = StubPsycopg2().add_connection(conn)
|
||||||
|
pg = PgBouncerConsoleConnection(
|
||||||
|
NodeConfig("bob"), Logger(), stub_psycopg2)
|
||||||
|
|
||||||
|
self.assertFalse(conn.autocommit)
|
||||||
|
pg.connect()
|
||||||
|
self.assertTrue(conn.autocommit)
|
|
@ -4,7 +4,8 @@ import os
|
||||||
import unittest
|
import unittest
|
||||||
from pgbouncemgr.logger import *
|
from pgbouncemgr.logger import *
|
||||||
from pgbouncemgr.state import *
|
from pgbouncemgr.state import *
|
||||||
from pgbouncemgr.constants import LEADER_UNKNOWN, LEADER_CONNECTED
|
from pgbouncemgr.constants import \
|
||||||
|
LEADER_UNKNOWN, LEADER_CONNECTED, NODE_PRIMARY, NODE_STANDBY
|
||||||
|
|
||||||
|
|
||||||
PGBOUNCER_CONFIG = os.path.join(
|
PGBOUNCER_CONFIG = os.path.join(
|
||||||
|
|
|
@ -0,0 +1,96 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import psycopg2
|
||||||
|
from tests.stub_psycopg2 import *
|
||||||
|
|
||||||
|
|
||||||
|
class StubPsycopg2Tests(unittest.TestCase):
|
||||||
|
def test_AuthenticationFailure(self):
|
||||||
|
with self.assertRaises(psycopg2.OperationalError) as context:
|
||||||
|
StubPsycopg2().add_auth_failure().connect()
|
||||||
|
self.assertIn("authentication failed", str(context.exception))
|
||||||
|
|
||||||
|
def test_ConnectionTimeout(self):
|
||||||
|
with self.assertRaises(psycopg2.OperationalError) as context:
|
||||||
|
StubPsycopg2().add_conn_timeout().connect()
|
||||||
|
self.assertIn("timeout expired", str(context.exception))
|
||||||
|
|
||||||
|
def test_ConnectionFailure(self):
|
||||||
|
with self.assertRaises(psycopg2.OperationalError) as context:
|
||||||
|
StubPsycopg2().add_conn_failure().connect()
|
||||||
|
self.assertIn("could not connect to server", str(context.exception))
|
||||||
|
|
||||||
|
def test_AdminStartup(self):
|
||||||
|
with self.assertRaises(psycopg2.OperationalError) as context:
|
||||||
|
StubPsycopg2().add_admin_startup().connect()
|
||||||
|
self.assertIn("system is starting up", str(context.exception))
|
||||||
|
|
||||||
|
def test_AdminShutdown(self):
|
||||||
|
with self.assertRaises(psycopg2.errors.AdminShutdown) as context:
|
||||||
|
StubPsycopg2().add_admin_shutdown().connect()
|
||||||
|
self.assertIn("terminating connection", str(context.exception))
|
||||||
|
|
||||||
|
def test_GivenNoFixtures_Connect_RaisesIndexError(self):
|
||||||
|
with self.assertRaises(IndexError) as context:
|
||||||
|
StubPsycopg2().connect()
|
||||||
|
|
||||||
|
def test_SuccessfulConnect(self):
|
||||||
|
conn = StubConnection()
|
||||||
|
StubPsycopg2().add_connection(conn)
|
||||||
|
|
||||||
|
|
||||||
|
class StubConnectionTests(unittest.TestCase):
|
||||||
|
def test_GivenLiveConnection_Cursor_ReturnsCursor(self):
|
||||||
|
cursor = StubCursor()
|
||||||
|
conn = StubConnection(cursor)
|
||||||
|
self.assertIs(cursor, conn.cursor())
|
||||||
|
|
||||||
|
def test_GivenClosedConnection_Cursor_ReturnsInterfaceError(self):
|
||||||
|
conn = StubConnection()
|
||||||
|
conn.close()
|
||||||
|
with self.assertRaises(psycopg2.InterfaceError):
|
||||||
|
conn.cursor()
|
||||||
|
|
||||||
|
def test_IntegrationWithPsycopg2Stub(self):
|
||||||
|
cursor1 = StubCursor()
|
||||||
|
cursor2 = StubCursor()
|
||||||
|
conn = StubConnection(cursor1, cursor2)
|
||||||
|
pg = StubPsycopg2(conn)
|
||||||
|
|
||||||
|
conn_result = pg.connect()
|
||||||
|
cursor1_result = conn_result.cursor()
|
||||||
|
cursor2_result = conn_result.cursor()
|
||||||
|
|
||||||
|
self.assertIs(conn, conn_result)
|
||||||
|
self.assertIs(cursor1, cursor1_result)
|
||||||
|
self.assertIs(cursor2, cursor2_result)
|
||||||
|
|
||||||
|
class StubCursorTests(unittest.TestCase):
|
||||||
|
def test_GivenNoExecuteCalled_FetchOne_RaisesProgrammingError(self):
|
||||||
|
cursor = StubCursor()
|
||||||
|
with self.assertRaises(psycopg2.ProgrammingError):
|
||||||
|
cursor.fetchone()
|
||||||
|
|
||||||
|
def test_GivenExecuteCalled_WithoutResultRows_FetchOneReturnsNone(self):
|
||||||
|
cursor = StubCursor(('OK', None))
|
||||||
|
cursor.execute("SELECT query")
|
||||||
|
self.assertIs(None, cursor.fetchone())
|
||||||
|
self.assertEqual("SELECT query", cursor.query)
|
||||||
|
self.assertEqual("OK", cursor.statusmessage)
|
||||||
|
|
||||||
|
def test_GivenExecuteCalled_WithResultRows_FetchOneRetursRow(self):
|
||||||
|
cursor = StubCursor(
|
||||||
|
("STATUS1", [['value1', 'value2'], ['value3', 'value4']]),
|
||||||
|
("STATUS2", [['value5', 'value6'], ['value7', 'value8']]))
|
||||||
|
|
||||||
|
cursor.execute("SELECT query 1")
|
||||||
|
self.assertEqual("STATUS1", cursor.statusmessage)
|
||||||
|
self.assertEqual("value1", cursor.fetchone()[0])
|
||||||
|
self.assertEqual("value3", cursor.fetchone()[0])
|
||||||
|
|
||||||
|
cursor.execute("SELECT query 2")
|
||||||
|
self.assertEqual("STATUS2", cursor.statusmessage)
|
||||||
|
self.assertEqual("value6", cursor.fetchone()[1])
|
||||||
|
self.assertEqual("value8", cursor.fetchone()[1])
|
||||||
|
self.assertIs(None, cursor.fetchone())
|
Loading…
Reference in New Issue