@@ -113,7 +113,8 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
113
113
char pretty_time_str [20 ];
114
114
115
115
/* files to push in multi-thread mode */
116
- parray * batch_files = NULL ;
116
+ parray * batch_files = NULL ;
117
+ int n_threads ;
117
118
118
119
my_pid = getpid ();
119
120
@@ -148,25 +149,32 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
148
149
is_compress = true;
149
150
#endif
150
151
151
- if (num_threads > batch_size )
152
- num_threads = batch_size ;
153
-
154
152
/* Setup filelist and locks */
155
153
batch_files = setup_push_filelist (archive_status_dir , wal_file_name , batch_size );
156
154
155
+ n_threads = num_threads ;
156
+ if (num_threads > parray_num (batch_files ))
157
+ n_threads = parray_num (batch_files );
158
+
157
159
elog (INFO , "PID [%d]: pg_probackup push file %s into archive, "
158
- "threads: %i, batch size: %i, compression: %s" ,
159
- my_pid , wal_file_name , num_threads ,
160
- batch_size , is_compress ? "zlib" : "none" );
160
+ "threads: %i/%i, batch: %lu/%i, compression: %s" ,
161
+ my_pid , wal_file_name , n_threads , num_threads ,
162
+ parray_num (batch_files ), batch_size ,
163
+ is_compress ? "zlib" : "none" );
164
+
165
+ num_threads = n_threads ;
161
166
162
167
/* Single-thread push
163
- * We don`t want to start multi-thread push, if number of threads in equal to 1.
168
+ * We don`t want to start multi-thread push, if number of threads in equal to 1,
169
+ * or the number of files ready to push is small.
164
170
* Multithreading in remote mode isn`t cheap,
165
171
* establishing ssh connection can take 100-200ms, so running and terminating
166
172
* one thread using generic multithread approach can take
167
173
* almost as much time as copying itself.
174
+ * TODO: maybe we should be more conservative and force single thread
175
+ * push if batch_files array is small.
168
176
*/
169
- if (num_threads == 1 )
177
+ if (num_threads == 1 || ( parray_num ( batch_files ) == 1 ) )
170
178
{
171
179
INSTR_TIME_SET_CURRENT (start_time );
172
180
for (i = 0 ; i < parray_num (batch_files ); i ++ )
@@ -297,8 +305,7 @@ push_files(void *arg)
297
305
rc = push_file (xlogfile , args -> archive_status_dir ,
298
306
args -> pg_xlog_dir , args -> archive_dir ,
299
307
args -> overwrite , args -> no_sync ,
300
- args -> archive_timeout ,
301
- no_ready_rename ,
308
+ args -> archive_timeout , no_ready_rename ,
302
309
/* do not compress .backup, .partial and .history files */
303
310
args -> compress && IsXLogFileName (xlogfile -> name ) ? true : false,
304
311
args -> compress_level , args -> thread_num );
@@ -459,7 +466,7 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
459
466
/* first round */
460
467
if (!partial_try_count )
461
468
{
462
- elog (VERBOSE , "Thread [%d]: Temp WAL file already exists, "
469
+ elog (LOG , "Thread [%d]: Temp WAL file already exists, "
463
470
"waiting on it %u seconds: \"%s\"" ,
464
471
thread_num , archive_timeout , to_fullpath_part );
465
472
partial_file_size = st .st_size ;
@@ -704,7 +711,7 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
704
711
/* first round */
705
712
if (!partial_try_count )
706
713
{
707
- elog (VERBOSE , "Thread [%d]: Temp WAL file already exists, "
714
+ elog (LOG , "Thread [%d]: Temp WAL file already exists, "
708
715
"waiting on it %u seconds: \"%s\"" ,
709
716
thread_num , archive_timeout , to_fullpath_gz_part );
710
717
partial_file_size = st .st_size ;
@@ -1143,10 +1150,6 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
1143
1150
char suffix [MAXFNAMELEN ];
1144
1151
pgFile * file = (pgFile * ) parray_get (status_files , i );
1145
1152
1146
- /* first filename already in batch list */
1147
- if (strcmp (file -> name , first_file ) == 0 )
1148
- continue ;
1149
-
1150
1153
result = sscanf (file -> name , "%[^.]%s" , (char * ) & filename , (char * ) & suffix );
1151
1154
1152
1155
if (result != 2 )
@@ -1155,6 +1158,10 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
1155
1158
if (strcmp (suffix , ".ready" ) != 0 )
1156
1159
continue ;
1157
1160
1161
+ /* first filename already in batch list */
1162
+ if (strcmp (filename , first_file ) == 0 )
1163
+ continue ;
1164
+
1158
1165
xlogfile = palloc (sizeof (WALSegno ));
1159
1166
pg_atomic_init_flag (& xlogfile -> lock );
1160
1167
0 commit comments