46
46
except ImportError :
47
47
raise ImportError ("You must have psycopg2 or pg8000 modules installed" )
48
48
49
-
50
49
bound_ports = set ()
51
50
registered_nodes = []
52
51
util_threads = []
56
55
57
56
58
57
class ClusterException (Exception ):
59
-
60
58
"""
61
59
Predefined exceptions
62
60
"""
63
61
pass
64
62
65
63
66
64
class QueryException (Exception ):
67
-
68
65
"""
69
66
Predefined exceptions
70
67
"""
71
68
pass
72
69
73
70
74
71
class InitPostgresNodeException (Exception ):
75
-
76
72
"""
77
73
Predefined exceptions
78
74
"""
@@ -138,12 +134,16 @@ def log_watch(node_name, pg_logname):
138
134
139
135
140
136
class NodeConnection (object ):
141
-
142
137
"""
143
138
Transaction wrapper returned by Node
144
139
"""
145
140
146
- def __init__ (self , parent_node , dbname , host = "127.0.0.1" , user = None , password = None ):
141
+ def __init__ (self ,
142
+ parent_node ,
143
+ dbname ,
144
+ host = "127.0.0.1" ,
145
+ user = None ,
146
+ password = None ):
147
147
self .parent_node = parent_node
148
148
if user is None :
149
149
user = get_username ()
@@ -152,8 +152,7 @@ def __init__(self, parent_node, dbname, host="127.0.0.1", user=None, password=No
152
152
user = user ,
153
153
port = parent_node .port ,
154
154
host = host ,
155
- password = password
156
- )
155
+ password = password )
157
156
158
157
self .cursor = self .connection .cursor ()
159
158
@@ -164,10 +163,13 @@ def __exit__(self, type, value, tb):
164
163
self .connection .close ()
165
164
166
165
def begin (self , isolation_level = 0 ):
167
- levels = ['read uncommitted' ,
168
- 'read committed' ,
169
- 'repeatable read' ,
170
- 'serializable' ]
166
+ # yapf: disable
167
+ levels = [
168
+ 'read uncommitted' ,
169
+ 'read committed' ,
170
+ 'repeatable read' ,
171
+ 'serializable'
172
+ ]
171
173
172
174
# Check if level is int [0..3]
173
175
if (isinstance (isolation_level , int ) and
@@ -185,11 +187,11 @@ def begin(self, isolation_level=0):
185
187
186
188
# Something is wrong, emit exception
187
189
else :
188
- raise QueryException ('Invalid isolation level "{}"' . format (
189
- isolation_level ))
190
+ raise QueryException (
191
+ 'Invalid isolation level "{}"' . format ( isolation_level ))
190
192
191
- self .cursor .execute ('SET TRANSACTION ISOLATION LEVEL {}' . format (
192
- isolation_level ))
193
+ self .cursor .execute (
194
+ 'SET TRANSACTION ISOLATION LEVEL {}' . format ( isolation_level ))
193
195
194
196
def commit (self ):
195
197
self .connection .commit ()
@@ -210,14 +212,13 @@ def close(self):
210
212
211
213
212
214
class PostgresNode (object ):
213
-
214
215
def __init__ (self , name , port , base_dir = None , use_logging = False ):
215
216
global bound_ports
216
217
217
218
# check that port is not used
218
219
if port in bound_ports :
219
220
raise InitPostgresNodeException (
220
- 'port {} is already in use' .format (port ))
221
+ 'port {} is already in use' .format (port ))
221
222
222
223
# mark port as used
223
224
bound_ports .add (port )
@@ -289,8 +290,10 @@ def initdb(self, directory, initdb_params=[]):
289
290
stderr = subprocess .STDOUT )
290
291
291
292
if ret :
292
- raise ClusterException ("Cluster initialization failed. You"
293
- " can find additional information at '%s'" % initdb_logfile )
293
+ raise ClusterException (
294
+ "Cluster initialization failed. You"
295
+ " can find additional information at '%s'" %
296
+ initdb_logfile )
294
297
295
298
def _setup_data_dir (self , data_dir ):
296
299
global base_data_dir
@@ -302,7 +305,6 @@ def _setup_data_dir(self, data_dir):
302
305
303
306
shutil .copytree (base_data_dir , data_dir )
304
307
305
-
306
308
def init (self , allows_streaming = False , initdb_params = []):
307
309
""" Performs initdb """
308
310
@@ -322,24 +324,22 @@ def init(self, allows_streaming=False, initdb_params=[]):
322
324
323
325
# add parameters to config file
324
326
with open (postgres_conf , "w" ) as conf :
327
+ conf .write ("fsync = off\n "
328
+ "log_statement = all\n "
329
+ "port = {}\n " .format (self .port ))
325
330
conf .write (
326
- "fsync = off\n "
327
- "log_statement = all\n "
328
- "port = {}\n " .format (self .port ))
329
- conf .write (
330
- # "unix_socket_directories = '%s'\n"
331
- # "listen_addresses = ''\n";)
331
+ # "unix_socket_directories = '%s'\n"
332
+ # "listen_addresses = ''\n";)
332
333
"listen_addresses = '{}'\n " .format (self .host ))
333
334
334
335
if allows_streaming :
335
336
# TODO: wal_level = hot_standby (9.5)
336
- conf .write (
337
- "max_wal_senders = 5\n "
338
- "wal_keep_segments = 20\n "
339
- "max_wal_size = 128MB\n "
340
- "wal_log_hints = on\n "
341
- "hot_standby = on\n "
342
- "max_connections = 10\n " )
337
+ conf .write ("max_wal_senders = 5\n "
338
+ "wal_keep_segments = 20\n "
339
+ "max_wal_size = 128MB\n "
340
+ "wal_log_hints = on\n "
341
+ "hot_standby = on\n "
342
+ "max_connections = 10\n " )
343
343
if get_config ().get ("VERSION_NUM" ) < 906000 :
344
344
conf .write ("wal_level = hot_standby\n " )
345
345
else :
@@ -349,7 +349,10 @@ def init(self, allows_streaming=False, initdb_params=[]):
349
349
350
350
return self
351
351
352
- def init_from_backup (self , root_node , backup_name , has_streaming = False ,
352
+ def init_from_backup (self ,
353
+ root_node ,
354
+ backup_name ,
355
+ has_streaming = False ,
353
356
hba_permit_replication = True ):
354
357
"""Initializes cluster from backup, made by another node"""
355
358
@@ -359,10 +362,7 @@ def init_from_backup(self, root_node, backup_name, has_streaming=False,
359
362
os .chmod (self .data_dir , 0o0700 )
360
363
361
364
# Change port in config file
362
- self .append_conf (
363
- "postgresql.conf" ,
364
- "port = {}" .format (self .port )
365
- )
365
+ self .append_conf ("postgresql.conf" , "port = {}" .format (self .port ))
366
366
# Enable streaming
367
367
if hba_permit_replication :
368
368
self .set_replication_conf ()
@@ -378,9 +378,9 @@ def set_replication_conf(self):
378
378
def enable_streaming (self , root_node ):
379
379
recovery_conf = os .path .join (self .data_dir , "recovery.conf" )
380
380
with open (recovery_conf , "a" ) as conf :
381
- conf .write (
382
- "primary_conninfo='{} application_name={}' \n "
383
- "standby_mode=on \n " . format ( root_node . connstr , self .name ))
381
+ conf .write ("primary_conninfo='{} application_name={}' \n "
382
+ "standby_mode=on \n ". format ( root_node . connstr ,
383
+ self .name ))
384
384
385
385
def append_conf (self , filename , string ):
386
386
"""Appends line to a config file like "postgresql.conf"
@@ -412,10 +412,7 @@ def pg_ctl(self, command, params={}, command_options=[]):
412
412
open (self .error_filename , "a" ) as file_err :
413
413
414
414
res = subprocess .call (
415
- arguments + command_options ,
416
- stdout = file_out ,
417
- stderr = file_err
418
- )
415
+ arguments + command_options , stdout = file_out , stderr = file_err )
419
416
420
417
if res > 0 :
421
418
with open (self .error_filename , "r" ) as errfile :
@@ -427,7 +424,8 @@ def start(self, params={}):
427
424
""" Starts cluster """
428
425
429
426
if self .use_logging :
430
- tmpfile = tempfile .NamedTemporaryFile ('w' , dir = self .logs_dir , delete = False )
427
+ tmpfile = tempfile .NamedTemporaryFile (
428
+ 'w' , dir = self .logs_dir , delete = False )
431
429
logfile = tmpfile .name
432
430
433
431
self .logger = log_watch (self .name , logfile )
@@ -454,8 +452,9 @@ def status(self):
454
452
"""
455
453
try :
456
454
res = subprocess .check_output ([
457
- self .get_bin_path ("pg_ctl" ), 'status' , '-D' , '{0}' .format (self .data_dir )
458
- ])
455
+ self .get_bin_path ("pg_ctl" ), 'status' , '-D' ,
456
+ '{0}' .format (self .data_dir )
457
+ ])
459
458
return True
460
459
except subprocess .CalledProcessError as e :
461
460
if e .returncode == 3 :
@@ -485,8 +484,7 @@ def get_control_data(self):
485
484
try :
486
485
lines = subprocess .check_output (
487
486
[pg_controldata ] + ["-D" , self .data_dir ],
488
- stderr = subprocess .STDOUT
489
- ).decode ("utf-8" ).splitlines ()
487
+ stderr = subprocess .STDOUT ).decode ("utf-8" ).splitlines ()
490
488
except subprocess .CalledProcessError as e :
491
489
raise PgcontroldataException (e .output , e .cmd )
492
490
@@ -497,10 +495,7 @@ def get_control_data(self):
497
495
498
496
def stop (self , params = {}):
499
497
""" Stops cluster """
500
- _params = {
501
- "-D" : self .data_dir ,
502
- "-w" : None
503
- }
498
+ _params = {"-D" : self .data_dir , "-w" : None }
504
499
_params .update (params )
505
500
self .pg_ctl ("stop" , _params )
506
501
@@ -513,10 +508,7 @@ def stop(self, params={}):
513
508
514
509
def restart (self , params = {}):
515
510
""" Restarts cluster """
516
- _params = {
517
- "-D" : self .data_dir ,
518
- "-w" : None
519
- }
511
+ _params = {"-D" : self .data_dir , "-w" : None }
520
512
_params .update (params )
521
513
self .pg_ctl ("restart" , _params )
522
514
@@ -554,7 +546,8 @@ def psql(self, dbname, query=None, filename=None, username=None):
554
546
"""
555
547
psql = self .get_bin_path ("psql" )
556
548
psql_params = [
557
- psql , "-XAtq" , "-h{}" .format (self .host ), "-p {}" .format (self .port ), dbname
549
+ psql , "-XAtq" , "-h{}" .format (self .host ), "-p {}" .format (self .port ),
550
+ dbname
558
551
]
559
552
560
553
if query :
@@ -570,10 +563,7 @@ def psql(self, dbname, query=None, filename=None, username=None):
570
563
571
564
# start psql process
572
565
process = subprocess .Popen (
573
- psql_params ,
574
- stdout = subprocess .PIPE ,
575
- stderr = subprocess .PIPE
576
- )
566
+ psql_params , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
577
567
578
568
# wait untill it finishes and get stdout and stderr
579
569
out , err = process .communicate ()
@@ -594,10 +584,8 @@ def dump(self, dbname, filename):
594
584
"""Invoke pg_dump and exports database to a file as an sql script"""
595
585
path = os .path .join (self .base_dir , filename )
596
586
params = [
597
- self .get_bin_path ("pg_dump" ),
598
- "-p {}" .format (self .port ),
599
- "-f" , path ,
600
- dbname
587
+ self .get_bin_path ("pg_dump" ), "-p {}" .format (self .port ), "-f" ,
588
+ path , dbname
601
589
]
602
590
603
591
with open (self .error_filename , "a" ) as file_err :
@@ -647,15 +635,13 @@ def backup(self, name):
647
635
pg_basebackup = self .get_bin_path ("pg_basebackup" )
648
636
backup_path = os .path .join (self .base_dir , name )
649
637
os .makedirs (backup_path )
650
- params = [pg_basebackup , "-D" , backup_path , "-p {}" .format (
651
- self .port ), "-X" , "fetch" ]
638
+ params = [
639
+ pg_basebackup , "-D" , backup_path , "-p {}" .format (self .port ), "-X" ,
640
+ "fetch"
641
+ ]
652
642
with open (self .output_filename , "a" ) as file_out , \
653
643
open (self .error_filename , "a" ) as file_err :
654
- ret = subprocess .call (
655
- params ,
656
- stdout = file_out ,
657
- stderr = file_err
658
- )
644
+ ret = subprocess .call (params , stdout = file_out , stderr = file_err )
659
645
if ret :
660
646
raise ClusterException ("Base backup failed" )
661
647
@@ -664,19 +650,12 @@ def backup(self, name):
664
650
def pgbench_init (self , dbname = 'postgres' , scale = 1 , options = []):
665
651
"""Prepare pgbench database"""
666
652
pgbench = self .get_bin_path ("pgbench" )
667
- params = [
668
- pgbench ,
669
- "-i" ,
670
- "-s" , "%i" % scale ,
671
- "-p" , "%i" % self .port
672
- ] + options + [dbname ]
653
+ params = [pgbench , "-i" , "-s" ,
654
+ "%i" % scale , "-p" ,
655
+ "%i" % self .port ] + options + [dbname ]
673
656
with open (self .output_filename , "a" ) as file_out , \
674
657
open (self .error_filename , "a" ) as file_err :
675
- ret = subprocess .call (
676
- params ,
677
- stdout = file_out ,
678
- stderr = file_err
679
- )
658
+ ret = subprocess .call (params , stdout = file_out , stderr = file_err )
680
659
if ret :
681
660
raise ClusterException ("pgbench init failed" )
682
661
@@ -685,15 +664,8 @@ def pgbench_init(self, dbname='postgres', scale=1, options=[]):
685
664
def pgbench (self , dbname = 'postgres' , stdout = None , stderr = None , options = []):
686
665
"""Make pgbench process"""
687
666
pgbench = self .get_bin_path ("pgbench" )
688
- params = [
689
- pgbench ,
690
- "-p" , "%i" % self .port
691
- ] + options + [dbname ]
692
- proc = subprocess .Popen (
693
- params ,
694
- stdout = stdout ,
695
- stderr = stderr
696
- )
667
+ params = [pgbench , "-p" , "%i" % self .port ] + options + [dbname ]
668
+ proc = subprocess .Popen (params , stdout = stdout , stderr = stderr )
697
669
698
670
return proc
699
671
@@ -715,7 +687,8 @@ def get_config():
715
687
716
688
try :
717
689
out = six .StringIO (
718
- subprocess .check_output ([pg_config_cmd ], universal_newlines = True ))
690
+ subprocess .check_output (
691
+ [pg_config_cmd ], universal_newlines = True ))
719
692
for line in out :
720
693
if line and "=" in line :
721
694
key , value = line .split ("=" , 1 )
0 commit comments