Skip to content

[Refactoring] Default port manager functions now use PortManager__Generic and LocalOperations #251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
setup(
version='1.11.0',
name='testgres',
packages=['testgres', 'testgres.operations'],
packages=['testgres', 'testgres.operations', 'testgres.impl'],
description='Testing utility for PostgreSQL and its extensions',
url='https://github.com/postgrespro/testgres',
long_description=readme,
Expand Down
64 changes: 64 additions & 0 deletions testgres/impl/port_manager__generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from ..operations.os_ops import OsOperations

from ..port_manager import PortManager
from ..exceptions import PortForException

import threading
import random
import typing


class PortManager__Generic(PortManager):
_os_ops: OsOperations
_guard: object
# TODO: is there better to use bitmap fot _available_ports?
_available_ports: typing.Set[int]
_reserved_ports: typing.Set[int]

def __init__(self, os_ops: OsOperations):
assert os_ops is not None
assert isinstance(os_ops, OsOperations)
self._os_ops = os_ops
self._guard = threading.Lock()
self._available_ports: typing.Set[int] = set(range(1024, 65535))
self._reserved_ports: typing.Set[int] = set()

def reserve_port(self) -> int:
assert self._guard is not None
assert type(self._available_ports) == set # noqa: E721t
assert type(self._reserved_ports) == set # noqa: E721

with self._guard:
t = tuple(self._available_ports)
assert len(t) == len(self._available_ports)
sampled_ports = random.sample(t, min(len(t), 100))
t = None

for port in sampled_ports:
assert not (port in self._reserved_ports)
assert port in self._available_ports

if not self._os_ops.is_port_free(port):
continue

self._reserved_ports.add(port)
self._available_ports.discard(port)
assert port in self._reserved_ports
assert not (port in self._available_ports)
return port

raise PortForException("Can't select a port.")

def release_port(self, number: int) -> None:
assert type(number) == int # noqa: E721

assert self._guard is not None
assert type(self._reserved_ports) == set # noqa: E721

with self._guard:
assert number in self._reserved_ports
assert not (number in self._available_ports)
self._available_ports.add(number)
self._reserved_ports.discard(number)
assert not (number in self._reserved_ports)
assert number in self._available_ports
33 changes: 33 additions & 0 deletions testgres/impl/port_manager__this_host.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from ..port_manager import PortManager

from .. import utils

import threading


class PortManager__ThisHost(PortManager):
sm_single_instance: PortManager = None
sm_single_instance_guard = threading.Lock()

@staticmethod
def get_single_instance() -> PortManager:
assert __class__ == PortManager__ThisHost
assert __class__.sm_single_instance_guard is not None

if __class__.sm_single_instance is not None:
assert type(__class__.sm_single_instance) == __class__ # noqa: E721
return __class__.sm_single_instance

with __class__.sm_single_instance_guard:
if __class__.sm_single_instance is None:
__class__.sm_single_instance = __class__()
assert __class__.sm_single_instance is not None
assert type(__class__.sm_single_instance) == __class__ # noqa: E721
return __class__.sm_single_instance

def reserve_port(self) -> int:
return utils.reserve_port()

def release_port(self, number: int) -> None:
assert type(number) == int # noqa: E721
return utils.release_port(number)
6 changes: 3 additions & 3 deletions testgres/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@
InvalidOperationException

from .port_manager import PortManager
from .port_manager import PortManager__ThisHost
from .port_manager import PortManager__Generic
from .impl.port_manager__this_host import PortManager__ThisHost
from .impl.port_manager__generic import PortManager__Generic

from .logger import TestgresLogger

Expand Down Expand Up @@ -272,7 +272,7 @@ def _get_port_manager(os_ops: OsOperations) -> PortManager:
assert isinstance(os_ops, OsOperations)

if isinstance(os_ops, LocalOperations):
return PortManager__ThisHost()
return PortManager__ThisHost.get_single_instance()

# TODO: Throw the exception "Please define a port manager." ?
return PortManager__Generic(os_ops)
Expand Down
93 changes: 0 additions & 93 deletions testgres/port_manager.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
from .operations.os_ops import OsOperations

from .exceptions import PortForException

from . import utils

import threading
import random
import typing


class PortManager:
def __init__(self):
super().__init__()
Expand All @@ -19,85 +8,3 @@ def reserve_port(self) -> int:
def release_port(self, number: int) -> None:
assert type(number) == int # noqa: E721
raise NotImplementedError("PortManager::release_port is not implemented.")


class PortManager__ThisHost(PortManager):
sm_single_instance: PortManager = None
sm_single_instance_guard = threading.Lock()

def __init__(self):
pass

def __new__(cls) -> PortManager:
assert __class__ == PortManager__ThisHost
assert __class__.sm_single_instance_guard is not None

if __class__.sm_single_instance is None:
with __class__.sm_single_instance_guard:
__class__.sm_single_instance = super().__new__(cls)
assert __class__.sm_single_instance
assert type(__class__.sm_single_instance) == __class__ # noqa: E721
return __class__.sm_single_instance

def reserve_port(self) -> int:
return utils.reserve_port()

def release_port(self, number: int) -> None:
assert type(number) == int # noqa: E721
return utils.release_port(number)


class PortManager__Generic(PortManager):
_os_ops: OsOperations
_guard: object
# TODO: is there better to use bitmap fot _available_ports?
_available_ports: typing.Set[int]
_reserved_ports: typing.Set[int]

def __init__(self, os_ops: OsOperations):
assert os_ops is not None
assert isinstance(os_ops, OsOperations)
self._os_ops = os_ops
self._guard = threading.Lock()
self._available_ports: typing.Set[int] = set(range(1024, 65535))
self._reserved_ports: typing.Set[int] = set()

def reserve_port(self) -> int:
assert self._guard is not None
assert type(self._available_ports) == set # noqa: E721t
assert type(self._reserved_ports) == set # noqa: E721

with self._guard:
t = tuple(self._available_ports)
assert len(t) == len(self._available_ports)
sampled_ports = random.sample(t, min(len(t), 100))
t = None

for port in sampled_ports:
assert not (port in self._reserved_ports)
assert port in self._available_ports

if not self._os_ops.is_port_free(port):
continue

self._reserved_ports.add(port)
self._available_ports.discard(port)
assert port in self._reserved_ports
assert not (port in self._available_ports)
return port

raise PortForException("Can't select a port.")

def release_port(self, number: int) -> None:
assert type(number) == int # noqa: E721

assert self._guard is not None
assert type(self._reserved_ports) == set # noqa: E721

with self._guard:
assert number in self._reserved_ports
assert not (number in self._available_ports)
self._available_ports.add(number)
self._reserved_ports.discard(number)
assert not (number in self._reserved_ports)
assert number in self._available_ports
42 changes: 13 additions & 29 deletions testgres/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,34 @@
import os

import sys
import socket
import random

from contextlib import contextmanager
from packaging.version import Version, InvalidVersion
import re

from six import iteritems

from .exceptions import PortForException
from .exceptions import ExecUtilException
from .config import testgres_config as tconf
from .operations.os_ops import OsOperations
from .operations.remote_ops import RemoteOperations
from .operations.local_ops import LocalOperations
from .operations.helpers import Helpers as OsHelpers

from .impl.port_manager__generic import PortManager__Generic

# rows returned by PG_CONFIG
_pg_config_data = {}

_local_operations = LocalOperations()

#
# The old, global "port manager" always worked with LOCAL system
#
_old_port_manager = PortManager__Generic(_local_operations)

# ports used by nodes
bound_ports = set()
bound_ports = _old_port_manager._reserved_ports


# re-export version type
Expand All @@ -43,28 +50,7 @@ def internal__reserve_port():
"""
Generate a new port and add it to 'bound_ports'.
"""
def LOCAL__is_port_free(port: int) -> bool:
"""Check if a port is free to use."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(("", port))
return True
except OSError:
return False

ports = set(range(1024, 65535))
assert type(ports) == set # noqa: E721
assert type(bound_ports) == set # noqa: E721
ports.difference_update(bound_ports)

sampled_ports = random.sample(tuple(ports), min(len(ports), 100))

for port in sampled_ports:
if LOCAL__is_port_free(port):
bound_ports.add(port)
return port

raise PortForException("Can't select a port")
return _old_port_manager.reserve_port()


def internal__release_port(port):
Expand All @@ -73,9 +59,7 @@ def internal__release_port(port):
"""

assert type(port) == int # noqa: E721
assert port in bound_ports

bound_ports.discard(port)
return _old_port_manager.release_port(port)


reserve_port = internal__reserve_port
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers/global_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class OsOpsDescrs:
class PortManagers:
sm_remote_port_manager = PortManager__Generic(OsOpsDescrs.sm_remote_os_ops)

sm_local_port_manager = PortManager__ThisHost()
sm_local_port_manager = PortManager__ThisHost.get_single_instance()

sm_local2_port_manager = PortManager__Generic(OsOpsDescrs.sm_local_os_ops)

Expand Down