Skip to content

Commit 69b35cb

Browse files
Merge pull request #165 from dmitry-lipetsk/master-fix164--v01
Port numbers management is improved (#164)
2 parents 0b1b3de + 663612c commit 69b35cb

File tree

8 files changed

+508
-22
lines changed

8 files changed

+508
-22
lines changed

testgres/node.py

+82-17
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,13 @@
8383

8484
from .standby import First
8585

86+
from . import utils
87+
8688
from .utils import \
8789
PgVer, \
8890
eprint, \
8991
get_bin_path, \
9092
get_pg_version, \
91-
reserve_port, \
92-
release_port, \
9393
execute_utility, \
9494
options_string, \
9595
clean_on_error
@@ -128,6 +128,9 @@ def __repr__(self):
128128

129129

130130
class PostgresNode(object):
131+
# a max number of node start attempts
132+
_C_MAX_START_ATEMPTS = 5
133+
131134
def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionParams = ConnectionParams(), bin_dir=None, prefix=None):
132135
"""
133136
PostgresNode constructor.
@@ -158,7 +161,7 @@ def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionP
158161
self.os_ops = LocalOperations(conn_params)
159162

160163
self.host = self.os_ops.host
161-
self.port = port or reserve_port()
164+
self.port = port or utils.reserve_port()
162165

163166
self.ssh_key = self.os_ops.ssh_key
164167

@@ -471,6 +474,28 @@ def _collect_special_files(self):
471474

472475
return result
473476

477+
def _collect_log_files(self):
478+
# dictionary of log files + size in bytes
479+
480+
files = [
481+
self.pg_log_file
482+
] # yapf: disable
483+
484+
result = {}
485+
486+
for f in files:
487+
# skip missing files
488+
if not self.os_ops.path_exists(f):
489+
continue
490+
491+
file_size = self.os_ops.get_file_size(f)
492+
assert type(file_size) == int # noqa: E721
493+
assert file_size >= 0
494+
495+
result[f] = file_size
496+
497+
return result
498+
474499
def init(self, initdb_params=None, cached=True, **kwargs):
475500
"""
476501
Perform initdb for this node.
@@ -722,6 +747,22 @@ def slow_start(self, replica=False, dbname='template1', username=None, max_attem
722747
OperationalError},
723748
max_attempts=max_attempts)
724749

750+
def _detect_port_conflict(self, log_files0, log_files1):
751+
assert type(log_files0) == dict # noqa: E721
752+
assert type(log_files1) == dict # noqa: E721
753+
754+
for file in log_files1.keys():
755+
read_pos = 0
756+
757+
if file in log_files0.keys():
758+
read_pos = log_files0[file] # the previous size
759+
760+
file_content = self.os_ops.read_binary(file, read_pos)
761+
file_content_s = file_content.decode()
762+
if 'Is another postmaster already running on port' in file_content_s:
763+
return True
764+
return False
765+
725766
def start(self, params=[], wait=True):
726767
"""
727768
Starts the PostgreSQL node using pg_ctl if node has not been started.
@@ -736,6 +777,9 @@ def start(self, params=[], wait=True):
736777
Returns:
737778
This instance of :class:`.PostgresNode`.
738779
"""
780+
781+
assert __class__._C_MAX_START_ATEMPTS > 1
782+
739783
if self.is_started:
740784
return self
741785

@@ -745,27 +789,46 @@ def start(self, params=[], wait=True):
745789
"-w" if wait else '-W', # --wait or --no-wait
746790
"start"] + params # yapf: disable
747791

