diff --git a/Cargo.toml b/Cargo.toml index 89c385a3..1860c696 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,4 @@ members = [ ] [workspace.dependencies] -rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "477264453b", default-features = false, features = [ - "buildtime_bindgen", - "bundled-libsql-wasm-experimental", - "column_decltype", - "load_extension" -] } +libsql = { version = "0.1.6", default-features = false } diff --git a/bottomless/src/lib.rs b/bottomless/src/lib.rs index 406f0881..ea2bba98 100644 --- a/bottomless/src/lib.rs +++ b/bottomless/src/lib.rs @@ -57,18 +57,18 @@ pub extern "C" fn xOpen( let rc = unsafe { (orig_methods.xOpen.unwrap())(vfs, db_file, wal_name, no_shm_mode, max_size, methods, wal) }; - if rc != ffi::SQLITE_OK { + if rc != ffi::SQLITE_OK as i32 { return rc; } if !is_regular(vfs) { tracing::error!("Bottomless WAL is currently only supported for regular VFS"); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } if is_local() { tracing::info!("Running in local-mode only, without any replication"); - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } let runtime = match tokio::runtime::Builder::new_current_thread() @@ -78,7 +78,7 @@ pub extern "C" fn xOpen( Ok(runtime) => runtime, Err(e) => { tracing::error!("Failed to initialize async runtime: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; @@ -88,7 +88,7 @@ pub extern "C" fn xOpen( Ok(path) => path, Err(e) => { tracing::error!("Failed to parse the main database path: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } } }; @@ -98,12 +98,12 @@ pub extern "C" fn xOpen( Ok(repl) => repl, Err(e) => { tracing::error!("Failed to initialize replicator: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; let rc = block_on!(runtime, try_restore(&mut replicator)); - if rc != ffi::SQLITE_OK { + if rc != ffi::SQLITE_OK as i32 { return rc; } @@ -114,7 +114,7 @@ pub extern "C" fn xOpen( let context_ptr = Box::into_raw(Box::new(context)) as *mut c_void; unsafe { (*(*wal)).pMethodsData = context_ptr }; - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } fn get_orig_methods(wal: *mut Wal) -> &'static libsql_wal_methods { @@ -138,7 +138,7 @@ pub extern "C" fn xClose( let orig_methods = get_orig_methods(wal); let methods_data = unsafe { (*wal).pMethodsData as *mut replicator::Context }; let rc = unsafe { (orig_methods.xClose.unwrap())(wal, db, sync_flags, n_buf, z_buf) }; - if rc != ffi::SQLITE_OK { + if rc != ffi::SQLITE_OK as i32 { return rc; } if !is_local() && !methods_data.is_null() { @@ -194,7 +194,7 @@ pub extern "C" fn xUndo( ) -> i32 { let orig_methods = get_orig_methods(wal); let rc = unsafe { (orig_methods.xUndo.unwrap())(wal, func, ctx) }; - if is_local() || rc != ffi::SQLITE_OK { + if is_local() || rc != ffi::SQLITE_OK as i32 { return rc; } @@ -207,7 +207,7 @@ pub extern "C" fn xUndo( ); ctx.replicator.rollback_to_frame(last_valid_frame); - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } pub extern "C" fn xSavepoint(wal: *mut Wal, wal_data: *mut u32) { @@ -218,7 +218,7 @@ pub extern "C" fn xSavepoint(wal: *mut Wal, wal_data: *mut u32) { pub extern "C" fn xSavepointUndo(wal: *mut Wal, wal_data: *mut u32) -> i32 { let orig_methods = get_orig_methods(wal); let rc = unsafe { (orig_methods.xSavepointUndo.unwrap())(wal, wal_data) }; - if is_local() || rc != ffi::SQLITE_OK { + if is_local() || rc != ffi::SQLITE_OK as i32 { return rc; } @@ -231,7 +231,7 @@ pub extern "C" fn xSavepointUndo(wal: *mut Wal, wal_data: *mut u32) -> i32 { ); ctx.replicator.rollback_to_frame(last_valid_frame); - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } pub extern "C" fn xFrames( @@ -253,7 +253,7 @@ pub extern "C" fn xFrames( // supported by bottomless storage. if let Err(e) = ctx.replicator.set_page_size(page_size as usize) { tracing::error!("{}", e); - return ffi::SQLITE_IOERR_WRITE; + return ffi::SQLITE_IOERR_WRITE as i32; } let frame_count = ffi::PageHdrIter::new(page_headers, page_size as usize).count(); if size_after != 0 { @@ -273,11 +273,11 @@ pub extern "C" fn xFrames( sync_flags, ) }; - if is_local() || rc != ffi::SQLITE_OK { + if is_local() || rc != ffi::SQLITE_OK as i32 { return rc; } - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } extern "C" fn always_wait(_busy_param: *mut c_void) -> i32 { @@ -307,9 +307,9 @@ pub extern "C" fn xCheckpoint( ** In order to avoid autocheckpoint on close (that's too often), ** checkpoint attempts weaker than TRUNCATE are ignored. */ - if emode < ffi::SQLITE_CHECKPOINT_TRUNCATE { + if emode < ffi::SQLITE_CHECKPOINT_TRUNCATE as i32 { tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE"); - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } /* If there's no busy handler, let's provide a default one, ** since we auto-upgrade the passive checkpoint @@ -335,14 +335,14 @@ pub extern "C" fn xCheckpoint( ) }; - if is_local() || rc != ffi::SQLITE_OK { + if is_local() || rc != ffi::SQLITE_OK as i32 { return rc; } let ctx = get_replicator_context(wal); if ctx.replicator.commits_in_current_generation() == 0 { tracing::debug!("No commits happened in this generation, not snapshotting"); - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } let last_known_frame = ctx.replicator.last_known_frame(); @@ -352,7 +352,7 @@ pub extern "C" fn xCheckpoint( ctx.replicator.wait_until_committed(last_known_frame) ) { tracing::error!("Failed to finalize replication: {}", e); - return ffi::SQLITE_IOERR_WRITE; + return ffi::SQLITE_IOERR_WRITE as i32; } ctx.replicator.new_generation(); @@ -363,10 +363,10 @@ pub extern "C" fn xCheckpoint( "Failed to snapshot the main db file during checkpoint: {}", e ); - return ffi::SQLITE_IOERR_WRITE; + return ffi::SQLITE_IOERR_WRITE as i32; } - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } pub extern "C" fn xCallback(wal: *mut Wal) -> i32 { @@ -416,13 +416,13 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 { replicator.new_generation(); if let Err(e) = replicator.snapshot_main_db_file().await { tracing::error!("Failed to snapshot the main db file: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } // Restoration process only leaves the local WAL file if it was // detected to be newer than its remote counterpart. if let Err(e) = replicator.maybe_replicate_wal().await { tracing::error!("Failed to replicate local WAL: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } } Ok(replicator::RestoreAction::ReuseGeneration(gen)) => { @@ -430,28 +430,28 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 { } Err(e) => { tracing::error!("Failed to restore the database: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } } - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const c_char) -> i32 { if is_local() { tracing::info!("Running in local-mode only, without any replication"); - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } if path.is_null() { - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } let path = unsafe { match std::ffi::CStr::from_ptr(path).to_str() { Ok(path) => path, Err(e) => { tracing::error!("Failed to parse the main database path: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } } }; @@ -464,7 +464,7 @@ pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const Ok(runtime) => runtime, Err(e) => { tracing::error!("Failed to initialize async runtime: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; @@ -472,7 +472,7 @@ pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const Ok(options) => options, Err(e) => { tracing::error!("Failed to parse replicator options: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; let replicator = block_on!(runtime, replicator::Replicator::with_options(path, options)); @@ -480,7 +480,7 @@ pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const Ok(repl) => repl, Err(e) => { tracing::error!("Failed to initialize replicator: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; block_on!(runtime, try_restore(&mut replicator)) @@ -561,7 +561,7 @@ pub mod static_init { if orig_methods.is_null() {} let methods = crate::bottomless_methods(orig_methods); let rc = unsafe { libsql_wal_methods_register(methods) }; - if rc != crate::ffi::SQLITE_OK { + if rc != crate::ffi::SQLITE_OK as i32 { let _box = unsafe { Box::from_raw(methods as *mut libsql_wal_methods) }; tracing::warn!("Failed to instantiate bottomless WAL methods"); } diff --git a/sqld-libsql-bindings/Cargo.toml b/sqld-libsql-bindings/Cargo.toml index a278ba64..9687f86f 100644 --- a/sqld-libsql-bindings/Cargo.toml +++ b/sqld-libsql-bindings/Cargo.toml @@ -7,9 +7,9 @@ edition = "2021" [dependencies] anyhow = "1.0.66" -rusqlite = { workspace = true } tracing = "0.1.37" once_cell = "1.17.1" +libsql = { workspace = true } [features] unix-excl-vfs = [] diff --git a/sqld-libsql-bindings/src/ffi/mod.rs b/sqld-libsql-bindings/src/ffi/mod.rs index 222bb2ad..b353806d 100644 --- a/sqld-libsql-bindings/src/ffi/mod.rs +++ b/sqld-libsql-bindings/src/ffi/mod.rs @@ -2,16 +2,9 @@ pub mod types; -pub use rusqlite::ffi::{ - libsql_wal_methods, libsql_wal_methods_find, libsql_wal_methods_register, - libsql_wal_methods_unregister, sqlite3, sqlite3_file, sqlite3_hard_heap_limit64, - sqlite3_io_methods, sqlite3_soft_heap_limit64, sqlite3_vfs, WalIndexHdr, SQLITE_CANTOPEN, - SQLITE_CHECKPOINT_FULL, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_WRITE, SQLITE_OK, -}; - -pub use rusqlite::ffi::libsql_pghdr as PgHdr; -pub use rusqlite::ffi::libsql_wal as Wal; -pub use rusqlite::ffi::*; +pub use libsql::ffi::libsql_pghdr as PgHdr; +pub use libsql::ffi::libsql_wal as Wal; +pub use libsql::ffi::*; pub struct PageHdrIter { current_ptr: *const PgHdr, diff --git a/sqld-libsql-bindings/src/ffi/types.rs b/sqld-libsql-bindings/src/ffi/types.rs index c3757745..42fecca7 100644 --- a/sqld-libsql-bindings/src/ffi/types.rs +++ b/sqld-libsql-bindings/src/ffi/types.rs @@ -2,7 +2,7 @@ use std::ffi::{c_char, c_int, c_uint, c_void}; use super::{libsql_wal_methods, sqlite3_file, sqlite3_vfs, PgHdr, Wal}; -use rusqlite::ffi::sqlite3; +use libsql::ffi::sqlite3; // WAL methods pub type XWalLimitFn = extern "C" fn(wal: *mut Wal, limit: i64); @@ -18,7 +18,7 @@ pub type XWalSavepointFn = extern "C" fn(wal: *mut Wal, wal_data: *mut u32); pub type XWalSavePointUndoFn = unsafe extern "C" fn(wal: *mut Wal, wal_data: *mut u32) -> c_int; pub type XWalCheckpointFn = unsafe extern "C" fn( wal: *mut Wal, - db: *mut rusqlite::ffi::sqlite3, + db: *mut libsql::ffi::sqlite3, emode: c_int, busy_handler: Option c_int>, busy_arg: *mut c_void, @@ -32,7 +32,7 @@ pub type XWalCallbackFn = extern "C" fn(wal: *mut Wal) -> c_int; pub type XWalExclusiveModeFn = extern "C" fn(wal: *mut Wal, op: c_int) -> c_int; pub type XWalHeapMemoryFn = extern "C" fn(wal: *mut Wal) -> c_int; pub type XWalFileFn = extern "C" fn(wal: *mut Wal) -> *mut sqlite3_file; -pub type XWalDbFn = extern "C" fn(wal: *mut Wal, db: *mut rusqlite::ffi::sqlite3); +pub type XWalDbFn = extern "C" fn(wal: *mut Wal, db: *mut libsql::ffi::sqlite3); pub type XWalPathNameLenFn = extern "C" fn(orig_len: c_int) -> c_int; pub type XWalGetPathNameFn = extern "C" fn(buf: *mut c_char, orig: *const c_char, orig_len: c_int); pub type XWalPreMainDbOpen = diff --git a/sqld-libsql-bindings/src/lib.rs b/sqld-libsql-bindings/src/lib.rs index 4777f55c..b02f2237 100644 --- a/sqld-libsql-bindings/src/lib.rs +++ b/sqld-libsql-bindings/src/lib.rs @@ -3,11 +3,13 @@ pub mod ffi; pub mod wal_hook; -use std::{ffi::CString, marker::PhantomData, ops::Deref, time::Duration}; +use std::{ffi::CString, marker::PhantomData, ops::Deref}; pub use crate::wal_hook::WalMethodsHook; pub use once_cell::sync::Lazy; +pub use libsql; + use self::{ ffi::{libsql_wal_methods, libsql_wal_methods_find}, wal_hook::WalHook, @@ -23,41 +25,25 @@ pub fn get_orig_wal_methods() -> anyhow::Result<*mut libsql_wal_methods> { } pub struct Connection<'a> { - conn: rusqlite::Connection, + conn: libsql::Connection, _pth: PhantomData<&'a mut ()>, } impl Deref for Connection<'_> { - type Target = rusqlite::Connection; + type Target = libsql::Connection; fn deref(&self) -> &Self::Target { &self.conn } } -impl Drop for Connection<'_> { - fn drop(&mut self) { - unsafe { - let db = self.conn.handle(); - if db.is_null() { - return; - } - let mut stmt = ffi::sqlite3_next_stmt(db, std::ptr::null_mut()); - while !stmt.is_null() { - let rc = ffi::sqlite3_finalize(stmt); - if rc != ffi::SQLITE_OK { - tracing::error!("Failed to finalize a dangling statement: {rc}") - } - stmt = ffi::sqlite3_next_stmt(db, stmt); - } - } - } -} - impl<'a> Connection<'a> { /// returns a dummy, in-memory connection. For testing purposes only pub fn test(_: &mut ()) -> Self { - let conn = rusqlite::Connection::open_in_memory().unwrap(); + let conn = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); Self { conn, _pth: PhantomData, @@ -67,12 +53,12 @@ impl<'a> Connection<'a> { /// Opens a database with the regular wal methods in the directory pointed to by path pub fn open( path: impl AsRef, - flags: rusqlite::OpenFlags, + flags: std::ffi::c_int, // we technically _only_ need to know about W, but requiring a static ref to the wal_hook ensures that // it has been instanciated and lives for long enough _wal_hook: &'static WalMethodsHook, hook_ctx: &'a mut W::Context, - ) -> Result { + ) -> Result { let path = path.as_ref().join("data"); tracing::trace!( "Opening a connection with regular WAL at {}", @@ -81,34 +67,31 @@ impl<'a> Connection<'a> { let conn_str = format!("file:{}?_journal_mode=WAL", path.display()); let filename = CString::new(conn_str).unwrap(); - let mut db: *mut rusqlite::ffi::sqlite3 = std::ptr::null_mut(); + let mut db: *mut ffi::sqlite3 = std::ptr::null_mut(); unsafe { // We pass a pointer to the WAL methods data to the database connection. This means // that the reference must outlive the connection. This is guaranteed by the marker in // the returned connection. - let rc = rusqlite::ffi::libsql_open_v2( + let rc = ffi::libsql_open_v2( filename.as_ptr(), &mut db as *mut _, - flags.bits(), + flags, std::ptr::null_mut(), W::name().as_ptr(), hook_ctx as *mut _ as *mut _, ); if rc != 0 { - rusqlite::ffi::sqlite3_close(db); - return Err(rusqlite::Error::SqliteFailure( - rusqlite::ffi::Error::new(rc), - None, - )); + ffi::sqlite3_close(db); + return Err(rc); } assert!(!db.is_null()); + ffi::sqlite3_busy_timeout(db, 5000); }; - let conn = unsafe { rusqlite::Connection::from_handle_owned(db)? }; - conn.busy_timeout(Duration::from_millis(5000))?; + let conn = libsql::Connection::from_handle(db); Ok(Connection { conn, diff --git a/sqld-libsql-bindings/src/wal_hook.rs b/sqld-libsql-bindings/src/wal_hook.rs index 7f09ad31..9c54aac1 100644 --- a/sqld-libsql-bindings/src/wal_hook.rs +++ b/sqld-libsql-bindings/src/wal_hook.rs @@ -227,7 +227,7 @@ fn get_methods(wal: &mut Wal) -> &mut WalMethodsHook { #[allow(non_snake_case)] pub extern "C" fn xClose( wal: *mut Wal, - db: *mut rusqlite::ffi::sqlite3, + db: *mut libsql::ffi::sqlite3, sync_flags: i32, n_buf: c_int, z_buf: *mut u8, @@ -343,7 +343,7 @@ pub extern "C" fn xFrames( #[allow(non_snake_case)] pub extern "C" fn xCheckpoint( wal: *mut Wal, - db: *mut rusqlite::ffi::sqlite3, + db: *mut libsql::ffi::sqlite3, emode: c_int, busy_handler: Option c_int>, busy_arg: *mut c_void, @@ -395,7 +395,7 @@ pub extern "C" fn xFile(wal: *mut Wal) -> *mut sqlite3_file { } #[allow(non_snake_case)] -pub extern "C" fn xDb(wal: *mut Wal, db: *mut rusqlite::ffi::sqlite3) { +pub extern "C" fn xDb(wal: *mut Wal, db: *mut libsql::ffi::sqlite3) { let orig_methods = unsafe { get_orig_methods::(&mut *wal) }; unsafe { (orig_methods.xDb.unwrap())(wal, db) } } diff --git a/sqld/Cargo.toml b/sqld/Cargo.toml index ef7255cb..d15fb876 100644 --- a/sqld/Cargo.toml +++ b/sqld/Cargo.toml @@ -27,6 +27,7 @@ hyper = { version = "0.14.23", features = ["http2"] } hyper-tungstenite = "0.10" itertools = "0.10.5" jsonwebtoken = "8.2.0" +libsql = { workspace = true } memmap = "0.7.0" mimalloc = { version = "0.1.36", default-features = false } nix = { version = "0.26.2", features = ["fs"] } @@ -37,7 +38,6 @@ prost = "0.11.3" rand = "0.8" regex = "1.7.0" reqwest = { version = "0.11.16", features = ["json", "rustls-tls"], default-features = false } -rusqlite = { workspace = true } serde = { version = "1.0.149", features = ["derive", "rc"] } serde_json = { version = "1.0.91", features = ["preserve_order"] } sha2 = "0.10" @@ -76,4 +76,4 @@ vergen = { version = "8", features = ["build", "git", "gitcl"] } [features] unix-excl-vfs = ["sqld-libsql-bindings/unix-excl-vfs"] -debug-tools = ["console-subscriber", "rusqlite/trace", "tokio/tracing"] +debug-tools = ["console-subscriber", "tokio/tracing"] diff --git a/sqld/src/database/dump/exporter.rs b/sqld/src/database/dump/exporter.rs index e2a4b69f..195e09ef 100644 --- a/sqld/src/database/dump/exporter.rs +++ b/sqld/src/database/dump/exporter.rs @@ -3,9 +3,8 @@ use std::ffi::CString; use std::fmt::{Display, Write as _}; use std::io::Write; -use anyhow::bail; -use rusqlite::types::ValueRef; -use rusqlite::OptionalExtension; +use anyhow::{bail, Context}; +use libsql::params::{Params, ValueRef}; struct DumpState { /// true if db is in writable_schema mode @@ -13,16 +12,16 @@ struct DumpState { writer: W, } -use rusqlite::ffi::{sqlite3_keyword_check, sqlite3_table_column_metadata, SQLITE_OK}; +use sqld_libsql_bindings::ffi::{sqlite3_keyword_check, sqlite3_table_column_metadata, SQLITE_OK}; impl DumpState { fn run_schema_dump_query( &mut self, - txn: &rusqlite::Connection, + txn: &libsql::Connection, stmt: &str, ) -> anyhow::Result<()> { - let mut stmt = txn.prepare(stmt)?; - let mut rows = stmt.query(())?; + let stmt = txn.prepare(stmt)?; + let rows = stmt.execute(&Params::None).context("Empty response")?; while let Some(row) = rows.next()? { let ValueRef::Text(table) = row.get_ref(0)? else { bail!("invalid schema table") }; let ValueRef::Text(ty) = row.get_ref(1)? else { bail!("invalid schema table") }; @@ -92,8 +91,8 @@ impl DumpState { write!(&mut select, " FROM {}", Quoted(table_str))?; - let mut stmt = txn.prepare(&select)?; - let mut rows = stmt.query(())?; + let stmt = txn.prepare(&select)?; + let rows = stmt.execute(&Params::None).context("Empty response")?; while let Some(row) = rows.next()? { write!(self.writer, "{insert}")?; if row_id_col.is_some() { @@ -105,7 +104,7 @@ impl DumpState { if i != 0 || row_id_col.is_some() { write!(self.writer, ",")?; } - write_value_ref(&mut self.writer, row.get_ref(i)?)?; + write_value_ref(&mut self.writer, row.get_ref(i as i32)?)?; } writeln!(self.writer, ");")?; } @@ -115,15 +114,15 @@ impl DumpState { Ok(()) } - fn run_table_dump_query(&mut self, txn: &rusqlite::Connection, q: &str) -> anyhow::Result<()> { - let mut stmt = txn.prepare(q)?; + fn run_table_dump_query(&mut self, txn: &libsql::Connection, q: &str) -> anyhow::Result<()> { + let stmt = txn.prepare(q)?; let col_count = stmt.column_count(); - let mut rows = stmt.query(())?; + let rows = stmt.execute(&Params::None).context("Empty response")?; while let Some(row) = rows.next()? { let ValueRef::Text(sql) = row.get_ref(0)? else { bail!("the first row in a table dump query should be of type text") }; self.writer.write_all(sql)?; for i in 1..col_count { - let ValueRef::Text(s) = row.get_ref(i)? else { bail!("row {i} in table dump query should be of type text") }; + let ValueRef::Text(s) = row.get_ref(i as i32)? else { bail!("row {i} in table dump query should be of type text") }; let s = std::str::from_utf8(s)?; write!(self.writer, ",{s}")?; } @@ -134,7 +133,7 @@ impl DumpState { fn list_table_columns( &self, - txn: &rusqlite::Connection, + txn: &libsql::Connection, table: &str, ) -> anyhow::Result<(Option, Vec)> { let mut cols = Vec::new(); @@ -143,18 +142,18 @@ impl DumpState { let mut preserve_row_id = false; let mut row_id_col = None; - txn.pragma(None, "table_info", table, |row| { - let name: String = row.get_unwrap(1); - cols.push(name); - // this is a primary key col - if row.get_unwrap::<_, usize>(5) != 0 { - num_primary_keys += 1; - is_integer_primary_key = num_primary_keys == 1 - && matches!(row.get_ref_unwrap(2), ValueRef::Text(b"INTEGER")); + if let Some(results) = txn.execute("pragma table_info", ())? { + while let Ok(Some(row)) = results.next() { + let name: String = row.get(1).unwrap(); + cols.push(name); + // this is a primary key col + if row.get::(5).unwrap() != 0 { + num_primary_keys += 1; + is_integer_primary_key = num_primary_keys == 1 + && matches!(row.get_ref(2).unwrap(), ValueRef::Text(b"INTEGER")); + } } - - Ok(()) - })?; + } // from sqlite: // > The decision of whether or not a rowid really needs to be preserved @@ -171,16 +170,12 @@ impl DumpState { // > there is a "pk" entry in "PRAGMA index_list". There will be // > no "pk" index if the PRIMARY KEY really is an alias for the ROWID. - txn.query_row( - "SELECT 1 FROM pragma_index_list(?) WHERE origin='pk'", - [table], - |_| { - // re-set preserve_row_id iif there is a row - preserve_row_id = true; - Ok(()) - }, - ) - .optional()?; + if let Ok(Some(rows)) = txn.execute( + "SELECT 1 FROM pragma_index_list(?) WHERE origin='pk'", + Params::Positional(vec![table.into()]), + ) { + preserve_row_id = rows.next()?.is_none(); + } } if preserve_row_id { @@ -206,7 +201,7 @@ impl DumpState { ) }; - if rc == SQLITE_OK { + if rc == SQLITE_OK as i32 { row_id_col = Some(row_id_name.to_owned()); break; } @@ -420,10 +415,11 @@ fn find_unused_str(haystack: &str, needle1: &str, needle2: &str) -> String { } } -pub fn export_dump(mut db: rusqlite::Connection, writer: impl Write) -> anyhow::Result<()> { - let mut txn = db.transaction()?; - txn.execute("PRAGMA writable_schema=ON", ())?; - let savepoint = txn.savepoint_with_name("dump")?; +pub fn export_dump(mut db: libsql::Connection, writer: impl Write) -> anyhow::Result<()> { + db.execute("BEGIN", ())?; + db.execute("PRAGMA writable_schema=ON", ())?; + // FIXME: savepoint logic is lost during the out-of-rusqlite migration, we need to restore it + db.execute("SAVEPOINT dump", ())?; let mut state = DumpState { writable_schema: false, writer, @@ -441,12 +437,12 @@ pub fn export_dump(mut db: rusqlite::Connection, writer: impl Write) -> anyhow:: WHERE type=='table' AND sql NOT NULL ORDER BY tbl_name='sqlite_sequence', rowid"; - state.run_schema_dump_query(&savepoint, q)?; + state.run_schema_dump_query(&mut db, q)?; let q = "SELECT sql FROM sqlite_schema AS o WHERE sql NOT NULL AND type IN ('index','trigger','view')"; - state.run_table_dump_query(&savepoint, q)?; + state.run_table_dump_query(&mut db, q)?; if state.writable_schema { writeln!(state.writer, "PRAGMA writable_schema=OFF;")?; @@ -454,8 +450,8 @@ AND type IN ('index','trigger','view')"; writeln!(state.writer, "COMMIT;")?; - let _ = savepoint.execute("PRAGMA writable_schema = OFF;", ()); - let _ = savepoint.finish(); + db.execute("PRAGMA writable_schema = OFF;", ())?; + db.execute("COMMIT", ())?; Ok(()) } diff --git a/sqld/src/database/dump/loader.rs b/sqld/src/database/dump/loader.rs index 87592483..653fc823 100644 --- a/sqld/src/database/dump/loader.rs +++ b/sqld/src/database/dump/loader.rs @@ -5,14 +5,13 @@ use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; -use rusqlite::ErrorCode; use tokio::sync::{mpsc, oneshot}; use crate::database::libsql::open_db; use crate::replication::primary::logger::{ReplicationLoggerHookCtx, REPLICATION_METHODS}; use crate::replication::ReplicationLogger; -type OpMsg = Box; +type OpMsg = Box; #[derive(Debug)] pub struct DumpLoader { @@ -25,6 +24,7 @@ impl DumpLoader { logger: Arc, bottomless_replicator: Option>>, ) -> anyhow::Result { + const BUSY: i32 = sqld_libsql_bindings::ffi::SQLITE_BUSY as i32; let (sender, mut receiver) = mpsc::channel::(1); let (ok_snd, ok_rcv) = oneshot::channel::>(); @@ -43,13 +43,7 @@ impl DumpLoader { // Creating the loader database can, in rare occurences, return sqlite busy, // because of a race condition opening the monitor thread db. This is there to // retry a bunch of times if that happens. - Err(rusqlite::Error::SqliteFailure( - rusqlite::ffi::Error { - code: ErrorCode::DatabaseBusy, - .. - }, - _, - )) if retries < 10 => { + Err(libsql::Error::LibError(BUSY)) if retries < 10 => { retries += 1; std::thread::sleep(Duration::from_millis(100)); } @@ -93,7 +87,7 @@ impl DumpLoader { const WASM_TABLE_CREATE: &str = "CREATE TABLE libsql_wasm_func_table (name text PRIMARY KEY, body text) WITHOUT ROWID;"; -fn perform_load_dump(conn: &rusqlite::Connection, path: PathBuf) -> anyhow::Result<()> { +fn perform_load_dump(conn: &libsql::Connection, path: PathBuf) -> anyhow::Result<()> { let mut f = BufReader::new(File::open(path)?); let mut curr = String::new(); let mut line = String::new(); diff --git a/sqld/src/database/libsql.rs b/sqld/src/database/libsql.rs index 02a9af53..09ccefb5 100644 --- a/sqld/src/database/libsql.rs +++ b/sqld/src/database/libsql.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use crossbeam::channel::RecvTimeoutError; -use rusqlite::{ErrorCode, OpenFlags, StatementStatus}; use sqld_libsql_bindings::wal_hook::WalMethodsHook; use tokio::sync::oneshot; use tracing::warn; @@ -78,18 +77,11 @@ where async fn try_create_db(&self) -> Result { // try 100 times to acquire initial db connection. let mut retries = 0; + const BUSY: i32 = sqld_libsql_bindings::ffi::SQLITE_BUSY as std::ffi::c_int; loop { match self.create_database().await { Ok(conn) => return Ok(conn), - Err( - err @ Error::RusqliteError(rusqlite::Error::SqliteFailure( - rusqlite::ffi::Error { - code: ErrorCode::DatabaseBusy, - .. - }, - _, - )), - ) => { + Err(err @ Error::LibSqlError(libsql::Error::LibError(BUSY))) => { if retries < 100 { tracing::warn!("Database file is busy, retrying..."); retries += 1; @@ -141,19 +133,20 @@ pub fn open_db<'a, W>( path: &Path, wal_methods: &'static WalMethodsHook, hook_ctx: &'a mut W::Context, - flags: Option, -) -> Result, rusqlite::Error> + flags: Option, +) -> Result, libsql::Error> where W: WalHook, { let flags = flags.unwrap_or( - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_NO_MUTEX, + (sqld_libsql_bindings::ffi::SQLITE_OPEN_READWRITE + | sqld_libsql_bindings::ffi::SQLITE_OPEN_CREATE + | sqld_libsql_bindings::ffi::SQLITE_OPEN_URI + | sqld_libsql_bindings::ffi::SQLITE_OPEN_NOMUTEX) as i32, ); sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx) + .map_err(|rc| libsql::Error::LibError(rc)) } impl LibSqlDb { @@ -261,14 +254,21 @@ impl<'a> Connection<'a> { }; for ext in extensions { - unsafe { - let _guard = rusqlite::LoadExtensionGuard::new(&this.conn).unwrap(); - if let Err(e) = this.conn.load_extension(&ext, None) { - tracing::error!("failed to load extension: {}", ext.display()); - Err(e)?; - } - tracing::debug!("Loaded extension {}", ext.display()); + let rc = unsafe { + // FIXME: gather the error message from the 4th param and print/return it + // if applicable. + libsql::ffi::sqlite3_load_extension( + this.conn.handle(), + ext.to_str().unwrap().as_ptr() as *const _, + std::ptr::null(), + std::ptr::null_mut(), + ) + }; + if rc != libsql::ffi::SQLITE_OK as i32 { + tracing::error!("failed to load extension: {}", ext.display()); + Err(libsql::Error::LibError(rc))?; } + tracing::debug!("Loaded extension {}", ext.display()); } Ok(this) @@ -354,26 +354,28 @@ impl<'a> Connection<'a> { let cols = stmt.columns(); let cols_count = cols.len(); - builder.cols_description(cols.iter())?; - drop(cols); + builder.cols_description(cols.into_iter())?; query .params .bind(&mut stmt) .map_err(Error::LibSqlInvalidQueryParams)?; - let mut qresult = stmt.raw_query(); - builder.begin_rows()?; - while let Some(row) = qresult.next()? { - builder.begin_row()?; - for i in 0..cols_count { - let val = row.get_ref(i)?; - builder.add_row_value(val)?; + // FIXME: in current libsql implementation, the error will only be returned + // upon first call to `next()`. Let's reconsider? That's not very intuitive. + if let Some(qresult) = stmt.execute(&libsql::Params::None) { + builder.begin_rows()?; + while let Some(row) = qresult.next()? { + builder.begin_row()?; + for i in 0..cols_count { + let val = row.get_ref(i as i32)?; + builder.add_row_value(val)?; + } + builder.finish_row()?; } - builder.finish_row()?; - } - builder.finish_rows()?; + builder.finish_rows()?; + } // sqlite3_changes() is only modified for INSERT, UPDATE or DELETE; it is not reset for SELECT, // but we want to return 0 in that case. @@ -389,8 +391,6 @@ impl<'a> Connection<'a> { false => None, }; - drop(qresult); - self.update_stats(&stmt); Ok((affected_row_count, last_insert_rowid)) @@ -400,9 +400,10 @@ impl<'a> Connection<'a> { let _ = self.conn.execute("ROLLBACK", ()); } - fn update_stats(&self, stmt: &rusqlite::Statement) { - let rows_read = stmt.get_status(StatementStatus::RowsRead); - let rows_written = stmt.get_status(StatementStatus::RowsWritten); + fn update_stats(&self, stmt: &libsql::Statement) { + use sqld_libsql_bindings::ffi; + let rows_read = stmt.get_status(ffi::LIBSQL_STMTSTATUS_ROWS_READ as i32); + let rows_written = stmt.get_status(ffi::LIBSQL_STMTSTATUS_ROWS_WRITTEN as i32); let rows_read = if rows_read == 0 && rows_written == 0 { 1 } else { @@ -417,7 +418,7 @@ impl<'a> Connection<'a> { let params = (1..=stmt.parameter_count()) .map(|param_i| { - let name = stmt.parameter_name(param_i).map(|n| n.into()); + let name = stmt.parameter_name(param_i as i32).map(|n| n.into()); DescribeParam { name } }) .collect(); diff --git a/sqld/src/database/write_proxy.rs b/sqld/src/database/write_proxy.rs index d31ea6ba..c34da35f 100644 --- a/sqld/src/database/write_proxy.rs +++ b/sqld/src/database/write_proxy.rs @@ -2,7 +2,6 @@ use std::path::PathBuf; use std::sync::Arc; use parking_lot::Mutex as PMutex; -use rusqlite::types::ValueRef; use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS; use tokio::sync::{watch, Mutex}; use tonic::transport::Channel; @@ -25,6 +24,7 @@ use crate::Result; use super::config::DatabaseConfigStore; use super::Program; use super::{factory::DbFactory, libsql::LibSqlDb, Database, DescribeResult}; +use libsql::params::ValueRef; #[derive(Clone)] pub struct WriteProxyDbFactory { @@ -108,7 +108,7 @@ fn execute_results_to_builder( builder.begin_step()?; builder.cols_description(rows.column_descriptions.iter().map(|c| Column { name: &c.name, - decl_ty: c.decltype.as_deref(), + decl_type: c.decltype.as_deref(), }))?; builder.begin_rows()?; diff --git a/sqld/src/error.rs b/sqld/src/error.rs index f34a3ff3..b36e9d8b 100644 --- a/sqld/src/error.rs +++ b/sqld/src/error.rs @@ -12,7 +12,7 @@ pub enum Error { #[error(transparent)] IOError(#[from] std::io::Error), #[error(transparent)] - RusqliteError(#[from] rusqlite::Error), + LibSqlError(#[from] libsql::Error), #[error("Failed to execute query via RPC. Error code: {}, message: {}", .0.code, .0.message)] RpcQueryError(crate::rpc::proxy::rpc::Error), #[error("Failed to execute queries via RPC protocol: `{0}`")] diff --git a/sqld/src/hrana/result_builder.rs b/sqld/src/hrana/result_builder.rs index 1cd6aaa1..5a23b677 100644 --- a/sqld/src/hrana/result_builder.rs +++ b/sqld/src/hrana/result_builder.rs @@ -2,7 +2,7 @@ use std::fmt::{self, Write as _}; use std::io; use bytes::Bytes; -use rusqlite::types::ValueRef; +use libsql::params::ValueRef; use crate::hrana::stmt::{proto_error_from_stmt_error, stmt_error_from_sqld_error}; use crate::query_result_builder::{ @@ -115,7 +115,7 @@ impl QueryResultBuilder for SingleStatementBuilder { cols_size += estimate_cols_json_size(&c); proto::Col { name: Some(c.name.to_owned()), - decltype: c.decl_ty.map(ToString::to_string), + decltype: c.decl_type.map(ToString::to_string), } })); @@ -207,7 +207,7 @@ fn estimate_cols_json_size(c: &Column) -> u64 { &mut f, r#"{{"name":"{}","decltype":"{}"}}"#, c.name, - c.decl_ty.unwrap_or("null") + c.decl_type.unwrap_or("null") ) .unwrap(); f.0 diff --git a/sqld/src/hrana/stmt.rs b/sqld/src/hrana/stmt.rs index 88bccb8e..ea1b0e49 100644 --- a/sqld/src/hrana/stmt.rs +++ b/sqld/src/hrana/stmt.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, bail, Result}; +use sqld_libsql_bindings::ffi; use std::collections::HashMap; use super::result_builder::SingleStatementBuilder; @@ -31,16 +32,9 @@ pub enum StmtError { TransactionBusy, #[error("SQLite error: {message}")] SqliteError { - source: rusqlite::ffi::Error, + source: libsql::Error, message: String, }, - #[error("SQL input error: {message} (at offset {offset})")] - SqlInputError { - source: rusqlite::ffi::Error, - message: String, - offset: i32, - }, - #[error("Operation was blocked{}", .reason.as_ref().map(|msg| format!(": {}", msg)).unwrap_or_default())] Blocked { reason: Option }, #[error("Response is too large")] @@ -202,26 +196,9 @@ pub fn stmt_error_from_sqld_error(sqld_error: SqldError) -> Result StmtError::Blocked { reason }, - SqldError::RusqliteError(rusqlite_error) => match rusqlite_error { - rusqlite::Error::SqliteFailure(sqlite_error, Some(message)) => StmtError::SqliteError { - source: sqlite_error, - message, - }, - rusqlite::Error::SqliteFailure(sqlite_error, None) => StmtError::SqliteError { - message: sqlite_error.to_string(), - source: sqlite_error, - }, - rusqlite::Error::SqlInputError { - error: sqlite_error, - msg: message, - offset, - .. - } => StmtError::SqlInputError { - source: sqlite_error, - message, - offset, - }, - rusqlite_error => return Err(SqldError::RusqliteError(rusqlite_error)), + SqldError::LibSqlError(libsql::Error::LibError(rc)) => StmtError::SqliteError { + message: libsql::errors::error_from_code(rc), + source: libsql::Error::LibError(rc), }, sqld_error => return Err(sqld_error), }) @@ -244,40 +221,41 @@ impl StmtError { Self::ArgsBothPositionalAndNamed => "ARGS_BOTH_POSITIONAL_AND_NAMED", Self::TransactionTimeout => "TRANSACTION_TIMEOUT", Self::TransactionBusy => "TRANSACTION_BUSY", - Self::SqliteError { source, .. } => sqlite_error_code(source.code), - Self::SqlInputError { .. } => "SQL_INPUT_ERROR", + Self::SqliteError { source, .. } => sqlite_error_code(source), Self::Blocked { .. } => "BLOCKED", Self::ResponseTooLarge => "RESPONSE_TOO_LARGE", } } } -fn sqlite_error_code(code: rusqlite::ffi::ErrorCode) -> &'static str { - match code { - rusqlite::ErrorCode::InternalMalfunction => "SQLITE_INTERNAL", - rusqlite::ErrorCode::PermissionDenied => "SQLITE_PERM", - rusqlite::ErrorCode::OperationAborted => "SQLITE_ABORT", - rusqlite::ErrorCode::DatabaseBusy => "SQLITE_BUSY", - rusqlite::ErrorCode::DatabaseLocked => "SQLITE_LOCKED", - rusqlite::ErrorCode::OutOfMemory => "SQLITE_NOMEM", - rusqlite::ErrorCode::ReadOnly => "SQLITE_READONLY", - rusqlite::ErrorCode::OperationInterrupted => "SQLITE_INTERRUPT", - rusqlite::ErrorCode::SystemIoFailure => "SQLITE_IOERR", - rusqlite::ErrorCode::DatabaseCorrupt => "SQLITE_CORRUPT", - rusqlite::ErrorCode::NotFound => "SQLITE_NOTFOUND", - rusqlite::ErrorCode::DiskFull => "SQLITE_FULL", - rusqlite::ErrorCode::CannotOpen => "SQLITE_CANTOPEN", - rusqlite::ErrorCode::FileLockingProtocolFailed => "SQLITE_PROTOCOL", - rusqlite::ErrorCode::SchemaChanged => "SQLITE_SCHEMA", - rusqlite::ErrorCode::TooBig => "SQLITE_TOOBIG", - rusqlite::ErrorCode::ConstraintViolation => "SQLITE_CONSTRAINT", - rusqlite::ErrorCode::TypeMismatch => "SQLITE_MISMATCH", - rusqlite::ErrorCode::ApiMisuse => "SQLITE_MISUSE", - rusqlite::ErrorCode::NoLargeFileSupport => "SQLITE_NOLFS", - rusqlite::ErrorCode::AuthorizationForStatementDenied => "SQLITE_AUTH", - rusqlite::ErrorCode::ParameterOutOfRange => "SQLITE_RANGE", - rusqlite::ErrorCode::NotADatabase => "SQLITE_NOTADB", - rusqlite::ErrorCode::Unknown => "SQLITE_UNKNOWN", +fn sqlite_error_code(err: &libsql::Error) -> &'static str { + match err { + libsql::Error::LibError(code) => match (*code) as u32 { + ffi::SQLITE_INTERNAL => "SQLITE_INTERNAL", + ffi::SQLITE_PERM => "SQLITE_PERM", + ffi::SQLITE_ABORT => "SQLITE_ABORT", + ffi::SQLITE_BUSY => "SQLITE_BUSY", + ffi::SQLITE_LOCKED => "SQLITE_LOCKED", + ffi::SQLITE_NOMEM => "SQLITE_NOMEM", + ffi::SQLITE_READONLY => "SQLITE_READONLY", + ffi::SQLITE_INTERRUPT => "SQLITE_INTERRUPT", + ffi::SQLITE_IOERR => "SQLITE_IOERR", + ffi::SQLITE_CORRUPT => "SQLITE_CORRUPT", + ffi::SQLITE_NOTFOUND => "SQLITE_NOTFOUND", + ffi::SQLITE_FULL => "SQLITE_FULL", + ffi::SQLITE_CANTOPEN => "SQLITE_CANTOPEN", + ffi::SQLITE_PROTOCOL => "SQLITE_PROTOCOL", + ffi::SQLITE_SCHEMA => "SQLITE_SCHEMA", + ffi::SQLITE_TOOBIG => "SQLITE_TOOBIG", + ffi::SQLITE_CONSTRAINT => "SQLITE_CONSTRAINT", + ffi::SQLITE_MISMATCH => "SQLITE_MISMATCH", + ffi::SQLITE_MISUSE => "SQLITE_MISUSE", + ffi::SQLITE_NOLFS => "SQLITE_NOLFS", + ffi::SQLITE_AUTH => "SQLITE_AUTH", + ffi::SQLITE_RANGE => "SQLITE_RANGE", + ffi::SQLITE_NOTADB => "SQLITE_NOTADB", + _ => "SQLITE_UNKNOWN", + }, _ => "SQLITE_UNKNOWN", } } diff --git a/sqld/src/http/hrana_over_http_1.rs b/sqld/src/http/hrana_over_http_1.rs index edda8958..6138d57a 100644 --- a/sqld/src/http/hrana_over_http_1.rs +++ b/sqld/src/http/hrana_over_http_1.rs @@ -137,7 +137,6 @@ fn response_error_response(err: ResponseError) -> hyper::Response { | StmtError::SqlNoStmt | StmtError::SqlManyStmts | StmtError::ArgsInvalid { .. } - | StmtError::SqlInputError { .. } | StmtError::ResponseTooLarge | StmtError::Blocked { .. } => hyper::StatusCode::BAD_REQUEST, StmtError::ArgsBothPositionalAndNamed => hyper::StatusCode::NOT_IMPLEMENTED, diff --git a/sqld/src/http/result_builder.rs b/sqld/src/http/result_builder.rs index 1b00e8f8..3a40f17d 100644 --- a/sqld/src/http/result_builder.rs +++ b/sqld/src/http/result_builder.rs @@ -1,7 +1,7 @@ +use libsql::params::ValueRef; use std::io; use std::ops::{Deref, DerefMut}; -use rusqlite::types::ValueRef; use serde::{Serialize, Serializer}; use serde_json::ser::{CompactFormatter, Formatter}; @@ -70,7 +70,7 @@ impl io::Write for LimitBuffer { } } -struct HttpJsonValueSerializer<'a>(&'a ValueRef<'a>); +struct HttpJsonValueSerializer<'a>(&'a libsql::params::ValueRef<'a>); impl JsonHttpPayloadBuilder { pub fn new() -> Self { diff --git a/sqld/src/lib.rs b/sqld/src/lib.rs index a144e744..bf9e8a72 100644 --- a/sqld/src/lib.rs +++ b/sqld/src/lib.rs @@ -5,10 +5,10 @@ use std::sync::mpsc::RecvTimeoutError; use std::sync::Arc; use std::time::Duration; +use crate::libsql::wal_hook::TRANSPARENT_METHODS; use anyhow::Context as AnyhowContext; use enclose::enclose; use futures::never::Never; -use libsql::wal_hook::TRANSPARENT_METHODS; use once_cell::sync::Lazy; use rpc::run_rpc_server; use tokio::sync::{mpsc, Notify}; @@ -551,7 +551,7 @@ async fn run_storage_monitor(db_path: PathBuf, stats: Stats) -> anyhow::Result<( // initialize a connection here, and keep it alive for the entirety of the program. If we // fail to open it, we wait for `duration` and try again later. let ctx = &mut (); - let maybe_conn = match open_db(&db_path, &TRANSPARENT_METHODS, ctx, Some(rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY)) { + let maybe_conn = match open_db(&db_path, &TRANSPARENT_METHODS, ctx, Some(sqld_libsql_bindings::ffi::SQLITE_OPEN_READONLY as i32)) { Ok(conn) => Some(conn), Err(e) => { tracing::warn!("failed to open connection for storager monitor: {e}, trying again in {duration:?}"); @@ -561,12 +561,14 @@ async fn run_storage_monitor(db_path: PathBuf, stats: Stats) -> anyhow::Result<( loop { if let Some(ref conn) = maybe_conn { - if let Ok(storage_bytes_used) = - conn.query_row("select sum(pgsize) from dbstat;", [], |row| { - row.get::(0) - }) + if let Ok(Some(rows)) = + conn.execute("select sum(pgsize) from dbstat;", ()) { - stats.set_storage_bytes_used(storage_bytes_used); + if let Ok(Some(storage_bytes_used)) = rows.next() { + if let Ok(storage_bytes_used) = storage_bytes_used.get(0) { + stats.set_storage_bytes_used(storage_bytes_used); + } + } } } diff --git a/sqld/src/main.rs b/sqld/src/main.rs index f05d9b9e..1858244e 100644 --- a/sqld/src/main.rs +++ b/sqld/src/main.rs @@ -312,7 +312,7 @@ fn perform_dump(dump_path: Option<&Path>, db_path: &Path) -> anyhow::Result<()> } None => Box::new(stdout()), }; - let conn = rusqlite::Connection::open(db_path.join("data"))?; + let conn = libsql::Database::open(db_path.join("data").to_str().unwrap())?.connect()?; export_dump(conn, out)?; @@ -329,9 +329,11 @@ fn enable_libsql_logging() { tracing::error!("sqlite error {code}: {msg}"); } + /* FIXME: introduce tracing to libsql/libsql_sys ONCE.call_once(|| unsafe { rusqlite::trace::config_log(Some(libsql_log)).unwrap(); }); + */ } #[tokio::main] diff --git a/sqld/src/query.rs b/sqld/src/query.rs index 3d1939ac..82da7110 100644 --- a/sqld/src/query.rs +++ b/sqld/src/query.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use anyhow::{anyhow, ensure, Context}; -use rusqlite::types::{ToSqlOutput, ValueRef}; -use rusqlite::ToSql; use serde::{Deserialize, Serialize}; use crate::query_analysis::Statement; @@ -18,28 +16,28 @@ pub enum Value { Blob(Vec), } -impl<'a> From<&'a Value> for ValueRef<'a> { +impl<'a> From<&'a Value> for libsql::params::ValueRef<'a> { fn from(value: &'a Value) -> Self { match value { - Value::Null => ValueRef::Null, - Value::Integer(i) => ValueRef::Integer(*i), - Value::Real(x) => ValueRef::Real(*x), - Value::Text(s) => ValueRef::Text(s.as_bytes()), - Value::Blob(b) => ValueRef::Blob(b.as_slice()), + Value::Null => libsql::params::ValueRef::Null, + Value::Integer(i) => libsql::params::ValueRef::Integer(*i), + Value::Real(x) => libsql::params::ValueRef::Real(*x), + Value::Text(s) => libsql::params::ValueRef::Text(s.as_bytes()), + Value::Blob(b) => libsql::params::ValueRef::Blob(b), } } } -impl TryFrom> for Value { +impl TryFrom> for Value { type Error = anyhow::Error; - fn try_from(value: rusqlite::types::ValueRef<'_>) -> anyhow::Result { + fn try_from(value: libsql::params::ValueRef<'_>) -> anyhow::Result { let val = match value { - rusqlite::types::ValueRef::Null => Value::Null, - rusqlite::types::ValueRef::Integer(i) => Value::Integer(i), - rusqlite::types::ValueRef::Real(x) => Value::Real(x), - rusqlite::types::ValueRef::Text(s) => Value::Text(String::from_utf8(Vec::from(s))?), - rusqlite::types::ValueRef::Blob(b) => Value::Blob(Vec::from(b)), + libsql::params::ValueRef::Null => Value::Null, + libsql::params::ValueRef::Integer(i) => Value::Integer(i), + libsql::params::ValueRef::Real(x) => Value::Real(x), + libsql::params::ValueRef::Text(s) => Value::Text(String::from_utf8(Vec::from(s))?), + libsql::params::ValueRef::Blob(b) => Value::Blob(Vec::from(b)), }; Ok(val) @@ -53,20 +51,6 @@ pub struct Query { pub want_rows: bool, } -impl ToSql for Value { - fn to_sql(&self) -> rusqlite::Result> { - let val = match self { - Value::Null => ToSqlOutput::Owned(rusqlite::types::Value::Null), - Value::Integer(i) => ToSqlOutput::Owned(rusqlite::types::Value::Integer(*i)), - Value::Real(x) => ToSqlOutput::Owned(rusqlite::types::Value::Real(*x)), - Value::Text(s) => ToSqlOutput::Borrowed(rusqlite::types::ValueRef::Text(s.as_bytes())), - Value::Blob(b) => ToSqlOutput::Borrowed(rusqlite::types::ValueRef::Blob(b)), - }; - - Ok(val) - } -} - #[derive(Debug, Serialize, Clone)] pub enum Params { Named(HashMap), @@ -108,7 +92,7 @@ impl Params { } } - pub fn bind(&self, stmt: &mut rusqlite::Statement) -> anyhow::Result<()> { + pub fn bind(&self, stmt: &mut libsql::Statement) -> anyhow::Result<()> { let param_count = stmt.parameter_count(); ensure!( param_count >= self.len(), @@ -120,7 +104,7 @@ impl Params { for index in 1..=param_count { let mut param_name = None; // get by name - let maybe_value = match stmt.parameter_name(index) { + let maybe_value = match stmt.parameter_name(index as i32) { Some(name) => { param_name = Some(name); let mut chars = name.chars(); @@ -140,7 +124,7 @@ impl Params { }; if let Some(value) = maybe_value { - stmt.raw_bind_parameter(index, value)?; + stmt.bind_value(index as i32, value.try_into()?); } else if let Some(name) = param_name { return Err(anyhow!("value for parameter {} not found", name)); } else { @@ -159,7 +143,10 @@ mod test { #[test] fn test_bind_params_positional_simple() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT ?").unwrap(); let params = Params::new_positional(vec![Value::Integer(10)]); params.bind(&mut stmt).unwrap(); @@ -169,7 +156,10 @@ mod test { #[test] fn test_bind_params_positional_numbered() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT ? || ?2 || ?1").unwrap(); let params = Params::new_positional(vec![Value::Integer(10), Value::Integer(20)]); params.bind(&mut stmt).unwrap(); @@ -179,7 +169,10 @@ mod test { #[test] fn test_bind_params_positional_named() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $second").unwrap(); let mut params = HashMap::new(); params.insert(":first".to_owned(), Value::Integer(10)); @@ -192,7 +185,10 @@ mod test { #[test] fn test_bind_params_positional_named_no_prefix() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $second").unwrap(); let mut params = HashMap::new(); params.insert("first".to_owned(), Value::Integer(10)); @@ -205,7 +201,10 @@ mod test { #[test] fn test_bind_params_positional_named_conflict() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $first").unwrap(); let mut params = HashMap::new(); params.insert("first".to_owned(), Value::Integer(10)); @@ -218,7 +217,10 @@ mod test { #[test] fn test_bind_params_positional_named_repeated() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con .prepare("SELECT :first || $second || $first || $second") .unwrap(); @@ -233,7 +235,10 @@ mod test { #[test] fn test_bind_params_too_many_params() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $second").unwrap(); let mut params = HashMap::new(); params.insert(":first".to_owned(), Value::Integer(10)); @@ -245,7 +250,10 @@ mod test { #[test] fn test_bind_params_too_few_params() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $second").unwrap(); let mut params = HashMap::new(); params.insert(":first".to_owned(), Value::Integer(10)); @@ -255,7 +263,10 @@ mod test { #[test] fn test_bind_params_invalid_positional() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT ?invalid").unwrap(); let params = Params::empty(); assert!(params.bind(&mut stmt).is_err()); diff --git a/sqld/src/query_result_builder.rs b/sqld/src/query_result_builder.rs index a9aeadd7..a47909a7 100644 --- a/sqld/src/query_result_builder.rs +++ b/sqld/src/query_result_builder.rs @@ -3,10 +3,12 @@ use std::io::{self, ErrorKind}; use std::ops::{Deref, DerefMut}; use bytesize::ByteSize; -use rusqlite::types::ValueRef; +use libsql::params::ValueRef; use serde::Serialize; use serde_json::ser::Formatter; +pub use libsql::Column; + #[derive(Debug)] pub enum QueryResultBuilderError { ResponseTooLarge(u64), @@ -54,28 +56,6 @@ impl From for QueryResultBuilderError { } } -/// Identical to rusqlite::Column, with visible fields. -#[cfg_attr(test, derive(arbitrary::Arbitrary))] -pub struct Column<'a> { - pub(crate) name: &'a str, - pub(crate) decl_ty: Option<&'a str>, -} - -impl<'a> From<(&'a str, Option<&'a str>)> for Column<'a> { - fn from((name, decl_ty): (&'a str, Option<&'a str>)) -> Self { - Self { name, decl_ty } - } -} - -impl<'a> From<&'a rusqlite::Column<'a>> for Column<'a> { - fn from(value: &'a rusqlite::Column<'a>) -> Self { - Self { - name: value.name(), - decl_ty: value.decl_type(), - } - } -} - #[derive(Debug, Clone, Copy, Default)] pub struct QueryBuilderConfig { pub max_size: Option, @@ -107,7 +87,8 @@ pub trait QueryResultBuilder: Send + 'static { /// begin a new row for the current step fn begin_row(&mut self) -> Result<(), QueryResultBuilderError>; /// add value to current row - fn add_row_value(&mut self, v: ValueRef) -> Result<(), QueryResultBuilderError>; + fn add_row_value(&mut self, v: libsql::params::ValueRef) + -> Result<(), QueryResultBuilderError>; /// finish current row fn finish_row(&mut self) -> Result<(), QueryResultBuilderError>; /// end adding rows @@ -588,14 +569,14 @@ pub mod test { Blob(&'a [u8]), } - impl<'a> From> for rusqlite::types::ValueRef<'a> { + impl<'a> From> for libsql::params::ValueRef<'a> { fn from(value: ValueRef<'a>) -> Self { match value { - ValueRef::Null => rusqlite::types::ValueRef::Null, - ValueRef::Integer(i) => rusqlite::types::ValueRef::Integer(i), - ValueRef::Real(x) => rusqlite::types::ValueRef::Real(x), - ValueRef::Text(s) => rusqlite::types::ValueRef::Text(s.as_bytes()), - ValueRef::Blob(b) => rusqlite::types::ValueRef::Blob(b), + ValueRef::Null => libsql::params::Value::Null, + ValueRef::Integer(i) => libsql::params::Value::Integer(i), + ValueRef::Real(x) => libsql::params::Value::Float(x), + ValueRef::Text(s) => libsql::params::Value::Text(s.as_bytes()), + ValueRef::Blob(b) => libsql::params::Value::Blob(b), } } } diff --git a/sqld/src/replication/primary/logger.rs b/sqld/src/replication/primary/logger.rs index 6ec012dc..bb69faf1 100644 --- a/sqld/src/replication/primary/logger.rs +++ b/sqld/src/replication/primary/logger.rs @@ -86,7 +86,7 @@ unsafe impl WalHook for ReplicationLoggerHook { if let Err(e) = ctx.flush(ntruncate) { tracing::error!("error writing to replication log: {e}"); // returning IO_ERR ensure that xUndo will be called by sqlite. - return SQLITE_IOERR; + return SQLITE_IOERR as c_int; } let rc = unsafe { @@ -149,7 +149,7 @@ unsafe impl WalHook for ReplicationLoggerHook { fn on_savepoint_undo(wal: &mut Wal, wal_data: *mut u32, orig: XWalSavePointUndoFn) -> i32 { let rc = unsafe { orig(wal, wal_data) }; - if rc != SQLITE_OK { + if rc != SQLITE_OK as i32 { return rc; }; @@ -193,9 +193,9 @@ unsafe impl WalHook for ReplicationLoggerHook { ** In order to avoid autocheckpoint on close (that's too often), ** checkpoint attempts weaker than TRUNCATE are ignored. */ - if emode < SQLITE_CHECKPOINT_TRUNCATE { + if emode < SQLITE_CHECKPOINT_TRUNCATE as i32 { tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE"); - return SQLITE_OK; + return SQLITE_OK as i32; } } let rc = unsafe { @@ -213,7 +213,7 @@ unsafe impl WalHook for ReplicationLoggerHook { ) }; - if rc != SQLITE_OK { + if rc != SQLITE_OK as i32 { return rc; } @@ -226,7 +226,7 @@ unsafe impl WalHook for ReplicationLoggerHook { let mut replicator = replicator.lock().unwrap(); if replicator.commits_in_current_generation() == 0 { tracing::debug!("No commits happened in this generation, not snapshotting"); - return SQLITE_OK; + return SQLITE_OK as i32; } let last_known_frame = replicator.last_known_frame(); replicator.request_flush(); @@ -237,18 +237,18 @@ unsafe impl WalHook for ReplicationLoggerHook { last_known_frame, e ); - return SQLITE_IOERR_WRITE; + return SQLITE_IOERR_WRITE as i32; } replicator.new_generation(); if let Err(e) = runtime.block_on(async move { replicator.snapshot_main_db_file().await }) { tracing::error!("Failed to snapshot the main db file during checkpoint: {e}"); - return SQLITE_IOERR_WRITE; + return SQLITE_IOERR_WRITE as i32; } } } - SQLITE_OK + SQLITE_OK as i32 } } @@ -887,21 +887,24 @@ impl ReplicationLogger { fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> { unsafe { - let conn = rusqlite::Connection::open(data_path)?; - conn.pragma_query(None, "page_size", |row| { - let page_size = row.get::<_, i32>(0).unwrap(); + // unwrap: data_path is expected to be a validated path + let conn = libsql::Database::open(data_path.to_str().unwrap())?.connect()?; + if let Ok(row) = conn + .prepare("pragma page_size")? + .query_row(&libsql::Params::None) + { + let page_size: i32 = row.get(0).unwrap(); assert_eq!( page_size, WAL_PAGE_SIZE, "invalid database file, expected page size to be {}, but found {} instead", WAL_PAGE_SIZE, page_size ); - Ok(()) - })?; + } let mut num_checkpointed: c_int = 0; - let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2( + let rc = sqld_libsql_bindings::ffi::sqlite3_wal_checkpoint_v2( conn.handle(), std::ptr::null(), - SQLITE_CHECKPOINT_TRUNCATE, + SQLITE_CHECKPOINT_TRUNCATE as i32, &mut num_checkpointed as *mut _, std::ptr::null_mut(), ); diff --git a/sqld/src/replication/replica/hook.rs b/sqld/src/replication/replica/hook.rs index 75fe6882..b12f6808 100644 --- a/sqld/src/replication/replica/hook.rs +++ b/sqld/src/replication/replica/hook.rs @@ -1,8 +1,7 @@ use std::ffi::{c_int, CStr}; use std::marker::PhantomData; -use rusqlite::ffi::{PgHdr, SQLITE_ERROR}; -use sqld_libsql_bindings::ffi::Wal; +use sqld_libsql_bindings::ffi::{PgHdr, Wal, SQLITE_ERROR}; use sqld_libsql_bindings::init_static_wal_method; use sqld_libsql_bindings::{ffi::types::XWalFrameFn, wal_hook::WalHook}; @@ -167,7 +166,7 @@ unsafe impl WalHook for InjectorHook { if let Err(e) = ret { tracing::error!("replication error: {e}"); - return SQLITE_ERROR; + return SQLITE_ERROR as c_int; } if !ctx.is_txn { diff --git a/sqld/src/replication/replica/injector.rs b/sqld/src/replication/replica/injector.rs index 3965da10..11d0d252 100644 --- a/sqld/src/replication/replica/injector.rs +++ b/sqld/src/replication/replica/injector.rs @@ -1,7 +1,6 @@ +use sqld_libsql_bindings::ffi; use std::path::Path; -use rusqlite::OpenFlags; - use crate::replication::replica::hook::{SQLITE_CONTINUE_REPLICATION, SQLITE_EXIT_REPLICATION}; use super::hook::{InjectorHookCtx, INJECTOR_METHODS}; @@ -14,30 +13,31 @@ impl<'a> FrameInjector<'a> { pub fn new(db_path: &Path, hook_ctx: &'a mut InjectorHookCtx) -> anyhow::Result { let conn = sqld_libsql_bindings::Connection::open( db_path, - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_NO_MUTEX, + (ffi::SQLITE_OPEN_READWRITE + | ffi::SQLITE_OPEN_CREATE + | ffi::SQLITE_OPEN_URI + | ffi::SQLITE_OPEN_NOMUTEX) as i32, &INJECTOR_METHODS, hook_ctx, - )?; + ) + .map_err(|rc| libsql::Error::LibError(rc))?; Ok(Self { conn }) } pub fn step(&mut self) -> anyhow::Result { - self.conn.pragma_update(None, "writable_schema", "on")?; + self.conn.execute("pragma writable_schema=on", ())?; let res = self.conn.execute("create table __dummy__ (dummy);", ()); match res { Ok(_) => panic!("replication hook was not called"), Err(e) => { - if let Some(e) = e.sqlite_error() { - if e.extended_code == SQLITE_EXIT_REPLICATION { - self.conn.pragma_update(None, "writable_schema", "reset")?; + if let libsql::Error::LibError(rc) = e { + if rc == SQLITE_EXIT_REPLICATION { + self.conn.execute("pragma writable_schema=reset", ())?; return Ok(false); } - if e.extended_code == SQLITE_CONTINUE_REPLICATION { + if rc == SQLITE_CONTINUE_REPLICATION { return Ok(true); } } diff --git a/sqld/src/rpc/proxy.rs b/sqld/src/rpc/proxy.rs index 6ed1c389..993dbb9d 100644 --- a/sqld/src/rpc/proxy.rs +++ b/sqld/src/rpc/proxy.rs @@ -353,7 +353,7 @@ impl QueryResultBuilder for ExecuteResultBuilder { for col in cols { let col = col.into(); let col_len = - (col.decl_ty.map(|s| s.len()).unwrap_or_default() + col.name.len()) as u64; + (col.decl_type.map(|s| s.len()).unwrap_or_default() + col.name.len()) as u64; if col_len + self.current_step_size + self.current_size > self.max_size { return Err(QueryResultBuilderError::ResponseTooLarge(self.max_size)); } @@ -361,7 +361,7 @@ impl QueryResultBuilder for ExecuteResultBuilder { let col = rpc::Column { name: col.name.to_owned(), - decltype: col.decl_ty.map(ToString::to_string), + decltype: col.decl_type.map(ToString::to_string), }; self.current_col_description.push(col); @@ -380,7 +380,7 @@ impl QueryResultBuilder for ExecuteResultBuilder { fn add_row_value( &mut self, - v: rusqlite::types::ValueRef, + v: libsql::params::ValueRef, ) -> Result<(), QueryResultBuilderError> { let data = bincode::serialize( &crate::query::Value::try_from(v).map_err(QueryResultBuilderError::from_any)?,