@@ -784,28 +784,6 @@ def _collect_special_files(self):
784
784
785
785
return result
786
786
787
- def _collect_log_files (self ):
788
- # dictionary of log files + size in bytes
789
-
790
- files = [
791
- self .pg_log_file
792
- ] # yapf: disable
793
-
794
- result = {}
795
-
796
- for f in files :
797
- # skip missing files
798
- if not self .os_ops .path_exists (f ):
799
- continue
800
-
801
- file_size = self .os_ops .get_file_size (f )
802
- assert type (file_size ) == int # noqa: E721
803
- assert file_size >= 0
804
-
805
- result [f ] = file_size
806
-
807
- return result
808
-
809
787
def init (self , initdb_params = None , cached = True , ** kwargs ):
810
788
"""
811
789
Perform initdb for this node.
@@ -1062,22 +1040,6 @@ def slow_start(self, replica=False, dbname='template1', username=None, max_attem
1062
1040
OperationalError },
1063
1041
max_attempts = max_attempts )
1064
1042
1065
- def _detect_port_conflict (self , log_files0 , log_files1 ):
1066
- assert type (log_files0 ) == dict # noqa: E721
1067
- assert type (log_files1 ) == dict # noqa: E721
1068
-
1069
- for file in log_files1 .keys ():
1070
- read_pos = 0
1071
-
1072
- if file in log_files0 .keys ():
1073
- read_pos = log_files0 [file ] # the previous size
1074
-
1075
- file_content = self .os_ops .read_binary (file , read_pos )
1076
- file_content_s = file_content .decode ()
1077
- if 'Is another postmaster already running on port' in file_content_s :
1078
- return True
1079
- return False
1080
-
1081
1043
def start (self , params = [], wait = True , exec_env = None ):
1082
1044
"""
1083
1045
Starts the PostgreSQL node using pg_ctl if node has not been started.
@@ -1137,8 +1099,7 @@ def LOCAL__raise_cannot_start_node__std(from_exception):
1137
1099
assert isinstance (self ._port_manager , PortManager )
1138
1100
assert __class__ ._C_MAX_START_ATEMPTS > 1
1139
1101
1140
- log_files0 = self ._collect_log_files ()
1141
- assert type (log_files0 ) == dict # noqa: E721
1102
+ log_reader = PostgresNodeLogReader (self , from_beginnig = False )
1142
1103
1143
1104
nAttempt = 0
1144
1105
timeout = 1
@@ -1154,11 +1115,11 @@ def LOCAL__raise_cannot_start_node__std(from_exception):
1154
1115
if nAttempt == __class__ ._C_MAX_START_ATEMPTS :
1155
1116
LOCAL__raise_cannot_start_node (e , "Cannot start node after multiple attempts." )
1156
1117
1157
- log_files1 = self ._collect_log_files ()
1158
- if not self ._detect_port_conflict (log_files0 , log_files1 ):
1118
+ is_it_port_conflict = PostgresNodeUtils .delect_port_conflict (log_reader )
1119
+
1120
+ if not is_it_port_conflict :
1159
1121
LOCAL__raise_cannot_start_node__std (e )
1160
1122
1161
- log_files0 = log_files1
1162
1123
logging .warning (
1163
1124
"Detected a conflict with using the port {0}. Trying another port after a {1}-second sleep..." .format (self ._port , timeout )
1164
1125
)
@@ -2192,6 +2153,167 @@ def _escape_config_value(value):
2192
2153
return result
2193
2154
2194
2155
2156
+ class PostgresNodeLogReader :
2157
+ class LogInfo :
2158
+ position : int
2159
+
2160
+ def __init__ (self , position : int ):
2161
+ self .position = position
2162
+
2163
+ # --------------------------------------------------------------------
2164
+ class LogDataBlock :
2165
+ _file_name : str
2166
+ _position : int
2167
+ _data : str
2168
+
2169
+ def __init__ (
2170
+ self ,
2171
+ file_name : str ,
2172
+ position : int ,
2173
+ data : str
2174
+ ):
2175
+ assert type (file_name ) == str # noqa: E721
2176
+ assert type (position ) == int # noqa: E721
2177
+ assert type (data ) == str # noqa: E721
2178
+ assert file_name != ""
2179
+ assert position >= 0
2180
+ self ._file_name = file_name
2181
+ self ._position = position
2182
+ self ._data = data
2183
+
2184
+ @property
2185
+ def file_name (self ) -> str :
2186
+ assert type (self ._file_name ) == str # noqa: E721
2187
+ assert self ._file_name != ""
2188
+ return self ._file_name
2189
+
2190
+ @property
2191
+ def position (self ) -> int :
2192
+ assert type (self ._position ) == int # noqa: E721
2193
+ assert self ._position >= 0
2194
+ return self ._position
2195
+
2196
+ @property
2197
+ def data (self ) -> str :
2198
+ assert type (self ._data ) == str # noqa: E721
2199
+ return self ._data
2200
+
2201
+ # --------------------------------------------------------------------
2202
+ _node : PostgresNode
2203
+ _logs : typing .Dict [str , LogInfo ]
2204
+
2205
+ # --------------------------------------------------------------------
2206
+ def __init__ (self , node : PostgresNode , from_beginnig : bool ):
2207
+ assert node is not None
2208
+ assert isinstance (node , PostgresNode )
2209
+ assert type (from_beginnig ) == bool # noqa: E721
2210
+
2211
+ self ._node = node
2212
+
2213
+ if from_beginnig :
2214
+ self ._logs = dict ()
2215
+ else :
2216
+ self ._logs = self ._collect_logs ()
2217
+
2218
+ assert type (self ._logs ) == dict # noqa: E721
2219
+ return
2220
+
2221
+ def read (self ) -> typing .List [LogDataBlock ]:
2222
+ assert self ._node is not None
2223
+ assert isinstance (self ._node , PostgresNode )
2224
+
2225
+ cur_logs : typing .Dict [__class__ .LogInfo ] = self ._collect_logs ()
2226
+ assert cur_logs is not None
2227
+ assert type (cur_logs ) == dict # noqa: E721
2228
+
2229
+ assert type (self ._logs ) == dict # noqa: E721
2230
+
2231
+ result = list ()
2232
+
2233
+ for file_name , cur_log_info in cur_logs .items ():
2234
+ assert type (file_name ) == str # noqa: E721
2235
+ assert type (cur_log_info ) == __class__ .LogInfo # noqa: E721
2236
+
2237
+ read_pos = 0
2238
+
2239
+ if file_name in self ._logs .keys ():
2240
+ prev_log_info = self ._logs [file_name ]
2241
+ assert type (prev_log_info ) == __class__ .LogInfo # noqa: E721
2242
+ read_pos = prev_log_info .position # the previous size
2243
+
2244
+ file_content_b = self ._node .os_ops .read_binary (file_name , read_pos )
2245
+ assert type (file_content_b ) == bytes # noqa: E721
2246
+
2247
+ #
2248
+ # A POTENTIAL PROBLEM: file_content_b may contain an incompleted UTF-8 symbol.
2249
+ #
2250
+ file_content_s = file_content_b .decode ()
2251
+ assert type (file_content_s ) == str # noqa: E721
2252
+
2253
+ next_read_pos = read_pos + len (file_content_b )
2254
+
2255
+ # It is a research/paranoja check.
2256
+ # When we will process partial UTF-8 symbol, it must be adjusted.
2257
+ assert cur_log_info .position <= next_read_pos
2258
+
2259
+ cur_log_info .position = next_read_pos
2260
+
2261
+ block = __class__ .LogDataBlock (
2262
+ file_name ,
2263
+ read_pos ,
2264
+ file_content_s
2265
+ )
2266
+
2267
+ result .append (block )
2268
+
2269
+ # A new check point
2270
+ self ._logs = cur_logs
2271
+
2272
+ return result
2273
+
2274
+ def _collect_logs (self ) -> typing .Dict [LogInfo ]:
2275
+ assert self ._node is not None
2276
+ assert isinstance (self ._node , PostgresNode )
2277
+
2278
+ files = [
2279
+ self ._node .pg_log_file
2280
+ ] # yapf: disable
2281
+
2282
+ result = dict ()
2283
+
2284
+ for f in files :
2285
+ assert type (f ) == str # noqa: E721
2286
+
2287
+ # skip missing files
2288
+ if not self ._node .os_ops .path_exists (f ):
2289
+ continue
2290
+
2291
+ file_size = self ._node .os_ops .get_file_size (f )
2292
+ assert type (file_size ) == int # noqa: E721
2293
+ assert file_size >= 0
2294
+
2295
+ result [f ] = __class__ .LogInfo (file_size )
2296
+
2297
+ return result
2298
+
2299
+
2300
+ class PostgresNodeUtils :
2301
+ @staticmethod
2302
+ def delect_port_conflict (log_reader : PostgresNodeLogReader ) -> bool :
2303
+ assert type (log_reader ) == PostgresNodeLogReader # noqa: E721
2304
+
2305
+ blocks = log_reader .read ()
2306
+ assert type (blocks ) == list # noqa: E721
2307
+
2308
+ for block in blocks :
2309
+ assert type (block ) == PostgresNodeLogReader .LogDataBlock # noqa: E721
2310
+
2311
+ if 'Is another postmaster already running on port' in block .data :
2312
+ return True
2313
+
2314
+ return False
2315
+
2316
+
2195
2317
class NodeApp :
2196
2318
2197
2319
def __init__ (self , test_path = None , nodes_to_cleanup = None , os_ops = None ):
0 commit comments