748-
startup_retries = 5
792+
log_files0 = self._collect_log_files()
793+
assert type(log_files0) == dict # noqa: E721
794+
795+
nAttempt = 0
796+
timeout = 1
749797
while True:
798+
assert nAttempt >= 0
799+
assert nAttempt < __class__._C_MAX_START_ATEMPTS
800+
nAttempt += 1
750801
try:
751802
exit_status, out, error = execute_utility(_params, self.utils_log_file, verbose=True)
752803
if error and 'does not exist' in error:
753804
raise Exception
754805
except Exception as e:
755-
files = self._collect_special_files()
756-
if any(len(file) > 1 and 'Is another postmaster already '
757-
'running on port' in file[1].decode() for
758-
file in files):
759-
logging.warning("Detected an issue with connecting to port {0}. "
760-
"Trying another port after a 5-second sleep...".format(self.port))
761-
self.port = reserve_port()
762-
options = {'port': str(self.port)}
763-
self.set_auto_conf(options)
764-
startup_retries -= 1
765-
time.sleep(5)
766-
continue
806+
assert nAttempt > 0
807+
assert nAttempt <= __class__._C_MAX_START_ATEMPTS
808+
if self._should_free_port and nAttempt < __class__._C_MAX_START_ATEMPTS:
809+
log_files1 = self._collect_log_files()
810+
if self._detect_port_conflict(log_files0, log_files1):
811+
log_files0 = log_files1
812+
logging.warning(
813+
"Detected an issue with connecting to port {0}. "
814+
"Trying another port after a {1}-second sleep...".format(self.port, timeout)
815+
)
816+
time.sleep(timeout)
817+
timeout = min(2 * timeout, 5)
818+
cur_port = self.port
819+
new_port = utils.reserve_port() # can raise
820+
try:
821+
options = {'port': str(new_port)}
822+
self.set_auto_conf(options)
823+
except: # noqa: E722
824+
utils.release_port(new_port)
825+
raise
826+
self.port = new_port
827+
utils.release_port(cur_port)
828+
continue
767829

768830
msg = 'Cannot start node'
831+
files = self._collect_special_files()
769832
raise_from(StartNodeException(msg, files), e)
770833
break
771834
self._maybe_start_logger()
@@ -930,8 +993,10 @@ def free_port(self):
930993
"""
931994

932995
if self._should_free_port:
996+
port = self.port
933997
self._should_free_port = False
934-
release_port(self.port)
998+
self.port = None
999+
utils.release_port(port)
9351000

9361001
def cleanup(self, max_attempts=3, full=False):
9371002
"""

testgres/operations/local_ops.py

+16
Original file line numberDiff line numberDiff line change
@@ -308,12 +308,28 @@ def readlines(self, filename, num_lines=0, binary=False, encoding=None):
308308
buffers * max(2, int(num_lines / max(cur_lines, 1)))
309309
) # Adjust buffer size
310310

311+
def read_binary(self, filename, start_pos):
312+
assert type(filename) == str # noqa: E721
313+
assert type(start_pos) == int # noqa: E721
314+
assert start_pos >= 0
315+
316+
with open(filename, 'rb') as file: # open in a binary mode
317+
file.seek(start_pos, os.SEEK_SET)
318+
r = file.read()
319+
assert type(r) == bytes # noqa: E721
320+
return r
321+
311322
def isfile(self, remote_file):
312323
return os.path.isfile(remote_file)
313324

314325
def isdir(self, dirname):
315326
return os.path.isdir(dirname)
316327

328+
def get_file_size(self, filename):
329+
assert filename is not None
330+
assert type(filename) == str # noqa: E721
331+
return os.path.getsize(filename)
332+
317333
def remove_file(self, filename):
318334
return os.remove(filename)
319335

testgres/operations/os_ops.py

+9
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,18 @@ def read(self, filename, encoding, binary):
9898
def readlines(self, filename):
9999
raise NotImplementedError()
100100

101+
def read_binary(self, filename, start_pos):
102+
assert type(filename) == str # noqa: E721
103+
assert type(start_pos) == int # noqa: E721
104+
assert start_pos >= 0
105+
raise NotImplementedError()
106+
101107
def isfile(self, remote_file):
102108
raise NotImplementedError()
103109

110+
def get_file_size(self, filename):
111+
raise NotImplementedError()
112+
104113
# Processes control
105114
def kill(self, pid, signal):
106115
# Kill the process

testgres/operations/remote_ops.py

+83
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,16 @@ def readlines(self, filename, num_lines=0, binary=False, encoding=None):
340340

341341
return lines
342342

