diff options
author | Jan Mercl <0xjnml@gmail.com> | 2019-12-21 11:00:50 +0100 |
---|---|---|
committer | Jan Mercl <0xjnml@gmail.com> | 2019-12-21 11:00:50 +0100 |
commit | 8717080461412ef105e587b66fecfc6221691674 (patch) | |
tree | 2c505c6e20609c4863b48791c5c6e04b601fbc37 | |
parent | bfb8a00ee0efc6704123d17c4f37ec0b075b1459 (diff) |
add concurrent inserts test
-rw-r--r-- | all_test.go | 108 | ||||
-rw-r--r-- | go.mod | 5 | ||||
-rw-r--r-- | mutex.go | 55 |
3 files changed, 161 insertions, 7 deletions
diff --git a/all_test.go b/all_test.go index cc0447c..0ec9844 100644 --- a/all_test.go +++ b/all_test.go @@ -6,6 +6,7 @@ package sqlite // import "modernc.org/sqlite" import ( "bytes" + "context" "database/sql" "flag" "fmt" @@ -15,8 +16,11 @@ import ( "path/filepath" "runtime" "strings" + "sync" "testing" "time" + + "modernc.org/mathutil" ) func caller(s string, va ...interface{}) { @@ -348,3 +352,107 @@ func TestMemDB(t *testing.T) { t.Fatal(err) } } + +func TestConcurrentInserts(t *testing.T) { + const ( + ngoroutines = 8 + nrows = 2500 + ) + + dir, err := ioutil.TempDir("", "sqlite-test-") + if err != nil { + t.Fatal(err) + } + + defer os.RemoveAll(dir) + + db, err := sql.Open(driverName, filepath.Join(dir, "test.db")) + if err != nil { + t.Fatal(err) + } + + tx, err := db.BeginTx(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + + if _, err := tx.Exec("create table t(i)"); err != nil { + t.Fatal(err) + } + + prep, err := tx.Prepare("insert into t values(?)") + if err != nil { + t.Fatal(err) + } + + rnd := make(chan int, 100) + go func() { + lim := ngoroutines * nrows + rng, err := mathutil.NewFC32(0, lim-1, false) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < lim; i++ { + rnd <- int(rng.Next()) + } + }() + + start := make(chan int) + var wg sync.WaitGroup + for i := 0; i < ngoroutines; i++ { + wg.Add(1) + + go func(id int) { + + defer wg.Done() + + next: + for i := 0; i < nrows; i++ { + n := <-rnd + var err error + for j := 0; j < 10; j++ { + if _, err := prep.Exec(n); err == nil { + continue next + } + } + + t.Errorf("id %d, seq %d: %v", id, i, err) + return + } + }(i) + } + t0 := time.Now() + close(start) + wg.Wait() + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + + d := time.Since(t0) + rows, err := db.Query("select * from t order by i") + if err != nil { + t.Fatal(err) + } + + var i int + for ; rows.Next(); i++ { + var j int + if err := rows.Scan(&j); err != nil { + t.Fatalf("seq %d: %v", i, err) + } + + if g, e := j, i; g != e { + t.Fatalf("seq %d: got %d, exp %d", i, g, e) + } + } + if err := rows.Err(); err != nil { + t.Fatal(err) + } + + if g, e := i, ngoroutines*nrows; g != e { + t.Fatalf("got %d rows, expected %d", g, e) + } + + t.Logf("%d goroutines concurrently inserted %d rows in %v", ngoroutines, ngoroutines*nrows, d) +} @@ -2,4 +2,7 @@ module modernc.org/sqlite go 1.13 -require modernc.org/crt/v2 v2.0.0-20191219143825-5728f219e36a +require ( + modernc.org/crt/v2 v2.0.0-20191219143825-5728f219e36a + modernc.org/mathutil v1.0.0 +) @@ -52,10 +52,17 @@ type mutex struct { cnt int32 id int32 sync.Mutex - wait sync.Mutex + wait sync.Mutex + recursive bool } func (m *mutex) enter(id int32) { + if !m.recursive { + m.Lock() + m.id = id + return + } + for { m.Lock() switch m.id { @@ -77,7 +84,36 @@ func (m *mutex) enter(id int32) { } } +func (m *mutex) try(id int32) int32 { + if !m.recursive { + return bin.DSQLITE_BUSY + } + + m.Lock() + switch m.id { + case 0: + m.cnt = 1 + m.id = id + m.wait.Lock() + m.Unlock() + return bin.DSQLITE_OK + case id: + m.cnt++ + m.Unlock() + return bin.DSQLITE_OK + } + + m.Unlock() + return bin.DSQLITE_BUSY +} + func (m *mutex) leave() { + if !m.recursive { + m.id = 0 + m.Unlock() + return + } + m.Lock() m.cnt-- if m.cnt == 0 { @@ -155,11 +191,12 @@ func mutexAlloc(tls *crt.TLS, typ int32) (r crt.Intptr) { defer func() { }() switch typ { - case - bin.DSQLITE_MUTEX_FAST, - bin.DSQLITE_MUTEX_RECURSIVE: - + case bin.DSQLITE_MUTEX_FAST: return crt.Xcalloc(tls, 1, crt.Intptr(unsafe.Sizeof(mutex{}))) + case bin.DSQLITE_MUTEX_RECURSIVE: + p := crt.Xcalloc(tls, 1, crt.Intptr(unsafe.Sizeof(mutex{}))) + (*mutex)(unsafe.Pointer(uintptr(p))).recursive = true + return p case bin.DSQLITE_MUTEX_STATIC_MASTER: return crt.Intptr(uintptr(unsafe.Pointer(&mutexMaster))) case bin.DSQLITE_MUTEX_STATIC_MEM: @@ -216,7 +253,13 @@ func mutexEnter(tls *crt.TLS, m crt.Intptr) { } // int (*xMutexTry)(sqlite3_mutex *); -func mutexTry(tls *crt.TLS, m crt.Intptr) int32 { return bin.DSQLITE_BUSY } +func mutexTry(tls *crt.TLS, m crt.Intptr) int32 { + if m == 0 { + return bin.DSQLITE_OK + } + + return (*mutex)(unsafe.Pointer(uintptr(m))).try(tls.ID) +} // void (*xMutexLeave)(sqlite3_mutex *); func mutexLeave(tls *crt.TLS, m crt.Intptr) { |