Replace errorf with errors and fmt.Errorf, remove redundant logging across database operations, minimize unused imports, and improve concurrent event delivery logic. Added CPU utilization optimization in the main runtime configuration.

This commit is contained in:
2025-09-13 00:47:53 +01:00
parent c45276ef08
commit fc546ddc0b
9 changed files with 109 additions and 114 deletions

View File

@@ -12,7 +12,7 @@ import (
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/app/config"
database "next.orly.dev/pkg/database"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/encoders/bech32encoding"
"next.orly.dev/pkg/encoders/envelopes"
@@ -25,7 +25,7 @@ import (
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/protocol/publish"
utils "next.orly.dev/pkg/utils"
"next.orly.dev/pkg/utils"
"next.orly.dev/pkg/utils/normalize"
"next.orly.dev/pkg/utils/values"
)
@@ -298,7 +298,7 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
} else {
// Only dispatch if the event was newly saved (no error)
if f.pubs != nil {
f.pubs.Deliver(res.Event)
go f.pubs.Deliver(res.Event)
}
log.I.F(
"saved new event from follows syncer: %0x",

View File

@@ -19,18 +19,16 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) {
return
}
for i, idx := range idxs {
log.T.F(
"GetSerialById: searching range %d: start=%x, end=%x", i, idx.Start,
idx.End,
)
}
// for i, idx := range idxs {
// log.T.F(
// "GetSerialById: searching range %d: start=%x, end=%x", i, idx.Start,
// idx.End,
// )
// }
if len(idxs) == 0 {
err = errorf.E("no indexes found for id %0x", id)
return
}
idFound := false
if err = d.View(
func(txn *badger.Txn) (err error) {
@@ -49,16 +47,15 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
idFound = true
} else {
// Item not found in database
log.T.F(
"GetSerialById: ID not found in database: %s", hex.Enc(id),
)
// log.T.F(
// "GetSerialById: ID not found in database: %s", hex.Enc(id),
// )
}
return
},
); chk.E(err) {
return
}
if !idFound {
err = errorf.T("id not found in database: %s", hex.Enc(id))
return
@@ -67,7 +64,6 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
return
}
//
// func (d *D) GetSerialBytesById(id []byte) (ser []byte, err error) {
// var idxs []Range
// if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.New(id)}); chk.E(err) {

View File

