From e45bff3c703c33dbc74c5517caaec857c5cb812b Mon Sep 17 00:00:00 2001 From: Dusan Sekulic Date: Fri, 26 Jul 2024 02:09:48 +0200 Subject: [PATCH] Sqlite - add sqlc and migration (#217) --- Dockerfile | 4 +- server/Makefile | 30 +- server/cmd/arkd/main.go | 1 + server/go.mod | 3 + server/go.sum | 9 + server/internal/app-config/config.go | 3 +- server/internal/config/config.go | 7 +- server/internal/infrastructure/db/service.go | 34 +- .../infrastructure/db/service_test.go | 2 +- .../migration/20240703120550_init.down.sql | 9 + .../migration/20240703120550_init.up.sql | 77 ++ .../infrastructure/db/sqlite/round_repo.go | 735 ++++++--------- .../infrastructure/db/sqlite/sqlc.yaml | 9 + .../db/sqlite/sqlc/queries/db.go | 31 + .../db/sqlite/sqlc/queries/models.go | 100 +++ .../db/sqlite/sqlc/queries/query.sql.go | 847 ++++++++++++++++++ .../infrastructure/db/sqlite/sqlc/query.sql | 152 ++++ .../infrastructure/db/sqlite/utils.go | 40 + .../infrastructure/db/sqlite/vtxo_repo.go | 356 +++----- 19 files changed, 1743 insertions(+), 706 deletions(-) create mode 100644 server/internal/infrastructure/db/sqlite/migration/20240703120550_init.down.sql create mode 100644 server/internal/infrastructure/db/sqlite/migration/20240703120550_init.up.sql create mode 100644 server/internal/infrastructure/db/sqlite/sqlc.yaml create mode 100644 server/internal/infrastructure/db/sqlite/sqlc/queries/db.go create mode 100644 server/internal/infrastructure/db/sqlite/sqlc/queries/models.go create mode 100644 server/internal/infrastructure/db/sqlite/sqlc/queries/query.sql.go create mode 100644 server/internal/infrastructure/db/sqlite/sqlc/query.sql diff --git a/Dockerfile b/Dockerfile index cb2354d..bfe2945 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,11 +20,13 @@ FROM alpine:3.12 WORKDIR /app -COPY --from=builder /app/bin/* /app +COPY --from=builder /app/bin/* /app/ +COPY --from=builder /app/server/internal/infrastructure/db/sqlite/migration/* /app/ ENV PATH="/app:${PATH}" ENV ARK_DATADIR=/app/data ENV ARK_WALLET_DATADIR=/app/wallet-data +ENV ARK_DB_MIGRATION_PATH=file:// # Expose volume containing all 'arkd' data VOLUME /app/data diff --git a/server/Makefile b/server/Makefile index 405c062..82f109d 100755 --- a/server/Makefile +++ b/server/Makefile @@ -61,4 +61,32 @@ proto-lint: @echo "Linting protos..." @docker build -q -t buf -f buf.Dockerfile . &> /dev/null @docker run --rm --volume "$(shell pwd):/workspace" --workdir /workspace buf lint - \ No newline at end of file + + +## mig_file: creates pg migration file(eg. make FILE=init mig_file) +mig_file: + @migrate create -ext sql -dir ./internal/infrastructure/db/sqlite/migration/ $(FILE) + +## mig_up: creates db schema for provided db path +mig_up: + @echo "creating db schema..." + @migrate -database "sqlite://$(DB_PATH)/sqlite.db" -path ./internal/infrastructure/db/sqlite/migration/ up + +## mig_down: apply down migration +mig_down: + @echo "migration down..." + @migrate -database "sqlite://$(DB_PATH)/sqlite.db" -path ./internal/infrastructure/db/sqlite/migration/ down + +## mig_down_yes: apply down migration without prompt +mig_down_yes: + @echo "migration down..." + @"yes" | migrate -database "sqlite://path/to/database" -path ./internal/infrastructure/db/sqlite/migration/ down + +## vet_db: check if mig_up and mig_down are ok +vet_db: recreatedb mig_up mig_down_yes + @echo "vet db migration scripts..." + +## sqlc: gen sql +sqlc: + @echo "gen sql..." + cd ./internal/infrastructure/db/sqlite; sqlc generate \ No newline at end of file diff --git a/server/cmd/arkd/main.go b/server/cmd/arkd/main.go index 17cbb21..29f08c8 100755 --- a/server/cmd/arkd/main.go +++ b/server/cmd/arkd/main.go @@ -37,6 +37,7 @@ func main() { EventDbType: cfg.EventDbType, DbType: cfg.DbType, DbDir: cfg.DbDir, + DbMigrationPath: cfg.DbMigrationPath, EventDbDir: cfg.DbDir, RoundInterval: cfg.RoundInterval, Network: cfg.Network, diff --git a/server/go.mod b/server/go.mod index 618d228..fdd9fb9 100644 --- a/server/go.mod +++ b/server/go.mod @@ -9,6 +9,7 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 github.com/dgraph-io/badger/v4 v4.2.0 github.com/go-co-op/gocron v1.37.0 + github.com/golang-migrate/migrate/v4 v4.17.1 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 @@ -28,6 +29,8 @@ require ( github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e // indirect github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect diff --git a/server/go.sum b/server/go.sum index 620fe5d..dd9e9c2 100644 --- a/server/go.sum +++ b/server/go.sum @@ -95,6 +95,8 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4= +github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.1.1/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4= @@ -148,6 +150,11 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDa github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -174,6 +181,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= diff --git a/server/internal/app-config/config.go b/server/internal/app-config/config.go index e4f2ee0..c1389d9 100644 --- a/server/internal/app-config/config.go +++ b/server/internal/app-config/config.go @@ -40,6 +40,7 @@ type Config struct { DbType string EventDbType string DbDir string + DbMigrationPath string EventDbDir string RoundInterval int64 Network common.Network @@ -167,7 +168,7 @@ func (c *Config) repoManager() error { case "badger": dataStoreConfig = []interface{}{c.DbDir, logger} case "sqlite": - dataStoreConfig = []interface{}{c.DbDir} + dataStoreConfig = []interface{}{c.DbDir, c.DbMigrationPath} default: return fmt.Errorf("unknown db type") } diff --git a/server/internal/config/config.go b/server/internal/config/config.go index c4de6b1..c2e6281 100644 --- a/server/internal/config/config.go +++ b/server/internal/config/config.go @@ -6,7 +6,7 @@ import ( "path/filepath" "strings" - common "github.com/ark-network/ark/common" + "github.com/ark-network/ark/common" "github.com/spf13/viper" ) @@ -17,6 +17,7 @@ type Config struct { EventDbType string DbType string DbDir string + DbMigrationPath string SchedulerType string TxBuilderType string BlockchainScannerType string @@ -37,6 +38,7 @@ var ( Port = "PORT" EventDbType = "EVENT_DB_TYPE" DbType = "DB_TYPE" + DbMigrationPath = "DB_MIGRATION_PATH" SchedulerType = "SCHEDULER_TYPE" TxBuilderType = "TX_BUILDER_TYPE" BlockchainScannerType = "BC_SCANNER_TYPE" @@ -54,6 +56,7 @@ var ( defaultPort = 6000 defaultWalletAddr = "localhost:18000" defaultDbType = "sqlite" + defaultDbMigrationPath = "file://internal/infrastructure/db/sqlite/migration" defaultEventDbType = "badger" defaultSchedulerType = "gocron" defaultTxBuilderType = "covenant" @@ -75,6 +78,7 @@ func LoadConfig() (*Config, error) { viper.SetDefault(Datadir, defaultDatadir) viper.SetDefault(Port, defaultPort) viper.SetDefault(DbType, defaultDbType) + viper.SetDefault(DbMigrationPath, defaultDbMigrationPath) viper.SetDefault(Insecure, defaultInsecure) viper.SetDefault(LogLevel, defaultLogLevel) viper.SetDefault(Network, defaultNetwork) @@ -105,6 +109,7 @@ func LoadConfig() (*Config, error) { Port: viper.GetUint32(Port), EventDbType: viper.GetString(EventDbType), DbType: viper.GetString(DbType), + DbMigrationPath: viper.GetString(DbMigrationPath), SchedulerType: viper.GetString(SchedulerType), TxBuilderType: viper.GetString(TxBuilderType), BlockchainScannerType: viper.GetString(BlockchainScannerType), diff --git a/server/internal/infrastructure/db/service.go b/server/internal/infrastructure/db/service.go index d2ae81b..1063516 100644 --- a/server/internal/infrastructure/db/service.go +++ b/server/internal/infrastructure/db/service.go @@ -1,6 +1,7 @@ package db import ( + "errors" "fmt" "path/filepath" @@ -8,6 +9,9 @@ import ( "github.com/ark-network/ark/internal/core/ports" badgerdb "github.com/ark-network/ark/internal/infrastructure/db/badger" sqlitedb "github.com/ark-network/ark/internal/infrastructure/db/sqlite" + "github.com/golang-migrate/migrate/v4" + sqlitemigrate "github.com/golang-migrate/migrate/v4/database/sqlite" + _ "github.com/golang-migrate/migrate/v4/source/file" ) var ( @@ -82,18 +86,44 @@ func NewService(config ServiceConfig) (ports.RepoManager, error) { return nil, fmt.Errorf("failed to open vtxo store: %s", err) } case "sqlite": - if len(config.DataStoreConfig) != 1 { + if len(config.DataStoreConfig) != 2 { return nil, fmt.Errorf("invalid data store config") } + baseDir, ok := config.DataStoreConfig[0].(string) if !ok { return nil, fmt.Errorf("invalid base directory") } - db, err := sqlitedb.OpenDb(filepath.Join(baseDir, sqliteDbFile)) + + migrationPath, ok := config.DataStoreConfig[1].(string) + if !ok { + return nil, fmt.Errorf("invalid migration path") + } + + dbFile := filepath.Join(baseDir, sqliteDbFile) + db, err := sqlitedb.OpenDb(dbFile) + if err != nil { + return nil, fmt.Errorf("failed to open db: %s", err) + } + + driver, err := sqlitemigrate.WithInstance(db, &sqlitemigrate.Config{}) if err != nil { return nil, err } + m, err := migrate.NewWithDatabaseInstance( + migrationPath, + "arkdb", + driver, + ) + if err != nil { + return nil, fmt.Errorf("failed to create migration instance: %s", err) + } + + if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) { + return nil, fmt.Errorf("failed to run migrations: %s", err) + } + roundStore, err = roundStoreFactory(db) if err != nil { return nil, fmt.Errorf("failed to open round store: %s", err) diff --git a/server/internal/infrastructure/db/service_test.go b/server/internal/infrastructure/db/service_test.go index 38ccefc..b7598dd 100644 --- a/server/internal/infrastructure/db/service_test.go +++ b/server/internal/infrastructure/db/service_test.go @@ -96,7 +96,7 @@ func TestService(t *testing.T) { EventStoreType: "badger", DataStoreType: "sqlite", EventStoreConfig: []interface{}{"", nil}, - DataStoreConfig: []interface{}{dbDir}, + DataStoreConfig: []interface{}{dbDir, "file://sqlite/migration"}, }, }, } diff --git a/server/internal/infrastructure/db/sqlite/migration/20240703120550_init.down.sql b/server/internal/infrastructure/db/sqlite/migration/20240703120550_init.down.sql new file mode 100644 index 0000000..6ae382f --- /dev/null +++ b/server/internal/infrastructure/db/sqlite/migration/20240703120550_init.down.sql @@ -0,0 +1,9 @@ +DROP TABLE IF EXISTS round; + +DROP TABLE IF EXISTS payment; + +DROP TABLE IF EXISTS receiver; + +DROP TABLE IF EXISTS tx; + +DROP TABLE IF EXISTS vtxo; diff --git a/server/internal/infrastructure/db/sqlite/migration/20240703120550_init.up.sql b/server/internal/infrastructure/db/sqlite/migration/20240703120550_init.up.sql new file mode 100644 index 0000000..98b6cd1 --- /dev/null +++ b/server/internal/infrastructure/db/sqlite/migration/20240703120550_init.up.sql @@ -0,0 +1,77 @@ +CREATE TABLE IF NOT EXISTS round ( + id TEXT PRIMARY KEY, + starting_timestamp INTEGER NOT NULL, + ending_timestamp INTEGER NOT NULL, + ended BOOLEAN NOT NULL, + failed BOOLEAN NOT NULL, + stage_code INTEGER NOT NULL, + txid TEXT NOT NULL, + unsigned_tx TEXT NOT NULL, + connector_address TEXT NOT NULL, + dust_amount INTEGER NOT NULL, + version INTEGER NOT NULL, + swept BOOLEAN NOT NULL +); + +CREATE TABLE IF NOT EXISTS payment ( + id TEXT PRIMARY KEY, + round_id TEXT NOT NULL, + FOREIGN KEY (round_id) REFERENCES round(id) +); + +CREATE TABLE IF NOT EXISTS receiver ( + payment_id TEXT NOT NULL, + pubkey TEXT NOT NULL, + amount INTEGER NOT NULL, + onchain_address TEXT NOT NULL, + FOREIGN KEY (payment_id) REFERENCES payment(id), + PRIMARY KEY (payment_id, pubkey) +); + +CREATE TABLE IF NOT EXISTS tx ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + tx TEXT NOT NULL, + round_id TEXT NOT NULL, + type TEXT NOT NULL, + position INTEGER NOT NULL, + txid TEXT, + tree_level INTEGER, + parent_txid TEXT, + is_leaf BOOLEAN, + FOREIGN KEY (round_id) REFERENCES round(id) +); + +CREATE TABLE IF NOT EXISTS vtxo ( + txid TEXT NOT NULL PRIMARY KEY, + vout INTEGER NOT NULL, + pubkey TEXT NOT NULL, + amount INTEGER NOT NULL, + pool_tx TEXT NOT NULL, + spent_by TEXT NOT NULL, + spent BOOLEAN NOT NULL, + redeemed BOOLEAN NOT NULL, + swept BOOLEAN NOT NULL, + expire_at INTEGER NOT NULL, + payment_id TEXT, + FOREIGN KEY (payment_id) REFERENCES payment(id) +); + +CREATE VIEW round_payment_vw AS SELECT payment.* +FROM round +LEFT OUTER JOIN payment +ON round.id=payment.round_id; + +CREATE VIEW round_tx_vw AS SELECT tx.* +FROM round +LEFT OUTER JOIN tx +ON round.id=tx.round_id; + +CREATE VIEW payment_receiver_vw AS SELECT receiver.* +FROM payment +LEFT OUTER JOIN receiver +ON payment.id=receiver.payment_id; + +CREATE VIEW payment_vtxo_vw AS SELECT vtxo.* +FROM payment +LEFT OUTER JOIN vtxo +ON payment.id=vtxo.payment_id; \ No newline at end of file diff --git a/server/internal/infrastructure/db/sqlite/round_repo.go b/server/internal/infrastructure/db/sqlite/round_repo.go index 47755d6..a9a95d5 100644 --- a/server/internal/infrastructure/db/sqlite/round_repo.go +++ b/server/internal/infrastructure/db/sqlite/round_repo.go @@ -8,185 +8,12 @@ import ( "github.com/ark-network/ark/common/tree" "github.com/ark-network/ark/internal/core/domain" + "github.com/ark-network/ark/internal/infrastructure/db/sqlite/sqlc/queries" ) -const ( - createReceiverTable = ` -CREATE TABLE IF NOT EXISTS receiver ( - payment_id TEXT NOT NULL, - pubkey TEXT NOT NULL, - amount INTEGER NOT NULL, - onchain_address TEXT NOT NULL, - FOREIGN KEY (payment_id) REFERENCES payment(id) - PRIMARY KEY (payment_id, pubkey) -); -` - - createPaymentTable = ` -CREATE TABLE IF NOT EXISTS payment ( - id TEXT PRIMARY KEY, - round_id TEXT NOT NULL, - FOREIGN KEY (round_id) REFERENCES round(id) -); -` - - createRoundTable = ` -CREATE TABLE IF NOT EXISTS round ( - id TEXT PRIMARY KEY, - starting_timestamp INTEGER NOT NULL, - ending_timestamp INTEGER NOT NULL, - ended BOOLEAN NOT NULL, - failed BOOLEAN NOT NULL, - stage_code INTEGER NOT NULL, - txid TEXT NOT NULL, - unsigned_tx TEXT NOT NULL, - connector_address TEXT NOT NULL, - dust_amount INTEGER NOT NULL, - version INTEGER NOT NULL, - swept BOOLEAN NOT NULL -); -` - - createTransactionTable = ` -CREATE TABLE IF NOT EXISTS tx ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - tx TEXT NOT NULL, - round_id TEXT NOT NULL, - type TEXT NOT NULL, - position INTEGER NOT NULL, - txid TEXT, - tree_level INTEGER, - parent_txid TEXT, - is_leaf BOOLEAN, - FOREIGN KEY (round_id) REFERENCES round(id) -); -` - upsertTransaction = ` -INSERT INTO tx ( - tx, round_id, type, position, txid, tree_level, parent_txid, is_leaf -) VALUES (?, ?, ?, ?, ?, ?, ?, ?) -ON CONFLICT(id) DO UPDATE SET - tx = EXCLUDED.tx, - round_id = EXCLUDED.round_id, - type = EXCLUDED.type, - position = EXCLUDED.position, - txid = EXCLUDED.txid, - tree_level = EXCLUDED.tree_level, - parent_txid = EXCLUDED.parent_txid, - is_leaf = EXCLUDED.is_leaf; -` - - upsertRound = ` -INSERT INTO round ( - id, - starting_timestamp, - ending_timestamp, - ended, failed, - stage_code, - txid, - unsigned_tx, - connector_address, - dust_amount, - version, - swept -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) -ON CONFLICT(id) DO UPDATE SET - starting_timestamp = EXCLUDED.starting_timestamp, - ending_timestamp = EXCLUDED.ending_timestamp, - ended = EXCLUDED.ended, - failed = EXCLUDED.failed, - stage_code = EXCLUDED.stage_code, - txid = EXCLUDED.txid, - unsigned_tx = EXCLUDED.unsigned_tx, - connector_address = EXCLUDED.connector_address, - dust_amount = EXCLUDED.dust_amount, - version = EXCLUDED.version, - swept = EXCLUDED.swept; -` - - upsertPayment = ` -INSERT INTO payment (id, round_id) VALUES (?, ?) -ON CONFLICT(id) DO UPDATE SET round_id = EXCLUDED.round_id; -` - - upsertReceiver = ` -INSERT INTO receiver (payment_id, pubkey, amount, onchain_address) VALUES (?, ?, ?, ?) -ON CONFLICT(payment_id, pubkey) DO UPDATE SET - amount = EXCLUDED.amount, - onchain_address = EXCLUDED.onchain_address, - pubkey = EXCLUDED.pubkey; -` - - updateVtxoPaymentId = ` -UPDATE vtxo SET payment_id = ? WHERE txid = ? AND vout = ? -` - - selectRound = ` -SELECT round.id, round.starting_timestamp, round.ending_timestamp, round.ended, round.failed, round.stage_code, round.txid, -round.unsigned_tx, round.connector_address, round.dust_amount, round.version, round.swept, payment.id, receiver.payment_id, -receiver.pubkey, receiver.amount, receiver.onchain_address, vtxo.txid, vtxo.vout, vtxo.pubkey, vtxo.amount, -vtxo.pool_tx, vtxo.spent_by, vtxo.spent, vtxo.redeemed, vtxo.swept, vtxo.expire_at, vtxo.payment_id, -tx.tx, tx.type, tx.position, tx.txid, -tx.tree_level, tx.parent_txid, tx.is_leaf -FROM round -LEFT OUTER JOIN payment ON round.id=payment.round_id -LEFT OUTER JOIN tx ON round.id=tx.round_id -LEFT OUTER JOIN receiver ON payment.id=receiver.payment_id -LEFT OUTER JOIN vtxo ON payment.id=vtxo.payment_id -` - - selectRoundWithId = selectRound + " WHERE round.id = ?;" - selectRoundWithTxId = selectRound + " WHERE round.txid = ?;" - selectSweepableRounds = selectRound + " WHERE round.swept = false AND round.ended = true AND round.failed = false;" - selectSweptRounds = selectRound + " WHERE round.swept = true AND round.failed = false AND round.ended = true AND round.connector_address <> '';" - - selectRoundIdsInRange = ` -SELECT id FROM round WHERE starting_timestamp > ? AND starting_timestamp < ?; -` - - selectRoundIds = ` -SELECT id FROM round; -` -) - -type receiverRow struct { - paymentId *string - pubkey *string - amount *uint64 - onchainAddress *string -} - -type paymentRow struct { - id *string -} - -type transactionRow struct { - tx *string - txType *string - position *int - txid *string - treeLevel *int - parentTxid *string - isLeaf *bool -} - -type roundRow struct { - id *string - startingTimestamp *int64 - endingTimestamp *int64 - ended *bool - failed *bool - stageCode *domain.RoundStage - txid *string - unsignedTx *string - connectorAddress *string - dustAmount *uint64 - version *uint - swept *bool -} - type roundRepository struct { - db *sql.DB + db *sql.DB + querier *queries.Queries } func NewRoundRepository(config ...interface{}) (domain.RoundRepository, error) { @@ -195,202 +22,200 @@ func NewRoundRepository(config ...interface{}) (domain.RoundRepository, error) { } db, ok := config[0].(*sql.DB) if !ok { - return nil, fmt.Errorf("cannot open round repository: invalid config") + return nil, fmt.Errorf("cannot open round repository: invalid config, expected db at 0") } - return newRoundRepository(db) -} - -func newRoundRepository(db *sql.DB) (*roundRepository, error) { - if _, err := db.Exec(createRoundTable); err != nil { - return nil, err - } - - if _, err := db.Exec(createPaymentTable); err != nil { - return nil, err - } - - if _, err := db.Exec(createReceiverTable); err != nil { - return nil, err - } - - if _, err := db.Exec(createTransactionTable); err != nil { - return nil, err - } - - return &roundRepository{db}, nil + return &roundRepository{ + db: db, + querier: queries.New(db), + }, nil } func (r *roundRepository) Close() { _ = r.db.Close() } -func (r *roundRepository) GetRoundsIds(ctx context.Context, startedAfter int64, startedBefore int64) ([]string, error) { - var rows *sql.Rows - +func (r *roundRepository) GetRoundsIds( + ctx context.Context, startedAfter int64, startedBefore int64, +) ([]string, error) { + var roundIDs []string if startedAfter == 0 && startedBefore == 0 { - stmt, err := r.db.Prepare(selectRoundIds) + ids, err := r.querier.SelectRoundIds(ctx) if err != nil { return nil, err } - defer stmt.Close() - rows, err = stmt.Query() - if err != nil { - return nil, err - } + roundIDs = ids } else { - stmt, err := r.db.Prepare(selectRoundIdsInRange) + ids, err := r.querier.SelectRoundIdsInRange( + ctx, + queries.SelectRoundIdsInRangeParams{ + StartingTimestamp: startedAfter, + StartingTimestamp_2: startedBefore, + }, + ) if err != nil { return nil, err } - defer stmt.Close() - rows, err = stmt.Query(startedAfter, startedBefore) - if err != nil { - return nil, err - } + roundIDs = ids } - defer rows.Close() - - ids := make([]string, 0) - - for rows.Next() { - var id string - if err := rows.Scan(&id); err != nil { - return nil, err - } - - ids = append(ids, id) - } - - return ids, nil + return roundIDs, nil } func (r *roundRepository) AddOrUpdateRound(ctx context.Context, round domain.Round) error { - tx, err := r.db.Begin() - if err != nil { - return err - } - - stmt, err := tx.Prepare(upsertRound) - if err != nil { - return err - } - - defer stmt.Close() - - // insert round row - _, err = stmt.Exec( - round.Id, - round.StartingTimestamp, - round.EndingTimestamp, - round.Stage.Ended, - round.Stage.Failed, - round.Stage.Code, - round.Txid, - round.UnsignedTx, - round.ConnectorAddress, - round.DustAmount, - round.Version, - round.Swept, - ) - if err != nil { - return err - } - - // insert transactions rows - if len(round.ForfeitTxs) > 0 || len(round.Connectors) > 0 || len(round.CongestionTree) > 0 { - stmt, err = tx.Prepare(upsertTransaction) - if err != nil { - return err + txBody := func(querierWithTx *queries.Queries) error { + if err := querierWithTx.UpsertRound( + ctx, + queries.UpsertRoundParams{ + ID: round.Id, + StartingTimestamp: round.StartingTimestamp, + EndingTimestamp: round.EndingTimestamp, + Ended: round.Stage.Ended, + Failed: round.Stage.Failed, + StageCode: int64(round.Stage.Code), + Txid: round.Txid, + UnsignedTx: round.UnsignedTx, + ConnectorAddress: round.ConnectorAddress, + DustAmount: int64(round.DustAmount), + Version: int64(round.Version), + Swept: round.Swept, + }, + ); err != nil { + return fmt.Errorf("failed to upsert round: %w", err) } - defer stmt.Close() - - for pos, tx := range round.ForfeitTxs { - _, err := stmt.Exec(tx, round.Id, "forfeit", pos, nil, nil, nil, nil) - if err != nil { - return err - } - } - - for pos, tx := range round.Connectors { - _, err := stmt.Exec(tx, round.Id, "connector", pos, nil, nil, nil, nil) - if err != nil { - return err - } - } - - for level, levelTxs := range round.CongestionTree { - for pos, tx := range levelTxs { - _, err := stmt.Exec(tx.Tx, round.Id, "tree", pos, tx.Txid, level, tx.ParentTxid, tx.Leaf) - if err != nil { - return err - } - } - } - } - - // insert payments rows - if len(round.Payments) > 0 { - stmtUpsertPayment, err := tx.Prepare(upsertPayment) - if err != nil { - return err - } - defer stmtUpsertPayment.Close() - - for _, payment := range round.Payments { - _, err = stmtUpsertPayment.Exec(payment.Id, round.Id) - if err != nil { - return err - } - - stmtUpsertReceiver, err := tx.Prepare(upsertReceiver) - if err != nil { - return err - } - defer stmtUpsertReceiver.Close() - - for _, receiver := range payment.Receivers { - _, err := stmtUpsertReceiver.Exec(payment.Id, receiver.Pubkey, receiver.Amount, receiver.OnchainAddress) - if err != nil { - return err + if len(round.ForfeitTxs) > 0 || len(round.Connectors) > 0 || len(round.CongestionTree) > 0 { + for pos, tx := range round.ForfeitTxs { + if err := querierWithTx.UpsertTransaction( + ctx, + queries.UpsertTransactionParams{ + Tx: tx, + RoundID: round.Id, + Type: "forfeit", + Position: int64(pos), + }, + ); err != nil { + return fmt.Errorf("failed to upsert forfeit transaction: %w", err) } } - stmtUpdatePaymentId, err := tx.Prepare(updateVtxoPaymentId) - if err != nil { - return err + for pos, tx := range round.Connectors { + if err := querierWithTx.UpsertTransaction( + ctx, + queries.UpsertTransactionParams{ + Tx: tx, + RoundID: round.Id, + Type: "connector", + Position: int64(pos), + }, + ); err != nil { + return fmt.Errorf("failed to upsert connector transaction: %w", err) + } } - defer stmtUpdatePaymentId.Close() - for _, input := range payment.Inputs { - _, err := stmtUpdatePaymentId.Exec(payment.Id, input.Txid, input.VOut) - if err != nil { - return err + for level, levelTxs := range round.CongestionTree { + for pos, tx := range levelTxs { + if err := querierWithTx.UpsertTransaction( + ctx, + queries.UpsertTransactionParams{ + Tx: tx.Tx, + RoundID: round.Id, + Type: "tree", + Position: int64(pos), + Txid: sql.NullString{ + String: tx.Txid, + Valid: true, + }, + TreeLevel: sql.NullInt64{ + Int64: int64(level), + Valid: true, + }, + ParentTxid: sql.NullString{ + String: tx.ParentTxid, + Valid: true, + }, + IsLeaf: sql.NullBool{ + Bool: tx.Leaf, + Valid: true, + }, + }, + ); err != nil { + return fmt.Errorf("failed to upsert tree transaction: %w", err) + } } } } + + if len(round.Payments) > 0 { + for _, payment := range round.Payments { + if err := querierWithTx.UpsertPayment( + ctx, + queries.UpsertPaymentParams{ + ID: payment.Id, + RoundID: round.Id, + }, + ); err != nil { + return fmt.Errorf("failed to upsert payment: %w", err) + } + + for _, receiver := range payment.Receivers { + if err := querierWithTx.UpsertReceiver( + ctx, + queries.UpsertReceiverParams{ + PaymentID: payment.Id, + Pubkey: receiver.Pubkey, + Amount: int64(receiver.Amount), + OnchainAddress: receiver.OnchainAddress, + }, + ); err != nil { + return fmt.Errorf("failed to upsert receiver: %w", err) + } + } + + for _, input := range payment.Inputs { + if err := querierWithTx.UpdateVtxoPaymentId( + ctx, + queries.UpdateVtxoPaymentIdParams{ + PaymentID: sql.NullString{ + String: payment.Id, + Valid: true, + }, + Txid: input.Txid, + Vout: int64(input.VOut), + }, + ); err != nil { + return fmt.Errorf("failed to update vtxo payment id: %w", err) + } + } + } + } + + return nil } - return tx.Commit() + return execTx(ctx, r.db, txBody) } func (r *roundRepository) GetRoundWithId(ctx context.Context, id string) (*domain.Round, error) { - stmt, err := r.db.Prepare(selectRoundWithId) - if err != nil { - return nil, err - } - defer stmt.Close() - - rows, err := stmt.Query(id) + rows, err := r.querier.SelectRoundWithRoundId(ctx, id) if err != nil { return nil, err } - rounds, err := readRoundRows(rows) + rvs := make([]roundPaymentTxReceiverVtxoRow, 0, len(rows)) + for _, row := range rows { + rvs = append(rvs, roundPaymentTxReceiverVtxoRow{ + round: row.Round, + payment: row.RoundPaymentVw, + tx: row.RoundTxVw, + receiver: row.PaymentReceiverVw, + vtxo: row.PaymentVtxoVw, + }) + } + + rounds, err := readRoundRows(rvs) if err != nil { return nil, err } @@ -403,18 +228,23 @@ func (r *roundRepository) GetRoundWithId(ctx context.Context, id string) (*domai } func (r *roundRepository) GetRoundWithTxid(ctx context.Context, txid string) (*domain.Round, error) { - stmt, err := r.db.Prepare(selectRoundWithTxId) - if err != nil { - return nil, err - } - defer stmt.Close() - - rows, err := stmt.Query(txid) + rows, err := r.querier.SelectRoundWithRoundTxId(ctx, txid) if err != nil { return nil, err } - rounds, err := readRoundRows(rows) + rvs := make([]roundPaymentTxReceiverVtxoRow, 0, len(rows)) + for _, row := range rows { + rvs = append(rvs, roundPaymentTxReceiverVtxoRow{ + round: row.Round, + payment: row.RoundPaymentVw, + tx: row.RoundTxVw, + receiver: row.PaymentReceiverVw, + vtxo: row.PaymentVtxoVw, + }) + } + + rounds, err := readRoundRows(rvs) if err != nil { return nil, err } @@ -427,18 +257,23 @@ func (r *roundRepository) GetRoundWithTxid(ctx context.Context, txid string) (*d } func (r *roundRepository) GetSweepableRounds(ctx context.Context) ([]domain.Round, error) { - stmt, err := r.db.Prepare(selectSweepableRounds) - if err != nil { - return nil, err - } - defer stmt.Close() - - rows, err := stmt.Query() + rows, err := r.querier.SelectSweepableRounds(ctx) if err != nil { return nil, err } - rounds, err := readRoundRows(rows) + rvs := make([]roundPaymentTxReceiverVtxoRow, 0, len(rows)) + for _, row := range rows { + rvs = append(rvs, roundPaymentTxReceiverVtxoRow{ + round: row.Round, + payment: row.RoundPaymentVw, + tx: row.RoundTxVw, + receiver: row.PaymentReceiverVw, + vtxo: row.PaymentVtxoVw, + }) + } + + rounds, err := readRoundRows(rvs) if err != nil { return nil, err } @@ -453,18 +288,23 @@ func (r *roundRepository) GetSweepableRounds(ctx context.Context) ([]domain.Roun } func (r *roundRepository) GetSweptRounds(ctx context.Context) ([]domain.Round, error) { - stmt, err := r.db.Prepare(selectSweptRounds) - if err != nil { - return nil, err - } - defer stmt.Close() - - rows, err := stmt.Query() + rows, err := r.querier.SelectSweptRounds(ctx) if err != nil { return nil, err } - rounds, err := readRoundRows(rows) + rvs := make([]roundPaymentTxReceiverVtxoRow, 0, len(rows)) + for _, row := range rows { + rvs = append(rvs, roundPaymentTxReceiverVtxoRow{ + round: row.Round, + payment: row.RoundPaymentVw, + tx: row.RoundTxVw, + receiver: row.PaymentReceiverVw, + vtxo: row.PaymentVtxoVw, + }) + } + + rounds, err := readRoundRows(rvs) if err != nil { return nil, err } @@ -478,116 +318,72 @@ func (r *roundRepository) GetSweptRounds(ctx context.Context) ([]domain.Round, e return res, nil } -func rowToReceiver(row receiverRow) domain.Receiver { +func rowToReceiver(row queries.PaymentReceiverVw) domain.Receiver { return domain.Receiver{ - Pubkey: *row.pubkey, - Amount: *row.amount, - OnchainAddress: *row.onchainAddress, + Pubkey: row.Pubkey.String, + Amount: uint64(row.Amount.Int64), + OnchainAddress: row.OnchainAddress.String, } } -func readRoundRows(rows *sql.Rows) ([]*domain.Round, error) { - defer rows.Close() +type roundPaymentTxReceiverVtxoRow struct { + round queries.Round + payment queries.RoundPaymentVw + tx queries.RoundTxVw + receiver queries.PaymentReceiverVw + vtxo queries.PaymentVtxoVw +} +func readRoundRows(rows []roundPaymentTxReceiverVtxoRow) ([]*domain.Round, error) { rounds := make(map[string]*domain.Round) - for rows.Next() { - var roundRow roundRow - var paymentRow paymentRow - var receiverRow receiverRow - var vtxoRow vtxoRow - var transactionRow transactionRow - - if err := rows.Scan( - &roundRow.id, - &roundRow.startingTimestamp, - &roundRow.endingTimestamp, - &roundRow.ended, - &roundRow.failed, - &roundRow.stageCode, - &roundRow.txid, - &roundRow.unsignedTx, - &roundRow.connectorAddress, - &roundRow.dustAmount, - &roundRow.version, - &roundRow.swept, - &paymentRow.id, - &receiverRow.paymentId, - &receiverRow.pubkey, - &receiverRow.amount, - &receiverRow.onchainAddress, - &vtxoRow.txid, - &vtxoRow.vout, - &vtxoRow.pubkey, - &vtxoRow.amount, - &vtxoRow.poolTx, - &vtxoRow.spentBy, - &vtxoRow.spent, - &vtxoRow.redeemed, - &vtxoRow.swept, - &vtxoRow.expireAt, - &vtxoRow.paymentID, - &transactionRow.tx, - &transactionRow.txType, - &transactionRow.position, - &transactionRow.txid, - &transactionRow.treeLevel, - &transactionRow.parentTxid, - &transactionRow.isLeaf, - ); err != nil { - return nil, err - } - + for _, v := range rows { var round *domain.Round var ok bool - if roundRow.id == nil { - continue - } - - round, ok = rounds[*roundRow.id] + round, ok = rounds[v.round.ID] if !ok { round = &domain.Round{ - Id: *roundRow.id, - StartingTimestamp: *roundRow.startingTimestamp, - EndingTimestamp: *roundRow.endingTimestamp, + Id: v.round.ID, + StartingTimestamp: v.round.StartingTimestamp, + EndingTimestamp: v.round.EndingTimestamp, Stage: domain.Stage{ - Ended: *roundRow.ended, - Failed: *roundRow.failed, - Code: *roundRow.stageCode, + Ended: v.round.Ended, + Failed: v.round.Failed, + Code: domain.RoundStage(v.round.StageCode), }, - Txid: *roundRow.txid, - UnsignedTx: *roundRow.unsignedTx, - ConnectorAddress: *roundRow.connectorAddress, - DustAmount: *roundRow.dustAmount, - Version: *roundRow.version, - Swept: *roundRow.swept, + Txid: v.round.Txid, + UnsignedTx: v.round.UnsignedTx, + ConnectorAddress: v.round.ConnectorAddress, + DustAmount: uint64(v.round.DustAmount), + Version: uint(v.round.Version), + Swept: v.round.Swept, Payments: make(map[string]domain.Payment), } } - if paymentRow.id != nil { - payment, ok := round.Payments[*paymentRow.id] + if v.payment.ID.Valid { + payment, ok := round.Payments[v.payment.ID.String] if !ok { payment = domain.Payment{ - Id: *paymentRow.id, + Id: v.payment.ID.String, Inputs: make([]domain.Vtxo, 0), Receivers: make([]domain.Receiver, 0), } - round.Payments[*paymentRow.id] = payment + round.Payments[v.payment.ID.String] = payment } - if vtxoRow.paymentID != nil { - payment, ok = round.Payments[*vtxoRow.paymentID] + if v.vtxo.PaymentID.Valid { + payment, ok = round.Payments[v.vtxo.PaymentID.String] if !ok { payment = domain.Payment{ - Id: *vtxoRow.paymentID, + Id: v.vtxo.PaymentID.String, Inputs: make([]domain.Vtxo, 0), Receivers: make([]domain.Receiver, 0), } } - vtxo := rowToVtxo(vtxoRow) + vtxo := rowToPaymentVtxoVw(v.vtxo) found := false for _, v := range payment.Inputs { @@ -598,62 +394,64 @@ func readRoundRows(rows *sql.Rows) ([]*domain.Round, error) { } if !found { - payment.Inputs = append(payment.Inputs, rowToVtxo(vtxoRow)) - round.Payments[*vtxoRow.paymentID] = payment + payment.Inputs = append(payment.Inputs, rowToPaymentVtxoVw(v.vtxo)) + round.Payments[v.vtxo.PaymentID.String] = payment } } - if receiverRow.paymentId != nil { - payment, ok = round.Payments[*receiverRow.paymentId] + if v.receiver.PaymentID.Valid { + payment, ok = round.Payments[v.receiver.PaymentID.String] if !ok { payment = domain.Payment{ - Id: *receiverRow.paymentId, + Id: v.receiver.PaymentID.String, Inputs: make([]domain.Vtxo, 0), Receivers: make([]domain.Receiver, 0), } } - rcv := rowToReceiver(receiverRow) + rcv := rowToReceiver(v.receiver) found := false for _, rcv := range payment.Receivers { - if rcv.Pubkey == *receiverRow.pubkey && rcv.Amount == *receiverRow.amount { - found = true - break + if v.receiver.Pubkey.Valid && v.receiver.Amount.Valid { + if rcv.Pubkey == v.receiver.Pubkey.String && int64(rcv.Amount) == v.receiver.Amount.Int64 { + found = true + break + } } } if !found { payment.Receivers = append(payment.Receivers, rcv) - round.Payments[*receiverRow.paymentId] = payment + round.Payments[v.receiver.PaymentID.String] = payment } } } - if transactionRow.tx != nil { - position := *transactionRow.position - switch *transactionRow.txType { + if v.tx.Tx.Valid && v.tx.Type.Valid && v.tx.Position.Valid { + position := v.tx.Position + switch v.tx.Type.String { case "forfeit": - round.ForfeitTxs = extendArray(round.ForfeitTxs, position) - round.ForfeitTxs[position] = *transactionRow.tx + round.ForfeitTxs = extendArray(round.ForfeitTxs, int(position.Int64)) + round.ForfeitTxs[position.Int64] = v.tx.Tx.String case "connector": - round.Connectors = extendArray(round.Connectors, position) - round.Connectors[position] = *transactionRow.tx + round.Connectors = extendArray(round.Connectors, int(position.Int64)) + round.Connectors[position.Int64] = v.tx.Tx.String case "tree": - level := *transactionRow.treeLevel - round.CongestionTree = extendArray(round.CongestionTree, level) - round.CongestionTree[level] = extendArray(round.CongestionTree[level], position) - if round.CongestionTree[level][position] == (tree.Node{}) { - round.CongestionTree[level][position] = tree.Node{ - Tx: *transactionRow.tx, - Txid: *transactionRow.txid, - ParentTxid: *transactionRow.parentTxid, - Leaf: *transactionRow.isLeaf, + level := v.tx.TreeLevel + round.CongestionTree = extendArray(round.CongestionTree, int(level.Int64)) + round.CongestionTree[int(level.Int64)] = extendArray(round.CongestionTree[int(level.Int64)], int(position.Int64)) + if round.CongestionTree[int(level.Int64)][position.Int64] == (tree.Node{}) { + round.CongestionTree[int(level.Int64)][position.Int64] = tree.Node{ + Tx: v.tx.Tx.String, + Txid: v.tx.Txid.String, + ParentTxid: v.tx.ParentTxid.String, + Leaf: v.tx.IsLeaf.Bool, } } } } - rounds[*roundRow.id] = round + rounds[v.round.ID] = round } var result []*domain.Round @@ -664,3 +462,22 @@ func readRoundRows(rows *sql.Rows) ([]*domain.Round, error) { return result, nil } + +func rowToPaymentVtxoVw(row queries.PaymentVtxoVw) domain.Vtxo { + return domain.Vtxo{ + VtxoKey: domain.VtxoKey{ + Txid: row.Txid.String, + VOut: uint32(row.Vout.Int64), + }, + Receiver: domain.Receiver{ + Pubkey: row.Pubkey.String, + Amount: uint64(row.Amount.Int64), + }, + PoolTx: row.PoolTx.String, + SpentBy: row.SpentBy.String, + Spent: row.Spent.Bool, + Redeemed: row.Redeemed.Bool, + Swept: row.Swept.Bool, + ExpireAt: row.ExpireAt.Int64, + } +} diff --git a/server/internal/infrastructure/db/sqlite/sqlc.yaml b/server/internal/infrastructure/db/sqlite/sqlc.yaml new file mode 100644 index 0000000..c51256b --- /dev/null +++ b/server/internal/infrastructure/db/sqlite/sqlc.yaml @@ -0,0 +1,9 @@ +version: "2" +sql: + - engine: "sqlite" + queries: "sqlc/query.sql" + schema: "migration" + gen: + go: + package: "queries" + out: "sqlc/queries" \ No newline at end of file diff --git a/server/internal/infrastructure/db/sqlite/sqlc/queries/db.go b/server/internal/infrastructure/db/sqlite/sqlc/queries/db.go new file mode 100644 index 0000000..fa78573 --- /dev/null +++ b/server/internal/infrastructure/db/sqlite/sqlc/queries/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.26.0 + +package queries + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/server/internal/infrastructure/db/sqlite/sqlc/queries/models.go b/server/internal/infrastructure/db/sqlite/sqlc/queries/models.go new file mode 100644 index 0000000..e033ab8 --- /dev/null +++ b/server/internal/infrastructure/db/sqlite/sqlc/queries/models.go @@ -0,0 +1,100 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.26.0 + +package queries + +import ( + "database/sql" +) + +type Payment struct { + ID string + RoundID string +} + +type PaymentReceiverVw struct { + PaymentID sql.NullString + Pubkey sql.NullString + Amount sql.NullInt64 + OnchainAddress sql.NullString +} + +type PaymentVtxoVw struct { + Txid sql.NullString + Vout sql.NullInt64 + Pubkey sql.NullString + Amount sql.NullInt64 + PoolTx sql.NullString + SpentBy sql.NullString + Spent sql.NullBool + Redeemed sql.NullBool + Swept sql.NullBool + ExpireAt sql.NullInt64 + PaymentID sql.NullString +} + +type Receiver struct { + PaymentID string + Pubkey string + Amount int64 + OnchainAddress string +} + +type Round struct { + ID string + StartingTimestamp int64 + EndingTimestamp int64 + Ended bool + Failed bool + StageCode int64 + Txid string + UnsignedTx string + ConnectorAddress string + DustAmount int64 + Version int64 + Swept bool +} + +type RoundPaymentVw struct { + ID sql.NullString + RoundID sql.NullString +} + +type RoundTxVw struct { + ID sql.NullInt64 + Tx sql.NullString + RoundID sql.NullString + Type sql.NullString + Position sql.NullInt64 + Txid sql.NullString + TreeLevel sql.NullInt64 + ParentTxid sql.NullString + IsLeaf sql.NullBool +} + +type Tx struct { + ID int64 + Tx string + RoundID string + Type string + Position int64 + Txid sql.NullString + TreeLevel sql.NullInt64 + ParentTxid sql.NullString + IsLeaf sql.NullBool +} + +type Vtxo struct { + Txid string + Vout int64 + Pubkey string + Amount int64 + PoolTx string + SpentBy string + Spent bool + Redeemed bool + Swept bool + ExpireAt int64 + PaymentID sql.NullString +} diff --git a/server/internal/infrastructure/db/sqlite/sqlc/queries/query.sql.go b/server/internal/infrastructure/db/sqlite/sqlc/queries/query.sql.go new file mode 100644 index 0000000..2dfbc5c --- /dev/null +++ b/server/internal/infrastructure/db/sqlite/sqlc/queries/query.sql.go @@ -0,0 +1,847 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.26.0 +// source: query.sql + +package queries + +import ( + "context" + "database/sql" +) + +const markVtxoAsRedeemed = `-- name: MarkVtxoAsRedeemed :exec +UPDATE vtxo SET redeemed = true WHERE txid = ? AND vout = ? +` + +type MarkVtxoAsRedeemedParams struct { + Txid string + Vout int64 +} + +func (q *Queries) MarkVtxoAsRedeemed(ctx context.Context, arg MarkVtxoAsRedeemedParams) error { + _, err := q.db.ExecContext(ctx, markVtxoAsRedeemed, arg.Txid, arg.Vout) + return err +} + +const markVtxoAsSpent = `-- name: MarkVtxoAsSpent :exec +UPDATE vtxo SET spent = true, spent_by = ? WHERE txid = ? AND vout = ? +` + +type MarkVtxoAsSpentParams struct { + SpentBy string + Txid string + Vout int64 +} + +func (q *Queries) MarkVtxoAsSpent(ctx context.Context, arg MarkVtxoAsSpentParams) error { + _, err := q.db.ExecContext(ctx, markVtxoAsSpent, arg.SpentBy, arg.Txid, arg.Vout) + return err +} + +const markVtxoAsSwept = `-- name: MarkVtxoAsSwept :exec +UPDATE vtxo SET swept = true WHERE txid = ? AND vout = ? +` + +type MarkVtxoAsSweptParams struct { + Txid string + Vout int64 +} + +func (q *Queries) MarkVtxoAsSwept(ctx context.Context, arg MarkVtxoAsSweptParams) error { + _, err := q.db.ExecContext(ctx, markVtxoAsSwept, arg.Txid, arg.Vout) + return err +} + +const selectNotRedeemedVtxos = `-- name: SelectNotRedeemedVtxos :many +SELECT txid, vout, pubkey, amount, pool_tx, spent_by, spent, redeemed, swept, expire_at, payment_id FROM vtxo WHERE redeemed = false +` + +func (q *Queries) SelectNotRedeemedVtxos(ctx context.Context) ([]Vtxo, error) { + rows, err := q.db.QueryContext(ctx, selectNotRedeemedVtxos) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Vtxo + for rows.Next() { + var i Vtxo + if err := rows.Scan( + &i.Txid, + &i.Vout, + &i.Pubkey, + &i.Amount, + &i.PoolTx, + &i.SpentBy, + &i.Spent, + &i.Redeemed, + &i.Swept, + &i.ExpireAt, + &i.PaymentID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectNotRedeemedVtxosWithPubkey = `-- name: SelectNotRedeemedVtxosWithPubkey :many +SELECT txid, vout, pubkey, amount, pool_tx, spent_by, spent, redeemed, swept, expire_at, payment_id FROM vtxo WHERE redeemed = false AND pubkey = ? +` + +func (q *Queries) SelectNotRedeemedVtxosWithPubkey(ctx context.Context, pubkey string) ([]Vtxo, error) { + rows, err := q.db.QueryContext(ctx, selectNotRedeemedVtxosWithPubkey, pubkey) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Vtxo + for rows.Next() { + var i Vtxo + if err := rows.Scan( + &i.Txid, + &i.Vout, + &i.Pubkey, + &i.Amount, + &i.PoolTx, + &i.SpentBy, + &i.Spent, + &i.Redeemed, + &i.Swept, + &i.ExpireAt, + &i.PaymentID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectRoundIds = `-- name: SelectRoundIds :many +SELECT id FROM round +` + +func (q *Queries) SelectRoundIds(ctx context.Context) ([]string, error) { + rows, err := q.db.QueryContext(ctx, selectRoundIds) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + items = append(items, id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectRoundIdsInRange = `-- name: SelectRoundIdsInRange :many +SELECT id FROM round WHERE starting_timestamp > ? AND starting_timestamp < ? +` + +type SelectRoundIdsInRangeParams struct { + StartingTimestamp int64 + StartingTimestamp_2 int64 +} + +func (q *Queries) SelectRoundIdsInRange(ctx context.Context, arg SelectRoundIdsInRangeParams) ([]string, error) { + rows, err := q.db.QueryContext(ctx, selectRoundIdsInRange, arg.StartingTimestamp, arg.StartingTimestamp_2) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + items = append(items, id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectRoundWithRoundId = `-- name: SelectRoundWithRoundId :many +SELECT round.id, round.starting_timestamp, round.ending_timestamp, round.ended, round.failed, round.stage_code, round.txid, round.unsigned_tx, round.connector_address, round.dust_amount, round.version, round.swept, + round_payment_vw.id, round_payment_vw.round_id, + round_tx_vw.id, round_tx_vw.tx, round_tx_vw.round_id, round_tx_vw.type, round_tx_vw.position, round_tx_vw.txid, round_tx_vw.tree_level, round_tx_vw.parent_txid, round_tx_vw.is_leaf, + payment_receiver_vw.payment_id, payment_receiver_vw.pubkey, payment_receiver_vw.amount, payment_receiver_vw.onchain_address, + payment_vtxo_vw.txid, payment_vtxo_vw.vout, payment_vtxo_vw.pubkey, payment_vtxo_vw.amount, payment_vtxo_vw.pool_tx, payment_vtxo_vw.spent_by, payment_vtxo_vw.spent, payment_vtxo_vw.redeemed, payment_vtxo_vw.swept, payment_vtxo_vw.expire_at, payment_vtxo_vw.payment_id +FROM round + LEFT OUTER JOIN round_payment_vw ON round.id=round_payment_vw.round_id + LEFT OUTER JOIN round_tx_vw ON round.id=round_tx_vw.round_id + LEFT OUTER JOIN payment_receiver_vw ON round_payment_vw.id=payment_receiver_vw.payment_id + LEFT OUTER JOIN payment_vtxo_vw ON round_payment_vw.id=payment_vtxo_vw.payment_id +WHERE round.id = ? +` + +type SelectRoundWithRoundIdRow struct { + Round Round + RoundPaymentVw RoundPaymentVw + RoundTxVw RoundTxVw + PaymentReceiverVw PaymentReceiverVw + PaymentVtxoVw PaymentVtxoVw +} + +func (q *Queries) SelectRoundWithRoundId(ctx context.Context, id string) ([]SelectRoundWithRoundIdRow, error) { + rows, err := q.db.QueryContext(ctx, selectRoundWithRoundId, id) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SelectRoundWithRoundIdRow + for rows.Next() { + var i SelectRoundWithRoundIdRow + if err := rows.Scan( + &i.Round.ID, + &i.Round.StartingTimestamp, + &i.Round.EndingTimestamp, + &i.Round.Ended, + &i.Round.Failed, + &i.Round.StageCode, + &i.Round.Txid, + &i.Round.UnsignedTx, + &i.Round.ConnectorAddress, + &i.Round.DustAmount, + &i.Round.Version, + &i.Round.Swept, + &i.RoundPaymentVw.ID, + &i.RoundPaymentVw.RoundID, + &i.RoundTxVw.ID, + &i.RoundTxVw.Tx, + &i.RoundTxVw.RoundID, + &i.RoundTxVw.Type, + &i.RoundTxVw.Position, + &i.RoundTxVw.Txid, + &i.RoundTxVw.TreeLevel, + &i.RoundTxVw.ParentTxid, + &i.RoundTxVw.IsLeaf, + &i.PaymentReceiverVw.PaymentID, + &i.PaymentReceiverVw.Pubkey, + &i.PaymentReceiverVw.Amount, + &i.PaymentReceiverVw.OnchainAddress, + &i.PaymentVtxoVw.Txid, + &i.PaymentVtxoVw.Vout, + &i.PaymentVtxoVw.Pubkey, + &i.PaymentVtxoVw.Amount, + &i.PaymentVtxoVw.PoolTx, + &i.PaymentVtxoVw.SpentBy, + &i.PaymentVtxoVw.Spent, + &i.PaymentVtxoVw.Redeemed, + &i.PaymentVtxoVw.Swept, + &i.PaymentVtxoVw.ExpireAt, + &i.PaymentVtxoVw.PaymentID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectRoundWithRoundTxId = `-- name: SelectRoundWithRoundTxId :many +SELECT round.id, round.starting_timestamp, round.ending_timestamp, round.ended, round.failed, round.stage_code, round.txid, round.unsigned_tx, round.connector_address, round.dust_amount, round.version, round.swept, + round_payment_vw.id, round_payment_vw.round_id, + round_tx_vw.id, round_tx_vw.tx, round_tx_vw.round_id, round_tx_vw.type, round_tx_vw.position, round_tx_vw.txid, round_tx_vw.tree_level, round_tx_vw.parent_txid, round_tx_vw.is_leaf, + payment_receiver_vw.payment_id, payment_receiver_vw.pubkey, payment_receiver_vw.amount, payment_receiver_vw.onchain_address, + payment_vtxo_vw.txid, payment_vtxo_vw.vout, payment_vtxo_vw.pubkey, payment_vtxo_vw.amount, payment_vtxo_vw.pool_tx, payment_vtxo_vw.spent_by, payment_vtxo_vw.spent, payment_vtxo_vw.redeemed, payment_vtxo_vw.swept, payment_vtxo_vw.expire_at, payment_vtxo_vw.payment_id +FROM round + LEFT OUTER JOIN round_payment_vw ON round.id=round_payment_vw.round_id + LEFT OUTER JOIN round_tx_vw ON round.id=round_tx_vw.round_id + LEFT OUTER JOIN payment_receiver_vw ON round_payment_vw.id=payment_receiver_vw.payment_id + LEFT OUTER JOIN payment_vtxo_vw ON round_payment_vw.id=payment_vtxo_vw.payment_id +WHERE round.txid = ? +` + +type SelectRoundWithRoundTxIdRow struct { + Round Round + RoundPaymentVw RoundPaymentVw + RoundTxVw RoundTxVw + PaymentReceiverVw PaymentReceiverVw + PaymentVtxoVw PaymentVtxoVw +} + +func (q *Queries) SelectRoundWithRoundTxId(ctx context.Context, txid string) ([]SelectRoundWithRoundTxIdRow, error) { + rows, err := q.db.QueryContext(ctx, selectRoundWithRoundTxId, txid) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SelectRoundWithRoundTxIdRow + for rows.Next() { + var i SelectRoundWithRoundTxIdRow + if err := rows.Scan( + &i.Round.ID, + &i.Round.StartingTimestamp, + &i.Round.EndingTimestamp, + &i.Round.Ended, + &i.Round.Failed, + &i.Round.StageCode, + &i.Round.Txid, + &i.Round.UnsignedTx, + &i.Round.ConnectorAddress, + &i.Round.DustAmount, + &i.Round.Version, + &i.Round.Swept, + &i.RoundPaymentVw.ID, + &i.RoundPaymentVw.RoundID, + &i.RoundTxVw.ID, + &i.RoundTxVw.Tx, + &i.RoundTxVw.RoundID, + &i.RoundTxVw.Type, + &i.RoundTxVw.Position, + &i.RoundTxVw.Txid, + &i.RoundTxVw.TreeLevel, + &i.RoundTxVw.ParentTxid, + &i.RoundTxVw.IsLeaf, + &i.PaymentReceiverVw.PaymentID, + &i.PaymentReceiverVw.Pubkey, + &i.PaymentReceiverVw.Amount, + &i.PaymentReceiverVw.OnchainAddress, + &i.PaymentVtxoVw.Txid, + &i.PaymentVtxoVw.Vout, + &i.PaymentVtxoVw.Pubkey, + &i.PaymentVtxoVw.Amount, + &i.PaymentVtxoVw.PoolTx, + &i.PaymentVtxoVw.SpentBy, + &i.PaymentVtxoVw.Spent, + &i.PaymentVtxoVw.Redeemed, + &i.PaymentVtxoVw.Swept, + &i.PaymentVtxoVw.ExpireAt, + &i.PaymentVtxoVw.PaymentID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectSweepableRounds = `-- name: SelectSweepableRounds :many +SELECT round.id, round.starting_timestamp, round.ending_timestamp, round.ended, round.failed, round.stage_code, round.txid, round.unsigned_tx, round.connector_address, round.dust_amount, round.version, round.swept, + round_payment_vw.id, round_payment_vw.round_id, + round_tx_vw.id, round_tx_vw.tx, round_tx_vw.round_id, round_tx_vw.type, round_tx_vw.position, round_tx_vw.txid, round_tx_vw.tree_level, round_tx_vw.parent_txid, round_tx_vw.is_leaf, + payment_receiver_vw.payment_id, payment_receiver_vw.pubkey, payment_receiver_vw.amount, payment_receiver_vw.onchain_address, + payment_vtxo_vw.txid, payment_vtxo_vw.vout, payment_vtxo_vw.pubkey, payment_vtxo_vw.amount, payment_vtxo_vw.pool_tx, payment_vtxo_vw.spent_by, payment_vtxo_vw.spent, payment_vtxo_vw.redeemed, payment_vtxo_vw.swept, payment_vtxo_vw.expire_at, payment_vtxo_vw.payment_id +FROM round + LEFT OUTER JOIN round_payment_vw ON round.id=round_payment_vw.round_id + LEFT OUTER JOIN round_tx_vw ON round.id=round_tx_vw.round_id + LEFT OUTER JOIN payment_receiver_vw ON round_payment_vw.id=payment_receiver_vw.payment_id + LEFT OUTER JOIN payment_vtxo_vw ON round_payment_vw.id=payment_vtxo_vw.payment_id +WHERE round.swept = false AND round.ended = true AND round.failed = false +` + +type SelectSweepableRoundsRow struct { + Round Round + RoundPaymentVw RoundPaymentVw + RoundTxVw RoundTxVw + PaymentReceiverVw PaymentReceiverVw + PaymentVtxoVw PaymentVtxoVw +} + +func (q *Queries) SelectSweepableRounds(ctx context.Context) ([]SelectSweepableRoundsRow, error) { + rows, err := q.db.QueryContext(ctx, selectSweepableRounds) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SelectSweepableRoundsRow + for rows.Next() { + var i SelectSweepableRoundsRow + if err := rows.Scan( + &i.Round.ID, + &i.Round.StartingTimestamp, + &i.Round.EndingTimestamp, + &i.Round.Ended, + &i.Round.Failed, + &i.Round.StageCode, + &i.Round.Txid, + &i.Round.UnsignedTx, + &i.Round.ConnectorAddress, + &i.Round.DustAmount, + &i.Round.Version, + &i.Round.Swept, + &i.RoundPaymentVw.ID, + &i.RoundPaymentVw.RoundID, + &i.RoundTxVw.ID, + &i.RoundTxVw.Tx, + &i.RoundTxVw.RoundID, + &i.RoundTxVw.Type, + &i.RoundTxVw.Position, + &i.RoundTxVw.Txid, + &i.RoundTxVw.TreeLevel, + &i.RoundTxVw.ParentTxid, + &i.RoundTxVw.IsLeaf, + &i.PaymentReceiverVw.PaymentID, + &i.PaymentReceiverVw.Pubkey, + &i.PaymentReceiverVw.Amount, + &i.PaymentReceiverVw.OnchainAddress, + &i.PaymentVtxoVw.Txid, + &i.PaymentVtxoVw.Vout, + &i.PaymentVtxoVw.Pubkey, + &i.PaymentVtxoVw.Amount, + &i.PaymentVtxoVw.PoolTx, + &i.PaymentVtxoVw.SpentBy, + &i.PaymentVtxoVw.Spent, + &i.PaymentVtxoVw.Redeemed, + &i.PaymentVtxoVw.Swept, + &i.PaymentVtxoVw.ExpireAt, + &i.PaymentVtxoVw.PaymentID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectSweepableVtxos = `-- name: SelectSweepableVtxos :many +SELECT txid, vout, pubkey, amount, pool_tx, spent_by, spent, redeemed, swept, expire_at, payment_id FROM vtxo WHERE redeemed = false AND swept = false +` + +func (q *Queries) SelectSweepableVtxos(ctx context.Context) ([]Vtxo, error) { + rows, err := q.db.QueryContext(ctx, selectSweepableVtxos) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Vtxo + for rows.Next() { + var i Vtxo + if err := rows.Scan( + &i.Txid, + &i.Vout, + &i.Pubkey, + &i.Amount, + &i.PoolTx, + &i.SpentBy, + &i.Spent, + &i.Redeemed, + &i.Swept, + &i.ExpireAt, + &i.PaymentID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectSweptRounds = `-- name: SelectSweptRounds :many +SELECT round.id, round.starting_timestamp, round.ending_timestamp, round.ended, round.failed, round.stage_code, round.txid, round.unsigned_tx, round.connector_address, round.dust_amount, round.version, round.swept, + round_payment_vw.id, round_payment_vw.round_id, + round_tx_vw.id, round_tx_vw.tx, round_tx_vw.round_id, round_tx_vw.type, round_tx_vw.position, round_tx_vw.txid, round_tx_vw.tree_level, round_tx_vw.parent_txid, round_tx_vw.is_leaf, + payment_receiver_vw.payment_id, payment_receiver_vw.pubkey, payment_receiver_vw.amount, payment_receiver_vw.onchain_address, + payment_vtxo_vw.txid, payment_vtxo_vw.vout, payment_vtxo_vw.pubkey, payment_vtxo_vw.amount, payment_vtxo_vw.pool_tx, payment_vtxo_vw.spent_by, payment_vtxo_vw.spent, payment_vtxo_vw.redeemed, payment_vtxo_vw.swept, payment_vtxo_vw.expire_at, payment_vtxo_vw.payment_id +FROM round + LEFT OUTER JOIN round_payment_vw ON round.id=round_payment_vw.round_id + LEFT OUTER JOIN round_tx_vw ON round.id=round_tx_vw.round_id + LEFT OUTER JOIN payment_receiver_vw ON round_payment_vw.id=payment_receiver_vw.payment_id + LEFT OUTER JOIN payment_vtxo_vw ON round_payment_vw.id=payment_vtxo_vw.payment_id +WHERE round.swept = true AND round.failed = false AND round.ended = true AND round.connector_address <> '' +` + +type SelectSweptRoundsRow struct { + Round Round + RoundPaymentVw RoundPaymentVw + RoundTxVw RoundTxVw + PaymentReceiverVw PaymentReceiverVw + PaymentVtxoVw PaymentVtxoVw +} + +func (q *Queries) SelectSweptRounds(ctx context.Context) ([]SelectSweptRoundsRow, error) { + rows, err := q.db.QueryContext(ctx, selectSweptRounds) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SelectSweptRoundsRow + for rows.Next() { + var i SelectSweptRoundsRow + if err := rows.Scan( + &i.Round.ID, + &i.Round.StartingTimestamp, + &i.Round.EndingTimestamp, + &i.Round.Ended, + &i.Round.Failed, + &i.Round.StageCode, + &i.Round.Txid, + &i.Round.UnsignedTx, + &i.Round.ConnectorAddress, + &i.Round.DustAmount, + &i.Round.Version, + &i.Round.Swept, + &i.RoundPaymentVw.ID, + &i.RoundPaymentVw.RoundID, + &i.RoundTxVw.ID, + &i.RoundTxVw.Tx, + &i.RoundTxVw.RoundID, + &i.RoundTxVw.Type, + &i.RoundTxVw.Position, + &i.RoundTxVw.Txid, + &i.RoundTxVw.TreeLevel, + &i.RoundTxVw.ParentTxid, + &i.RoundTxVw.IsLeaf, + &i.PaymentReceiverVw.PaymentID, + &i.PaymentReceiverVw.Pubkey, + &i.PaymentReceiverVw.Amount, + &i.PaymentReceiverVw.OnchainAddress, + &i.PaymentVtxoVw.Txid, + &i.PaymentVtxoVw.Vout, + &i.PaymentVtxoVw.Pubkey, + &i.PaymentVtxoVw.Amount, + &i.PaymentVtxoVw.PoolTx, + &i.PaymentVtxoVw.SpentBy, + &i.PaymentVtxoVw.Spent, + &i.PaymentVtxoVw.Redeemed, + &i.PaymentVtxoVw.Swept, + &i.PaymentVtxoVw.ExpireAt, + &i.PaymentVtxoVw.PaymentID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectVtxoByOutpoint = `-- name: SelectVtxoByOutpoint :one +SELECT txid, vout, pubkey, amount, pool_tx, spent_by, spent, redeemed, swept, expire_at, payment_id FROM vtxo WHERE txid = ? AND vout = ? +` + +type SelectVtxoByOutpointParams struct { + Txid string + Vout int64 +} + +func (q *Queries) SelectVtxoByOutpoint(ctx context.Context, arg SelectVtxoByOutpointParams) (Vtxo, error) { + row := q.db.QueryRowContext(ctx, selectVtxoByOutpoint, arg.Txid, arg.Vout) + var i Vtxo + err := row.Scan( + &i.Txid, + &i.Vout, + &i.Pubkey, + &i.Amount, + &i.PoolTx, + &i.SpentBy, + &i.Spent, + &i.Redeemed, + &i.Swept, + &i.ExpireAt, + &i.PaymentID, + ) + return i, err +} + +const selectVtxosByPoolTxid = `-- name: SelectVtxosByPoolTxid :many +SELECT txid, vout, pubkey, amount, pool_tx, spent_by, spent, redeemed, swept, expire_at, payment_id FROM vtxo WHERE pool_tx = ? +` + +func (q *Queries) SelectVtxosByPoolTxid(ctx context.Context, poolTx string) ([]Vtxo, error) { + rows, err := q.db.QueryContext(ctx, selectVtxosByPoolTxid, poolTx) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Vtxo + for rows.Next() { + var i Vtxo + if err := rows.Scan( + &i.Txid, + &i.Vout, + &i.Pubkey, + &i.Amount, + &i.PoolTx, + &i.SpentBy, + &i.Spent, + &i.Redeemed, + &i.Swept, + &i.ExpireAt, + &i.PaymentID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const updateVtxoExpireAt = `-- name: UpdateVtxoExpireAt :exec +UPDATE vtxo SET expire_at = ? WHERE txid = ? AND vout = ? +` + +type UpdateVtxoExpireAtParams struct { + ExpireAt int64 + Txid string + Vout int64 +} + +func (q *Queries) UpdateVtxoExpireAt(ctx context.Context, arg UpdateVtxoExpireAtParams) error { + _, err := q.db.ExecContext(ctx, updateVtxoExpireAt, arg.ExpireAt, arg.Txid, arg.Vout) + return err +} + +const updateVtxoPaymentId = `-- name: UpdateVtxoPaymentId :exec +UPDATE vtxo SET payment_id = ? WHERE txid = ? AND vout = ? +` + +type UpdateVtxoPaymentIdParams struct { + PaymentID sql.NullString + Txid string + Vout int64 +} + +func (q *Queries) UpdateVtxoPaymentId(ctx context.Context, arg UpdateVtxoPaymentIdParams) error { + _, err := q.db.ExecContext(ctx, updateVtxoPaymentId, arg.PaymentID, arg.Txid, arg.Vout) + return err +} + +const upsertPayment = `-- name: UpsertPayment :exec +INSERT INTO payment (id, round_id) VALUES (?, ?) +ON CONFLICT(id) DO UPDATE SET round_id = EXCLUDED.round_id +` + +type UpsertPaymentParams struct { + ID string + RoundID string +} + +func (q *Queries) UpsertPayment(ctx context.Context, arg UpsertPaymentParams) error { + _, err := q.db.ExecContext(ctx, upsertPayment, arg.ID, arg.RoundID) + return err +} + +const upsertReceiver = `-- name: UpsertReceiver :exec +INSERT INTO receiver (payment_id, pubkey, amount, onchain_address) VALUES (?, ?, ?, ?) +ON CONFLICT(payment_id, pubkey) DO UPDATE SET + amount = EXCLUDED.amount, + onchain_address = EXCLUDED.onchain_address, + pubkey = EXCLUDED.pubkey +` + +type UpsertReceiverParams struct { + PaymentID string + Pubkey string + Amount int64 + OnchainAddress string +} + +func (q *Queries) UpsertReceiver(ctx context.Context, arg UpsertReceiverParams) error { + _, err := q.db.ExecContext(ctx, upsertReceiver, + arg.PaymentID, + arg.Pubkey, + arg.Amount, + arg.OnchainAddress, + ) + return err +} + +const upsertRound = `-- name: UpsertRound :exec +INSERT INTO round ( + id, + starting_timestamp, + ending_timestamp, + ended, failed, + stage_code, + txid, + unsigned_tx, + connector_address, + dust_amount, + version, + swept +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(id) DO UPDATE SET + starting_timestamp = EXCLUDED.starting_timestamp, + ending_timestamp = EXCLUDED.ending_timestamp, + ended = EXCLUDED.ended, + failed = EXCLUDED.failed, + stage_code = EXCLUDED.stage_code, + txid = EXCLUDED.txid, + unsigned_tx = EXCLUDED.unsigned_tx, + connector_address = EXCLUDED.connector_address, + dust_amount = EXCLUDED.dust_amount, + version = EXCLUDED.version, + swept = EXCLUDED.swept +` + +type UpsertRoundParams struct { + ID string + StartingTimestamp int64 + EndingTimestamp int64 + Ended bool + Failed bool + StageCode int64 + Txid string + UnsignedTx string + ConnectorAddress string + DustAmount int64 + Version int64 + Swept bool +} + +func (q *Queries) UpsertRound(ctx context.Context, arg UpsertRoundParams) error { + _, err := q.db.ExecContext(ctx, upsertRound, + arg.ID, + arg.StartingTimestamp, + arg.EndingTimestamp, + arg.Ended, + arg.Failed, + arg.StageCode, + arg.Txid, + arg.UnsignedTx, + arg.ConnectorAddress, + arg.DustAmount, + arg.Version, + arg.Swept, + ) + return err +} + +const upsertTransaction = `-- name: UpsertTransaction :exec +INSERT INTO tx ( + tx, round_id, type, position, txid, tree_level, parent_txid, is_leaf +) VALUES (?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(id) DO UPDATE SET + tx = EXCLUDED.tx, + round_id = EXCLUDED.round_id, + type = EXCLUDED.type, + position = EXCLUDED.position, + txid = EXCLUDED.txid, + tree_level = EXCLUDED.tree_level, + parent_txid = EXCLUDED.parent_txid, + is_leaf = EXCLUDED.is_leaf +` + +type UpsertTransactionParams struct { + Tx string + RoundID string + Type string + Position int64 + Txid sql.NullString + TreeLevel sql.NullInt64 + ParentTxid sql.NullString + IsLeaf sql.NullBool +} + +func (q *Queries) UpsertTransaction(ctx context.Context, arg UpsertTransactionParams) error { + _, err := q.db.ExecContext(ctx, upsertTransaction, + arg.Tx, + arg.RoundID, + arg.Type, + arg.Position, + arg.Txid, + arg.TreeLevel, + arg.ParentTxid, + arg.IsLeaf, + ) + return err +} + +const upsertVtxo = `-- name: UpsertVtxo :exec +INSERT INTO vtxo (txid, vout, pubkey, amount, pool_tx, spent_by, spent, redeemed, swept, expire_at) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(txid) DO UPDATE SET + vout = excluded.vout, + pubkey = excluded.pubkey, + amount = excluded.amount, + pool_tx = excluded.pool_tx, + spent_by = excluded.spent_by, + spent = excluded.spent, + redeemed = excluded.redeemed, + swept = excluded.swept, + expire_at = excluded.expire_at +` + +type UpsertVtxoParams struct { + Txid string + Vout int64 + Pubkey string + Amount int64 + PoolTx string + SpentBy string + Spent bool + Redeemed bool + Swept bool + ExpireAt int64 +} + +func (q *Queries) UpsertVtxo(ctx context.Context, arg UpsertVtxoParams) error { + _, err := q.db.ExecContext(ctx, upsertVtxo, + arg.Txid, + arg.Vout, + arg.Pubkey, + arg.Amount, + arg.PoolTx, + arg.SpentBy, + arg.Spent, + arg.Redeemed, + arg.Swept, + arg.ExpireAt, + ) + return err +} diff --git a/server/internal/infrastructure/db/sqlite/sqlc/query.sql b/server/internal/infrastructure/db/sqlite/sqlc/query.sql new file mode 100644 index 0000000..713feef --- /dev/null +++ b/server/internal/infrastructure/db/sqlite/sqlc/query.sql @@ -0,0 +1,152 @@ +-- name: UpsertTransaction :exec +INSERT INTO tx ( + tx, round_id, type, position, txid, tree_level, parent_txid, is_leaf +) VALUES (?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(id) DO UPDATE SET + tx = EXCLUDED.tx, + round_id = EXCLUDED.round_id, + type = EXCLUDED.type, + position = EXCLUDED.position, + txid = EXCLUDED.txid, + tree_level = EXCLUDED.tree_level, + parent_txid = EXCLUDED.parent_txid, + is_leaf = EXCLUDED.is_leaf; + +-- name: UpsertRound :exec +INSERT INTO round ( + id, + starting_timestamp, + ending_timestamp, + ended, failed, + stage_code, + txid, + unsigned_tx, + connector_address, + dust_amount, + version, + swept +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(id) DO UPDATE SET + starting_timestamp = EXCLUDED.starting_timestamp, + ending_timestamp = EXCLUDED.ending_timestamp, + ended = EXCLUDED.ended, + failed = EXCLUDED.failed, + stage_code = EXCLUDED.stage_code, + txid = EXCLUDED.txid, + unsigned_tx = EXCLUDED.unsigned_tx, + connector_address = EXCLUDED.connector_address, + dust_amount = EXCLUDED.dust_amount, + version = EXCLUDED.version, + swept = EXCLUDED.swept; + +-- name: UpsertPayment :exec +INSERT INTO payment (id, round_id) VALUES (?, ?) +ON CONFLICT(id) DO UPDATE SET round_id = EXCLUDED.round_id; + +-- name: UpsertReceiver :exec +INSERT INTO receiver (payment_id, pubkey, amount, onchain_address) VALUES (?, ?, ?, ?) +ON CONFLICT(payment_id, pubkey) DO UPDATE SET + amount = EXCLUDED.amount, + onchain_address = EXCLUDED.onchain_address, + pubkey = EXCLUDED.pubkey; + +-- name: UpdateVtxoPaymentId :exec +UPDATE vtxo SET payment_id = ? WHERE txid = ? AND vout = ?; + +-- name: SelectRoundWithRoundId :many +SELECT sqlc.embed(round), + sqlc.embed(round_payment_vw), + sqlc.embed(round_tx_vw), + sqlc.embed(payment_receiver_vw), + sqlc.embed(payment_vtxo_vw) +FROM round + LEFT OUTER JOIN round_payment_vw ON round.id=round_payment_vw.round_id + LEFT OUTER JOIN round_tx_vw ON round.id=round_tx_vw.round_id + LEFT OUTER JOIN payment_receiver_vw ON round_payment_vw.id=payment_receiver_vw.payment_id + LEFT OUTER JOIN payment_vtxo_vw ON round_payment_vw.id=payment_vtxo_vw.payment_id +WHERE round.id = ?; + +-- name: SelectRoundWithRoundTxId :many +SELECT sqlc.embed(round), + sqlc.embed(round_payment_vw), + sqlc.embed(round_tx_vw), + sqlc.embed(payment_receiver_vw), + sqlc.embed(payment_vtxo_vw) +FROM round + LEFT OUTER JOIN round_payment_vw ON round.id=round_payment_vw.round_id + LEFT OUTER JOIN round_tx_vw ON round.id=round_tx_vw.round_id + LEFT OUTER JOIN payment_receiver_vw ON round_payment_vw.id=payment_receiver_vw.payment_id + LEFT OUTER JOIN payment_vtxo_vw ON round_payment_vw.id=payment_vtxo_vw.payment_id +WHERE round.txid = ?; + +-- name: SelectSweepableRounds :many +SELECT sqlc.embed(round), + sqlc.embed(round_payment_vw), + sqlc.embed(round_tx_vw), + sqlc.embed(payment_receiver_vw), + sqlc.embed(payment_vtxo_vw) +FROM round + LEFT OUTER JOIN round_payment_vw ON round.id=round_payment_vw.round_id + LEFT OUTER JOIN round_tx_vw ON round.id=round_tx_vw.round_id + LEFT OUTER JOIN payment_receiver_vw ON round_payment_vw.id=payment_receiver_vw.payment_id + LEFT OUTER JOIN payment_vtxo_vw ON round_payment_vw.id=payment_vtxo_vw.payment_id +WHERE round.swept = false AND round.ended = true AND round.failed = false; + +-- name: SelectSweptRounds :many +SELECT sqlc.embed(round), + sqlc.embed(round_payment_vw), + sqlc.embed(round_tx_vw), + sqlc.embed(payment_receiver_vw), + sqlc.embed(payment_vtxo_vw) +FROM round + LEFT OUTER JOIN round_payment_vw ON round.id=round_payment_vw.round_id + LEFT OUTER JOIN round_tx_vw ON round.id=round_tx_vw.round_id + LEFT OUTER JOIN payment_receiver_vw ON round_payment_vw.id=payment_receiver_vw.payment_id + LEFT OUTER JOIN payment_vtxo_vw ON round_payment_vw.id=payment_vtxo_vw.payment_id +WHERE round.swept = true AND round.failed = false AND round.ended = true AND round.connector_address <> ''; + +-- name: SelectRoundIdsInRange :many +SELECT id FROM round WHERE starting_timestamp > ? AND starting_timestamp < ?; + +-- name: SelectRoundIds :many +SELECT id FROM round; + +-- name: UpsertVtxo :exec +INSERT INTO vtxo (txid, vout, pubkey, amount, pool_tx, spent_by, spent, redeemed, swept, expire_at) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(txid) DO UPDATE SET + vout = excluded.vout, + pubkey = excluded.pubkey, + amount = excluded.amount, + pool_tx = excluded.pool_tx, + spent_by = excluded.spent_by, + spent = excluded.spent, + redeemed = excluded.redeemed, + swept = excluded.swept, + expire_at = excluded.expire_at; + +-- name: SelectSweepableVtxos :many +SELECT * FROM vtxo WHERE redeemed = false AND swept = false; + +-- name: SelectNotRedeemedVtxos :many +SELECT * FROM vtxo WHERE redeemed = false; + +-- name: SelectNotRedeemedVtxosWithPubkey :many +SELECT * FROM vtxo WHERE redeemed = false AND pubkey = ?; + +-- name: SelectVtxoByOutpoint :one +SELECT * FROM vtxo WHERE txid = ? AND vout = ?; + +-- name: SelectVtxosByPoolTxid :many +SELECT * FROM vtxo WHERE pool_tx = ?; + +-- name: MarkVtxoAsRedeemed :exec +UPDATE vtxo SET redeemed = true WHERE txid = ? AND vout = ?; + +-- name: MarkVtxoAsSwept :exec +UPDATE vtxo SET swept = true WHERE txid = ? AND vout = ?; + +-- name: MarkVtxoAsSpent :exec +UPDATE vtxo SET spent = true, spent_by = ? WHERE txid = ? AND vout = ?; + +-- name: UpdateVtxoExpireAt :exec +UPDATE vtxo SET expire_at = ? WHERE txid = ? AND vout = ?; diff --git a/server/internal/infrastructure/db/sqlite/utils.go b/server/internal/infrastructure/db/sqlite/utils.go index 5cbd4cb..14fc7c1 100644 --- a/server/internal/infrastructure/db/sqlite/utils.go +++ b/server/internal/infrastructure/db/sqlite/utils.go @@ -1,11 +1,13 @@ package sqlitedb import ( + "context" "database/sql" "fmt" "os" "path/filepath" + "github.com/ark-network/ark/internal/infrastructure/db/sqlite/sqlc/queries" _ "modernc.org/sqlite" ) @@ -43,3 +45,41 @@ func extendArray[T any](arr []T, position int) []T { return arr } + +func execTx( + ctx context.Context, + db *sql.DB, + txBody func(*queries.Queries) error, +) (err error) { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to start transaction: %w", err) + } + + querier := queries.New(db) + + defer func() { + if p := recover(); p != nil { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + err = fmt.Errorf("panic: %v, rollback error: %w", p, rollbackErr) + } + panic(p) // Re-throw after rollback + } else if err != nil { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + err = fmt.Errorf("original error: %v, rollback error: %w", err, rollbackErr) + } + } + }() + + if err = txBody(querier.WithTx(tx)); err != nil { + return fmt.Errorf("failed to execute transaction: %w", err) + } + + if err = tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} diff --git a/server/internal/infrastructure/db/sqlite/vtxo_repo.go b/server/internal/infrastructure/db/sqlite/vtxo_repo.go index 8eb6a6b..0cea81b 100644 --- a/server/internal/infrastructure/db/sqlite/vtxo_repo.go +++ b/server/internal/infrastructure/db/sqlite/vtxo_repo.go @@ -6,93 +6,12 @@ import ( "fmt" "github.com/ark-network/ark/internal/core/domain" + "github.com/ark-network/ark/internal/infrastructure/db/sqlite/sqlc/queries" ) -const ( - createVtxoTable = ` -CREATE TABLE IF NOT EXISTS vtxo ( - txid TEXT NOT NULL PRIMARY KEY, - vout INTEGER NOT NULL, - pubkey TEXT NOT NULL, - amount INTEGER NOT NULL, - pool_tx TEXT NOT NULL, - spent_by TEXT NOT NULL, - spent BOOLEAN NOT NULL, - redeemed BOOLEAN NOT NULL, - swept BOOLEAN NOT NULL, - expire_at INTEGER NOT NULL, - payment_id TEXT, - FOREIGN KEY (payment_id) REFERENCES payment(id) -); -` - - upsertVtxos = ` -INSERT INTO vtxo (txid, vout, pubkey, amount, pool_tx, spent_by, spent, redeemed, swept, expire_at) -VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(txid) DO UPDATE SET - vout = excluded.vout, - pubkey = excluded.pubkey, - amount = excluded.amount, - pool_tx = excluded.pool_tx, - spent_by = excluded.spent_by, - spent = excluded.spent, - redeemed = excluded.redeemed, - swept = excluded.swept, - expire_at = excluded.expire_at; -` - - selectSweepableVtxos = ` -SELECT * FROM vtxo WHERE redeemed = false AND swept = false -` - - selectNotRedeemedVtxos = ` -SELECT * FROM vtxo WHERE redeemed = false -` - - selectNotRedeemedVtxosWithPubkey = ` -SELECT * FROM vtxo WHERE redeemed = false AND pubkey = ? -` - - selectVtxoByOutpoint = ` -SELECT * FROM vtxo WHERE txid = ? AND vout = ? -` - - selectVtxosByPoolTxid = ` -SELECT * FROM vtxo WHERE pool_tx = ? -` - - markVtxoAsRedeemed = ` -UPDATE vtxo SET redeemed = true WHERE txid = ? AND vout = ? -` - - markVtxoAsSwept = ` -UPDATE vtxo SET swept = true WHERE txid = ? AND vout = ? -` - - markVtxoAsSpent = ` -UPDATE vtxo SET spent = true, spent_by = ? WHERE txid = ? AND vout = ? -` - - updateVtxoExpireAt = ` -UPDATE vtxo SET expire_at = ? WHERE txid = ? AND vout = ? -` -) - -type vtxoRow struct { - txid *string - vout *uint32 - pubkey *string - amount *uint64 - poolTx *string - spentBy *string - spent *bool - redeemed *bool - swept *bool - expireAt *int64 - paymentID *string -} - type vxtoRepository struct { - db *sql.DB + db *sql.DB + querier *queries.Queries } func NewVtxoRepository(config ...interface{}) (domain.VtxoRepository, error) { @@ -104,16 +23,10 @@ func NewVtxoRepository(config ...interface{}) (domain.VtxoRepository, error) { return nil, fmt.Errorf("cannot open vtxo repository: invalid config") } - return newVtxoRepository(db) -} - -func newVtxoRepository(db *sql.DB) (*vxtoRepository, error) { - _, err := db.Exec(createVtxoTable) - if err != nil { - return nil, err - } - - return &vxtoRepository{db}, nil + return &vxtoRepository{ + db: db, + querier: queries.New(db), + }, nil } func (v *vxtoRepository) Close() { @@ -121,41 +34,35 @@ func (v *vxtoRepository) Close() { } func (v *vxtoRepository) AddVtxos(ctx context.Context, vtxos []domain.Vtxo) error { - tx, err := v.db.Begin() - if err != nil { - return err - } - - stmt, err := tx.Prepare(upsertVtxos) - if err != nil { - return err - } - - defer stmt.Close() - - for _, vtxo := range vtxos { - _, err := stmt.Exec( - vtxo.Txid, - vtxo.VOut, - vtxo.Pubkey, - vtxo.Amount, - vtxo.PoolTx, - vtxo.SpentBy, - vtxo.Spent, - vtxo.Redeemed, - vtxo.Swept, - vtxo.ExpireAt, - ) - if err != nil { - return err + txBody := func(querierWithTx *queries.Queries) error { + for _, vtxo := range vtxos { + if err := querierWithTx.UpsertVtxo( + ctx, + queries.UpsertVtxoParams{ + Txid: vtxo.Txid, + Vout: int64(vtxo.VOut), + Pubkey: vtxo.Pubkey, + Amount: int64(vtxo.Amount), + PoolTx: vtxo.PoolTx, + SpentBy: vtxo.SpentBy, + Spent: vtxo.Spent, + Redeemed: vtxo.Redeemed, + Swept: vtxo.Swept, + ExpireAt: vtxo.ExpireAt, + }, + ); err != nil { + return err + } } + + return nil } - return tx.Commit() + return execTx(ctx, v.db, txBody) } func (v *vxtoRepository) GetAllSweepableVtxos(ctx context.Context) ([]domain.Vtxo, error) { - rows, err := v.db.Query(selectSweepableVtxos) + rows, err := v.querier.SelectSweepableVtxos(ctx) if err != nil { return nil, err } @@ -166,13 +73,13 @@ func (v *vxtoRepository) GetAllSweepableVtxos(ctx context.Context) ([]domain.Vtx func (v *vxtoRepository) GetAllVtxos(ctx context.Context, pubkey string) ([]domain.Vtxo, []domain.Vtxo, error) { withPubkey := len(pubkey) > 0 - var rows *sql.Rows + var rows []queries.Vtxo var err error if withPubkey { - rows, err = v.db.Query(selectNotRedeemedVtxosWithPubkey, pubkey) + rows, err = v.querier.SelectNotRedeemedVtxosWithPubkey(ctx, pubkey) } else { - rows, err = v.db.Query(selectNotRedeemedVtxos) + rows, err = v.querier.SelectNotRedeemedVtxos(ctx) } if err != nil { return nil, nil, err @@ -198,22 +105,20 @@ func (v *vxtoRepository) GetAllVtxos(ctx context.Context, pubkey string) ([]doma } func (v *vxtoRepository) GetVtxos(ctx context.Context, outpoints []domain.VtxoKey) ([]domain.Vtxo, error) { - stmt, err := v.db.Prepare(selectVtxoByOutpoint) - if err != nil { - return nil, err - } - - defer stmt.Close() - vtxos := make([]domain.Vtxo, 0, len(outpoints)) - - for _, outpoint := range outpoints { - rows, err := stmt.Query(outpoint.Txid, outpoint.VOut) + for _, o := range outpoints { + vtxo, err := v.querier.SelectVtxoByOutpoint( + ctx, + queries.SelectVtxoByOutpointParams{ + Txid: o.Txid, + Vout: int64(o.VOut), + }, + ) if err != nil { return nil, err } - result, err := readRows(rows) + result, err := readRows([]queries.Vtxo{vtxo}) if err != nil { return nil, err } @@ -229,7 +134,7 @@ func (v *vxtoRepository) GetVtxos(ctx context.Context, outpoints []domain.VtxoKe } func (v *vxtoRepository) GetVtxosForRound(ctx context.Context, txid string) ([]domain.Vtxo, error) { - rows, err := v.db.Query(selectVtxosByPoolTxid, txid) + rows, err := v.querier.SelectVtxosByPoolTxid(ctx, txid) if err != nil { return nil, err } @@ -238,139 +143,110 @@ func (v *vxtoRepository) GetVtxosForRound(ctx context.Context, txid string) ([]d } func (v *vxtoRepository) RedeemVtxos(ctx context.Context, vtxos []domain.VtxoKey) error { - tx, err := v.db.Begin() - if err != nil { - return err - } - - stmt, err := tx.Prepare(markVtxoAsRedeemed) - if err != nil { - return err - } - - defer stmt.Close() - - for _, vtxo := range vtxos { - _, err := stmt.Exec(vtxo.Txid, vtxo.VOut) - if err != nil { - return err + txBody := func(querierWithTx *queries.Queries) error { + for _, vtxo := range vtxos { + if err := querierWithTx.MarkVtxoAsRedeemed( + ctx, + queries.MarkVtxoAsRedeemedParams{ + Txid: vtxo.Txid, + Vout: int64(vtxo.VOut), + }, + ); err != nil { + return err + } } + + return nil } - return tx.Commit() + return execTx(ctx, v.db, txBody) } func (v *vxtoRepository) SpendVtxos(ctx context.Context, vtxos []domain.VtxoKey, txid string) error { - tx, err := v.db.Begin() - if err != nil { - return err - } - - stmt, err := tx.Prepare(markVtxoAsSpent) - if err != nil { - return err - } - - defer stmt.Close() - - for _, vtxo := range vtxos { - _, err := stmt.Exec(txid, vtxo.Txid, vtxo.VOut) - if err != nil { - return err + txBody := func(querierWithTx *queries.Queries) error { + for _, vtxo := range vtxos { + if err := querierWithTx.MarkVtxoAsSpent( + ctx, + queries.MarkVtxoAsSpentParams{ + SpentBy: txid, + Txid: vtxo.Txid, + Vout: int64(vtxo.VOut), + }, + ); err != nil { + return err + } } + + return nil } - return tx.Commit() + return execTx(ctx, v.db, txBody) } func (v *vxtoRepository) SweepVtxos(ctx context.Context, vtxos []domain.VtxoKey) error { - tx, err := v.db.Begin() - if err != nil { - return err - } - - stmt, err := tx.Prepare(markVtxoAsSwept) - if err != nil { - return err - } - - defer stmt.Close() - - for _, vtxo := range vtxos { - _, err := stmt.Exec(vtxo.Txid, vtxo.VOut) - if err != nil { - return err + txBody := func(querierWithTx *queries.Queries) error { + for _, vtxo := range vtxos { + if err := querierWithTx.MarkVtxoAsSwept( + ctx, + queries.MarkVtxoAsSweptParams{ + Txid: vtxo.Txid, + Vout: int64(vtxo.VOut), + }, + ); err != nil { + return err + } } + + return nil } - return tx.Commit() + return execTx(ctx, v.db, txBody) } func (v *vxtoRepository) UpdateExpireAt(ctx context.Context, vtxos []domain.VtxoKey, expireAt int64) error { - tx, err := v.db.Begin() - if err != nil { - return err - } - - stmt, err := tx.Prepare(updateVtxoExpireAt) - if err != nil { - return err - } - - defer stmt.Close() - - for _, vtxo := range vtxos { - _, err := stmt.Exec(expireAt, vtxo.Txid, vtxo.VOut) - if err != nil { - return err + txBody := func(querierWithTx *queries.Queries) error { + for _, vtxo := range vtxos { + if err := querierWithTx.UpdateVtxoExpireAt( + ctx, + queries.UpdateVtxoExpireAtParams{ + ExpireAt: expireAt, + Txid: vtxo.Txid, + Vout: int64(vtxo.VOut), + }, + ); err != nil { + return err + } } + + return nil } - return tx.Commit() + return execTx(ctx, v.db, txBody) } -func rowToVtxo(row vtxoRow) domain.Vtxo { +func rowToVtxo(row queries.Vtxo) domain.Vtxo { return domain.Vtxo{ VtxoKey: domain.VtxoKey{ - Txid: *row.txid, - VOut: *row.vout, + Txid: row.Txid, + VOut: uint32(row.Vout), }, Receiver: domain.Receiver{ - Pubkey: *row.pubkey, - Amount: *row.amount, + Pubkey: row.Pubkey, + Amount: uint64(row.Amount), }, - PoolTx: *row.poolTx, - SpentBy: *row.spentBy, - Spent: *row.spent, - Redeemed: *row.redeemed, - Swept: *row.swept, - ExpireAt: *row.expireAt, + PoolTx: row.PoolTx, + SpentBy: row.SpentBy, + Spent: row.Spent, + Redeemed: row.Redeemed, + Swept: row.Swept, + ExpireAt: row.ExpireAt, } } -func readRows(rows *sql.Rows) ([]domain.Vtxo, error) { - defer rows.Close() - vtxos := make([]domain.Vtxo, 0) - - for rows.Next() { - var row vtxoRow - if err := rows.Scan( - &row.txid, - &row.vout, - &row.pubkey, - &row.amount, - &row.poolTx, - &row.spentBy, - &row.spent, - &row.redeemed, - &row.swept, - &row.expireAt, - &row.paymentID, - ); err != nil { - return nil, err - } - - vtxos = append(vtxos, rowToVtxo(row)) +func readRows(rows []queries.Vtxo) ([]domain.Vtxo, error) { + vtxos := make([]domain.Vtxo, 0, len(rows)) + for _, v := range rows { + vtxos = append(vtxos, rowToVtxo(v)) } return vtxos, nil