diff --git a/app/handle-event.go b/app/handle-event.go index ac278d7..417f96a 100644 --- a/app/handle-event.go +++ b/app/handle-event.go @@ -8,13 +8,13 @@ import ( "lol.mleku.dev/chk" "lol.mleku.dev/log" - acl "next.orly.dev/pkg/acl" + "next.orly.dev/pkg/acl" "next.orly.dev/pkg/encoders/envelopes/authenvelope" "next.orly.dev/pkg/encoders/envelopes/eventenvelope" "next.orly.dev/pkg/encoders/envelopes/okenvelope" "next.orly.dev/pkg/encoders/kind" "next.orly.dev/pkg/encoders/reason" - utils "next.orly.dev/pkg/utils" + "next.orly.dev/pkg/utils" ) func (l *Listener) HandleEvent(msg []byte) (err error) { @@ -151,7 +151,7 @@ func (l *Listener) HandleEvent(msg []byte) (err error) { return } // Deliver the event to subscribers immediately after sending OK response - l.publishers.Deliver(env.E) + go l.publishers.Deliver(env.E) log.D.F("saved event %0x", env.E.ID) var isNewFromAdmin bool for _, admin := range l.Admins { diff --git a/main.go b/main.go index c23b2a7..232f07e 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "os/signal" + "runtime" "time" "github.com/pkg/profile" @@ -19,6 +20,7 @@ import ( ) func main() { + runtime.GOMAXPROCS(runtime.NumCPU() * 4) var err error var cfg *config.C if cfg, err = config.New(); chk.T(err) { diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index d0b6aaa..f835507 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -12,7 +12,7 @@ import ( "lol.mleku.dev/errorf" "lol.mleku.dev/log" "next.orly.dev/app/config" - database "next.orly.dev/pkg/database" + "next.orly.dev/pkg/database" "next.orly.dev/pkg/database/indexes/types" "next.orly.dev/pkg/encoders/bech32encoding" "next.orly.dev/pkg/encoders/envelopes" @@ -25,7 +25,7 @@ import ( "next.orly.dev/pkg/encoders/kind" "next.orly.dev/pkg/encoders/tag" "next.orly.dev/pkg/protocol/publish" - utils "next.orly.dev/pkg/utils" + "next.orly.dev/pkg/utils" "next.orly.dev/pkg/utils/normalize" "next.orly.dev/pkg/utils/values" ) @@ -298,7 +298,7 @@ func (f *Follows) startSubscriptions(ctx context.Context) { } else { // Only dispatch if the event was newly saved (no error) if f.pubs != nil { - f.pubs.Deliver(res.Event) + go f.pubs.Deliver(res.Event) } log.I.F( "saved new event from follows syncer: %0x", diff --git a/pkg/database/get-serial-by-id.go b/pkg/database/get-serial-by-id.go index 0220de1..36e26b9 100644 --- a/pkg/database/get-serial-by-id.go +++ b/pkg/database/get-serial-by-id.go @@ -19,18 +19,16 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) { if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) { return } - - for i, idx := range idxs { - log.T.F( - "GetSerialById: searching range %d: start=%x, end=%x", i, idx.Start, - idx.End, - ) - } + // for i, idx := range idxs { + // log.T.F( + // "GetSerialById: searching range %d: start=%x, end=%x", i, idx.Start, + // idx.End, + // ) + // } if len(idxs) == 0 { err = errorf.E("no indexes found for id %0x", id) return } - idFound := false if err = d.View( func(txn *badger.Txn) (err error) { @@ -49,16 +47,15 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) { idFound = true } else { // Item not found in database - log.T.F( - "GetSerialById: ID not found in database: %s", hex.Enc(id), - ) + // log.T.F( + // "GetSerialById: ID not found in database: %s", hex.Enc(id), + // ) } return }, ); chk.E(err) { return } - if !idFound { err = errorf.T("id not found in database: %s", hex.Enc(id)) return @@ -67,7 +64,6 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) { return } -// // func (d *D) GetSerialBytesById(id []byte) (ser []byte, err error) { // var idxs []Range // if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.New(id)}); chk.E(err) { diff --git a/pkg/database/query-events.go b/pkg/database/query-events.go index 0a9a0d5..d2ffc87 100644 --- a/pkg/database/query-events.go +++ b/pkg/database/query-events.go @@ -43,49 +43,49 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) ( var expDeletes types.Uint40s var expEvs event.S if f.Ids != nil && f.Ids.Len() > 0 { - for _, id := range f.Ids.T { - log.T.F("QueryEvents: looking for ID=%s", hex.Enc(id)) - } - log.T.F("QueryEvents: ids path, count=%d", f.Ids.Len()) + // for _, id := range f.Ids.T { + // log.T.F("QueryEvents: looking for ID=%s", hex.Enc(id)) + // } + // log.T.F("QueryEvents: ids path, count=%d", f.Ids.Len()) for _, idx := range f.Ids.T { - log.T.F("QueryEvents: lookup id=%s", hex.Enc(idx)) + // log.T.F("QueryEvents: lookup id=%s", hex.Enc(idx)) // we know there is only Ids in this, so run the ID query and fetch. var ser *types.Uint40 var idErr error if ser, idErr = d.GetSerialById(idx); idErr != nil { // Check if this is a "not found" error which is expected for IDs we don't have if strings.Contains(idErr.Error(), "id not found in database") { - log.T.F( - "QueryEvents: ID not found in database: %s", - hex.Enc(idx), - ) + // log.T.F( + // "QueryEvents: ID not found in database: %s", + // hex.Enc(idx), + // ) } else { // Log unexpected errors but continue processing other IDs - log.E.F( - "QueryEvents: error looking up id=%s err=%v", - hex.Enc(idx), idErr, - ) + // log.E.F( + // "QueryEvents: error looking up id=%s err=%v", + // hex.Enc(idx), idErr, + // ) } continue } // Check if the serial is nil, which indicates the ID wasn't found if ser == nil { - log.T.F("QueryEvents: Serial is nil for ID: %s", hex.Enc(idx)) + // log.T.F("QueryEvents: Serial is nil for ID: %s", hex.Enc(idx)) continue } // fetch the events var ev *event.E if ev, err = d.FetchEventBySerial(ser); err != nil { - log.T.F( - "QueryEvents: fetch by serial failed for id=%s ser=%v err=%v", - hex.Enc(idx), ser, err, - ) + // log.T.F( + // "QueryEvents: fetch by serial failed for id=%s ser=%v err=%v", + // hex.Enc(idx), ser, err, + // ) continue } - log.T.F( - "QueryEvents: found id=%s kind=%d created_at=%d", - hex.Enc(ev.ID), ev.Kind, ev.CreatedAt, - ) + // log.T.F( + // "QueryEvents: found id=%s kind=%d created_at=%d", + // hex.Enc(ev.ID), ev.Kind, ev.CreatedAt, + // ) // check for an expiration tag and delete after returning the result if CheckExpiration(ev) { log.T.F( @@ -98,16 +98,16 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) ( } // skip events that have been deleted by a proper deletion event if derr := d.CheckForDeleted(ev, nil); derr != nil { - log.T.F( - "QueryEvents: id=%s filtered out due to deletion: %v", - hex.Enc(ev.ID), derr, - ) + // log.T.F( + // "QueryEvents: id=%s filtered out due to deletion: %v", + // hex.Enc(ev.ID), derr, + // ) continue } - log.T.F( - "QueryEvents: id=%s SUCCESSFULLY FOUND, adding to results", - hex.Enc(ev.ID), - ) + // log.T.F( + // "QueryEvents: id=%s SUCCESSFULLY FOUND, adding to results", + // hex.Enc(ev.ID), + // ) evs = append(evs, ev) } // sort the events by timestamp @@ -301,20 +301,19 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) ( if ev, err = d.FetchEventBySerial(ser); err != nil { continue } - // Add logging for tag filter debugging if f.Tags != nil && f.Tags.Len() > 0 { - var eventTags []string - if ev.Tags != nil && ev.Tags.Len() > 0 { - for _, t := range *ev.Tags { - if t.Len() >= 2 { - eventTags = append( - eventTags, - string(t.Key())+"="+string(t.Value()), - ) - } - } - } + // var eventTags []string + // if ev.Tags != nil && ev.Tags.Len() > 0 { + // for _, t := range *ev.Tags { + // if t.Len() >= 2 { + // eventTags = append( + // eventTags, + // string(t.Key())+"="+string(t.Value()), + // ) + // } + // } + // } // log.T.F( // "QueryEvents: processing event ID=%s kind=%d tags=%v", // hex.Enc(ev.ID), ev.Kind, eventTags, diff --git a/pkg/database/query-for-deleted.go b/pkg/database/query-for-deleted.go index dc2d23c..875f1de 100644 --- a/pkg/database/query-for-deleted.go +++ b/pkg/database/query-for-deleted.go @@ -123,7 +123,7 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) { } } if ev.CreatedAt < maxTs { - err = errorf.E( + err = fmt.Errorf( "blocked: %0x was deleted: the event is older than the delete event %0x: event: %d delete: %d", ev.ID, maxId, ev.CreatedAt, maxTs, ) @@ -165,17 +165,17 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) { idPkTss = append(idPkTss, tmp...) // find the newest deletion without sorting to reduce cost maxTs := idPkTss[0].Ts - maxId := idPkTss[0].Id + // maxId := idPkTss[0].Id for i := 1; i < len(idPkTss); i++ { if idPkTss[i].Ts > maxTs { maxTs = idPkTss[i].Ts - maxId = idPkTss[i].Id + // maxId = idPkTss[i].Id } } if ev.CreatedAt < maxTs { - err = errorf.E( - "blocked: %0x was deleted by address %s: event is older than the delete: event: %d delete: %d", - ev.ID, at, maxId, ev.CreatedAt, maxTs, + err = fmt.Errorf( + "blocked: was deleted by address %s: event is older than the delete: event: %d delete: %d", + at, ev.CreatedAt, maxTs, ) return } @@ -206,8 +206,8 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) { // For e-tag deletions (delete by ID), any deletion event means the event cannot be resubmitted // regardless of timestamp, since it's a specific deletion of this exact event err = errorf.E( - "blocked: %0x was deleted by ID and cannot be resubmitted", - ev.ID, + "blocked: was deleted by ID and cannot be resubmitted", + // ev.ID, ) return } @@ -216,8 +216,8 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) { // For e-tag deletions (delete by ID), any deletion event means the event cannot be resubmitted // regardless of timestamp, since it's a specific deletion of this exact event err = errorf.E( - "blocked: %0x was deleted by ID and cannot be resubmitted", - ev.ID, + "blocked: was deleted by ID and cannot be resubmitted", + // ev.ID, ) return } diff --git a/pkg/database/query-for-ids.go b/pkg/database/query-for-ids.go index e33fc05..5ed9017 100644 --- a/pkg/database/query-for-ids.go +++ b/pkg/database/query-for-ids.go @@ -2,10 +2,10 @@ package database import ( "context" + "errors" "sort" "lol.mleku.dev/chk" - "lol.mleku.dev/errorf" "next.orly.dev/pkg/database/indexes/types" "next.orly.dev/pkg/encoders/filter" "next.orly.dev/pkg/interfaces/store" @@ -20,7 +20,7 @@ func (d *D) QueryForIds(c context.Context, f *filter.F) ( ) { if f.Ids != nil && f.Ids.Len() > 0 { // if there is Ids in the query, this is an error for this query - err = errorf.E("query for Ids is invalid for a filter with Ids") + err = errors.New("query for Ids is invalid for a filter with Ids") return } var idxs []Range diff --git a/pkg/database/save-event.go b/pkg/database/save-event.go index 5272d68..96a10e0 100644 --- a/pkg/database/save-event.go +++ b/pkg/database/save-event.go @@ -3,18 +3,16 @@ package database import ( "bytes" "context" + "errors" "fmt" "strings" "github.com/dgraph-io/badger/v4" "lol.mleku.dev/chk" - "lol.mleku.dev/errorf" - "lol.mleku.dev/log" "next.orly.dev/pkg/database/indexes" "next.orly.dev/pkg/database/indexes/types" "next.orly.dev/pkg/encoders/event" "next.orly.dev/pkg/encoders/filter" - "next.orly.dev/pkg/encoders/hex" "next.orly.dev/pkg/encoders/kind" "next.orly.dev/pkg/encoders/tag" ) @@ -39,13 +37,13 @@ func (d *D) GetSerialsFromFilter(f *filter.F) ( // SaveEvent saves an event to the database, generating all the necessary indexes. func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { if ev == nil { - err = errorf.E("nil event") + err = errors.New("nil event") return } // check if the event already exists var ser *types.Uint40 if ser, err = d.GetSerialById(ev.ID); err == nil && ser != nil { - err = errorf.E("blocked: event already exists: %0x", ev.ID) + err = errors.New("blocked: event already exists") return } @@ -55,7 +53,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { err = nil } else if err != nil { // For any other error, return it - log.E.F("error checking if event exists: %s", err) + // log.E.F("error checking if event exists: %s", err) return } @@ -65,7 +63,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { // "SaveEvent: rejecting resubmission of deleted event ID=%s: %v", // hex.Enc(ev.ID), err, // ) - err = errorf.E("blocked: %s", err.Error()) + err = fmt.Errorf("blocked: %s", err.Error()) return } // check for replacement @@ -89,11 +87,11 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { } // Only replace if the new event is newer or same timestamp if ev.CreatedAt < oldEv.CreatedAt { - log.I.F( - "SaveEvent: rejecting older replaceable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)", - hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID), - oldEv.CreatedAt, - ) + // log.I.F( + // "SaveEvent: rejecting older replaceable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)", + // hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID), + // oldEv.CreatedAt, + // ) shouldReplace = false break } @@ -104,11 +102,11 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { if oldEv, err = d.FetchEventBySerial(s); chk.E(err) { continue } - log.I.F( - "SaveEvent: replacing older replaceable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)", - hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID), - ev.CreatedAt, - ) + // log.I.F( + // "SaveEvent: replacing older replaceable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)", + // hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID), + // ev.CreatedAt, + // ) if err = d.DeleteEventBySerial( c, s, oldEv, ); chk.E(err) { @@ -117,7 +115,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { } } else { // Don't save the older event - return an error - err = errorf.E("blocked: event is older than existing replaceable event") + err = errors.New("blocked: event is older than existing replaceable event") return } } @@ -125,7 +123,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { // find the events and check timestamps before deleting dTag := ev.Tags.GetFirst([]byte("d")) if dTag == nil { - err = errorf.E("event is missing a d tag identifier") + err = errors.New("event is missing a d tag identifier") return } f := &filter.F{ @@ -149,11 +147,11 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { } // Only replace if the new event is newer or same timestamp if ev.CreatedAt < oldEv.CreatedAt { - log.I.F( - "SaveEvent: rejecting older addressable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)", - hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID), - oldEv.CreatedAt, - ) + // log.I.F( + // "SaveEvent: rejecting older addressable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)", + // hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID), + // oldEv.CreatedAt, + // ) shouldReplace = false break } @@ -164,11 +162,11 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { if oldEv, err = d.FetchEventBySerial(s); chk.E(err) { continue } - log.I.F( - "SaveEvent: replacing older addressable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)", - hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID), - ev.CreatedAt, - ) + // log.I.F( + // "SaveEvent: replacing older addressable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)", + // hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID), + // ev.CreatedAt, + // ) if err = d.DeleteEventBySerial( c, s, oldEv, ); chk.E(err) { @@ -177,7 +175,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { } } else { // Don't save the older event - return an error - err = errorf.E("blocked: event is older than existing addressable event") + err = errors.New("blocked: event is older than existing addressable event") return } } @@ -232,14 +230,14 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) { return }, ) - log.T.F( - "total data written: %d bytes keys %d bytes values for event ID %s", kc, - vc, hex.Enc(ev.ID), - ) - log.T.C( - func() string { - return fmt.Sprintf("event:\n%s\n", ev.Serialize()) - }, - ) + // log.T.F( + // "total data written: %d bytes keys %d bytes values for event ID %s", kc, + // vc, hex.Enc(ev.ID), + // ) + // log.T.C( + // func() string { + // return fmt.Sprintf("event:\n%s\n", ev.Serialize()) + // }, + // ) return } diff --git a/pkg/database/subscriptions.go b/pkg/database/subscriptions.go index 39678f4..dcf9538 100644 --- a/pkg/database/subscriptions.go +++ b/pkg/database/subscriptions.go @@ -48,7 +48,7 @@ func (d *D) IsSubscriptionActive(pubkey []byte) (bool, error) { err := d.DB.Update( func(txn *badger.Txn) error { item, err := txn.Get([]byte(key)) - if err == badger.ErrKeyNotFound { + if errors.Is(err, badger.ErrKeyNotFound) { sub := &Subscription{TrialEnd: now.AddDate(0, 0, 30)} data, err := json.Marshal(sub) if err != nil { @@ -90,7 +90,7 @@ func (d *D) ExtendSubscription(pubkey []byte, days int) error { func(txn *badger.Txn) error { var sub Subscription item, err := txn.Get([]byte(key)) - if err == badger.ErrKeyNotFound { + if errors.Is(err, badger.ErrKeyNotFound) { sub.PaidUntil = now.AddDate(0, 0, days) } else if err != nil { return err