turn relayer into a server framework and put actual relay code into ./basic

This commit is contained in:
fiatjaf
2021-12-25 21:22:40 -03:00
parent ac93e5c028
commit 30eae726c1
12 changed files with 268 additions and 203 deletions

View File

@@ -1,3 +1 @@
This is a simple relay implementation for [nostr](https://github.com/fiatjaf/nostr). Nostr Relay Framework -- use it to implement your own custom relay.
There is a public instance at https://nostr-relay.herokuapp.com/.

View File

@@ -1,9 +1,13 @@
package main package main
import "time" import (
"time"
"github.com/jmoiron/sqlx"
)
// every hour, delete all very old events // every hour, delete all very old events
func cleanupRoutine() { func cleanupRoutine(db *sqlx.DB) {
for { for {
time.Sleep(60 * time.Minute) time.Sleep(60 * time.Minute)
db.Exec(`DELETE FROM event WHERE created_at < $1`, time.Now().AddDate(0, -3, 0)) db.Exec(`DELETE FROM event WHERE created_at < $1`, time.Now().AddDate(0, -3, 0))

44
basic/main.go Normal file
View File

@@ -0,0 +1,44 @@
package main
import (
"fmt"
"github.com/fiatjaf/relayer"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
"github.com/kelseyhightower/envconfig"
)
type BasicRelay struct {
PostgresDatabase string `envconfig:"POSTGRESQL_DATABASE"`
DB *sqlx.DB
}
func (b *BasicRelay) Name() string {
return "BasicRelay"
}
func (b *BasicRelay) Init() error {
err := envconfig.Process("", b)
if err != nil {
return fmt.Errorf("couldn't process envconfig: %w", err)
}
if db, err := initDB(b.PostgresDatabase); err != nil {
return fmt.Errorf("failed to open database: %w", err)
} else {
db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper)
b.DB = db
}
go cleanupRoutine(b.DB)
return nil
}
func main() {
var b BasicRelay
relayer.Start(&b)
}

View File

@@ -3,10 +3,11 @@ package main
import ( import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/rs/zerolog/log"
) )
func initDB() (*sqlx.DB, error) { func initDB(dburl string) (*sqlx.DB, error) {
db, err := sqlx.Connect("postgres", s.PostgresDatabase) db, err := sqlx.Connect("postgres", dburl)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -9,9 +9,12 @@ import (
"github.com/fiatjaf/go-nostr/event" "github.com/fiatjaf/go-nostr/event"
"github.com/fiatjaf/go-nostr/filter" "github.com/fiatjaf/go-nostr/filter"
"github.com/rs/zerolog/log"
) )
func queryEvents(filter *filter.EventFilter) (events []event.Event, err error) { func (b *BasicRelay) QueryEvents(
filter *filter.EventFilter,
) (events []event.Event, err error) {
var conditions []string var conditions []string
var params []interface{} var params []interface{}
@@ -69,11 +72,11 @@ func queryEvents(filter *filter.EventFilter) (events []event.Event, err error) {
conditions = append(conditions, "true") conditions = append(conditions, "true")
} }
query := db.Rebind("SELECT * FROM event WHERE " + query := b.DB.Rebind("SELECT * FROM event WHERE " +
strings.Join(conditions, " AND ") + strings.Join(conditions, " AND ") +
" ORDER BY created_at LIMIT 100") " ORDER BY created_at LIMIT 100")
err = db.Select(&events, query, params...) err = b.DB.Select(&events, query, params...)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
log.Warn().Err(err).Interface("filter", filter).Msg("failed to fetch events") log.Warn().Err(err).Interface("filter", filter).Msg("failed to fetch events")
err = fmt.Errorf("failed to fetch events: %w", err) err = fmt.Errorf("failed to fetch events: %w", err)

55
basic/save.go Normal file
View File

@@ -0,0 +1,55 @@
package main
import (
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/fiatjaf/go-nostr/event"
)
func (b *BasicRelay) SaveEvent(evt *event.Event) error {
// disallow large contents
if len(evt.Content) > 1000 {
return errors.New("event content too large")
}
// react to different kinds of events
switch evt.Kind {
case event.KindSetMetadata:
// delete past set_metadata events from this user
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 0`, evt.PubKey)
case event.KindRecommendServer:
// delete past recommend_server events equal to this one
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 2 AND content = $2`,
evt.PubKey, evt.Content)
case event.KindContactList:
// delete past contact lists from this same pubkey
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 3`, evt.PubKey)
default:
// delete all but the 10 most recent ones
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < (
SELECT created_at FROM event WHERE pubkey = $1
ORDER BY created_at DESC OFFSET 10 LIMIT 1
)`,
evt.PubKey, evt.Kind)
}
// insert
tagsj, _ := json.Marshal(evt.Tags)
_, err := b.DB.Exec(`
INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig)
if err != nil {
if strings.Index(err.Error(), "UNIQUE") != -1 {
// already exists
return nil
}
return fmt.Errorf("failed to save event from %s", evt.PubKey)
}
return nil
}

View File

@@ -1,12 +1,12 @@
package main package relayer
import ( import (
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"net/http" "net/http"
"strings"
"time" "time"
"github.com/fiatjaf/go-nostr/event" "github.com/fiatjaf/go-nostr/event"
@@ -34,196 +34,154 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true }, CheckOrigin: func(r *http.Request) bool { return true },
} }
func handleWebsocket(w http.ResponseWriter, r *http.Request) { func handleWebsocket(relay Relay) func(http.ResponseWriter, *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil) return func(w http.ResponseWriter, r *http.Request) {
if err != nil { conn, err := upgrader.Upgrade(w, r, nil)
log.Warn().Err(err).Msg("failed to upgrade websocket") if err != nil {
return log.Warn().Err(err).Msg("failed to upgrade websocket")
} return
}
// reader // reader
go func() { go func() {
defer func() { defer func() {
conn.Close() conn.Close()
}() }()
conn.SetReadLimit(maxMessageSize) conn.SetReadLimit(maxMessageSize)
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait)) conn.SetReadDeadline(time.Now().Add(pongWait))
return nil conn.SetPongHandler(func(string) error {
}) conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for { for {
typ, message, err := conn.ReadMessage() typ, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(
err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Warn().Err(err).Msg("unexpected close error")
}
break
}
if typ == websocket.PingMessage {
conn.WriteMessage(websocket.PongMessage, nil)
continue
}
go func(message []byte) {
var err error
defer func() {
if err != nil {
conn.WriteJSON([]interface{}{"NOTICE", err.Error()})
}
}()
var request []json.RawMessage
err = json.Unmarshal(message, &request)
if err == nil && len(request) < 2 {
err = errors.New("request has less than parameters")
return
}
if err != nil { if err != nil {
err = nil if websocket.IsUnexpectedCloseError(
return err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Warn().Err(err).Msg("unexpected close error")
}
break
} }
var typ string if typ == websocket.PingMessage {
json.Unmarshal(request[0], &typ) conn.WriteMessage(websocket.PongMessage, nil)
continue
}
switch typ { go func(message []byte) {
case "EVENT": var err error
// it's a new event
err = saveEvent(request[1])
case "REQ": defer func() {
var id string if err != nil {
json.Unmarshal(request[1], &id) conn.WriteJSON([]interface{}{"NOTICE", err.Error()})
if id == "" { }
err = errors.New("REQ has no <id>") }()
var request []json.RawMessage
err = json.Unmarshal(message, &request)
if err == nil && len(request) < 2 {
err = errors.New("request has less than 2 parameters")
return
}
if err != nil {
err = nil
return return
} }
filters := make(filter.EventFilters, len(request)-2) var typ string
for i, filterReq := range request[2:] { json.Unmarshal(request[0], &typ)
err = json.Unmarshal(filterReq, &filters[i])
switch typ {
case "EVENT":
// it's a new event
var evt event.Event
err := json.Unmarshal(request[1], &evt)
if err != nil {
err = fmt.Errorf("failed to decode event: %w", err)
return
}
// check serialization
serialized := evt.Serialize()
// assign ID
hash := sha256.Sum256(serialized)
evt.ID = hex.EncodeToString(hash[:])
// check signature (requires the ID to be set)
if ok, err := evt.CheckSignature(); err != nil {
err = errors.New("signature verification error")
return
} else if !ok {
err = errors.New("signature invalid")
return
}
err = relay.SaveEvent(&evt)
if err != nil { if err != nil {
return return
} }
events, err := queryEvents(&filters[i]) notifyListeners(&evt)
if err == nil { case "REQ":
for _, event := range events { var id string
conn.WriteJSON([]interface{}{"EVENT", id, event}) json.Unmarshal(request[1], &id)
if id == "" {
err = errors.New("REQ has no <id>")
return
}
filters := make(filter.EventFilters, len(request)-2)
for i, filterReq := range request[2:] {
err = json.Unmarshal(filterReq, &filters[i])
if err != nil {
return
}
events, err := relay.QueryEvents(&filters[i])
if err == nil {
for _, event := range events {
conn.WriteJSON([]interface{}{"EVENT", id, event})
}
} }
} }
setListener(id, conn, filters)
case "CLOSE":
var id string
json.Unmarshal(request[0], &id)
if id == "" {
err = errors.New("CLOSE has no <id>")
return
}
removeListener(conn, id)
} }
}(message)
setListener(id, conn, filters) }
case "CLOSE":
var id string
json.Unmarshal(request[0], &id)
if id == "" {
err = errors.New("CLOSE has no <id>")
return
}
removeListener(conn, id)
}
}(message)
}
}()
// writer
go func() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
conn.Close()
}() }()
for { // writer
select { go func() {
case <-ticker.C: ticker := time.NewTicker(pingPeriod)
err := conn.WriteMessage(websocket.PingMessage, nil) defer func() {
if err != nil { ticker.Stop()
log.Warn().Err(err).Msg("error writing ping, closing websocket") conn.Close()
return }()
for {
select {
case <-ticker.C:
err := conn.WriteMessage(websocket.PingMessage, nil)
if err != nil {
log.Warn().Err(err).Msg("error writing ping, closing websocket")
return
}
} }
} }
} }()
}() }
}
func saveEvent(body []byte) error {
var evt event.Event
err := json.Unmarshal(body, &evt)
if err != nil {
log.Warn().Err(err).Str("body", string(body)).Msg("couldn't decode body")
return errors.New("failed to decode event")
}
// disallow large contents
if len(evt.Content) > 1000 {
log.Warn().Err(err).Msg("event content too large")
return errors.New("event content too large")
}
// check serialization
serialized := evt.Serialize()
// assign ID
hash := sha256.Sum256(serialized)
evt.ID = hex.EncodeToString(hash[:])
// check signature (requires the ID to be set)
if ok, err := evt.CheckSignature(); err != nil {
log.Warn().Err(err).Msg("signature verification error")
return errors.New("signature verification error")
} else if !ok {
log.Warn().Err(err).Msg("signature invalid")
return errors.New("signature invalid")
}
// react to different kinds of events
switch evt.Kind {
case event.KindSetMetadata:
// delete past set_metadata events from this user
db.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 0`, evt.PubKey)
case event.KindRecommendServer:
// delete past recommend_server events equal to this one
db.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 2 AND content = $2`,
evt.PubKey, evt.Content)
case event.KindContactList:
// delete past contact lists from this same pubkey
db.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = 3`, evt.PubKey)
default:
// delete all but the 10 most recent ones
db.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < (
SELECT created_at FROM event WHERE pubkey = $1
ORDER BY created_at DESC OFFSET 10 LIMIT 1
)`,
evt.PubKey, evt.Kind)
}
// insert
tagsj, _ := json.Marshal(evt.Tags)
_, err = db.Exec(`
INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig)
if err != nil {
if strings.Index(err.Error(), "UNIQUE") != -1 {
// already exists
return nil
}
log.Warn().Err(err).Str("pubkey", evt.PubKey).Msg("failed to save")
return errors.New("failed to save event")
}
notifyListeners(&evt)
return nil
} }

