mirror of
https://github.com/aljazceru/khatru.git
synced 2026-01-11 01:14:22 +01:00
refactor framework interface, simplify basic and whitelisted, bring expensive on and rewrite it.
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
FROM golang:1.15.5
|
||||
FROM golang:1.18
|
||||
|
||||
WORKDIR /go/src/app
|
||||
COPY ./ .
|
||||
|
||||
RUN go get -d -v ./...
|
||||
RUN go install -v ./...
|
||||
RUN cd basic && make
|
||||
RUN cd basic && make
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
// every hour, delete all very old events
|
||||
func cleanupRoutine(db *sqlx.DB) {
|
||||
for {
|
||||
time.Sleep(60 * time.Minute)
|
||||
db.Exec(`DELETE FROM event WHERE created_at < $1`, time.Now().AddDate(0, -3, 0))
|
||||
}
|
||||
}
|
||||
@@ -1,6 +0,0 @@
|
||||
package main
|
||||
|
||||
func (b *BasicRelay) DeleteEvent(id string, pubkey string) error {
|
||||
_, err := b.DB.Exec("DELETE FROM events WHERE id = $1 AND pubkey = $2")
|
||||
return err
|
||||
}
|
||||
@@ -13,7 +13,7 @@ services:
|
||||
condition: service_healthy
|
||||
ports:
|
||||
- 2700:2700
|
||||
command: "./basic/relayer"
|
||||
command: "./basic/relayer-basic"
|
||||
|
||||
postgres:
|
||||
image: postgres
|
||||
@@ -29,4 +29,4 @@ services:
|
||||
test: ["CMD-SHELL", "pg_isready -U nostr"] # database username here - nostr, should be changed if other user
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
retries: 5
|
||||
|
||||
@@ -1,44 +1,69 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/fiatjaf/go-nostr"
|
||||
"github.com/fiatjaf/relayer"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/jmoiron/sqlx/reflectx"
|
||||
"github.com/fiatjaf/relayer/storage/postgresql"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
)
|
||||
|
||||
type BasicRelay struct {
|
||||
type Relay struct {
|
||||
PostgresDatabase string `envconfig:"POSTGRESQL_DATABASE"`
|
||||
|
||||
DB *sqlx.DB
|
||||
}
|
||||
|
||||
func (b *BasicRelay) Name() string {
|
||||
func (r *Relay) Name() string {
|
||||
return "BasicRelay"
|
||||
}
|
||||
|
||||
func (b *BasicRelay) Init() error {
|
||||
err := envconfig.Process("", b)
|
||||
func (r *Relay) Storage() relayer.Storage {
|
||||
return &postgresql.PostgresBackend{DatabaseURL: r.PostgresDatabase}
|
||||
}
|
||||
|
||||
func (r *Relay) Init() error {
|
||||
err := envconfig.Process("", r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't process envconfig: %w", err)
|
||||
}
|
||||
|
||||
if db, err := initDB(b.PostgresDatabase); err != nil {
|
||||
return fmt.Errorf("failed to open database: %w", err)
|
||||
} else {
|
||||
db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper)
|
||||
b.DB = db
|
||||
}
|
||||
// every hour, delete all very old events
|
||||
go func() {
|
||||
db := r.Storage().(*postgresql.PostgresBackend)
|
||||
|
||||
go cleanupRoutine(b.DB)
|
||||
for {
|
||||
time.Sleep(60 * time.Minute)
|
||||
db.DB.Exec(`DELETE FROM event WHERE created_at < $1`, time.Now().AddDate(0, -3, 0)) // 3 months
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
var b BasicRelay
|
||||
func (r *Relay) AcceptEvent(evt *nostr.Event) bool {
|
||||
// block events that are too large
|
||||
jsonb, _ := json.Marshal(evt)
|
||||
if len(jsonb) > 10000 {
|
||||
return false
|
||||
}
|
||||
|
||||
relayer.Start(&b)
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *Relay) BeforeSave(evt *nostr.Event) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
func (r *Relay) AfterSave(evt *nostr.Event) {
|
||||
// delete all but the 100 most recent ones for each key
|
||||
r.Storage().(*postgresql.PostgresBackend).DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < (
|
||||
SELECT created_at FROM event WHERE pubkey = $1
|
||||
ORDER BY created_at DESC OFFSET 100 LIMIT 1
|
||||
)`, evt.PubKey, evt.Kind)
|
||||
}
|
||||
|
||||
func main() {
|
||||
relayer.Start(&Relay{})
|
||||
}
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/fiatjaf/relayer"
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func initDB(dburl string) (*sqlx.DB, error) {
|
||||
db, err := sqlx.Connect("postgres", dburl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = db.Exec(`
|
||||
CREATE FUNCTION tags_to_tagvalues(jsonb) RETURNS text[]
|
||||
AS 'SELECT array_agg(t->>1) FROM (SELECT jsonb_array_elements($1) AS t)s;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS event (
|
||||
id text NOT NULL,
|
||||
pubkey text NOT NULL,
|
||||
created_at integer NOT NULL,
|
||||
kind integer NOT NULL,
|
||||
tags jsonb NOT NULL,
|
||||
content text NOT NULL,
|
||||
sig text NOT NULL,
|
||||
|
||||
tagvalues text[] GENERATED ALWAYS AS (tags_to_tagvalues(tags)) STORED
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS ididx ON event USING btree (id text_pattern_ops);
|
||||
CREATE INDEX IF NOT EXISTS pubkeyprefix ON event USING btree (pubkey text_pattern_ops);
|
||||
CREATE INDEX IF NOT EXISTS timeidx ON event (created_at);
|
||||
CREATE INDEX IF NOT EXISTS kindidx ON event (kind);
|
||||
CREATE INDEX IF NOT EXISTS arbitrarytagvalues ON event USING gin (tagvalues);
|
||||
`)
|
||||
relayer.Log.Print(err)
|
||||
return db, nil
|
||||
}
|
||||
158
basic/query.go
158
basic/query.go
@@ -1,158 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fiatjaf/go-nostr"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func (b *BasicRelay) QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error) {
|
||||
var conditions []string
|
||||
var params []any
|
||||
|
||||
if filter == nil {
|
||||
err = errors.New("filter cannot be null")
|
||||
return
|
||||
}
|
||||
|
||||
if filter.IDs != nil {
|
||||
if len(filter.IDs) > 500 {
|
||||
// too many ids, fail everything
|
||||
return
|
||||
}
|
||||
|
||||
likeids := make([]string, 0, len(filter.IDs))
|
||||
for _, id := range filter.IDs {
|
||||
// to prevent sql attack here we will check if
|
||||
// these ids are valid 32byte hex
|
||||
parsed, err := hex.DecodeString(id)
|
||||
if err != nil || len(parsed) <= 32 {
|
||||
continue
|
||||
}
|
||||
likeids = append(likeids, fmt.Sprintf("id LIKE '%x%%'", parsed))
|
||||
}
|
||||
if len(likeids) == 0 {
|
||||
// ids being [] mean you won't get anything
|
||||
return
|
||||
}
|
||||
conditions = append(conditions, "("+strings.Join(likeids, " OR ")+")")
|
||||
}
|
||||
|
||||
if filter.Authors != nil {
|
||||
if len(filter.Authors) > 500 {
|
||||
// too many authors, fail everything
|
||||
return
|
||||
}
|
||||
|
||||
likekeys := make([]string, 0, len(filter.Authors))
|
||||
for _, key := range filter.Authors {
|
||||
// to prevent sql attack here we will check if
|
||||
// these keys are valid 32byte hex
|
||||
parsed, err := hex.DecodeString(key)
|
||||
if err != nil || len(parsed) != 32 {
|
||||
continue
|
||||
}
|
||||
likekeys = append(likekeys, fmt.Sprintf("pubkey LIKE '%x%%'", parsed))
|
||||
}
|
||||
if len(likekeys) == 0 {
|
||||
// authors being [] mean you won't get anything
|
||||
return
|
||||
}
|
||||
conditions = append(conditions, "("+strings.Join(likekeys, " OR ")+")")
|
||||
}
|
||||
|
||||
if filter.Kinds != nil {
|
||||
if len(filter.Kinds) > 10 {
|
||||
// too many kinds, fail everything
|
||||
return
|
||||
}
|
||||
|
||||
if len(filter.Kinds) == 0 {
|
||||
// kinds being [] mean you won't get anything
|
||||
return
|
||||
}
|
||||
// no sql injection issues since these are ints
|
||||
inkinds := make([]string, len(filter.Kinds))
|
||||
for i, kind := range filter.Kinds {
|
||||
inkinds[i] = strconv.Itoa(kind)
|
||||
}
|
||||
conditions = append(conditions, `kind IN (`+strings.Join(inkinds, ",")+`)`)
|
||||
}
|
||||
|
||||
tagQuery := make([]string, 0, 1)
|
||||
for _, values := range filter.Tags {
|
||||
if len(values) == 0 {
|
||||
// any tag set to [] is wrong
|
||||
return
|
||||
}
|
||||
|
||||
// add these tags to the query
|
||||
tagQuery = append(tagQuery, values...)
|
||||
|
||||
if len(tagQuery) > 10 {
|
||||
// too many tags, fail everything
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if len(tagQuery) > 0 {
|
||||
arrayBuild := make([]string, len(tagQuery))
|
||||
for i, tagValue := range tagQuery {
|
||||
arrayBuild[i] = "?"
|
||||
params = append(params, tagValue)
|
||||
}
|
||||
|
||||
// we use a very bad implementation in which we only check the tag values and
|
||||
// ignore the tag names
|
||||
conditions = append(conditions,
|
||||
"tagvalues && ARRAY["+strings.Join(arrayBuild, ",")+"]")
|
||||
}
|
||||
|
||||
if filter.Since != nil {
|
||||
conditions = append(conditions, "created_at > ?")
|
||||
params = append(params, filter.Since.Unix())
|
||||
}
|
||||
if filter.Until != nil {
|
||||
conditions = append(conditions, "created_at < ?")
|
||||
params = append(params, filter.Until.Unix())
|
||||
}
|
||||
|
||||
if len(conditions) == 0 {
|
||||
// fallback
|
||||
conditions = append(conditions, "true")
|
||||
}
|
||||
|
||||
query := b.DB.Rebind(`SELECT
|
||||
id, pubkey, created_at, kind, tags, content, sig
|
||||
FROM event WHERE ` +
|
||||
strings.Join(conditions, " AND ") +
|
||||
" ORDER BY created_at LIMIT 100")
|
||||
|
||||
rows, err := b.DB.Query(query, params...)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
log.Warn().Err(err).Interface("filter", filter).Str("query", query).
|
||||
Msg("failed to fetch events")
|
||||
return nil, fmt.Errorf("failed to fetch events: %w", err)
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var evt nostr.Event
|
||||
var timestamp int64
|
||||
err := rows.Scan(&evt.ID, &evt.PubKey, ×tamp,
|
||||
&evt.Kind, &evt.Tags, &evt.Content, &evt.Sig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to scan row: %w", err)
|
||||
}
|
||||
evt.CreatedAt = time.Unix(timestamp, 0)
|
||||
events = append(events, evt)
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
@@ -1,58 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fiatjaf/go-nostr"
|
||||
)
|
||||
|
||||
func (b *BasicRelay) SaveEvent(evt *nostr.Event) error {
|
||||
// disallow large contents
|
||||
if len(evt.Content) > 1000 {
|
||||
return errors.New("event content too large")
|
||||
}
|
||||
|
||||
// react to different kinds of events
|
||||
if evt.Kind == nostr.KindSetMetadata || evt.Kind == nostr.KindContactList || (10000 <= evt.Kind && evt.Kind < 20000) {
|
||||
// delete past events from this user
|
||||
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2`, evt.PubKey, evt.Kind)
|
||||
} else if evt.Kind == nostr.KindRecommendServer {
|
||||
// delete past recommend_server events equal to this one
|
||||
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND content = $3`,
|
||||
evt.PubKey, evt.Kind, evt.Content)
|
||||
} else {
|
||||
// delete all but the 100 most recent ones
|
||||
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < (
|
||||
SELECT created_at FROM event WHERE pubkey = $1
|
||||
ORDER BY created_at DESC OFFSET 100 LIMIT 1
|
||||
)`,
|
||||
evt.PubKey, evt.Kind)
|
||||
}
|
||||
|
||||
// insert
|
||||
tagsj, _ := json.Marshal(evt.Tags)
|
||||
_, err := b.DB.Exec(`
|
||||
INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
`, evt.ID, evt.PubKey, evt.CreatedAt.Unix(), evt.Kind, tagsj, evt.Content, evt.Sig)
|
||||
if err != nil {
|
||||
if strings.Index(err.Error(), "UNIQUE") != -1 {
|
||||
// already exists
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to save event %s: %w", evt.ID, err)
|
||||
}
|
||||
|
||||
// delete ephemeral events after a minute
|
||||
go func() {
|
||||
time.Sleep(75 * time.Second)
|
||||
b.DB.Exec("DELETE FROM event WHERE id = $1", evt.ID)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user