Skip to content

Commit 7ab8306

Browse files
pietroalbiniJoshua Nelson
authored and
Joshua Nelson
committed
storage: move deleting a prefix into Storage and add tests for it
1 parent ef295b3 commit 7ab8306

File tree

5 files changed

+184
-128
lines changed

5 files changed

+184
-128
lines changed

src/bin/cratesfyi.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -483,11 +483,11 @@ impl DatabaseSubcommand {
483483

484484
Self::Delete {
485485
command: DeleteSubcommand::Version { name, version },
486-
} => db::delete_version(&*ctx.config()?, &*ctx.conn()?, &name, &version)
486+
} => db::delete_version(&*ctx.conn()?, &*ctx.storage()?, &name, &version)
487487
.context("failed to delete the crate")?,
488488
Self::Delete {
489489
command: DeleteSubcommand::Crate { name },
490-
} => db::delete_crate(&*ctx.config()?, &*ctx.conn()?, &name)
490+
} => db::delete_crate(&*ctx.conn()?, &*ctx.storage()?, &name)
491491
.context("failed to delete the crate")?,
492492
Self::Blacklist { command } => command.handle_args(ctx)?,
493493
}

src/db/delete.rs

+8-78
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
use crate::{
2-
storage::s3::{s3_client, S3_RUNTIME},
3-
Config,
4-
};
1+
use crate::Storage;
52
use failure::{Error, Fail};
63
use postgres::Connection;
7-
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3};
84

95
/// List of directories in docs.rs's underlying storage (either the database or S3) containing a
106
/// subdirectory named after the crate. Those subdirectories will be deleted.
@@ -16,35 +12,27 @@ enum CrateDeletionError {
1612
MissingCrate(String),
1713
}
1814

