diff --git a/examples/basic-badgern/main.go b/examples/basic-badgern/main.go index 54b2780..c45e68d 100644 --- a/examples/basic-badgern/main.go +++ b/examples/basic-badgern/main.go @@ -4,8 +4,8 @@ import ( "fmt" "net/http" + "github.com/fiatjaf/eventstore/badgern" "github.com/fiatjaf/khatru" - "github.com/fiatjaf/khatru/plugins/storage/badgern" ) func main() { diff --git a/examples/basic-elasticsearch/main.go b/examples/basic-elasticsearch/main.go index 10d5c16..8c6cfd2 100644 --- a/examples/basic-elasticsearch/main.go +++ b/examples/basic-elasticsearch/main.go @@ -5,7 +5,7 @@ import ( "net/http" "github.com/fiatjaf/khatru" - "github.com/fiatjaf/khatru/plugins/storage/elasticsearch" + "github.com/fiatjaf/eventstore/elasticsearch" ) func main() { diff --git a/examples/basic-lmdbn/main.go b/examples/basic-lmdbn/main.go index e99b94e..343e571 100644 --- a/examples/basic-lmdbn/main.go +++ b/examples/basic-lmdbn/main.go @@ -6,7 +6,7 @@ import ( "os" "github.com/fiatjaf/khatru" - "github.com/fiatjaf/khatru/plugins/storage/lmdbn" + "github.com/fiatjaf/eventstore/lmdbn" ) func main() { diff --git a/examples/basic-postgres/main.go b/examples/basic-postgres/main.go index 4570477..4ee891c 100644 --- a/examples/basic-postgres/main.go +++ b/examples/basic-postgres/main.go @@ -5,7 +5,7 @@ import ( "net/http" "github.com/fiatjaf/khatru" - "github.com/fiatjaf/khatru/plugins/storage/postgresql" + "github.com/fiatjaf/eventstore/postgresql" ) func main() { diff --git a/examples/basic-sqlite3/main.go b/examples/basic-sqlite3/main.go index ea054f9..cec707a 100644 --- a/examples/basic-sqlite3/main.go +++ b/examples/basic-sqlite3/main.go @@ -5,7 +5,7 @@ import ( "net/http" "github.com/fiatjaf/khatru" - "github.com/fiatjaf/khatru/plugins/storage/sqlite3" + "github.com/fiatjaf/eventstore/sqlite3" ) func main() { diff --git a/examples/exclusive/main.go b/examples/exclusive/main.go index f1ac6a0..8b9a55a 100644 --- a/examples/exclusive/main.go +++ b/examples/exclusive/main.go @@ -6,9 +6,9 @@ import ( "net/http" "os" + "github.com/fiatjaf/eventstore/lmdbn" "github.com/fiatjaf/khatru" "github.com/fiatjaf/khatru/plugins" - "github.com/fiatjaf/khatru/plugins/storage/lmdbn" "github.com/nbd-wtf/go-nostr" ) diff --git a/go.mod b/go.mod index 609128d..82fc093 100644 --- a/go.mod +++ b/go.mod @@ -1,52 +1,48 @@ module github.com/fiatjaf/khatru -go 1.20 +go 1.21.0 require ( - github.com/aquasecurity/esquery v0.2.0 - github.com/bmatsuo/lmdb-go v1.8.0 - github.com/dgraph-io/badger/v4 v4.1.0 - github.com/elastic/go-elasticsearch/v8 v8.6.0 github.com/fasthttp/websocket v1.5.3 + github.com/fiatjaf/eventstore v0.0.1 github.com/gobwas/ws v1.2.0 - github.com/jmoiron/sqlx v1.3.1 - github.com/lib/pq v1.10.3 - github.com/mattn/go-sqlite3 v1.14.6 - github.com/nbd-wtf/go-nostr v0.20.0 + github.com/nbd-wtf/go-nostr v0.24.2 github.com/puzpuzpuz/xsync/v2 v2.5.1 github.com/rs/cors v1.7.0 - github.com/stretchr/testify v1.8.2 golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 ) require ( github.com/andybalholm/brotli v1.0.5 // indirect + github.com/aquasecurity/esquery v0.2.0 // indirect + github.com/bmatsuo/lmdb-go v1.8.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect + github.com/dgraph-io/badger/v4 v4.2.0 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect - github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c // indirect + github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect github.com/elastic/go-elasticsearch/v7 v7.6.0 // indirect + github.com/elastic/go-elasticsearch/v8 v8.10.1 // indirect github.com/fatih/structs v1.1.0 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect + github.com/golang/glog v1.0.0 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect - github.com/golang/protobuf v1.3.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/google/flatbuffers v1.12.1 // indirect + github.com/jmoiron/sqlx v1.3.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.16.5 // indirect - github.com/kr/text v0.2.0 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/mattn/go-sqlite3 v1.14.17 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/puzpuzpuz/xsync v1.5.2 // indirect github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect github.com/tidwall/gjson v1.14.4 // indirect github.com/tidwall/match v1.1.1 // indirect @@ -56,6 +52,7 @@ require ( go.opencensus.io v0.22.5 // indirect golang.org/x/net v0.8.0 // indirect golang.org/x/sys v0.8.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + google.golang.org/protobuf v1.28.1 // indirect ) + +replace github.com/fiatjaf/eventstore => /home/fiatjaf/comp/eventstore diff --git a/go.sum b/go.sum index b3d369d..37d564a 100644 --- a/go.sum +++ b/go.sum @@ -11,10 +11,9 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 h1:KdUfX2zKommPRa+PD0sWZUyXe9w277ABlgELO7H04IM= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= -github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -22,26 +21,26 @@ github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5il github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= -github.com/dgraph-io/badger/v4 v4.1.0 h1:E38jc0f+RATYrycSUf9LMv/t47XAy+3CApyYSq4APOQ= -github.com/dgraph-io/badger/v4 v4.1.0/go.mod h1:P50u28d39ibBRmIJuQC/NSdBOg46HnHw7al2SW5QRHg= +github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= +github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs= -github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo= +github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= github.com/elastic/go-elasticsearch/v7 v7.6.0 h1:sYpGLpEFHgLUKLsZUBfuaVI9QgHjS3JdH9fX4/z8QI8= github.com/elastic/go-elasticsearch/v7 v7.6.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= -github.com/elastic/go-elasticsearch/v8 v8.6.0 h1:xMaSe8jIh7NHzmNo9YBkewmaD2Pr+tX+zLkXxhieny4= -github.com/elastic/go-elasticsearch/v8 v8.6.0/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k= +github.com/elastic/go-elasticsearch/v8 v8.10.1 h1:JJ3i2DimYTsJcUoEGbg6tNB0eehTNdid9c5kTR1TGuI= +github.com/elastic/go-elasticsearch/v8 v8.10.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg= github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek= github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= -github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= -github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= +github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= @@ -50,52 +49,51 @@ github.com/gobwas/ws v1.2.0 h1:u0p9s3xLYpZCA1z5JgCkMeB34CKCMMQbM+G8Ii7YD0I= github.com/gobwas/ws v1.2.0/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/jgroeneveld/schema v1.0.0 h1:J0E10CrOkiSEsw6dfb1IfrDJD14pf6QLVJ3tRPl/syI= github.com/jgroeneveld/schema v1.0.0/go.mod h1:M14lv7sNMtGvo3ops1MwslaSYgDYxrSmbzWIQ0Mr5rs= github.com/jgroeneveld/trial v2.0.0+incompatible h1:d59ctdgor+VqdZCAiUfVN8K13s0ALDioG5DWwZNtRuQ= github.com/jgroeneveld/trial v2.0.0+incompatible/go.mod h1:I6INLW96EN8WysNBXUFI3M4RIC8ePg9ntAc/Wy+U/+M= -github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE= -github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= +github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= +github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg= -github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/nbd-wtf/go-nostr v0.20.0 h1:97SYhg68jWh5G1bW1g454hA0dTV7btwtPg836n4no0o= -github.com/nbd-wtf/go-nostr v0.20.0/go.mod h1:iFfiZr8YYSC1vmdUei0VfDB7GH/RjS3cbmiD1I5BKyo= +github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= +github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/nbd-wtf/go-nostr v0.24.2 h1:1PdFED7uHh3BlXfDVD96npBc0YAgj9hPT+l6NWog4kc= +github.com/nbd-wtf/go-nostr v0.24.2/go.mod h1:eE8Qf8QszZbCd9arBQyotXqATNUElWsTEEx+LLORhyQ= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/puzpuzpuz/xsync v1.5.2 h1:yRAP4wqSOZG+/4pxJ08fPTwrfL0IzE/LKQ/cw509qGY= -github.com/puzpuzpuz/xsync v1.5.2/go.mod h1:K98BYhX3k1dQ2M63t1YNVDanbwUPmBCAhNmVrrxfiGg= github.com/puzpuzpuz/xsync/v2 v2.5.1 h1:mVGYAvzDSu52+zaGyNjC+24Xw2bQi3kTr4QJ6N9pIIU= github.com/puzpuzpuz/xsync/v2 v2.5.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= @@ -103,13 +101,9 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM= github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -180,11 +174,12 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/listener.go b/listener.go index 95acc51..0c51862 100644 --- a/listener.go +++ b/listener.go @@ -9,14 +9,14 @@ type Listener struct { filters nostr.Filters } -var listeners = xsync.NewTypedMapOf[*WebSocket, map[string]*Listener](pointerHasher[WebSocket]) +var listeners = xsync.NewTypedMapOf[*WebSocket, *xsync.MapOf[string, *Listener]](pointerHasher[WebSocket]) func GetListeningFilters() nostr.Filters { respfilters := make(nostr.Filters, 0, listeners.Size()*2) // here we go through all the existing listeners - listeners.Range(func(_ *WebSocket, subs map[string]*Listener) bool { - for _, listener := range subs { + listeners.Range(func(_ *WebSocket, subs *xsync.MapOf[string, *Listener]) bool { + subs.Range(func(_ string, listener *Listener) bool { for _, listenerfilter := range listener.filters { for _, respfilter := range respfilters { // check if this filter specifically is already added to respfilters @@ -32,7 +32,9 @@ func GetListeningFilters() nostr.Filters { nextconn: continue } - } + + return true + }) return true }) @@ -42,15 +44,17 @@ func GetListeningFilters() nostr.Filters { } func setListener(id string, ws *WebSocket, filters nostr.Filters) { - subs, _ := listeners.LoadOrCompute(ws, func() map[string]*Listener { return make(map[string]*Listener) }) - subs[id] = &Listener{filters: filters} + subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] { + return xsync.NewMapOf[*Listener]() + }) + subs.Store(id, &Listener{filters: filters}) } // Remove a specific subscription id from listeners for a given ws client func removeListenerId(ws *WebSocket, id string) { if subs, ok := listeners.Load(ws); ok { - delete(subs, id) - if len(subs) == 0 { + subs.Delete(id) + if subs.Size() == 0 { listeners.Delete(ws) } } @@ -62,13 +66,14 @@ func removeListener(ws *WebSocket) { } func notifyListeners(event *nostr.Event) { - listeners.Range(func(ws *WebSocket, subs map[string]*Listener) bool { - for id, listener := range subs { + listeners.Range(func(ws *WebSocket, subs *xsync.MapOf[string, *Listener]) bool { + subs.Range(func(id string, listener *Listener) bool { if !listener.filters.Match(event) { - continue + return true } ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event}) - } + return true + }) return true }) } diff --git a/plugins/storage/badgern/count.go b/plugins/storage/badgern/count.go deleted file mode 100644 index 85b4019..0000000 --- a/plugins/storage/badgern/count.go +++ /dev/null @@ -1,83 +0,0 @@ -package badgern - -import ( - "context" - "encoding/binary" - - "github.com/dgraph-io/badger/v4" - "github.com/nbd-wtf/go-nostr" - "github.com/nbd-wtf/go-nostr/nson" -) - -func (b BadgerBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { - var count int64 = 0 - - queries, extraFilter, since, prefixLen, idxOffset, err := prepareQueries(filter) - if err != nil { - return 0, err - } - - err = b.View(func(txn *badger.Txn) error { - // iterate only through keys and in reverse order - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - opts.Reverse = true - - // actually iterate - for _, q := range queries { - it := txn.NewIterator(opts) - defer it.Close() - - for it.Seek(q.startingPoint); it.ValidForPrefix(q.prefix); it.Next() { - item := it.Item() - key := item.Key() - - if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(key[prefixLen:idxOffset]) - if createdAt < since { - break - } - } - - idx := make([]byte, 5) - idx[0] = rawEventStorePrefix - copy(idx[1:], key[idxOffset:]) - - // fetch actual event - item, err := txn.Get(idx) - if err != nil { - if err == badger.ErrDiscardedTxn { - return err - } - - panic(err) - } - - if extraFilter == nil { - count++ - } else { - err = item.Value(func(val []byte) error { - evt := &nostr.Event{} - if err := nson.Unmarshal(string(val), evt); err != nil { - return err - } - - // check if this matches the other filters that were not part of the index - if extraFilter == nil || extraFilter.Matches(evt) { - count++ - } - - return nil - }) - if err != nil { - panic(err) - } - } - } - } - - return nil - }) - - return count, err -} diff --git a/plugins/storage/badgern/delete.go b/plugins/storage/badgern/delete.go deleted file mode 100644 index d31b920..0000000 --- a/plugins/storage/badgern/delete.go +++ /dev/null @@ -1,78 +0,0 @@ -package badgern - -import ( - "context" - "encoding/hex" - - "github.com/dgraph-io/badger/v4" - "github.com/nbd-wtf/go-nostr" - "github.com/nbd-wtf/go-nostr/nson" -) - -func (b *BadgerBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error { - deletionHappened := false - - err := b.Update(func(txn *badger.Txn) error { - idx := make([]byte, 1, 5) - idx[0] = rawEventStorePrefix - - // query event by id to get its idx - id, _ := hex.DecodeString(evt.ID) - prefix := make([]byte, 1+32) - copy(prefix[1:], id) - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - it := txn.NewIterator(opts) - it.Seek(prefix) - if it.ValidForPrefix(prefix) { - // the key is the last 32 bytes - idx = append(idx, it.Item().Key()[1+32:]...) - } - it.Close() - - // if no idx was found, end here, this event doesn't exist - if len(idx) == 1 { - return nil - } - - // fetch the event - item, err := txn.Get(idx) - if err != nil { - return err - } - - item.Value(func(val []byte) error { - evt := &nostr.Event{} - if err := nson.Unmarshal(string(val), evt); err != nil { - return err - } - - // set this so we'll run the GC later - deletionHappened = true - - // calculate all index keys we have for this event and delete them - for _, k := range getIndexKeysForEvent(evt, idx[1:]) { - if err := txn.Delete(k); err != nil { - return err - } - } - - // delete the raw event - return txn.Delete(idx) - }) - - return nil - }) - if err != nil { - return err - } - - // after deleting, run garbage collector - if deletionHappened { - if err := b.RunValueLogGC(0.8); err != nil { - panic(err) - } - } - - return nil -} diff --git a/plugins/storage/badgern/lib.go b/plugins/storage/badgern/lib.go deleted file mode 100644 index 46e1f8f..0000000 --- a/plugins/storage/badgern/lib.go +++ /dev/null @@ -1,159 +0,0 @@ -package badgern - -import ( - "encoding/binary" - "encoding/hex" - - "github.com/dgraph-io/badger/v4" - "github.com/nbd-wtf/go-nostr" -) - -const ( - rawEventStorePrefix byte = 0 - indexCreatedAtPrefix byte = 1 - indexIdPrefix byte = 2 - indexKindPrefix byte = 3 - indexPubkeyPrefix byte = 4 - indexPubkeyKindPrefix byte = 5 - indexTagPrefix byte = 6 -) - -type BadgerBackend struct { - Path string - MaxLimit int - - *badger.DB - seq *badger.Sequence -} - -func (b *BadgerBackend) Init() error { - db, err := badger.Open(badger.DefaultOptions(b.Path)) - if err != nil { - return err - } - b.DB = db - b.seq, err = db.GetSequence([]byte("events"), 1000) - if err != nil { - return err - } - - if b.MaxLimit == 0 { - b.MaxLimit = 500 - } - - // DEBUG: inspecting keys on startup - // db.View(func(txn *badger.Txn) error { - // opts := badger.DefaultIteratorOptions - // opts.PrefetchSize = 10 - // it := txn.NewIterator(opts) - // defer it.Close() - // for it.Rewind(); it.Valid(); it.Next() { - // item := it.Item() - // k := item.Key() - // err := item.Value(func(v []byte) error { - // fmt.Println("key:", k) - // return nil - // }) - // if err != nil { - // return err - // } - // } - // return nil - // }) - - return nil -} - -func (b BadgerBackend) Close() { - b.DB.Close() - b.seq.Release() -} - -func (b BadgerBackend) Serial() []byte { - v, _ := b.seq.Next() - vb := make([]byte, 5) - vb[0] = rawEventStorePrefix - binary.BigEndian.PutUint32(vb[1:], uint32(v)) - return vb -} - -func getIndexKeysForEvent(evt *nostr.Event, idx []byte) [][]byte { - keys := make([][]byte, 0, 18) - - // indexes - { - // ~ by id - id, _ := hex.DecodeString(evt.ID) - k := make([]byte, 1+32+4) - k[0] = indexIdPrefix - copy(k[1:], id) - copy(k[1+32:], idx) - keys = append(keys, k) - } - - { - // ~ by pubkey+date - pubkey, _ := hex.DecodeString(evt.PubKey) - k := make([]byte, 1+32+4+4) - k[0] = indexPubkeyPrefix - copy(k[1:], pubkey) - binary.BigEndian.PutUint32(k[1+32:], uint32(evt.CreatedAt)) - copy(k[1+32+4:], idx) - keys = append(keys, k) - } - - { - // ~ by kind+date - k := make([]byte, 1+2+4+4) - k[0] = indexKindPrefix - binary.BigEndian.PutUint16(k[1:], uint16(evt.Kind)) - binary.BigEndian.PutUint32(k[1+2:], uint32(evt.CreatedAt)) - copy(k[1+2+4:], idx) - keys = append(keys, k) - } - - { - // ~ by pubkey+kind+date - pubkey, _ := hex.DecodeString(evt.PubKey) - k := make([]byte, 1+32+2+4+4) - k[0] = indexPubkeyKindPrefix - copy(k[1:], pubkey) - binary.BigEndian.PutUint16(k[1+32:], uint16(evt.Kind)) - binary.BigEndian.PutUint32(k[1+32+2:], uint32(evt.CreatedAt)) - copy(k[1+32+2+4:], idx) - keys = append(keys, k) - } - - // ~ by tagvalue+date - for _, tag := range evt.Tags { - if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 { - continue - } - - var v []byte - if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 { - // store value as bytes - v = vb - } else { - v = []byte(tag[1]) - } - - k := make([]byte, 1+len(v)+4+4) - k[0] = indexTagPrefix - copy(k[1:], v) - binary.BigEndian.PutUint32(k[1+len(v):], uint32(evt.CreatedAt)) - copy(k[1+len(v)+4:], idx) - keys = append(keys, k) - } - - { - // ~ by date only - k := make([]byte, 1+4+4) - k[0] = indexCreatedAtPrefix - binary.BigEndian.PutUint32(k[1:], uint32(evt.CreatedAt)) - copy(k[1+4:], idx) - keys = append(keys, k) - } - - return keys -} diff --git a/plugins/storage/badgern/query.go b/plugins/storage/badgern/query.go deleted file mode 100644 index 1e5c692..0000000 --- a/plugins/storage/badgern/query.go +++ /dev/null @@ -1,325 +0,0 @@ -package badgern - -import ( - "container/heap" - "context" - "encoding/binary" - "encoding/hex" - "fmt" - - "github.com/dgraph-io/badger/v4" - "github.com/nbd-wtf/go-nostr" - "github.com/nbd-wtf/go-nostr/nson" -) - -type query struct { - i int - prefix []byte - startingPoint []byte - results chan *nostr.Event - skipTimestamp bool -} - -type queryEvent struct { - *nostr.Event - query int -} - -func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { - ch := make(chan *nostr.Event) - - queries, extraFilter, since, prefixLen, idxOffset, err := prepareQueries(filter) - if err != nil { - return nil, err - } - - go func() { - err := b.View(func(txn *badger.Txn) error { - // iterate only through keys and in reverse order - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - opts.Reverse = true - - // actually iterate - iteratorClosers := make([]func(), len(queries)) - for i, q := range queries { - go func(i int, q query) { - it := txn.NewIterator(opts) - iteratorClosers[i] = it.Close - - defer close(q.results) - - for it.Seek(q.startingPoint); it.ValidForPrefix(q.prefix); it.Next() { - item := it.Item() - key := item.Key() - - if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(key[prefixLen:idxOffset]) - if createdAt < since { - break - } - } - - idx := make([]byte, 5) - idx[0] = rawEventStorePrefix - copy(idx[1:], key[idxOffset:]) - - // fetch actual event - item, err := txn.Get(idx) - if err != nil { - if err == badger.ErrDiscardedTxn { - return - } - - panic(err) - } - err = item.Value(func(val []byte) error { - evt := &nostr.Event{} - if err := nson.Unmarshal(string(val), evt); err != nil { - return err - } - - // check if this matches the other filters that were not part of the index - if extraFilter == nil || extraFilter.Matches(evt) { - q.results <- evt - } - - return nil - }) - if err != nil { - panic(err) - } - } - }(i, q) - } - - // max number of events we'll return - limit := b.MaxLimit - if filter.Limit > 0 && filter.Limit < limit { - limit = filter.Limit - } - - // receive results and ensure we only return the most recent ones always - emittedEvents := 0 - - // first pass - emitQueue := make(priorityQueue, 0, len(queries)+limit) - for _, q := range queries { - evt, ok := <-q.results - if ok { - emitQueue = append(emitQueue, &queryEvent{Event: evt, query: q.i}) - } - } - - // now it's a good time to schedule this - defer func() { - close(ch) - for _, itclose := range iteratorClosers { - itclose() - } - }() - - // queue may be empty here if we have literally nothing - if len(emitQueue) == 0 { - return nil - } - - heap.Init(&emitQueue) - - // iterate until we've emitted all events required - for { - // emit latest event in queue - latest := emitQueue[0] - ch <- latest.Event - - // stop when reaching limit - emittedEvents++ - if emittedEvents == limit { - break - } - - // fetch a new one from query results and replace the previous one with it - if evt, ok := <-queries[latest.query].results; ok { - emitQueue[0].Event = evt - heap.Fix(&emitQueue, 0) - } else { - // if this query has no more events we just remove this and proceed normally - heap.Remove(&emitQueue, 0) - - // check if the list is empty and end - if len(emitQueue) == 0 { - break - } - } - } - - return nil - }) - if err != nil { - panic(err) - } - }() - - return ch, nil -} - -type priorityQueue []*queryEvent - -func (pq priorityQueue) Len() int { return len(pq) } - -func (pq priorityQueue) Less(i, j int) bool { - return pq[i].CreatedAt > pq[j].CreatedAt -} - -func (pq priorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] -} - -func (pq *priorityQueue) Push(x any) { - item := x.(*queryEvent) - *pq = append(*pq, item) -} - -func (pq *priorityQueue) Pop() any { - old := *pq - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - *pq = old[0 : n-1] - return item -} - -func prepareQueries(filter nostr.Filter) ( - queries []query, - extraFilter *nostr.Filter, - since uint32, - prefixLen int, - idxOffset int, - err error, -) { - var index byte - - if len(filter.IDs) > 0 { - index = indexIdPrefix - queries = make([]query, len(filter.IDs)) - for i, idHex := range filter.IDs { - prefix := make([]byte, 1+32) - prefix[0] = index - id, _ := hex.DecodeString(idHex) - if len(id) != 32 { - return nil, nil, 0, 0, 0, fmt.Errorf("invalid id '%s'", idHex) - } - copy(prefix[1:], id) - queries[i] = query{i: i, prefix: prefix, skipTimestamp: true} - } - } else if len(filter.Authors) > 0 { - if len(filter.Kinds) == 0 { - index = indexPubkeyPrefix - queries = make([]query, len(filter.Authors)) - for i, pubkeyHex := range filter.Authors { - pubkey, _ := hex.DecodeString(pubkeyHex) - if len(pubkey) != 32 { - continue - } - prefix := make([]byte, 1+32) - prefix[0] = index - copy(prefix[1:], pubkey) - queries[i] = query{i: i, prefix: prefix} - } - } else { - index = indexPubkeyKindPrefix - queries = make([]query, len(filter.Authors)*len(filter.Kinds)) - i := 0 - for _, pubkeyHex := range filter.Authors { - for _, kind := range filter.Kinds { - pubkey, _ := hex.DecodeString(pubkeyHex) - if len(pubkey) != 32 { - return nil, nil, 0, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) - } - prefix := make([]byte, 1+32+2) - prefix[0] = index - copy(prefix[1:], pubkey) - binary.BigEndian.PutUint16(prefix[1+32:], uint16(kind)) - queries[i] = query{i: i, prefix: prefix} - i++ - } - } - } - extraFilter = &nostr.Filter{Tags: filter.Tags} - } else if len(filter.Tags) > 0 { - index = indexTagPrefix - - // determine the size of the queries array by inspecting all tags sizes - size := 0 - for _, values := range filter.Tags { - size += len(values) - } - queries = make([]query, size) - - extraFilter = &nostr.Filter{Kinds: filter.Kinds} - i := 0 - for _, values := range filter.Tags { - for _, value := range values { - bv, _ := hex.DecodeString(value) - var size int - if len(bv) == 32 { - // hex tag - size = 32 - } else { - // string tag - bv = []byte(value) - size = len(bv) - } - prefix := make([]byte, 1+size) - prefix[0] = index - copy(prefix[1:], bv) - queries[i] = query{i: i, prefix: prefix} - i++ - } - } - } else if len(filter.Kinds) > 0 { - index = indexKindPrefix - queries = make([]query, len(filter.Kinds)) - for i, kind := range filter.Kinds { - prefix := make([]byte, 1+2) - prefix[0] = index - binary.BigEndian.PutUint16(prefix[1:], uint16(kind)) - queries[i] = query{i: i, prefix: prefix} - } - } else { - index = indexCreatedAtPrefix - queries = make([]query, 1) - prefix := make([]byte, 1) - prefix[0] = index - queries[0] = query{i: 0, prefix: prefix} - extraFilter = nil - } - - prefixLen = len(queries[0].prefix) - - if index == indexIdPrefix { - idxOffset = prefixLen - } else { - idxOffset = prefixLen + 4 - } - - var until uint32 = 4294967295 - if filter.Until != nil { - if fu := uint32(*filter.Until); fu < until { - until = fu + 1 - } - } - for i, q := range queries { - queries[i].startingPoint = binary.BigEndian.AppendUint32(q.prefix, uint32(until)) - queries[i].results = make(chan *nostr.Event, 12) - } - - // this is where we'll end the iteration - if filter.Since != nil { - if fs := uint32(*filter.Since); fs > since { - since = fs - } - } - - return queries, extraFilter, since, prefixLen, idxOffset, nil -} diff --git a/plugins/storage/badgern/save.go b/plugins/storage/badgern/save.go deleted file mode 100644 index b8d2cb8..0000000 --- a/plugins/storage/badgern/save.go +++ /dev/null @@ -1,32 +0,0 @@ -package badgern - -import ( - "context" - - "github.com/dgraph-io/badger/v4" - "github.com/nbd-wtf/go-nostr" - "github.com/nbd-wtf/go-nostr/nson" -) - -func (b *BadgerBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error { - return b.Update(func(txn *badger.Txn) error { - nson, err := nson.Marshal(evt) - if err != nil { - return err - } - - idx := b.Serial() - // raw event store - if err := txn.Set(idx, []byte(nson)); err != nil { - return err - } - - for _, k := range getIndexKeysForEvent(evt, idx[1:]) { - if err := txn.Set(k, nil); err != nil { - return err - } - } - - return nil - }) -} diff --git a/plugins/storage/elasticsearch/elasticsearch.go b/plugins/storage/elasticsearch/elasticsearch.go deleted file mode 100644 index 25ca22f..0000000 --- a/plugins/storage/elasticsearch/elasticsearch.go +++ /dev/null @@ -1,182 +0,0 @@ -package elasticsearch - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "strings" - "time" - - "github.com/elastic/go-elasticsearch/v8" - "github.com/elastic/go-elasticsearch/v8/esutil" - "github.com/nbd-wtf/go-nostr" -) - -type IndexedEvent struct { - Event nostr.Event `json:"event"` - ContentSearch string `json:"content_search"` -} - -var indexMapping = ` -{ - "settings": { - "number_of_shards": 1, - "number_of_replicas": 0 - }, - "mappings": { - "dynamic": false, - "properties": { - "event": { - "dynamic": false, - "properties": { - "id": {"type": "keyword"}, - "pubkey": {"type": "keyword"}, - "kind": {"type": "integer"}, - "tags": {"type": "keyword"}, - "created_at": {"type": "date"} - } - }, - "content_search": {"type": "text"} - } - } -} -` - -type ElasticsearchStorage struct { - URL string - IndexName string - - es *elasticsearch.Client - bi esutil.BulkIndexer -} - -func (ess *ElasticsearchStorage) Init() error { - if ess.IndexName == "" { - ess.IndexName = "events" - } - - cfg := elasticsearch.Config{} - if ess.URL != "" { - cfg.Addresses = strings.Split(ess.URL, ",") - } - es, err := elasticsearch.NewClient(cfg) - if err != nil { - return err - } - - res, err := es.Indices.Create(ess.IndexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping))) - if err != nil { - return err - } - if res.IsError() { - body, _ := io.ReadAll(res.Body) - txt := string(body) - if !strings.Contains(txt, "resource_already_exists_exception") { - return fmt.Errorf("%s", txt) - } - } - - // bulk indexer - bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Index: ess.IndexName, - Client: es, - NumWorkers: 2, - FlushInterval: 3 * time.Second, - }) - if err != nil { - return fmt.Errorf("error creating the indexer: %s", err) - } - - ess.es = es - ess.bi = bi - - return nil -} - -func (ess *ElasticsearchStorage) DeleteEvent(ctx context.Context, evt *nostr.Event) error { - done := make(chan error) - err := ess.bi.Add( - ctx, - esutil.BulkIndexerItem{ - Action: "delete", - DocumentID: evt.ID, - OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { - close(done) - }, - OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { - if err != nil { - done <- err - } else { - // ok if deleted item not found - if res.Status == 404 { - close(done) - return - } - txt, _ := json.Marshal(res) - err := fmt.Errorf("ERROR: %s", txt) - done <- err - } - }, - }, - ) - if err != nil { - return err - } - - err = <-done - return err -} - -func (ess *ElasticsearchStorage) SaveEvent(ctx context.Context, evt *nostr.Event) error { - ie := &IndexedEvent{ - Event: *evt, - } - - // post processing: index for FTS - // some ideas: - // - index kind=0 fields a set of dedicated mapped fields - // (or use a separate index for profiles with a dedicated mapping) - // - if it's valid JSON just index the "values" and not the keys - // - more content introspection: language detection - // - denormalization... attach profile + ranking signals to events - if evt.Kind != 4 { - ie.ContentSearch = evt.Content - } - - data, err := json.Marshal(ie) - if err != nil { - return err - } - - done := make(chan error) - - // adapted from: - // https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196 - err = ess.bi.Add( - ctx, - esutil.BulkIndexerItem{ - Action: "index", - DocumentID: evt.ID, - Body: bytes.NewReader(data), - OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { - close(done) - }, - OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { - if err != nil { - done <- err - } else { - err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) - done <- err - } - }, - }, - ) - if err != nil { - return err - } - - err = <-done - return err -} diff --git a/plugins/storage/elasticsearch/query.go b/plugins/storage/elasticsearch/query.go deleted file mode 100644 index 9fa6f05..0000000 --- a/plugins/storage/elasticsearch/query.go +++ /dev/null @@ -1,261 +0,0 @@ -package elasticsearch - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "log" - "reflect" - - "github.com/aquasecurity/esquery" - "github.com/elastic/go-elasticsearch/v8/esutil" - "github.com/nbd-wtf/go-nostr" -) - -type EsSearchResult struct { - Took int - TimedOut bool `json:"timed_out"` - Hits struct { - Total struct { - Value int - Relation string - } - Hits []struct { - Source IndexedEvent `json:"_source"` - } - } -} - -type EsCountResult struct { - Count int64 -} - -func buildDsl(filter nostr.Filter) ([]byte, error) { - dsl := esquery.Bool() - - prefixFilter := func(fieldName string, values []string) { - if len(values) == 0 { - return - } - prefixQ := esquery.Bool() - for _, v := range values { - if len(v) < 64 { - prefixQ.Should(esquery.Prefix(fieldName, v)) - } else { - prefixQ.Should(esquery.Term(fieldName, v)) - } - } - dsl.Must(prefixQ) - } - - // ids - prefixFilter("event.id", filter.IDs) - - // authors - prefixFilter("event.pubkey", filter.Authors) - - // kinds - if len(filter.Kinds) > 0 { - dsl.Must(esquery.Terms("event.kind", toInterfaceSlice(filter.Kinds)...)) - } - - // tags - if len(filter.Tags) > 0 { - tagQ := esquery.Bool() - for char, terms := range filter.Tags { - vs := toInterfaceSlice(append(terms, char)) - tagQ.Should(esquery.Terms("event.tags", vs...)) - } - dsl.Must(tagQ) - } - - // since - if filter.Since != nil { - dsl.Must(esquery.Range("event.created_at").Gt(filter.Since)) - } - - // until - if filter.Until != nil { - dsl.Must(esquery.Range("event.created_at").Lt(filter.Until)) - } - - // search - if filter.Search != "" { - dsl.Must(esquery.Match("content_search", filter.Search)) - } - - return json.Marshal(esquery.Query(dsl)) -} - -func (ess *ElasticsearchStorage) getByID(filter nostr.Filter) ([]*nostr.Event, error) { - got, err := ess.es.Mget( - esutil.NewJSONReader(filter), - ess.es.Mget.WithIndex(ess.IndexName)) - if err != nil { - return nil, err - } - - var mgetResponse struct { - Docs []struct { - Found bool - Source IndexedEvent `json:"_source"` - } - } - if err := json.NewDecoder(got.Body).Decode(&mgetResponse); err != nil { - return nil, err - } - - events := make([]*nostr.Event, 0, len(mgetResponse.Docs)) - for _, e := range mgetResponse.Docs { - if e.Found { - events = append(events, &e.Source.Event) - } - } - - return events, nil -} - -func (ess *ElasticsearchStorage) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { - ch := make(chan *nostr.Event) - - // optimization: get by id - if isGetByID(filter) { - if evts, err := ess.getByID(filter); err == nil { - for _, evt := range evts { - ch <- evt - } - close(ch) - } else { - return nil, fmt.Errorf("error getting by id: %w", err) - } - } - - dsl, err := buildDsl(filter) - if err != nil { - return nil, err - } - - limit := 1000 - if filter.Limit > 0 && filter.Limit < limit { - limit = filter.Limit - } - - es := ess.es - res, err := es.Search( - es.Search.WithContext(ctx), - es.Search.WithIndex(ess.IndexName), - - es.Search.WithBody(bytes.NewReader(dsl)), - es.Search.WithSize(limit), - es.Search.WithSort("event.created_at:desc"), - ) - if err != nil { - log.Fatalf("Error getting response: %s", err) - } - defer res.Body.Close() - - if res.IsError() { - txt, _ := io.ReadAll(res.Body) - fmt.Println("oh no", string(txt)) - return nil, fmt.Errorf("%s", txt) - } - - var r EsSearchResult - if err := json.NewDecoder(res.Body).Decode(&r); err != nil { - return nil, err - } - - go func() { - for _, e := range r.Hits.Hits { - ch <- &e.Source.Event - } - close(ch) - }() - - return ch, nil -} - -func isGetByID(filter nostr.Filter) bool { - isGetById := len(filter.IDs) > 0 && - len(filter.Authors) == 0 && - len(filter.Kinds) == 0 && - len(filter.Tags) == 0 && - len(filter.Search) == 0 && - filter.Since == nil && - filter.Until == nil - - if isGetById { - for _, id := range filter.IDs { - if len(id) != 64 { - return false - } - } - } - return isGetById -} - -// from: https://stackoverflow.com/a/12754757 -func toInterfaceSlice(slice interface{}) []interface{} { - s := reflect.ValueOf(slice) - if s.Kind() != reflect.Slice { - panic("InterfaceSlice() given a non-slice type") - } - - // Keep the distinction between nil and empty slice input - if s.IsNil() { - return nil - } - - ret := make([]interface{}, s.Len()) - - for i := 0; i < s.Len(); i++ { - ret[i] = s.Index(i).Interface() - } - - return ret -} - -func (ess *ElasticsearchStorage) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { - count := int64(0) - - // optimization: get by id - if isGetByID(filter) { - if evts, err := ess.getByID(filter); err == nil { - count += int64(len(evts)) - } else { - return 0, fmt.Errorf("error getting by id: %w", err) - } - } - - dsl, err := buildDsl(filter) - if err != nil { - return 0, err - } - - es := ess.es - res, err := es.Count( - es.Count.WithContext(ctx), - es.Count.WithIndex(ess.IndexName), - - es.Count.WithBody(bytes.NewReader(dsl)), - ) - if err != nil { - log.Fatalf("Error getting response: %s", err) - } - defer res.Body.Close() - - if res.IsError() { - txt, _ := io.ReadAll(res.Body) - fmt.Println("oh no", string(txt)) - return 0, fmt.Errorf("%s", txt) - } - - var r EsCountResult - if err := json.NewDecoder(res.Body).Decode(&r); err != nil { - return 0, err - } - - return r.Count + count, nil -} diff --git a/plugins/storage/elasticsearch/query_test.go b/plugins/storage/elasticsearch/query_test.go deleted file mode 100644 index 4f58801..0000000 --- a/plugins/storage/elasticsearch/query_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package elasticsearch - -import ( - "bytes" - "encoding/json" - "fmt" - "testing" - - "github.com/nbd-wtf/go-nostr" -) - -func TestQuery(t *testing.T) { - now := nostr.Now() - yesterday := now - 60*60*24 - filter := &nostr.Filter{ - IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"}, - Kinds: []int{0, 1}, - Tags: nostr.TagMap{ - "e": []string{"abc"}, - "p": []string{"aaa", "bbb"}, - }, - Since: &yesterday, - Until: &now, - Limit: 100, - Search: "other stuff", - } - - dsl, err := buildDsl(filter) - if err != nil { - t.Fatal(err) - } - pprint(dsl) -} - -func pprint(j []byte) { - var dst bytes.Buffer - err := json.Indent(&dst, j, "", " ") - if err != nil { - fmt.Println("invalid JSON", err, string(j)) - } else { - fmt.Println(dst.String()) - } -} diff --git a/plugins/storage/lmdbn/count.go b/plugins/storage/lmdbn/count.go deleted file mode 100644 index 9862267..0000000 --- a/plugins/storage/lmdbn/count.go +++ /dev/null @@ -1,91 +0,0 @@ -package lmdbn - -import ( - "bytes" - "context" - "encoding/binary" - - "github.com/bmatsuo/lmdb-go/lmdb" - "github.com/nbd-wtf/go-nostr" - "github.com/nbd-wtf/go-nostr/nson" -) - -func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { - var count int64 = 0 - - dbi, queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) - if err != nil { - return 0, err - } - - err = b.lmdbEnv.View(func(txn *lmdb.Txn) error { - // actually iterate - for _, q := range queries { - cursor, err := txn.OpenCursor(dbi) - if err != nil { - continue - } - - var k []byte - var idx []byte - var iterr error - - if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil { - if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound { - // in this case it's really an error - panic(err) - } else { - // we're at the end and we just want notes before this, - // so we just need to set the cursor the last key, this is not a real error - k, idx, iterr = cursor.Get(nil, nil, lmdb.Last) - } - } else { - // move one back as the first step - k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) - } - - for { - // we already have a k and a v and an err from the cursor setup, so check and use these - if iterr != nil || !bytes.Equal(q.prefix, k[0:prefixLen]) { - break - } - - if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(k[prefixLen:]) - if createdAt < since { - break - } - } - - // fetch actual event - val, err := txn.Get(b.rawEventStore, idx) - if err != nil { - panic(err) - } - - if extraFilter == nil { - count++ - } else { - evt := &nostr.Event{} - if err := nson.Unmarshal(string(val), evt); err != nil { - return err - } - - // check if this matches the other filters that were not part of the index - if extraFilter == nil || extraFilter.Matches(evt) { - count++ - } - - return nil - } - - // move one back (we'll look into k and v and err in the next iteration) - k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) - } - } - - return nil - }) - - return count, err -} diff --git a/plugins/storage/lmdbn/delete.go b/plugins/storage/lmdbn/delete.go deleted file mode 100644 index 69cc54e..0000000 --- a/plugins/storage/lmdbn/delete.go +++ /dev/null @@ -1,50 +0,0 @@ -package lmdbn - -import ( - "context" - "encoding/hex" - - "github.com/bmatsuo/lmdb-go/lmdb" - "github.com/nbd-wtf/go-nostr" - "github.com/nbd-wtf/go-nostr/nson" -) - -func (b *LMDBBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error { - err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error { - id, _ := hex.DecodeString(evt.ID) - idx, err := txn.Get(b.indexId, id) - if operr, ok := err.(*lmdb.OpError); ok && operr.Errno == lmdb.NotFound { - // we already do not have this - return nil - } - if err != nil { - return err - } - - // fetch the event - val, err := txn.Get(b.rawEventStore, idx) - if err != nil { - return err - } - - evt := &nostr.Event{} - if err := nson.Unmarshal(string(val), evt); err != nil { - return err - } - - // calculate all index keys we have for this event and delete them - for _, k := range b.getIndexKeysForEvent(evt) { - if err := txn.Del(k.dbi, k.key, nil); err != nil { - return err - } - } - - // delete the raw event - return txn.Del(b.rawEventStore, idx, nil) - }) - if err != nil { - return err - } - - return nil -} diff --git a/plugins/storage/lmdbn/lib.go b/plugins/storage/lmdbn/lib.go deleted file mode 100644 index e81f11f..0000000 --- a/plugins/storage/lmdbn/lib.go +++ /dev/null @@ -1,208 +0,0 @@ -package lmdbn - -import ( - "encoding/binary" - "encoding/hex" - "sync/atomic" - - "github.com/bmatsuo/lmdb-go/lmdb" - "github.com/nbd-wtf/go-nostr" -) - -const ( - maxuint16 = 65535 - maxuint32 = 4294967295 -) - -type LMDBBackend struct { - Path string - MaxLimit int - - lmdbEnv *lmdb.Env - - rawEventStore lmdb.DBI - indexCreatedAt lmdb.DBI - indexId lmdb.DBI - indexKind lmdb.DBI - indexPubkey lmdb.DBI - indexPubkeyKind lmdb.DBI - indexTag lmdb.DBI - - lastId atomic.Uint32 -} - -func (b *LMDBBackend) Init() error { - if b.MaxLimit == 0 { - b.MaxLimit = 500 - } - - // open lmdb - env, err := lmdb.NewEnv() - if err != nil { - return err - } - - env.SetMaxDBs(7) - env.SetMaxReaders(500) - env.SetMapSize(1 << 38) // ~273GB - - err = env.Open(b.Path, lmdb.NoTLS, 0644) - if err != nil { - return err - } - b.lmdbEnv = env - - // open each db - if err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error { - if dbi, err := txn.OpenDBI("raw", lmdb.Create); err != nil { - return err - } else { - b.rawEventStore = dbi - return nil - } - }); err != nil { - return err - } - if err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error { - if dbi, err := txn.OpenDBI("created_at", lmdb.Create); err != nil { - return err - } else { - b.indexCreatedAt = dbi - } - if dbi, err := txn.OpenDBI("id", lmdb.Create); err != nil { - return err - } else { - b.indexId = dbi - } - if dbi, err := txn.OpenDBI("kind", lmdb.Create); err != nil { - return err - } else { - b.indexKind = dbi - } - if dbi, err := txn.OpenDBI("pubkey", lmdb.Create); err != nil { - return err - } else { - b.indexPubkey = dbi - } - if dbi, err := txn.OpenDBI("pubkeyKind", lmdb.Create); err != nil { - return err - } else { - b.indexPubkeyKind = dbi - } - if dbi, err := txn.OpenDBI("tag", lmdb.Create); err != nil { - return err - } else { - b.indexTag = dbi - } - return nil - }); err != nil { - return err - } - - // get lastId - if err := b.lmdbEnv.View(func(txn *lmdb.Txn) error { - txn.RawRead = true - cursor, err := txn.OpenCursor(b.rawEventStore) - if err != nil { - return err - } - defer cursor.Close() - k, _, err := cursor.Get(nil, nil, lmdb.Last) - if operr, ok := err.(*lmdb.OpError); ok && operr.Errno == lmdb.NotFound { - // nothing found, so we're at zero - return nil - } - if err != nil { - } - b.lastId.Store(binary.BigEndian.Uint32(k)) - - return nil - }); err != nil { - return err - } - - return nil -} - -func (b *LMDBBackend) Close() { - b.lmdbEnv.Close() -} - -func (b *LMDBBackend) Serial() []byte { - v := b.lastId.Add(1) - vb := make([]byte, 4) - binary.BigEndian.PutUint32(vb[:], uint32(v)) - return vb -} - -type key struct { - dbi lmdb.DBI - key []byte -} - -func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key { - keys := make([]key, 0, 18) - - // indexes - { - // ~ by id - k, _ := hex.DecodeString(evt.ID) - keys = append(keys, key{dbi: b.indexId, key: k}) - } - - { - // ~ by pubkey+date - pubkey, _ := hex.DecodeString(evt.PubKey) - k := make([]byte, 32+4) - copy(k[:], pubkey) - binary.BigEndian.PutUint32(k[32:], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexPubkey, key: k}) - } - - { - // ~ by kind+date - k := make([]byte, 2+4) - binary.BigEndian.PutUint16(k[:], uint16(evt.Kind)) - binary.BigEndian.PutUint32(k[2:], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexKind, key: k}) - } - - { - // ~ by pubkey+kind+date - pubkey, _ := hex.DecodeString(evt.PubKey) - k := make([]byte, 32+2+4) - copy(k[:], pubkey) - binary.BigEndian.PutUint16(k[32:], uint16(evt.Kind)) - binary.BigEndian.PutUint32(k[32+2:], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexPubkeyKind, key: k}) - } - - // ~ by tagvalue+date - for _, tag := range evt.Tags { - if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 { - continue - } - - var v []byte - if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 { - // store value as bytes - v = vb - } else { - v = []byte(tag[1]) - } - - k := make([]byte, len(v)+4) - copy(k[:], v) - binary.BigEndian.PutUint32(k[len(v):], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexTag, key: k}) - } - - { - // ~ by date only - k := make([]byte, 4) - binary.BigEndian.PutUint32(k[:], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexCreatedAt, key: k}) - } - - return keys -} diff --git a/plugins/storage/lmdbn/query.go b/plugins/storage/lmdbn/query.go deleted file mode 100644 index 04a1cf7..0000000 --- a/plugins/storage/lmdbn/query.go +++ /dev/null @@ -1,321 +0,0 @@ -package lmdbn - -import ( - "bytes" - "container/heap" - "context" - "encoding/binary" - "encoding/hex" - "fmt" - - "github.com/bmatsuo/lmdb-go/lmdb" - "github.com/nbd-wtf/go-nostr" - "github.com/nbd-wtf/go-nostr/nson" -) - -type query struct { - i int - prefix []byte - startingPoint []byte - results chan *nostr.Event - skipTimestamp bool -} - -type queryEvent struct { - *nostr.Event - query int -} - -func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { - ch := make(chan *nostr.Event) - - dbi, queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) - if err != nil { - return nil, err - } - - go func() { - err := b.lmdbEnv.View(func(txn *lmdb.Txn) error { - // actually iterate - cursorClosers := make([]func(), len(queries)) - for i, q := range queries { - go func(i int, q query) { - defer close(q.results) - - cursor, err := txn.OpenCursor(dbi) - if err != nil { - return - } - cursorClosers[i] = cursor.Close - - var k []byte - var idx []byte - var iterr error - - if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil { - if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound { - // in this case it's really an error - panic(err) - } else { - // we're at the end and we just want notes before this, - // so we just need to set the cursor the last key, this is not a real error - k, idx, iterr = cursor.Get(nil, nil, lmdb.Last) - } - } else { - // move one back as the first step - k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) - } - - for { - select { - case <-ctx.Done(): - break - default: - } - - // we already have a k and a v and an err from the cursor setup, so check and use these - if iterr != nil || !bytes.Equal(q.prefix, k[0:prefixLen]) { - return - } - - if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(k[prefixLen:]) - if createdAt < since { - break - } - } - - // fetch actual event - val, err := txn.Get(b.rawEventStore, idx) - if err != nil { - panic(err) - } - - evt := &nostr.Event{} - if err := nson.Unmarshal(string(val), evt); err != nil { - panic(err) - } - - // check if this matches the other filters that were not part of the index - if extraFilter == nil || extraFilter.Matches(evt) { - q.results <- evt - } - - // move one back (we'll look into k and v and err in the next iteration) - k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) - } - }(i, q) - } - - // max number of events we'll return - limit := b.MaxLimit - if filter.Limit > 0 && filter.Limit < limit { - limit = filter.Limit - } - - // receive results and ensure we only return the most recent ones always - emittedEvents := 0 - - // first pass - emitQueue := make(priorityQueue, 0, len(queries)+limit) - for _, q := range queries { - evt, ok := <-q.results - if ok { - emitQueue = append(emitQueue, &queryEvent{Event: evt, query: q.i}) - } - } - - // now it's a good time to schedule this - defer func() { - close(ch) - for _, cclose := range cursorClosers { - cclose() - } - }() - - // queue may be empty here if we have literally nothing - if len(emitQueue) == 0 { - return nil - } - - heap.Init(&emitQueue) - - // iterate until we've emitted all events required - for { - // emit latest event in queue - latest := emitQueue[0] - ch <- latest.Event - - // stop when reaching limit - emittedEvents++ - if emittedEvents >= limit { - break - } - - // fetch a new one from query results and replace the previous one with it - if evt, ok := <-queries[latest.query].results; ok { - emitQueue[0].Event = evt - heap.Fix(&emitQueue, 0) - } else { - // if this query has no more events we just remove this and proceed normally - heap.Remove(&emitQueue, 0) - - // check if the list is empty and end - if len(emitQueue) == 0 { - break - } - } - } - - return nil - }) - if err != nil { - panic(err) - } - }() - - return ch, nil -} - -type priorityQueue []*queryEvent - -func (pq priorityQueue) Len() int { return len(pq) } - -func (pq priorityQueue) Less(i, j int) bool { - return pq[i].CreatedAt > pq[j].CreatedAt -} - -func (pq priorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] -} - -func (pq *priorityQueue) Push(x any) { - item := x.(*queryEvent) - *pq = append(*pq, item) -} - -func (pq *priorityQueue) Pop() any { - old := *pq - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - *pq = old[0 : n-1] - return item -} - -func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( - dbi lmdb.DBI, - queries []query, - extraFilter *nostr.Filter, - since uint32, - prefixLen int, - err error, -) { - if len(filter.IDs) > 0 { - dbi = b.indexId - queries = make([]query, len(filter.IDs)) - for i, idHex := range filter.IDs { - prefix, _ := hex.DecodeString(idHex) - if len(prefix) != 32 { - return dbi, nil, nil, 0, 0, fmt.Errorf("invalid id '%s'", idHex) - } - queries[i] = query{i: i, prefix: prefix, skipTimestamp: true} - } - } else if len(filter.Authors) > 0 { - if len(filter.Kinds) == 0 { - dbi = b.indexPubkey - queries = make([]query, len(filter.Authors)) - for i, pubkeyHex := range filter.Authors { - prefix, _ := hex.DecodeString(pubkeyHex) - if len(prefix) != 32 { - return dbi, nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) - } - queries[i] = query{i: i, prefix: prefix} - } - } else { - dbi = b.indexPubkeyKind - queries = make([]query, len(filter.Authors)*len(filter.Kinds)) - i := 0 - for _, pubkeyHex := range filter.Authors { - for _, kind := range filter.Kinds { - pubkey, _ := hex.DecodeString(pubkeyHex) - if len(pubkey) != 32 { - return dbi, nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) - } - prefix := make([]byte, 32+2) - copy(prefix[:], pubkey) - binary.BigEndian.PutUint16(prefix[+32:], uint16(kind)) - queries[i] = query{i: i, prefix: prefix} - i++ - } - } - } - extraFilter = &nostr.Filter{Tags: filter.Tags} - } else if len(filter.Tags) > 0 { - dbi = b.indexTag - - // determine the size of the queries array by inspecting all tags sizes - size := 0 - for _, values := range filter.Tags { - size += len(values) - } - queries = make([]query, size) - - extraFilter = &nostr.Filter{Kinds: filter.Kinds} - i := 0 - for _, values := range filter.Tags { - for _, value := range values { - bv, _ := hex.DecodeString(value) - var size int - if len(bv) == 32 { - // hex tag - size = 32 - } else { - // string tag - bv = []byte(value) - size = len(bv) - } - prefix := make([]byte, size) - copy(prefix[:], bv) - queries[i] = query{i: i, prefix: prefix} - i++ - } - } - } else if len(filter.Kinds) > 0 { - dbi = b.indexKind - queries = make([]query, len(filter.Kinds)) - for i, kind := range filter.Kinds { - prefix := make([]byte, 2) - binary.BigEndian.PutUint16(prefix[:], uint16(kind)) - queries[i] = query{i: i, prefix: prefix} - } - } else { - dbi = b.indexCreatedAt - queries = make([]query, 1) - prefix := make([]byte, 0) - queries[0] = query{i: 0, prefix: prefix} - extraFilter = nil - } - - prefixLen = len(queries[0].prefix) - - var until uint32 = 4294967295 - if filter.Until != nil { - if fu := uint32(*filter.Until); fu < until { - until = fu + 1 - } - } - for i, q := range queries { - queries[i].startingPoint = binary.BigEndian.AppendUint32(q.prefix, uint32(until)) - queries[i].results = make(chan *nostr.Event, 12) - } - - // this is where we'll end the iteration - if filter.Since != nil { - if fs := uint32(*filter.Since); fs > since { - since = fs - } - } - - return dbi, queries, extraFilter, since, prefixLen, nil -} diff --git a/plugins/storage/lmdbn/save.go b/plugins/storage/lmdbn/save.go deleted file mode 100644 index 427bd87..0000000 --- a/plugins/storage/lmdbn/save.go +++ /dev/null @@ -1,38 +0,0 @@ -package lmdbn - -import ( - "context" - "fmt" - - "github.com/bmatsuo/lmdb-go/lmdb" - "github.com/nbd-wtf/go-nostr" - "github.com/nbd-wtf/go-nostr/nson" -) - -func (b *LMDBBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error { - // sanity checking - if evt.CreatedAt > maxuint32 || evt.Kind > maxuint16 { - return fmt.Errorf("event with values out of expected boundaries") - } - - return b.lmdbEnv.Update(func(txn *lmdb.Txn) error { - nson, err := nson.Marshal(evt) - if err != nil { - return err - } - - idx := b.Serial() - // raw event store - if err := txn.Put(b.rawEventStore, idx, []byte(nson), 0); err != nil { - return err - } - - for _, k := range b.getIndexKeysForEvent(evt) { - if err := txn.Put(k.dbi, k.key, idx, 0); err != nil { - return err - } - } - - return nil - }) -} diff --git a/plugins/storage/postgresql/delete.go b/plugins/storage/postgresql/delete.go deleted file mode 100644 index 32260b6..0000000 --- a/plugins/storage/postgresql/delete.go +++ /dev/null @@ -1,12 +0,0 @@ -package postgresql - -import ( - "context" - - "github.com/nbd-wtf/go-nostr" -) - -func (b PostgresBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error { - _, err := b.DB.ExecContext(ctx, "DELETE FROM event WHERE id = $1", evt.ID) - return err -} diff --git a/plugins/storage/postgresql/init.go b/plugins/storage/postgresql/init.go deleted file mode 100644 index 44606b9..0000000 --- a/plugins/storage/postgresql/init.go +++ /dev/null @@ -1,71 +0,0 @@ -package postgresql - -import ( - "github.com/jmoiron/sqlx" - "github.com/jmoiron/sqlx/reflectx" - _ "github.com/lib/pq" -) - -const ( - queryLimit = 100 - queryIDsLimit = 500 - queryAuthorsLimit = 500 - queryKindsLimit = 10 - queryTagsLimit = 10 -) - -func (b *PostgresBackend) Init() error { - db, err := sqlx.Connect("postgres", b.DatabaseURL) - if err != nil { - return err - } - - // sqlx default is 0 (unlimited), while postgresql by default accepts up to 100 connections - db.SetMaxOpenConns(80) - - db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper) - b.DB = db - - _, err = b.DB.Exec(` -CREATE OR REPLACE FUNCTION tags_to_tagvalues(jsonb) RETURNS text[] - AS 'SELECT array_agg(t->>1) FROM (SELECT jsonb_array_elements($1) AS t)s WHERE length(t->>0) = 1;' - LANGUAGE SQL - IMMUTABLE - RETURNS NULL ON NULL INPUT; - -CREATE TABLE IF NOT EXISTS event ( - id text NOT NULL, - pubkey text NOT NULL, - created_at integer NOT NULL, - kind integer NOT NULL, - tags jsonb NOT NULL, - content text NOT NULL, - sig text NOT NULL, - - tagvalues text[] GENERATED ALWAYS AS (tags_to_tagvalues(tags)) STORED -); - -CREATE UNIQUE INDEX IF NOT EXISTS ididx ON event USING btree (id text_pattern_ops); -CREATE INDEX IF NOT EXISTS pubkeyprefix ON event USING btree (pubkey text_pattern_ops); -CREATE INDEX IF NOT EXISTS timeidx ON event (created_at DESC); -CREATE INDEX IF NOT EXISTS kindidx ON event (kind); -CREATE INDEX IF NOT EXISTS arbitrarytagvalues ON event USING gin (tagvalues); - `) - - if b.QueryLimit == 0 { - b.QueryLimit = queryLimit - } - if b.QueryIDsLimit == 0 { - b.QueryIDsLimit = queryIDsLimit - } - if b.QueryAuthorsLimit == 0 { - b.QueryAuthorsLimit = queryAuthorsLimit - } - if b.QueryKindsLimit == 0 { - b.QueryKindsLimit = queryKindsLimit - } - if b.QueryTagsLimit == 0 { - b.QueryTagsLimit = queryTagsLimit - } - return err -} diff --git a/plugins/storage/postgresql/postgresql.go b/plugins/storage/postgresql/postgresql.go deleted file mode 100644 index a8ba1c8..0000000 --- a/plugins/storage/postgresql/postgresql.go +++ /dev/null @@ -1,15 +0,0 @@ -package postgresql - -import ( - "github.com/jmoiron/sqlx" -) - -type PostgresBackend struct { - *sqlx.DB - DatabaseURL string - QueryLimit int - QueryIDsLimit int - QueryAuthorsLimit int - QueryKindsLimit int - QueryTagsLimit int -} diff --git a/plugins/storage/postgresql/query.go b/plugins/storage/postgresql/query.go deleted file mode 100644 index 8f126f8..0000000 --- a/plugins/storage/postgresql/query.go +++ /dev/null @@ -1,193 +0,0 @@ -package postgresql - -import ( - "context" - "database/sql" - "encoding/hex" - "fmt" - "strconv" - "strings" - - "github.com/jmoiron/sqlx" - "github.com/nbd-wtf/go-nostr" -) - -func (b PostgresBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch chan *nostr.Event, err error) { - ch = make(chan *nostr.Event) - - query, params, err := b.queryEventsSql(filter, false) - if err != nil { - return nil, err - } - - rows, err := b.DB.Query(query, params...) - if err != nil && err != sql.ErrNoRows { - return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err) - } - - go func() { - defer rows.Close() - defer close(ch) - for rows.Next() { - var evt nostr.Event - var timestamp int64 - err := rows.Scan(&evt.ID, &evt.PubKey, ×tamp, - &evt.Kind, &evt.Tags, &evt.Content, &evt.Sig) - if err != nil { - return - } - evt.CreatedAt = nostr.Timestamp(timestamp) - ch <- &evt - } - }() - - return ch, nil -} - -func (b PostgresBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { - query, params, err := b.queryEventsSql(filter, true) - if err != nil { - return 0, err - } - - var count int64 - if err = b.DB.QueryRow(query, params...).Scan(&count); err != nil && err != sql.ErrNoRows { - return 0, fmt.Errorf("failed to fetch events using query %q: %w", query, err) - } - return count, nil -} - -func (b PostgresBackend) queryEventsSql(filter nostr.Filter, doCount bool) (string, []any, error) { - var conditions []string - var params []any - - if filter.IDs != nil { - if len(filter.IDs) > b.QueryIDsLimit { - // too many ids, fail everything - return "", nil, nil - } - - likeids := make([]string, 0, len(filter.IDs)) - for _, id := range filter.IDs { - // to prevent sql attack here we will check if - // these ids are valid 32byte hex - parsed, err := hex.DecodeString(id) - if err != nil || len(parsed) != 32 { - continue - } - likeids = append(likeids, fmt.Sprintf("id LIKE '%x%%'", parsed)) - } - if len(likeids) == 0 { - // ids being [] mean you won't get anything - return "", nil, nil - } - conditions = append(conditions, "("+strings.Join(likeids, " OR ")+")") - } - - if filter.Authors != nil { - if len(filter.Authors) > b.QueryAuthorsLimit { - // too many authors, fail everything - return "", nil, nil - } - - likekeys := make([]string, 0, len(filter.Authors)) - for _, key := range filter.Authors { - // to prevent sql attack here we will check if - // these keys are valid 32byte hex - parsed, err := hex.DecodeString(key) - if err != nil || len(parsed) != 32 { - continue - } - likekeys = append(likekeys, fmt.Sprintf("pubkey LIKE '%x%%'", parsed)) - } - if len(likekeys) == 0 { - // authors being [] mean you won't get anything - return "", nil, nil - } - conditions = append(conditions, "("+strings.Join(likekeys, " OR ")+")") - } - - if filter.Kinds != nil { - if len(filter.Kinds) > b.QueryKindsLimit { - // too many kinds, fail everything - return "", nil, nil - } - - if len(filter.Kinds) == 0 { - // kinds being [] mean you won't get anything - return "", nil, nil - } - // no sql injection issues since these are ints - inkinds := make([]string, len(filter.Kinds)) - for i, kind := range filter.Kinds { - inkinds[i] = strconv.Itoa(kind) - } - conditions = append(conditions, `kind IN (`+strings.Join(inkinds, ",")+`)`) - } - - tagQuery := make([]string, 0, 1) - for _, values := range filter.Tags { - if len(values) == 0 { - // any tag set to [] is wrong - return "", nil, nil - } - - // add these tags to the query - tagQuery = append(tagQuery, values...) - - if len(tagQuery) > b.QueryTagsLimit { - // too many tags, fail everything - return "", nil, nil - } - } - - if len(tagQuery) > 0 { - arrayBuild := make([]string, len(tagQuery)) - for i, tagValue := range tagQuery { - arrayBuild[i] = "?" - params = append(params, tagValue) - } - - // we use a very bad implementation in which we only check the tag values and - // ignore the tag names - conditions = append(conditions, - "tagvalues && ARRAY["+strings.Join(arrayBuild, ",")+"]") - } - - if filter.Since != nil { - conditions = append(conditions, "created_at > ?") - params = append(params, filter.Since) - } - if filter.Until != nil { - conditions = append(conditions, "created_at < ?") - params = append(params, filter.Until) - } - - if len(conditions) == 0 { - // fallback - conditions = append(conditions, "true") - } - - if filter.Limit < 1 || filter.Limit > b.QueryLimit { - params = append(params, b.QueryLimit) - } else { - params = append(params, filter.Limit) - } - - var query string - if doCount { - query = sqlx.Rebind(sqlx.BindType("postgres"), `SELECT - COUNT(*) - FROM event WHERE `+ - strings.Join(conditions, " AND ")+ - " ORDER BY created_at DESC LIMIT ?") - } else { - query = sqlx.Rebind(sqlx.BindType("postgres"), `SELECT - id, pubkey, created_at, kind, tags, content, sig - FROM event WHERE `+ - strings.Join(conditions, " AND ")+ - " ORDER BY created_at DESC LIMIT ?") - } - - return query, params, nil -} diff --git a/plugins/storage/postgresql/query_test.go b/plugins/storage/postgresql/query_test.go deleted file mode 100644 index eda8cb7..0000000 --- a/plugins/storage/postgresql/query_test.go +++ /dev/null @@ -1,405 +0,0 @@ -package postgresql - -import ( - "fmt" - "strconv" - "strings" - "testing" - - "github.com/nbd-wtf/go-nostr" - "github.com/stretchr/testify/assert" -) - -var defaultBackend = PostgresBackend{ - QueryLimit: queryLimit, - QueryIDsLimit: queryIDsLimit, - QueryAuthorsLimit: queryAuthorsLimit, - QueryKindsLimit: queryKindsLimit, - QueryTagsLimit: queryTagsLimit, -} - -func TestQueryEventsSql(t *testing.T) { - var tests = []struct { - name string - backend PostgresBackend - filter *nostr.Filter - query string - params []any - err error - }{ - { - name: "empty filter", - backend: defaultBackend, - filter: &nostr.Filter{}, - query: "SELECT id, pubkey, created_at, kind, tags, content, sig FROM event WHERE true ORDER BY created_at DESC LIMIT $1", - params: []any{100}, - err: nil, - }, - { - name: "valid filter limit", - backend: defaultBackend, - filter: &nostr.Filter{ - Limit: 50, - }, - query: "SELECT id, pubkey, created_at, kind, tags, content, sig FROM event WHERE true ORDER BY created_at DESC LIMIT $1", - params: []any{50}, - err: nil, - }, - { - name: "too large filter limit", - backend: defaultBackend, - filter: &nostr.Filter{ - Limit: 2000, - }, - query: "SELECT id, pubkey, created_at, kind, tags, content, sig FROM event WHERE true ORDER BY created_at DESC LIMIT $1", - params: []any{100}, - err: nil, - }, - { - name: "ids filter", - backend: defaultBackend, - filter: &nostr.Filter{ - IDs: []string{"083ec57f36a7b39ab98a57bedab4f85355b2ee89e4b205bed58d7c3ef9edd294"}, - }, - query: `SELECT id, pubkey, created_at, kind, tags, content, sig - FROM event - WHERE (id LIKE '083ec57f36a7b39ab98a57bedab4f85355b2ee89e4b205bed58d7c3ef9edd294%') - ORDER BY created_at DESC LIMIT $1`, - params: []any{100}, - err: nil, - }, - { - name: "kind filter", - backend: defaultBackend, - filter: &nostr.Filter{ - Kinds: []int{1, 2, 3}, - }, - query: `SELECT id, pubkey, created_at, kind, tags, content, sig - FROM event - WHERE kind IN(1,2,3) - ORDER BY created_at DESC LIMIT $1`, - params: []any{100}, - err: nil, - }, - { - name: "authors filter", - backend: defaultBackend, - filter: &nostr.Filter{ - Authors: []string{"7bdef7bdebb8721f77927d0e77c66059360fa62371fdf15f3add93923a613229"}, - }, - query: `SELECT id, pubkey, created_at, kind, tags, content, sig - FROM event - WHERE (pubkey LIKE '7bdef7bdebb8721f77927d0e77c66059360fa62371fdf15f3add93923a613229%') - ORDER BY created_at DESC LIMIT $1`, - params: []any{100}, - err: nil, - }, - // errors - { - name: "nil filter", - backend: defaultBackend, - filter: nil, - query: "", - params: nil, - err: fmt.Errorf("filter cannot be null"), - }, - { - name: "too many ids", - backend: defaultBackend, - filter: &nostr.Filter{ - IDs: strSlice(501), - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "invalid ids", - backend: defaultBackend, - filter: &nostr.Filter{ - IDs: []string{"stuff"}, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "too many authors", - backend: defaultBackend, - filter: &nostr.Filter{ - Authors: strSlice(501), - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "invalid authors", - backend: defaultBackend, - filter: &nostr.Filter{ - Authors: []string{"stuff"}, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "too many kinds", - backend: defaultBackend, - filter: &nostr.Filter{ - Kinds: intSlice(11), - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "no kinds", - backend: defaultBackend, - filter: &nostr.Filter{ - Kinds: []int{}, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "tags of empty array", - backend: defaultBackend, - filter: &nostr.Filter{ - Tags: nostr.TagMap{ - "#e": []string{}, - }, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "too many tag values", - backend: defaultBackend, - filter: &nostr.Filter{ - Tags: nostr.TagMap{ - "#e": strSlice(11), - }, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - query, params, err := tt.backend.queryEventsSql(tt.filter, false) - assert.Equal(t, tt.err, err) - if err != nil { - return - } - - assert.Equal(t, clean(tt.query), clean(query)) - assert.Equal(t, tt.params, params) - }) - } -} - -func clean(s string) string { - return strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(s, "\t", ""), "\n", ""), " ", "") -} - -func intSlice(n int) []int { - slice := make([]int, 0, n) - for i := 0; i < n; i++ { - slice = append(slice, i) - } - return slice -} - -func strSlice(n int) []string { - slice := make([]string, 0, n) - for i := 0; i < n; i++ { - slice = append(slice, strconv.Itoa(i)) - } - return slice -} - -func TestCountEventsSql(t *testing.T) { - var tests = []struct { - name string - backend PostgresBackend - filter *nostr.Filter - query string - params []any - err error - }{ - { - name: "empty filter", - backend: defaultBackend, - filter: &nostr.Filter{}, - query: "SELECT COUNT(*) FROM event WHERE true ORDER BY created_at DESC LIMIT $1", - params: []any{100}, - err: nil, - }, - { - name: "ids filter", - backend: defaultBackend, - filter: &nostr.Filter{ - IDs: []string{"083ec57f36a7b39ab98a57bedab4f85355b2ee89e4b205bed58d7c3ef9edd294"}, - }, - query: `SELECT COUNT(*) - FROM event - WHERE (id LIKE '083ec57f36a7b39ab98a57bedab4f85355b2ee89e4b205bed58d7c3ef9edd294%') - ORDER BY created_at DESC LIMIT $1`, - params: []any{100}, - err: nil, - }, - { - name: "kind filter", - backend: defaultBackend, - filter: &nostr.Filter{ - Kinds: []int{1, 2, 3}, - }, - query: `SELECT COUNT(*) - FROM event - WHERE kind IN(1,2,3) - ORDER BY created_at DESC LIMIT $1`, - params: []any{100}, - err: nil, - }, - { - name: "authors filter", - backend: defaultBackend, - filter: &nostr.Filter{ - Authors: []string{"7bdef7bdebb8721f77927d0e77c66059360fa62371fdf15f3add93923a613229"}, - }, - query: `SELECT COUNT(*) - FROM event - WHERE (pubkey LIKE '7bdef7bdebb8721f77927d0e77c66059360fa62371fdf15f3add93923a613229%') - ORDER BY created_at DESC LIMIT $1`, - params: []any{100}, - err: nil, - }, - // errors - { - name: "nil filter", - backend: defaultBackend, - filter: nil, - query: "", - params: nil, - err: fmt.Errorf("filter cannot be null"), - }, - { - name: "too many ids", - backend: defaultBackend, - filter: &nostr.Filter{ - IDs: strSlice(501), - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "invalid ids", - backend: defaultBackend, - filter: &nostr.Filter{ - IDs: []string{"stuff"}, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "too many authors", - backend: defaultBackend, - filter: &nostr.Filter{ - Authors: strSlice(501), - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "invalid authors", - backend: defaultBackend, - filter: &nostr.Filter{ - Authors: []string{"stuff"}, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "too many kinds", - backend: defaultBackend, - filter: &nostr.Filter{ - Kinds: intSlice(11), - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "no kinds", - backend: defaultBackend, - filter: &nostr.Filter{ - Kinds: []int{}, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "tags of empty array", - backend: defaultBackend, - filter: &nostr.Filter{ - Tags: nostr.TagMap{ - "#e": []string{}, - }, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - { - name: "too many tag values", - backend: defaultBackend, - filter: &nostr.Filter{ - Tags: nostr.TagMap{ - "#e": strSlice(11), - }, - }, - query: "", - params: nil, - // REVIEW: should return error - err: nil, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - query, params, err := tt.backend.queryEventsSql(tt.filter, true) - assert.Equal(t, tt.err, err) - if err != nil { - return - } - - assert.Equal(t, clean(tt.query), clean(query)) - assert.Equal(t, tt.params, params) - }) - } -} diff --git a/plugins/storage/postgresql/save.go b/plugins/storage/postgresql/save.go deleted file mode 100644 index 6552649..0000000 --- a/plugins/storage/postgresql/save.go +++ /dev/null @@ -1,54 +0,0 @@ -package postgresql - -import ( - "context" - "encoding/json" - - "github.com/fiatjaf/khatru" - "github.com/nbd-wtf/go-nostr" -) - -func (b *PostgresBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error { - sql, params, _ := saveEventSql(evt) - res, err := b.DB.ExecContext(ctx, sql, params...) - if err != nil { - return err - } - - nr, err := res.RowsAffected() - if err != nil { - return err - } - - if nr == 0 { - return khatru.ErrDupEvent - } - - return nil -} - -func (b *PostgresBackend) BeforeSave(ctx context.Context, evt *nostr.Event) { - // do nothing -} - -func (b *PostgresBackend) AfterSave(evt *nostr.Event) { - // delete all but the 100 most recent ones for each key - b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < ( - SELECT created_at FROM event WHERE pubkey = $1 - ORDER BY created_at DESC OFFSET 100 LIMIT 1 - )`, evt.PubKey, evt.Kind) -} - -func saveEventSql(evt *nostr.Event) (string, []any, error) { - const query = `INSERT INTO event ( - id, pubkey, created_at, kind, tags, content, sig) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (id) DO NOTHING` - - var ( - tagsj, _ = json.Marshal(evt.Tags) - params = []any{evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig} - ) - - return query, params, nil -} diff --git a/plugins/storage/postgresql/save_test.go b/plugins/storage/postgresql/save_test.go deleted file mode 100644 index 337ece7..0000000 --- a/plugins/storage/postgresql/save_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package postgresql - -import ( - "testing" - - "github.com/nbd-wtf/go-nostr" - "github.com/stretchr/testify/assert" -) - -func TestSaveEventSql(t *testing.T) { - now := nostr.Now() - tests := []struct { - name string - event *nostr.Event - query string - params []any - err error - }{ - { - name: "basic", - event: &nostr.Event{ - ID: "id", - PubKey: "pk", - CreatedAt: now, - Kind: nostr.KindTextNote, - Content: "test", - Sig: "sig", - }, - query: `INSERT INTO event ( - id, pubkey, created_at, kind, tags, content, sig) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (id) DO NOTHING`, - params: []any{"id", "pk", now, nostr.KindTextNote, []byte("null"), "test", "sig"}, - err: nil, - }, - { - name: "tags", - event: &nostr.Event{ - ID: "id", - PubKey: "pk", - CreatedAt: now, - Kind: nostr.KindTextNote, - Tags: nostr.Tags{nostr.Tag{"foo", "bar"}}, - Content: "test", - Sig: "sig", - }, - query: `INSERT INTO event ( - id, pubkey, created_at, kind, tags, content, sig) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (id) DO NOTHING`, - params: []any{"id", "pk", now, nostr.KindTextNote, []byte("[[\"foo\",\"bar\"]]"), "test", "sig"}, - err: nil, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - query, params, err := saveEventSql(tt.event) - assert.Equal(t, clean(tt.query), clean(query)) - assert.Equal(t, tt.params, params) - assert.Equal(t, tt.err, err) - }) - } -} diff --git a/plugins/storage/sqlite3/delete.go b/plugins/storage/sqlite3/delete.go deleted file mode 100644 index f74703d..0000000 --- a/plugins/storage/sqlite3/delete.go +++ /dev/null @@ -1,12 +0,0 @@ -package sqlite3 - -import ( - "context" - - "github.com/nbd-wtf/go-nostr" -) - -func (b SQLite3Backend) DeleteEvent(ctx context.Context, evt *nostr.Event) error { - _, err := b.DB.ExecContext(ctx, "DELETE FROM event WHERE id = $1", evt.ID) - return err -} diff --git a/plugins/storage/sqlite3/init.go b/plugins/storage/sqlite3/init.go deleted file mode 100644 index 4a17aa7..0000000 --- a/plugins/storage/sqlite3/init.go +++ /dev/null @@ -1,33 +0,0 @@ -package sqlite3 - -import ( - "github.com/jmoiron/sqlx" - "github.com/jmoiron/sqlx/reflectx" - _ "github.com/mattn/go-sqlite3" -) - -func (b *SQLite3Backend) Init() error { - db, err := sqlx.Connect("sqlite3", b.DatabaseURL) - if err != nil { - return err - } - - // sqlx default is 0 (unlimited), while sqlite3 by default accepts up to 100 connections - db.SetMaxOpenConns(80) - - db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper) - b.DB = db - - _, err = b.DB.Exec(` -CREATE TABLE IF NOT EXISTS event ( - id text NOT NULL, - pubkey text NOT NULL, - created_at integer NOT NULL, - kind integer NOT NULL, - tags jsonb NOT NULL, - content text NOT NULL, - sig text NOT NULL -); - `) - return err -} diff --git a/plugins/storage/sqlite3/query.go b/plugins/storage/sqlite3/query.go deleted file mode 100644 index d4bcca2..0000000 --- a/plugins/storage/sqlite3/query.go +++ /dev/null @@ -1,192 +0,0 @@ -package sqlite3 - -import ( - "context" - "database/sql" - "encoding/hex" - "fmt" - "strconv" - "strings" - - "github.com/jmoiron/sqlx" - "github.com/nbd-wtf/go-nostr" -) - -func (b SQLite3Backend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch chan *nostr.Event, err error) { - ch = make(chan *nostr.Event) - - query, params, err := queryEventsSql(filter, false) - if err != nil { - return nil, err - } - - rows, err := b.DB.Query(query, params...) - if err != nil && err != sql.ErrNoRows { - return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err) - } - - go func() { - defer rows.Close() - defer close(ch) - for rows.Next() { - var evt nostr.Event - var timestamp int64 - err := rows.Scan(&evt.ID, &evt.PubKey, ×tamp, - &evt.Kind, &evt.Tags, &evt.Content, &evt.Sig) - if err != nil { - return - } - evt.CreatedAt = nostr.Timestamp(timestamp) - ch <- &evt - } - }() - - return ch, nil -} - -func (b SQLite3Backend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { - query, params, err := queryEventsSql(filter, true) - if err != nil { - return 0, err - } - - var count int64 - err = b.DB.QueryRow(query, params...).Scan(&count) - if err != nil && err != sql.ErrNoRows { - return 0, fmt.Errorf("failed to fetch events using query %q: %w", query, err) - } - return count, nil -} - -func queryEventsSql(filter nostr.Filter, doCount bool) (string, []any, error) { - var conditions []string - var params []any - - if filter.IDs != nil { - if len(filter.IDs) > 500 { - // too many ids, fail everything - return "", nil, nil - } - - likeids := make([]string, 0, len(filter.IDs)) - for _, id := range filter.IDs { - // to prevent sql attack here we will check if - // these ids are valid 32byte hex - parsed, err := hex.DecodeString(id) - if err != nil || len(parsed) != 32 { - continue - } - likeids = append(likeids, fmt.Sprintf("id LIKE '%x%%'", parsed)) - } - if len(likeids) == 0 { - // ids being [] mean you won't get anything - return "", nil, nil - } - conditions = append(conditions, "("+strings.Join(likeids, " OR ")+")") - } - - if filter.Authors != nil { - if len(filter.Authors) > 500 { - // too many authors, fail everything - return "", nil, nil - } - - likekeys := make([]string, 0, len(filter.Authors)) - for _, key := range filter.Authors { - // to prevent sql attack here we will check if - // these keys are valid 32byte hex - parsed, err := hex.DecodeString(key) - if err != nil || len(parsed) != 32 { - continue - } - likekeys = append(likekeys, fmt.Sprintf("pubkey LIKE '%x%%'", parsed)) - } - if len(likekeys) == 0 { - // authors being [] mean you won't get anything - return "", nil, nil - } - conditions = append(conditions, "("+strings.Join(likekeys, " OR ")+")") - } - - if filter.Kinds != nil { - if len(filter.Kinds) > 10 { - // too many kinds, fail everything - return "", nil, nil - } - - if len(filter.Kinds) == 0 { - // kinds being [] mean you won't get anything - return "", nil, nil - } - // no sql injection issues since these are ints - inkinds := make([]string, len(filter.Kinds)) - for i, kind := range filter.Kinds { - inkinds[i] = strconv.Itoa(kind) - } - conditions = append(conditions, `kind IN (`+strings.Join(inkinds, ",")+`)`) - } - - tagQuery := make([]string, 0, 1) - for _, values := range filter.Tags { - if len(values) == 0 { - // any tag set to [] is wrong - return "", nil, nil - } - - // add these tags to the query - tagQuery = append(tagQuery, values...) - - if len(tagQuery) > 10 { - // too many tags, fail everything - return "", nil, nil - } - } - - // we use a very bad implementation in which we only check the tag values and - // ignore the tag names - for _, tagValue := range tagQuery { - params = append(params, "%"+tagValue+"%") - conditions = append(conditions, "tags LIKE ?") - } - - if filter.Since != nil { - conditions = append(conditions, "created_at > ?") - params = append(params, filter.Since) - } - if filter.Until != nil { - conditions = append(conditions, "created_at < ?") - params = append(params, filter.Until) - } - if filter.Search != "" { - conditions = append(conditions, "content LIKE ?") - params = append(params, "%"+filter.Search+"%") - } - - if len(conditions) == 0 { - // fallback - conditions = append(conditions, "true") - } - - if filter.Limit < 1 || filter.Limit > 100 { - params = append(params, 100) - } else { - params = append(params, filter.Limit) - } - - var query string - if doCount { - query = sqlx.Rebind(sqlx.BindType("sqlite3"), `SELECT - COUNT(*) - FROM event WHERE `+ - strings.Join(conditions, " AND ")+ - " ORDER BY created_at DESC LIMIT ?") - } else { - query = sqlx.Rebind(sqlx.BindType("sqlite3"), `SELECT - id, pubkey, created_at, kind, tags, content, sig - FROM event WHERE `+ - strings.Join(conditions, " AND ")+ - " ORDER BY created_at DESC LIMIT ?") - } - - return query, params, nil -} diff --git a/plugins/storage/sqlite3/save.go b/plugins/storage/sqlite3/save.go deleted file mode 100644 index 555ceb1..0000000 --- a/plugins/storage/sqlite3/save.go +++ /dev/null @@ -1,44 +0,0 @@ -package sqlite3 - -import ( - "context" - "encoding/json" - - "github.com/fiatjaf/khatru" - "github.com/nbd-wtf/go-nostr" -) - -func (b *SQLite3Backend) SaveEvent(ctx context.Context, evt *nostr.Event) error { - // insert - tagsj, _ := json.Marshal(evt.Tags) - res, err := b.DB.ExecContext(ctx, ` - INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig) - VALUES ($1, $2, $3, $4, $5, $6, $7) - `, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig) - if err != nil { - return err - } - - nr, err := res.RowsAffected() - if err != nil { - return err - } - - if nr == 0 { - return khatru.ErrDupEvent - } - - return nil -} - -func (b *SQLite3Backend) BeforeSave(ctx context.Context, evt *nostr.Event) { - // do nothing -} - -func (b *SQLite3Backend) AfterSave(evt *nostr.Event) { - // delete all but the 100 most recent ones for each key - b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < ( - SELECT created_at FROM event WHERE pubkey = $1 - ORDER BY created_at DESC OFFSET 100 LIMIT 1 - )`, evt.PubKey, evt.Kind) -} diff --git a/plugins/storage/sqlite3/sqlite3.go b/plugins/storage/sqlite3/sqlite3.go deleted file mode 100644 index d0d6e03..0000000 --- a/plugins/storage/sqlite3/sqlite3.go +++ /dev/null @@ -1,10 +0,0 @@ -package sqlite3 - -import ( - "github.com/jmoiron/sqlx" -) - -type SQLite3Backend struct { - *sqlx.DB - DatabaseURL string -}