@@ -43,49 +43,49 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
var expDeletes types.Uint40s
var expEvs event.S
if f.Ids != nil && f.Ids.Len() > 0 {
for _, id := range f.Ids.T {
log.T.F("QueryEvents: looking for ID=%s", hex.Enc(id))
}
log.T.F("QueryEvents: ids path, count=%d", f.Ids.Len())
// for _, id := range f.Ids.T {
// log.T.F("QueryEvents: looking for ID=%s", hex.Enc(id))
// }
// log.T.F("QueryEvents: ids path, count=%d", f.Ids.Len())
for _, idx := range f.Ids.T {
log.T.F("QueryEvents: lookup id=%s", hex.Enc(idx))
// log.T.F("QueryEvents: lookup id=%s", hex.Enc(idx))
// we know there is only Ids in this, so run the ID query and fetch.
var ser *types.Uint40
var idErr error
if ser, idErr = d.GetSerialById(idx); idErr != nil {
// Check if this is a "not found" error which is expected for IDs we don't have
if strings.Contains(idErr.Error(), "id not found in database") {
log.T.F(
"QueryEvents: ID not found in database: %s",
hex.Enc(idx),
)
// log.T.F(
// "QueryEvents: ID not found in database: %s",
// hex.Enc(idx),
// )
} else {
// Log unexpected errors but continue processing other IDs
log.E.F(
"QueryEvents: error looking up id=%s err=%v",
hex.Enc(idx), idErr,
)
// log.E.F(
// "QueryEvents: error looking up id=%s err=%v",
// hex.Enc(idx), idErr,
// )
}
continue
}
// Check if the serial is nil, which indicates the ID wasn't found
if ser == nil {
log.T.F("QueryEvents: Serial is nil for ID: %s", hex.Enc(idx))
// log.T.F("QueryEvents: Serial is nil for ID: %s", hex.Enc(idx))
continue
}
// fetch the events
var ev *event.E
if ev, err = d.FetchEventBySerial(ser); err != nil {
log.T.F(
"QueryEvents: fetch by serial failed for id=%s ser=%v err=%v",
hex.Enc(idx), ser, err,
)
// log.T.F(
// "QueryEvents: fetch by serial failed for id=%s ser=%v err=%v",
// hex.Enc(idx), ser, err,
// )
continue
}
log.T.F(
"QueryEvents: found id=%s kind=%d created_at=%d",
hex.Enc(ev.ID), ev.Kind, ev.CreatedAt,
)
// log.T.F(
// "QueryEvents: found id=%s kind=%d created_at=%d",
// hex.Enc(ev.ID), ev.Kind, ev.CreatedAt,
// )
// check for an expiration tag and delete after returning the result
if CheckExpiration(ev) {
log.T.F(
@@ -98,16 +98,16 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
}
// skip events that have been deleted by a proper deletion event
if derr := d.CheckForDeleted(ev, nil); derr != nil {
log.T.F(
"QueryEvents: id=%s filtered out due to deletion: %v",
hex.Enc(ev.ID), derr,
)
// log.T.F(
// "QueryEvents: id=%s filtered out due to deletion: %v",
// hex.Enc(ev.ID), derr,
// )
continue
}
log.T.F(
"QueryEvents: id=%s SUCCESSFULLY FOUND, adding to results",
hex.Enc(ev.ID),
)
// log.T.F(
// "QueryEvents: id=%s SUCCESSFULLY FOUND, adding to results",
// hex.Enc(ev.ID),
// )
evs = append(evs, ev)
}
// sort the events by timestamp
@@ -301,20 +301,19 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
if ev, err = d.FetchEventBySerial(ser); err != nil {
continue
}
// Add logging for tag filter debugging
if f.Tags != nil && f.Tags.Len() > 0 {
var eventTags []string
if ev.Tags != nil && ev.Tags.Len() > 0 {
for _, t := range *ev.Tags {
if t.Len() >= 2 {
eventTags = append(
eventTags,
string(t.Key())+"="+string(t.Value()),
)
}
}
}
// var eventTags []string
// if ev.Tags != nil && ev.Tags.Len() > 0 {
// for _, t := range *ev.Tags {
// if t.Len() >= 2 {
// eventTags = append(
// eventTags,
// string(t.Key())+"="+string(t.Value()),
// )
// }
// }
// }
// log.T.F(
// "QueryEvents: processing event ID=%s kind=%d tags=%v",
// hex.Enc(ev.ID), ev.Kind, eventTags,

View File

@@ -123,7 +123,7 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) {
}
}
if ev.CreatedAt < maxTs {
err = errorf.E(
err = fmt.Errorf(
"blocked: %0x was deleted: the event is older than the delete event %0x: event: %d delete: %d",
ev.ID, maxId, ev.CreatedAt, maxTs,
)
@@ -165,17 +165,17 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) {
idPkTss = append(idPkTss, tmp...)
// find the newest deletion without sorting to reduce cost
maxTs := idPkTss[0].Ts
maxId := idPkTss[0].Id
// maxId := idPkTss[0].Id
for i := 1; i < len(idPkTss); i++ {
if idPkTss[i].Ts > maxTs {
maxTs = idPkTss[i].Ts
maxId = idPkTss[i].Id
// maxId = idPkTss[i].Id
}
}
if ev.CreatedAt < maxTs {
err = errorf.E(
"blocked: %0x was deleted by address %s: event is older than the delete: event: %d delete: %d",
ev.ID, at, maxId, ev.CreatedAt, maxTs,
err = fmt.Errorf(
"blocked: was deleted by address %s: event is older than the delete: event: %d delete: %d",
at, ev.CreatedAt, maxTs,
)
return
}
@@ -206,8 +206,8 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) {
// For e-tag deletions (delete by ID), any deletion event means the event cannot be resubmitted
// regardless of timestamp, since it's a specific deletion of this exact event
err = errorf.E(
"blocked: %0x was deleted by ID and cannot be resubmitted",
ev.ID,
"blocked: was deleted by ID and cannot be resubmitted",
// ev.ID,
)
return
}
@@ -216,8 +216,8 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) {
// For e-tag deletions (delete by ID), any deletion event means the event cannot be resubmitted
// regardless of timestamp, since it's a specific deletion of this exact event
err = errorf.E(
"blocked: %0x was deleted by ID and cannot be resubmitted",
ev.ID,
"blocked: was deleted by ID and cannot be resubmitted",
// ev.ID,
)
return
}

View File

@@ -2,10 +2,10 @@ package database
import (
"context"
"errors"
"sort"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/interfaces/store"
@@ -20,7 +20,7 @@ func (d *D) QueryForIds(c context.Context, f *filter.F) (
) {
if f.Ids != nil && f.Ids.Len() > 0 {
// if there is Ids in the query, this is an error for this query
err = errorf.E("query for Ids is invalid for a filter with Ids")
err = errors.New("query for Ids is invalid for a filter with Ids")
return
}
var idxs []Range

View File

@@ -3,18 +3,16 @@ package database
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/indexes"
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
)
@@ -39,13 +37,13 @@ func (d *D) GetSerialsFromFilter(f *filter.F) (
// SaveEvent saves an event to the database, generating all the necessary indexes.
func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
if ev == nil {
err = errorf.E("nil event")
err = errors.New("nil event")
return
}
// check if the event already exists
var ser *types.Uint40
if ser, err = d.GetSerialById(ev.ID); err == nil && ser != nil {
err = errorf.E("blocked: event already exists: %0x", ev.ID)
err = errors.New("blocked: event already exists")
return
}
@@ -55,7 +53,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
err = nil
} else if err != nil {
// For any other error, return it
log.E.F("error checking if event exists: %s", err)
// log.E.F("error checking if event exists: %s", err)
return
}
@@ -65,7 +63,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
// "SaveEvent: rejecting resubmission of deleted event ID=%s: %v",
// hex.Enc(ev.ID), err,
// )
err = errorf.E("blocked: %s", err.Error())
err = fmt.Errorf("blocked: %s", err.Error())
return
}
// check for replacement
@@ -89,11 +87,11 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
}
// Only replace if the new event is newer or same timestamp
if ev.CreatedAt < oldEv.CreatedAt {
log.I.F(
"SaveEvent: rejecting older replaceable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
oldEv.CreatedAt,
)
// log.I.F(
// "SaveEvent: rejecting older replaceable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
// hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
// oldEv.CreatedAt,
// )
shouldReplace = false
break
}
@@ -104,11 +102,11 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
continue
}
log.I.F(
"SaveEvent: replacing older replaceable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
ev.CreatedAt,
)
// log.I.F(
// "SaveEvent: replacing older replaceable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
// hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
// ev.CreatedAt,
// )
if err = d.DeleteEventBySerial(
c, s, oldEv,
); chk.E(err) {
@@ -117,7 +115,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
}
} else {
// Don't save the older event - return an error
err = errorf.E("blocked: event is older than existing replaceable event")
err = errors.New("blocked: event is older than existing replaceable event")
return
}
}
@@ -125,7 +123,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
// find the events and check timestamps before deleting
dTag := ev.Tags.GetFirst([]byte("d"))
if dTag == nil {
err = errorf.E("event is missing a d tag identifier")
err = errors.New("event is missing a d tag identifier")
return
}
f := &filter.F{
@@ -149,11 +147,11 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
}
// Only replace if the new event is newer or same timestamp
if ev.CreatedAt < oldEv.CreatedAt {
log.I.F(
"SaveEvent: rejecting older addressable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
oldEv.CreatedAt,
)
// log.I.F(
// "SaveEvent: rejecting older addressable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
// hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
// oldEv.CreatedAt,
// )
shouldReplace = false
break
}
@@ -164,11 +162,11 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
continue
}
log.I.F(
"SaveEvent: replacing older addressable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
ev.CreatedAt,
)
// log.I.F(
// "SaveEvent: replacing older addressable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
// hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
// ev.CreatedAt,
// )
if err = d.DeleteEventBySerial(
c, s, oldEv,
); chk.E(err) {
@@ -177,7 +175,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
}
} else {
// Don't save the older event - return an error
err = errorf.E("blocked: event is older than existing addressable event")
err = errors.New("blocked: event is older than existing addressable event")
return
}
}
@@ -232,14 +230,14 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
return
},
)
log.T.F(
"total data written: %d bytes keys %d bytes values for event ID %s", kc,
vc, hex.Enc(ev.ID),
)
log.T.C(
func() string {
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
},
)
// log.T.F(
// "total data written: %d bytes keys %d bytes values for event ID %s", kc,
// vc, hex.Enc(ev.ID),
// )
// log.T.C(
// func() string {
// return fmt.Sprintf("event:\n%s\n", ev.Serialize())
// },
// )
return
}

View File

@@ -48,7 +48,7 @@ func (d *D) IsSubscriptionActive(pubkey []byte) (bool, error) {
err := d.DB.Update(
func(txn *badger.Txn) error {
item, err := txn.Get([]byte(key))
if err == badger.ErrKeyNotFound {
if errors.Is(err, badger.ErrKeyNotFound) {
sub := &Subscription{TrialEnd: now.AddDate(0, 0, 30)}
data, err := json.Marshal(sub)
if err != nil {
@@ -90,7 +90,7 @@ func (d *D) ExtendSubscription(pubkey []byte, days int) error {
func(txn *badger.Txn) error {
var sub Subscription
item, err := txn.Get([]byte(key))
if err == badger.ErrKeyNotFound {
if errors.Is(err, badger.ErrKeyNotFound) {
sub.PaidUntil = now.AddDate(0, 0, days)
} else if err != nil {
return err