19-
pub fn delete_crate(config: &Config, conn: &Connection, name: &str) -> Result<(), Error> {
15+
pub fn delete_crate(conn: &Connection, storage: &Storage, name: &str) -> Result<(), Error> {
2016
let crate_id = get_id(conn, name)?;
2117
delete_crate_from_database(conn, name, crate_id)?;
2218

23-
if let Some(client) = s3_client() {
24-
for prefix in STORAGE_PATHS_TO_DELETE {
25-
delete_prefix_from_s3(config, &client, &format!("{}/{}/", prefix, name))?;
26-
}
19+
for prefix in STORAGE_PATHS_TO_DELETE {
20+
storage.delete_prefix(&format!("{}/{}/", prefix, name))?;
2721
}
2822

2923
Ok(())
3024
}
3125

3226
pub fn delete_version(
33-
config: &Config,
3427
conn: &Connection,
28+
storage: &Storage,
3529
name: &str,
3630
version: &str,
3731
) -> Result<(), Error> {
3832
delete_version_from_database(conn, name, version)?;
3933

40-
if let Some(client) = s3_client() {
41-
for prefix in STORAGE_PATHS_TO_DELETE {
42-
delete_prefix_from_s3(
43-
config,
44-
&client,
45-
&format!("{}/{}/{}/", prefix, name, version),
46-
)?;
47-
}
34+
for prefix in STORAGE_PATHS_TO_DELETE {
35+
storage.delete_prefix(&format!("{}/{}/{}/", prefix, name, version))?;
4836
}
4937

5038
Ok(())
@@ -120,70 +108,12 @@ fn delete_crate_from_database(conn: &Connection, name: &str, crate_id: i32) -> R
120108
transaction.execute("DELETE FROM releases WHERE crate_id = $1;", &[&crate_id])?;
121109
transaction.execute("DELETE FROM crates WHERE id = $1;", &[&crate_id])?;
122110

123-
for prefix in STORAGE_PATHS_TO_DELETE {
124-
transaction.execute(
125-
"DELETE FROM files WHERE path LIKE $1;",
126-
&[&format!("{}/{}/%", prefix, name)],
127-
)?;
128-
}
129-
130111
// Transactions automatically rollback when not committing, so if any of the previous queries
131112
// fail the whole transaction will be aborted.
132113
transaction.commit()?;
133114
Ok(())
134115
}
135116

136-
fn delete_prefix_from_s3(config: &Config, s3: &S3Client, name: &str) -> Result<(), Error> {
137-
S3_RUNTIME.handle().block_on(async {
138-
let mut continuation_token = None;
139-
loop {
140-
let list = s3
141-
.list_objects_v2(ListObjectsV2Request {
142-
bucket: config.s3_bucket.clone(),
143-
prefix: Some(name.into()),
144-
continuation_token,
145-
..ListObjectsV2Request::default()
146-
})
147-
.await?;
148-
149-
let to_delete = list
150-
.contents
151-
.unwrap_or_else(Vec::new)
152-
.into_iter()
153-
.filter_map(|o| o.key)
154-
.map(|key| ObjectIdentifier {
155-
key,
156-
version_id: None,
157-
})
158-
.collect::<Vec<_>>();
159-
160-
let resp = s3
161-
.delete_objects(DeleteObjectsRequest {
162-
bucket: config.s3_bucket.clone(),
163-
delete: rusoto_s3::Delete {
164-
objects: to_delete,
165-
quiet: None,
166-
},
167-
..DeleteObjectsRequest::default()
168-
})
169-
.await?;
170-
171-
if let Some(errs) = resp.errors {
172-
for err in &errs {
173-
log::error!("error deleting file from s3: {:?}", err);
174-
}
175-
176-
failure::bail!("uploading to s3 failed");
177-
}
178-
179-
continuation_token = list.continuation_token;
180-
if continuation_token.is_none() {
181-
return Ok(());
182-
}
183-
}
184-
})
185-
}
186-
187117
#[cfg(test)]
188118
mod tests {
189119
use super::*;
@@ -289,7 +219,7 @@ mod tests {
289219
vec!["malicious actor".to_string(), "Peter Rabbit".to_string()]
290220
);
291221

292-
delete_version(&*env.config(), &db.conn(), "a", "1.0.0")?;
222+
delete_version(&db.conn(), &*env.storage(), "a", "1.0.0")?;
293223
assert!(!release_exists(&db.conn(), v1)?);
294224
assert!(release_exists(&db.conn(), v2)?);
295225
assert_eq!(

src/storage/database.rs

+8
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,14 @@ impl<'a> StorageTransaction for DatabaseStorageTransaction<'a> {
9999
Ok(())
100100
}
101101

102+
fn delete_prefix(&mut self, prefix: &str) -> Result<(), Error> {
103+
self.transaction.execute(
104+
"DELETE FROM files WHERE path LIKE $1;",
105+
&[&format!("{}%", prefix.replace('%', "\\%"))],
106+
)?;
107+
Ok(())
108+
}
109+
102110
fn complete(self: Box<Self>) -> Result<(), Error> {
103111
self.transaction.commit()?;
104112
Ok(())

src/storage/mod.rs

+100-22
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,24 @@ impl Storage {
111111
Ok(blob)
112112
}
113113

114+
fn transaction<T, F>(&self, f: F) -> Result<T, Error>
115+
where
116+
F: FnOnce(&mut dyn StorageTransaction) -> Result<T, Error>,
117+
{
118+
let conn;
119+
let mut trans: Box<dyn StorageTransaction> = match &self.backend {
120+
StorageBackend::Database(db) => {
121+
conn = db.start_connection()?;
122+
Box::new(conn.start_storage_transaction()?)
123+
}
124+
StorageBackend::S3(s3) => Box::new(s3.start_storage_transaction()?),
125+
};
126+
127+
let res = f(trans.as_mut())?;
128+
trans.complete()?;
129+
Ok(res)
130+
}
131+
114132
// Store all files in `root_dir` into the backend under `prefix`.
115133
//
116134
// If the environment is configured with S3 credentials, this will upload to S3;
@@ -167,30 +185,23 @@ impl Storage {
167185
&self,
168186
mut blobs: impl Iterator<Item = Result<Blob, Error>>,
169187
) -> Result<(), Error> {
170-
let conn;
171-
let mut trans: Box<dyn StorageTransaction> = match &self.backend {
172-
StorageBackend::Database(db) => {
173-
conn = db.start_connection()?;
174-
Box::new(conn.start_storage_transaction()?)
175-
}
176-
StorageBackend::S3(s3) => Box::new(s3.start_storage_transaction()?),
177-
};
178-
179-
loop {
180-
let batch: Vec<_> = blobs
181-
.by_ref()
182-
.take(MAX_CONCURRENT_UPLOADS)
183-
.collect::<Result<_, Error>>()?;
184-
185-
if batch.is_empty() {
186-
break;
188+
self.transaction(|trans| {
189+
loop {
190+
let batch: Vec<_> = blobs
191+
.by_ref()
192+
.take(MAX_CONCURRENT_UPLOADS)
193+
.collect::<Result<_, Error>>()?;
194+
if batch.is_empty() {
195+
break;
196+
}
197+
trans.store_batch(batch)?;
187198
}
199+
Ok(())
200+
})
201+
}
188202

189-
trans.store_batch(batch)?;
190-
}
191-
192-
trans.complete()?;
193-
Ok(())
203+
pub(crate) fn delete_prefix(&self, prefix: &str) -> Result<(), Error> {
204+
self.transaction(|trans| trans.delete_prefix(prefix))
194205
}
195206

196207
// We're using `&self` instead of consuming `self` or creating a Drop impl because during tests
@@ -216,6 +227,7 @@ impl std::fmt::Debug for Storage {
216227

217228
trait StorageTransaction {
218229
fn store_batch(&mut self, batch: Vec<Blob>) -> Result<(), Error>;
230+
fn delete_prefix(&mut self, prefix: &str) -> Result<(), Error>;
219231
fn complete(self: Box<Self>) -> Result<(), Error>;
220232
}
221233

@@ -453,6 +465,70 @@ mod backend_tests {
453465
Ok(())
454466
}
455467

468+
fn test_delete_prefix(storage: &Storage) -> Result<(), Error> {
469+
test_deletion(
470+
storage,
471+
"foo/bar/",
472+
&[
473+
"foo.txt",
474+
"foo/bar.txt",
475+
"foo/bar/baz.txt",
476+
"foo/bar/foobar.txt",
477+
"bar.txt",
478+
],
479+
&["foo.txt", "foo/bar.txt", "bar.txt"],
480+
&["foo/bar/baz.txt", "foo/bar/foobar.txt"],
481+
)
482+
}
483+
484+
fn test_delete_percent(storage: &Storage) -> Result<(), Error> {
485+
// PostgreSQL treats "%" as a special char when deleting a prefix. Make sure any "%" in the
486+
// provided prefix is properly escaped.
487+
test_deletion(
488+
storage,
489+
"foo/%/",
490+
&["foo/bar.txt", "foo/%/bar.txt"],
491+
&["foo/bar.txt"],
492+
&["foo/%/bar.txt"],
493+
)
494+
}
495+
496+
fn test_deletion(
497+
storage: &Storage,
498+
prefix: &str,
499+
start: &[&str],
500+
present: &[&str],
501+
missing: &[&str],
502+
) -> Result<(), Error> {
503+
storage.store_blobs(
504+
start
505+
.iter()
506+
.map(|path| Blob {
507+
path: (*path).to_string(),
508+
content: b"foo\n".to_vec(),
509+
compression: None,
510+
mime: "text/plain".into(),
511+
date_updated: Utc::now(),
512+
})
513+
.collect(),
514+
)?;
515+
516+
storage.delete_prefix(prefix)?;
517+
518+
for existing in present {
519+
assert!(storage.get(existing, std::usize::MAX).is_ok());
520+
}
521+
for missing in missing {
522+
assert!(storage
523+
.get(missing, std::usize::MAX)
524+
.unwrap_err()
525+
.downcast_ref::<PathNotFoundError>()
526+
.is_some());
527+
}
528+
529+
Ok(())
530+
}
531+
456532
// Remember to add the test name to the macro below when adding a new one.
457533

458534
macro_rules! backend_tests {
@@ -495,6 +571,8 @@ mod backend_tests {
495571
test_get_too_big,
496572
test_store_blobs,
497573
test_store_all,
574+
test_delete_prefix,
575+
test_delete_percent,
498576
}
499577
}
500578
}

0 commit comments

Comments
 (0)