1 //! [Unlock Notification](http://sqlite.org/unlock_notify.html)
2 
3 use std::os::raw::c_int;
4 use std::os::raw::c_void;
5 use std::panic::catch_unwind;
6 use std::sync::{Condvar, Mutex};
7 
8 use crate::ffi;
9 
10 struct UnlockNotification {
11     cond: Condvar,      // Condition variable to wait on
12     mutex: Mutex<bool>, // Mutex to protect structure
13 }
14 
15 #[allow(clippy::mutex_atomic)]
16 impl UnlockNotification {
new() -> UnlockNotification17     fn new() -> UnlockNotification {
18         UnlockNotification {
19             cond: Condvar::new(),
20             mutex: Mutex::new(false),
21         }
22     }
23 
fired(&self)24     fn fired(&self) {
25         let mut flag = unpoison(self.mutex.lock());
26         *flag = true;
27         self.cond.notify_one();
28     }
29 
wait(&self)30     fn wait(&self) {
31         let mut fired = unpoison(self.mutex.lock());
32         while !*fired {
33             fired = unpoison(self.cond.wait(fired));
34         }
35     }
36 }
37 
/// Unwraps the result of a lock or wait operation, ignoring poisoning.
///
/// On `Ok` the value is returned unchanged; on `Err` the value carried by
/// the `PoisonError` is extracted and returned instead.
#[inline]
fn unpoison<T>(r: Result<T, std::sync::PoisonError<T>>) -> T {
    match r {
        Ok(value) => value,
        Err(poisoned) => poisoned.into_inner(),
    }
}
42 
43 /// This function is an unlock-notify callback
unlock_notify_cb(ap_arg: *mut *mut c_void, n_arg: c_int)44 unsafe extern "C" fn unlock_notify_cb(ap_arg: *mut *mut c_void, n_arg: c_int) {
45     use std::slice::from_raw_parts;
46     let args = from_raw_parts(ap_arg as *const &UnlockNotification, n_arg as usize);
47     for un in args {
48         drop(catch_unwind(std::panic::AssertUnwindSafe(|| un.fired())));
49     }
50 }
51 
is_locked(db: *mut ffi::sqlite3, rc: c_int) -> bool52 pub unsafe fn is_locked(db: *mut ffi::sqlite3, rc: c_int) -> bool {
53     rc == ffi::SQLITE_LOCKED_SHAREDCACHE
54         || (rc & 0xFF) == ffi::SQLITE_LOCKED
55             && ffi::sqlite3_extended_errcode(db) == ffi::SQLITE_LOCKED_SHAREDCACHE
56 }
57 
/// Waits for an unlock-notify callback on `db`.
///
/// Meant to be called right after an SQLite API call (either
/// `sqlite3_prepare_v2()` or `sqlite3_step()`) on the connection `db` has
/// returned `SQLITE_LOCKED`.
///
/// An unlock-notify callback is registered with `sqlite3_unlock_notify()`.
/// If registration succeeds, the calling thread blocks until the callback has
/// been delivered and `SQLITE_OK` is returned; the caller should then retry
/// the operation that failed.
///
/// If `sqlite3_unlock_notify()` instead reports that blocking would deadlock
/// the system, its result (`SQLITE_LOCKED`) is returned immediately without
/// waiting. The caller should then not retry the operation and should roll
/// back the current transaction (if any).
///
/// # Safety
///
/// `db` is passed unchecked to `sqlite3_unlock_notify()`; presumably it must
/// be a valid, open connection handle.
#[cfg(feature = "unlock_notify")]
pub unsafe fn wait_for_unlock_notify(db: *mut ffi::sqlite3) -> c_int {
    // Lives on this stack frame; the callback receives a pointer to it.
    let notification = UnlockNotification::new();
    let context = &notification as *const UnlockNotification as *mut c_void;

    // Register for an unlock-notify callback.
    let status = ffi::sqlite3_unlock_notify(db, Some(unlock_notify_cb), context);
    debug_assert!(
        status == ffi::SQLITE_LOCKED
            || status == ffi::SQLITE_LOCKED_SHAREDCACHE
            || status == ffi::SQLITE_OK
    );

    // Anything but success is handed back to the caller without blocking.
    if status != ffi::SQLITE_OK {
        return status;
    }

    // Block until the callback has signalled `notification`.
    notification.wait();
    status
}
87 
#[cfg(test)]
mod test {
    use crate::{Connection, OpenFlags, Result, Transaction, TransactionBehavior};
    use std::sync::mpsc::sync_channel;
    use std::thread;
    use std::time;

    /// Reads from a shared-cache in-memory database while a second connection
    /// on another thread still has an immediate transaction open, and checks
    /// that the read observes the row inserted by that transaction.
    #[test]
    fn test_unlock_notify() -> Result<()> {
        let url = "file::memory:?cache=shared";
        let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_URI;
        let reader = Connection::open_with_flags(url, flags)?;
        reader.execute_batch("CREATE TABLE foo (x)")?;

        // Zero-capacity channel: `send` only completes once the main thread
        // is receiving, so the insert has happened before the read starts.
        let (ready_tx, ready_rx) = sync_channel(0);
        let writer = thread::spawn(move || {
            let mut conn = Connection::open_with_flags(url, flags).unwrap();
            let txn = Transaction::new(&mut conn, TransactionBehavior::Immediate).unwrap();
            txn.execute_batch("INSERT INTO foo VALUES (42)").unwrap();
            ready_tx.send(1).unwrap();
            // Keep the transaction open briefly before committing.
            thread::sleep(time::Duration::from_millis(10));
            txn.commit().unwrap();
        });

        assert_eq!(ready_rx.recv().unwrap(), 1);
        let the_answer: i64 = reader.one_column("SELECT x FROM foo")?;
        assert_eq!(42i64, the_answer);
        writer.join().unwrap();
        Ok(())
    }
}
118