Skip to content

[#235] test_pg_ctl_wait_option detects a port conflict #257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 165 additions & 43 deletions testgres/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,28 +784,6 @@ def _collect_special_files(self):

return result

def _collect_log_files(self):
    """
    Snapshot the node's log files.

    Returns:
        A dict that maps the path of every log file that currently exists
        to its size in bytes. Missing files are left out of the result.
    """
    candidates = (self.pg_log_file,)

    sizes = {}

    for log_path in candidates:
        if self.os_ops.path_exists(log_path):
            size_in_bytes = self.os_ops.get_file_size(log_path)
            assert type(size_in_bytes) == int  # noqa: E721
            assert size_in_bytes >= 0

            sizes[log_path] = size_in_bytes

    return sizes

def init(self, initdb_params=None, cached=True, **kwargs):
"""
Perform initdb for this node.
Expand Down Expand Up @@ -1062,22 +1040,6 @@ def slow_start(self, replica=False, dbname='template1', username=None, max_attem
OperationalError},
max_attempts=max_attempts)

def _detect_port_conflict(self, log_files0, log_files1):
    """
    Check whether newly written log data reports a port conflict.

    Args:
        log_files0: dict {log file path: size in bytes} taken earlier.
            The size is used as the offset to start reading from.
        log_files1: dict {log file path: size in bytes} taken later.
            Every file listed here is inspected; a file that is absent
            from log_files0 is read from the beginning.

    Returns:
        True if any inspected tail contains the postmaster message
        about another instance running on the port, otherwise False.
    """
    assert type(log_files0) == dict  # noqa: E721
    assert type(log_files1) == dict  # noqa: E721

    # The marker is pure ASCII, so it is searched for in the raw bytes.
    # Decoding the tail first could fail when the read ends in the middle
    # of a multi-byte UTF-8 character or the log is not valid UTF-8.
    marker = b'Is another postmaster already running on port'

    for file in log_files1:
        read_pos = log_files0.get(file, 0)  # the previous size

        file_content = self.os_ops.read_binary(file, read_pos)
        if marker in file_content:
            return True
    return False

def start(self, params=[], wait=True, exec_env=None):
"""
Starts the PostgreSQL node using pg_ctl if node has not been started.
Expand Down Expand Up @@ -1137,8 +1099,7 @@ def LOCAL__raise_cannot_start_node__std(from_exception):
assert isinstance(self._port_manager, PortManager)
assert __class__._C_MAX_START_ATEMPTS > 1

log_files0 = self._collect_log_files()
assert type(log_files0) == dict # noqa: E721
log_reader = PostgresNodeLogReader(self, from_beginnig=False)

nAttempt = 0
timeout = 1
Expand All @@ -1154,11 +1115,11 @@ def LOCAL__raise_cannot_start_node__std(from_exception):
if nAttempt == __class__._C_MAX_START_ATEMPTS:
LOCAL__raise_cannot_start_node(e, "Cannot start node after multiple attempts.")

log_files1 = self._collect_log_files()
if not self._detect_port_conflict(log_files0, log_files1):
is_it_port_conflict = PostgresNodeUtils.delect_port_conflict(log_reader)

if not is_it_port_conflict:
LOCAL__raise_cannot_start_node__std(e)

log_files0 = log_files1
logging.warning(
"Detected a conflict with using the port {0}. Trying another port after a {1}-second sleep...".format(self._port, timeout)
)
Expand Down Expand Up @@ -2192,6 +2153,167 @@ def _escape_config_value(value):
return result


class PostgresNodeLogReader:
    """
    Reads the data appended to the log files of a PostgresNode.

    The reader keeps, for every known log file, the byte offset up to which
    the file has already been consumed. Each call to read() returns the text
    that appeared after that offset and moves the offset forward.
    """

    class LogInfo:
        """Check point of one log file: the byte offset to continue from."""

        position: int

        def __init__(self, position: int):
            self.position = position

    # --------------------------------------------------------------------
    class LogDataBlock:
        """A piece of decoded log text and the place it was read from."""

        _file_name: str
        _position: int
        _data: str

        def __init__(
            self,
            file_name: str,
            position: int,
            data: str
        ):
            assert type(file_name) == str  # noqa: E721
            assert type(position) == int  # noqa: E721
            assert type(data) == str  # noqa: E721
            assert file_name != ""
            assert position >= 0
            self._file_name = file_name
            self._position = position
            self._data = data

        @property
        def file_name(self) -> str:
            """Path of the log file this block was read from."""
            assert type(self._file_name) == str  # noqa: E721
            assert self._file_name != ""
            return self._file_name

        @property
        def position(self) -> int:
            """Byte offset in the file where this block starts."""
            assert type(self._position) == int  # noqa: E721
            assert self._position >= 0
            return self._position

        @property
        def data(self) -> str:
            """Decoded text of the block."""
            assert type(self._data) == str  # noqa: E721
            return self._data

    # --------------------------------------------------------------------
    # Quoted so that the class body does not need the name at definition time.
    _node: "PostgresNode"
    # log file path -> check point of this file
    _logs: typing.Dict[str, LogInfo]

    # --------------------------------------------------------------------
    def __init__(self, node: "PostgresNode", from_beginnig: bool):
        """
        Args:
            node: the node whose log files are read.
            from_beginnig: True - the first read() returns the log files
                from their start; False - the current sizes of the existing
                log files are remembered and only data written after this
                moment is returned.
        """
        assert node is not None
        assert isinstance(node, PostgresNode)
        assert type(from_beginnig) == bool  # noqa: E721

        self._node = node

        if from_beginnig:
            self._logs = dict()
        else:
            self._logs = self._collect_logs()

        assert type(self._logs) == dict  # noqa: E721
        return

    def read(self) -> typing.List[LogDataBlock]:
        """
        Read everything that was appended to the log files since the
        previous check point and make a new check point.

        Returns:
            A list with one LogDataBlock per existing log file. The block
            data is an empty string when nothing complete was appended.

        Raises:
            UnicodeDecodeError: the new data is not valid UTF-8. A multi-byte
                character that is cut off at the end of the data is not an
                error; it is left in the file for the next call.
        """
        assert self._node is not None
        assert isinstance(self._node, PostgresNode)

        cur_logs: typing.Dict[str, __class__.LogInfo] = self._collect_logs()
        assert cur_logs is not None
        assert type(cur_logs) == dict  # noqa: E721

        assert type(self._logs) == dict  # noqa: E721

        result = list()

        for file_name, cur_log_info in cur_logs.items():
            assert type(file_name) == str  # noqa: E721
            assert type(cur_log_info) == __class__.LogInfo  # noqa: E721

            read_pos = 0

            prev_log_info = self._logs.get(file_name)
            if prev_log_info is not None:
                assert type(prev_log_info) == __class__.LogInfo  # noqa: E721
                read_pos = prev_log_info.position  # the previous size

            file_content_b = self._node.os_ops.read_binary(file_name, read_pos)
            assert type(file_content_b) == bytes  # noqa: E721

            # file_content_b may end with an incomplete UTF-8 symbol
            # (the server is still writing it). Only the complete part
            # is decoded now.
            file_content_s, consumed = __class__._decode_complete_utf8(file_content_b)
            assert type(file_content_s) == str  # noqa: E721
            assert type(consumed) == int  # noqa: E721
            assert 0 <= consumed <= len(file_content_b)

            # It is a research/paranoia check: the data was read after the
            # size had been measured, so it must reach at least that size.
            assert cur_log_info.position <= read_pos + len(file_content_b)

            # The tail of an incomplete symbol is not consumed. The next
            # read starts from it.
            cur_log_info.position = read_pos + consumed

            block = __class__.LogDataBlock(
                file_name,
                read_pos,
                file_content_s
            )

            result.append(block)

        # A new check point
        self._logs = cur_logs

        return result

    @staticmethod
    def _decode_complete_utf8(data: bytes) -> typing.Tuple[str, int]:
        """
        Decode the longest prefix of data that consists of complete UTF-8
        symbols.

        Returns:
            A tuple (decoded text, number of bytes consumed). A truncated
            symbol at the end of data is not decoded and is not counted.

        Raises:
            UnicodeDecodeError: data contains an invalid UTF-8 sequence.
        """
        import codecs  # local import: the import block of the module is not touched

        assert type(data) == bytes  # noqa: E721

        decoder = codecs.getincrementaldecoder("utf-8")()
        text = decoder.decode(data, final=False)

        # The decoder state holds the bytes it keeps waiting for the rest
        # of a symbol.
        pending = decoder.getstate()[0]

        return text, len(data) - len(pending)

    def _collect_logs(self) -> typing.Dict[str, LogInfo]:
        """
        Build a check point for the current moment.

        Returns:
            A dict {log file path: LogInfo(current size in bytes)} for every
            log file that exists now. Missing files are skipped.
        """
        assert self._node is not None
        assert isinstance(self._node, PostgresNode)

        files = [
            self._node.pg_log_file
        ]  # yapf: disable

        result = dict()

        for f in files:
            assert type(f) == str  # noqa: E721

            # skip missing files
            if not self._node.os_ops.path_exists(f):
                continue

            file_size = self._node.os_ops.get_file_size(f)
            assert type(file_size) == int  # noqa: E721
            assert file_size >= 0

            result[f] = __class__.LogInfo(file_size)

        return result


class PostgresNodeUtils:
    """Stateless helpers that work with PostgresNodeLogReader objects."""

    @staticmethod
    def delect_port_conflict(log_reader: PostgresNodeLogReader) -> bool:
        """
        Read the new log data through log_reader and look for the
        postmaster message about another instance running on the port.

        The call consumes the data: it advances the reader's check point.

        Returns:
            True as soon as one of the returned blocks contains the message,
            otherwise False.
        """
        assert type(log_reader) == PostgresNodeLogReader  # noqa: E721

        marker = 'Is another postmaster already running on port'

        fresh_blocks = log_reader.read()
        assert type(fresh_blocks) == list  # noqa: E721

        for chunk in fresh_blocks:
            assert type(chunk) == PostgresNodeLogReader.LogDataBlock  # noqa: E721

            if chunk.data.find(marker) != -1:
                return True

        return False


class NodeApp:

def __init__(self, test_path=None, nodes_to_cleanup=None, os_ops=None):
Expand Down
37 changes: 35 additions & 2 deletions tests/test_testgres_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from testgres.node import PgVer
from testgres.node import PostgresNode
from testgres.node import PostgresNodeLogReader
from testgres.node import PostgresNodeUtils
from testgres.utils import get_pg_version2
from testgres.utils import file_tail
from testgres.utils import get_bin_path2
Expand Down Expand Up @@ -883,8 +885,29 @@ def test_backup_wrong_xlog_method(self, node_svc: PostgresNodeService):

def test_pg_ctl_wait_option(self, node_svc: PostgresNodeService):
assert isinstance(node_svc, PostgresNodeService)
with __class__.helper__get_node(node_svc) as node:
self.impl__test_pg_ctl_wait_option(node_svc, node)

C_MAX_ATTEMPT = 5

nAttempt = 0

while True:
if nAttempt == C_MAX_ATTEMPT:
raise Exception("PostgresSQL did not start.")

nAttempt += 1
logging.info("------------------------ NODE #{}".format(
nAttempt
))

with __class__.helper__get_node(node_svc, port=12345) as node:
if self.impl__test_pg_ctl_wait_option(node_svc, node):
break
continue

logging.info("OK. Test is passed. Number of attempts is {}".format(
nAttempt
))
return

def impl__test_pg_ctl_wait_option(
self,
Expand All @@ -899,9 +922,18 @@ def impl__test_pg_ctl_wait_option(

node.init()
assert node.status() == NodeStatus.Stopped

node_log_reader = PostgresNodeLogReader(node, from_beginnig=True)

node.start(wait=False)
nAttempt = 0
while True:
if PostgresNodeUtils.delect_port_conflict(node_log_reader):
logging.info("Node port {} conflicted with another PostgreSQL instance.".format(
node.port
))
return False

if nAttempt == C_MAX_ATTEMPTS:
#
# [2025-03-11]
Expand Down Expand Up @@ -960,6 +992,7 @@ def impl__test_pg_ctl_wait_option(
raise Exception("Unexpected node status: {0}.".format(s1))

logging.info("OK. Node is stopped.")
return True

def test_replicate(self, node_svc: PostgresNodeService):
assert isinstance(node_svc, PostgresNodeService)
Expand Down