diff --git a/README.md b/README.md index ba4cb5a..b0d7533 100644 --- a/README.md +++ b/README.md @@ -1,3 +1 @@ -This is a simple relay implementation for [nostr](https://github.com/fiatjaf/nostr). - -There is a public instance at https://nostr-relay.herokuapp.com/. +Nostr Relay Framework -- use it to implement your own custom relay. diff --git a/Makefile b/basic/Makefile similarity index 100% rename from Makefile rename to basic/Makefile diff --git a/cleanup.go b/basic/cleanup.go similarity index 69% rename from cleanup.go rename to basic/cleanup.go index 974e4d6..773c015 100644 --- a/cleanup.go +++ b/basic/cleanup.go @@ -1,9 +1,13 @@ package main -import "time" +import ( + "time" + + "github.com/jmoiron/sqlx" +) // every hour, delete all very old events -func cleanupRoutine() { +func cleanupRoutine(db *sqlx.DB) { for { time.Sleep(60 * time.Minute) db.Exec(`DELETE FROM event WHERE created_at < $1`, time.Now().AddDate(0, -3, 0)) diff --git a/basic/main.go b/basic/main.go new file mode 100644 index 0000000..3a4fbd1 --- /dev/null +++ b/basic/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + + "github.com/fiatjaf/relayer" + "github.com/jmoiron/sqlx" + "github.com/jmoiron/sqlx/reflectx" + "github.com/kelseyhightower/envconfig" +) + +type BasicRelay struct { + PostgresDatabase string `envconfig:"POSTGRESQL_DATABASE"` + + DB *sqlx.DB +} + +func (b *BasicRelay) Name() string { + return "BasicRelay" +} + +func (b *BasicRelay) Init() error { + err := envconfig.Process("", b) + if err != nil { + return fmt.Errorf("couldn't process envconfig: %w", err) + } + + if db, err := initDB(b.PostgresDatabase); err != nil { + return fmt.Errorf("failed to open database: %w", err) + } else { + db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper) + b.DB = db + } + + go cleanupRoutine(b.DB) + + return nil +} + +func main() { + var b BasicRelay + + relayer.Start(&b) +} diff --git a/postgresql.go b/basic/postgresql.go similarity index 83% rename from postgresql.go rename to basic/postgresql.go index b145b6a..0fd9056 100644 --- a/postgresql.go +++ b/basic/postgresql.go @@ -3,10 +3,11 @@ package main import ( "github.com/jmoiron/sqlx" _ "github.com/lib/pq" + "github.com/rs/zerolog/log" ) -func initDB() (*sqlx.DB, error) { - db, err := sqlx.Connect("postgres", s.PostgresDatabase) +func initDB(dburl string) (*sqlx.DB, error) { + db, err := sqlx.Connect("postgres", dburl) if err != nil { return nil, err } diff --git a/query.go b/basic/query.go similarity index 88% rename from query.go rename to basic/query.go index f5414fe..8fe39f6 100644 --- a/query.go +++ b/basic/query.go @@ -9,9 +9,12 @@ import ( "github.com/fiatjaf/go-nostr/event" "github.com/fiatjaf/go-nostr/filter" + "github.com/rs/zerolog/log" ) -func queryEvents(filter *filter.EventFilter) (events []event.Event, err error) { +func (b *BasicRelay) QueryEvents( + filter *filter.EventFilter, +) (events []event.Event, err error) { var conditions []string var params []interface{} @@ -69,11 +72,11 @@ func queryEvents(filter *filter.EventFilter) (events []event.Event, err error) { conditions = append(conditions, "true") } - query := db.Rebind("SELECT * FROM event WHERE " + + query := b.DB.Rebind("SELECT * FROM event WHERE " + strings.Join(conditions, " AND ") + " ORDER BY created_at LIMIT 100") - err = db.Select(&events, query, params...) + err = b.DB.Select(&events, query, params...) if err != nil && err != sql.ErrNoRows { log.Warn().Err(err).Interface("filter", filter).Msg("failed to fetch events") err = fmt.Errorf("failed to fetch events: %w", err) diff --git a/basic/save.go b/basic/save.go new file mode 100644 index 0000000..cb2dfe7 --- /dev/null +++ b/basic/save.go @@ -0,0 +1,55 @@ +package main + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/fiatjaf/go-nostr/event" +) + +func (b *BasicRelay) SaveEvent(evt *event.Event) error { + // disallow large contents + if len(evt.Content) > 1000 { + return errors.New("event content too large") + } + + // react to different kinds of events + switch evt.Kind { + case event.KindSetMetadata: + // delete past set_metadata events from this user + b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 0`, evt.PubKey) + case event.KindRecommendServer: + // delete past recommend_server events equal to this one + b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 2 AND content = $2`, + evt.PubKey, evt.Content) + case event.KindContactList: + // delete past contact lists from this same pubkey + b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 3`, evt.PubKey) + default: + // delete all but the 10 most recent ones + b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < ( + SELECT created_at FROM event WHERE pubkey = $1 + ORDER BY created_at DESC OFFSET 10 LIMIT 1 + )`, + evt.PubKey, evt.Kind) + } + + // insert + tagsj, _ := json.Marshal(evt.Tags) + _, err := b.DB.Exec(` + INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig) + VALUES ($1, $2, $3, $4, $5, $6, $7) + `, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig) + if err != nil { + if strings.Index(err.Error(), "UNIQUE") != -1 { + // already exists + return nil + } + + return fmt.Errorf("failed to save event from %s", evt.PubKey) + } + + return nil +} diff --git a/handlers.go b/handlers.go index d152989..c083852 100644 --- a/handlers.go +++ b/handlers.go @@ -1,12 +1,12 @@ -package main +package relayer import ( "crypto/sha256" "encoding/hex" "encoding/json" "errors" + "fmt" "net/http" - "strings" "time" "github.com/fiatjaf/go-nostr/event" @@ -34,196 +34,154 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } -func handleWebsocket(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Warn().Err(err).Msg("failed to upgrade websocket") - return - } +func handleWebsocket(relay Relay) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Warn().Err(err).Msg("failed to upgrade websocket") + return + } - // reader - go func() { - defer func() { - conn.Close() - }() + // reader + go func() { + defer func() { + conn.Close() + }() - conn.SetReadLimit(maxMessageSize) - conn.SetReadDeadline(time.Now().Add(pongWait)) - conn.SetPongHandler(func(string) error { + conn.SetReadLimit(maxMessageSize) conn.SetReadDeadline(time.Now().Add(pongWait)) - return nil - }) + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) - for { - typ, message, err := conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError( - err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Warn().Err(err).Msg("unexpected close error") - } - break - } - - if typ == websocket.PingMessage { - conn.WriteMessage(websocket.PongMessage, nil) - continue - } - - go func(message []byte) { - var err error - - defer func() { - if err != nil { - conn.WriteJSON([]interface{}{"NOTICE", err.Error()}) - } - }() - - var request []json.RawMessage - err = json.Unmarshal(message, &request) - if err == nil && len(request) < 2 { - err = errors.New("request has less than parameters") - return - } + for { + typ, message, err := conn.ReadMessage() if err != nil { - err = nil - return + if websocket.IsUnexpectedCloseError( + err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Warn().Err(err).Msg("unexpected close error") + } + break } - var typ string - json.Unmarshal(request[0], &typ) + if typ == websocket.PingMessage { + conn.WriteMessage(websocket.PongMessage, nil) + continue + } - switch typ { - case "EVENT": - // it's a new event - err = saveEvent(request[1]) + go func(message []byte) { + var err error - case "REQ": - var id string - json.Unmarshal(request[1], &id) - if id == "" { - err = errors.New("REQ has no ") + defer func() { + if err != nil { + conn.WriteJSON([]interface{}{"NOTICE", err.Error()}) + } + }() + + var request []json.RawMessage + err = json.Unmarshal(message, &request) + if err == nil && len(request) < 2 { + err = errors.New("request has less than 2 parameters") + return + } + if err != nil { + err = nil return } - filters := make(filter.EventFilters, len(request)-2) - for i, filterReq := range request[2:] { - err = json.Unmarshal(filterReq, &filters[i]) + var typ string + json.Unmarshal(request[0], &typ) + + switch typ { + case "EVENT": + // it's a new event + var evt event.Event + err := json.Unmarshal(request[1], &evt) + if err != nil { + err = fmt.Errorf("failed to decode event: %w", err) + return + } + + // check serialization + serialized := evt.Serialize() + + // assign ID + hash := sha256.Sum256(serialized) + evt.ID = hex.EncodeToString(hash[:]) + + // check signature (requires the ID to be set) + if ok, err := evt.CheckSignature(); err != nil { + err = errors.New("signature verification error") + return + } else if !ok { + err = errors.New("signature invalid") + return + } + + err = relay.SaveEvent(&evt) if err != nil { return } - events, err := queryEvents(&filters[i]) - if err == nil { - for _, event := range events { - conn.WriteJSON([]interface{}{"EVENT", id, event}) + notifyListeners(&evt) + case "REQ": + var id string + json.Unmarshal(request[1], &id) + if id == "" { + err = errors.New("REQ has no ") + return + } + + filters := make(filter.EventFilters, len(request)-2) + for i, filterReq := range request[2:] { + err = json.Unmarshal(filterReq, &filters[i]) + if err != nil { + return + } + + events, err := relay.QueryEvents(&filters[i]) + if err == nil { + for _, event := range events { + conn.WriteJSON([]interface{}{"EVENT", id, event}) + } } } + + setListener(id, conn, filters) + + case "CLOSE": + var id string + json.Unmarshal(request[0], &id) + if id == "" { + err = errors.New("CLOSE has no ") + return + } + + removeListener(conn, id) } - - setListener(id, conn, filters) - - case "CLOSE": - var id string - json.Unmarshal(request[0], &id) - if id == "" { - err = errors.New("CLOSE has no ") - return - } - - removeListener(conn, id) - } - }(message) - } - }() - - // writer - go func() { - ticker := time.NewTicker(pingPeriod) - defer func() { - ticker.Stop() - conn.Close() + }(message) + } }() - for { - select { - case <-ticker.C: - err := conn.WriteMessage(websocket.PingMessage, nil) - if err != nil { - log.Warn().Err(err).Msg("error writing ping, closing websocket") - return + // writer + go func() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + conn.Close() + }() + + for { + select { + case <-ticker.C: + err := conn.WriteMessage(websocket.PingMessage, nil) + if err != nil { + log.Warn().Err(err).Msg("error writing ping, closing websocket") + return + } } } - } - }() -} - -func saveEvent(body []byte) error { - var evt event.Event - err := json.Unmarshal(body, &evt) - if err != nil { - log.Warn().Err(err).Str("body", string(body)).Msg("couldn't decode body") - return errors.New("failed to decode event") - } - - // disallow large contents - if len(evt.Content) > 1000 { - log.Warn().Err(err).Msg("event content too large") - return errors.New("event content too large") - } - - // check serialization - serialized := evt.Serialize() - - // assign ID - hash := sha256.Sum256(serialized) - evt.ID = hex.EncodeToString(hash[:]) - - // check signature (requires the ID to be set) - if ok, err := evt.CheckSignature(); err != nil { - log.Warn().Err(err).Msg("signature verification error") - return errors.New("signature verification error") - } else if !ok { - log.Warn().Err(err).Msg("signature invalid") - return errors.New("signature invalid") - } - - // react to different kinds of events - switch evt.Kind { - case event.KindSetMetadata: - // delete past set_metadata events from this user - db.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 0`, evt.PubKey) - case event.KindRecommendServer: - // delete past recommend_server events equal to this one - db.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 2 AND content = $2`, - evt.PubKey, evt.Content) - case event.KindContactList: - // delete past contact lists from this same pubkey - db.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 3`, evt.PubKey) - default: - // delete all but the 10 most recent ones - db.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < ( - SELECT created_at FROM event WHERE pubkey = $1 - ORDER BY created_at DESC OFFSET 10 LIMIT 1 - )`, - evt.PubKey, evt.Kind) - } - - // insert - tagsj, _ := json.Marshal(evt.Tags) - _, err = db.Exec(` - INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig) - VALUES ($1, $2, $3, $4, $5, $6, $7) - `, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig) - if err != nil { - if strings.Index(err.Error(), "UNIQUE") != -1 { - // already exists - return nil - } - - log.Warn().Err(err).Str("pubkey", evt.PubKey).Msg("failed to save") - return errors.New("failed to save event") - } - - notifyListeners(&evt) - return nil + }() + } } diff --git a/interface.go b/interface.go new file mode 100644 index 0000000..28051c1 --- /dev/null +++ b/interface.go @@ -0,0 +1,15 @@ +package relayer + +import ( + "github.com/fiatjaf/go-nostr/event" + "github.com/fiatjaf/go-nostr/filter" +) + +var Log = log + +type Relay interface { + Name() string + Init() error + SaveEvent(*event.Event) error + QueryEvents(*filter.EventFilter) ([]event.Event, error) +} diff --git a/listener.go b/listener.go index be8dccc..6927767 100644 --- a/listener.go +++ b/listener.go @@ -1,4 +1,4 @@ -package main +package relayer import ( "sync" diff --git a/notice.go b/notice.go index 9d96d4a..468158d 100644 --- a/notice.go +++ b/notice.go @@ -1,4 +1,4 @@ -package main +package relayer type Notice struct { Kind string `json:"kind"` diff --git a/main.go b/relayer.go similarity index 56% rename from main.go rename to relayer.go index 7e3abfc..d19fb95 100644 --- a/main.go +++ b/relayer.go @@ -1,4 +1,4 @@ -package main +package relayer import ( "net/http" @@ -6,9 +6,6 @@ import ( "time" "github.com/gorilla/mux" - "github.com/jmoiron/sqlx" - "github.com/jmoiron/sqlx/reflectx" - "github.com/kelseyhightower/envconfig" "github.com/rs/cors" "github.com/rs/zerolog" ) @@ -16,32 +13,22 @@ import ( type Settings struct { Host string `envconfig:"HOST" default:"0.0.0.0"` Port string `envconfig:"PORT" default:"7447"` - - PostgresDatabase string `envconfig:"POSTGRESQL_DATABASE"` } var s Settings -var err error -var db *sqlx.DB var log = zerolog.New(os.Stderr).Output(zerolog.ConsoleWriter{Out: os.Stderr}) + var router = mux.NewRouter() -func main() { - err = envconfig.Process("", &s) - if err != nil { - log.Fatal().Err(err).Msg("couldn't process envconfig") - } +func Start(relay Relay) { + Log = log.With().Str("name", relay.Name()).Logger() - db, err = initDB() - if err != nil { - log.Fatal().Err(err).Msg("failed to open database") + if err := relay.Init(); err != nil { + Log.Fatal().Err(err).Msg("failed to start") } - db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper) - - go cleanupRoutine() // NIP01 - router.Path("/").Methods("GET").HandlerFunc(handleWebsocket) + router.Path("/").Methods("GET").HandlerFunc(handleWebsocket(relay)) srv := &http.Server{ Handler: cors.Default().Handler(router),