mirror of
https://github.com/aljazceru/ark.git
synced 2025-12-17 20:24:21 +01:00
Support macaroons and TLS && Add arkd wallet cmds (#232)
* Update protos * Update handlers * Support macaroons and TLS * Add arkd cli * Minor fixes * Update deps * Fixes * Update makefile * Fixes * Fix * Fix * Fix * Remove trusted onboarding from client * Completely remove trusted onboarding * Fix compose files and add --no-macaroon flag to arkd cli * Lint * Remove e2e for trusted onboarding * Add sleep time
This commit is contained in:
committed by
GitHub
parent
059e837794
commit
57ce08f239
213
server/pkg/kvdb/etcd/commit_queue.go
Normal file
213
server/pkg/kvdb/etcd/commit_queue.go
Normal file
@@ -0,0 +1,213 @@
|
||||
//go:build kvdb_etcd
|
||||
// +build kvdb_etcd
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// commitQueue is a simple execution queue to manage conflicts for transactions
|
||||
// and thereby reduce the number of times conflicting transactions need to be
|
||||
// retried. When a new transaction is added to the queue, we first upgrade the
|
||||
// read/write counts in the queue's own accounting to decide whether the new
|
||||
// transaction has any conflicting dependencies. If the transaction does not
|
||||
// conflict with any other, then it is committed immediately, otherwise it'll be
|
||||
// queued up for later execution.
|
||||
// The algorithm is described in: http://www.cs.umd.edu/~abadi/papers/vll-vldb13.pdf
|
||||
type commitQueue struct {
|
||||
ctx context.Context
|
||||
mx sync.Mutex
|
||||
readerMap map[string]int
|
||||
writerMap map[string]int
|
||||
|
||||
queue *list.List
|
||||
queueMx sync.Mutex
|
||||
queueCond *sync.Cond
|
||||
|
||||
shutdown chan struct{}
|
||||
}
|
||||
|
||||
type commitQueueTxn struct {
|
||||
commitLoop func()
|
||||
blocked bool
|
||||
rset []string
|
||||
wset []string
|
||||
}
|
||||
|
||||
// NewCommitQueue creates a new commit queue, with the passed abort context.
|
||||
func NewCommitQueue(ctx context.Context) *commitQueue {
|
||||
q := &commitQueue{
|
||||
ctx: ctx,
|
||||
readerMap: make(map[string]int),
|
||||
writerMap: make(map[string]int),
|
||||
queue: list.New(),
|
||||
shutdown: make(chan struct{}),
|
||||
}
|
||||
q.queueCond = sync.NewCond(&q.queueMx)
|
||||
|
||||
// Start the queue consumer loop.
|
||||
go q.mainLoop()
|
||||
|
||||
return q
|
||||
}
|
||||
|
||||
// Stop signals the queue to stop after the queue context has been canceled and
|
||||
// waits until the has stopped.
|
||||
func (c *commitQueue) Stop() {
|
||||
// Signal the queue's condition variable to ensure the mainLoop reliably
|
||||
// unblocks to check for the exit condition.
|
||||
c.queueCond.Signal()
|
||||
<-c.shutdown
|
||||
}
|
||||
|
||||
// Add increases lock counts and queues up tx commit closure for execution.
|
||||
// Transactions that don't have any conflicts are executed immediately by
|
||||
// "downgrading" the count mutex to allow concurrency.
|
||||
func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) {
|
||||
c.mx.Lock()
|
||||
blocked := false
|
||||
|
||||
// Mark as blocked if there's any writer changing any of the keys in
|
||||
// the read set. Do not increment the reader counts yet as we'll need to
|
||||
// use the original reader counts when scanning through the write set.
|
||||
for _, key := range rset {
|
||||
if c.writerMap[key] > 0 {
|
||||
blocked = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Mark as blocked if there's any writer or reader for any of the keys
|
||||
// in the write set.
|
||||
for _, key := range wset {
|
||||
blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0
|
||||
|
||||
// Increment the writer count.
|
||||
c.writerMap[key] += 1
|
||||
}
|
||||
|
||||
// Finally we can increment the reader counts for keys in the read set.
|
||||
for _, key := range rset {
|
||||
c.readerMap[key] += 1
|
||||
}
|
||||
|
||||
c.queueCond.L.Lock()
|
||||
c.queue.PushBack(&commitQueueTxn{
|
||||
commitLoop: commitLoop,
|
||||
blocked: blocked,
|
||||
rset: rset,
|
||||
wset: wset,
|
||||
})
|
||||
c.queueCond.L.Unlock()
|
||||
|
||||
c.mx.Unlock()
|
||||
|
||||
c.queueCond.Signal()
|
||||
}
|
||||
|
||||
// done decreases lock counts of the keys in the read/write sets.
|
||||
func (c *commitQueue) done(rset []string, wset []string) {
|
||||
c.mx.Lock()
|
||||
defer c.mx.Unlock()
|
||||
|
||||
for _, key := range rset {
|
||||
c.readerMap[key] -= 1
|
||||
if c.readerMap[key] == 0 {
|
||||
delete(c.readerMap, key)
|
||||
}
|
||||
}
|
||||
|
||||
for _, key := range wset {
|
||||
c.writerMap[key] -= 1
|
||||
if c.writerMap[key] == 0 {
|
||||
delete(c.writerMap, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// mainLoop executes queued transaction commits for transactions that have
|
||||
// dependencies. The queue ensures that the top element doesn't conflict with
|
||||
// any other transactions and therefore can be executed freely.
|
||||
func (c *commitQueue) mainLoop() {
|
||||
defer close(c.shutdown)
|
||||
|
||||
for {
|
||||
// Wait until there are no unblocked transactions being
|
||||
// executed, and for there to be at least one blocked
|
||||
// transaction in our queue.
|
||||
c.queueCond.L.Lock()
|
||||
for c.queue.Front() == nil {
|
||||
c.queueCond.Wait()
|
||||
|
||||
// Check the exit condition before looping again.
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
c.queueCond.L.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Now collect all txns until we find the next blocking one.
|
||||
// These shouldn't conflict (if the precollected read/write
|
||||
// keys sets don't grow), meaning we can safely commit them
|
||||
// in parallel.
|
||||
work := make([]*commitQueueTxn, 1)
|
||||
e := c.queue.Front()
|
||||
work[0] = c.queue.Remove(e).(*commitQueueTxn)
|
||||
|
||||
for {
|
||||
e := c.queue.Front()
|
||||
if e == nil {
|
||||
break
|
||||
}
|
||||
|
||||
next := e.Value.(*commitQueueTxn)
|
||||
if !next.blocked {
|
||||
work = append(work, next)
|
||||
c.queue.Remove(e)
|
||||
} else {
|
||||
// We found the next blocking txn which means
|
||||
// the block of work needs to be cut here.
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
c.queueCond.L.Unlock()
|
||||
|
||||
// Check if we need to exit before continuing.
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(work))
|
||||
|
||||
// Fire up N goroutines where each will run its commit loop
|
||||
// and then clean up the reader/writer maps.
|
||||
for _, txn := range work {
|
||||
go func(txn *commitQueueTxn) {
|
||||
defer wg.Done()
|
||||
txn.commitLoop()
|
||||
|
||||
// We can safely cleanup here as done only
|
||||
// holds the main mutex.
|
||||
c.done(txn.rset, txn.wset)
|
||||
}(txn)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Check if we need to exit before continuing.
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user