Skip to content

New OsOperations methods: makedir, rmdir #253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions testgres/operations/local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ def makedirs(self, path, remove_existing=False):
except FileExistsError:
pass

def makedir(self, path: str):
assert type(path) == str # noqa: E721
os.mkdir(path)

# [2025-02-03] Old name of parameter attempts is "retries".
def rmdirs(self, path, ignore_errors=True, attempts=3, delay=1):
"""
Expand Down Expand Up @@ -293,6 +297,10 @@ def rmdirs(self, path, ignore_errors=True, attempts=3, delay=1):
# OK!
return True

def rmdir(self, path: str):
assert type(path) == str # noqa: E721
os.rmdir(path)

def listdir(self, path):
return os.listdir(path)

Expand Down
8 changes: 8 additions & 0 deletions testgres/operations/os_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,17 @@ def get_name(self):
def makedirs(self, path, remove_existing=False):
raise NotImplementedError()

def makedir(self, path: str):
assert type(path) == str # noqa: E721
raise NotImplementedError()

def rmdirs(self, path, ignore_errors=True):
raise NotImplementedError()

def rmdir(self, path: str):
assert type(path) == str # noqa: E721
raise NotImplementedError()

def listdir(self, path):
raise NotImplementedError()

Expand Down
10 changes: 10 additions & 0 deletions testgres/operations/remote_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ def makedirs(self, path, remove_existing=False):
raise Exception("Couldn't create dir {} because of error {}".format(path, error))
return result

def makedir(self, path: str):
assert type(path) == str # noqa: E721
cmd = ["mkdir", path]
self.exec_command(cmd)

def rmdirs(self, path, ignore_errors=True):
"""
Remove a directory in the remote server.
Expand Down Expand Up @@ -265,6 +270,11 @@ def rmdirs(self, path, ignore_errors=True):
return False
return True

def rmdir(self, path: str):
assert type(path) == str # noqa: E721
cmd = ["rmdir", path]
self.exec_command(cmd)

