aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Mercl <0xjnml@gmail.com>2019-12-21 11:00:50 +0100
committerJan Mercl <0xjnml@gmail.com>2019-12-21 11:00:50 +0100
commit8717080461412ef105e587b66fecfc6221691674 (patch)
tree2c505c6e20609c4863b48791c5c6e04b601fbc37
parentbfb8a00ee0efc6704123d17c4f37ec0b075b1459 (diff)
add concurrent inserts test
-rw-r--r--all_test.go108
-rw-r--r--go.mod5
-rw-r--r--mutex.go55
3 files changed, 161 insertions, 7 deletions
diff --git a/all_test.go b/all_test.go
index cc0447c..0ec9844 100644
--- a/all_test.go
+++ b/all_test.go
@@ -6,6 +6,7 @@ package sqlite // import "modernc.org/sqlite"
import (
"bytes"
+ "context"
"database/sql"
"flag"
"fmt"
@@ -15,8 +16,11 @@ import (
"path/filepath"
"runtime"
"strings"
+ "sync"
"testing"
"time"
+
+ "modernc.org/mathutil"
)
func caller(s string, va ...interface{}) {
@@ -348,3 +352,107 @@ func TestMemDB(t *testing.T) {
t.Fatal(err)
}
}
+
+func TestConcurrentInserts(t *testing.T) {
+ const (
+ ngoroutines = 8
+ nrows = 2500
+ )
+
+ dir, err := ioutil.TempDir("", "sqlite-test-")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ defer os.RemoveAll(dir)
+
+ db, err := sql.Open(driverName, filepath.Join(dir, "test.db"))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ tx, err := db.BeginTx(context.Background(), nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if _, err := tx.Exec("create table t(i)"); err != nil {
+ t.Fatal(err)
+ }
+
+ prep, err := tx.Prepare("insert into t values(?)")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ rnd := make(chan int, 100)
+ go func() {
+ lim := ngoroutines * nrows
+ rng, err := mathutil.NewFC32(0, lim-1, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ for i := 0; i < lim; i++ {
+ rnd <- int(rng.Next())
+ }
+ }()
+
+ start := make(chan int)
+ var wg sync.WaitGroup
+ for i := 0; i < ngoroutines; i++ {
+ wg.Add(1)
+
+ go func(id int) {
+
+ defer wg.Done()
+
+ next:
+ for i := 0; i < nrows; i++ {
+ n := <-rnd
+ var err error
+ for j := 0; j < 10; j++ {
+ if _, err := prep.Exec(n); err == nil {
+ continue next
+ }
+ }
+
+ t.Errorf("id %d, seq %d: %v", id, i, err)
+ return
+ }
+ }(i)
+ }
+ t0 := time.Now()
+ close(start)
+ wg.Wait()
+ if err := tx.Commit(); err != nil {
+ t.Fatal(err)
+ }
+
+ d := time.Since(t0)
+ rows, err := db.Query("select * from t order by i")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var i int
+ for ; rows.Next(); i++ {
+ var j int
+ if err := rows.Scan(&j); err != nil {
+ t.Fatalf("seq %d: %v", i, err)
+ }
+
+ if g, e := j, i; g != e {
+ t.Fatalf("seq %d: got %d, exp %d", i, g, e)
+ }
+ }
+ if err := rows.Err(); err != nil {
+ t.Fatal(err)
+ }
+
+ if g, e := i, ngoroutines*nrows; g != e {
+ t.Fatalf("got %d rows, expected %d", g, e)
+ }
+
+ t.Logf("%d goroutines concurrently inserted %d rows in %v", ngoroutines, ngoroutines*nrows, d)
+}
diff --git a/go.mod b/go.mod
index 1685ff1..5895e86 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,7 @@ module modernc.org/sqlite
go 1.13
-require modernc.org/crt/v2 v2.0.0-20191219143825-5728f219e36a
+require (
+ modernc.org/crt/v2 v2.0.0-20191219143825-5728f219e36a
+ modernc.org/mathutil v1.0.0
+)
diff --git a/mutex.go b/mutex.go
index 0bcc241..5b5cc15 100644
--- a/mutex.go
+++ b/mutex.go
@@ -52,10 +52,17 @@ type mutex struct {
cnt int32
id int32
sync.Mutex
- wait sync.Mutex
+ wait sync.Mutex
+ recursive bool
}
func (m *mutex) enter(id int32) {
+ if !m.recursive {
+ m.Lock()
+ m.id = id
+ return
+ }
+
for {
m.Lock()
switch m.id {
@@ -77,7 +84,36 @@ func (m *mutex) enter(id int32) {
}
}
+func (m *mutex) try(id int32) int32 {
+ if !m.recursive {
+ return bin.DSQLITE_BUSY
+ }
+
+ m.Lock()
+ switch m.id {
+ case 0:
+ m.cnt = 1
+ m.id = id
+ m.wait.Lock()
+ m.Unlock()
+ return bin.DSQLITE_OK
+ case id:
+ m.cnt++
+ m.Unlock()
+ return bin.DSQLITE_OK
+ }
+
+ m.Unlock()
+ return bin.DSQLITE_BUSY
+}
+
func (m *mutex) leave() {
+ if !m.recursive {
+ m.id = 0
+ m.Unlock()
+ return
+ }
+
m.Lock()
m.cnt--
if m.cnt == 0 {
@@ -155,11 +191,12 @@ func mutexAlloc(tls *crt.TLS, typ int32) (r crt.Intptr) {
defer func() {
}()
switch typ {
- case
- bin.DSQLITE_MUTEX_FAST,
- bin.DSQLITE_MUTEX_RECURSIVE:
-
+ case bin.DSQLITE_MUTEX_FAST:
return crt.Xcalloc(tls, 1, crt.Intptr(unsafe.Sizeof(mutex{})))
+ case bin.DSQLITE_MUTEX_RECURSIVE:
+ p := crt.Xcalloc(tls, 1, crt.Intptr(unsafe.Sizeof(mutex{})))
+ (*mutex)(unsafe.Pointer(uintptr(p))).recursive = true
+ return p
case bin.DSQLITE_MUTEX_STATIC_MASTER:
return crt.Intptr(uintptr(unsafe.Pointer(&mutexMaster)))
case bin.DSQLITE_MUTEX_STATIC_MEM:
@@ -216,7 +253,13 @@ func mutexEnter(tls *crt.TLS, m crt.Intptr) {
}
// int (*xMutexTry)(sqlite3_mutex *);
-func mutexTry(tls *crt.TLS, m crt.Intptr) int32 { return bin.DSQLITE_BUSY }
+func mutexTry(tls *crt.TLS, m crt.Intptr) int32 {
+ if m == 0 {
+ return bin.DSQLITE_OK
+ }
+
+ return (*mutex)(unsafe.Pointer(uintptr(m))).try(tls.ID)
+}
// void (*xMutexLeave)(sqlite3_mutex *);
func mutexLeave(tls *crt.TLS, m crt.Intptr) {