# -*- coding: utf-8 -*-

from pgbouncemgr.postgres import *


class StubPgReplicationConnection(PgReplicationConnection):
    """Stand-in replication connection that replays canned statuses.

    No connection is opened by this class itself: connect() and
    disconnect() only toggle the ``connected`` flag, and
    get_replication_status() serves entries taken from the
    ``stub_replication_statuses`` list on the node configuration.
    """

    def __init__(self, node_config, log, psycopg2_module=psycopg2):
        """Initialize the parent class and start in the disconnected state."""
        super().__init__(node_config, log, psycopg2_module)
        self.node_config = node_config
        self.connected = False

    def connect(self):
        """Flag this stub as connected."""
        self.connected = True

    def disconnect(self):
        """Flag this stub as disconnected."""
        self.connected = False

    def get_replication_status(self):
        """Return the next queued stub status, or raise it.

        The stub is flagged as connected first. The first entry of
        ``node_config.stub_replication_statuses`` is then removed from
        the list; when that entry is an Exception instance it is raised,
        otherwise it is returned as-is.

        Raises:
            Exception: when the node configuration has no
                ``stub_replication_statuses`` attribute, or when that
                attribute is empty.
        """
        self.connect()

        queued = getattr(self.node_config, "stub_replication_statuses", None)
        if not queued:
            # NOTE(review): the message says "Node %s" but interpolates
            # __class__ (this stub class), not a node identifier --
            # confirm whether that is intended.
            raise Exception(
                "Node %s misses stub_replication_statuses" % __class__)

        next_status = queued.pop(0)
        if isinstance(next_status, Exception):
            raise next_status
        return next_status