def listdir(self, path):
"""
List all files and directories in a directory.
Expand Down
268 changes: 268 additions & 0 deletions tests/test_os_ops_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
import socket
import threading
import typing
import uuid

from testgres import InvalidOperationException
from testgres import ExecUtilException

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future as ThreadFuture


class TestOsOpsCommon:
sm_os_ops_descrs: typing.List[OsOpsDescr] = [
Expand Down Expand Up @@ -812,3 +816,267 @@ def LOCAL_server(s: socket.socket):

if ok_count == 0:
raise RuntimeError("No one free port was found.")

class tagData_OS_OPS__NUMS:
os_ops_descr: OsOpsDescr
nums: int

def __init__(self, os_ops_descr: OsOpsDescr, nums: int):
assert isinstance(os_ops_descr, OsOpsDescr)
assert type(nums) == int # noqa: E721

self.os_ops_descr = os_ops_descr
self.nums = nums

sm_test_exclusive_creation__mt__data = [
tagData_OS_OPS__NUMS(OsOpsDescrs.sm_local_os_ops_descr, 100000),
tagData_OS_OPS__NUMS(OsOpsDescrs.sm_remote_os_ops_descr, 120),
]

@pytest.fixture(
params=sm_test_exclusive_creation__mt__data,
ids=[x.os_ops_descr.sign for x in sm_test_exclusive_creation__mt__data]
)
def data001(self, request: pytest.FixtureRequest) -> tagData_OS_OPS__NUMS:
assert isinstance(request, pytest.FixtureRequest)
return request.param

def test_mkdir__mt(self, data001: tagData_OS_OPS__NUMS):
assert type(data001) == __class__.tagData_OS_OPS__NUMS # noqa: E721

N_WORKERS = 4
N_NUMBERS = data001.nums
assert type(N_NUMBERS) == int # noqa: E721

os_ops = data001.os_ops_descr.os_ops
assert isinstance(os_ops, OsOperations)

lock_dir_prefix = "test_mkdir_mt--" + uuid.uuid4().hex

lock_dir = os_ops.mkdtemp(prefix=lock_dir_prefix)

logging.info("A lock file [{}] is creating ...".format(lock_dir))

assert os.path.exists(lock_dir)

def MAKE_PATH(lock_dir: str, num: int) -> str:
assert type(lock_dir) == str # noqa: E721
assert type(num) == int # noqa: E721
return os.path.join(lock_dir, str(num) + ".lock")

def LOCAL_WORKER(os_ops: OsOperations,
workerID: int,
lock_dir: str,
cNumbers: int,
reservedNumbers: typing.Set[int]) -> None:
assert isinstance(os_ops, OsOperations)
assert type(workerID) == int # noqa: E721
assert type(lock_dir) == str # noqa: E721
assert type(cNumbers) == int # noqa: E721
assert type(reservedNumbers) == set # noqa: E721
assert cNumbers > 0
assert len(reservedNumbers) == 0

assert os.path.exists(lock_dir)

def LOG_INFO(template: str, *args: list) -> None:
assert type(template) == str # noqa: E721
assert type(args) == tuple # noqa: E721

msg = template.format(*args)
assert type(msg) == str # noqa: E721

logging.info("[Worker #{}] {}".format(workerID, msg))
return

LOG_INFO("HELLO! I am here!")

for num in range(cNumbers):
assert not (num in reservedNumbers)

file_path = MAKE_PATH(lock_dir, num)

try:
os_ops.makedir(file_path)
except Exception as e:
LOG_INFO(
"Can't reserve {}. Error ({}): {}",
num,
type(e).__name__,
str(e)
)
continue

LOG_INFO("Number {} is reserved!", num)
assert os_ops.path_exists(file_path)
reservedNumbers.add(num)
continue

n_total = cNumbers
n_ok = len(reservedNumbers)
assert n_ok <= n_total

LOG_INFO("Finish! OK: {}. FAILED: {}.", n_ok, n_total - n_ok)
return

# -----------------------
logging.info("Worker are creating ...")

threadPool = ThreadPoolExecutor(
max_workers=N_WORKERS,
thread_name_prefix="ex_creator"
)

class tadWorkerData:
future: ThreadFuture
reservedNumbers: typing.Set[int]

workerDatas: typing.List[tadWorkerData] = list()

nErrors = 0

try:
for n in range(N_WORKERS):
logging.info("worker #{} is creating ...".format(n))

workerDatas.append(tadWorkerData())

workerDatas[n].reservedNumbers = set()

workerDatas[n].future = threadPool.submit(
LOCAL_WORKER,
os_ops,
n,
lock_dir,
N_NUMBERS,
workerDatas[n].reservedNumbers
)

assert workerDatas[n].future is not None

logging.info("OK. All the workers were created!")
except Exception as e:
nErrors += 1
logging.error("A problem is detected ({}): {}".format(type(e).__name__, str(e)))

logging.info("Will wait for stop of all the workers...")

nWorkers = 0

assert type(workerDatas) == list # noqa: E721

for i in range(len(workerDatas)):
worker = workerDatas[i].future

if worker is None:
continue

nWorkers += 1

assert isinstance(worker, ThreadFuture)

try:
logging.info("Wait for worker #{}".format(i))
worker.result()
except Exception as e:
nErrors += 1
logging.error("Worker #{} finished with error ({}): {}".format(
i,
type(e).__name__,
str(e),
))
continue

assert nWorkers == N_WORKERS

if nErrors != 0:
raise RuntimeError("Some problems were detected. Please examine the log messages.")

logging.info("OK. Let's check worker results!")

reservedNumbers: typing.Dict[int, int] = dict()

for i in range(N_WORKERS):
logging.info("Worker #{} is checked ...".format(i))

workerNumbers = workerDatas[i].reservedNumbers
assert type(workerNumbers) == set # noqa: E721

for n in workerNumbers:
if n < 0 or n >= N_NUMBERS:
nErrors += 1
logging.error("Unexpected number {}".format(n))
continue

if n in reservedNumbers.keys():
nErrors += 1
logging.error("Number {} was already reserved by worker #{}".format(
n,
reservedNumbers[n]
))
else:
reservedNumbers[n] = i

file_path = MAKE_PATH(lock_dir, n)
if not os_ops.path_exists(file_path):
nErrors += 1
logging.error("File {} is not found!".format(file_path))
continue

continue

logging.info("OK. Let's check reservedNumbers!")

for n in range(N_NUMBERS):
if not (n in reservedNumbers.keys()):
nErrors += 1
logging.error("Number {} is not reserved!".format(n))
continue

file_path = MAKE_PATH(lock_dir, n)
if not os_ops.path_exists(file_path):
nErrors += 1
logging.error("File {} is not found!".format(file_path))
continue

# OK!
continue

logging.info("Verification is finished! Total error count is {}.".format(nErrors))

if nErrors == 0:
logging.info("Root lock-directory [{}] will be deleted.".format(
lock_dir
))

for n in range(N_NUMBERS):
file_path = MAKE_PATH(lock_dir, n)
try:
os_ops.rmdir(file_path)
except Exception as e:
nErrors += 1
logging.error("Cannot delete directory [{}]. Error ({}): {}".format(
file_path,
type(e).__name__,
str(e)
))
continue

if os_ops.path_exists(file_path):
nErrors += 1
logging.error("Directory {} is not deleted!".format(file_path))
continue

if nErrors == 0:
try:
os_ops.rmdir(lock_dir)
except Exception as e:
nErrors += 1
logging.error("Cannot delete directory [{}]. Error ({}): {}".format(
lock_dir,
type(e).__name__,
str(e)
))

logging.info("Test is finished! Total error count is {}.".format(nErrors))
return