343+
def read_binary(self, filename, start_pos):
344+
assert type(filename) == str # noqa: E721
345+
assert type(start_pos) == int # noqa: E721
346+
assert start_pos >= 0
347+
348+
cmd = "tail -c +{} {}".format(start_pos + 1, __class__._escape_path(filename))
349+
r = self.exec_command(cmd)
350+
assert type(r) == bytes # noqa: E721
351+
return r
352+
343353
def isfile(self, remote_file):
344354
stdout = self.exec_command("test -f {}; echo $?".format(remote_file))
345355
result = int(stdout.strip())
@@ -350,6 +360,70 @@ def isdir(self, dirname):
350360
response = self.exec_command(cmd)
351361
return response.strip() == b"True"
352362

363+
def get_file_size(self, filename):
364+
C_ERR_SRC = "RemoteOpertions::get_file_size"
365+
366+
assert filename is not None
367+
assert type(filename) == str # noqa: E721
368+
cmd = "du -b " + __class__._escape_path(filename)
369+
370+
s = self.exec_command(cmd, encoding=get_default_encoding())
371+
assert type(s) == str # noqa: E721
372+
373+
if len(s) == 0:
374+
raise Exception(
375+
"[BUG CHECK] Can't get size of file [{2}]. Remote operation returned an empty string. Check point [{0}][{1}].".format(
376+
C_ERR_SRC,
377+
"#001",
378+
filename
379+
)
380+
)
381+
382+
i = 0
383+
384+
while i < len(s) and s[i].isdigit():
385+
assert s[i] >= '0'
386+
assert s[i] <= '9'
387+
i += 1
388+
389+
if i == 0:
390+
raise Exception(
391+
"[BUG CHECK] Can't get size of file [{2}]. Remote operation returned a bad formatted string. Check point [{0}][{1}].".format(
392+
C_ERR_SRC,
393+
"#002",
394+
filename
395+
)
396+
)
397+
398+
if i == len(s):
399+
raise Exception(
400+
"[BUG CHECK] Can't get size of file [{2}]. Remote operation returned a bad formatted string. Check point [{0}][{1}].".format(
401+
C_ERR_SRC,
402+
"#003",
403+
filename
404+
)
405+
)
406+
407+
if not s[i].isspace():
408+
raise Exception(
409+
"[BUG CHECK] Can't get size of file [{2}]. Remote operation returned a bad formatted string. Check point [{0}][{1}].".format(
410+
C_ERR_SRC,
411+
"#004",
412+
filename
413+
)
414+
)
415+
416+
r = 0
417+
418+
for i2 in range(0, i):
419+
ch = s[i2]
420+
assert ch >= '0'
421+
assert ch <= '9'
422+
# Here is needed to check overflow or that it is a human-valid result?
423+
r = (r * 10) + ord(ch) - ord('0')
424+
425+
return r
426+
353427
def remove_file(self, filename):
354428
cmd = "rm {}".format(filename)
355429
return self.exec_command(cmd)
@@ -386,6 +460,15 @@ def db_connect(self, dbname, user, password=None, host="localhost", port=5432):
386460
)
387461
return conn
388462

463+
def _escape_path(path):
464+
assert type(path) == str # noqa: E721
465+
assert path != "" # Ok?
466+
467+
r = "'"
468+
r += path
469+
r += "'"
470+
return r
471+
389472

390473
def normalize_error(error):
391474
if isinstance(error, bytes):

testgres/utils.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def __init__(self, version: str) -> None:
3434
super().__init__(version)
3535

3636

37-
def reserve_port():
37+
def internal__reserve_port():
3838
"""
3939
Generate a new port and add it to 'bound_ports'.
4040
"""
@@ -45,14 +45,18 @@ def reserve_port():
4545
return port
4646

4747

48-
def release_port(port):
48+
def internal__release_port(port):
4949
"""
5050
Free port provided by reserve_port().
5151
"""
5252

5353
bound_ports.discard(port)
5454

5555

56+
reserve_port = internal__reserve_port
57+
release_port = internal__release_port
58+
59+
5660
def execute_utility(args, logfile=None, verbose=False):
5761
"""
5862
Execute utility (pg_ctl, pg_dump etc).

0 commit comments

Comments
 (0)