From eb08795f52358ce90e601fe964c564ad27cf73e0 Mon Sep 17 00:00:00 2001 From: Mura Li Date: Sun, 30 Sep 2018 10:06:56 +0800 Subject: [PATCH] Add support for sqlite3_unlock_notify --- .travis.yml | 2 +- sqlite3.go | 45 +++++- sqlite3_opt_unlock_notify.c | 85 ++++++++++++ sqlite3_opt_unlock_notify.go | 92 +++++++++++++ sqlite3_opt_unlock_notify_test.go | 222 ++++++++++++++++++++++++++++++ 5 files changed, 441 insertions(+), 5 deletions(-) create mode 100644 sqlite3_opt_unlock_notify.c create mode 100644 sqlite3_opt_unlock_notify.go create mode 100644 sqlite3_opt_unlock_notify_test.go diff --git a/.travis.yml b/.travis.yml index ad6c6e9..2ae08be 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ env: matrix: - GOTAGS= - GOTAGS=libsqlite3 - - GOTAGS="sqlite_allow_uri_authority sqlite_app_armor sqlite_foreign_keys sqlite_fts5 sqlite_icu sqlite_introspect sqlite_json sqlite_secure_delete sqlite_see sqlite_stat4 sqlite_trace sqlite_userauth sqlite_vacuum_incr sqlite_vtable" + - GOTAGS="sqlite_allow_uri_authority sqlite_app_armor sqlite_foreign_keys sqlite_fts5 sqlite_icu sqlite_introspect sqlite_json sqlite_secure_delete sqlite_see sqlite_stat4 sqlite_trace sqlite_userauth sqlite_vacuum_incr sqlite_vtable sqlite_unlock_notify" - GOTAGS=sqlite_vacuum_full go: diff --git a/sqlite3.go b/sqlite3.go index b17e634..186ee5b 100644 --- a/sqlite3.go +++ b/sqlite3.go @@ -78,8 +78,38 @@ _sqlite3_exec(sqlite3* db, const char* pcmd, long long* rowid, long long* change return rv; } +#ifdef SQLITE_ENABLE_UNLOCK_NOTIFY +extern int sqlite3_step_blocking(sqlite3_stmt *stmt); +extern int _sqlite3_step_blocking(sqlite3_stmt* stmt, long long* rowid, long long* changes); +extern int sqlite3_prepare_v2_blocking(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail); + static int -_sqlite3_step(sqlite3_stmt* stmt, long long* rowid, long long* changes) +sqlite3_step_internal(sqlite3_stmt *stmt) +{ + return sqlite3_step_blocking(stmt); +} + +static int +_sqlite3_step_internal(sqlite3_stmt* stmt, long long* rowid, long long* changes) +{ + return _sqlite3_step_blocking(stmt, rowid, changes); +} + +static int +sqlite3_prepare_v2_internal(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail) +{ + return sqlite3_prepare_v2_blocking(db, zSql, nBytes, ppStmt, pzTail); +} + +#else +static int +sqlite3_step_internal(sqlite3_stmt *stmt) +{ + return sqlite3_step(stmt); +} + +static int +_sqlite3_step_internal(sqlite3_stmt* stmt, long long* rowid, long long* changes) { int rv = sqlite3_step(stmt); sqlite3* db = sqlite3_db_handle(stmt); @@ -88,6 +118,13 @@ _sqlite3_step(sqlite3_stmt* stmt, long long* rowid, long long* changes) return rv; } +static int +sqlite3_prepare_v2_internal(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail) +{ + return sqlite3_prepare_v2(db, zSql, nBytes, ppStmt, pzTail); +} +#endif + void _sqlite3_result_text(sqlite3_context* ctx, const char* s) { sqlite3_result_text(ctx, s, -1, &free); } @@ -1637,7 +1674,7 @@ func (c *SQLiteConn) prepare(ctx context.Context, query string) (driver.Stmt, er defer C.free(unsafe.Pointer(pquery)) var s *C.sqlite3_stmt var tail *C.char - rv := C.sqlite3_prepare_v2(c.db, pquery, -1, &s, &tail) + rv := C.sqlite3_prepare_v2_internal(c.db, pquery, -1, &s, &tail) if rv != C.SQLITE_OK { return nil, c.lastError() } @@ -1871,7 +1908,7 @@ func (s *SQLiteStmt) exec(ctx context.Context, args []namedValue) (driver.Result } var rowid, changes C.longlong - rv := C._sqlite3_step(s.s, &rowid, &changes) + rv := C._sqlite3_step_internal(s.s, &rowid, &changes) if rv != C.SQLITE_ROW && rv != C.SQLITE_OK && rv != C.SQLITE_DONE { err := s.c.lastError() C.sqlite3_reset(s.s) @@ -1943,7 +1980,7 @@ func (rc *SQLiteRows) Next(dest []driver.Value) error { if rc.s.closed { return io.EOF } - rv := C.sqlite3_step(rc.s.s) + rv := C.sqlite3_step_internal(rc.s.s) if rv == C.SQLITE_DONE { return io.EOF } diff --git a/sqlite3_opt_unlock_notify.c b/sqlite3_opt_unlock_notify.c new file mode 100644 index 0000000..c481adb --- /dev/null +++ b/sqlite3_opt_unlock_notify.c @@ -0,0 +1,85 @@ +// Copyright (C) 2018 Yasuhiro Matsumoto . +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +#ifdef SQLITE_ENABLE_UNLOCK_NOTIFY +#include +#include + +extern int unlock_notify_wait(sqlite3 *db); + +int +sqlite3_step_blocking(sqlite3_stmt *stmt) +{ + int rv; + sqlite3* db; + + db = sqlite3_db_handle(stmt); + for (;;) { + rv = sqlite3_step(stmt); + if (rv != SQLITE_LOCKED) { + break; + } + if (sqlite3_extended_errcode(db) != SQLITE_LOCKED_SHAREDCACHE) { + break; + } + rv = unlock_notify_wait(db); + if (rv != SQLITE_OK) { + break; + } + sqlite3_reset(stmt); + } + + return rv; +} + +int +_sqlite3_step_blocking(sqlite3_stmt* stmt, long long* rowid, long long* changes) +{ + int rv; + sqlite3* db; + + db = sqlite3_db_handle(stmt); + for (;;) { + rv = sqlite3_step(stmt); + if (rv!=SQLITE_LOCKED) { + break; + } + if (sqlite3_extended_errcode(db) != SQLITE_LOCKED_SHAREDCACHE) { + break; + } + rv = unlock_notify_wait(db); + if (rv != SQLITE_OK) { + break; + } + sqlite3_reset(stmt); + } + + *rowid = (long long) sqlite3_last_insert_rowid(db); + *changes = (long long) sqlite3_changes(db); + return rv; +} + +int +sqlite3_prepare_v2_blocking(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail) +{ + int rv; + + for (;;) { + rv = sqlite3_prepare_v2(db, zSql, nBytes, ppStmt, pzTail); + if (rv!=SQLITE_LOCKED) { + break; + } + if (sqlite3_extended_errcode(db) != SQLITE_LOCKED_SHAREDCACHE) { + break; + } + rv = unlock_notify_wait(db); + if (rv != SQLITE_OK) { + break; + } + } + + return rv; +} +#endif diff --git a/sqlite3_opt_unlock_notify.go b/sqlite3_opt_unlock_notify.go new file mode 100644 index 0000000..ee29439 --- /dev/null +++ b/sqlite3_opt_unlock_notify.go @@ -0,0 +1,92 @@ +// Copyright (C) 2018 Yasuhiro Matsumoto . +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build cgo +// +build sqlite_unlock_notify + +package sqlite3 + +/* +#cgo CFLAGS: -DSQLITE_ENABLE_UNLOCK_NOTIFY + +#include +#include + +extern void unlock_notify_callback(void *arg, int argc); +*/ +import "C" +import ( + "fmt" + "sync" + "unsafe" +) + +type unlock_notify_table struct { + sync.Mutex + seqnum uint + table map[uint]chan struct{} +} + +var unt unlock_notify_table = unlock_notify_table{table: make(map[uint]chan struct{})} + +func (t *unlock_notify_table) add(c chan struct{}) uint { + t.Lock() + defer t.Unlock() + h := t.seqnum + t.table[h] = c + t.seqnum++ + return h +} + +func (t *unlock_notify_table) remove(h uint) { + t.Lock() + defer t.Unlock() + delete(t.table, h) +} + +func (t *unlock_notify_table) get(h uint) chan struct{} { + t.Lock() + defer t.Unlock() + c, ok := t.table[h] + if !ok { + panic(fmt.Sprintf("Non-existent key for unlcok-notify channel: %d", h)) + } + return c +} + +//export unlock_notify_callback +func unlock_notify_callback(argv unsafe.Pointer, argc C.int) { + for i := 0; i < int(argc); i++ { + parg := ((*(*[1 << 30]*[1]uint)(argv))[i]) + arg := *parg + h := arg[0] + c := unt.get(h) + c <- struct{}{} + } +} + +//export unlock_notify_wait +func unlock_notify_wait(db *C.sqlite3) C.int { + // It has to be a bufferred channel to not block in sqlite_unlock_notify + // as sqlite_unlock_notify could invoke the callback before it returns. + c := make(chan struct{}, 1) + defer close(c) + + h := unt.add(c) + defer unt.remove(h) + + pargv := C.malloc(C.sizeof_uint) + defer C.free(pargv) + + argv := (*[1]uint)(pargv) + argv[0] = h + if rv := C.sqlite3_unlock_notify(db, (*[0]byte)(C.unlock_notify_callback), unsafe.Pointer(pargv)); rv != C.SQLITE_OK { + return rv + } + + <-c + + return C.SQLITE_OK +} diff --git a/sqlite3_opt_unlock_notify_test.go b/sqlite3_opt_unlock_notify_test.go new file mode 100644 index 0000000..812c543 --- /dev/null +++ b/sqlite3_opt_unlock_notify_test.go @@ -0,0 +1,222 @@ +// Copyright (C) 2018 Yasuhiro Matsumoto . +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build sqlite_unlock_notify + +package sqlite3 + +import ( + "database/sql" + "fmt" + "os" + "sync" + "testing" + "time" +) + +func TestUnlockNotify(t *testing.T) { + tempFilename := TempFilename(t) + defer os.Remove(tempFilename) + dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc&_busy_timeout=%d", tempFilename, 500) + db, err := sql.Open("sqlite3", dsn) + if err != nil { + t.Fatal("Failed to open database:", err) + } + defer db.Close() + + _, err = db.Exec("CREATE TABLE foo(id INTEGER, status INTEGER)") + if err != nil { + t.Fatal("Failed to create table:", err) + } + + tx, err := db.Begin() + if err != nil { + t.Fatal("Failed to begin transaction:", err) + } + + _, err = tx.Exec("INSERT INTO foo(id, status) VALUES(1, 100)") + if err != nil { + t.Fatal("Failed to insert null:", err) + } + + _, err = tx.Exec("UPDATE foo SET status = 200 WHERE id = 1") + if err != nil { + t.Fatal("Failed to update table:", err) + } + + wg := sync.WaitGroup{} + wg.Add(1) + timer := time.NewTimer(500 * time.Millisecond) + go func() { + <-timer.C + err := tx.Commit() + if err != nil { + t.Fatal("Failed to commit transaction:", err) + } + wg.Done() + }() + + rows, err := db.Query("SELECT count(*) from foo") + if err != nil { + t.Fatal("Unable to query foo table:", err) + } + + if rows.Next() { + var count int + if err := rows.Scan(&count); err != nil { + t.Fatal("Failed to Scan rows", err) + } + } + if err := rows.Err(); err != nil { + t.Fatal("Failed at the call to Next:", err) + } + wg.Wait() + +} + +func TestUnlockNotifyMany(t *testing.T) { + tempFilename := TempFilename(t) + defer os.Remove(tempFilename) + dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc&_busy_timeout=%d", tempFilename, 500) + db, err := sql.Open("sqlite3", dsn) + if err != nil { + t.Fatal("Failed to open database:", err) + } + defer db.Close() + + _, err = db.Exec("CREATE TABLE foo(id INTEGER, status INTEGER)") + if err != nil { + t.Fatal("Failed to create table:", err) + } + + tx, err := db.Begin() + if err != nil { + t.Fatal("Failed to begin transaction:", err) + } + + _, err = tx.Exec("INSERT INTO foo(id, status) VALUES(1, 100)") + if err != nil { + t.Fatal("Failed to insert null:", err) + } + + _, err = tx.Exec("UPDATE foo SET status = 200 WHERE id = 1") + if err != nil { + t.Fatal("Failed to update table:", err) + } + + wg := sync.WaitGroup{} + wg.Add(1) + timer := time.NewTimer(500 * time.Millisecond) + go func() { + <-timer.C + err := tx.Commit() + if err != nil { + t.Fatal("Failed to commit transaction:", err) + } + wg.Done() + }() + + const concurrentQueries = 1000 + wg.Add(concurrentQueries) + for i := 0; i < concurrentQueries; i++ { + go func() { + rows, err := db.Query("SELECT count(*) from foo") + if err != nil { + t.Fatal("Unable to query foo table:", err) + } + + if rows.Next() { + var count int + if err := rows.Scan(&count); err != nil { + t.Fatal("Failed to Scan rows", err) + } + } + if err := rows.Err(); err != nil { + t.Fatal("Failed at the call to Next:", err) + } + wg.Done() + }() + } + wg.Wait() +} + +func TestUnlockNotifyDeadlock(t *testing.T) { + tempFilename := TempFilename(t) + defer os.Remove(tempFilename) + dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc&_busy_timeout=%d", tempFilename, 500) + db, err := sql.Open("sqlite3", dsn) + if err != nil { + t.Fatal("Failed to open database:", err) + } + defer db.Close() + + _, err = db.Exec("CREATE TABLE foo(id INTEGER, status INTEGER)") + if err != nil { + t.Fatal("Failed to create table:", err) + } + + tx, err := db.Begin() + if err != nil { + t.Fatal("Failed to begin transaction:", err) + } + + _, err = tx.Exec("INSERT INTO foo(id, status) VALUES(1, 100)") + if err != nil { + t.Fatal("Failed to insert null:", err) + } + + _, err = tx.Exec("UPDATE foo SET status = 200 WHERE id = 1") + if err != nil { + t.Fatal("Failed to update table:", err) + } + + wg := sync.WaitGroup{} + wg.Add(1) + timer := time.NewTimer(500 * time.Millisecond) + go func() { + <-timer.C + err := tx.Commit() + if err != nil { + t.Fatal("Failed to commit transaction:", err) + } + wg.Done() + }() + + wg.Add(1) + go func() { + tx2, err := db.Begin() + if err != nil { + t.Fatal("Failed to begin transaction:", err) + } + defer tx2.Rollback() + + _, err = tx2.Exec("DELETE FROM foo") + if err != nil { + t.Fatal("Failed to delete table:", err) + } + err = tx2.Commit() + if err != nil { + t.Fatal("Failed to commit transaction:", err) + } + wg.Done() + }() + + rows, err := tx.Query("SELECT count(*) from foo") + if err != nil { + t.Fatal("Unable to query foo table:", err) + } + + if rows.Next() { + var count int + if err := rows.Scan(&count); err != nil { + t.Fatal("Failed to Scan rows", err) + } + } + if err := rows.Err(); err != nil { + t.Fatal("Failed at the call to Next:", err) + } + + wg.Wait() +}