[WIP][SPARK-52011][SQL] Reduce HDFS NameNode RPC on vectorized Parquet reader #50765
base: master
Conversation
ParquetFileReader fileReader;
if (fileFooter.isDefined()) {
  fileReader = new ParquetFileReader(configuration, file, fileFooter.get());
This constructor internally calls `HadoopInputFile.fromPath(file, configuration)`, which produces an unnecessary GetFileInfo RPC:
public static HadoopInputFile fromPath(Path path, Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
}
also cc @turboFei
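For context, parquet-java also provides `HadoopInputFile.fromStatus`, which builds the input file from an already-known `FileStatus` instead of calling `fs.getFileStatus(path)` again. A minimal sketch of how the extra RPC could be avoided (names like `cachedStatus` are hypothetical; this requires hadoop-client and parquet-hadoop on the classpath and is not runnable standalone):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.parquet.hadoop.util.HadoopInputFile;

class FooterOpenSketch {
  // 'cachedStatus' stands for a FileStatus already obtained (e.g. during planning).
  static HadoopInputFile open(FileStatus cachedStatus, Configuration conf)
      throws java.io.IOException {
    // fromStatus skips the fs.getFileStatus(path) lookup that fromPath performs,
    // so no GetFileInfo RPC is sent to the NameNode here.
    return HadoopInputFile.fromStatus(cachedStatus, conf);
  }
}
```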
val footerFilter = ParquetFooterReader.footerFilter(
  sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS)
val footer = ParquetFooterReader.readFooter(
  hadoopInputFile, fileInputStream, footerFilter)
TODO: close the reader but keep the fileInputStream open, waiting for apache/parquet-java#3208
What changes were proposed in this pull request?
On a busy Hadoop cluster, `GetFileInfo` and `GetBlockLocations` contribute the most RPCs to the HDFS NameNode. After investigating the Spark Parquet vectorized reader, I think 3 of the 4 RPCs can be eliminated.

Currently, the Parquet vectorized reader issues 4 NameNode RPCs when reading each file (or split):

- reading the footer: one `GetFileInfo` and one `GetBlockLocations`
- reading the row groups: one `GetFileInfo` and one `GetBlockLocations`
The key idea of this PR is:

- Spark already obtains the `FileStatus` for each Parquet file during the planning phase, so we can ship the `FileStatus` from the driver to the executors via `PartitionedFile`. The task then doesn't need to ask the NameNode again, which saves two `GetFileInfo` RPCs.
- Reuse the `SeekableInputStream` for reading both the footer and the row groups, which saves one `GetBlockLocations` RPC.

TODO: The PR currently requires some changes on the Parquet side first.
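To make the RPC arithmetic concrete, here is a hypothetical, self-contained simulation (plain Java, no Hadoop dependency; all names are invented for illustration): the baseline path resolves and opens the file once for the footer and once for the row groups, while the optimized path reuses the driver-provided `FileStatus` and a single stream.

```java
// Hypothetical model, not Spark code: counts the NameNode RPCs issued by
// the read path before and after the change described in this PR.
import java.util.concurrent.atomic.AtomicInteger;

class NameNodeRpcCounter {
    final AtomicInteger getFileInfo = new AtomicInteger();
    final AtomicInteger getBlockLocations = new AtomicInteger();

    /** Baseline: footer read and row-group read each re-resolve and re-open the file. */
    int baselineRead() {
        getFileInfo.incrementAndGet();       // footer: getFileStatus
        getBlockLocations.incrementAndGet(); // footer: open a new stream
        getFileInfo.incrementAndGet();       // row groups: getFileStatus again
        getBlockLocations.incrementAndGet(); // row groups: open another stream
        return total();
    }

    /** Optimized: FileStatus shipped from the driver; one stream reused for both reads. */
    int optimizedRead() {
        getBlockLocations.incrementAndGet(); // single open; stream reused for footer + row groups
        return total();
    }

    int total() {
        return getFileInfo.get() + getBlockLocations.get();
    }
}
```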
Why are the changes needed?
Reduce unnecessary RPCs of NameNode to improve performance and stability for large Hadoop clusters.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Manually tested on a Hadoop cluster; the test runs TPC-H Q4 against sf3000 Parquet tables.
HDFS NameNode metrics (master vs. this PR):
HDFS NameNode audit logs:
Taking the file part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet as an example (the file is expected to be split into 3 splits):

Before: (audit log screenshot omitted)
After: (audit log screenshot omitted)
Was this patch authored or co-authored using generative AI tooling?
No.