15
interface.go Normal file
View File

@@ -0,0 +1,15 @@
package relayer
import (
"github.com/fiatjaf/go-nostr/event"
"github.com/fiatjaf/go-nostr/filter"
)
var Log = log
type Relay interface {
Name() string
Init() error
SaveEvent(*event.Event) error
QueryEvents(*filter.EventFilter) ([]event.Event, error)
}

View File

@@ -1,4 +1,4 @@
package main package relayer
import ( import (
"sync" "sync"

View File

@@ -1,4 +1,4 @@
package main package relayer
type Notice struct { type Notice struct {
Kind string `json:"kind"` Kind string `json:"kind"`

View File

@@ -1,4 +1,4 @@
package main package relayer
import ( import (
"net/http" "net/http"
@@ -6,9 +6,6 @@ import (
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
"github.com/kelseyhightower/envconfig"
"github.com/rs/cors" "github.com/rs/cors"
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
@@ -16,32 +13,22 @@ import (
type Settings struct { type Settings struct {
Host string `envconfig:"HOST" default:"0.0.0.0"` Host string `envconfig:"HOST" default:"0.0.0.0"`
Port string `envconfig:"PORT" default:"7447"` Port string `envconfig:"PORT" default:"7447"`
PostgresDatabase string `envconfig:"POSTGRESQL_DATABASE"`
} }
var s Settings var s Settings
var err error
var db *sqlx.DB
var log = zerolog.New(os.Stderr).Output(zerolog.ConsoleWriter{Out: os.Stderr}) var log = zerolog.New(os.Stderr).Output(zerolog.ConsoleWriter{Out: os.Stderr})
var router = mux.NewRouter() var router = mux.NewRouter()
func main() { func Start(relay Relay) {
err = envconfig.Process("", &s) Log = log.With().Str("name", relay.Name()).Logger()
if err != nil {
log.Fatal().Err(err).Msg("couldn't process envconfig")
}
db, err = initDB() if err := relay.Init(); err != nil {
if err != nil { Log.Fatal().Err(err).Msg("failed to start")
log.Fatal().Err(err).Msg("failed to open database")
} }
db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper)
go cleanupRoutine()
// NIP01 // NIP01
router.Path("/").Methods("GET").HandlerFunc(handleWebsocket) router.Path("/").Methods("GET").HandlerFunc(handleWebsocket(relay))
srv := &http.Server{ srv := &http.Server{
Handler: cors.Default().Handler(router), Handler: cors.Default().Handler(router),