From d1e88fe304d9729fd1c9343e3e682a81bc36472a Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 25 Jul 2023 12:13:34 +0200 Subject: [PATCH] treewide: migrate from rusqlite to libsql and libsql_sys This huge patch migrates sqld from rusqlite to libsql crate. Since both rusqlite and libsql link with compiled sqlite3.c, the migration has to be done in one piece. Good luck, reviewers! --- Cargo.toml | 7 +- bottomless/src/lib.rs | 68 ++++++++--------- sqld-libsql-bindings/Cargo.toml | 2 +- sqld-libsql-bindings/src/ffi/mod.rs | 13 +--- sqld-libsql-bindings/src/ffi/types.rs | 6 +- sqld-libsql-bindings/src/lib.rs | 53 +++++--------- sqld-libsql-bindings/src/wal_hook.rs | 6 +- sqld/Cargo.toml | 4 +- sqld/src/database/dump/exporter.rs | 86 +++++++++++----------- sqld/src/database/dump/loader.rs | 14 +--- sqld/src/database/libsql.rs | 83 ++++++++++----------- sqld/src/database/write_proxy.rs | 4 +- sqld/src/error.rs | 2 +- sqld/src/hrana/result_builder.rs | 6 +- sqld/src/hrana/stmt.rs | 90 +++++++++-------------- sqld/src/http/hrana_over_http_1.rs | 1 - sqld/src/http/result_builder.rs | 4 +- sqld/src/lib.rs | 16 ++-- sqld/src/main.rs | 4 +- sqld/src/query.rs | 93 +++++++++++++----------- sqld/src/query_result_builder.rs | 41 +++-------- sqld/src/replication/primary/logger.rs | 35 +++++---- sqld/src/replication/replica/hook.rs | 5 +- sqld/src/replication/replica/injector.rs | 24 +++--- sqld/src/rpc/proxy.rs | 6 +- 25 files changed, 305 insertions(+), 368 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 89c385a3..1860c696 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,4 @@ members = [ ] [workspace.dependencies] -rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "477264453b", default-features = false, features = [ - "buildtime_bindgen", - "bundled-libsql-wasm-experimental", - "column_decltype", - "load_extension" -] } +libsql = { version = "0.1.6", default-features = false } diff --git a/bottomless/src/lib.rs b/bottomless/src/lib.rs index 406f0881..ea2bba98 100644 --- a/bottomless/src/lib.rs +++ b/bottomless/src/lib.rs @@ -57,18 +57,18 @@ pub extern "C" fn xOpen( let rc = unsafe { (orig_methods.xOpen.unwrap())(vfs, db_file, wal_name, no_shm_mode, max_size, methods, wal) }; - if rc != ffi::SQLITE_OK { + if rc != ffi::SQLITE_OK as i32 { return rc; } if !is_regular(vfs) { tracing::error!("Bottomless WAL is currently only supported for regular VFS"); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } if is_local() { tracing::info!("Running in local-mode only, without any replication"); - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } let runtime = match tokio::runtime::Builder::new_current_thread() @@ -78,7 +78,7 @@ pub extern "C" fn xOpen( Ok(runtime) => runtime, Err(e) => { tracing::error!("Failed to initialize async runtime: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; @@ -88,7 +88,7 @@ pub extern "C" fn xOpen( Ok(path) => path, Err(e) => { tracing::error!("Failed to parse the main database path: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } } }; @@ -98,12 +98,12 @@ pub extern "C" fn xOpen( Ok(repl) => repl, Err(e) => { tracing::error!("Failed to initialize replicator: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; let rc = block_on!(runtime, try_restore(&mut replicator)); - if rc != ffi::SQLITE_OK { + if rc != ffi::SQLITE_OK as i32 { return rc; } @@ -114,7 +114,7 @@ pub extern "C" fn xOpen( let context_ptr = Box::into_raw(Box::new(context)) as *mut c_void; unsafe { (*(*wal)).pMethodsData = context_ptr }; - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } fn get_orig_methods(wal: *mut Wal) -> &'static libsql_wal_methods { @@ -138,7 +138,7 @@ pub extern "C" fn xClose( let orig_methods = get_orig_methods(wal); let methods_data = unsafe { (*wal).pMethodsData as *mut replicator::Context }; let rc = unsafe { (orig_methods.xClose.unwrap())(wal, db, sync_flags, n_buf, z_buf) }; - if rc != ffi::SQLITE_OK { + if rc != ffi::SQLITE_OK as i32 { return rc; } if !is_local() && !methods_data.is_null() { @@ -194,7 +194,7 @@ pub extern "C" fn xUndo( ) -> i32 { let orig_methods = get_orig_methods(wal); let rc = unsafe { (orig_methods.xUndo.unwrap())(wal, func, ctx) }; - if is_local() || rc != ffi::SQLITE_OK { + if is_local() || rc != ffi::SQLITE_OK as i32 { return rc; } @@ -207,7 +207,7 @@ pub extern "C" fn xUndo( ); ctx.replicator.rollback_to_frame(last_valid_frame); - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } pub extern "C" fn xSavepoint(wal: *mut Wal, wal_data: *mut u32) { @@ -218,7 +218,7 @@ pub extern "C" fn xSavepoint(wal: *mut Wal, wal_data: *mut u32) { pub extern "C" fn xSavepointUndo(wal: *mut Wal, wal_data: *mut u32) -> i32 { let orig_methods = get_orig_methods(wal); let rc = unsafe { (orig_methods.xSavepointUndo.unwrap())(wal, wal_data) }; - if is_local() || rc != ffi::SQLITE_OK { + if is_local() || rc != ffi::SQLITE_OK as i32 { return rc; } @@ -231,7 +231,7 @@ pub extern "C" fn xSavepointUndo(wal: *mut Wal, wal_data: *mut u32) -> i32 { ); ctx.replicator.rollback_to_frame(last_valid_frame); - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } pub extern "C" fn xFrames( @@ -253,7 +253,7 @@ pub extern "C" fn xFrames( // supported by bottomless storage. if let Err(e) = ctx.replicator.set_page_size(page_size as usize) { tracing::error!("{}", e); - return ffi::SQLITE_IOERR_WRITE; + return ffi::SQLITE_IOERR_WRITE as i32; } let frame_count = ffi::PageHdrIter::new(page_headers, page_size as usize).count(); if size_after != 0 { @@ -273,11 +273,11 @@ pub extern "C" fn xFrames( sync_flags, ) }; - if is_local() || rc != ffi::SQLITE_OK { + if is_local() || rc != ffi::SQLITE_OK as i32 { return rc; } - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } extern "C" fn always_wait(_busy_param: *mut c_void) -> i32 { @@ -307,9 +307,9 @@ pub extern "C" fn xCheckpoint( ** In order to avoid autocheckpoint on close (that's too often), ** checkpoint attempts weaker than TRUNCATE are ignored. */ - if emode < ffi::SQLITE_CHECKPOINT_TRUNCATE { + if emode < ffi::SQLITE_CHECKPOINT_TRUNCATE as i32 { tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE"); - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } /* If there's no busy handler, let's provide a default one, ** since we auto-upgrade the passive checkpoint @@ -335,14 +335,14 @@ pub extern "C" fn xCheckpoint( ) }; - if is_local() || rc != ffi::SQLITE_OK { + if is_local() || rc != ffi::SQLITE_OK as i32 { return rc; } let ctx = get_replicator_context(wal); if ctx.replicator.commits_in_current_generation() == 0 { tracing::debug!("No commits happened in this generation, not snapshotting"); - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } let last_known_frame = ctx.replicator.last_known_frame(); @@ -352,7 +352,7 @@ pub extern "C" fn xCheckpoint( ctx.replicator.wait_until_committed(last_known_frame) ) { tracing::error!("Failed to finalize replication: {}", e); - return ffi::SQLITE_IOERR_WRITE; + return ffi::SQLITE_IOERR_WRITE as i32; } ctx.replicator.new_generation(); @@ -363,10 +363,10 @@ pub extern "C" fn xCheckpoint( "Failed to snapshot the main db file during checkpoint: {}", e ); - return ffi::SQLITE_IOERR_WRITE; + return ffi::SQLITE_IOERR_WRITE as i32; } - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } pub extern "C" fn xCallback(wal: *mut Wal) -> i32 { @@ -416,13 +416,13 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 { replicator.new_generation(); if let Err(e) = replicator.snapshot_main_db_file().await { tracing::error!("Failed to snapshot the main db file: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } // Restoration process only leaves the local WAL file if it was // detected to be newer than its remote counterpart. if let Err(e) = replicator.maybe_replicate_wal().await { tracing::error!("Failed to replicate local WAL: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } } Ok(replicator::RestoreAction::ReuseGeneration(gen)) => { @@ -430,28 +430,28 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 { } Err(e) => { tracing::error!("Failed to restore the database: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } } - ffi::SQLITE_OK + ffi::SQLITE_OK as i32 } pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const c_char) -> i32 { if is_local() { tracing::info!("Running in local-mode only, without any replication"); - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } if path.is_null() { - return ffi::SQLITE_OK; + return ffi::SQLITE_OK as i32; } let path = unsafe { match std::ffi::CStr::from_ptr(path).to_str() { Ok(path) => path, Err(e) => { tracing::error!("Failed to parse the main database path: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } } }; @@ -464,7 +464,7 @@ pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const Ok(runtime) => runtime, Err(e) => { tracing::error!("Failed to initialize async runtime: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; @@ -472,7 +472,7 @@ pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const Ok(options) => options, Err(e) => { tracing::error!("Failed to parse replicator options: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; let replicator = block_on!(runtime, replicator::Replicator::with_options(path, options)); @@ -480,7 +480,7 @@ pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const Ok(repl) => repl, Err(e) => { tracing::error!("Failed to initialize replicator: {}", e); - return ffi::SQLITE_CANTOPEN; + return ffi::SQLITE_CANTOPEN as i32; } }; block_on!(runtime, try_restore(&mut replicator)) @@ -561,7 +561,7 @@ pub mod static_init { if orig_methods.is_null() {} let methods = crate::bottomless_methods(orig_methods); let rc = unsafe { libsql_wal_methods_register(methods) }; - if rc != crate::ffi::SQLITE_OK { + if rc != crate::ffi::SQLITE_OK as i32 { let _box = unsafe { Box::from_raw(methods as *mut libsql_wal_methods) }; tracing::warn!("Failed to instantiate bottomless WAL methods"); } diff --git a/sqld-libsql-bindings/Cargo.toml b/sqld-libsql-bindings/Cargo.toml index a278ba64..9687f86f 100644 --- a/sqld-libsql-bindings/Cargo.toml +++ b/sqld-libsql-bindings/Cargo.toml @@ -7,9 +7,9 @@ edition = "2021" [dependencies] anyhow = "1.0.66" -rusqlite = { workspace = true } tracing = "0.1.37" once_cell = "1.17.1" +libsql = { workspace = true } [features] unix-excl-vfs = [] diff --git a/sqld-libsql-bindings/src/ffi/mod.rs b/sqld-libsql-bindings/src/ffi/mod.rs index 222bb2ad..b353806d 100644 --- a/sqld-libsql-bindings/src/ffi/mod.rs +++ b/sqld-libsql-bindings/src/ffi/mod.rs @@ -2,16 +2,9 @@ pub mod types; -pub use rusqlite::ffi::{ - libsql_wal_methods, libsql_wal_methods_find, libsql_wal_methods_register, - libsql_wal_methods_unregister, sqlite3, sqlite3_file, sqlite3_hard_heap_limit64, - sqlite3_io_methods, sqlite3_soft_heap_limit64, sqlite3_vfs, WalIndexHdr, SQLITE_CANTOPEN, - SQLITE_CHECKPOINT_FULL, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_WRITE, SQLITE_OK, -}; - -pub use rusqlite::ffi::libsql_pghdr as PgHdr; -pub use rusqlite::ffi::libsql_wal as Wal; -pub use rusqlite::ffi::*; +pub use libsql::ffi::libsql_pghdr as PgHdr; +pub use libsql::ffi::libsql_wal as Wal; +pub use libsql::ffi::*; pub struct PageHdrIter { current_ptr: *const PgHdr, diff --git a/sqld-libsql-bindings/src/ffi/types.rs b/sqld-libsql-bindings/src/ffi/types.rs index c3757745..42fecca7 100644 --- a/sqld-libsql-bindings/src/ffi/types.rs +++ b/sqld-libsql-bindings/src/ffi/types.rs @@ -2,7 +2,7 @@ use std::ffi::{c_char, c_int, c_uint, c_void}; use super::{libsql_wal_methods, sqlite3_file, sqlite3_vfs, PgHdr, Wal}; -use rusqlite::ffi::sqlite3; +use libsql::ffi::sqlite3; // WAL methods pub type XWalLimitFn = extern "C" fn(wal: *mut Wal, limit: i64); @@ -18,7 +18,7 @@ pub type XWalSavepointFn = extern "C" fn(wal: *mut Wal, wal_data: *mut u32); pub type XWalSavePointUndoFn = unsafe extern "C" fn(wal: *mut Wal, wal_data: *mut u32) -> c_int; pub type XWalCheckpointFn = unsafe extern "C" fn( wal: *mut Wal, - db: *mut rusqlite::ffi::sqlite3, + db: *mut libsql::ffi::sqlite3, emode: c_int, busy_handler: Option c_int>, busy_arg: *mut c_void, @@ -32,7 +32,7 @@ pub type XWalCallbackFn = extern "C" fn(wal: *mut Wal) -> c_int; pub type XWalExclusiveModeFn = extern "C" fn(wal: *mut Wal, op: c_int) -> c_int; pub type XWalHeapMemoryFn = extern "C" fn(wal: *mut Wal) -> c_int; pub type XWalFileFn = extern "C" fn(wal: *mut Wal) -> *mut sqlite3_file; -pub type XWalDbFn = extern "C" fn(wal: *mut Wal, db: *mut rusqlite::ffi::sqlite3); +pub type XWalDbFn = extern "C" fn(wal: *mut Wal, db: *mut libsql::ffi::sqlite3); pub type XWalPathNameLenFn = extern "C" fn(orig_len: c_int) -> c_int; pub type XWalGetPathNameFn = extern "C" fn(buf: *mut c_char, orig: *const c_char, orig_len: c_int); pub type XWalPreMainDbOpen = diff --git a/sqld-libsql-bindings/src/lib.rs b/sqld-libsql-bindings/src/lib.rs index 4777f55c..b02f2237 100644 --- a/sqld-libsql-bindings/src/lib.rs +++ b/sqld-libsql-bindings/src/lib.rs @@ -3,11 +3,13 @@ pub mod ffi; pub mod wal_hook; -use std::{ffi::CString, marker::PhantomData, ops::Deref, time::Duration}; +use std::{ffi::CString, marker::PhantomData, ops::Deref}; pub use crate::wal_hook::WalMethodsHook; pub use once_cell::sync::Lazy; +pub use libsql; + use self::{ ffi::{libsql_wal_methods, libsql_wal_methods_find}, wal_hook::WalHook, @@ -23,41 +25,25 @@ pub fn get_orig_wal_methods() -> anyhow::Result<*mut libsql_wal_methods> { } pub struct Connection<'a> { - conn: rusqlite::Connection, + conn: libsql::Connection, _pth: PhantomData<&'a mut ()>, } impl Deref for Connection<'_> { - type Target = rusqlite::Connection; + type Target = libsql::Connection; fn deref(&self) -> &Self::Target { &self.conn } } -impl Drop for Connection<'_> { - fn drop(&mut self) { - unsafe { - let db = self.conn.handle(); - if db.is_null() { - return; - } - let mut stmt = ffi::sqlite3_next_stmt(db, std::ptr::null_mut()); - while !stmt.is_null() { - let rc = ffi::sqlite3_finalize(stmt); - if rc != ffi::SQLITE_OK { - tracing::error!("Failed to finalize a dangling statement: {rc}") - } - stmt = ffi::sqlite3_next_stmt(db, stmt); - } - } - } -} - impl<'a> Connection<'a> { /// returns a dummy, in-memory connection. For testing purposes only pub fn test(_: &mut ()) -> Self { - let conn = rusqlite::Connection::open_in_memory().unwrap(); + let conn = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); Self { conn, _pth: PhantomData, @@ -67,12 +53,12 @@ impl<'a> Connection<'a> { /// Opens a database with the regular wal methods in the directory pointed to by path pub fn open( path: impl AsRef, - flags: rusqlite::OpenFlags, + flags: std::ffi::c_int, // we technically _only_ need to know about W, but requiring a static ref to the wal_hook ensures that // it has been instanciated and lives for long enough _wal_hook: &'static WalMethodsHook, hook_ctx: &'a mut W::Context, - ) -> Result { + ) -> Result { let path = path.as_ref().join("data"); tracing::trace!( "Opening a connection with regular WAL at {}", @@ -81,34 +67,31 @@ impl<'a> Connection<'a> { let conn_str = format!("file:{}?_journal_mode=WAL", path.display()); let filename = CString::new(conn_str).unwrap(); - let mut db: *mut rusqlite::ffi::sqlite3 = std::ptr::null_mut(); + let mut db: *mut ffi::sqlite3 = std::ptr::null_mut(); unsafe { // We pass a pointer to the WAL methods data to the database connection. This means // that the reference must outlive the connection. This is guaranteed by the marker in // the returned connection. - let rc = rusqlite::ffi::libsql_open_v2( + let rc = ffi::libsql_open_v2( filename.as_ptr(), &mut db as *mut _, - flags.bits(), + flags, std::ptr::null_mut(), W::name().as_ptr(), hook_ctx as *mut _ as *mut _, ); if rc != 0 { - rusqlite::ffi::sqlite3_close(db); - return Err(rusqlite::Error::SqliteFailure( - rusqlite::ffi::Error::new(rc), - None, - )); + ffi::sqlite3_close(db); + return Err(rc); } assert!(!db.is_null()); + ffi::sqlite3_busy_timeout(db, 5000); }; - let conn = unsafe { rusqlite::Connection::from_handle_owned(db)? }; - conn.busy_timeout(Duration::from_millis(5000))?; + let conn = libsql::Connection::from_handle(db); Ok(Connection { conn, diff --git a/sqld-libsql-bindings/src/wal_hook.rs b/sqld-libsql-bindings/src/wal_hook.rs index 7f09ad31..9c54aac1 100644 --- a/sqld-libsql-bindings/src/wal_hook.rs +++ b/sqld-libsql-bindings/src/wal_hook.rs @@ -227,7 +227,7 @@ fn get_methods(wal: &mut Wal) -> &mut WalMethodsHook { #[allow(non_snake_case)] pub extern "C" fn xClose( wal: *mut Wal, - db: *mut rusqlite::ffi::sqlite3, + db: *mut libsql::ffi::sqlite3, sync_flags: i32, n_buf: c_int, z_buf: *mut u8, @@ -343,7 +343,7 @@ pub extern "C" fn xFrames( #[allow(non_snake_case)] pub extern "C" fn xCheckpoint( wal: *mut Wal, - db: *mut rusqlite::ffi::sqlite3, + db: *mut libsql::ffi::sqlite3, emode: c_int, busy_handler: Option c_int>, busy_arg: *mut c_void, @@ -395,7 +395,7 @@ pub extern "C" fn xFile(wal: *mut Wal) -> *mut sqlite3_file { } #[allow(non_snake_case)] -pub extern "C" fn xDb(wal: *mut Wal, db: *mut rusqlite::ffi::sqlite3) { +pub extern "C" fn xDb(wal: *mut Wal, db: *mut libsql::ffi::sqlite3) { let orig_methods = unsafe { get_orig_methods::(&mut *wal) }; unsafe { (orig_methods.xDb.unwrap())(wal, db) } } diff --git a/sqld/Cargo.toml b/sqld/Cargo.toml index ef7255cb..d15fb876 100644 --- a/sqld/Cargo.toml +++ b/sqld/Cargo.toml @@ -27,6 +27,7 @@ hyper = { version = "0.14.23", features = ["http2"] } hyper-tungstenite = "0.10" itertools = "0.10.5" jsonwebtoken = "8.2.0" +libsql = { workspace = true } memmap = "0.7.0" mimalloc = { version = "0.1.36", default-features = false } nix = { version = "0.26.2", features = ["fs"] } @@ -37,7 +38,6 @@ prost = "0.11.3" rand = "0.8" regex = "1.7.0" reqwest = { version = "0.11.16", features = ["json", "rustls-tls"], default-features = false } -rusqlite = { workspace = true } serde = { version = "1.0.149", features = ["derive", "rc"] } serde_json = { version = "1.0.91", features = ["preserve_order"] } sha2 = "0.10" @@ -76,4 +76,4 @@ vergen = { version = "8", features = ["build", "git", "gitcl"] } [features] unix-excl-vfs = ["sqld-libsql-bindings/unix-excl-vfs"] -debug-tools = ["console-subscriber", "rusqlite/trace", "tokio/tracing"] +debug-tools = ["console-subscriber", "tokio/tracing"] diff --git a/sqld/src/database/dump/exporter.rs b/sqld/src/database/dump/exporter.rs index e2a4b69f..195e09ef 100644 --- a/sqld/src/database/dump/exporter.rs +++ b/sqld/src/database/dump/exporter.rs @@ -3,9 +3,8 @@ use std::ffi::CString; use std::fmt::{Display, Write as _}; use std::io::Write; -use anyhow::bail; -use rusqlite::types::ValueRef; -use rusqlite::OptionalExtension; +use anyhow::{bail, Context}; +use libsql::params::{Params, ValueRef}; struct DumpState { /// true if db is in writable_schema mode @@ -13,16 +12,16 @@ struct DumpState { writer: W, } -use rusqlite::ffi::{sqlite3_keyword_check, sqlite3_table_column_metadata, SQLITE_OK}; +use sqld_libsql_bindings::ffi::{sqlite3_keyword_check, sqlite3_table_column_metadata, SQLITE_OK}; impl DumpState { fn run_schema_dump_query( &mut self, - txn: &rusqlite::Connection, + txn: &libsql::Connection, stmt: &str, ) -> anyhow::Result<()> { - let mut stmt = txn.prepare(stmt)?; - let mut rows = stmt.query(())?; + let stmt = txn.prepare(stmt)?; + let rows = stmt.execute(&Params::None).context("Empty response")?; while let Some(row) = rows.next()? { let ValueRef::Text(table) = row.get_ref(0)? else { bail!("invalid schema table") }; let ValueRef::Text(ty) = row.get_ref(1)? else { bail!("invalid schema table") }; @@ -92,8 +91,8 @@ impl DumpState { write!(&mut select, " FROM {}", Quoted(table_str))?; - let mut stmt = txn.prepare(&select)?; - let mut rows = stmt.query(())?; + let stmt = txn.prepare(&select)?; + let rows = stmt.execute(&Params::None).context("Empty response")?; while let Some(row) = rows.next()? { write!(self.writer, "{insert}")?; if row_id_col.is_some() { @@ -105,7 +104,7 @@ impl DumpState { if i != 0 || row_id_col.is_some() { write!(self.writer, ",")?; } - write_value_ref(&mut self.writer, row.get_ref(i)?)?; + write_value_ref(&mut self.writer, row.get_ref(i as i32)?)?; } writeln!(self.writer, ");")?; } @@ -115,15 +114,15 @@ impl DumpState { Ok(()) } - fn run_table_dump_query(&mut self, txn: &rusqlite::Connection, q: &str) -> anyhow::Result<()> { - let mut stmt = txn.prepare(q)?; + fn run_table_dump_query(&mut self, txn: &libsql::Connection, q: &str) -> anyhow::Result<()> { + let stmt = txn.prepare(q)?; let col_count = stmt.column_count(); - let mut rows = stmt.query(())?; + let rows = stmt.execute(&Params::None).context("Empty response")?; while let Some(row) = rows.next()? { let ValueRef::Text(sql) = row.get_ref(0)? else { bail!("the first row in a table dump query should be of type text") }; self.writer.write_all(sql)?; for i in 1..col_count { - let ValueRef::Text(s) = row.get_ref(i)? else { bail!("row {i} in table dump query should be of type text") }; + let ValueRef::Text(s) = row.get_ref(i as i32)? else { bail!("row {i} in table dump query should be of type text") }; let s = std::str::from_utf8(s)?; write!(self.writer, ",{s}")?; } @@ -134,7 +133,7 @@ impl DumpState { fn list_table_columns( &self, - txn: &rusqlite::Connection, + txn: &libsql::Connection, table: &str, ) -> anyhow::Result<(Option, Vec)> { let mut cols = Vec::new(); @@ -143,18 +142,18 @@ impl DumpState { let mut preserve_row_id = false; let mut row_id_col = None; - txn.pragma(None, "table_info", table, |row| { - let name: String = row.get_unwrap(1); - cols.push(name); - // this is a primary key col - if row.get_unwrap::<_, usize>(5) != 0 { - num_primary_keys += 1; - is_integer_primary_key = num_primary_keys == 1 - && matches!(row.get_ref_unwrap(2), ValueRef::Text(b"INTEGER")); + if let Some(results) = txn.execute("pragma table_info", ())? { + while let Ok(Some(row)) = results.next() { + let name: String = row.get(1).unwrap(); + cols.push(name); + // this is a primary key col + if row.get::(5).unwrap() != 0 { + num_primary_keys += 1; + is_integer_primary_key = num_primary_keys == 1 + && matches!(row.get_ref(2).unwrap(), ValueRef::Text(b"INTEGER")); + } } - - Ok(()) - })?; + } // from sqlite: // > The decision of whether or not a rowid really needs to be preserved @@ -171,16 +170,12 @@ impl DumpState { // > there is a "pk" entry in "PRAGMA index_list". There will be // > no "pk" index if the PRIMARY KEY really is an alias for the ROWID. - txn.query_row( - "SELECT 1 FROM pragma_index_list(?) WHERE origin='pk'", - [table], - |_| { - // re-set preserve_row_id iif there is a row - preserve_row_id = true; - Ok(()) - }, - ) - .optional()?; + if let Ok(Some(rows)) = txn.execute( + "SELECT 1 FROM pragma_index_list(?) WHERE origin='pk'", + Params::Positional(vec![table.into()]), + ) { + preserve_row_id = rows.next()?.is_none(); + } } if preserve_row_id { @@ -206,7 +201,7 @@ impl DumpState { ) }; - if rc == SQLITE_OK { + if rc == SQLITE_OK as i32 { row_id_col = Some(row_id_name.to_owned()); break; } @@ -420,10 +415,11 @@ fn find_unused_str(haystack: &str, needle1: &str, needle2: &str) -> String { } } -pub fn export_dump(mut db: rusqlite::Connection, writer: impl Write) -> anyhow::Result<()> { - let mut txn = db.transaction()?; - txn.execute("PRAGMA writable_schema=ON", ())?; - let savepoint = txn.savepoint_with_name("dump")?; +pub fn export_dump(mut db: libsql::Connection, writer: impl Write) -> anyhow::Result<()> { + db.execute("BEGIN", ())?; + db.execute("PRAGMA writable_schema=ON", ())?; + // FIXME: savepoint logic is lost during the out-of-rusqlite migration, we need to restore it + db.execute("SAVEPOINT dump", ())?; let mut state = DumpState { writable_schema: false, writer, @@ -441,12 +437,12 @@ pub fn export_dump(mut db: rusqlite::Connection, writer: impl Write) -> anyhow:: WHERE type=='table' AND sql NOT NULL ORDER BY tbl_name='sqlite_sequence', rowid"; - state.run_schema_dump_query(&savepoint, q)?; + state.run_schema_dump_query(&mut db, q)?; let q = "SELECT sql FROM sqlite_schema AS o WHERE sql NOT NULL AND type IN ('index','trigger','view')"; - state.run_table_dump_query(&savepoint, q)?; + state.run_table_dump_query(&mut db, q)?; if state.writable_schema { writeln!(state.writer, "PRAGMA writable_schema=OFF;")?; @@ -454,8 +450,8 @@ AND type IN ('index','trigger','view')"; writeln!(state.writer, "COMMIT;")?; - let _ = savepoint.execute("PRAGMA writable_schema = OFF;", ()); - let _ = savepoint.finish(); + db.execute("PRAGMA writable_schema = OFF;", ())?; + db.execute("COMMIT", ())?; Ok(()) } diff --git a/sqld/src/database/dump/loader.rs b/sqld/src/database/dump/loader.rs index 87592483..653fc823 100644 --- a/sqld/src/database/dump/loader.rs +++ b/sqld/src/database/dump/loader.rs @@ -5,14 +5,13 @@ use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; -use rusqlite::ErrorCode; use tokio::sync::{mpsc, oneshot}; use crate::database::libsql::open_db; use crate::replication::primary::logger::{ReplicationLoggerHookCtx, REPLICATION_METHODS}; use crate::replication::ReplicationLogger; -type OpMsg = Box; +type OpMsg = Box; #[derive(Debug)] pub struct DumpLoader { @@ -25,6 +24,7 @@ impl DumpLoader { logger: Arc, bottomless_replicator: Option>>, ) -> anyhow::Result { + const BUSY: i32 = sqld_libsql_bindings::ffi::SQLITE_BUSY as i32; let (sender, mut receiver) = mpsc::channel::(1); let (ok_snd, ok_rcv) = oneshot::channel::>(); @@ -43,13 +43,7 @@ impl DumpLoader { // Creating the loader database can, in rare occurences, return sqlite busy, // because of a race condition opening the monitor thread db. This is there to // retry a bunch of times if that happens. - Err(rusqlite::Error::SqliteFailure( - rusqlite::ffi::Error { - code: ErrorCode::DatabaseBusy, - .. - }, - _, - )) if retries < 10 => { + Err(libsql::Error::LibError(BUSY)) if retries < 10 => { retries += 1; std::thread::sleep(Duration::from_millis(100)); } @@ -93,7 +87,7 @@ impl DumpLoader { const WASM_TABLE_CREATE: &str = "CREATE TABLE libsql_wasm_func_table (name text PRIMARY KEY, body text) WITHOUT ROWID;"; -fn perform_load_dump(conn: &rusqlite::Connection, path: PathBuf) -> anyhow::Result<()> { +fn perform_load_dump(conn: &libsql::Connection, path: PathBuf) -> anyhow::Result<()> { let mut f = BufReader::new(File::open(path)?); let mut curr = String::new(); let mut line = String::new(); diff --git a/sqld/src/database/libsql.rs b/sqld/src/database/libsql.rs index 02a9af53..09ccefb5 100644 --- a/sqld/src/database/libsql.rs +++ b/sqld/src/database/libsql.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use crossbeam::channel::RecvTimeoutError; -use rusqlite::{ErrorCode, OpenFlags, StatementStatus}; use sqld_libsql_bindings::wal_hook::WalMethodsHook; use tokio::sync::oneshot; use tracing::warn; @@ -78,18 +77,11 @@ where async fn try_create_db(&self) -> Result { // try 100 times to acquire initial db connection. let mut retries = 0; + const BUSY: i32 = sqld_libsql_bindings::ffi::SQLITE_BUSY as std::ffi::c_int; loop { match self.create_database().await { Ok(conn) => return Ok(conn), - Err( - err @ Error::RusqliteError(rusqlite::Error::SqliteFailure( - rusqlite::ffi::Error { - code: ErrorCode::DatabaseBusy, - .. - }, - _, - )), - ) => { + Err(err @ Error::LibSqlError(libsql::Error::LibError(BUSY))) => { if retries < 100 { tracing::warn!("Database file is busy, retrying..."); retries += 1; @@ -141,19 +133,20 @@ pub fn open_db<'a, W>( path: &Path, wal_methods: &'static WalMethodsHook, hook_ctx: &'a mut W::Context, - flags: Option, -) -> Result, rusqlite::Error> + flags: Option, +) -> Result, libsql::Error> where W: WalHook, { let flags = flags.unwrap_or( - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_NO_MUTEX, + (sqld_libsql_bindings::ffi::SQLITE_OPEN_READWRITE + | sqld_libsql_bindings::ffi::SQLITE_OPEN_CREATE + | sqld_libsql_bindings::ffi::SQLITE_OPEN_URI + | sqld_libsql_bindings::ffi::SQLITE_OPEN_NOMUTEX) as i32, ); sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx) + .map_err(|rc| libsql::Error::LibError(rc)) } impl LibSqlDb { @@ -261,14 +254,21 @@ impl<'a> Connection<'a> { }; for ext in extensions { - unsafe { - let _guard = rusqlite::LoadExtensionGuard::new(&this.conn).unwrap(); - if let Err(e) = this.conn.load_extension(&ext, None) { - tracing::error!("failed to load extension: {}", ext.display()); - Err(e)?; - } - tracing::debug!("Loaded extension {}", ext.display()); + let rc = unsafe { + // FIXME: gather the error message from the 4th param and print/return it + // if applicable. + libsql::ffi::sqlite3_load_extension( + this.conn.handle(), + ext.to_str().unwrap().as_ptr() as *const _, + std::ptr::null(), + std::ptr::null_mut(), + ) + }; + if rc != libsql::ffi::SQLITE_OK as i32 { + tracing::error!("failed to load extension: {}", ext.display()); + Err(libsql::Error::LibError(rc))?; } + tracing::debug!("Loaded extension {}", ext.display()); } Ok(this) @@ -354,26 +354,28 @@ impl<'a> Connection<'a> { let cols = stmt.columns(); let cols_count = cols.len(); - builder.cols_description(cols.iter())?; - drop(cols); + builder.cols_description(cols.into_iter())?; query .params .bind(&mut stmt) .map_err(Error::LibSqlInvalidQueryParams)?; - let mut qresult = stmt.raw_query(); - builder.begin_rows()?; - while let Some(row) = qresult.next()? { - builder.begin_row()?; - for i in 0..cols_count { - let val = row.get_ref(i)?; - builder.add_row_value(val)?; + // FIXME: in current libsql implementation, the error will only be returned + // upon first call to `next()`. Let's reconsider? That's not very intuitive. + if let Some(qresult) = stmt.execute(&libsql::Params::None) { + builder.begin_rows()?; + while let Some(row) = qresult.next()? { + builder.begin_row()?; + for i in 0..cols_count { + let val = row.get_ref(i as i32)?; + builder.add_row_value(val)?; + } + builder.finish_row()?; } - builder.finish_row()?; - } - builder.finish_rows()?; + builder.finish_rows()?; + } // sqlite3_changes() is only modified for INSERT, UPDATE or DELETE; it is not reset for SELECT, // but we want to return 0 in that case. @@ -389,8 +391,6 @@ impl<'a> Connection<'a> { false => None, }; - drop(qresult); - self.update_stats(&stmt); Ok((affected_row_count, last_insert_rowid)) @@ -400,9 +400,10 @@ impl<'a> Connection<'a> { let _ = self.conn.execute("ROLLBACK", ()); } - fn update_stats(&self, stmt: &rusqlite::Statement) { - let rows_read = stmt.get_status(StatementStatus::RowsRead); - let rows_written = stmt.get_status(StatementStatus::RowsWritten); + fn update_stats(&self, stmt: &libsql::Statement) { + use sqld_libsql_bindings::ffi; + let rows_read = stmt.get_status(ffi::LIBSQL_STMTSTATUS_ROWS_READ as i32); + let rows_written = stmt.get_status(ffi::LIBSQL_STMTSTATUS_ROWS_WRITTEN as i32); let rows_read = if rows_read == 0 && rows_written == 0 { 1 } else { @@ -417,7 +418,7 @@ impl<'a> Connection<'a> { let params = (1..=stmt.parameter_count()) .map(|param_i| { - let name = stmt.parameter_name(param_i).map(|n| n.into()); + let name = stmt.parameter_name(param_i as i32).map(|n| n.into()); DescribeParam { name } }) .collect(); diff --git a/sqld/src/database/write_proxy.rs b/sqld/src/database/write_proxy.rs index d31ea6ba..c34da35f 100644 --- a/sqld/src/database/write_proxy.rs +++ b/sqld/src/database/write_proxy.rs @@ -2,7 +2,6 @@ use std::path::PathBuf; use std::sync::Arc; use parking_lot::Mutex as PMutex; -use rusqlite::types::ValueRef; use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS; use tokio::sync::{watch, Mutex}; use tonic::transport::Channel; @@ -25,6 +24,7 @@ use crate::Result; use super::config::DatabaseConfigStore; use super::Program; use super::{factory::DbFactory, libsql::LibSqlDb, Database, DescribeResult}; +use libsql::params::ValueRef; #[derive(Clone)] pub struct WriteProxyDbFactory { @@ -108,7 +108,7 @@ fn execute_results_to_builder( builder.begin_step()?; builder.cols_description(rows.column_descriptions.iter().map(|c| Column { name: &c.name, - decl_ty: c.decltype.as_deref(), + decl_type: c.decltype.as_deref(), }))?; builder.begin_rows()?; diff --git a/sqld/src/error.rs b/sqld/src/error.rs index f34a3ff3..b36e9d8b 100644 --- a/sqld/src/error.rs +++ b/sqld/src/error.rs @@ -12,7 +12,7 @@ pub enum Error { #[error(transparent)] IOError(#[from] std::io::Error), #[error(transparent)] - RusqliteError(#[from] rusqlite::Error), + LibSqlError(#[from] libsql::Error), #[error("Failed to execute query via RPC. Error code: {}, message: {}", .0.code, .0.message)] RpcQueryError(crate::rpc::proxy::rpc::Error), #[error("Failed to execute queries via RPC protocol: `{0}`")] diff --git a/sqld/src/hrana/result_builder.rs b/sqld/src/hrana/result_builder.rs index 1cd6aaa1..5a23b677 100644 --- a/sqld/src/hrana/result_builder.rs +++ b/sqld/src/hrana/result_builder.rs @@ -2,7 +2,7 @@ use std::fmt::{self, Write as _}; use std::io; use bytes::Bytes; -use rusqlite::types::ValueRef; +use libsql::params::ValueRef; use crate::hrana::stmt::{proto_error_from_stmt_error, stmt_error_from_sqld_error}; use crate::query_result_builder::{ @@ -115,7 +115,7 @@ impl QueryResultBuilder for SingleStatementBuilder { cols_size += estimate_cols_json_size(&c); proto::Col { name: Some(c.name.to_owned()), - decltype: c.decl_ty.map(ToString::to_string), + decltype: c.decl_type.map(ToString::to_string), } })); @@ -207,7 +207,7 @@ fn estimate_cols_json_size(c: &Column) -> u64 { &mut f, r#"{{"name":"{}","decltype":"{}"}}"#, c.name, - c.decl_ty.unwrap_or("null") + c.decl_type.unwrap_or("null") ) .unwrap(); f.0 diff --git a/sqld/src/hrana/stmt.rs b/sqld/src/hrana/stmt.rs index 88bccb8e..ea1b0e49 100644 --- a/sqld/src/hrana/stmt.rs +++ b/sqld/src/hrana/stmt.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, bail, Result}; +use sqld_libsql_bindings::ffi; use std::collections::HashMap; use super::result_builder::SingleStatementBuilder; @@ -31,16 +32,9 @@ pub enum StmtError { TransactionBusy, #[error("SQLite error: {message}")] SqliteError { - source: rusqlite::ffi::Error, + source: libsql::Error, message: String, }, - #[error("SQL input error: {message} (at offset {offset})")] - SqlInputError { - source: rusqlite::ffi::Error, - message: String, - offset: i32, - }, - #[error("Operation was blocked{}", .reason.as_ref().map(|msg| format!(": {}", msg)).unwrap_or_default())] Blocked { reason: Option }, #[error("Response is too large")] @@ -202,26 +196,9 @@ pub fn stmt_error_from_sqld_error(sqld_error: SqldError) -> Result StmtError::Blocked { reason }, - SqldError::RusqliteError(rusqlite_error) => match rusqlite_error { - rusqlite::Error::SqliteFailure(sqlite_error, Some(message)) => StmtError::SqliteError { - source: sqlite_error, - message, - }, - rusqlite::Error::SqliteFailure(sqlite_error, None) => StmtError::SqliteError { - message: sqlite_error.to_string(), - source: sqlite_error, - }, - rusqlite::Error::SqlInputError { - error: sqlite_error, - msg: message, - offset, - .. - } => StmtError::SqlInputError { - source: sqlite_error, - message, - offset, - }, - rusqlite_error => return Err(SqldError::RusqliteError(rusqlite_error)), + SqldError::LibSqlError(libsql::Error::LibError(rc)) => StmtError::SqliteError { + message: libsql::errors::error_from_code(rc), + source: libsql::Error::LibError(rc), }, sqld_error => return Err(sqld_error), }) @@ -244,40 +221,41 @@ impl StmtError { Self::ArgsBothPositionalAndNamed => "ARGS_BOTH_POSITIONAL_AND_NAMED", Self::TransactionTimeout => "TRANSACTION_TIMEOUT", Self::TransactionBusy => "TRANSACTION_BUSY", - Self::SqliteError { source, .. } => sqlite_error_code(source.code), - Self::SqlInputError { .. } => "SQL_INPUT_ERROR", + Self::SqliteError { source, .. } => sqlite_error_code(source), Self::Blocked { .. } => "BLOCKED", Self::ResponseTooLarge => "RESPONSE_TOO_LARGE", } } } -fn sqlite_error_code(code: rusqlite::ffi::ErrorCode) -> &'static str { - match code { - rusqlite::ErrorCode::InternalMalfunction => "SQLITE_INTERNAL", - rusqlite::ErrorCode::PermissionDenied => "SQLITE_PERM", - rusqlite::ErrorCode::OperationAborted => "SQLITE_ABORT", - rusqlite::ErrorCode::DatabaseBusy => "SQLITE_BUSY", - rusqlite::ErrorCode::DatabaseLocked => "SQLITE_LOCKED", - rusqlite::ErrorCode::OutOfMemory => "SQLITE_NOMEM", - rusqlite::ErrorCode::ReadOnly => "SQLITE_READONLY", - rusqlite::ErrorCode::OperationInterrupted => "SQLITE_INTERRUPT", - rusqlite::ErrorCode::SystemIoFailure => "SQLITE_IOERR", - rusqlite::ErrorCode::DatabaseCorrupt => "SQLITE_CORRUPT", - rusqlite::ErrorCode::NotFound => "SQLITE_NOTFOUND", - rusqlite::ErrorCode::DiskFull => "SQLITE_FULL", - rusqlite::ErrorCode::CannotOpen => "SQLITE_CANTOPEN", - rusqlite::ErrorCode::FileLockingProtocolFailed => "SQLITE_PROTOCOL", - rusqlite::ErrorCode::SchemaChanged => "SQLITE_SCHEMA", - rusqlite::ErrorCode::TooBig => "SQLITE_TOOBIG", - rusqlite::ErrorCode::ConstraintViolation => "SQLITE_CONSTRAINT", - rusqlite::ErrorCode::TypeMismatch => "SQLITE_MISMATCH", - rusqlite::ErrorCode::ApiMisuse => "SQLITE_MISUSE", - rusqlite::ErrorCode::NoLargeFileSupport => "SQLITE_NOLFS", - rusqlite::ErrorCode::AuthorizationForStatementDenied => "SQLITE_AUTH", - rusqlite::ErrorCode::ParameterOutOfRange => "SQLITE_RANGE", - rusqlite::ErrorCode::NotADatabase => "SQLITE_NOTADB", - rusqlite::ErrorCode::Unknown => "SQLITE_UNKNOWN", +fn sqlite_error_code(err: &libsql::Error) -> &'static str { + match err { + libsql::Error::LibError(code) => match (*code) as u32 { + ffi::SQLITE_INTERNAL => "SQLITE_INTERNAL", + ffi::SQLITE_PERM => "SQLITE_PERM", + ffi::SQLITE_ABORT => "SQLITE_ABORT", + ffi::SQLITE_BUSY => "SQLITE_BUSY", + ffi::SQLITE_LOCKED => "SQLITE_LOCKED", + ffi::SQLITE_NOMEM => "SQLITE_NOMEM", + ffi::SQLITE_READONLY => "SQLITE_READONLY", + ffi::SQLITE_INTERRUPT => "SQLITE_INTERRUPT", + ffi::SQLITE_IOERR => "SQLITE_IOERR", + ffi::SQLITE_CORRUPT => "SQLITE_CORRUPT", + ffi::SQLITE_NOTFOUND => "SQLITE_NOTFOUND", + ffi::SQLITE_FULL => "SQLITE_FULL", + ffi::SQLITE_CANTOPEN => "SQLITE_CANTOPEN", + ffi::SQLITE_PROTOCOL => "SQLITE_PROTOCOL", + ffi::SQLITE_SCHEMA => "SQLITE_SCHEMA", + ffi::SQLITE_TOOBIG => "SQLITE_TOOBIG", + ffi::SQLITE_CONSTRAINT => "SQLITE_CONSTRAINT", + ffi::SQLITE_MISMATCH => "SQLITE_MISMATCH", + ffi::SQLITE_MISUSE => "SQLITE_MISUSE", + ffi::SQLITE_NOLFS => "SQLITE_NOLFS", + ffi::SQLITE_AUTH => "SQLITE_AUTH", + ffi::SQLITE_RANGE => "SQLITE_RANGE", + ffi::SQLITE_NOTADB => "SQLITE_NOTADB", + _ => "SQLITE_UNKNOWN", + }, _ => "SQLITE_UNKNOWN", } } diff --git a/sqld/src/http/hrana_over_http_1.rs b/sqld/src/http/hrana_over_http_1.rs index edda8958..6138d57a 100644 --- a/sqld/src/http/hrana_over_http_1.rs +++ b/sqld/src/http/hrana_over_http_1.rs @@ -137,7 +137,6 @@ fn response_error_response(err: ResponseError) -> hyper::Response { | StmtError::SqlNoStmt | StmtError::SqlManyStmts | StmtError::ArgsInvalid { .. } - | StmtError::SqlInputError { .. } | StmtError::ResponseTooLarge | StmtError::Blocked { .. } => hyper::StatusCode::BAD_REQUEST, StmtError::ArgsBothPositionalAndNamed => hyper::StatusCode::NOT_IMPLEMENTED, diff --git a/sqld/src/http/result_builder.rs b/sqld/src/http/result_builder.rs index 1b00e8f8..3a40f17d 100644 --- a/sqld/src/http/result_builder.rs +++ b/sqld/src/http/result_builder.rs @@ -1,7 +1,7 @@ +use libsql::params::ValueRef; use std::io; use std::ops::{Deref, DerefMut}; -use rusqlite::types::ValueRef; use serde::{Serialize, Serializer}; use serde_json::ser::{CompactFormatter, Formatter}; @@ -70,7 +70,7 @@ impl io::Write for LimitBuffer { } } -struct HttpJsonValueSerializer<'a>(&'a ValueRef<'a>); +struct HttpJsonValueSerializer<'a>(&'a libsql::params::ValueRef<'a>); impl JsonHttpPayloadBuilder { pub fn new() -> Self { diff --git a/sqld/src/lib.rs b/sqld/src/lib.rs index a144e744..bf9e8a72 100644 --- a/sqld/src/lib.rs +++ b/sqld/src/lib.rs @@ -5,10 +5,10 @@ use std::sync::mpsc::RecvTimeoutError; use std::sync::Arc; use std::time::Duration; +use crate::libsql::wal_hook::TRANSPARENT_METHODS; use anyhow::Context as AnyhowContext; use enclose::enclose; use futures::never::Never; -use libsql::wal_hook::TRANSPARENT_METHODS; use once_cell::sync::Lazy; use rpc::run_rpc_server; use tokio::sync::{mpsc, Notify}; @@ -551,7 +551,7 @@ async fn run_storage_monitor(db_path: PathBuf, stats: Stats) -> anyhow::Result<( // initialize a connection here, and keep it alive for the entirety of the program. If we // fail to open it, we wait for `duration` and try again later. let ctx = &mut (); - let maybe_conn = match open_db(&db_path, &TRANSPARENT_METHODS, ctx, Some(rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY)) { + let maybe_conn = match open_db(&db_path, &TRANSPARENT_METHODS, ctx, Some(sqld_libsql_bindings::ffi::SQLITE_OPEN_READONLY as i32)) { Ok(conn) => Some(conn), Err(e) => { tracing::warn!("failed to open connection for storager monitor: {e}, trying again in {duration:?}"); @@ -561,12 +561,14 @@ async fn run_storage_monitor(db_path: PathBuf, stats: Stats) -> anyhow::Result<( loop { if let Some(ref conn) = maybe_conn { - if let Ok(storage_bytes_used) = - conn.query_row("select sum(pgsize) from dbstat;", [], |row| { - row.get::(0) - }) + if let Ok(Some(rows)) = + conn.execute("select sum(pgsize) from dbstat;", ()) { - stats.set_storage_bytes_used(storage_bytes_used); + if let Ok(Some(storage_bytes_used)) = rows.next() { + if let Ok(storage_bytes_used) = storage_bytes_used.get(0) { + stats.set_storage_bytes_used(storage_bytes_used); + } + } } } diff --git a/sqld/src/main.rs b/sqld/src/main.rs index f05d9b9e..1858244e 100644 --- a/sqld/src/main.rs +++ b/sqld/src/main.rs @@ -312,7 +312,7 @@ fn perform_dump(dump_path: Option<&Path>, db_path: &Path) -> anyhow::Result<()> } None => Box::new(stdout()), }; - let conn = rusqlite::Connection::open(db_path.join("data"))?; + let conn = libsql::Database::open(db_path.join("data").to_str().unwrap())?.connect()?; export_dump(conn, out)?; @@ -329,9 +329,11 @@ fn enable_libsql_logging() { tracing::error!("sqlite error {code}: {msg}"); } + /* FIXME: introduce tracing to libsql/libsql_sys ONCE.call_once(|| unsafe { rusqlite::trace::config_log(Some(libsql_log)).unwrap(); }); + */ } #[tokio::main] diff --git a/sqld/src/query.rs b/sqld/src/query.rs index 3d1939ac..82da7110 100644 --- a/sqld/src/query.rs +++ b/sqld/src/query.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use anyhow::{anyhow, ensure, Context}; -use rusqlite::types::{ToSqlOutput, ValueRef}; -use rusqlite::ToSql; use serde::{Deserialize, Serialize}; use crate::query_analysis::Statement; @@ -18,28 +16,28 @@ pub enum Value { Blob(Vec), } -impl<'a> From<&'a Value> for ValueRef<'a> { +impl<'a> From<&'a Value> for libsql::params::ValueRef<'a> { fn from(value: &'a Value) -> Self { match value { - Value::Null => ValueRef::Null, - Value::Integer(i) => ValueRef::Integer(*i), - Value::Real(x) => ValueRef::Real(*x), - Value::Text(s) => ValueRef::Text(s.as_bytes()), - Value::Blob(b) => ValueRef::Blob(b.as_slice()), + Value::Null => libsql::params::ValueRef::Null, + Value::Integer(i) => libsql::params::ValueRef::Integer(*i), + Value::Real(x) => libsql::params::ValueRef::Real(*x), + Value::Text(s) => libsql::params::ValueRef::Text(s.as_bytes()), + Value::Blob(b) => libsql::params::ValueRef::Blob(b), } } } -impl TryFrom> for Value { +impl TryFrom> for Value { type Error = anyhow::Error; - fn try_from(value: rusqlite::types::ValueRef<'_>) -> anyhow::Result { + fn try_from(value: libsql::params::ValueRef<'_>) -> anyhow::Result { let val = match value { - rusqlite::types::ValueRef::Null => Value::Null, - rusqlite::types::ValueRef::Integer(i) => Value::Integer(i), - rusqlite::types::ValueRef::Real(x) => Value::Real(x), - rusqlite::types::ValueRef::Text(s) => Value::Text(String::from_utf8(Vec::from(s))?), - rusqlite::types::ValueRef::Blob(b) => Value::Blob(Vec::from(b)), + libsql::params::ValueRef::Null => Value::Null, + libsql::params::ValueRef::Integer(i) => Value::Integer(i), + libsql::params::ValueRef::Real(x) => Value::Real(x), + libsql::params::ValueRef::Text(s) => Value::Text(String::from_utf8(Vec::from(s))?), + libsql::params::ValueRef::Blob(b) => Value::Blob(Vec::from(b)), }; Ok(val) @@ -53,20 +51,6 @@ pub struct Query { pub want_rows: bool, } -impl ToSql for Value { - fn to_sql(&self) -> rusqlite::Result> { - let val = match self { - Value::Null => ToSqlOutput::Owned(rusqlite::types::Value::Null), - Value::Integer(i) => ToSqlOutput::Owned(rusqlite::types::Value::Integer(*i)), - Value::Real(x) => ToSqlOutput::Owned(rusqlite::types::Value::Real(*x)), - Value::Text(s) => ToSqlOutput::Borrowed(rusqlite::types::ValueRef::Text(s.as_bytes())), - Value::Blob(b) => ToSqlOutput::Borrowed(rusqlite::types::ValueRef::Blob(b)), - }; - - Ok(val) - } -} - #[derive(Debug, Serialize, Clone)] pub enum Params { Named(HashMap), @@ -108,7 +92,7 @@ impl Params { } } - pub fn bind(&self, stmt: &mut rusqlite::Statement) -> anyhow::Result<()> { + pub fn bind(&self, stmt: &mut libsql::Statement) -> anyhow::Result<()> { let param_count = stmt.parameter_count(); ensure!( param_count >= self.len(), @@ -120,7 +104,7 @@ impl Params { for index in 1..=param_count { let mut param_name = None; // get by name - let maybe_value = match stmt.parameter_name(index) { + let maybe_value = match stmt.parameter_name(index as i32) { Some(name) => { param_name = Some(name); let mut chars = name.chars(); @@ -140,7 +124,7 @@ impl Params { }; if let Some(value) = maybe_value { - stmt.raw_bind_parameter(index, value)?; + stmt.bind_value(index as i32, value.try_into()?); } else if let Some(name) = param_name { return Err(anyhow!("value for parameter {} not found", name)); } else { @@ -159,7 +143,10 @@ mod test { #[test] fn test_bind_params_positional_simple() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT ?").unwrap(); let params = Params::new_positional(vec![Value::Integer(10)]); params.bind(&mut stmt).unwrap(); @@ -169,7 +156,10 @@ mod test { #[test] fn test_bind_params_positional_numbered() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT ? || ?2 || ?1").unwrap(); let params = Params::new_positional(vec![Value::Integer(10), Value::Integer(20)]); params.bind(&mut stmt).unwrap(); @@ -179,7 +169,10 @@ mod test { #[test] fn test_bind_params_positional_named() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $second").unwrap(); let mut params = HashMap::new(); params.insert(":first".to_owned(), Value::Integer(10)); @@ -192,7 +185,10 @@ mod test { #[test] fn test_bind_params_positional_named_no_prefix() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $second").unwrap(); let mut params = HashMap::new(); params.insert("first".to_owned(), Value::Integer(10)); @@ -205,7 +201,10 @@ mod test { #[test] fn test_bind_params_positional_named_conflict() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $first").unwrap(); let mut params = HashMap::new(); params.insert("first".to_owned(), Value::Integer(10)); @@ -218,7 +217,10 @@ mod test { #[test] fn test_bind_params_positional_named_repeated() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con .prepare("SELECT :first || $second || $first || $second") .unwrap(); @@ -233,7 +235,10 @@ mod test { #[test] fn test_bind_params_too_many_params() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $second").unwrap(); let mut params = HashMap::new(); params.insert(":first".to_owned(), Value::Integer(10)); @@ -245,7 +250,10 @@ mod test { #[test] fn test_bind_params_too_few_params() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT :first || $second").unwrap(); let mut params = HashMap::new(); params.insert(":first".to_owned(), Value::Integer(10)); @@ -255,7 +263,10 @@ mod test { #[test] fn test_bind_params_invalid_positional() { - let con = rusqlite::Connection::open_in_memory().unwrap(); + let con = libsql::Database::open(":memory:") + .unwrap() + .connect() + .unwrap(); let mut stmt = con.prepare("SELECT ?invalid").unwrap(); let params = Params::empty(); assert!(params.bind(&mut stmt).is_err()); diff --git a/sqld/src/query_result_builder.rs b/sqld/src/query_result_builder.rs index a9aeadd7..a47909a7 100644 --- a/sqld/src/query_result_builder.rs +++ b/sqld/src/query_result_builder.rs @@ -3,10 +3,12 @@ use std::io::{self, ErrorKind}; use std::ops::{Deref, DerefMut}; use bytesize::ByteSize; -use rusqlite::types::ValueRef; +use libsql::params::ValueRef; use serde::Serialize; use serde_json::ser::Formatter; +pub use libsql::Column; + #[derive(Debug)] pub enum QueryResultBuilderError { ResponseTooLarge(u64), @@ -54,28 +56,6 @@ impl From for QueryResultBuilderError { } } -/// Identical to rusqlite::Column, with visible fields. -#[cfg_attr(test, derive(arbitrary::Arbitrary))] -pub struct Column<'a> { - pub(crate) name: &'a str, - pub(crate) decl_ty: Option<&'a str>, -} - -impl<'a> From<(&'a str, Option<&'a str>)> for Column<'a> { - fn from((name, decl_ty): (&'a str, Option<&'a str>)) -> Self { - Self { name, decl_ty } - } -} - -impl<'a> From<&'a rusqlite::Column<'a>> for Column<'a> { - fn from(value: &'a rusqlite::Column<'a>) -> Self { - Self { - name: value.name(), - decl_ty: value.decl_type(), - } - } -} - #[derive(Debug, Clone, Copy, Default)] pub struct QueryBuilderConfig { pub max_size: Option, @@ -107,7 +87,8 @@ pub trait QueryResultBuilder: Send + 'static { /// begin a new row for the current step fn begin_row(&mut self) -> Result<(), QueryResultBuilderError>; /// add value to current row - fn add_row_value(&mut self, v: ValueRef) -> Result<(), QueryResultBuilderError>; + fn add_row_value(&mut self, v: libsql::params::ValueRef) + -> Result<(), QueryResultBuilderError>; /// finish current row fn finish_row(&mut self) -> Result<(), QueryResultBuilderError>; /// end adding rows @@ -588,14 +569,14 @@ pub mod test { Blob(&'a [u8]), } - impl<'a> From> for rusqlite::types::ValueRef<'a> { + impl<'a> From> for libsql::params::ValueRef<'a> { fn from(value: ValueRef<'a>) -> Self { match value { - ValueRef::Null => rusqlite::types::ValueRef::Null, - ValueRef::Integer(i) => rusqlite::types::ValueRef::Integer(i), - ValueRef::Real(x) => rusqlite::types::ValueRef::Real(x), - ValueRef::Text(s) => rusqlite::types::ValueRef::Text(s.as_bytes()), - ValueRef::Blob(b) => rusqlite::types::ValueRef::Blob(b), + ValueRef::Null => libsql::params::Value::Null, + ValueRef::Integer(i) => libsql::params::Value::Integer(i), + ValueRef::Real(x) => libsql::params::Value::Float(x), + ValueRef::Text(s) => libsql::params::Value::Text(s.as_bytes()), + ValueRef::Blob(b) => libsql::params::Value::Blob(b), } } } diff --git a/sqld/src/replication/primary/logger.rs b/sqld/src/replication/primary/logger.rs index 6ec012dc..bb69faf1 100644 --- a/sqld/src/replication/primary/logger.rs +++ b/sqld/src/replication/primary/logger.rs @@ -86,7 +86,7 @@ unsafe impl WalHook for ReplicationLoggerHook { if let Err(e) = ctx.flush(ntruncate) { tracing::error!("error writing to replication log: {e}"); // returning IO_ERR ensure that xUndo will be called by sqlite. - return SQLITE_IOERR; + return SQLITE_IOERR as c_int; } let rc = unsafe { @@ -149,7 +149,7 @@ unsafe impl WalHook for ReplicationLoggerHook { fn on_savepoint_undo(wal: &mut Wal, wal_data: *mut u32, orig: XWalSavePointUndoFn) -> i32 { let rc = unsafe { orig(wal, wal_data) }; - if rc != SQLITE_OK { + if rc != SQLITE_OK as i32 { return rc; }; @@ -193,9 +193,9 @@ unsafe impl WalHook for ReplicationLoggerHook { ** In order to avoid autocheckpoint on close (that's too often), ** checkpoint attempts weaker than TRUNCATE are ignored. */ - if emode < SQLITE_CHECKPOINT_TRUNCATE { + if emode < SQLITE_CHECKPOINT_TRUNCATE as i32 { tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE"); - return SQLITE_OK; + return SQLITE_OK as i32; } } let rc = unsafe { @@ -213,7 +213,7 @@ unsafe impl WalHook for ReplicationLoggerHook { ) }; - if rc != SQLITE_OK { + if rc != SQLITE_OK as i32 { return rc; } @@ -226,7 +226,7 @@ unsafe impl WalHook for ReplicationLoggerHook { let mut replicator = replicator.lock().unwrap(); if replicator.commits_in_current_generation() == 0 { tracing::debug!("No commits happened in this generation, not snapshotting"); - return SQLITE_OK; + return SQLITE_OK as i32; } let last_known_frame = replicator.last_known_frame(); replicator.request_flush(); @@ -237,18 +237,18 @@ unsafe impl WalHook for ReplicationLoggerHook { last_known_frame, e ); - return SQLITE_IOERR_WRITE; + return SQLITE_IOERR_WRITE as i32; } replicator.new_generation(); if let Err(e) = runtime.block_on(async move { replicator.snapshot_main_db_file().await }) { tracing::error!("Failed to snapshot the main db file during checkpoint: {e}"); - return SQLITE_IOERR_WRITE; + return SQLITE_IOERR_WRITE as i32; } } } - SQLITE_OK + SQLITE_OK as i32 } } @@ -887,21 +887,24 @@ impl ReplicationLogger { fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> { unsafe { - let conn = rusqlite::Connection::open(data_path)?; - conn.pragma_query(None, "page_size", |row| { - let page_size = row.get::<_, i32>(0).unwrap(); + // unwrap: data_path is expected to be a validated path + let conn = libsql::Database::open(data_path.to_str().unwrap())?.connect()?; + if let Ok(row) = conn + .prepare("pragma page_size")? + .query_row(&libsql::Params::None) + { + let page_size: i32 = row.get(0).unwrap(); assert_eq!( page_size, WAL_PAGE_SIZE, "invalid database file, expected page size to be {}, but found {} instead", WAL_PAGE_SIZE, page_size ); - Ok(()) - })?; + } let mut num_checkpointed: c_int = 0; - let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2( + let rc = sqld_libsql_bindings::ffi::sqlite3_wal_checkpoint_v2( conn.handle(), std::ptr::null(), - SQLITE_CHECKPOINT_TRUNCATE, + SQLITE_CHECKPOINT_TRUNCATE as i32, &mut num_checkpointed as *mut _, std::ptr::null_mut(), ); diff --git a/sqld/src/replication/replica/hook.rs b/sqld/src/replication/replica/hook.rs index 75fe6882..b12f6808 100644 --- a/sqld/src/replication/replica/hook.rs +++ b/sqld/src/replication/replica/hook.rs @@ -1,8 +1,7 @@ use std::ffi::{c_int, CStr}; use std::marker::PhantomData; -use rusqlite::ffi::{PgHdr, SQLITE_ERROR}; -use sqld_libsql_bindings::ffi::Wal; +use sqld_libsql_bindings::ffi::{PgHdr, Wal, SQLITE_ERROR}; use sqld_libsql_bindings::init_static_wal_method; use sqld_libsql_bindings::{ffi::types::XWalFrameFn, wal_hook::WalHook}; @@ -167,7 +166,7 @@ unsafe impl WalHook for InjectorHook { if let Err(e) = ret { tracing::error!("replication error: {e}"); - return SQLITE_ERROR; + return SQLITE_ERROR as c_int; } if !ctx.is_txn { diff --git a/sqld/src/replication/replica/injector.rs b/sqld/src/replication/replica/injector.rs index 3965da10..11d0d252 100644 --- a/sqld/src/replication/replica/injector.rs +++ b/sqld/src/replication/replica/injector.rs @@ -1,7 +1,6 @@ +use sqld_libsql_bindings::ffi; use std::path::Path; -use rusqlite::OpenFlags; - use crate::replication::replica::hook::{SQLITE_CONTINUE_REPLICATION, SQLITE_EXIT_REPLICATION}; use super::hook::{InjectorHookCtx, INJECTOR_METHODS}; @@ -14,30 +13,31 @@ impl<'a> FrameInjector<'a> { pub fn new(db_path: &Path, hook_ctx: &'a mut InjectorHookCtx) -> anyhow::Result { let conn = sqld_libsql_bindings::Connection::open( db_path, - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_NO_MUTEX, + (ffi::SQLITE_OPEN_READWRITE + | ffi::SQLITE_OPEN_CREATE + | ffi::SQLITE_OPEN_URI + | ffi::SQLITE_OPEN_NOMUTEX) as i32, &INJECTOR_METHODS, hook_ctx, - )?; + ) + .map_err(|rc| libsql::Error::LibError(rc))?; Ok(Self { conn }) } pub fn step(&mut self) -> anyhow::Result { - self.conn.pragma_update(None, "writable_schema", "on")?; + self.conn.execute("pragma writable_schema=on", ())?; let res = self.conn.execute("create table __dummy__ (dummy);", ()); match res { Ok(_) => panic!("replication hook was not called"), Err(e) => { - if let Some(e) = e.sqlite_error() { - if e.extended_code == SQLITE_EXIT_REPLICATION { - self.conn.pragma_update(None, "writable_schema", "reset")?; + if let libsql::Error::LibError(rc) = e { + if rc == SQLITE_EXIT_REPLICATION { + self.conn.execute("pragma writable_schema=reset", ())?; return Ok(false); } - if e.extended_code == SQLITE_CONTINUE_REPLICATION { + if rc == SQLITE_CONTINUE_REPLICATION { return Ok(true); } } diff --git a/sqld/src/rpc/proxy.rs b/sqld/src/rpc/proxy.rs index 6ed1c389..993dbb9d 100644 --- a/sqld/src/rpc/proxy.rs +++ b/sqld/src/rpc/proxy.rs @@ -353,7 +353,7 @@ impl QueryResultBuilder for ExecuteResultBuilder { for col in cols { let col = col.into(); let col_len = - (col.decl_ty.map(|s| s.len()).unwrap_or_default() + col.name.len()) as u64; + (col.decl_type.map(|s| s.len()).unwrap_or_default() + col.name.len()) as u64; if col_len + self.current_step_size + self.current_size > self.max_size { return Err(QueryResultBuilderError::ResponseTooLarge(self.max_size)); } @@ -361,7 +361,7 @@ impl QueryResultBuilder for ExecuteResultBuilder { let col = rpc::Column { name: col.name.to_owned(), - decltype: col.decl_ty.map(ToString::to_string), + decltype: col.decl_type.map(ToString::to_string), }; self.current_col_description.push(col); @@ -380,7 +380,7 @@ impl QueryResultBuilder for ExecuteResultBuilder { fn add_row_value( &mut self, - v: rusqlite::types::ValueRef, + v: libsql::params::ValueRef, ) -> Result<(), QueryResultBuilderError> { let data = bincode::serialize( &crate::query::Value::try_from(v).map_err(QueryResultBuilderError::from_any)?,