diff --git a/kvdb/etcd/commit_queue.go b/kvdb/etcd/commit_queue.go index 54f33c01..138c08e6 100644 --- a/kvdb/etcd/commit_queue.go +++ b/kvdb/etcd/commit_queue.go @@ -3,14 +3,11 @@ package etcd import ( + "container/list" "context" "sync" ) -// commitQueueSize is the maximum number of commits we let to queue up. All -// remaining commits will block on commitQueue.Add(). -const commitQueueSize = 100 - // commitQueue is a simple execution queue to manage conflicts for transactions // and thereby reduce the number of times conflicting transactions need to be // retried. When a new transaction is added to the queue, we first upgrade the @@ -25,9 +22,18 @@ type commitQueue struct { readerMap map[string]int writerMap map[string]int - commitMutex sync.RWMutex - queue chan (func()) - wg sync.WaitGroup + queue *list.List + queueMx sync.Mutex + queueCond *sync.Cond + + shutdown chan struct{} +} + +type commitQueueTxn struct { + commitLoop func() + blocked bool + rset []string + wset []string } // NewCommitQueue creates a new commit queue, with the passed abort context. @@ -36,19 +42,24 @@ func NewCommitQueue(ctx context.Context) *commitQueue { ctx: ctx, readerMap: make(map[string]int), writerMap: make(map[string]int), - queue: make(chan func(), commitQueueSize), + queue: list.New(), + shutdown: make(chan struct{}), } + q.queueCond = sync.NewCond(&q.queueMx) // Start the queue consumer loop. - q.wg.Add(1) go q.mainLoop() return q } -// Wait waits for the queue to stop (after the queue context has been canceled). -func (c *commitQueue) Wait() { - c.wg.Wait() +// Stop signals the queue to stop after the queue context has been canceled and +// waits until the queue has stopped. +func (c *commitQueue) Stop() { + // Signal the queue's condition variable to ensure the mainLoop reliably + // unblocks to check for the exit condition. + c.queueCond.Signal() + <-c.shutdown } // Add increases lock counts and queues up tx commit closure for execution. 
@@ -82,33 +93,22 @@ func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) { c.readerMap[key] += 1 } - if blocked { - // Add the transaction to the queue if conflicts with an already - // queued one. - c.mx.Unlock() + c.queueCond.L.Lock() + c.queue.PushBack(&commitQueueTxn{ + commitLoop: commitLoop, + blocked: blocked, + rset: rset, + wset: wset, + }) + c.queueCond.L.Unlock() - select { - case c.queue <- commitLoop: - case <-c.ctx.Done(): - } - } else { - // To make sure we don't add a new tx to the queue that depends - // on this "unblocked" tx, grab the commitMutex before lifting - // the mutex guarding the lock maps. - c.commitMutex.RLock() - c.mx.Unlock() + c.mx.Unlock() - // At this point we're safe to execute the "unblocked" tx, as - // we cannot execute blocked tx that may have been read from the - // queue until the commitMutex is held. - commitLoop() - - c.commitMutex.RUnlock() - } + c.queueCond.Signal() } -// Done decreases lock counts of the keys in the read/write sets. -func (c *commitQueue) Done(rset []string, wset []string) { +// done decreases lock counts of the keys in the read/write sets. +func (c *commitQueue) done(rset []string, wset []string) { c.mx.Lock() defer c.mx.Unlock() @@ -131,20 +131,82 @@ func (c *commitQueue) Done(rset []string, wset []string) { // dependencies. The queue ensures that the top element doesn't conflict with // any other transactions and therefore can be executed freely. func (c *commitQueue) mainLoop() { - defer c.wg.Done() + defer close(c.shutdown) for { - select { - case top := <-c.queue: - // Execute the next blocked transaction. As it is - // the top element in the queue it means that it doesn't - // depend on any other transactions anymore. - c.commitMutex.Lock() - top() - c.commitMutex.Unlock() + // Wait until at least one transaction (blocked or not) + // has been added to our queue, re-checking the exit + // condition on every wakeup. 
+ c.queueCond.L.Lock() + for c.queue.Front() == nil { + c.queueCond.Wait() + // Check the exit condition before looping again. + select { + case <-c.ctx.Done(): + c.queueCond.L.Unlock() + return + default: + } + } + + // Now collect all txns until we find the next blocking one. + // These shouldn't conflict (if the precollected read/write + // keys sets don't grow), meaning we can safely commit them + // in parallel. + work := make([]*commitQueueTxn, 1) + e := c.queue.Front() + work[0] = c.queue.Remove(e).(*commitQueueTxn) + + for { + e := c.queue.Front() + if e == nil { + break + } + + next := e.Value.(*commitQueueTxn) + if !next.blocked { + work = append(work, next) + c.queue.Remove(e) + } else { + // We found the next blocking txn which means + // the block of work needs to be cut here. + break + } + } + + c.queueCond.L.Unlock() + + // Check if we need to exit before continuing. + select { case <-c.ctx.Done(): return + default: + } + + var wg sync.WaitGroup + wg.Add(len(work)) + + // Fire up N goroutines where each will run its commit loop + // and then clean up the reader/writer maps. + for _, txn := range work { + go func(txn *commitQueueTxn) { + defer wg.Done() + txn.commitLoop() + + // We can safely cleanup here as done only + // holds the main mutex. + c.done(txn.rset, txn.wset) + }(txn) + } + + wg.Wait() + + // Check if we need to exit before continuing. + select { + case <-c.ctx.Done(): + return + default: } } } diff --git a/kvdb/etcd/commit_queue_test.go b/kvdb/etcd/commit_queue_test.go index 25b226b5..a7ebcca2 100644 --- a/kvdb/etcd/commit_queue_test.go +++ b/kvdb/etcd/commit_queue_test.go @@ -17,7 +17,7 @@ import ( func TestCommitQueue(t *testing.T) { // The duration of each commit. const commitDuration = time.Millisecond * 500 - const numCommits = 4 + const numCommits = 5 var wg sync.WaitGroup commits := make([]string, numCommits) @@ -30,25 +30,25 @@ func TestCommitQueue(t *testing.T) { // Update our log of commit order. 
Avoid blocking // by preallocating the commit log and increasing // the log index atomically. - i := atomic.AddInt32(&idx, 1) - commits[i] = tag - if sleep { time.Sleep(commitDuration) } + + i := atomic.AddInt32(&idx, 1) + commits[i] = tag } } ctx := context.Background() ctx, cancel := context.WithCancel(ctx) q := NewCommitQueue(ctx) - defer q.Wait() + defer q.Stop() defer cancel() wg.Add(numCommits) t1 := time.Now() - // Tx1: reads: key1, key2, writes: key3, conflict: none + // Tx1 (long): reads: key1, key2, writes: key3, conflict: none q.Add( commit("free", true), []string{"key1", "key2"}, @@ -60,12 +60,18 @@ func TestCommitQueue(t *testing.T) { []string{"key1", "key2"}, []string{"key3"}, ) - // Tx3: reads: key1, writes: key4, conflict: none + // Tx3 (long): reads: key1, key2, writes: key4, conflict: none q.Add( commit("free", true), []string{"key1", "key2"}, []string{"key4"}, ) + // Tx4 (long): reads: key1, key2, writes: none, conflict: none + q.Add( + commit("free", true), + []string{"key1", "key2"}, + []string{}, + ) // Tx4: reads: key2, writes: key4 conflict: Tx3 q.Add( commit("blocked2", false), @@ -87,7 +93,7 @@ func TestCommitQueue(t *testing.T) { // before the blocking ones, and the blocking ones are executed in // the order of addition. require.Equal(t, - []string{"free", "free", "blocked1", "blocked2"}, + []string{"free", "blocked1", "free", "free", "blocked2"}, commits, ) } diff --git a/kvdb/etcd/db.go b/kvdb/etcd/db.go index e3bad3b2..12b89868 100644 --- a/kvdb/etcd/db.go +++ b/kvdb/etcd/db.go @@ -122,6 +122,7 @@ func (c *commitStatsCollector) callback(succ bool, stats CommitStats) { type db struct { cfg Config ctx context.Context + cancel func() cli *clientv3.Client commitStatsCollector *commitStatsCollector txQueue *commitQueue @@ -135,7 +136,6 @@ var _ walletdb.DB = (*db)(nil) // config. If etcd connection cannot be established, then returns error. 
func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) { clientCfg := clientv3.Config{ - Context: ctx, Endpoints: []string{cfg.Host}, DialTimeout: etcdConnectionTimeout, Username: cfg.User, @@ -158,8 +158,11 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) { clientCfg.TLS = tlsConfig } + ctx, cancel := context.WithCancel(ctx) + clientCfg.Context = ctx cli, err := clientv3.New(clientCfg) if err != nil { + cancel() return nil, err } @@ -171,6 +174,7 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) { backend := &db{ cfg: cfg, ctx: ctx, + cancel: cancel, cli: cli, txQueue: NewCommitQueue(ctx), } @@ -296,5 +300,8 @@ func (db *db) Copy(w io.Writer) error { // Close cleanly shuts down the database and syncs all data. // This function is part of the walletdb.Db interface implementation. func (db *db) Close() error { - return db.cli.Close() + err := db.cli.Close() + db.cancel() + db.txQueue.Stop() + return err } diff --git a/kvdb/etcd/stm.go b/kvdb/etcd/stm.go index 28a2fc69..9d1e50e9 100644 --- a/kvdb/etcd/stm.go +++ b/kvdb/etcd/stm.go @@ -283,7 +283,7 @@ func runSTM(s *stm, apply func(STM) error) error { // Make a copy of the read/write set keys here. The reason why we need // to do this is because subsequent applies may change (shrink) these // sets and so when we decrease reference counts in the commit queue in - // Done(...) we'd potentially miss removing references which would + // done(...) we'd potentially miss removing references which would // result in queueing up transactions and contending DB access. // Copying these strings is cheap due to Go's immutable string which is // always a reference. 
@@ -309,10 +309,9 @@ func runSTM(s *stm, apply func(STM) error) error { select { case <-done: case <-s.options.ctx.Done(): + return context.Canceled } - s.txQueue.Done(rkeys, wkeys) - if s.options.commitStatsCallback != nil { stats.Retries = retries s.options.commitStatsCallback(executeErr == nil, stats) diff --git a/kvdb/etcd/stm_test.go b/kvdb/etcd/stm_test.go index a780f862..e4a3810c 100644 --- a/kvdb/etcd/stm_test.go +++ b/kvdb/etcd/stm_test.go @@ -28,7 +28,7 @@ func TestPutToEmpty(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() db, err := newEtcdBackend(ctx, f.BackendConfig()) @@ -55,7 +55,7 @@ func TestGetPutDel(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() testKeyValues := []KV{ @@ -141,7 +141,7 @@ func TestFirstLastNextPrev(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() testKeyValues := []KV{ @@ -299,7 +299,7 @@ func TestCommitError(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() db, err := newEtcdBackend(ctx, f.BackendConfig()) @@ -347,7 +347,7 @@ func TestManualTxError(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() db, err := newEtcdBackend(ctx, f.BackendConfig())